forked from bmatsuo/lmdb-go
/
lmdbsync.go
276 lines (234 loc) · 8.63 KB
/
lmdbsync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
/*
Package lmdbsync provides advanced synchronization for LMDB environments at the
cost of performance. The package provides a drop-in replacement for *lmdb.Env
that can be used in situations where the database may be resized or where the
flag lmdb.NoLock is used.
Bypassing an Env's methods to access the underlying lmdb.Env is not safe. The
severity of the consequences varies, but such usage should be strictly avoided
as it may produce undefined behavior from the LMDB C library.
# Resizing the environment
The Env type synchronizes all calls to Env.SetMapSize so that it may, with some
caveats, be safely called in the presence of concurrent transactions after an
environment has been opened. All running transactions must complete before the
method will be called on the underlying lmdb.Env.
If an open transaction depends on a call to Env.SetMapSize then the Env will
deadlock and block all future transactions. When using a Handler to
automatically call Env.SetMapSize this implies the restriction that
transactions must terminate independently of the creation/termination of other
transactions to avoid deadlock.
In the simplest example, a view transaction that attempts an update on the
underlying Env will deadlock the environment if the map is full and a Handler
attempts to resize the map so the update may be retried.
	env.View(func(txn *lmdb.Txn) (err error) {
		v, err := txn.Get(dbi, key)
		if err != nil {
			return err
		}
		err = env.Update(func(txn *lmdb.Txn) (err error) { // deadlock on lmdb.MapFull!
			return txn.Put(dbi, key, append(v, b...), 0)
		})
		return err
	})
The update should instead be prepared inside the view and then executed
following its termination. This removes the implicit dependence of the view on
calls to Env.SetMapSize().
	var v []byte
	err := env.View(func(txn *lmdb.Txn) (err error) {
		// RawRead isn't used because the value will be used outside the
		// transaction.
		v, err = txn.Get(dbi, key)
		if err != nil {
			return err
		}
		return nil
	})
	if err != nil {
		// ...
	}
	err = env.Update(func(txn *lmdb.Txn) (err error) { // no deadlock, even if env is resized!
		return txn.Put(dbi, key, append(v, b...), 0)
	})
The developers of LMDB officially recommend against applications changing the
memory map size for an open database. It requires careful synchronization by
all processes accessing the database file. And, a large memory map will not
affect disk usage on operating systems that support sparse files (e.g. Linux,
not OS X).
See mdb_env_set_mapsize.
# MapFull
The MapFullHandler function configures an Env to automatically increase the
map size with Env.SetMapSize and retry transactions when a lmdb.MapFull error
prevents an update from being committed.
Because updates may need to execute multiple times in the presence of
lmdb.MapFull it is important to make sure their TxnOp functions are idempotent
and do not cause unwanted additive change to the program state.
See mdb_txn_commit and MDB_MAP_FULL.
# MapResized
When multiple processes access and resize an environment it is not uncommon to
encounter a MapResized error which prevents the TxnOp from being executed and
requires a synchronized call to Env.SetMapSize before continuing normal
operation.
The MapResizedHandler function configures an Env to automatically adopt a new
map size when a lmdb.MapResized error is encountered and retry execution of the
TxnOp.
See mdb_txn_begin and MDB_MAP_RESIZED.
# NoLock
When the lmdb.NoLock flag is set on an environment, Env handles all transaction
synchronization using Go structures. This is an experimental feature and it is
unclear what benefits it provides.
Usage of lmdb.NoLock requires that update transactions acquire an exclusive
lock on the environment. In such cases it is required that view transactions
execute independently of update transactions, a requirement more strict than
that from handling MapFull.
See mdb_env_open and MDB_NOLOCK.
*/
package lmdbsync
import (
"fmt"
"os"
"sync"
"time"
"golang.org/x/net/context"
"github.com/PowerDNS/lmdb-go/lmdb"
)
// Env wraps an *lmdb.Env, receiving all the same methods and proxying some to
// provide transaction management. Transactions run by an Env handle
// lmdb.MapResized errors transparently through additional synchronization.
// Additionally, Env is safe to use on environments setting the lmdb.NoLock
// flag. When in NoLock mode write transactions block all read transactions
// from running (in addition to blocking other write transactions like a normal
// lmdb.Env would).
//
// Env proxies several methods to provide synchronization required for safe
// operation in some scenarios. It is important not to bypass the proxies and
// call the methods directly on the underlying lmdb.Env or synchronization may
// be interfered with. Calling proxied methods directly on the lmdb.Env may
// result in poor transaction performance or unspecified behavior from the C
// library.
type Env struct {
	// Env is the wrapped environment. Its methods are promoted onto this type
	// except where Env declares a proxy method with the same name.
	*lmdb.Env
	// Handlers is the handler chain that RunTxn, View, Update and
	// UpdateLocked pass the result of every transaction attempt to.
	Handlers HandlerChain
	// ctx is the initial context handed to the handlers for a transaction.
	// NewEnv sets it to context.Background().
	ctx context.Context
	// noLock records that the lmdb.NoLock flag was detected by NewEnv or
	// Open. When set, non-readonly transactions take txnlock exclusively.
	noLock bool
	// txnlock is held for reading while a transaction runs, and held
	// exclusively while the map size is changed or while a non-readonly
	// transaction runs in NoLock mode.
	txnlock sync.RWMutex
}
// NewEnv wraps env in a newly allocated Env whose handler chain is built from
// h. When env is nil a fresh lmdb.Env is allocated with lmdb.NewEnv(). The
// lmdb.NoLock flag is read from env so that transaction locking can be set up
// accordingly. An error from allocating env or from reading its flags is
// returned together with a nil Env.
func NewEnv(env *lmdb.Env, h ...Handler) (*Env, error) {
	if env == nil {
		fresh, err := lmdb.NewEnv()
		if err != nil {
			return nil, err
		}
		env = fresh
	}

	envFlags, err := env.Flags()
	if err != nil {
		return nil, err
	}

	// Copy h into a fresh chain so the wrapper never aliases the caller's
	// slice.
	var handlers HandlerChain
	handlers = append(handlers, h...)

	return &Env{
		Env:      env,
		Handlers: handlers,
		noLock:   envFlags&lmdb.NoLock != 0,
		ctx:      context.Background(),
	}, nil
}
// Open proxies r.Env.Open() and, once the environment has opened
// successfully, remembers whether lmdb.NoLock was requested so that
// transactions are synchronized correctly. A failed open leaves the recorded
// flag state untouched.
func (r *Env) Open(path string, flags uint, mode os.FileMode) error {
	if err := r.Env.Open(path, flags, mode); err != nil {
		// The environment did not open, so no flags were updated.
		return err
	}

	// NoLock is only ever switched on here; it is never cleared.
	wantsNoLock := flags&lmdb.NoLock != 0
	if wantsNoLock {
		r.noLock = true
	}
	return nil
}
// SetMapSize proxies r.Env.SetMapSize(). The call blocks while concurrent
// transactions are in progress and applies the new size without any
// additional delay.
func (r *Env) SetMapSize(size int64) error {
	const noDelay = 0
	return r.setMapSize(size, noDelay)
}
// setMapSize changes the map size of the underlying environment to size while
// holding the transaction lock exclusively, so it blocks until running
// transactions finish and keeps new ones from starting until it returns. It
// returns the error reported by r.Env.SetMapSize.
//
// A positive delay makes the call sleep, with the lock held, before the size
// is applied; this is intended for adopting a map size set by another
// process.
//
// The lock is released with defer so that it is not left held if the
// underlying call panics.
func (r *Env) setMapSize(size int64, delay time.Duration) error {
	r.txnlock.Lock()
	defer r.txnlock.Unlock()

	if delay > 0 {
		// Wait before adopting a map size set from another process. Hold on
		// to the transaction lock so that other transactions don't attempt
		// to begin while waiting.
		time.Sleep(delay)
	}
	return r.Env.SetMapSize(size)
}
// BeginTxn overrides r.Env.BeginTxn and always returns a nil transaction and
// an error, because unmanaged transactions are not supported by Env. Both
// parent and flags are ignored.
func (r *Env) BeginTxn(parent *lmdb.Txn, flags uint) (*lmdb.Txn, error) {
	err := fmt.Errorf("lmdbsync: unmanaged transactions are not supported")
	return nil, err
}
// RunTxn proxies r.Env.RunTxn(), passing the result of each attempt to
// r.Handlers. The transaction is treated as readonly when flags contains
// lmdb.Readonly.
//
// If lmdb.NoLock is set on r.Env then RunTxn will block while other updates
// are in progress, regardless of flags.
func (r *Env) RunTxn(flags uint, op lmdb.TxnOp) (err error) {
	attempt := func() error {
		return r.Env.RunTxn(flags, op)
	}
	return r.runHandler(flags&lmdb.Readonly != 0, attempt, r.Handlers)
}
// View proxies r.Env.View(), running op as a readonly transaction and passing
// the result of each attempt to r.Handlers.
//
// If lmdb.NoLock is set on r.Env then View will block until any running update
// completes.
func (r *Env) View(op lmdb.TxnOp) error {
	const readonly = true
	attempt := func() error {
		return r.Env.View(op)
	}
	return r.runHandler(readonly, attempt, r.Handlers)
}
// Update proxies r.Env.Update(), running op as a non-readonly transaction and
// passing the result of each attempt to r.Handlers.
//
// If lmdb.NoLock is set on r.Env then Update blocks until all other
// transactions have terminated and blocks all other transactions from running
// while in progress (including readonly transactions).
func (r *Env) Update(op lmdb.TxnOp) error {
	const readonly = false
	attempt := func() error {
		return r.Env.Update(op)
	}
	return r.runHandler(readonly, attempt, r.Handlers)
}
// UpdateLocked proxies r.Env.UpdateLocked(), running op as a non-readonly
// transaction and passing the result of each attempt to r.Handlers.
//
// If lmdb.NoLock is set on r.Env then UpdateLocked blocks until all other
// transactions have terminated and blocks all other transactions from running
// while in progress (including readonly transactions).
func (r *Env) UpdateLocked(op lmdb.TxnOp) error {
	const readonly = false
	attempt := func() error {
		return r.Env.UpdateLocked(op)
	}
	return r.runHandler(readonly, attempt, r.Handlers)
}
// WithHandler returns a TxnRunner that handles transaction errors using
// r.Handlers chained with h.
func (r *Env) WithHandler(h Handler) TxnRunner {
	chained := r.Handlers.Append(h)
	runner := &handlerRunner{env: r, h: chained}
	return runner
}
// runHandler executes fn through r.run and passes the outcome of every attempt
// to h, starting from r.ctx. While h answers with ErrTxnRetry the transaction
// is attempted again, using the context returned by the previous call to h.
// Any other error value from h, nil included, ends the loop and is returned.
func (r *Env) runHandler(readonly bool, fn func() error, h Handler) error {
	ctx := r.ctx
	for {
		outcome := r.run(readonly, fn)
		next, handled := h.HandleTxnErr(ctx, r, outcome)
		if handled == ErrTxnRetry {
			// Carry the handler's context into the next attempt.
			ctx = next
			continue
		}
		return handled
	}
}
// run executes fn while holding the transaction lock and returns fn's result.
//
// When r.noLock is set and readonly is false the lock is held exclusively.
// Every other transaction holds it for reading, which still excludes
// setMapSize and exclusive updates while fn runs.
//
// The lock is released with defer so that a panic escaping fn does not leave
// it held, which would block every later transaction and map resize.
func (r *Env) run(readonly bool, fn func() error) error {
	if r.noLock && !readonly {
		r.txnlock.Lock()
		defer r.txnlock.Unlock()
	} else {
		r.txnlock.RLock()
		defer r.txnlock.RUnlock()
	}
	return fn()
}