-
Notifications
You must be signed in to change notification settings - Fork 1
/
gonudb.go
134 lines (114 loc) · 3.14 KB
/
gonudb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"context"
"errors"
"io/ioutil"
"github.com/go-logr/logr"
"github.com/iand/gonudb"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
)
var _ (BlockCache) = (*DBBlockCache)(nil)
type DBBlockCache struct {
store *gonudb.Store
upstream BlockCache
logger logr.Logger
}
func NewDBBlockCache(s *gonudb.Store, logger logr.Logger) *DBBlockCache {
if logger == nil {
logger = logr.Discard()
}
return &DBBlockCache{
store: s,
logger: logger.V(LogLevelInfo),
}
}
func (d *DBBlockCache) Has(ctx context.Context, c cid.Cid) (bool, error) {
ctx = cacheContext(ctx, "gonudb")
_, err := d.store.FetchReader(string(c.Hash()))
if err != nil {
data, err := d.fillFromUpstream(ctx, c)
if err != nil {
return false, err
}
return data != nil, nil
}
return true, nil
}
func (d *DBBlockCache) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx = cacheContext(ctx, "gonudb")
reportEvent(ctx, getRequest)
stop := startTimer(ctx, getDuration)
defer stop()
r, err := d.store.FetchReader(string(c.Hash()))
if err != nil {
data, err := d.fillFromUpstream(ctx, c)
if err != nil {
reportEvent(ctx, getFailure)
return nil, err
}
reportEvent(ctx, getMiss)
reportSize(ctx, getSize, len(data))
return blocks.NewBlockWithCid(data, c)
}
buf, err := ioutil.ReadAll(r)
if err != nil {
reportEvent(ctx, getFailure)
return nil, err
}
reportEvent(ctx, getHit)
reportSize(ctx, getSize, len(buf))
return blocks.NewBlockWithCid(buf, c)
}
func (d *DBBlockCache) SetUpstream(u BlockCache) {
d.upstream = u
}
func (d *DBBlockCache) fillFromUpstream(ctx context.Context, c cid.Cid) ([]byte, error) {
reportEvent(ctx, fillRequest)
stop := startTimer(ctx, fillDuration)
defer stop()
if d.upstream == nil {
reportEvent(ctx, fillFailure)
return nil, blockstore.ErrNotFound
}
blk, err := d.upstream.Get(ctx, c)
if err != nil {
reportEvent(ctx, fillFailure)
d.logger.Error(err, "upstream get", "cid", c.String())
return nil, err
}
data := blk.RawData()
// gonudb doesn't support zero sized blocks so don't add them
if len(data) == 0 {
reportEvent(ctx, fillZero)
return data, nil
}
// Only insert if the block data and cid match, since we can't delete from the store
chkc, err := c.Prefix().Sum(data)
if err != nil {
reportEvent(ctx, fillFailure)
d.logger.Error(err, "compute block hash", "cid", c.String())
return nil, err
}
if !chkc.Equals(c) {
reportEvent(ctx, fillFailure)
d.logger.Error(err, "wrong block hash", "cid", c.String(), "hash", chkc.String())
return nil, blocks.ErrWrongHash
}
if err := d.store.Insert(string(c.Hash()), data); err != nil {
// Data may have been inserted while we were fetching
if !errors.Is(err, gonudb.ErrKeyExists) {
reportEvent(ctx, fillFailure)
d.logger.Error(err, "insert", "cid", c.String())
}
return data, nil
}
reportEvent(ctx, fillSuccess)
reportSize(ctx, fillSize, len(data))
return data, nil
}
func (d *DBBlockCache) ReportMetrics(ctx context.Context) {
reportMeasurement(ctx, gonudbRecordCount.M(int64(d.store.RecordCount())))
reportMeasurement(ctx, gonudbRate.M(d.store.Rate()))
}