diff --git a/internal/docker/client.go b/internal/docker/client.go
index e70742898c8..2e18b8bd386 100644
--- a/internal/docker/client.go
+++ b/internal/docker/client.go
@@ -224,7 +224,6 @@ func (d *_client) ContainerStats(ctx context.Context, id string, stats chan<- Co
         return err
     }
 
-    log.Debugf("starting to stream stats for: %s", id)
     defer response.Body.Close()
     decoder := json.NewDecoder(response.Body)
     var v *types.StatsJSON
diff --git a/internal/docker/container_store.go b/internal/docker/container_store.go
index b5935ce5031..a4ea06d1ddb 100644
--- a/internal/docker/container_store.go
+++ b/internal/docker/container_store.go
@@ -89,7 +89,7 @@ func (s *ContainerStore) init(ctx context.Context) {
     for {
         select {
         case event := <-events:
-            log.Debugf("received event: %+v", event)
+            log.Tracef("received event: %+v", event)
             switch event.Name {
             case "start":
                 if container, err := s.client.FindContainer(event.ActorID); err == nil {
diff --git a/internal/docker/stats_collector.go b/internal/docker/stats_collector.go
index f1cf7847ac1..74dd8d77c2a 100644
--- a/internal/docker/stats_collector.go
+++ b/internal/docker/stats_collector.go
@@ -62,34 +62,44 @@ func (c *StatsCollector) Stop() {
 func (c *StatsCollector) reset() {
     c.mu.Lock()
     defer c.mu.Unlock()
+    log.Debug("resetting timer for container stats collector")
     if c.timer != nil {
         c.timer.Stop()
     }
     c.timer = nil
 }
 
+func streamStats(parent context.Context, sc *StatsCollector, id string) {
+    ctx, cancel := context.WithCancel(parent)
+    sc.cancelers.Store(id, cancel)
+    log.Debugf("starting to stream stats for: %s", id)
+    if err := sc.client.ContainerStats(ctx, id, sc.stream); err != nil {
+        log.Debugf("stopping to stream stats for: %s", id)
+        if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
+            log.Errorf("unexpected error when streaming container stats: %v", err)
+        }
+    }
+}
+
 // Start starts the stats collector and blocks until it's stopped. It returns true if the collector was stopped, false if it was already running
-func (sc *StatsCollector) Start(ctx context.Context) bool {
+func (sc *StatsCollector) Start(parentCtx context.Context) bool {
     sc.reset()
-    if sc.totalStarted.Add(1) > 1 {
+    sc.totalStarted.Add(1)
+
+    var ctx context.Context
+
+    sc.mu.Lock()
+    if sc.stopper != nil {
+        sc.mu.Unlock()
         return false
     }
-    sc.mu.Lock()
-    ctx, sc.stopper = context.WithCancel(ctx)
+    ctx, sc.stopper = context.WithCancel(parentCtx)
     sc.mu.Unlock()
 
     if containers, err := sc.client.ListContainers(); err == nil {
         for _, c := range containers {
             if c.State == "running" {
-                go func(client Client, id string) {
-                    ctx, cancel := context.WithCancel(ctx)
-                    sc.cancelers.Store(id, cancel)
-                    if err := client.ContainerStats(ctx, id, sc.stream); err != nil {
-                        if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
-                            log.Errorf("unexpected error when streaming container stats: %v", err)
-                        }
-                    }
-                }(sc.client, c.ID)
+                go streamStats(ctx, sc, c.ID)
             }
         }
     } else {
@@ -102,15 +112,7 @@ func (sc *StatsCollector) Start(ctx context.Context) bool {
     for event := range events {
         switch event.Name {
         case "start":
-            go func(client Client, id string) {
-                ctx, cancel := context.WithCancel(ctx)
-                sc.cancelers.Store(id, cancel)
-                if err := client.ContainerStats(ctx, id, sc.stream); err != nil {
-                    if !errors.Is(err, context.Canceled) && !errors.Is(err, io.EOF) {
-                        log.Errorf("unexpected error when streaming container stats: %v", err)
-                    }
-                }
-            }(sc.client, event.ActorID)
+            go streamStats(ctx, sc, event.ActorID)
         case "die":
             if cancel, ok := sc.cancelers.LoadAndDelete(event.ActorID); ok {