Skip to content

Commit

Permalink
push.go: add an ability to wait until push workers are stopped via Pu…
Browse files Browse the repository at this point in the history
…shOptions.WaitGroup
  • Loading branch information
valyala committed Jan 15, 2024
1 parent 49f6df7 commit c45a8b1
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
13 changes: 13 additions & 0 deletions push.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type PushOptions struct {
//
// By default the compression is enabled.
DisableCompression bool

// Optional WaitGroup for waiting until all the push workers created with this WaitGroup are stopped.
WaitGroup *sync.WaitGroup
}

// InitPushWithOptions sets up periodic push for globally registered metrics to the given pushURL with the given interval.
Expand Down Expand Up @@ -207,6 +210,13 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
}
pushMetricsSet.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pc.pushURLRedacted)).Set(interval.Seconds())

var wg *sync.WaitGroup
if opts != nil {
wg = opts.WaitGroup
if wg != nil {
wg.Add(1)
}
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
Expand All @@ -221,6 +231,9 @@ func InitPushExtWithOptions(ctx context.Context, pushURL string, interval time.D
log.Printf("ERROR: metrics.push: %s", err)
}
case <-stopCh:
if wg != nil {
wg.Done()
}
return
}
}
Expand Down
6 changes: 6 additions & 0 deletions push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -91,6 +92,10 @@ func TestInitPushWithOptions(t *testing.T) {
}))
defer srv.Close()
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
if opts != nil {
opts.WaitGroup = &wg
}
if err := s.InitPushWithOptions(ctx, srv.URL, time.Millisecond, opts); err != nil {
t.Fatalf("unexpected error: %s", err)
}
Expand All @@ -100,6 +105,7 @@ func TestInitPushWithOptions(t *testing.T) {
case <-doneCh:
// stop the periodic pusher
cancel()
wg.Wait()
}
if reqErr != nil {
t.Fatalf("unexpected error: %s", reqErr)
Expand Down

0 comments on commit c45a8b1

Please sign in to comment.