Skip to content

fix: race-safe asyncSem, safe type assertion, atomic shouldNotify#21

Merged
ankurs merged 4 commits intomainfrom
fix/notifier-races-and-type-assertion
Apr 2, 2026
Merged

fix: race-safe asyncSem, safe type assertion, atomic shouldNotify#21
ankurs merged 4 commits intomainfrom
fix/notifier-races-and-type-assertion

Conversation

@ankurs
Copy link
Copy Markdown
Member

@ankurs ankurs commented Mar 31, 2026

Summary

  • Replace bare asyncSem channel variable with atomic.Pointer[chan struct{}] to eliminate data race between SetMaxAsyncNotifications and NotifyAsync goroutines. Retain sync.Once guard to prevent repeated channel replacement. Reduce default concurrency from 1000 to 20.
  • Use comma-ok type assertion in GetTraceId() to prevent panic when tracerID option holds a non-string value.
  • Convert customError.shouldNotify from bool to atomic.Bool to fix data race between concurrent ShouldNotify/Notified calls in NotifyAsync goroutines.

Test plan

  • go test -race ./... passes (errors + notifier packages)
  • New tests: TestGetTraceId_NonStringValue, TestGetTraceId_StringValue, TestNotifyAsync_BoundedConcurrency, TestSetMaxAsyncNotifications_ConcurrentAccess
  • Verify no downstream breakage in interceptors/core

Summary by CodeRabbit

  • Bug Fixes

    • Improved thread-safety of error notification state and async dispatch; reduced default async concurrency limit to 20.
    • Prevented panics when extracting trace identifiers from context by validating stored values.
  • Tests

    • Added tests for trace-id handling, bounded async concurrency, and concurrent configuration to ensure stable, race-free behavior.
  • Chores

    • Adjusted benchmark run pattern in build tooling.

…ouldNotify

- Replace bare asyncSem channel variable with atomic.Pointer[chan struct{}]
  to eliminate data race between SetMaxAsyncNotifications and NotifyAsync.
  Retain sync.Once guard to prevent repeated channel replacement.
  Reduce default concurrency from 1000 to 20.

- Use comma-ok type assertion in GetTraceId() to prevent panic when
  tracerID option holds a non-string value.

- Convert customError.shouldNotify from bool to atomic.Bool to fix
  data race between concurrent ShouldNotify/Notified calls in
  NotifyAsync goroutines.
Copilot AI review requested due to automatic review settings March 31, 2026 14:15
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 31, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 849d309f-a4c7-4a0c-a389-47f5966a236a

📥 Commits

Reviewing files that changed from the base of the PR and between 31fc081 and eaa9ebf.

📒 Files selected for processing (1)
  • Makefile

📝 Walkthrough

Walkthrough

Switches an internal error notify flag to atomic.Bool, makes the notifier's async semaphore an atomic.Pointer to a channel with a single-init default capacity of 20, hardens GetTraceId against non-string values, and adds notifier tests for trace ID and semaphore behavior.

Changes

Cohort / File(s) Summary
Error atomic flag
errors.go
Replaced customError.shouldNotify bool with atomic.Bool; ShouldNotify() and Notified() now use Load()/Store(). Constructors and wrapper initialize via Store(...), including notifier-derived values.
Notifier concurrency & trace guard
notifier/notifier.go
Replaced global buffered channel with atomic.Pointer[chan struct{}]; added init() setting default capacity to 20; SetMaxAsyncNotifications creates/stores semaphore once (sync.Once); NotifyAsync reads semaphore via *asyncSem.Load(); GetTraceId checks stored value is a string before returning.
Notifier tests
notifier/notifier_test.go
Added tests: TestGetTraceId_NonStringValue, TestGetTraceId_StringValue, TestNotifyAsync_BoundedConcurrency (pre-filled semaphore drop behavior), and TestSetMaxAsyncNotifications_ConcurrentAccess (concurrent safety/race check).
Build target tweak
Makefile
Adjusted bench target regex from ^$ to ^$$ for -run filtering; no other behavioral changes.

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant Notifier
  participant Sem as "Semaphore (chan struct{})"
  Caller->>Notifier: NotifyAsync(ctx, ...)
  Notifier->>Notifier: semPtr := asyncSem.Load()
  Notifier->>Sem: sem := *semPtr
  Notifier->>Sem: try send token (non-blocking)
  alt token acquired
    Notifier->>Notifier: spawn goroutine to perform notification
    Notifier->>Sem: receive token (release) when done
  else no token
    Notifier-->>Caller: return immediately (dropped)
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 I hopped through atoms, tidy and bright,
Channels queued twenty, all snug for the night.
Traces checked softly, no panic to see,
Notifications bounded — hop, safe and free!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 44.44% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the three main fixes: race-safe asyncSem, safe type assertion, and atomic shouldNotify.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/notifier-races-and-type-assertion

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
notifier/notifier_test.go (2)

