/
transient.go
136 lines (109 loc) · 2.7 KB
/
transient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package kv
import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
// Compile-time check that *TransientSession implements Session.
var _ Session = (*TransientSession)(nil)

// TransientSession is a Session that buffers writes in a pebble indexed
// batch. The batch is created on the first Put and is committed to the
// database (without sync) whenever it grows past maxBatchSize. Explicit
// Commit and Insert calls are rejected.
type TransientSession struct {
	// db is the pebble database backing the session.
	db *pebble.DB
	// batch holds pending writes; it stays nil until the first Put.
	batch *pebble.Batch
	// store is the Store this session was created from.
	store *Store
	// maxBatchSize is the batch length above which Put flushes the
	// batch to db before adding the next entry.
	maxBatchSize int
	// closed is set by Close to reject a second Close call.
	closed bool
}
// NewTransientSession returns a TransientSession bound to the store's
// database, using the store's MaxTransientBatchSize option as flush
// threshold. No batch is allocated here; the first Put creates it.
func (s *Store) NewTransientSession() *TransientSession {
	sess := new(TransientSession)
	sess.db = s.db
	sess.store = s
	sess.maxBatchSize = s.opts.MaxTransientBatchSize
	return sess
}
// Commit is not supported by transient sessions and always returns an error.
func (s *TransientSession) Commit() error {
	err := errors.New("cannot commit in transient mode")
	return err
}
// Close marks the session as closed and releases the pending batch, if any.
// It returns an error when the session has already been closed.
func (s *TransientSession) Close() error {
	if s.closed {
		return errors.New("already closed")
	}
	s.closed = true

	// The batch is allocated lazily by Put, so a session that was never
	// written to has nothing to release. Calling Close on the nil batch
	// would dereference a nil pointer.
	if s.batch == nil {
		return nil
	}
	return s.batch.Close()
}
// Insert is not supported by transient sessions and always returns an
// error; both arguments are ignored.
func (s *TransientSession) Insert(k, v []byte) error {
	err := errors.New("cannot insert in transient mode")
	return err
}
// Put stores a key value pair, overriding any existing value for the key.
// Empty keys and empty values are rejected.
//
// The indexed batch is created on first use. Before adding the new entry,
// a non-empty batch whose length exceeds maxBatchSize is committed to the
// database without sync and then reset for reuse.
func (s *TransientSession) Put(k, v []byte) error {
	switch {
	case len(k) == 0:
		return errors.New("cannot store empty key")
	case len(v) == 0:
		return errors.New("cannot store empty value")
	}

	if s.batch == nil {
		s.batch = s.db.NewIndexedBatch()
	}

	// Flush only when the batch is both over the size limit and actually
	// holds at least one operation.
	if s.batch.Len() > s.maxBatchSize && s.batch.Count() > 0 {
		if flushErr := s.batch.Commit(pebble.NoSync); flushErr != nil {
			return flushErr
		}
		s.batch.Reset()
	}

	return s.batch.Set(k, v, nil)
}
// Get returns the value associated with the given key, looked up through
// the session's batch. If no batch has been created yet, or the key is not
// found, it returns ErrKeyNotFound.
func (s *TransientSession) Get(k []byte) ([]byte, error) {
	if s.batch != nil {
		return get(s.batch, k)
	}
	// Nothing has been written through this session yet.
	return nil, errors.WithStack(ErrKeyNotFound)
}
// Exists reports whether a key exists and is visible by the current
// session. A session that has not created a batch yet reports false.
func (s *TransientSession) Exists(k []byte) (bool, error) {
	if s.batch != nil {
		return exists(s.batch, k)
	}
	return false, nil
}
// Delete removes a record by key. It returns ErrKeyNotFound when the
// session has no batch yet or when the key cannot be read through the batch.
func (s *TransientSession) Delete(k []byte) error {
	if s.batch == nil {
		return errors.WithStack(ErrKeyNotFound)
	}

	// Probe for the key first so that deleting a missing key is reported
	// as an error; the value itself is not needed.
	_, closer, lookupErr := s.batch.Get(k)
	switch {
	case lookupErr == nil:
		// Key is present; fall through to the delete below.
	case errors.Is(lookupErr, pebble.ErrNotFound):
		return errors.WithStack(ErrKeyNotFound)
	default:
		return lookupErr
	}

	if closeErr := closer.Close(); closeErr != nil {
		return closeErr
	}

	return s.batch.Delete(k, nil)
}
// DeleteRange records a range deletion from start to end in the session's
// batch. It is a no-op when no batch has been created yet.
func (s *TransientSession) DeleteRange(start []byte, end []byte) error {
	if b := s.batch; b != nil {
		return b.DeleteRange(start, end, nil)
	}
	return nil
}
// Iterator returns an Iterator restricted to the bounds in opts; a nil opts
// means no bounds are passed to pebble. The iterator is created on the
// session's batch when one exists, and directly on the database otherwise.
// The returned error is always nil.
func (s *TransientSession) Iterator(opts *IterOptions) (Iterator, error) {
	var bounds *pebble.IterOptions
	if opts != nil {
		bounds = new(pebble.IterOptions)
		bounds.LowerBound = opts.LowerBound
		bounds.UpperBound = opts.UpperBound
	}

	if s.batch != nil {
		return &iterator{Iterator: s.batch.NewIter(bounds)}, nil
	}
	return &iterator{Iterator: s.db.NewIter(bounds)}, nil
}