forked from ava-labs/avalanchego
-
Notifications
You must be signed in to change notification settings - Fork 5
/
window.go
120 lines (102 loc) · 2.82 KB
/
window.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package window
import (
"sync"
"time"
"github.com/MetalBlockchain/metalgo/utils"
"github.com/MetalBlockchain/metalgo/utils/buffer"
"github.com/MetalBlockchain/metalgo/utils/timer/mockable"
)
// Compile-time assertion that *window implements the Window interface.
var _ Window[struct{}] = (*window[struct{}])(nil)
// Window is a sliding window over values of type T, ordered from oldest to
// newest.
type Window[T any] interface {
	// Add inserts value as the newest element of the window.
	Add(value T)
	// Oldest returns the oldest element currently held. The boolean is false
	// when there is no element to return.
	Oldest() (T, bool)
	// Length returns the number of elements currently held.
	Length() int
}
// window is the Window implementation backed by a deque of timestamped nodes.
type window[T any] struct {
	// clock supplies the entry timestamp recorded for each added element; it
	// is a mockable clock so unit tests can control time.
	clock *mockable.Clock
	// ttl is the largest allowed gap between the newest element's entry time
	// and an older element's entry time before the older one counts as stale.
	ttl time.Duration
	// maxSize is the upper bound on the number of elements left in the window
	// after an Add completes.
	maxSize int
	// minSize is the element count at or below which time-based eviction
	// stops removing elements.
	minSize int
	// lock synchronizes access to elements.
	lock sync.RWMutex
	// elements holds the window contents, oldest on the left and newest on
	// the right.
	elements buffer.Deque[node[T]]
}
// Config holds the parameters used by New to build a Window.
type Config struct {
	// Clock provides the timestamps assigned to elements as they are added.
	// NOTE(review): Add calls Clock.Time() unconditionally, so a nil Clock is
	// presumably unsafe — confirm against mockable.Clock.
	Clock *mockable.Clock
	// MaxSize is the maximum number of elements kept in the window.
	MaxSize int
	// MinSize is the number of elements that time-based eviction will not
	// shrink the window below.
	MinSize int
	// TTL is the time-to-live used to decide when an element is stale.
	TTL time.Duration
}
// New builds a Window from config.
//
// The backing deque is created with MaxSize+1 as its sizing argument: Add
// pushes the new element before enforcing the size cap, so the deque can
// briefly hold one element more than MaxSize.
func New[T any](config Config) Window[T] {
	w := &window[T]{
		clock:   config.Clock,
		ttl:     config.TTL,
		maxSize: config.MaxSize,
		minSize: config.MinSize,
	}
	w.elements = buffer.NewUnboundedDeque[node[T]](config.MaxSize + 1)
	return w
}
// Add appends value as the newest element of the window, stamped with the
// current clock time. It then evicts elements that have outlived the
// configured time-to-live and, if the window is still over maxSize, drops the
// oldest remaining element.
func (w *window[T]) Add(value T) {
	w.lock.Lock()
	defer w.lock.Unlock()

	// Record the value together with the time it entered the window.
	entry := node[T]{
		value:     value,
		entryTime: w.clock.Time(),
	}
	w.elements.PushRight(entry)

	// Time-based eviction runs first; the size cap is applied afterwards.
	w.removeStaleNodes()

	if w.elements.Len() <= w.maxSize {
		return
	}
	// Only one element was added, so one removal is attempted here.
	_, _ = w.elements.PopLeft()
}
// Oldest returns the value at the left (oldest) end of the window. If the
// window is empty it returns the zero value of T and false.
func (w *window[T]) Oldest() (T, bool) {
	w.lock.RLock()
	defer w.lock.RUnlock()

	if head, found := w.elements.PeekLeft(); found {
		return head.value, true
	}
	return utils.Zero[T](), false
}
// Length reports how many elements the window currently holds.
func (w *window[T]) Length() int {
	w.lock.RLock()
	defer w.lock.RUnlock()

	size := w.elements.Len()
	return size
}
// removeStaleNodes evicts elements from the old end of the window while they
// are stale and the window holds more than minSize elements.
//
// An element is stale when the newest element entered the window more than
// ttl after it did; staleness is measured against the newest element's entry
// time, not against the current clock reading. Elements are stored in
// insertion order, so, assuming the clock does not move backwards, the scan
// can stop at the first element that is not stale.
//
// It takes no lock itself; Add calls it while holding the write lock.
func (w *window[T]) removeStaleNodes() {
	latest, found := w.elements.PeekRight()
	if !found {
		// Empty window: nothing to evict.
		return
	}

	for w.elements.Len() > w.minSize {
		head, found := w.elements.PeekLeft()
		if !found {
			return
		}
		if age := latest.entryTime.Sub(head.entryTime); age <= w.ttl {
			// The oldest element is still within ttl; stop scanning.
			return
		}
		_, _ = w.elements.PopLeft()
	}
}
// node is a single window entry: a value paired with the time it was added.
type node[T any] struct {
	// value is the element stored in the window.
	value T
	// entryTime is the clock reading taken when the element was added.
	entryTime time.Time
}