package format

import (
	"context"
	"errors"
	"runtime"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight
// before blocking.
//
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2

// ErrNotCommited is returned when closing a batch that hasn't been
// successfully committed.
var ErrNotCommited = errors.New("error: batch not commited")

// ErrClosed is returned when operating on a batch that has already been closed.
var ErrClosed = errors.New("error: batch closed")

// NewBatch returns a node buffer (Batch) that buffers nodes internally and
// commits them to the underlying DAGService in batches. Use this if you intend
// to add or remove a lot of nodes all at once.
//
// If the passed context is canceled, any in-progress commits are aborted.
func NewBatch(ctx context.Context, ds DAGService) *Batch {
	ctx, cancel := context.WithCancel(ctx)
	return &Batch{
		ds:            ds,
		ctx:           ctx,
		cancel:        cancel,
		commitResults: make(chan error, ParallelBatchCommits),
		MaxSize:       8 << 20,

		// By default, only batch up to 128 nodes at a time.
		// The current implementation of flatfs opens this many file
		// descriptors at the same time for the optimized batch write.
		MaxNodes: 128,
	}
}

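// addAllExample is a minimal usage sketch and not part of the original API:
// it shows the intended Add/Commit pattern for a Batch. The DAGService and
// the node slice are assumed to be supplied by the caller.
func addAllExample(ctx context.Context, ds DAGService, nds []Node) error {
	b := NewBatch(ctx, ds)
	for _, nd := range nds {
		// Add buffers the node and flushes asynchronously once MaxSize
		// or MaxNodes is exceeded.
		if err := b.Add(nd); err != nil {
			return err
		}
	}
	// Commit flushes whatever is still buffered and waits for all
	// in-flight commits to finish.
	return b.Commit()
}
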
// Batch is a buffer for batching adds to a dag.
type Batch struct {
	ds DAGService

	ctx    context.Context
	cancel func()

	activeCommits int
	err           error
	commitResults chan error

	nodes []Node
	size  int

	// MaxSize is the maximum amount of buffered raw node data, in bytes,
	// to hold before triggering an asynchronous commit.
	MaxSize int
	// MaxNodes is the maximum number of buffered nodes to hold before
	// triggering an asynchronous commit.
	MaxNodes int
}

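// newTunedBatch is an illustrative helper, not part of the original API,
// showing that the exported MaxSize and MaxNodes thresholds can be adjusted
// after NewBatch. The 16 MiB / 256-node values are arbitrary example
// settings, not recommendations.
func newTunedBatch(ctx context.Context, ds DAGService) *Batch {
	b := NewBatch(ctx, ds)
	b.MaxSize = 16 << 20 // flush once ~16 MiB of raw node data is buffered
	b.MaxNodes = 256     // or once 256 nodes are buffered, whichever comes first
	return b
}
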
// processResults drains any commit results that are already available,
// without blocking, so errors surface as early as possible.
func (t *Batch) processResults() {
	for t.activeCommits > 0 {
		select {
		case err := <-t.commitResults:
			t.activeCommits--
			if err != nil {
				t.setError(err)
				return
			}
		default:
			return
		}
	}
}

// asyncCommit sends the currently buffered nodes to the DAGService on a
// separate goroutine, blocking first if ParallelBatchCommits commits are
// already in flight.
func (t *Batch) asyncCommit() {
	numBlocks := len(t.nodes)
	if numBlocks == 0 {
		return
	}
	if t.activeCommits >= ParallelBatchCommits {
		select {
		case err := <-t.commitResults:
			t.activeCommits--
			if err != nil {
				t.setError(err)
				return
			}
		case <-t.ctx.Done():
			t.setError(t.ctx.Err())
			return
		}
	}
	go func(ctx context.Context, b []Node, result chan error, ds DAGService) {
		select {
		case result <- ds.AddMany(ctx, b):
		case <-ctx.Done():
		}
	}(t.ctx, t.nodes, t.commitResults, t.ds)

	t.activeCommits++
	t.nodes = make([]Node, 0, numBlocks)
	t.size = 0
}

// Add adds a node to the batch and commits the batch if necessary.
func (t *Batch) Add(nd Node) error {
	if t.err != nil {
		return t.err
	}

	// Not strictly necessary but allows us to catch errors early.
	t.processResults()
	if t.err != nil {
		return t.err
	}

	t.nodes = append(t.nodes, nd)
	t.size += len(nd.RawData())
	if t.size > t.MaxSize || len(t.nodes) > t.MaxNodes {
		t.asyncCommit()
	}
	return t.err
}

// Commit flushes any remaining buffered nodes and waits for all in-flight
// commits to finish before returning.
func (t *Batch) Commit() error {
	if t.err != nil {
		return t.err
	}

	t.asyncCommit()

loop:
	for t.activeCommits > 0 {
		select {
		case err := <-t.commitResults:
			t.activeCommits--
			if err != nil {
				t.setError(err)
				break loop
			}
		case <-t.ctx.Done():
			t.setError(t.ctx.Err())
			break loop
		}
	}

	return t.err
}

// setError records the first error, cancels the batch context, and releases
// the batch's internal state. After setError the batch cannot be reused.
func (t *Batch) setError(err error) {
	t.err = err
	t.cancel()

	// Drain as much as we can without blocking.
loop:
	for {
		select {
		case <-t.commitResults:
		default:
			break loop
		}
	}

	// Be nice and cleanup. These can take a *lot* of memory.
	t.commitResults = nil
	t.ds = nil
	t.ctx = nil
	t.nodes = nil
	t.size = 0
	t.activeCommits = 0
}