-
Notifications
You must be signed in to change notification settings - Fork 885
/
adapters.go
66 lines (56 loc) · 1.74 KB
/
adapters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package eds
import (
"context"
"sync"
"github.com/filecoin-project/dagstore"
"github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
)
// Compile-time assertion that *BlockGetter satisfies blockservice.BlockGetter.
var _ blockservice.BlockGetter = (*BlockGetter)(nil)
// NewBlockGetter wraps the given dagstore.ReadBlockstore in a BlockGetter so
// that it can be used wherever a blockservice.BlockGetter is expected.
func NewBlockGetter(store dagstore.ReadBlockstore) *BlockGetter {
	getter := new(BlockGetter)
	getter.store = store
	return getter
}
// BlockGetter adapts a dagstore.ReadBlockstore to the
// blockservice.BlockGetter interface.
type BlockGetter struct {
	// store is the underlying blockstore every lookup is delegated to.
	store dagstore.ReadBlockstore
}
// GetBlock looks up the block identified by cid in the underlying store and
// returns it together with any error reported by the store.
func (bg *BlockGetter) GetBlock(ctx context.Context, cid cid.Cid) (blocks.Block, error) {
	blk, err := bg.store.Get(ctx, cid)
	return blk, err
}
// GetBlocks requests all of the given cids concurrently and streams the
// blocks that were found over the returned channel, in no particular order.
//
// It implements the blockservice.BlockGetter interface, which states that
// not every requested block may be found (or the context may be canceled).
// In that case the channel is closed early, and it is up to the consumer to
// detect this and to keep track of which blocks it has and has not received.
//
// One goroutine is started per CID. A lookup that fails is logged at debug
// level and skipped. The channel is unbuffered and is closed once every
// lookup goroutine has finished.
func (bg *BlockGetter) GetBlocks(ctx context.Context, cids []cid.Cid) <-chan blocks.Block {
	out := make(chan blocks.Block)

	// fetch retrieves a single block and hands it to the consumer, giving up
	// on delivery as soon as ctx is done.
	fetch := func(pending *sync.WaitGroup, key cid.Cid) {
		defer pending.Done()

		blk, err := bg.store.Get(ctx, key)
		if err != nil {
			log.Debugw("getblocks: error getting block by cid", "cid", key, "error", err)
			return
		}

		select {
		case <-ctx.Done():
		case out <- blk:
		}
	}

	go func() {
		// Closing only after all workers are done guarantees that no worker
		// ever sends on a closed channel.
		defer close(out)

		var pending sync.WaitGroup
		pending.Add(len(cids))
		for i := range cids {
			go fetch(&pending, cids[i])
		}
		pending.Wait()
	}()

	return out
}