-
Notifications
You must be signed in to change notification settings - Fork 893
/
blockstore.go
158 lines (140 loc) · 4.96 KB
/
blockstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package eds
import (
"context"
"errors"
"fmt"
"github.com/filecoin-project/dagstore"
bstore "github.com/ipfs/boxo/blockstore"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
dshelp "github.com/ipfs/go-ipfs-ds-help"
ipld "github.com/ipfs/go-ipld-format"
)
var (
	// Compile-time assertion that *blockstore satisfies bstore.Blockstore.
	_ bstore.Blockstore = (*blockstore)(nil)

	// blockstoreCacheKey is the datastore key prefix under which the
	// blockstore's own datastore is namespaced.
	blockstoreCacheKey = datastore.NewKey("bs-cache")

	// errUnsupportedOperation is returned by blockstore methods that have no
	// meaningful implementation on the EDS blockstore.
	errUnsupportedOperation = errors.New("unsupported operation")
)
// blockstore is the bstore.Blockstore implementation backed by an EDS Store.
//
// Every CAR file acts as its own blockstore, so this top-level type exists to
// route each blockstore operation to the underlying store that can serve it.
//
// The lru cache approach is heavily inspired by the existing implementation
// upstream, with a simplified design:
//   - multiple shards per key are not supported,
//   - GetSize is called directly on the underlying RO blockstore,
//   - Put/PutMany do not throw errors,
//   - the blockstore operations are not abstracted away.
type blockstore struct {
	// store is the EDS store whose shard index and accessors are used for reads.
	store *Store
	// cache is the blockstore cache supplied to newBlockstore.
	cache *blockstoreCache
	// ds receives blocks written through Put/PutMany and is consulted as a
	// fallback when a CID is not found through the shard index.
	ds datastore.Batching
}
// newBlockstore assembles a blockstore from the given EDS store and cache.
// The supplied datastore is wrapped in the blockstoreCacheKey namespace before
// being stored, so all keys written by the blockstore live under that prefix.
func newBlockstore(store *Store, cache *blockstoreCache, ds datastore.Batching) *blockstore {
	bs := new(blockstore)
	bs.store = store
	bs.cache = cache
	bs.ds = namespace.Wrap(ds, blockstoreCacheKey)
	return bs
}
// Has reports whether a block with the given CID is available.
//
// The shard index is queried first. If it reports that the multihash is
// unknown, the namespaced datastore is checked instead, because the block may
// live there while its EDS is being reconstructed. Any other index error is
// returned to the caller unchanged.
func (bs *blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) {
	keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())

	// Treat the same set of errors as "not found" that getReadOnlyBlockstore
	// does for this very call (datastore.ErrNotFound included), so that Has,
	// Get and GetSize agree on when to fall back to the datastore.
	notFound := errors.Is(err, ErrNotFound) ||
		errors.Is(err, ErrNotFoundInIndex) ||
		errors.Is(err, datastore.ErrNotFound)
	if notFound {
		// key wasn't found in top level blockstore, but could be in datastore while being reconstructed
		dsHas, dsErr := bs.ds.Has(ctx, dshelp.MultihashToDsKey(cid.Hash()))
		if dsErr != nil {
			// Best-effort fallback: a failing datastore lookup is reported as
			// "not present" rather than as an error.
			return false, nil
		}
		return dsHas, nil
	}
	if err != nil {
		return false, err
	}
	// A successful lookup may still list no shards; only a non-empty result
	// counts as the block being present.
	return len(keys) > 0, nil
}
// Get fetches the block identified by cid.
//
// The block is read from the read-only blockstore of the shard that contains
// the CID. When no such shard is found, the namespaced datastore is tried
// instead; if that read fails as well, ipld.ErrNotFound is returned. Other
// errors from locating the shard are logged at debug level and returned.
func (bs *blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
	shardBs, err := bs.getReadOnlyBlockstore(ctx, cid)
	switch {
	case errors.Is(err, ErrNotFound), errors.Is(err, ErrNotFoundInIndex):
		raw, dsErr := bs.ds.Get(ctx, dshelp.MultihashToDsKey(cid.Hash()))
		if dsErr != nil {
			// nmt's GetNode expects an ipld.ErrNotFound when a cid is not found.
			return nil, ipld.ErrNotFound{Cid: cid}
		}
		return blocks.NewBlockWithCid(raw, cid)
	case err != nil:
		log.Debugf("failed to get blockstore for cid %s: %s", cid, err)
		return nil, err
	}
	return shardBs.Get(ctx, cid)
}
// GetSize returns the size of the block identified by cid.
//
// The size is taken from the read-only blockstore of the shard that contains
// the CID. When no such shard is found, the namespaced datastore is asked
// instead; if that lookup fails as well, ipld.ErrNotFound is returned. Other
// errors from locating the shard are returned unchanged.
func (bs *blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
	shardBs, err := bs.getReadOnlyBlockstore(ctx, cid)
	switch {
	case errors.Is(err, ErrNotFound), errors.Is(err, ErrNotFoundInIndex):
		n, dsErr := bs.ds.GetSize(ctx, dshelp.MultihashToDsKey(cid.Hash()))
		if dsErr != nil {
			// nmt's GetSize expects an ipld.ErrNotFound when a cid is not found.
			return 0, ipld.ErrNotFound{Cid: cid}
		}
		return n, nil
	case err != nil:
		return 0, err
	}
	return shardBs.GetSize(ctx, cid)
}
// DeleteBlock removes the entry keyed by the CID's multihash from the
// namespaced datastore and returns the datastore's result. Only the datastore
// is touched by this method.
func (bs *blockstore) DeleteBlock(ctx context.Context, cid cid.Cid) error {
	return bs.ds.Delete(ctx, dshelp.MultihashToDsKey(cid.Hash()))
}
// Put writes the block's raw data to the namespaced datastore under the key
// derived from the multihash of the block's CID.
func (bs *blockstore) Put(ctx context.Context, blk blocks.Block) error {
	// note: we leave duplicate resolution to the underlying datastore
	dsKey := dshelp.MultihashToDsKey(blk.Cid().Hash())
	payload := blk.RawData()
	return bs.ds.Put(ctx, dsKey, payload)
}
// PutMany writes all given blocks to the namespaced datastore.
//
// A single block is delegated to Put. Any other number of blocks is written
// through one datastore batch that is committed at the end; the first error
// from opening the batch or from a batched put aborts the call and is returned.
func (bs *blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error {
	// performance fast-path: a lone block does not need a batch
	if len(blocks) == 1 {
		return bs.Put(ctx, blocks[0])
	}

	batch, err := bs.ds.Batch(ctx)
	if err != nil {
		return err
	}
	for i := range blocks {
		dsKey := dshelp.MultihashToDsKey(blocks[i].Cid().Hash())
		if putErr := batch.Put(ctx, dsKey, blocks[i].RawData()); putErr != nil {
			return putErr
		}
	}
	return batch.Commit(ctx)
}
// AllKeysChan is not supported on the EDS blockstore because the keys are not stored in a single
// CAR file. It always returns a nil channel together with errUnsupportedOperation.
func (bs *blockstore) AllKeysChan(_ context.Context) (<-chan cid.Cid, error) {
	var keys <-chan cid.Cid
	return keys, errUnsupportedOperation
}
// HashOnRead does nothing on the EDS blockstore apart from logging a warning.
// The blockstore interface gives this method no return value, so the lack of
// support cannot be reported as an error.
func (bs *blockstore) HashOnRead(_ bool) {
	log.Warnf("HashOnRead is a noop on the EDS blockstore")
}
// getReadOnlyBlockstore finds the underlying blockstore of the shard that contains the given CID.
//
// It returns ErrNotFound when the shard index reports the multihash as unknown
// or lists no shards for it. Other index errors, and failures to obtain the
// shard accessor, are wrapped and returned.
func (bs *blockstore) getReadOnlyBlockstore(ctx context.Context, cid cid.Cid) (dagstore.ReadBlockstore, error) {
	keys, err := bs.store.dgstr.ShardsContainingMultihash(ctx, cid.Hash())
	if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, ErrNotFoundInIndex) {
		return nil, ErrNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("failed to find shards containing multihash: %w", err)
	}
	// A successful lookup with no shards must not reach keys[0] below, which
	// would panic; report it as a regular miss instead.
	if len(keys) == 0 {
		return nil, ErrNotFound
	}

	// a share can exist in multiple EDSes, so just take the first one.
	shardKey := keys[0]
	accessor, err := bs.store.getCachedAccessor(ctx, shardKey)
	if err != nil {
		return nil, fmt.Errorf("failed to get accessor for shard %s: %w", shardKey, err)
	}
	return accessor.bs, nil
}