forked from hashicorp/consul
/
tombstone_gc.go
172 lines (147 loc) · 4.91 KB
/
tombstone_gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package state
import (
"fmt"
"sync"
"time"
)
// TombstoneGC is used to track creation of tombstones so that they can be
// garbage collected after their TTL expires. The tombstones allow queries to
// provide monotonic index values within the TTL window. The GC is used to
// prevent monotonic growth in storage usage. This is a trade off between the
// length of the TTL and the storage overhead.
//
// In practice, this is required to fix the issue of delete visibility. When
// data is deleted from the KV store, the "latest" row can go backwards if the
// newest row is removed. The tombstones provide a way to ensure time doesn't
// move backwards within some interval.
//
// The embedded mutex is held by the methods in this file whenever enabled or
// expires is read or written. ttl and granularity are only assigned in
// NewTombstoneGC and are read without the lock.
type TombstoneGC struct {
	// ttl sets the TTL for tombstones.
	ttl time.Duration

	// granularity determines how we bin TTLs into timers: expiration times
	// are rounded up to a multiple of this duration.
	granularity time.Duration

	// enabled controls if we actually set up any timers. While it is false,
	// Hint returns without scheduling anything.
	enabled bool

	// expires maps the time of expiration to the highest tombstone value
	// that should be expired. There is one entry (bin) per pending timer.
	expires map[time.Time]*expireInterval

	// expireCh is used to stream expiration to the leader for processing.
	// NewTombstoneGC creates it with a buffer of one index.
	expireCh chan uint64

	// Mutex guards enabled and expires.
	sync.Mutex
}
// expireInterval is used to track the maximum index to expire in a given
// interval with a timer. One expireInterval is stored per bin in
// TombstoneGC.expires.
type expireInterval struct {
	// maxIndex has the highest tombstone index that should be GC-d. Hint
	// raises it when a later hint for the same bin carries a higher index.
	maxIndex uint64

	// timer is the timer tracking this bin. It is created by Hint and
	// stopped when the GC is disabled through SetEnabled.
	timer *time.Timer
}
// NewTombstoneGC builds a TombstoneGC from a tombstone TTL and a tracking
// granularity. A longer TTL ensures correct behavior for more time, but uses
// more storage. A shorter granularity increases the number of Raft
// transactions and reduces how far past the TTL we perform GC.
//
// Both durations must be strictly positive; otherwise nil and an error are
// returned. The returned GC starts out disabled.
func NewTombstoneGC(ttl, granularity time.Duration) (*TombstoneGC, error) {
	// Reject non-positive durations up front.
	if !(ttl > 0 && granularity > 0) {
		return nil, fmt.Errorf("Tombstone TTL and granularity must be positive")
	}

	gc := new(TombstoneGC)
	gc.ttl = ttl
	gc.granularity = granularity
	gc.expires = map[time.Time]*expireInterval{}
	// A buffer of one lets a single expiration be queued without a reader.
	gc.expireCh = make(chan uint64, 1)
	return gc, nil
}
// ExpireCh exposes the receive-only side of the channel that streams the next
// index that should be expired.
func (t *TombstoneGC) ExpireCh() <-chan uint64 {
	var out <-chan uint64 = t.expireCh
	return out
}
// SetEnabled turns the tombstone GC on or off. It should only be enabled by
// the leader node. Disabling stops every outstanding timer and drops all
// pending bins; requesting the state the GC is already in does nothing.
func (t *TombstoneGC) SetEnabled(enabled bool) {
	t.Lock()
	defer t.Unlock()

	switch {
	case t.enabled == enabled:
		// Already in the requested state.
		return
	case enabled:
		// Turning on needs no cleanup; timers are created lazily by hints.
		t.enabled = true
		return
	}

	// Turning off: stop all the timers and start over with an empty map.
	for _, bin := range t.expires {
		bin.timer.Stop()
	}
	t.expires = make(map[time.Time]*expireInterval)
	t.enabled = false
}
// Hint records that keys at the given index have been deleted and makes sure
// their GC is scheduled. It does nothing while the GC is disabled.
func (t *TombstoneGC) Hint(index uint64) {
	// Work out the target bin before taking the lock.
	when := t.nextExpires()

	t.Lock()
	defer t.Unlock()
	if !t.enabled {
		return
	}

	// A bin for this expiration time already exists: raise its index if
	// ours is higher and keep its timer.
	if bin, found := t.expires[when]; found {
		if bin.maxIndex < index {
			bin.maxIndex = index
		}
		return
	}

	// First hint for this bin: start a timer that expires it.
	fire := func() { t.expireTime(when) }
	t.expires[when] = &expireInterval{
		maxIndex: index,
		timer:    time.AfterFunc(time.Until(when), fire),
	}
}
// PendingExpiration reports whether any expirations are pending, that is,
// whether at least one bin is currently tracked.
func (t *TombstoneGC) PendingExpiration() bool {
	t.Lock()
	bins := len(t.expires)
	t.Unlock()
	return bins != 0
}
// nextExpires computes the expiration time for a tombstone hinted right now:
// the current time plus the TTL, pushed forward to a granularity boundary.
// Binning expirations this way avoids creating a ton of timers.
func (t *TombstoneGC) nextExpires() time.Time {
	// Round(0) sheds the monotonic clock reading so that the result can be
	// used safely as a map key. See #3670 for more details.
	deadline := time.Now().Add(t.ttl).Round(0)

	// Advance to the following boundary; a deadline that already sits on a
	// boundary moves a full granularity ahead.
	step := int64(t.granularity)
	pad := step - deadline.UnixNano()%step
	return deadline.Add(time.Duration(pad))
}
// purgeBin removes the bin for the given expiration time and returns the
// index recorded in it. If there is no such bin it returns 0, which is ok.
func (t *TombstoneGC) purgeBin(expires time.Time) uint64 {
	t.Lock()
	defer t.Unlock()

	// The GC may have been shut down while this timer fired and sat blocked
	// on the lock. In that case the map holds nothing for us and there is no
	// work to do, so we fall through to the zero result.
	if bin, found := t.expires[expires]; found {
		delete(t.expires, expires)
		return bin.maxIndex
	}
	return 0
}
// expireTime expires the entries at the given time: it purges the matching
// bin and, when that yields a non-zero index, sends the index on expireCh.
func (t *TombstoneGC) expireTime(expires time.Time) {
	// The lock is taken only inside purgeBin, while the index is fetched,
	// since the channel write might get blocked for reasons that could also
	// need to hint GC (see #3700).
	index := t.purgeBin(expires)
	if index == 0 {
		return
	}
	t.expireCh <- index
}