// addr_book_gc.go
package lstoreds
import (
"context"
"fmt"
"time"
pb "github.com/dcnetio/gothreads-lib/net/pb"
query "github.com/ipfs/go-datastore/query"
)
// purgeStoreQuery selects every record under the log-book key prefix,
// ordered by key. KeysOnly is false because the GC needs the values to
// unmarshal each record before deciding whether to prune it.
var (
	purgeStoreQuery = query.Query{
		Prefix:   logBookBase.String(),
		Orders:   []query.Order{query.OrderByKey{}},
		KeysOnly: false,
	}
)
// dsAddrBookGc is responsible for garbage collection in a datastore-backed address book.
type dsAddrBookGc struct {
	ctx       context.Context // context governing the background GC loop
	ab        *DsAddrBook     // owning address book whose datastore is purged
	running   chan struct{}   // capacity-1 channel used as a non-blocking try-lock so only one purge runs at a time
	purgeFunc func()          // set to purgeStore in newAddressBookGc; invoked on each tick of the background loop
}
// newAddressBookGc validates the GC settings and builds a dsAddrBookGc for
// the given address book. When purging is enabled (GCPurgeInterval > 0) it
// also launches the background GC goroutine; otherwise GC can only be
// triggered manually via the purge function.
func newAddressBookGc(ctx context.Context, ab *DsAddrBook) (*dsAddrBookGc, error) {
	interval := ab.opts.GCPurgeInterval
	if interval < 0 {
		return nil, fmt.Errorf("negative GC purge interval provided: %s", interval)
	}

	collector := &dsAddrBookGc{
		ctx: ctx,
		ab:  ab,
		// Buffered with capacity 1 so a send acts as a try-lock.
		running: make(chan struct{}, 1),
	}
	collector.purgeFunc = collector.purgeStore

	// do not start GC timers if purge is disabled; this GC can only be triggered manually.
	if interval > 0 {
		collector.ab.childrenDone.Add(1)
		go collector.background()
	}

	return collector, nil
}
// background prunes expired addresses from the datastore at regular
// intervals. It should be spawned as a goroutine. It first waits out the
// configured initial delay (returning early if the address book is closed),
// then invokes the purge function on every tick until cancellation.
func (gc *dsAddrBookGc) background() {
	defer gc.ab.childrenDone.Done()

	// Honor the initial delay, but yield immediately if the parent address
	// book is cancelled/closed before it elapses.
	initialDelay := time.After(gc.ab.opts.GCInitialDelay)
	select {
	case <-gc.ab.ctx.Done():
		return
	case <-initialDelay:
	}

	ticker := time.NewTicker(gc.ab.opts.GCPurgeInterval)
	defer ticker.Stop()

	for {
		select {
		case <-gc.ctx.Done():
			return
		case <-ticker.C:
			gc.purgeFunc()
		}
	}
}
// purgeStore scans every record in the log book, prunes expired addresses
// via record.clean(), flushes modified records through a cyclic batch, and
// evicts them from the in-memory cache. It is a no-op if another purge is
// already in progress.
func (gc *dsAddrBookGc) purgeStore() {
	select {
	case gc.running <- struct{}{}:
		defer func() { <-gc.running }()
	default:
		// yield if lookahead is running.
		return
	}

	batch, err := newCyclicBatch(gc.ab.ds, defaultOpsPerCyclicBatch)
	if err != nil {
		log.Warnf("failed while creating batch to purge GC entries: %v", err)
		// BUGFIX: without a batch we cannot persist anything; proceeding
		// would dereference a nil batch in record.flush / batch.Commit.
		return
	}

	results, err := gc.ab.ds.Query(context.Background(), purgeStoreQuery)
	if err != nil {
		log.Warnf("failed while opening iterator: %v", err)
		return
	}
	defer results.Close()

	record := &addrsRecord{AddrBookRecord: &pb.AddrBookRecord{}} // empty record to reuse and avoid allocs.
	// keys: /thread/addrs/<thread ID b32>
	for result := range results.Next() {
		record.Reset()
		if err = record.Unmarshal(result.Value); err != nil {
			log.Warnf("key %v has an unmarshable record", result.Key)
			continue
		}
		// clean() reports whether the record was modified; untouched records
		// need neither flushing nor cache eviction.
		if !record.clean() {
			continue
		}
		id := genCacheKey(record.ThreadID.ID, record.PeerID.ID)
		if err := record.flush(batch); err != nil {
			// BUGFIX: format verb was "&v", which printed literally instead
			// of interpolating the id.
			log.Warnf("failed to flush entry modified by GC for peer: %v, err: %v", id, err)
		}
		gc.ab.cache.Remove(id)
	}

	if err = batch.Commit(context.Background()); err != nil {
		log.Warnf("failed to commit GC purge batch: %v", err)
	}
}