-
Notifications
You must be signed in to change notification settings - Fork 338
/
inmembatch.go
110 lines (86 loc) · 2.21 KB
/
inmembatch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2022 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmemstore
import (
"context"
"fmt"
"sync"
storage "github.com/ethersphere/bee/pkg/storage"
)
// batchOp represents a batch operations.
type batchOp interface {
Item() storage.Item
}
// batchOpBase is a base type for batch operations holding data.
type batchOpBase struct{ item storage.Item }
// Item implements batchOp interface Item method.
func (b batchOpBase) Item() storage.Item { return b.item }
type (
batchOpPut struct{ batchOpBase }
batchOpDelete struct{ batchOpBase }
)
type Batch struct {
ctx context.Context
mu sync.Mutex // mu guards batch, ops, and done.
ops map[string]batchOp
store *Store
done bool
}
// Batch implements storage.BatchedStore interface Batch method.
func (s *Store) Batch(ctx context.Context) (storage.Batch, error) {
return &Batch{
ctx: ctx,
ops: make(map[string]batchOp),
store: s,
}, nil
}
// Put implements storage.Batch interface Put method.
func (i *Batch) Put(item storage.Item) error {
if err := i.ctx.Err(); err != nil {
return err
}
i.mu.Lock()
i.ops[key(item)] = batchOpPut{batchOpBase{item: item}}
i.mu.Unlock()
return nil
}
// Delete implements storage.Batch interface Delete method.
func (i *Batch) Delete(item storage.Item) error {
if err := i.ctx.Err(); err != nil {
return err
}
i.mu.Lock()
i.ops[key(item)] = batchOpDelete{batchOpBase{item: item}}
i.mu.Unlock()
return nil
}
// Commit implements storage.Batch interface Commit method.
func (i *Batch) Commit() error {
if err := i.ctx.Err(); err != nil {
return err
}
i.mu.Lock()
defer i.mu.Unlock()
if i.done {
return storage.ErrBatchCommitted
}
defer func() { i.done = true }()
i.store.mu.Lock()
defer i.store.mu.Unlock()
for _, ops := range i.ops {
switch op := ops.(type) {
case batchOpPut:
err := i.store.put(op.Item())
if err != nil {
return fmt.Errorf("unable to put item %s: %w", key(op.Item()), err)
}
case batchOpDelete:
err := i.store.delete(op.Item())
if err != nil {
return fmt.Errorf("unable to delete item %s: %w", key(op.Item()), err)
}
}
}
return nil
}