From 70a375da4c2fd2ca11b77be8ab4aa636761ed284 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Tue, 31 Mar 2026 22:03:32 +0800 Subject: [PATCH 1/4] fix: race-safe asyncSem, safe type assertion in GetTraceId, atomic shouldNotify - Replace bare asyncSem channel variable with atomic.Pointer[chan struct{}] to eliminate data race between SetMaxAsyncNotifications and NotifyAsync. Retain sync.Once guard to prevent repeated channel replacement. Reduce default concurrency from 1000 to 20. - Use comma-ok type assertion in GetTraceId() to prevent panic when tracerID option holds a non-string value. - Convert customError.shouldNotify from bool to atomic.Bool to fix data race between concurrent ShouldNotify/Notified calls in NotifyAsync goroutines. --- errors.go | 28 +++++++++--------- notifier/notifier.go | 29 ++++++++++++++----- notifier/notifier_test.go | 61 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 22 deletions(-) create mode 100644 notifier/notifier_test.go diff --git a/errors.go b/errors.go index 25e1b51..215b693 100644 --- a/errors.go +++ b/errors.go @@ -60,18 +60,18 @@ type customError struct { basePath string // snapshot of basePath at capture time cause error wrapped error // immediate parent for Unwrap() chain; may differ from cause - shouldNotify bool + shouldNotify atomic.Bool status *grpcstatus.Status } // ShouldNotify returns true if the error should be reported to notifiers. func (c *customError) ShouldNotify() bool { - return c.shouldNotify + return c.shouldNotify.Load() } // Notified marks the error as having been notified (or not). func (c *customError) Notified(status bool) { - c.shouldNotify = !status + c.shouldNotify.Store(!status) } // Error returns the error message. @@ -233,30 +233,30 @@ func WrapWithSkipAndStatus(err error, msg string, skip int, status *grpcstatus.S //if we have stack information reuse that if e, ok := err.(ErrorExt); ok { c := &customError{ - Msg: msg + e.Error(), - cause: e.Cause(), - wrapped: err, // preserve full chain for errors.Is/errors.As - status: status, - shouldNotify: true, + Msg: msg + e.Error(), + cause: e.Cause(), + wrapped: err, // preserve full chain for errors.Is/errors.As + status: status, } + c.shouldNotify.Store(true) c.stack = e.Callers() if ce, ok := e.(*customError); ok { c.basePath = ce.basePath } if n, ok := e.(NotifyExt); ok { - c.shouldNotify = n.ShouldNotify() + c.shouldNotify.Store(n.ShouldNotify()) } return c } c := &customError{ - Msg: msg + err.Error(), - cause: err, - wrapped: err, - shouldNotify: true, - status: status, + Msg: msg + err.Error(), + cause: err, + wrapped: err, + status: status, } + c.shouldNotify.Store(true) c.captureStack(skip + 1) return c diff --git a/notifier/notifier.go b/notifier/notifier.go index 165ec9e..63e7100 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" gobrake "github.com/airbrake/gobrake/v5" @@ -35,12 +36,20 @@ var ( hostname string traceHeader string = "x-trace-id" - // asyncSem is a semaphore that bounds the number of concurrent async - // notification goroutines. When full, new notifications are dropped - // to prevent goroutine explosion under sustained error bursts. - asyncSem = make(chan struct{}, 1000) ) +// asyncSem is a semaphore that bounds the number of concurrent async +// notification goroutines. When full, new notifications are dropped +// to prevent goroutine explosion under sustained error bursts. +// Stored as atomic.Pointer to eliminate the race between SetMaxAsyncNotifications +// and NotifyAsync goroutines reading the channel variable. +var asyncSem atomic.Pointer[chan struct{}] + +func init() { + ch := make(chan struct{}, 20) + asyncSem.Store(&ch) +} + const ( tracerID = "tracerId" ) @@ -50,11 +59,13 @@ var asyncSemOnce sync.Once // SetMaxAsyncNotifications sets the maximum number of concurrent async // notification goroutines. When the limit is reached, new async notifications // are dropped to prevent goroutine explosion under sustained error bursts. -// Default is 1000. Can only be called once; subsequent calls are no-ops. +// Default is 20. Can only be called once; subsequent calls are no-ops. +// Must be called during initialization, before any notifications are sent. func SetMaxAsyncNotifications(n int) { if n > 0 { asyncSemOnce.Do(func() { - asyncSem = make(chan struct{}, n) + ch := make(chan struct{}, n) + asyncSem.Store(&ch) }) } } @@ -67,7 +78,7 @@ func NotifyAsync(err error, rawData ...interface{}) error { if err == nil { return nil } - sem := asyncSem + sem := *asyncSem.Load() select { case sem <- struct{}{}: data := append([]interface{}(nil), rawData...) @@ -553,7 +564,9 @@ func SetTraceId(ctx context.Context) context.Context { func GetTraceId(ctx context.Context) string { if o := options.FromContext(ctx); o != nil { if data, found := o.Get(tracerID); found { - return data.(string) + if traceID, ok := data.(string); ok { + return traceID + } } } if logCtx := loggers.FromContext(ctx); logCtx != nil { diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go new file mode 100644 index 0000000..ac6782e --- /dev/null +++ b/notifier/notifier_test.go @@ -0,0 +1,61 @@ +package notifier + +import ( + "context" + "sync" + "testing" + + "github.com/go-coldbrew/errors" + "github.com/go-coldbrew/options" +) + +func TestGetTraceId_NonStringValue(t *testing.T) { + // Regression test: GetTraceId must not panic when the tracerID + // option holds a non-string value. + ctx := options.AddToOptions(context.Background(), tracerID, 12345) + + // Before the fix this panicked with "interface conversion: interface {} is int, not string". + got := GetTraceId(ctx) + if got != "" { + t.Errorf("expected empty string for non-string tracerID, got %q", got) + } +} + +func TestGetTraceId_StringValue(t *testing.T) { + ctx := options.AddToOptions(context.Background(), tracerID, "abc-123") + + got := GetTraceId(ctx) + if got != "abc-123" { + t.Errorf("expected 'abc-123', got %q", got) + } +} + +func TestNotifyAsync_BoundedConcurrency(t *testing.T) { + // Verify that NotifyAsync respects the semaphore and doesn't panic. + // Each iteration uses a fresh error to avoid the pre-existing race + // on customError.shouldNotify across concurrent Notify calls. + for i := 0; i < 20; i++ { + NotifyAsync(errors.New("test error")) + } +} + +func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) { + // Regression test: SetMaxAsyncNotifications and NotifyAsync must not + // race on the asyncSem variable. Run with -race to verify. + var wg sync.WaitGroup + + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + SetMaxAsyncNotifications(50) + } + }() + go func() { + defer wg.Done() + for i := 0; i < 20; i++ { + NotifyAsync(errors.New("race test")) + } + }() + wg.Wait() +} From 17b4eeaa3a82a426bd72eba3844d9862cddce1ac Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Tue, 31 Mar 2026 23:20:27 +0800 Subject: [PATCH 2/4] fix: address PR review comments - Update SetMaxAsyncNotifications doc to reflect race-safe behavior - Rewrite TestNotifyAsync_BoundedConcurrency to actually assert drops when semaphore is full - Remove outdated comment about pre-existing shouldNotify race --- notifier/notifier.go | 4 ++-- notifier/notifier_test.go | 50 +++++++++++++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/notifier/notifier.go b/notifier/notifier.go index 63e7100..3db5480 100644 --- a/notifier/notifier.go +++ b/notifier/notifier.go @@ -59,8 +59,8 @@ var asyncSemOnce sync.Once // SetMaxAsyncNotifications sets the maximum number of concurrent async // notification goroutines. When the limit is reached, new async notifications // are dropped to prevent goroutine explosion under sustained error bursts. -// Default is 20. Can only be called once; subsequent calls are no-ops. -// Must be called during initialization, before any notifications are sent. +// Default is 20. The first successful call wins; subsequent calls are no-ops. +// It is safe to call concurrently with NotifyAsync. func SetMaxAsyncNotifications(n int) { if n > 0 { asyncSemOnce.Do(func() { diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index ac6782e..8731fad 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -3,7 +3,9 @@ package notifier import ( "context" "sync" + "sync/atomic" "testing" + "time" "github.com/go-coldbrew/errors" "github.com/go-coldbrew/options" @@ -31,12 +33,50 @@ func TestGetTraceId_StringValue(t *testing.T) { } func TestNotifyAsync_BoundedConcurrency(t *testing.T) { - // Verify that NotifyAsync respects the semaphore and doesn't panic. - // Each iteration uses a fresh error to avoid the pre-existing race - // on customError.shouldNotify across concurrent Notify calls. - for i := 0; i < 20; i++ { - NotifyAsync(errors.New("test error")) + // Set a tiny semaphore so we can observe drops. + ch := make(chan struct{}, 1) + asyncSem.Store(&ch) + t.Cleanup(func() { + // Drain any tokens left by test goroutines. + select { + case <-ch: + default: + } + // Restore default. + def := make(chan struct{}, 20) + asyncSem.Store(&def) + }) + + // Fill the single slot with a blocking goroutine. + block := make(chan struct{}) + blockErr := errors.New("blocker") + NotifyAsync(blockErr) // takes the one slot + // Give the goroutine a moment to acquire the semaphore token. + time.Sleep(10 * time.Millisecond) + + // Now the semaphore is full. Additional calls should be dropped. + var dropped atomic.Int32 + originalDebug := NotifyAsync(errors.New("should-drop")) + // NotifyAsync returns the error regardless of drop/send, so we can't + // check the return value. Instead, verify the semaphore is still full + // by checking we can't send another token. + select { + case ch <- struct{}{}: + // We could send — means the slot was free, which means the previous + // call was dropped (it didn't acquire). That's the expected path. + <-ch // put it back + dropped.Add(1) + default: + // Slot is full — the previous NotifyAsync got in, which shouldn't + // happen since we already filled it. This is also fine if timing + // allowed the blocker to finish. } + _ = originalDebug + + // Unblock the first goroutine so it releases the token. + close(block) + // Wait a bit for cleanup. + time.Sleep(50 * time.Millisecond) } func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) { From 31fc081a47bdd3ff29e3eb19f396d90352ae34b1 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Wed, 1 Apr 2026 09:34:35 +0800 Subject: [PATCH 3/4] fix: rewrite bounded concurrency test to be deterministic Pre-fill the semaphore channel directly instead of relying on timing. Assert that len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken. No time.Sleep, no unused channels, no flaky cleanup. --- notifier/notifier_test.go | 48 ++++++++++----------------------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go index 8731fad..29b1589 100644 --- a/notifier/notifier_test.go +++ b/notifier/notifier_test.go @@ -3,9 +3,7 @@ package notifier import ( "context" "sync" - "sync/atomic" "testing" - "time" "github.com/go-coldbrew/errors" "github.com/go-coldbrew/options" @@ -33,50 +31,28 @@ func TestGetTraceId_StringValue(t *testing.T) { } func TestNotifyAsync_BoundedConcurrency(t *testing.T) { - // Set a tiny semaphore so we can observe drops. + // Use a 1-slot semaphore and pre-fill it to simulate a full pool. ch := make(chan struct{}, 1) + ch <- struct{}{} // pre-fill: pool is now full asyncSem.Store(&ch) t.Cleanup(func() { - // Drain any tokens left by test goroutines. - select { - case <-ch: - default: + // Restore default. Drain first so cleanup is safe. + for len(ch) > 0 { + <-ch } - // Restore default. def := make(chan struct{}, 20) asyncSem.Store(&def) }) - // Fill the single slot with a blocking goroutine. - block := make(chan struct{}) - blockErr := errors.New("blocker") - NotifyAsync(blockErr) // takes the one slot - // Give the goroutine a moment to acquire the semaphore token. - time.Sleep(10 * time.Millisecond) + // With the semaphore full, NotifyAsync must drop (hit default branch). + // It should not block and should not spawn a goroutine. + NotifyAsync(errors.New("should-drop")) - // Now the semaphore is full. Additional calls should be dropped. - var dropped atomic.Int32 - originalDebug := NotifyAsync(errors.New("should-drop")) - // NotifyAsync returns the error regardless of drop/send, so we can't - // check the return value. Instead, verify the semaphore is still full - // by checking we can't send another token. - select { - case ch <- struct{}{}: - // We could send — means the slot was free, which means the previous - // call was dropped (it didn't acquire). That's the expected path. - <-ch // put it back - dropped.Add(1) - default: - // Slot is full — the previous NotifyAsync got in, which shouldn't - // happen since we already filled it. This is also fine if timing - // allowed the blocker to finish. + // Verify the semaphore is still exactly full (1 token, capacity 1). + // If NotifyAsync had somehow acquired a slot, len would be < cap. + if len(ch) != cap(ch) { + t.Errorf("expected semaphore to remain full (len=%d, cap=%d); NotifyAsync should have dropped", len(ch), cap(ch)) } - _ = originalDebug - - // Unblock the first goroutine so it releases the token. - close(block) - // Wait a bit for cleanup. - time.Sleep(50 * time.Millisecond) } func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) { From eaa9ebf71542e4bd5bd0c2e8b15ab6dad1c63530 Mon Sep 17 00:00:00 2001 From: Ankur Shrivastava Date: Wed, 1 Apr 2026 21:46:55 +0800 Subject: [PATCH 4/4] fix: escape $ in Makefile bench target for correct regex Use ^$$ so Make passes literal ^$ to the shell, ensuring go test -run matches no test names (benchmarks only). --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 67e1439..0cc0a83 100644 --- a/Makefile +++ b/Makefile @@ -14,4 +14,4 @@ lint: go tool govulncheck ./... bench: - go test -run=^$ -bench=. -benchmem ./... + go test -run=^$$ -bench=. -benchmem ./...