Skip to content

Commit

Permalink
feat: deactivates collection of stats after 6 hours of idle (#2830)
Browse files Browse the repository at this point in the history
  • Loading branch information
amir20 committed Mar 20, 2024
1 parent f9aabf1 commit 0ca6624
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 7 deletions.
15 changes: 14 additions & 1 deletion internal/docker/container_store.go
Expand Up @@ -28,7 +28,6 @@ func NewContainerStore(ctx context.Context, client Client) *ContainerStore {
s.wg.Add(1)

go s.init(ctx)
go s.statsCollector.StartCollecting(ctx)

return s
}
Expand All @@ -49,9 +48,23 @@ func (s *ContainerStore) Client() Client {
}

// Subscribe registers events to receive container events for ctx and makes
// sure the shared stats collector is running while subscribers exist.
func (s *ContainerStore) Subscribe(ctx context.Context, events chan ContainerEvent) {
	go func() {
		// Start blocks until the collector is stopped and returns true only
		// for the caller that actually ran it (false if one was already
		// running). context.Background() is deliberate: the collector must
		// outlive this subscriber's ctx; shutdown goes through Stop() instead.
		if s.statsCollector.Start(context.Background()) {
			log.Debug("clearing container stats as stats collector has been stopped")
			// Drop cached per-container stats so a later restart of the
			// collector does not serve stale data points.
			s.containers.Range(func(_ string, c *Container) bool {
				c.Stats.Clear()
				return true
			})
		}
	}()
	s.subscribers.Store(ctx, events)
}

// Unsubscribe removes the event channel registered for ctx and releases this
// subscriber's hold on the stats collector; once the last hold is released,
// Stop schedules the collector to shut down after an idle timeout.
func (s *ContainerStore) Unsubscribe(ctx context.Context) {
	s.subscribers.Delete(ctx)
	s.statsCollector.Stop()
}

// SubscribeStats forwards the stats subscription for ctx to the underlying
// stats collector; stat points are delivered on the stats channel.
func (s *ContainerStore) SubscribeStats(ctx context.Context, stats chan ContainerStat) {
	s.statsCollector.Subscribe(ctx, stats)
}
Expand Down
61 changes: 55 additions & 6 deletions internal/docker/stats_collector.go
Expand Up @@ -4,18 +4,27 @@ import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"

"github.com/puzpuzpuz/xsync/v3"
log "github.com/sirupsen/logrus"
)

// StatsCollector fans container stat points out to subscribers and shuts
// itself down after all subscribers are gone for timeToStop.
//
// NOTE: the diff rendering duplicated the pre-existing field list
// (stream/subscribers/client/cancelers appeared twice), which would not
// compile; the struct below keeps each field exactly once.
type StatsCollector struct {
	stream       chan ContainerStat
	subscribers  *xsync.MapOf[context.Context, chan ContainerStat]
	client       Client
	cancelers    *xsync.MapOf[string, context.CancelFunc]
	stopper      context.CancelFunc // cancels the running collector; nil when not running (guarded by mu)
	timer        *time.Timer        // pending idle-shutdown timer, if any (guarded by mu)
	mu           sync.Mutex         // guards stopper and timer
	totalStarted atomic.Int32       // active Start calls minus Stop calls
}

var timeToStop = 6 * time.Hour