42-61: Test is valid for race detection despite sync.Once semantics.

While 99 of the 100 SetMaxAsyncNotifications calls are no-ops due to sync.Once, this test remains effective for race detection: the first call performs an atomic store while NotifyAsync goroutines perform atomic loads concurrently. Running with -race will validate the atomic operations work correctly.

Consider adding a brief comment noting that only the first call mutates state, to clarify why the loop is still useful for race verification.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@notifier/notifier_test.go` around lines 42 - 61, Add a short clarifying
comment inside TestSetMaxAsyncNotifications_ConcurrentAccess explaining that
although SetMaxAsyncNotifications uses sync.Once so 99 of the 100 calls are
no-ops, the first call mutates asyncSem and subsequent NotifyAsync calls
concurrently perform atomic loads, so the loop is still needed to exercise the
concurrent store/load behavior; reference the functions
SetMaxAsyncNotifications, NotifyAsync and the asyncSem variable in that comment
for clarity.

33-40: Test is a smoke test, not a concurrency verification.

This test calls NotifyAsync sequentially and only verifies no panic occurs. It doesn't actually validate bounded concurrency (e.g., that at most N goroutines run simultaneously). Consider renaming to TestNotifyAsync_NoPanic or adding actual concurrency assertions if bounded behavior verification is desired.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@notifier/notifier_test.go` around lines 33 - 40, The test
TestNotifyAsync_BoundedConcurrency is only a sequential smoke test calling
NotifyAsync and doesn't assert bounded concurrency; either rename it to
TestNotifyAsync_NoPanic to reflect its purpose, or change it to actually verify
concurrency by invoking NotifyAsync concurrently (e.g., spawn many goroutines
that call NotifyAsync) and track the active concurrent workers with a shared
counter protected by a mutex or an instrumentation hook into the notifier (or by
injecting a test semaphore channel) to assert the active count never exceeds the
expected limit; update references to the test name or add the concurrency
assertions around NotifyAsync accordingly.
notifier/notifier.go (1)

59-71: Consider clarifying the single-call constraint.

The sync.Once wrapper ensures race safety but also means only the first call to SetMaxAsyncNotifications after package init takes effect. If init() has already set the default (20), then the first user call to this function will successfully change the limit to n, but any subsequent calls are silently ignored.

This could surprise callers who attempt runtime reconfiguration. The docstring documents this, but consider whether a runtime-adjustable limit is desirable or if blocking subsequent calls with an explicit log/warning would improve debuggability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@notifier/notifier.go` around lines 59 - 71, SetMaxAsyncNotifications
currently uses asyncSemOnce.Do and silently ignores any calls after the first
(even if init already set the default), which can surprise callers; update
SetMaxAsyncNotifications to detect when asyncSem is already initialized (read
asyncSem via asyncSem.Load()) and, if a subsequent call is made with a different
n, emit a clear warning/debug log mentioning SetMaxAsyncNotifications and the
current vs requested value (using your package logger or the standard log),
while preserving the existing safety of asyncSemOnce and not changing
initialization semantics; reference asyncSemOnce, asyncSem, and
SetMaxAsyncNotifications so reviewers can find and add the check-and-log before
or around the existing asyncSemOnce.Do call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@notifier/notifier_test.go`:
- Around line 42-61: Add a short clarifying comment inside
TestSetMaxAsyncNotifications_ConcurrentAccess explaining that although
SetMaxAsyncNotifications uses sync.Once so 99 of the 100 calls are no-ops, the
first call mutates asyncSem and subsequent NotifyAsync calls concurrently
perform atomic loads, so the loop is still needed to exercise the concurrent
store/load behavior; reference the functions SetMaxAsyncNotifications,
NotifyAsync and the asyncSem variable in that comment for clarity.
- Around line 33-40: The test TestNotifyAsync_BoundedConcurrency is only a
sequential smoke test calling NotifyAsync and doesn't assert bounded
concurrency; either rename it to TestNotifyAsync_NoPanic to reflect its purpose,
or change it to actually verify concurrency by invoking NotifyAsync concurrently
(e.g., spawn many goroutines that call NotifyAsync) and track the active
concurrent workers with a shared counter protected by a mutex or an
instrumentation hook into the notifier (or by injecting a test semaphore
channel) to assert the active count never exceeds the expected limit; update
references to the test name or add the concurrency assertions around NotifyAsync
accordingly.

