-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
union.go
128 lines (110 loc) · 2.72 KB
/
union.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package blockstore
import (
"context"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)
type unionBlockstore []Blockstore
// Union returns an unioned blockstore.
//
// - Reads return from the first blockstore that has the value, querying in the
// supplied order.
// - Writes (puts and deletes) are broadcast to all stores.
func Union(stores ...Blockstore) Blockstore {
return unionBlockstore(stores)
}
func (m unionBlockstore) Has(ctx context.Context, cid cid.Cid) (has bool, err error) {
for _, bs := range m {
if has, err = bs.Has(ctx, cid); has || err != nil {
break
}
}
return has, err
}
func (m unionBlockstore) Get(ctx context.Context, cid cid.Cid) (blk blocks.Block, err error) {
for _, bs := range m {
if blk, err = bs.Get(ctx, cid); err == nil || !ipld.IsNotFound(err) {
break
}
}
return blk, err
}
func (m unionBlockstore) View(ctx context.Context, cid cid.Cid, callback func([]byte) error) (err error) {
for _, bs := range m {
if err = bs.View(ctx, cid, callback); err == nil || !ipld.IsNotFound(err) {
break
}
}
return err
}
func (m unionBlockstore) GetSize(ctx context.Context, cid cid.Cid) (size int, err error) {
for _, bs := range m {
if size, err = bs.GetSize(ctx, cid); err == nil || !ipld.IsNotFound(err) {
break
}
}
return size, err
}
func (m unionBlockstore) Flush(ctx context.Context) (err error) {
for _, bs := range m {
if err = bs.Flush(ctx); err != nil {
break
}
}
return err
}
func (m unionBlockstore) Put(ctx context.Context, block blocks.Block) (err error) {
for _, bs := range m {
if err = bs.Put(ctx, block); err != nil {
break
}
}
return err
}
func (m unionBlockstore) PutMany(ctx context.Context, blks []blocks.Block) (err error) {
for _, bs := range m {
if err = bs.PutMany(ctx, blks); err != nil {
break
}
}
return err
}
func (m unionBlockstore) DeleteBlock(ctx context.Context, cid cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteBlock(ctx, cid); err != nil {
break
}
}
return err
}
func (m unionBlockstore) DeleteMany(ctx context.Context, cids []cid.Cid) (err error) {
for _, bs := range m {
if err = bs.DeleteMany(ctx, cids); err != nil {
break
}
}
return err
}
func (m unionBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
// this does not deduplicate; this interface needs to be revisited.
outCh := make(chan cid.Cid)
go func() {
defer close(outCh)
for _, bs := range m {
ch, err := bs.AllKeysChan(ctx)
if err != nil {
return
}
for cid := range ch {
outCh <- cid
}
}
}()
return outCh, nil
}
func (m unionBlockstore) HashOnRead(enabled bool) {
for _, bs := range m {
bs.HashOnRead(enabled)
}
}