Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,31 +291,39 @@ func TestSentryReporterDisabled(t *testing.T) {
r.CaptureException(nil, "test", nil)
r.CaptureDivergence(Divergence{})
r.Flush(0)
// ShouldReport always returns false when disabled (no side-effects on lastReport).
assert.False(t, r.ShouldReport("any-fingerprint"))
assert.Empty(t, r.lastReport, "disabled reporter must not modify lastReport")
}

func TestShouldReportCooldown(t *testing.T) {
now := time.Now()
r := &SentryReporter{
enabled: true,
lastReport: make(map[string]time.Time),
cooldown: 50 * time.Millisecond,
nowFunc: func() time.Time { return now },
}

assert.True(t, r.ShouldReport("fp1"))
assert.False(t, r.ShouldReport("fp1")) // within cooldown

time.Sleep(60 * time.Millisecond)
now = now.Add(60 * time.Millisecond) // advance past cooldown
assert.True(t, r.ShouldReport("fp1")) // cooldown elapsed
}

func TestShouldReportEvictsExpired(t *testing.T) {
now := time.Now()
r := &SentryReporter{
enabled: true,
lastReport: make(map[string]time.Time),
cooldown: 1 * time.Millisecond,
nowFunc: func() time.Time { return now },
}
// Fill to maxReportEntries
// Fill to maxReportEntries — all entries already expired
for i := range maxReportEntries {
r.lastReport[string(rune(i))] = time.Now().Add(-time.Hour)
r.lastReport[string(rune(i))] = now.Add(-time.Hour)
}
time.Sleep(2 * time.Millisecond)
assert.True(t, r.ShouldReport("new-fp"))
assert.Less(t, len(r.lastReport), maxReportEntries)
}
Expand Down
8 changes: 7 additions & 1 deletion proxy/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type SentryReporter struct {
hub *sentry.Hub
logger *slog.Logger
cooldown time.Duration
nowFunc func() time.Time // injectable clock; defaults to time.Now

mu sync.Mutex
lastReport map[string]time.Time // fingerprint → last report time
Expand All @@ -39,6 +40,7 @@ func NewSentryReporter(dsn string, environment string, sampleRate float64, logge
r := &SentryReporter{
logger: logger,
cooldown: defaultReportCooldown,
nowFunc: time.Now,
lastReport: make(map[string]time.Time),
}
if dsn == "" {
Expand Down Expand Up @@ -105,11 +107,15 @@ func (r *SentryReporter) CaptureDivergence(div Divergence) {
// ShouldReport checks if this fingerprint has been reported recently (cooldown-based).
// Evicts expired entries when the map reaches maxReportEntries to bound memory usage.
// Returns false (drops the report) if the map is still at capacity after eviction.
// Returns false immediately if Sentry reporting is disabled.
func (r *SentryReporter) ShouldReport(fingerprint string) bool {
if !r.enabled {
return false
}
r.mu.Lock()
defer r.mu.Unlock()

now := time.Now()
now := r.nowFunc()

// Evict expired entries if map grows too large
if len(r.lastReport) >= maxReportEntries {
Expand Down
130 changes: 40 additions & 90 deletions proxy/shadow_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ type secondaryPending struct {
payload string
}

// unmatchedSecondaries buffers unmatched secondary messages per shadowPubSub
// instance. This allows us to avoid reporting DivExtraData immediately when a
// secondary arrives before the corresponding primary (e.g. due to network
// jitter) and instead only report once the comparison window has elapsed.
// Each key maps to a slice to handle duplicate secondary messages correctly.
var unmatchedSecondaries = struct {
sync.Mutex
data map[*shadowPubSub]map[msgKey][]secondaryPending
}{
data: make(map[*shadowPubSub]map[msgKey][]secondaryPending),
}

// pendingMsg records a message awaiting its counterpart from the other source.
type pendingMsg struct {
pattern string
Expand Down Expand Up @@ -65,24 +53,29 @@ type shadowPubSub struct {
window time.Duration
nowFunc func() time.Time // injectable clock; defaults to time.Now

mu sync.Mutex
pending map[msgKey][]pendingMsg // primary messages awaiting secondary match
closed bool
started bool
startOnce sync.Once
done chan struct{}
mu sync.Mutex
pending map[msgKey][]pendingMsg // primary messages awaiting secondary match
// unmatchedSecondaries buffers secondary messages that arrived before a
// corresponding primary. Protected by mu. Each key maps to a slice to
// handle duplicate messages correctly.
unmatchedSecondaries map[msgKey][]secondaryPending
closed bool
started bool
startOnce sync.Once
done chan struct{}
}

func newShadowPubSub(backend PubSubBackend, metrics *ProxyMetrics, sentry *SentryReporter, logger *slog.Logger, window time.Duration) *shadowPubSub {
return &shadowPubSub{
secondary: backend.NewPubSub(context.Background()),
metrics: metrics,
sentry: sentry,
logger: logger,
window: window,
nowFunc: time.Now,
pending: make(map[msgKey][]pendingMsg),
done: make(chan struct{}),
secondary: backend.NewPubSub(context.Background()),
metrics: metrics,
sentry: sentry,
logger: logger,
window: window,
nowFunc: time.Now,
pending: make(map[msgKey][]pendingMsg),
unmatchedSecondaries: make(map[msgKey][]secondaryPending),
done: make(chan struct{}),
}
}

Expand Down Expand Up @@ -166,14 +159,7 @@ func (sp *shadowPubSub) RecordPrimary(msg *redis.Message) {
// exists for the given key and is still within the comparison window. If one is
// found it is consumed and true is returned. Caller must hold sp.mu.
func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time) bool {
unmatchedSecondaries.Lock()
defer unmatchedSecondaries.Unlock()

secBySP, ok := unmatchedSecondaries.data[sp]
if !ok {
return false
}
secs, ok := secBySP[key]
secs, ok := sp.unmatchedSecondaries[key]
if !ok || len(secs) == 0 {
return false
}
Expand All @@ -185,12 +171,9 @@ func (sp *shadowPubSub) reconcileWithBufferedSecondary(key msgKey, now time.Time
// Consume this secondary — remove it from the slice.
secs = append(secs[:i], secs[i+1:]...)
if len(secs) == 0 {
delete(secBySP, key)
if len(secBySP) == 0 {
delete(unmatchedSecondaries.data, sp)
}
delete(sp.unmatchedSecondaries, key)
} else {
secBySP[key] = secs
sp.unmatchedSecondaries[key] = secs
}
return true
}
Expand All @@ -210,10 +193,6 @@ func (sp *shadowPubSub) Close() {
if started {
<-sp.done
}
// Clean up buffered unmatched secondaries to prevent memory leak.
unmatchedSecondaries.Lock()
delete(unmatchedSecondaries.data, sp)
unmatchedSecondaries.Unlock()
}

// compareLoop reads from the secondary channel and matches messages.
Expand Down Expand Up @@ -244,6 +223,7 @@ func (sp *shadowPubSub) compareLoop(ch <-chan *redis.Message) {
// Collects divergence info under lock and reports after releasing it.
func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
sp.mu.Lock()
defer sp.mu.Unlock()

key := msgKeyFromMessage(msg)
now := sp.nowFunc()
Expand All @@ -256,28 +236,16 @@ func (sp *shadowPubSub) matchSecondary(msg *redis.Message) {
} else {
sp.pending[key] = entries[1:]
}
sp.mu.Unlock()
return
}
// The oldest pending primary is already past the window. Let the periodic
// sweep report it as DivDataMismatch; fall through to buffer this secondary
// so it can be reported as DivExtraData if it also remains unmatched.
}

sp.mu.Unlock()

// No matching primary message within the window. Buffer the secondary and only
// report DivExtraData if it remains unmatched after the comparison window.
unmatchedSecondaries.Lock()
defer unmatchedSecondaries.Unlock()

perInstance, ok := unmatchedSecondaries.data[sp]
if !ok {
perInstance = make(map[msgKey][]secondaryPending)
unmatchedSecondaries.data[sp] = perInstance
}

perInstance[key] = append(perInstance[key], secondaryPending{
sp.unmatchedSecondaries[key] = append(sp.unmatchedSecondaries[key], secondaryPending{
timestamp: now,
channel: msg.Channel,
payload: msg.Payload,
Expand All @@ -291,22 +259,11 @@ func (sp *shadowPubSub) sweepExpired() {
sp.mu.Lock()
now := sp.nowFunc()

unmatchedSecondaries.Lock()
// Fetch without creating to avoid unnecessary allocation on every tick for idle instances.
perInstance := unmatchedSecondaries.data[sp]

// Expire old buffered secondaries first so they cannot be consumed as a
// "match" during reconciliation (prevents bypassing the comparison window).
if perInstance != nil {
divergences = sweepExpiredSecondaries(now, sp.window, perInstance, divergences)
}
divergences = sp.reconcilePrimaries(now, perInstance, divergences)

if len(perInstance) == 0 {
delete(unmatchedSecondaries.data, sp)
}
divergences = sweepExpiredSecondaries(now, sp.window, sp.unmatchedSecondaries, divergences)
divergences = sp.reconcilePrimaries(now, sp.unmatchedSecondaries, divergences)

unmatchedSecondaries.Unlock()
sp.mu.Unlock()

for _, d := range divergences {
Expand All @@ -316,7 +273,7 @@ func (sp *shadowPubSub) sweepExpired() {

// reconcilePrimaries matches pending primaries against buffered secondaries,
// reporting expired unmatched primaries as divergences.
// Caller must hold sp.mu and unmatchedSecondaries.Lock().
// Caller must hold sp.mu.
func (sp *shadowPubSub) reconcilePrimaries(now time.Time, secBuf map[msgKey][]secondaryPending, out []divergenceEvent) []divergenceEvent {
for key, entries := range sp.pending {
var remaining []pendingMsg
Expand Down Expand Up @@ -385,26 +342,20 @@ func (sp *shadowPubSub) sweepAll() {
}
delete(sp.pending, key)
}
sp.mu.Unlock()

// Also drain any buffered unmatched secondaries for this instance.
unmatchedSecondaries.Lock()
if perInstance, ok := unmatchedSecondaries.data[sp]; ok {
for key, secs := range perInstance {
for _, sec := range secs {
divergences = append(divergences, divergenceEvent{
channel: sec.channel,
payload: sec.payload,
pattern: key.Pattern,
kind: DivExtraData,
isPattern: key.Pattern != "",
})
}
delete(perInstance, key)
for key, secs := range sp.unmatchedSecondaries {
for _, sec := range secs {
divergences = append(divergences, divergenceEvent{
channel: sec.channel,
payload: sec.payload,
pattern: key.Pattern,
kind: DivExtraData,
isPattern: key.Pattern != "",
})
}
delete(unmatchedSecondaries.data, sp)
delete(sp.unmatchedSecondaries, key)
}
unmatchedSecondaries.Unlock()
sp.mu.Unlock()

for _, d := range divergences {
sp.reportDivergence(d)
Expand All @@ -429,11 +380,10 @@ func (sp *shadowPubSub) reportDivergence(d divergenceEvent) {
}

var primary, secondary any
switch d.kind { //nolint:exhaustive // only two kinds apply to pub/sub shadow
case DivExtraData:
if d.kind == DivExtraData {
primary = nil
secondary = d.payload
default:
} else {
primary = d.payload
secondary = nil
}
Expand Down
Loading
Loading