Skip to content

Commit 0d82adc

Browse files
committed
inflight: add data structure to detect long-running operations
This will be used to report long-lived iterators.
1 parent 14834f0 commit 0d82adc

File tree

5 files changed

+407
-3
lines changed

5 files changed

+407
-3
lines changed

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ require (
55
github.com/HdrHistogram/hdrhistogram-go v1.1.2
66
github.com/RaduBerinde/axisds v0.0.0-20250419182453-5135a0650657
77
github.com/cespare/xxhash/v2 v2.2.0
8-
github.com/cockroachdb/crlib v0.0.0-20250916151006-1094cb39adac
8+
github.com/cockroachdb/crlib v0.0.0-20251001180057-2a49e1873587
99
github.com/cockroachdb/datadriven v1.0.3-0.20250911232732-d959cf14706c
1010
github.com/cockroachdb/errors v1.11.3
1111
github.com/cockroachdb/metamorphic v0.0.0-20231108215700-4ba948b56895
@@ -24,6 +24,7 @@ require (
2424
github.com/pmezard/go-difflib v1.0.0
2525
github.com/prometheus/client_golang v1.16.0
2626
github.com/prometheus/client_model v0.3.0
27+
github.com/puzpuzpuz/xsync/v3 v3.5.1
2728
github.com/spf13/cobra v1.0.0
2829
github.com/stretchr/testify v1.9.0
2930
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
3434
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
3535
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
3636
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
37-
github.com/cockroachdb/crlib v0.0.0-20250916151006-1094cb39adac h1:L+7nhSrQ9WzW91AJHP0LmjwMHEed2nl5b7PvN9eIw58=
38-
github.com/cockroachdb/crlib v0.0.0-20250916151006-1094cb39adac/go.mod h1:Gq51ZeKaFCXk6QwuGM0w1dnaOqc/F5zKT2zA9D6Xeac=
37+
github.com/cockroachdb/crlib v0.0.0-20251001180057-2a49e1873587 h1:qjG2TrBrPbGRVYp5obcAi8OSsuFJ8s1AElDImHLV9tY=
38+
github.com/cockroachdb/crlib v0.0.0-20251001180057-2a49e1873587/go.mod h1:ae57yNis2F1FThSNdPdoXfiPOVi8G1TLreCBQYPOdqo=
3939
github.com/cockroachdb/datadriven v1.0.3-0.20250911232732-d959cf14706c h1:a0m7gmtv2mzJQ4wP9BkxCmJAnjZ7fsvCi2IORGD1als=
4040
github.com/cockroachdb/datadriven v1.0.3-0.20250911232732-d959cf14706c/go.mod h1:jsaKMvD3RBCATk1/jbUZM8C9idWBJME9+VRZ5+Liq1g=
4141
github.com/cockroachdb/errors v1.11.3 h1:5bA+k2Y6r+oz/6Z/RFlNeVCesGARKuC6YymtcDrbC/I=
@@ -190,6 +190,8 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z
190190
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
191191
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
192192
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
193+
github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg=
194+
github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
193195
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
194196
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
195197
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=

internal/inflight/in_flight.go

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
// Package inflight provides a lightweight, sharded tracker for reporting
6+
// long-running operations.
7+
package inflight
8+
9+
import (
10+
"cmp"
11+
"fmt"
12+
"iter"
13+
"maps"
14+
"runtime"
15+
"slices"
16+
"strings"
17+
"sync"
18+
"sync/atomic"
19+
"time"
20+
21+
"github.com/cockroachdb/crlib/crsync"
22+
"github.com/cockroachdb/crlib/crtime"
23+
"github.com/puzpuzpuz/xsync/v3"
24+
)
25+
26+
// Tracker is a lightweight, sharded tracker for detecting long-running
27+
// operations. Call Start() at the beginning of an operation and Stop() when it
28+
// completes; if an operation exceeds a configured age threshold, a formatted
29+
// report that includes the caller stack of Start() can be produced via Report,
30+
// or periodically via NewPollingTracker.
31+
//
32+
// The tracker is concurrency-safe and designed to have very low overhead in the
33+
// common path (Start/Stop). Start() only captures a small fixed number of
34+
// program counters; the expensive observability work (resolving and formatting
35+
// stack frames) is deferred to when a report is requested.
36+
//
37+
// Example:
38+
//
39+
//	pollInterval := time.Minute
40+
//	maxAge := 10 * time.Minute
41+
//	t := NewPollingTracker(pollInterval, maxAge, func(r string) {
42+
//		log.Infof(ctx, "slow operations:\n%s", r)
43+
//	})
44+
//	defer t.Close()
45+
//
46+
//	h := t.Start()
47+
//	// ... do work ...
48+
//	t.Stop(h)
49+
//
50+
// If any operations have been running for more than 10 minutes, the report
51+
// function is called every 1 minute with a human-readable dump that deduplicates
52+
// by Start() stack trace and shows the oldest occurrence per trace.
53+
type Tracker struct {
54+
// The tracker has a fixed number of shards to reduce contention on
55+
// Start/Stop. The high byte of Handle identifies the shard that was used
56+
// during Start().
57+
58+
// Both maps and handleCounters have one entry per shard. We separate them
59+
// because the map pointers don't change, while counters change on every Start.
60+
maps []*xsync.MapOf[Handle, entry]
61+
handleCounters []handleCounter
62+
63+
mu struct {
64+
sync.Mutex
65+
// timer is non-nil if this tracker was created with NewPollingTracker() and
66+
// hasn't been closed yet.
67+
timer *time.Timer
68+
}
69+
}
70+
71+
// Handle uniquely identifies a started operation. Treat it as an opaque token
72+
// (its high byte encodes the shard that tracks it). A zero Handle is not valid.
73+
type Handle uint64
74+
75+
// ReportFn is called (typically periodically) with a formatted report that
76+
// describes entries older than some threshold. An empty string report is never
77+
// delivered by the polling tracker.
78+
//
79+
// See Tracker.Report() for details on the report format.
//
// The polling tracker invokes ReportFn while holding the tracker's internal
// mutex, so the function must not call Tracker.Close() (it would deadlock).
80+
type ReportFn func(report string)
81+
82+
// NewTracker creates a new tracker that can be used to generate reports on
83+
// long-running operations. It does not start any background goroutines; callers
84+
// can invoke Report() on demand.
85+
func NewTracker() *Tracker {
86+
return newTracker(crsync.NumShards())
87+
}
88+
89+
// NewPollingTracker creates a new tracker that will periodically generate
90+
// reports on long-running operations.
91+
//
92+
// Specifically, every pollInterval, reportFn will be called if there are
93+
// operations that have been running for more than maxAge.
94+
//
95+
// The tracker must be closed with Close() when no longer needed.
96+
func NewPollingTracker(
97+
pollInterval time.Duration, maxAge time.Duration, reportFn ReportFn,
98+
) *Tracker {
99+
t := newTracker(crsync.NumShards())
100+
101+
t.mu.Lock()
102+
defer t.mu.Unlock()
103+
t.mu.timer = time.AfterFunc(pollInterval, func() {
104+
t.mu.Lock()
105+
defer t.mu.Unlock()
106+
if t.mu.timer == nil {
107+
// Close was called.
108+
return
109+
}
110+
if report := t.Report(maxAge); report != "" {
111+
reportFn(report)
112+
}
113+
t.mu.timer.Reset(pollInterval)
114+
})
115+
return t
116+
}
117+
118+
// Close stops background polling (if enabled).
119+
//
120+
// Note that Start, Stop, and Report can still be used during/after Close.
121+
func (t *Tracker) Close() {
122+
t.mu.Lock()
123+
defer t.mu.Unlock()
124+
if t.mu.timer != nil {
125+
t.mu.timer.Stop()
126+
// If the timer function is waiting for the mutex, it will notice that timer
127+
// is nil and exit.
128+
t.mu.timer = nil
129+
}
130+
}
131+
132+
// Start records the start of an operation and returns a handle that should be
133+
// used with Stop.
134+
func (t *Tracker) Start() Handle {
135+
// We record a monotonic start time and capture program counters from the
136+
// caller’s stack. We generate a handle that includes the shard index in its
137+
// high byte, making Stop an O(1) operation on the right shard without a
138+
// lookup.
139+
var e entry
140+
e.startTime = crtime.NowMono()
141+
runtime.Callers(2, e.stack[:])
142+
shardIdx := crsync.CPUBiasedInt() % len(t.maps)
143+
h := Handle(t.handleCounters[shardIdx].Add(1))
144+
t.maps[shardIdx].Store(h, e)
145+
return h
146+
}
147+
148+
// Stop records the end of an operation. Does nothing if the operation was
149+
// already stopped.
150+
func (t *Tracker) Stop(h Handle) {
151+
// The high byte of h is the shard index; see newTracker.
152+
t.maps[h>>56].Delete(h)
153+
}
154+
155+
// Report returns a multi-line, human-readable summary of all entries that were
156+
// started before the given threshold (i.e., with age > threshold at the call
157+
// time). The result is suitable for logging or debugging. If no entries exceed
158+
// the threshold, Report returns "".
159+
//
160+
// The output is grouped by Start() stack trace: for each unique stack we show
161+
// the number of occurrences and the oldest start time among them. Groups are
162+
// ordered by age (oldest first), and for each group we print the resolved stack
163+
// frames.
164+
func (t *Tracker) Report(threshold time.Duration) string {
165+
type infoForStackTrace struct {
166+
oldestStartTime crtime.Mono
167+
occurrences int
168+
}
169+
now := crtime.NowMono()
170+
cutoff := now - crtime.Mono(threshold)
171+
// We deduplicate stack traces in the report. For each stack, we mention the
172+
// number of long-running operations and the oldest operation time.
173+
var m map[stack]infoForStackTrace
174+
for e := range t.olderThan(cutoff) {
175+
if m == nil {
176+
m = make(map[stack]infoForStackTrace)
177+
}
178+
info, ok := m[e.stack]
179+
if ok {
180+
info.occurrences++
181+
info.oldestStartTime = min(info.oldestStartTime, e.startTime)
182+
} else {
183+
info.occurrences = 1
184+
info.oldestStartTime = e.startTime
185+
}
186+
m[e.stack] = info
187+
}
188+
if len(m) == 0 {
189+
return ""
190+
}
191+
// Sort by oldest start time.
192+
stacks := slices.Collect(maps.Keys(m))
193+
slices.SortFunc(stacks, func(a, b stack) int {
194+
return cmp.Compare(m[a].oldestStartTime, m[b].oldestStartTime)
195+
})
196+
var b strings.Builder
197+
for _, stack := range stacks {
198+
info := m[stack]
199+
if info.occurrences == 1 {
200+
fmt.Fprintf(&b, "started %s ago:\n", now.Sub(info.oldestStartTime))
201+
} else {
202+
fmt.Fprintf(&b, "%d occurrences, oldest started %s ago:\n", info.occurrences, now.Sub(info.oldestStartTime))
203+
}
204+
pcs := stack[:]
205+
for i := range pcs {
206+
if pcs[i] == 0 {
207+
pcs = pcs[:i]
208+
break
209+
}
210+
}
211+
frames := runtime.CallersFrames(pcs)
212+
for {
213+
frame, more := frames.Next()
214+
fmt.Fprintf(&b, " %s\n %s:%d\n", frame.Function, frame.File, frame.Line)
215+
if !more {
216+
break
217+
}
218+
}
219+
b.WriteString("\n")
220+
}
221+
return b.String()
222+
}
223+
224+
func (t *Tracker) olderThan(cutoff crtime.Mono) iter.Seq[entry] {
225+
return func(yield func(entry) bool) {
226+
for i := range t.maps {
227+
for _, e := range t.maps[i].Range {
228+
if e.startTime < cutoff && !yield(e) {
229+
return
230+
}
231+
}
232+
}
233+
}
234+
}
235+
236+
// maxShards bounds the number of shards: the high byte of Handle is used as the
237+
// shard index, which limits us to 256 shards (which is plenty).
238+
const maxShards = 256
239+
// mapPresize is the presize hint passed to xsync.WithPresize for each shard map.
const mapPresize = 16
240+
241+
// handleCounter is an atomic counter with padding to avoid false sharing. Each
// shard has its own counter, from which Start() draws that shard's handles.
242+
type handleCounter struct {
243+
atomic.Uint64
244+
// Pads the struct to 64 bytes (8 for the counter + 7*8), so that adjacent
// counters in a slice do not share a typical 64-byte cache line.
_ [7]uint64
245+
}
246+
247+
// entry is the record stored in a shard map for each in-flight operation.
type entry struct {
248+
// startTime is the monotonic time at which Start() was called.
startTime crtime.Mono
249+
// stack holds the program counters captured by Start().
stack stack
250+
}
251+
252+
// stack contains program counters of the call stack that led to Start(),
253+
// beginning with Start()'s caller. Only the first 7 frames are recorded to keep
// Start() overhead low. Unused slots are zero.
254+
//
255+
// We chose 7 so that entry{} fits in a typical cache line. It should be
256+
// sufficient to identify the call path in most cases.
257+
type stack [7]uintptr
258+
259+
func newTracker(numShards int) *Tracker {
260+
numShards = min(numShards, maxShards)
261+
262+
maps := make([]*xsync.MapOf[Handle, entry], numShards)
263+
handleCounters := make([]handleCounter, numShards)
264+
for i := range numShards {
265+
// All handles have the shard index in the high byte; see Start().
266+
handleCounters[i].Store(uint64(i) << 56)
267+
maps[i] = xsync.NewMapOf[Handle, entry](xsync.WithPresize(mapPresize))
268+
}
269+
return &Tracker{
270+
maps: maps,
271+
handleCounters: handleCounters,
272+
}
273+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package inflight
6+
7+
import (
8+
"fmt"
9+
"runtime"
10+
"sync"
11+
"testing"
12+
)
13+
14+
// BenchmarkTracker benchmarks the overhead of Start/Stop under varying parallelism.
15+
//
16+
// Sample results on an Apple M1 Pro (10 core), without and with the cockroach
17+
// Go runtime:
18+
//
19+
// name vanilla-go time/op crdb-go time/op delta
20+
// Tracker/p=1-10 231ns ± 1% 215ns ± 0% -7.00% (p=0.008 n=5+5)
21+
// Tracker/p=5-10 325ns ± 1% 332ns ± 1% +2.18% (p=0.008 n=5+5)
22+
// Tracker/p=10-10 540ns ±15% 527ns ± 4% ~ (p=0.690 n=5+5)
23+
// Tracker/p=20-10 1.05µs ± 8% 1.07µs ± 1% ~ (p=0.135 n=5+5)
24+
func BenchmarkTracker(b *testing.B) {
25+
procs := runtime.GOMAXPROCS(0)
26+
for _, parallelism := range []int{1, procs / 2, procs, 2 * procs} {
27+
b.Run(fmt.Sprintf("p=%d", parallelism), func(b *testing.B) {
28+
const batchSize = 100
29+
// Each element of ch corresponds to a batch of operations to be performed.
30+
// The batch size is intended to amortize the overhead of channel operations.
31+
ch := make(chan int, 1+b.N/batchSize)
32+
33+
tr := NewTracker()
34+
var wg sync.WaitGroup
35+
for range parallelism {
36+
wg.Add(1)
37+
go func() {
38+
defer wg.Done()
39+
40+
for numOps := range ch {
41+
for range numOps {
42+
h := tr.Start()
43+
tr.Stop(h)
44+
}
45+
}
46+
}()
47+
}
48+
49+
numOps := int64(b.N) * int64(parallelism)
50+
for i := int64(0); i < numOps; i += batchSize {
51+
ch <- int(min(batchSize, numOps-i))
52+
}
53+
close(ch)
54+
wg.Wait()
55+
})
56+
}
57+
}

0 commit comments

Comments
 (0)