-
Notifications
You must be signed in to change notification settings - Fork 67
/
notifbs.go
85 lines (72 loc) · 1.64 KB
/
notifbs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package node
import (
"context"
"sync"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
)
type NotifyBlockstore struct {
EstuaryBlockstore
subLk sync.Mutex
subs map[cid.Cid][]chan blocks.Block
}
var _ blockstore.Blockstore = (*NotifyBlockstore)(nil)
func NewNotifBs(bstore EstuaryBlockstore) *NotifyBlockstore {
return &NotifyBlockstore{
EstuaryBlockstore: bstore,
subs: make(map[cid.Cid][]chan blocks.Block),
}
}
func (nb *NotifyBlockstore) WaitFor(ctx context.Context, c cid.Cid) <-chan blocks.Block {
nch := make(chan blocks.Block, 1)
nb.subLk.Lock()
nb.subs[c] = append(nb.subs[c], nch)
nb.subLk.Unlock()
// now handle the race condition where the block might have been added
// right before calling this method
blk, err := nb.Get(ctx, c)
if err == nil {
nb.subLk.Lock()
chs, ok := nb.subs[c]
if ok {
for _, ch := range chs {
ch <- blk
close(ch)
}
delete(nb.subs, c)
}
nb.subLk.Unlock()
}
return nch
}
func (nb *NotifyBlockstore) Put(ctx context.Context, blk blocks.Block) error {
c := blk.Cid()
nb.subLk.Lock()
chs, ok := nb.subs[c]
if ok {
for _, ch := range chs {
ch <- blk
close(ch)
}
delete(nb.subs, c)
}
nb.subLk.Unlock()
return nb.EstuaryBlockstore.Put(ctx, blk)
}
func (nb *NotifyBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error {
nb.subLk.Lock()
for _, blk := range blks {
c := blk.Cid()
chs, ok := nb.subs[c]
if ok {
for _, ch := range chs {
ch <- blk
close(ch)
}
delete(nb.subs, c)
}
}
nb.subLk.Unlock()
return nb.EstuaryBlockstore.PutMany(ctx, blks)
}