From 25a1c4b6248979324842ac63d767bf069229fd61 Mon Sep 17 00:00:00 2001
From: Clint Greenwood
Date: Tue, 4 Apr 2017 12:39:42 -0400
Subject: [PATCH] Fix race and add support for the --all option in docker stats

In some cases a race could be encountered when executing docker stats.
The race resulted in an orphaned goroutine that leaked memory. In
addition, if stats --no-stream --all was requested, the persona would
hang on any stopped containers. Stats will now consider the container
state when processing these requests. Finally, unit test coverage was
increased to just over 90%.

Fixes #4549, #4585
---
 lib/apiservers/engine/backends/container.go       |  21 +-
 .../engine/backends/container_proxy.go            |  29 +-
 .../engine/backends/convert/stats.go              | 177 +++++++-----
 .../engine/backends/convert/stats_test.go         | 268 ++++++++++++++----
 .../restapi/handlers/containers_handlers.go       |  35 +--
 5 files changed, 361 insertions(+), 169 deletions(-)
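For context, the goroutine leak described above follows a familiar Go pattern: a producer blocked on an unbuffered channel send can never exit once its consumer goes away. A minimal, illustrative sketch (not VIC code; all names here are hypothetical):

```go
package main

import "time"

// metricT stands in for the real metrics type.
type metricT struct{}

// leakyProducer sends forever on an unbuffered channel. Once the
// consumer stops receiving, the goroutine blocks on the send and is
// orphaned for the life of the process.
func leakyProducer() chan metricT {
	ch := make(chan metricT)
	go func() {
		for {
			ch <- metricT{} // blocks forever once nobody reads
		}
	}()
	return ch
}

func main() {
	ch := leakyProducer()
	<-ch // read one value, then abandon the channel
	time.Sleep(time.Second)
	// the producer goroutine is now leaked; the patch below avoids this
	// by tying both converter goroutines to a cancellable context
}
```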
diff --git a/lib/apiservers/engine/backends/container.go b/lib/apiservers/engine/backends/container.go
index a16c1066e7..6188f62270 100644
--- a/lib/apiservers/engine/backends/container.go
+++ b/lib/apiservers/engine/backends/container.go
@@ -51,6 +51,7 @@ import (

 	"github.com/vmware/vic/lib/apiservers/engine/backends/cache"
 	viccontainer "github.com/vmware/vic/lib/apiservers/engine/backends/container"
+	"github.com/vmware/vic/lib/apiservers/engine/backends/convert"
 	"github.com/vmware/vic/lib/apiservers/engine/backends/filter"
 	"github.com/vmware/vic/lib/apiservers/engine/backends/portmap"
 	"github.com/vmware/vic/lib/apiservers/portlayer/client/containers"
@@ -1367,7 +1368,25 @@ func (c *Container) ContainerStats(ctx context.Context, name string, config *bac
 		out = io.Writer(wf)
 	}

-	err = c.containerProxy.StreamContainerStats(ctx, vc.ContainerID, out, config.Stream, cpuMhz, vc.HostConfig.Memory)
+	// stats configuration
+	statsConfig := &convert.ContainerStatsConfig{
+		VchMhz:      cpuMhz,
+		Stream:      config.Stream,
+		ContainerID: vc.ContainerID,
+		Out:         out,
+		Memory:      vc.HostConfig.Memory,
+	}
+
+	// if we are not streaming then we need to get the container state
+	if !config.Stream {
+		statsConfig.ContainerState, err = c.containerProxy.State(vc)
+		if err != nil {
+			return InternalServerError(err.Error())
+		}
+	}
+
+	err = c.containerProxy.StreamContainerStats(ctx, statsConfig)
 	if err != nil {
 		log.Errorf("error while streaming container (%s) stats: %s", vc.ContainerID, err)
 	}
diff --git a/lib/apiservers/engine/backends/container_proxy.go b/lib/apiservers/engine/backends/container_proxy.go
index 667cde7f1b..3cb0a3b140 100644
--- a/lib/apiservers/engine/backends/container_proxy.go
+++ b/lib/apiservers/engine/backends/container_proxy.go
@@ -88,7 +88,7 @@ type VicContainerProxy interface {
 	AddInteractionToContainer(handle string, config types.ContainerCreateConfig) (string, error)
 	CommitContainerHandle(handle, containerID string, waitTime int32) error
 	StreamContainerLogs(name string, out io.Writer, started chan struct{}, showTimestamps bool, followLogs bool, since int64, tailLines int64) error
-	StreamContainerStats(ctx context.Context, id string, out io.Writer, stream bool, CPUMhz int64, memory int64) error
+	StreamContainerStats(ctx context.Context, config *convert.ContainerStatsConfig) error

 	Stop(vc *viccontainer.VicContainer, name string, seconds *int, unbound bool) error
 	State(vc *viccontainer.VicContainer) (*types.ContainerState, error)
@@ -514,8 +514,8 @@ func (c *ContainerProxy) StreamContainerLogs(name string, out io.Writer, started
 // StreamContainerStats will provide a stream of container stats written to the provided
 // io.Writer. Prior to writing to the provided io.Writer there will be a transformation
 // from the portLayer representation of stats to the docker format
-func (c *ContainerProxy) StreamContainerStats(ctx context.Context, id string, out io.Writer, stream bool, CPUMhz int64, mem int64) error {
-	defer trace.End(trace.Begin(id))
+func (c *ContainerProxy) StreamContainerStats(ctx context.Context, config *convert.ContainerStatsConfig) error {
+	defer trace.End(trace.Begin(config.ContainerID))

 	plClient, transport := c.createNewAttachClientWithTimeouts(attachConnectTimeout, 0, attachAttemptTimeout)
 	defer transport.Close()
@@ -525,33 +525,26 @@ func (c *ContainerProxy) StreamContainerStats(ctx context.Context, id string, ou
 	defer cancel()

 	params := containers.NewGetContainerStatsParamsWithContext(ctx)
-	params.ID = id
-	params.Stream = stream
-
-	// converter config
-	config := convert.ContainerStatsConfig{
-		Ctx:         ctx,
-		Cancel:      cancel,
-		VchMhz:      CPUMhz,
-		Stream:      stream,
-		ContainerID: id,
-		Out:         out,
-		Memory:      mem,
-	}
+	params.ID = config.ContainerID
+	params.Stream = config.Stream
+
+	config.Ctx = ctx
+	config.Cancel = cancel

 	// create our converter
 	containerConverter := convert.NewContainerStats(config)
 	// provide the writer for the portLayer and start listening for metrics
 	writer := containerConverter.Listen()
 	if writer == nil {
 		// problem with the listener
-		return InternalServerError(fmt.Sprintf("unable to gather container(%s) statistics", id))
+		return InternalServerError(fmt.Sprintf("unable to gather container(%s) statistics", config.ContainerID))
 	}

 	_, err := plClient.Containers.GetContainerStats(params, writer)
 	if err != nil {
 		switch err := err.(type) {
 		case *containers.GetContainerStatsNotFound:
-			return NotFoundError(id)
+			return NotFoundError(config.ContainerID)
 		case *containers.GetContainerStatsInternalServerError:
 			return InternalServerError("Server error from the interaction port layer")
 		default:
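The proxy hands the *io.PipeWriter returned by Listen to the portlayer client, and everything written to it is decoded, converted, and re-encoded onto config.Out. A self-contained miniature of that pipe-based decode/transform/encode pattern (names and the sample type are hypothetical stand-ins, not VIC types):

```go
package main

import (
	"encoding/json"
	"io"
	"os"
)

// sample stands in for metrics.VMMetrics; the real converter decodes
// that type from the portlayer and encodes types.StatsJSON instead.
type sample struct {
	Mhz int64 `json:"mhz"`
}

// listen mimics ContainerStats.Listen: it returns the PipeWriter the
// producer writes JSON into, while a goroutine decodes, transforms,
// and re-encodes each value onto out. done is closed when the
// decode loop ends.
func listen(out io.Writer, done chan<- struct{}) *io.PipeWriter {
	r, w := io.Pipe()
	go func() {
		defer close(done)
		dec := json.NewDecoder(r)
		enc := json.NewEncoder(out)
		for dec.More() {
			var s sample
			if err := dec.Decode(&s); err != nil {
				return
			}
			s.Mhz *= 2 // stand-in for the VMMetrics -> StatsJSON conversion
			if err := enc.Encode(s); err != nil {
				return
			}
		}
	}()
	return w
}

func main() {
	done := make(chan struct{})
	w := listen(os.Stdout, done)
	// the "portlayer" side simply JSON-encodes onto the returned writer
	json.NewEncoder(w).Encode(sample{Mhz: 1000})
	w.Close() // closing the writer ends the decode loop
	<-done
}
```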
diff --git a/lib/apiservers/engine/backends/convert/stats.go b/lib/apiservers/engine/backends/convert/stats.go
index 913f48721c..29a47edca8 100644
--- a/lib/apiservers/engine/backends/convert/stats.go
+++ b/lib/apiservers/engine/backends/convert/stats.go
@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"sync"
 	"time"

 	log "github.com/Sirupsen/logrus"
@@ -31,26 +32,31 @@ import (
 // ContainerStats encapsulates the conversion of VMMetrics to
 // docker specific metrics
 type ContainerStats struct {
-	config         ContainerStatsConfig
-	dockerStats    *types.StatsJSON
-	currentMetrics *metrics.VMMetrics
-	totalVCHMhz    uint64
-	dblVCHMhz      uint64
-	preTotalMhz    uint64
-
-	// reader/writer for stream
-	reader *io.PipeReader
-	writer *io.PipeWriter
+	config *ContainerStatsConfig
+
+	totalVCHMhz uint64
+	dblVCHMhz   uint64
+	preTotalMhz uint64
+
+	preDockerStat *types.StatsJSON
+	curDockerStat *types.StatsJSON
+	currentMetric *metrics.VMMetrics
+
+	mu        sync.Mutex
+	reader    *io.PipeReader
+	writer    *io.PipeWriter
+	listening bool
 }

 type ContainerStatsConfig struct {
-	Ctx         context.Context
-	Cancel      context.CancelFunc
-	Out         io.Writer
-	ContainerID string
-	Memory      int64
-	Stream      bool
-	VchMhz      int64
+	Ctx            context.Context
+	Cancel         context.CancelFunc
+	Out            io.Writer
+	ContainerID    string
+	ContainerState *types.ContainerState
+	Memory         int64
+	Stream         bool
+	VchMhz         int64
 }

 type InvalidOrderError struct {
@@ -63,61 +69,81 @@ func (iso InvalidOrderError) Error() string {
 }

 // NewContainerStats will return a new instance of ContainerStats
-func NewContainerStats(config ContainerStatsConfig) *ContainerStats {
+func NewContainerStats(config *ContainerStatsConfig) *ContainerStats {
 	return &ContainerStats{
-		config:      config,
-		dockerStats: &types.StatsJSON{},
-		totalVCHMhz: uint64(config.VchMhz),
-		dblVCHMhz:   uint64(config.VchMhz * 2),
+		config:        config,
+		curDockerStat: &types.StatsJSON{},
+		totalVCHMhz:   uint64(config.VchMhz),
+		dblVCHMhz:     uint64(config.VchMhz * 2),
 	}
 }

-// Stop will clean up remaining conversion resources
+// IsListening returns the listening flag
+func (cs *ContainerStats) IsListening() bool {
+	cs.mu.Lock()
+	defer cs.mu.Unlock()
+	return cs.listening
+}
+
+// Stop will clean up the pipe and clear the listening flag
 func (cs *ContainerStats) Stop() {
-	if cs.reader != nil && cs.writer != nil {
+	cs.mu.Lock()
+	defer cs.mu.Unlock()
+
+	if cs.listening {
 		cs.reader.Close()
 		cs.writer.Close()
+		cs.listening = false
 	}
 }

+// newPipe will initialize the pipe for encoding / decoding and
+// set the listening flag
+func (cs *ContainerStats) newPipe() {
+	cs.mu.Lock()
+	defer cs.mu.Unlock()
+
+	// create a new reader / writer
+	cs.reader, cs.writer = io.Pipe()
+	cs.listening = true
+}
+
 // Listen will listen for new metrics from the portLayer, convert to docker format
 // and encode to the configured Writer. The returned PipeWriter is the source of
 // the vic metrics that will be transformed to docker stats
 func (cs *ContainerStats) Listen() *io.PipeWriter {
-	// TODO: could split decode / encode into separate funcs -- would provide for easier
-	// unit testing
-
-	// we already are listening
-	if cs.reader != nil {
+	// are we already listening?
+	if cs.IsListening() {
 		return nil
 	}

-	cs.reader, cs.writer = io.Pipe()
+	// create pipe for encode/decode
+	cs.newPipe()
+
 	dec := json.NewDecoder(cs.reader)
 	doc := json.NewEncoder(cs.config.Out)

 	// channel to transfer metric from decoder to encoder
-	// closed w/in the decoder
 	metric := make(chan metrics.VMMetrics)

-	// signal to decoder / encoder that we are done
-	finished := make(chan struct{})
-
-	var vmm metrics.VMMetrics
-	var previousStat *types.StatsJSON
+	// if we aren't streaming and the container is not running, then create an empty
+	// docker stat to return
+	if !cs.config.Stream && !cs.config.ContainerState.Running {
+		cs.preDockerStat = &types.StatsJSON{}
+	}

+	// goroutine will decode metrics received from the portLayer and
+	// send them to the encoding routine
 	go func() {
 		for {
 			select {
 			case <-cs.config.Ctx.Done():
-				close(finished)
-			case <-finished:
 				close(metric)
 				cs.Stop()
 				return
 			default:
 				for dec.More() {
+					var vmm metrics.VMMetrics
 					err := dec.Decode(&vmm)
 					if err != nil {
 						log.Errorf("container metric decoding error for container(%s): %s", cs.config.ContainerID, err)
@@ -131,23 +157,29 @@ func (cs *ContainerStats) Listen() *io.PipeWriter {
 	}()

+	// goroutine will convert incoming metrics to docker-specific stats and encode for the docker client
 	go func() {
-		for {
+		// docker needs updates quicker than vSphere can produce metrics, so we'll send a minimum of 1 metric/sec
+		ticker := time.NewTicker(time.Millisecond * 500)
+		for range ticker.C {
 			select {
 			case <-cs.config.Ctx.Done():
-				return
-			case <-finished:
+				cs.Stop()
+				ticker.Stop()
 				return
 			case nm := <-metric:
 				// convert the Stat to docker struct
-				stats, err := cs.ToContainerStats(&nm)
+				stat, err := cs.ToContainerStats(&nm)
 				if err != nil {
 					log.Errorf("container metric conversion error for container(%s): %s", cs.config.ContainerID, err)
 					cs.config.Cancel()
 				}
-				// Do we have a complete stat that can be sent to the client?
-				if stats != nil {
-					err = doc.Encode(stats)
+				if stat != nil {
+					cs.preDockerStat = stat
+				}
+			default:
+				if cs.IsListening() && cs.preDockerStat != nil {
+					err := doc.Encode(cs.preDockerStat)
 					if err != nil {
 						log.Warnf("container metric encoding error for container(%s): %s", cs.config.ContainerID, err)
 						cs.config.Cancel()
@@ -156,23 +188,11 @@ func (cs *ContainerStats) Listen() *io.PipeWriter {
 					if !cs.config.Stream {
 						cs.config.Cancel()
 					}
-					// set to previous stat so we can reuse
-					previousStat = stats
-				}
-			default:
-				// the docker client expects updates quicker than vSphere can produce them, so
-				// we need to send the previous stats to avoid intermittent empty output
-				time.Sleep(time.Second * 1)
-				if previousStat != nil && cs.reader != nil {
-					err := doc.Encode(previousStat)
-					if err != nil {
-						log.Warnf("container previous metric encoding error for container(%s): %s", cs.config.ContainerID, err)
-						cs.config.Cancel()
-					}
 				}
 			}
 		}
 	}()
+
 	return cs.writer
 }
@@ -180,9 +200,9 @@ func (cs *ContainerStats) Listen() *io.PipeWriter {
 // struct requires two samples. Func will return nil until a complete stat is available
 func (cs *ContainerStats) ToContainerStats(current *metrics.VMMetrics) (*types.StatsJSON, error) {
 	// if we have a current metric then validate and transform
-	if cs.currentMetrics != nil {
+	if cs.currentMetric != nil {
 		// do we have the same metric as before?
-		if cs.currentMetrics.SampleTime.Equal(current.SampleTime) {
+		if cs.currentMetric.SampleTime.Equal(current.SampleTime) {
 			// we've already got this as current, so skip and wait for the
 			// next sample
 			return nil, nil
@@ -193,7 +213,7 @@ func (cs *ContainerStats) ToContainerStats(current *metrics.VMMetrics) (*types.S
 			return nil, err
 		}
 	}
-	cs.currentMetrics = current
+	cs.currentMetric = current

 	// create the current CPU stats
 	cs.currentCPU()
@@ -202,50 +222,50 @@ func (cs *ContainerStats) ToContainerStats(current *metrics.VMMetrics) (*types.S
 	cs.memory()

 	// set sample time
-	cs.dockerStats.Read = cs.currentMetrics.SampleTime
+	cs.curDockerStat.Read = cs.currentMetric.SampleTime
 	// PreRead will be zero if we don't have two samples
-	if cs.dockerStats.PreRead.IsZero() {
+	if cs.curDockerStat.PreRead.IsZero() {
 		return nil, nil
 	}
-	return cs.dockerStats, nil
+	return cs.curDockerStat, nil
 }

 func (cs *ContainerStats) memory() {
 	// given MB (i.e. 2048) convert to bytes
-	cs.dockerStats.MemoryStats.Limit = uint64(cs.config.Memory * 1024 * 1024)
+	cs.curDockerStat.MemoryStats.Limit = uint64(cs.config.Memory * 1024 * 1024)
 	// given KB (i.e. 384.5) convert to bytes
-	cs.dockerStats.MemoryStats.Usage = uint64(cs.currentMetrics.Memory.Active * 1024)
+	cs.curDockerStat.MemoryStats.Usage = uint64(cs.currentMetric.Memory.Active * 1024)
 }

 // previousCPU will move the current stats to the previous CPU location
 func (cs *ContainerStats) previousCPU(current *metrics.VMMetrics) error {
 	// validate that the sampling is in the correct order
-	if current.SampleTime.Before(cs.dockerStats.Read) {
+	if current.SampleTime.Before(cs.curDockerStat.Read) {
 		err := InvalidOrderError{
 			current:  current.SampleTime,
-			previous: cs.dockerStats.Read,
+			previous: cs.curDockerStat.Read,
 		}
 		return err
 	}

 	// move the stats
-	cs.dockerStats.PreCPUStats = cs.dockerStats.CPUStats
+	cs.curDockerStat.PreCPUStats = cs.curDockerStat.CPUStats
 	// set the previousTotal -- this will be added to the current CPU
-	cs.preTotalMhz = cs.dockerStats.PreCPUStats.CPUUsage.TotalUsage
+	cs.preTotalMhz = cs.curDockerStat.PreCPUStats.CPUUsage.TotalUsage

-	cs.dockerStats.PreRead = cs.dockerStats.Read
+	cs.curDockerStat.PreRead = cs.curDockerStat.Read
 	// previous systemUsage will always be the VCH total
 	// see note in func currentCPU() for detail
-	cs.dockerStats.PreCPUStats.SystemUsage = cs.totalVCHMhz
+	cs.curDockerStat.PreCPUStats.SystemUsage = cs.totalVCHMhz

 	return nil
 }

 // currentCPU will convert the VM CPU metrics to docker CPU stats
 func (cs *ContainerStats) currentCPU() {
-	cpuCount := len(cs.currentMetrics.CPU.CPUs)
+	cpuCount := len(cs.currentMetric.CPU.CPUs)
 	dockerCPU := types.CPUStats{
 		CPUUsage: types.CPUUsage{
 			PercpuUsage: make([]uint64, cpuCount, cpuCount),
@@ -253,10 +273,17 @@ func (cs *ContainerStats) currentCPU() {
 	}

 	// collect the current CPU Metrics
-	for ci, current := range cs.currentMetrics.CPU.CPUs {
-		dockerCPU.CPUUsage.PercpuUsage[ci] = uint64(current.MhzUsage)
-		dockerCPU.CPUUsage.TotalUsage += uint64(current.MhzUsage)
+	for ci, current := range cs.currentMetric.CPU.CPUs {
+		// vSphere will report negative usage for a starting VM, so
+		// clamp to zero before the unsigned conversion -- TotalUsage
+		// is a uint64 and can never test as negative after conversion
+		usage := current.MhzUsage
+		if usage < 0 {
+			usage = 0
+		}
+		dockerCPU.CPUUsage.PercpuUsage[ci] = uint64(usage)
+		dockerCPU.CPUUsage.TotalUsage += uint64(usage)
 	}
+
 	// The first stat available for a VM will be missing detail
 	if cpuCount > 0 {
 		// TotalUsage is the sum of the individual vCPUs Mhz
@@ -283,5 +310,5 @@ func (cs *ContainerStats) currentCPU() {
 	// cpuUsage = (CPUDelta / SystemDelta) * cpuCount * 100
 	// This will require the addition of the previous total usage
 	dockerCPU.CPUUsage.TotalUsage += cs.preTotalMhz
-	cs.dockerStats.CPUStats = dockerCPU
+	cs.curDockerStat.CPUStats = dockerCPU
 }
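The comment above gives the client-side formula this conversion is shaped for: cpuUsage = (CPUDelta / SystemDelta) * cpuCount * 100. The tests below exercise it through calculateCPUPercentUnix, a helper matching the docker CLI's stats calculation; a sketch of that helper for reference (reconstructed from the docker client, so treat the exact shape as an assumption):

```go
// calculateCPUPercentUnix mirrors the docker CLI stats helper. For VIC,
// SystemUsage is always the total VCH MHz, so systemDelta is the VCH
// capacity over the sampling interval.
func calculateCPUPercentUnix(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
	cpuPercent := 0.0
	// container usage delta between the two samples
	cpuDelta := float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
	// "system" usage delta -- for VIC this is the VCH total per sample
	systemDelta := float64(v.CPUStats.SystemUsage) - float64(previousSystem)
	if systemDelta > 0.0 && cpuDelta > 0.0 {
		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
	}
	return cpuPercent
}
```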
diff --git a/lib/apiservers/engine/backends/convert/stats_test.go b/lib/apiservers/engine/backends/convert/stats_test.go
index 8bdba23658..6b141a3aea 100644
--- a/lib/apiservers/engine/backends/convert/stats_test.go
+++ b/lib/apiservers/engine/backends/convert/stats_test.go
@@ -23,7 +23,6 @@ import (
 	"time"

 	"github.com/docker/docker/api/types"
-	"github.com/docker/docker/pkg/ioutils"

 	"github.com/stretchr/testify/assert"

@@ -39,42 +38,43 @@ const (
 )

 func TestContainerConverter(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
+	plumb := setup()
+	defer teardown(plumb)

-	r, o := io.Pipe()
-	defer o.Close()
-	out := io.Writer(o)
-	// Outstream modification (from Docker's code) so the stream is streamed with the
-	// necessary headers that the CLI expects. This is Docker's scheme.
-	wf := ioutils.NewWriteFlusher(out)
-	defer wf.Close()
-	wf.Flush()
-	out = io.Writer(wf)
-
-	config := ContainerStatsConfig{
-		VchMhz:      int64(vchMhzTotal),
-		Ctx:         ctx,
-		Cancel:      cancel,
-		ContainerID: "1234",
-		Out:         out,
-		Stream:      true,
-		Memory:      2048,
-	}
+	// grab a config object
+	config := ccConfig(plumb)

 	cStats := NewContainerStats(config)
 	assert.NotNil(t, cStats)

-	// this writer goes is provided to the PL
+	// the returned writer is given to the PL
 	writer := cStats.Listen()
 	assert.NotNil(t, writer)
-
+	// a second call should return a nil writer as
+	// we are already listening
 	w2 := cStats.Listen()
 	assert.Nil(t, w2)

+	// ensure stop closes reader / writer
+	cStats.Stop()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}
+
+func TestToContainerStats(t *testing.T) {
+	plumb := setup()
+	defer teardown(plumb)
+	// grab a config object
+	config := ccConfig(plumb)
+
+	cStats := NewContainerStats(config)
+	assert.NotNil(t, cStats)
+
 	initCPU := 1000
 	vmBefore := vmMetrics(vcpuCount, initCPU)
-	time.Sleep(1 * time.Millisecond)
 	vmm := vmMetrics(vcpuCount, initCPU)
+	// ensure we are after the initial metric
+	vmm.SampleTime = vmm.SampleTime.Add(time.Second * 1)

 	// first metric sent, should return nil
 	js, err := cStats.ToContainerStats(vmm)
@@ -86,7 +86,7 @@ func TestContainerConverter(t *testing.T) {
 	assert.Nil(t, err)
 	assert.Nil(t, js)

-	// send stat before the previous
+	// send an out-of-order stat
 	js, err = cStats.ToContainerStats(vmBefore)
 	assert.NotNil(t, err)
 	assert.Nil(t, js)
@@ -95,7 +95,8 @@ func TestContainerConverter(t *testing.T) {
 	// create a new metric
 	vmmm := vmMetrics(vcpuCount, secondCPU)
 	// samples will be 20 seconds apart
-	vmmm.SampleTime.Add(time.Second * 20)
+	vmmm.SampleTime = vmm.SampleTime.Add(time.Second * 20)
+
 	js, err = cStats.ToContainerStats(vmmm)
 	assert.NoError(t, err)
 	assert.NotZero(t, js.Read, js.PreRead)
@@ -108,47 +109,212 @@ func TestContainerConverter(t *testing.T) {
 	cpuPercent := fmt.Sprintf("%2.2f", calculateCPUPercentUnix(js.PreCPUStats.CPUUsage.TotalUsage, js.PreCPUStats.SystemUsage, js))
 	assert.Equal(t, "7.58", cpuPercent)

-	// reset listener, so reader/writer operates
-	cStats.currentMetrics = nil
-	cStats.dockerStats = &types.StatsJSON{}
+	config.Cancel()
+	<-config.Ctx.Done()
+	// sleep to let the methods complete
+	sleepy()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}
+
+func TestContainerStatsListener(t *testing.T) {
+	plumb := setup()
+	defer teardown(plumb)
+	// grab a config object
+	config := ccConfig(plumb)
+	cStats := NewContainerStats(config)
+	assert.NotNil(t, cStats)
+
+	// start the listener
+	writer := cStats.Listen()
+	assert.NotNil(t, writer)

-	// simulate portLayer
-	plEnc := json.NewEncoder(writer)
-	err = plEnc.Encode(vmm)
+	// create an initial metric
+	initCPU := 1000
+	vm := vmMetrics(vcpuCount, initCPU)
+	err := plumb.mockPLMetrics(vm, writer)
 	assert.NoError(t, err)
-	err = plEnc.Encode(vmmm)
+
+	// send a second metric
+	vmm := vmMetrics(vcpuCount, initCPU+100)
+	vmm.SampleTime = vm.SampleTime.Add(time.Second * 20)
+	err = plumb.mockPLMetrics(vmm, writer)
 	assert.NoError(t, err)

-	// simulate docker client
-	docClient := json.NewDecoder(r)
-	dstat := &types.StatsJSON{}
-	err = docClient.Decode(dstat)
+	// did the client receive the metric?
+	ds, err := plumb.mockDockerClient()
 	assert.NoError(t, err)
+	assert.NotNil(t, ds)
+	assert.Equal(t, uint64((initCPU*2+100)/vcpuCount), ds.CPUStats.CPUUsage.TotalUsage)

-	// ensure stop closes reader / writer
-	cStats.Stop()
-	_, err = cStats.reader.Read([]byte{0, 0, 0})
-	assert.Error(t, err)
+	// docker expects data quicker than vSphere can produce it -- sleep for just over 1 sec
+	// and ensure the previous docker stat is returned to the client
+	time.Sleep(time.Millisecond * 1100)
+	same, err := plumb.mockDockerClient()
+	assert.NoError(t, err)
+	assert.NotNil(t, same)
+	assert.Equal(t, ds.CPUStats.CPUUsage.TotalUsage, same.CPUStats.CPUUsage.TotalUsage)
+
+	config.Cancel()
+	<-config.Ctx.Done()
+	// sleep to let the methods complete
+	sleepy()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}

-	config.Stream = false
+func TestContainerConvertCtxCancel(t *testing.T) {
+	plumb := setup()
+	defer teardown(plumb)
+	// grab a config object
+	config := ccConfig(plumb)
+	cStats := NewContainerStats(config)
+	assert.NotNil(t, cStats)
+
+	// start the listener
+	writer := cStats.Listen()
+	assert.NotNil(t, writer)

-	cStats = NewContainerStats(config)
+	// cancel the context
+	config.Cancel()
+	<-config.Ctx.Done()
+	// sleep to let the methods complete
+	sleepy()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}
+
+func TestContainerConvertNoStream(t *testing.T) {
+	plumb := setup()
+	defer teardown(plumb)
+	// grab a config object
+	config := ccConfig(plumb)
+	config.Stream = false
+	cStats := NewContainerStats(config)
 	assert.NotNil(t, cStats)

-	writer = cStats.Listen()
+	// start the listener
+	writer := cStats.Listen()
+	assert.NotNil(t, writer)
+
+	// create an initial metric
+	initCPU := 1000
+	vm := vmMetrics(vcpuCount, initCPU)
+	err := plumb.mockPLMetrics(vm, writer)
 	assert.NoError(t, err)

-	// simulate portLayer
-	plEnc = json.NewEncoder(writer)
-	err = plEnc.Encode(vmm)
+	// send a second metric
+	vmm := vmMetrics(vcpuCount, initCPU+100)
+	vmm.SampleTime = vm.SampleTime.Add(time.Second * 20)
+	err = plumb.mockPLMetrics(vmm, writer)
 	assert.NoError(t, err)
-	err = plEnc.Encode(vmmm)
+
+	ds, err := plumb.mockDockerClient()
 	assert.NoError(t, err)
+	assert.NotNil(t, ds)
+
+	// the converter canceled the context
+	<-config.Ctx.Done()
+	// sleep to let the methods complete
+	sleepy()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}

-	// simulate docker client
-	dstat = &types.StatsJSON{}
-	err = docClient.Decode(dstat)
+func TestContainerNotRunningNoStream(t *testing.T) {
+	plumb := setup()
+	defer teardown(plumb)
+	// grab a config object
+	config := ccConfig(plumb)
+	config.Stream = false
+	config.ContainerState.Running = false
+	cStats := NewContainerStats(config)
+	assert.NotNil(t, cStats)
+
+	// start the listener
+	writer := cStats.Listen()
+	assert.NotNil(t, writer)
+
+	ds, err := plumb.mockDockerClient()
 	assert.NoError(t, err)
+	assert.NotNil(t, ds)
+
+	// the converter canceled the context
+	<-config.Ctx.Done()
+	// sleep to let the methods complete
+	sleepy()
+	// verify we stopped listening
+	assert.False(t, cStats.IsListening())
+}
+
+// Test Helpers
+
+type plumbing struct {
+	r   *io.PipeReader
+	w   *io.PipeWriter
+	out io.Writer
+	// mock portlayer encoder
+	mockPL *json.Encoder
+	// mock docker client decoder
+	mockDoc *json.Decoder
+}
+
+func setup() *plumbing {
+	r, o := io.Pipe()
+	out := io.Writer(o)
+
+	return &plumbing{
+		r:       r,
+		w:       o,
+		out:     out,
+		mockDoc: json.NewDecoder(r),
+	}
+}
+
+// sleepy will sleep for 1/2 second -- this is only needed for testing
+func sleepy() {
+	time.Sleep(time.Millisecond * 500)
+}
+
+func teardown(p *plumbing) {
+	// close the reader / writer
+	p.r.Close()
+	p.w.Close()
+}
+
+func (p *plumbing) mockPLMetrics(metric *metrics.VMMetrics, writer io.Writer) error {
+	if p.mockPL == nil {
+		p.mockPL = json.NewEncoder(writer)
+	}
+	return p.mockPL.Encode(metric)
+}
+
+func (p *plumbing) mockDockerClient() (*types.StatsJSON, error) {
+	docStats := &types.StatsJSON{}
+
+	err := p.mockDoc.Decode(docStats)
+	if err != nil {
+		return nil, err
+	}
+	return docStats, nil
+}
+
+func ccConfig(p *plumbing) *ContainerStatsConfig {
+	// test config
+	ctx, cancel := context.WithCancel(context.Background())
+	config := &ContainerStatsConfig{
+		VchMhz:      int64(vchMhzTotal),
+		Ctx:         ctx,
+		Cancel:      cancel,
+		ContainerID: "1234",
+		Out:         p.out,
+		Stream:      true,
+		Memory:      2048,
+		ContainerState: &types.ContainerState{
+			Running: true,
+		},
+	}
+	return config
 }

 func vmMetrics(count int, vcpuMhz int) *metrics.VMMetrics {
diff --git a/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go b/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go
index bdec6549fe..8bca6b6d3a 100644
--- a/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go
+++ b/lib/apiservers/portlayer/restapi/handlers/containers_handlers.go
@@ -335,34 +335,21 @@ func (handler *ContainersHandlersImpl) GetContainerStatsHandler(params container
 	enc := json.NewEncoder(w)
 	flusher := NewFlushingReader(r)

-	// channel used to receive metrics
-	var ch chan interface{}
-
-	if params.Stream {
-		subch, err := metrics.Supervisor.VMCollector().Subscribe(c)
-		if err != nil {
-			log.Errorf("unable to subscribe container(%s) to stats stream: %s", params.ID, err)
-			return containers.NewGetContainerStatsInternalServerError()
-		}
-		log.Debugf("container(%s) stats stream subscribed @ %d", params.ID, &subch)
-		ch = subch
-	} else {
-		sch, err := metrics.Supervisor.VMCollector().Sample(c)
-		if err != nil {
-			log.Errorf("unable to subscribe container(%s) to stats sample: %s", params.ID, err)
-			return containers.NewGetContainerStatsInternalServerError()
-		}
-		log.Debugf("container(%s) stats sample subscribed @ %d", params.ID, &sch)
-		ch = sch
+	// subscribe to metrics
+	// currently every stats request is a subscription; it is the
+	// responsibility of the caller to close the connection and
+	// thereby release the subscription
+	ch, err := metrics.Supervisor.VMCollector().Subscribe(c)
+	if err != nil {
+		log.Errorf("unable to subscribe container(%s) to stats stream: %s", params.ID, err)
+		return containers.NewGetContainerStatsInternalServerError()
 	}
+	log.Debugf("container(%s) stats stream subscribed @ %d", params.ID, &ch)

 	// closer will be run when the http transport is closed
 	cleaner := func() {
-		// streaming is a subscription, so unsubscribe if streaming
-		if params.Stream {
-			log.Debug("unsubscribing %s from stats %d", params.ID, &ch)
-			metrics.Supervisor.VMCollector().Unsubscribe(c, ch)
-		}
+		log.Debugf("unsubscribing %s from stats %d", params.ID, &ch)
+		metrics.Supervisor.VMCollector().Unsubscribe(c, ch)
 		closePipe(r, w)
 	}
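With the Sample path removed, every stats request now follows one lifetime: subscribe on entry, unsubscribe in the transport-close callback. A runnable toy model of that pattern (the collector here is a hypothetical stand-in, not the portlayer's VMCollector):

```go
package main

import (
	"fmt"
	"sync"
)

// collector is a toy stand-in for the portlayer metrics collector:
// every request subscribes, and the close callback unsubscribes.
type collector struct {
	mu   sync.Mutex
	subs map[int]chan int
	next int
}

func (c *collector) Subscribe() (int, chan int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	id := c.next
	c.next++
	ch := make(chan int, 1)
	c.subs[id] = ch
	return id, ch
}

func (c *collector) Unsubscribe(id int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.subs, id)
	fmt.Println("unsubscribed", id)
}

func main() {
	col := &collector{subs: map[int]chan int{}}

	// handler entry: always subscribe, streaming or not
	id, _ := col.Subscribe()

	// cleaner mirrors the handler's transport-close callback; running
	// it is what releases the subscription for a --no-stream request
	// instead of holding the collector open forever
	cleaner := func() { col.Unsubscribe(id) }
	defer cleaner()
}
```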