Skip to content

Commit 96bb758

Browse files
committed
metricsutil: add Window data structure
Window maintains instances of metric at regular intervals and is able to return the metrics from ~10 minutes and ~1hr ago. This is useful to provide statistics (e.g. cache hits) from these timeframes.
1 parent 8b8ee48 commit 96bb758

File tree

2 files changed

+219
-0
lines changed

2 files changed

+219
-0
lines changed

internal/metricsutil/window.go

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package metricsutil
6+
7+
import (
8+
"sync"
9+
"time"
10+
11+
"github.com/cockroachdb/crlib/crtime"
12+
)
13+
14+
// NewWindow creates a new Window instance, which maintains a sliding window of
15+
// metrics collected at regular intervals. It collects metrics every minute.
16+
//
17+
// Sample usage:
18+
//
19+
// w := NewWindow[MyMetricsType](func() MyMetricsType {
20+
// return retrieveCurrentMetrics()
21+
// })
22+
// w.Start()
23+
// defer w.Stop()
24+
// ..
25+
// currentMetrics := retrieveCurrentMetrics()
26+
// prevMetrics, prevTime := w.TenMinutesAgo()
27+
// fmt.Printf("statistics over last %s: %s\n", prevTime.Elapsed(), currentMetrics.Subtract(prevMetrics))
28+
func NewWindow[M any](collectFn CollectFn[M]) *Window[M] {
29+
return &Window[M]{collectFn: collectFn}
30+
}
31+
32+
// CollectFn is a function used to collect the current metrics.
33+
type CollectFn[M any] func() M
34+
35+
// Window maintains a sliding window of metrics collected at regular intervals,
36+
// allowing the retrieval of metrics from approximately 10 minutes and 1 hour
37+
// ago. These metrics can be used against the current metrics to get statistics
38+
// for these recent timeframes.
39+
type Window[M any] struct {
40+
collectFn CollectFn[M]
41+
mu struct {
42+
sync.Mutex
43+
running bool
44+
startTime crtime.Mono
45+
nextA crtime.Mono
46+
ringA ring[M]
47+
nextB crtime.Mono
48+
ringB ring[M]
49+
timer *time.Timer
50+
}
51+
}
52+
53+
// TenMinutesAgo returns the metrics from ~10 minutes ago (normally ±30s) and
54+
// the time when they were collected.
55+
//
56+
// If no metrics are available (less than 10m passed), the zero values are
57+
// returned.
58+
func (w *Window[M]) TenMinutesAgo() (_ M, collectedAt crtime.Mono) {
59+
w.mu.Lock()
60+
defer w.mu.Unlock()
61+
return w.mu.ringA.Oldest()
62+
}
63+
64+
// OneHourAgo returns the metrics from ~1h ago (normally ±3m) and the time when
65+
// they were collected.
66+
//
67+
// If no metrics are available (i.e. less than 1h passed), the zero values are
68+
// returned.
69+
func (w *Window[M]) OneHourAgo() (_ M, collectedAt crtime.Mono) {
70+
w.mu.Lock()
71+
defer w.mu.Unlock()
72+
return w.mu.ringB.Oldest()
73+
}
74+
75+
// Start background collection of metrics.
76+
func (w *Window[M]) Start() {
77+
w.mu.Lock()
78+
defer w.mu.Unlock()
79+
if w.mu.running {
80+
return
81+
}
82+
w.mu.ringA = ring[M]{}
83+
w.mu.ringB = ring[M]{}
84+
w.mu.running = true
85+
// Initialize both rings. We do this in a separate goroutine in case the
86+
// caller is holding a lock that the collect function uses as well.
87+
go func() {
88+
w.mu.Lock()
89+
defer w.mu.Unlock()
90+
if !w.mu.running {
91+
// We were stopped before we got here.
92+
return
93+
}
94+
w.mu.startTime = crtime.NowMono()
95+
w.mu.nextA = w.mu.startTime + crtime.Mono(tickA)
96+
w.mu.nextB = w.mu.startTime + crtime.Mono(tickB)
97+
m := w.collectFn()
98+
w.mu.ringA.Add(m, w.mu.startTime)
99+
w.mu.ringB.Add(m, w.mu.startTime)
100+
// We prefer to use a timer instead of a ticker and goroutine to avoid yet
101+
// another goroutine showing up in goroutine dumps.
102+
w.mu.timer = time.AfterFunc(tickA, w.tick)
103+
}()
104+
}
105+
106+
// Stop background collection of metrics and wait for any in-progress collection
107+
// to finish.
108+
func (w *Window[M]) Stop() {
109+
w.mu.Lock()
110+
defer w.mu.Unlock()
111+
w.mu.running = false
112+
if w.mu.timer != nil {
113+
// If Stop fails, the timer function didn't reach the critical section yet;
114+
// when it does it will notice running=false and exit.
115+
w.mu.timer.Stop()
116+
w.mu.timer = nil
117+
}
118+
}
119+
120+
func (w *Window[M]) tick() {
121+
w.mu.Lock()
122+
defer w.mu.Unlock()
123+
if !w.mu.running {
124+
return
125+
}
126+
m := w.collectFn()
127+
now := crtime.NowMono()
128+
for w.mu.nextA <= now {
129+
w.mu.ringA.Add(m, now)
130+
w.mu.nextA += crtime.Mono(tickA)
131+
}
132+
for w.mu.nextB <= now {
133+
w.mu.ringB.Add(m, now)
134+
w.mu.nextB += crtime.Mono(tickB)
135+
}
136+
w.mu.timer.Reset(min(w.mu.nextA, w.mu.nextB).Sub(now))
137+
}
138+
139+
// We scale down the timeframes by 5% so that the oldest sample is within ±5% of
140+
// the desired target.
141+
const timeframeA = 10 * time.Minute * 95 / 100
142+
const timeframeB = time.Hour * 95 / 100
143+
144+
const resolution = 10
145+
const tickA = timeframeA / time.Duration(resolution)
146+
const tickB = timeframeB / time.Duration(resolution)
147+
148+
type ring[M any] struct {
149+
pos int
150+
buf [resolution]struct {
151+
m M
152+
collectedAt crtime.Mono
153+
}
154+
}
155+
156+
func (r *ring[M]) Add(value M, collectedAt crtime.Mono) {
157+
r.buf[r.pos].m = value
158+
r.buf[r.pos].collectedAt = collectedAt
159+
r.pos = (r.pos + 1) % resolution
160+
}
161+
162+
func (r *ring[M]) Oldest() (_ M, collectedAt crtime.Mono) {
163+
return r.buf[r.pos].m, r.buf[r.pos].collectedAt
164+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
//go:build go1.25
6+
7+
package metricsutil
8+
9+
import (
10+
"testing"
11+
"testing/synctest"
12+
"time"
13+
14+
"github.com/cockroachdb/crlib/crtime"
15+
)
16+
17+
func TestWindow(t *testing.T) {
18+
synctest.Test(t, func(t *testing.T) {
19+
startTime := crtime.NowMono()
20+
w := NewWindow[time.Duration](startTime.Elapsed)
21+
w.Start()
22+
defer w.Stop()
23+
<-time.After(5 * time.Minute)
24+
v, x := w.TenMinutesAgo()
25+
if v != 0 || x != 0 {
26+
t.Fatalf("expected empty, got %v %v", v, x)
27+
}
28+
<-time.After(5*time.Minute + time.Second)
29+
v, x = w.TenMinutesAgo()
30+
if x := x.Elapsed(); x < 9*time.Minute || x > 11*time.Minute {
31+
t.Fatalf("expected ~10md, got %s", x)
32+
}
33+
if v < 0 || v > time.Minute {
34+
t.Fatalf("expected ~0, got %s", v)
35+
}
36+
37+
<-time.After(20*time.Minute + time.Second)
38+
v, x = w.TenMinutesAgo()
39+
if x := x.Elapsed(); x < 9*time.Minute || x > 11*time.Minute {
40+
t.Fatalf("expected ~10m, got %s", x)
41+
}
42+
if v < 19*time.Minute || v > 21*time.Minute {
43+
t.Fatalf("expected ~0, got %s", v)
44+
}
45+
46+
<-time.After(30*time.Minute + time.Second)
47+
v, x = w.OneHourAgo()
48+
if x := x.Elapsed(); x < 50*time.Minute || x > 70*time.Minute {
49+
t.Fatalf("expected ~1h, got %s", x)
50+
}
51+
if v < 0 || v > 10*time.Minute {
52+
t.Fatalf("expected 0-10m, got %s", v)
53+
}
54+
})
55+
}

0 commit comments

Comments
 (0)