-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
timed.go
201 lines (178 loc) · 4.85 KB
/
timed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package blockstore
import (
"context"
"fmt"
"sync"
"time"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/raulk/clock"
"go.uber.org/multierr"
)
// TimedCacheBlockstore is a blockstore that keeps blocks for at least the
// specified caching interval before discarding them. Garbage collection must
// be started and stopped by calling Start/Stop.
//
// Under the covers, it's implemented with an active and an inactive blockstore
// that are rotated every cache time interval. This means all blocks will be
// stored at most 2x the cache interval.
//
// Create a new instance by calling the NewTimedCacheBlockstore constructor.
type TimedCacheBlockstore struct {
mu sync.RWMutex
active, inactive MemBlockstore
clock clock.Clock
interval time.Duration
closeCh chan struct{}
doneRotatingCh chan struct{}
}
func NewTimedCacheBlockstore(interval time.Duration) *TimedCacheBlockstore {
b := &TimedCacheBlockstore{
active: NewMemory(),
inactive: NewMemory(),
interval: interval,
clock: clock.New(),
}
return b
}
func (t *TimedCacheBlockstore) Start(_ context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh != nil {
return fmt.Errorf("already started")
}
t.closeCh = make(chan struct{})
// Create this timer before starting the goroutine. Otherwise, creating the timer will race
// with adding time to the mock clock, and we could add time _first_, then stall waiting for
// a timer that'll never fire.
ticker := t.clock.Ticker(t.interval)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
t.rotate()
if t.doneRotatingCh != nil {
t.doneRotatingCh <- struct{}{}
}
case <-t.closeCh:
return
}
}
}()
return nil
}
func (t *TimedCacheBlockstore) Stop(_ context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if t.closeCh == nil {
return fmt.Errorf("not started")
}
select {
case <-t.closeCh:
// already closed
default:
close(t.closeCh)
}
return nil
}
func (t *TimedCacheBlockstore) rotate() {
newBs := NewMemory()
t.mu.Lock()
t.inactive, t.active = t.active, newBs
t.mu.Unlock()
}
func (t *TimedCacheBlockstore) Flush(ctx context.Context) error {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.active.Flush(ctx); err != nil {
return err
}
return t.inactive.Flush(ctx)
}
func (t *TimedCacheBlockstore) Put(ctx context.Context, b blocks.Block) error {
// Don't check the inactive set here. We want to keep this block for at
// least one interval.
t.mu.Lock()
defer t.mu.Unlock()
return t.active.Put(ctx, b)
}
func (t *TimedCacheBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
t.mu.Lock()
defer t.mu.Unlock()
return t.active.PutMany(ctx, bs)
}
func (t *TimedCacheBlockstore) View(ctx context.Context, k cid.Cid, callback func([]byte) error) error {
// The underlying blockstore is always a "mem" blockstore so there's no difference,
// from a performance perspective, between view & get. So we call Get to avoid
// calling an arbitrary callback while holding a lock.
t.mu.RLock()
block, err := t.active.Get(ctx, k)
if ipld.IsNotFound(err) {
block, err = t.inactive.Get(ctx, k)
}
t.mu.RUnlock()
if err != nil {
return err
}
return callback(block.RawData())
}
func (t *TimedCacheBlockstore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
t.mu.RLock()
defer t.mu.RUnlock()
b, err := t.active.Get(ctx, k)
if ipld.IsNotFound(err) {
b, err = t.inactive.Get(ctx, k)
}
return b, err
}
func (t *TimedCacheBlockstore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
t.mu.RLock()
defer t.mu.RUnlock()
size, err := t.active.GetSize(ctx, k)
if ipld.IsNotFound(err) {
size, err = t.inactive.GetSize(ctx, k)
}
return size, err
}
func (t *TimedCacheBlockstore) Has(ctx context.Context, k cid.Cid) (bool, error) {
t.mu.RLock()
defer t.mu.RUnlock()
if has, err := t.active.Has(ctx, k); err != nil {
return false, err
} else if has {
return true, nil
}
return t.inactive.Has(ctx, k)
}
func (t *TimedCacheBlockstore) HashOnRead(_ bool) {
// no-op
}
func (t *TimedCacheBlockstore) DeleteBlock(ctx context.Context, k cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteBlock(ctx, k), t.inactive.DeleteBlock(ctx, k))
}
func (t *TimedCacheBlockstore) DeleteMany(ctx context.Context, ks []cid.Cid) error {
t.mu.Lock()
defer t.mu.Unlock()
return multierr.Combine(t.active.DeleteMany(ctx, ks), t.inactive.DeleteMany(ctx, ks))
}
func (t *TimedCacheBlockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
t.mu.RLock()
defer t.mu.RUnlock()
ch := make(chan cid.Cid, len(t.active)+len(t.inactive))
for _, b := range t.active {
ch <- b.Cid()
}
for _, b := range t.inactive {
c := b.Cid()
if _, ok := t.active[string(c.Hash())]; ok {
continue
}
ch <- c
}
close(ch)
return ch, nil
}