Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ lint:
go tool govulncheck ./...

# bench runs benchmarks only. The -run pattern ^$ matches no test names, so
# regular tests are skipped; the dollar sign is doubled because make would
# otherwise treat a single "$" as the start of a variable reference.
bench:
	go test -run=^$$ -bench=. -benchmem ./...
28 changes: 14 additions & 14 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ type customError struct {
basePath string // snapshot of basePath at capture time
cause error
wrapped error // immediate parent for Unwrap() chain; may differ from cause
shouldNotify bool
shouldNotify atomic.Bool
status *grpcstatus.Status
}

// ShouldNotify reports whether the error should be reported to notifiers.
// The flag is read with an atomic load, so it can be called while another
// goroutine updates the flag through Notified.
func (c *customError) ShouldNotify() bool {
	return c.shouldNotify.Load()
}

// Notified marks the error as having been notified (or not).
// Passing true clears the should-notify flag; passing false sets it again.
// The flag is written with an atomic store.
func (c *customError) Notified(status bool) {
	c.shouldNotify.Store(!status)
}

// Error returns the error message.
Expand Down Expand Up @@ -233,30 +233,30 @@ func WrapWithSkipAndStatus(err error, msg string, skip int, status *grpcstatus.S
//if we have stack information reuse that
if e, ok := err.(ErrorExt); ok {
c := &customError{
Msg: msg + e.Error(),
cause: e.Cause(),
wrapped: err, // preserve full chain for errors.Is/errors.As
status: status,
shouldNotify: true,
Msg: msg + e.Error(),
cause: e.Cause(),
wrapped: err, // preserve full chain for errors.Is/errors.As
status: status,
}
c.shouldNotify.Store(true)

c.stack = e.Callers()
if ce, ok := e.(*customError); ok {
c.basePath = ce.basePath
}
if n, ok := e.(NotifyExt); ok {
c.shouldNotify = n.ShouldNotify()
c.shouldNotify.Store(n.ShouldNotify())
}
return c
}

c := &customError{
Msg: msg + err.Error(),
cause: err,
wrapped: err,
shouldNotify: true,
status: status,
Msg: msg + err.Error(),
cause: err,
wrapped: err,
status: status,
}
c.shouldNotify.Store(true)
c.captureStack(skip + 1)
return c

Expand Down
29 changes: 21 additions & 8 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

gobrake "github.com/airbrake/gobrake/v5"
Expand Down Expand Up @@ -35,12 +36,20 @@ var (
hostname string
traceHeader string = "x-trace-id"

// asyncSem is a semaphore that bounds the number of concurrent async
// notification goroutines. When full, new notifications are dropped
// to prevent goroutine explosion under sustained error bursts.
asyncSem = make(chan struct{}, 1000)
)

// asyncSem holds the semaphore channel that caps how many async notification
// goroutines may be in flight at once. Once the channel is full, further
// notifications are dropped so a sustained burst of errors cannot cause a
// goroutine explosion.
// The channel lives behind an atomic.Pointer so SetMaxAsyncNotifications can
// install a new channel while NotifyAsync goroutines load the current one,
// without racing on the variable.
var asyncSem atomic.Pointer[chan struct{}]

// init installs the default semaphore, which has 20 slots.
func init() {
	defaultSem := make(chan struct{}, 20)
	asyncSem.Store(&defaultSem)
}

const (
tracerID = "tracerId"
)
Expand All @@ -50,11 +59,13 @@ var asyncSemOnce sync.Once
// SetMaxAsyncNotifications sets the maximum number of concurrent async
// notification goroutines. When the limit is reached, new async notifications
// are dropped to prevent goroutine explosion under sustained error bursts.
// Default is 20. The first successful call wins; subsequent calls are no-ops.
// It is safe to call concurrently with NotifyAsync.
//
// Non-positive values of n are ignored and do not count as the first call.
func SetMaxAsyncNotifications(n int) {
	if n <= 0 {
		return
	}
	asyncSemOnce.Do(func() {
		// Publish the new channel through the atomic pointer rather than
		// assigning a plain variable, so concurrent readers never observe
		// a racy write.
		ch := make(chan struct{}, n)
		asyncSem.Store(&ch)
	})
}
Expand All @@ -67,7 +78,7 @@ func NotifyAsync(err error, rawData ...interface{}) error {
if err == nil {
return nil
}
sem := asyncSem
sem := *asyncSem.Load()
select {
case sem <- struct{}{}:
data := append([]interface{}(nil), rawData...)
Expand Down Expand Up @@ -553,7 +564,9 @@ func SetTraceId(ctx context.Context) context.Context {
func GetTraceId(ctx context.Context) string {
if o := options.FromContext(ctx); o != nil {
if data, found := o.Get(tracerID); found {
return data.(string)
if traceID, ok := data.(string); ok {
return traceID
}
}
}
if logCtx := loggers.FromContext(ctx); logCtx != nil {
Expand Down
77 changes: 77 additions & 0 deletions notifier/notifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package notifier

import (
"context"
"sync"
"testing"

"github.com/go-coldbrew/errors"
"github.com/go-coldbrew/options"
)

// TestGetTraceId_NonStringValue is a regression test: an int stored under the
// tracerID option key must yield an empty trace ID rather than the panic
// "interface conversion: interface {} is int, not string".
func TestGetTraceId_NonStringValue(t *testing.T) {
	ctx := options.AddToOptions(context.Background(), tracerID, 12345)

	if traceID := GetTraceId(ctx); traceID != "" {
		t.Errorf("expected empty string for non-string tracerID, got %q", traceID)
	}
}

// TestGetTraceId_StringValue checks the happy path: a string stored under the
// tracerID option key is returned unchanged by GetTraceId.
func TestGetTraceId_StringValue(t *testing.T) {
	const want = "abc-123"
	ctx := options.AddToOptions(context.Background(), tracerID, want)

	if got := GetTraceId(ctx); got != want {
		t.Errorf("expected 'abc-123', got %q", got)
	}
}

// TestNotifyAsync_BoundedConcurrency verifies that NotifyAsync drops a
// notification, instead of blocking or taking a slot, when the semaphore is
// already full.
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
	// Remember the semaphore that is installed right now and put that exact
	// one back afterwards. Restoring a freshly made channel with a hard-coded
	// capacity would silently change package state if an earlier test or
	// SetMaxAsyncNotifications had installed a different one.
	prev := asyncSem.Load()
	t.Cleanup(func() {
		asyncSem.Store(prev)
	})

	// Use a 1-slot semaphore and pre-fill it to simulate a full pool.
	// The channel is local to this test, so it needs no draining on cleanup.
	full := make(chan struct{}, 1)
	full <- struct{}{}
	asyncSem.Store(&full)

	// With the semaphore full, NotifyAsync must drop the notification. It
	// should not block; the returned error is not what this test checks.
	_ = NotifyAsync(errors.New("should-drop"))

	// Verify the semaphore is still exactly full (1 token, capacity 1).
	// If NotifyAsync had somehow acquired a slot, len would be < cap.
	if len(full) != cap(full) {
		t.Errorf("expected semaphore to remain full (len=%d, cap=%d); NotifyAsync should have dropped", len(full), cap(full))
	}
}
Comment thread
ankurs marked this conversation as resolved.

func TestSetMaxAsyncNotifications_ConcurrentAccess(t *testing.T) {
// Regression test: SetMaxAsyncNotifications and NotifyAsync must not
// race on the asyncSem variable. Run with -race to verify.
var wg sync.WaitGroup

wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
SetMaxAsyncNotifications(50)
}
}()
Comment thread
ankurs marked this conversation as resolved.
go func() {
defer wg.Done()
for i := 0; i < 20; i++ {
NotifyAsync(errors.New("race test"))
}
}()
wg.Wait()
}
Loading