/
asyncstore.go
120 lines (105 loc) · 2.69 KB
/
asyncstore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package stores
import (
"bytes"
"context"
"sync"
"time"
"github.com/brendoncarroll/go-state/cadata"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// Compile-time check that *AsyncStore satisfies cadata.Store.
var _ cadata.Store = (*AsyncStore)(nil)
// AsyncStore allows blobs to be Posted in the background by a pool of workers.
// It is not safe for concurrent use by multiple callers.
//
// Close must return nil for any previous Post to be considered successful.
// When in doubt, call Exists on the underlying store.
//
// If you have a loop that repeatedly calls Post, and Post is high latency, AsyncStore will probably improve performance.
// AsyncStore is not Read-Your-Writes consistent.
type AsyncStore struct {
	// target is the underlying store; workers Post to it and all other
	// operations are forwarded to it directly.
	target Store
	// ctx is the context returned by errgroup.WithContext for the worker
	// group. Post treats it being done as the store being closed.
	ctx context.Context
	// eg runs the background workers; Close waits on it.
	eg *errgroup.Group
	// todo is the unbuffered channel Post uses to hand buffers to workers.
	todo chan *bytes.Buffer
	// pool recycles the buffers that carry blob data from Post to workers.
	// New buffers are allocated with capacity target.MaxSize().
	pool sync.Pool
}
// NewAsyncStore returns an AsyncStore that posts to target from numWorkers
// background goroutines. A numWorkers below 1 is treated as 1.
//
// Each worker receives buffers from the internal channel until it is closed.
// Every write to target is bounded by a one second timeout. A worker stops
// and reports the error as soon as one of its writes fails.
func NewAsyncStore(target Store, numWorkers int) *AsyncStore {
	if numWorkers < 1 {
		numWorkers = 1
	}
	group, groupCtx := errgroup.WithContext(context.Background())

	// Buffers are sized up front so that a maximum-size blob fits without growing.
	newBuffer := func() interface{} {
		return bytes.NewBuffer(make([]byte, 0, target.MaxSize()))
	}
	as := &AsyncStore{
		target: target,
		ctx:    groupCtx,
		eg:     group,
		todo:   make(chan *bytes.Buffer),
		pool:   sync.Pool{New: newBuffer},
	}

	// postOne writes a single buffer to the target. It is its own function so
	// the timeout is cancelled after every write rather than when the worker exits.
	postOne := func(buf *bytes.Buffer) error {
		postCtx, cancel := context.WithTimeout(groupCtx, time.Second)
		defer cancel()
		_, err := as.target.Post(postCtx, buf.Bytes())
		return err
	}

	// worker drains the channel until it is closed or a write fails.
	worker := func() error {
		for {
			buf, ok := <-as.todo
			if !ok {
				return nil
			}
			if err := postOne(buf); err != nil {
				return err
			}
			buf.Reset()
			as.releasebuffer(buf)
		}
	}

	for n := numWorkers; n > 0; n-- {
		as.eg.Go(worker)
	}
	return as
}
// Post copies data into a pooled buffer and hands it to a background worker,
// returning the ID computed by the target's Hash function.
//
// A nil error only means the blob was accepted by a worker; the write to the
// target happens later. Close must return nil before the Post can be
// considered successful.
//
// Post returns ctx.Err() if ctx is done before a worker accepts the blob, and
// an "AsyncStore is closed" error once the store's own context is done.
func (s *AsyncStore) Post(ctx context.Context, data []byte) (ID, error) {
	// Check for shutdown before touching the channel. After Close the channel
	// is closed, and a select that includes a send on a closed channel may
	// pick that case and panic even though s.ctx is also done.
	if s.ctx.Err() != nil {
		return ID{}, errors.Errorf("AsyncStore is closed")
	}

	buf := s.acquireBuffer()
	buf.Reset()
	buf.Write(data)

	select {
	case s.todo <- buf:
		// The worker now owns buf and returns it to the pool itself.
		return s.target.Hash(data), nil
	case <-ctx.Done():
		// The buffer was never handed off, so recycle it here.
		buf.Reset()
		s.releasebuffer(buf)
		return ID{}, ctx.Err()
	case <-s.ctx.Done():
		buf.Reset()
		s.releasebuffer(buf)
		return ID{}, errors.Errorf("AsyncStore is closed")
	}
}
// Get reads the blob identified by id from the target store into buf.
// It goes straight to the target and does not wait for queued Posts.
func (s *AsyncStore) Get(ctx context.Context, id ID, buf []byte) (int, error) {
	n, err := s.target.Get(ctx, id, buf)
	return n, err
}
// Delete forwards the deletion of id to the target store.
func (s *AsyncStore) Delete(ctx context.Context, id ID) error {
	if err := s.target.Delete(ctx, id); err != nil {
		return err
	}
	return nil
}
// Exists reports whether id is present in the target store, using cadata.Exists.
// It goes straight to the target and does not wait for queued Posts.
func (s *AsyncStore) Exists(ctx context.Context, id ID) (bool, error) {
	found, err := cadata.Exists(ctx, s.target, id)
	return found, err
}
// List forwards to the target store's List with the given span and ids slice,
// returning its results unchanged.
func (s *AsyncStore) List(ctx context.Context, span cadata.Span, ids []ID) (int, error) {
	n, err := s.target.List(ctx, span, ids)
	return n, err
}
// Close closes the work channel so the workers exit once it is drained, then
// waits for them and returns the worker group's error, if any.
//
// Close must be called at most once: a second call would close an
// already-closed channel and panic.
func (s *AsyncStore) Close() error {
	close(s.todo)
	err := s.eg.Wait()
	return err
}
// Hash returns the ID the target store computes for x.
func (s *AsyncStore) Hash(x []byte) ID {
	id := s.target.Hash(x)
	return id
}
// MaxSize returns the value reported by the target store's MaxSize.
func (s *AsyncStore) MaxSize() int {
	limit := s.target.MaxSize()
	return limit
}
// acquireBuffer takes a buffer from the pool. The caller is responsible for
// resetting it before use.
func (s *AsyncStore) acquireBuffer() *bytes.Buffer {
	return s.pool.Get().(*bytes.Buffer)
}
// releasebuffer returns buf to the pool so a later Post can reuse it.
func (s *AsyncStore) releasebuffer(buf *bytes.Buffer) {
	s.pool.Put(buf)
}