func NewStatsCollector(client Client) *StatsCollector {
return &StatsCollector{
stream: make(chan ContainerStat),
Expand All @@ -29,7 +38,46 @@ func (c *StatsCollector) Subscribe(ctx context.Context, stats chan ContainerStat
c.subscribers.Store(ctx, stats)
}

func (sc *StatsCollector) StartCollecting(ctx context.Context) {
// forceStop cancels the running collector, if any. It is invoked by the idle
// timer once timeToStop has elapsed with no subscribers. Safe to call when
// the collector is not running: it is a no-op in that case.
func (c *StatsCollector) forceStop() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.stopper == nil {
		return
	}
	c.stopper()
	c.stopper = nil
	log.Debug("stopping container stats collector due to inactivity")
}

// Stop releases one hold on the collector. When the last hold is released,
// it arms a timer that force-stops the collector after timeToStop of
// continued inactivity (a new Start cancels the timer via reset).
func (c *StatsCollector) Stop() {
	c.mu.Lock()
	defer c.mu.Unlock()
	remaining := c.totalStarted.Add(-1)
	if remaining != 0 {
		return
	}
	log.Debug("scheduled to stop container stats collector")
	c.timer = time.AfterFunc(timeToStop, c.forceStop)
}

// reset cancels any pending idle-shutdown timer. Called when a new
// subscriber starts the collector so an in-flight shutdown is abandoned.
func (c *StatsCollector) reset() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if t := c.timer; t != nil {
		t.Stop()
	}
	c.timer = nil
}

// Start starts the stats collector and blocks until it's stopped. It returns true if the collector was stopped, false if it was already running
func (sc *StatsCollector) Start(ctx context.Context) bool {
sc.reset()
if sc.totalStarted.Add(1) > 1 {
return false
}
sc.mu.Lock()
ctx, sc.stopper = context.WithCancel(ctx)
sc.mu.Unlock()

if containers, err := sc.client.ListContainers(); err == nil {
for _, c := range containers {
if c.State == "running" {
Expand Down Expand Up @@ -75,7 +123,8 @@ func (sc *StatsCollector) StartCollecting(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
log.Info("stopped collecting container stats")
return true
case stat := <-sc.stream:
sc.subscribers.Range(func(c context.Context, stats chan ContainerStat) bool {
select {
Expand Down
7 changes: 7 additions & 0 deletions internal/utils/ring_buffer.go
Expand Up @@ -31,6 +31,13 @@ func (r *RingBuffer[T]) Push(data T) {
}
}

// Clear empties the ring buffer while keeping its allocated backing storage
// for reuse by later pushes.
func (r *RingBuffer[T]) Clear() {
	r.mutex.Lock()
	r.data = r.data[:0]
	r.start = 0
	r.mutex.Unlock()
}

func (r *RingBuffer[T]) Data() []T {
r.mutex.RLock()
defer r.mutex.RUnlock()
Expand Down
33 changes: 33 additions & 0 deletions internal/utils/ring_buffer_test.go
Expand Up @@ -25,3 +25,36 @@ func TestRingBuffer(t *testing.T) {
t.Errorf("Expected data to be %v, got %v", expectedData, data)
}
}

// TestRingBuffer_MarshalJSON verifies that a full buffer serializes to a
// JSON array in insertion order.
func TestRingBuffer_MarshalJSON(t *testing.T) {
	rb := NewRingBuffer[int](3)

	rb.Push(1)
	rb.Push(2)
	rb.Push(3)

	data, err := rb.MarshalJSON()
	if err != nil {
		// Fatalf (not Errorf): comparing data from a failed call is meaningless.
		t.Fatalf("Expected error to be nil, got %v", err)
	}

	expectedData := []byte("[1,2,3]")
	if !reflect.DeepEqual(data, expectedData) {
		t.Errorf("Expected data to be %v, got %v", expectedData, data)
	}
}

// TestRingBuffer_Clear verifies that Clear leaves the buffer empty.
func TestRingBuffer_Clear(t *testing.T) {
	rb := NewRingBuffer[int](3)
	for _, v := range []int{1, 2, 3} {
		rb.Push(v)
	}

	rb.Clear()

	expectedData := []int{}
	data := rb.Data()
	if !reflect.DeepEqual(data, expectedData) {
		t.Errorf("Expected data to be %v, got %v", expectedData, data)
	}
}
6 changes: 6 additions & 0 deletions internal/web/events.go
Expand Up @@ -49,6 +49,12 @@ func (h *handler) streamEvents(w http.ResponseWriter, r *http.Request) {
store.Subscribe(ctx, events)
}

defer func() {
for _, store := range h.stores {
store.Unsubscribe(ctx)
}
}()

if err := sendContainersJSON(allContainers, w); err != nil {
log.Errorf("error writing containers to event stream: %v", err)
}
Expand Down

0 comments on commit 0ca6624

Please sign in to comment.