In `@notifier/notifier.go`:
- Around line 59-71: SetMaxAsyncNotifications currently uses asyncSemOnce.Do and
silently ignores any calls after the first (even if init already set the
default), which can surprise callers; update SetMaxAsyncNotifications to detect
when asyncSem is already initialized (read asyncSem via asyncSem.Load()) and, if
a subsequent call is made with a different n, emit a clear warning/debug log
mentioning SetMaxAsyncNotifications and the current vs requested value (using
your package logger or the standard log), while preserving the existing safety
of asyncSemOnce and not changing initialization semantics; reference
asyncSemOnce, asyncSem, and SetMaxAsyncNotifications so reviewers can find and
add the check-and-log before or around the existing asyncSemOnce.Do call.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 03ef1c7a-7770-4f14-bbc8-2237e7938cd5

📥 Commits

Reviewing files that changed from the base of the PR and between 55be20d and 70a375d.

📒 Files selected for processing (3)
  • errors.go
  • notifier/notifier.go
  • notifier/notifier_test.go

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR addresses several concurrency-safety issues and a panic-risk in trace ID retrieval, primarily in the notifier and error-wrapping flows.

Changes:

  • Make NotifyAsync concurrency limiting race-safe by storing the semaphore channel in an atomic.Pointer, and lower the default async notification concurrency limit to 20.
  • Prevent GetTraceId() from panicking by switching to a comma-ok type assertion when reading the tracerId option.
  • Fix a data race on customError.shouldNotify by converting it from bool to atomic.Bool, and add regression tests.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
notifier/notifier.go Introduces atomic storage for the async semaphore channel, updates default concurrency, and hardens GetTraceId type handling.
notifier/notifier_test.go Adds regression tests for trace ID type assertion and concurrent access to async notification configuration.
errors.go Replaces customError.shouldNotify with atomic.Bool and updates construction/accessors to be race-safe.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread notifier/notifier.go Outdated
Comment thread notifier/notifier_test.go
Comment thread notifier/notifier_test.go
Comment thread notifier/notifier_test.go Outdated
- Update SetMaxAsyncNotifications doc to reflect race-safe behavior
- Rewrite TestNotifyAsync_BoundedConcurrency to actually assert drops
  when semaphore is full
- Remove outdated comment about pre-existing shouldNotify race
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread notifier/notifier_test.go Outdated
Comment on lines +57 to +73
// Now the semaphore is full. Additional calls should be dropped.
var dropped atomic.Int32
originalDebug := NotifyAsync(errors.New("should-drop"))
// NotifyAsync returns the error regardless of drop/send, so we can't
// check the return value. Instead, verify the semaphore is still full
// by checking we can't send another token.
select {
case ch <- struct{}{}:
// We could send — means the slot was free, which means the previous
// call was dropped (it didn't acquire). That's the expected path.
<-ch // put it back
dropped.Add(1)
default:
// Slot is full — the previous NotifyAsync got in, which shouldn't
// happen since we already filled it. This is also fine if timing
// allowed the blocker to finish.
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNotifyAsync_BoundedConcurrency is currently non-asserting: the select treats both the "could send" and "default" branches as acceptable, so the test will pass regardless of whether NotifyAsync actually drops when the semaphore is full. Please make the test deterministic by forcing the semaphore to be full (e.g., pre-fill the channel) and asserting the drop path (e.g., channel remains full / no token acquired).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — rewrote the test to pre-fill the semaphore channel directly (no timing). Asserts len(ch) == cap(ch) after NotifyAsync to verify the drop path was taken.

Comment thread notifier/notifier_test.go Outdated
Comment on lines +50 to +56
// Fill the single slot with a blocking goroutine.
block := make(chan struct{})
blockErr := errors.New("blocker")
NotifyAsync(blockErr) // takes the one slot
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)

Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "blocking goroutine" setup doesn’t actually block anything: block is never observed by the async notification goroutine, so the semaphore token is likely released immediately. Relying on time.Sleep here makes the test flaky; prefer a synchronization mechanism that guarantees the token is held (or just pre-fill the semaphore channel to simulate a full pool).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — removed the blocking goroutine approach entirely. Now pre-fills the channel directly, making the test deterministic with no time.Sleep.

Comment thread notifier/notifier_test.go Outdated
Comment on lines +40 to +44
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup drains a token from ch without ensuring all NotifyAsync goroutines have finished. If a goroutine is still running, draining its token can cause its deferred <-s release to block forever. Consider avoiding the drain entirely, or synchronizing on goroutine completion before manipulating the semaphore channel.

Suggested change
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — cleanup now drains with a for-len loop, and since we pre-fill rather than spawning goroutines, there's no risk of blocking a deferred release.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread notifier/notifier_test.go Outdated
Comment on lines +35 to +78
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}
// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})

// Fill the single slot with a blocking goroutine.
block := make(chan struct{})
blockErr := errors.New("blocker")
NotifyAsync(blockErr) // takes the one slot
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)

// Now the semaphore is full. Additional calls should be dropped.
var dropped atomic.Int32
originalDebug := NotifyAsync(errors.New("should-drop"))
// NotifyAsync returns the error regardless of drop/send, so we can't
// check the return value. Instead, verify the semaphore is still full
// by checking we can't send another token.
select {
case ch <- struct{}{}:
// We could send — means the slot was free, which means the previous
// call was dropped (it didn't acquire). That's the expected path.
<-ch // put it back
dropped.Add(1)
default:
// Slot is full — the previous NotifyAsync got in, which shouldn't
// happen since we already filled it. This is also fine if timing
// allowed the blocker to finish.
}
_ = originalDebug

// Unblock the first goroutine so it releases the token.
close(block)
// Wait a bit for cleanup.
Copy link

Copilot AI Apr 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestNotifyAsync_BoundedConcurrency doesn’t actually ensure the semaphore slot is held: block is never used by the notify path, and the test accepts both outcomes in the select without asserting that any calls were dropped. As written, it can pass even if NotifyAsync ignores the semaphore. Consider using a custom error implementing errors.NotifyExt where Notified(true) blocks to hold the token, then assert subsequent NotifyAsync calls hit the drop path (and wait for goroutines to exit to avoid leaks).

Suggested change
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
select {
case <-ch:
default:
}
// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})
// Fill the single slot with a blocking goroutine.
block := make(chan struct{})
blockErr := errors.New("blocker")
NotifyAsync(blockErr) // takes the one slot
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)
// Now the semaphore is full. Additional calls should be dropped.
var dropped atomic.Int32
originalDebug := NotifyAsync(errors.New("should-drop"))
// NotifyAsync returns the error regardless of drop/send, so we can't
// check the return value. Instead, verify the semaphore is still full
// by checking we can't send another token.
select {
case ch <- struct{}{}:
// We could send — means the slot was free, which means the previous
// call was dropped (it didn't acquire). That's the expected path.
<-ch // put it back
dropped.Add(1)
default:
// Slot is full — the previous NotifyAsync got in, which shouldn't
// happen since we already filled it. This is also fine if timing
// allowed the blocker to finish.
}
_ = originalDebug
// Unblock the first goroutine so it releases the token.
close(block)
// Wait a bit for cleanup.
// testNotifyErr implements errors.NotifyExt and allows the test to
// distinguish between accepted and dropped async notifications.
type testNotifyErr struct {
msg string
block <-chan struct{}
dropped *atomic.Int32
}
func (e *testNotifyErr) Error() string {
return e.msg
}
// Notified is called by NotifyAsync. When async is true, the notification
// was accepted and we block to hold the semaphore token. When async is
// false, the notification was dropped and we record that fact.
func (e *testNotifyErr) Notified(async bool) {
if async {
// Hold the token until the test closes e.block.
<-e.block
return
}
if e.dropped != nil {
e.dropped.Add(1)
}
}
func TestNotifyAsync_BoundedConcurrency(t *testing.T) {
// Set a tiny semaphore so we can observe drops.
ch := make(chan struct{}, 1)
asyncSem.Store(&ch)
t.Cleanup(func() {
// Drain any tokens left by test goroutines.
for {
select {
case <-ch:
default:
goto drained
}
}
drained:
// Restore default.
def := make(chan struct{}, 20)
asyncSem.Store(&def)
})
// Fill the single slot with a blocking notification.
block := make(chan struct{})
var dropped atomic.Int32
holder := &testNotifyErr{
msg: "holder",
block: block,
dropped: &dropped,
}
NotifyAsync(holder) // should take the one slot and block in Notified(true)
// Give the goroutine a moment to acquire the semaphore token.
time.Sleep(10 * time.Millisecond)
// Now the semaphore is full. Additional calls should be dropped and
// reported via Notified(false).
for i := 0; i < 5; i++ {
NotifyAsync(&testNotifyErr{
msg: "should-drop",
block: block,
dropped: &dropped,
})
}
// Allow callbacks to run.
time.Sleep(50 * time.Millisecond)
if got := dropped.Load(); got == 0 {
t.Fatalf("expected some async notifications to be dropped while semaphore slot held, got %d drops", got)
}
// Unblock the goroutines so they can release the token and exit.
close(block)

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 31fc081 — test now pre-fills the 1-slot semaphore and asserts len(ch) == cap(ch) after NotifyAsync.

Pre-fill the semaphore channel directly instead of relying on timing.
Assert that len(ch) == cap(ch) after NotifyAsync to verify the drop
path was taken. No time.Sleep, no unused channels, no flaky cleanup.
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Use ^$$ so Make passes literal ^$ to the shell, ensuring
go test -run matches no test names (benchmarks only).
@ankurs ankurs merged commit c46c89c into main Apr 2, 2026
7 checks passed
@ankurs ankurs deleted the fix/notifier-races-and-type-assertion branch April 2, 2026 15:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants