/
database.go
296 lines (250 loc) · 8.8 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
* database.go
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// FoundationDB Go API
package fdb
// #define FDB_API_VERSION 740
// #include <foundationdb/fdb_c.h>
import "C"
import (
"errors"
"runtime"
)
// Database is a handle to a FoundationDB database. Database is a lightweight
// object that may be efficiently copied, and is safe for concurrent use by
// multiple goroutines.
//
// Although Database provides convenience methods for reading and writing data,
// modifications to a database are usually made via transactions, which are
// usually created and committed automatically by the (Database).Transact
// method.
type Database struct {
// String reference to the cluster file.
clusterFile string
// This variable is to track if we have to remove the database from the cached
// database structs. We can't use clusterFile alone, since the default clusterFile
// would be an empty string.
isCached bool
*database
}
type database struct {
ptr *C.FDBDatabase
}
// DatabaseOptions is a handle with which to set options that affect a Database
// object. A DatabaseOptions instance should be obtained with the
// (Database).Options method.
type DatabaseOptions struct {
d *database
}
// Close will close the Database and clean up all resources.
// You have to ensure that you're not resuing this database.
func (d *Database) Close() {
// Remove database object from the cached databases
if d.isCached {
openDatabases.Delete(d.clusterFile)
}
// Destroy the database
d.destroy()
}
func (opt DatabaseOptions) setOpt(code int, param []byte) error {
return setOpt(func(p *C.uint8_t, pl C.int) C.fdb_error_t {
return C.fdb_database_set_option(opt.d.ptr, C.FDBDatabaseOption(code), p, pl)
}, param)
}
func (d *database) destroy() {
if d.ptr == nil {
return
}
C.fdb_database_destroy(d.ptr)
}
// CreateTransaction returns a new FoundationDB transaction. It is generally
// preferable to use the (Database).Transact method, which handles
// automatically creating and committing a transaction with appropriate retry
// behavior.
func (d Database) CreateTransaction() (Transaction, error) {
var outt *C.FDBTransaction
if err := C.fdb_database_create_transaction(d.ptr, &outt); err != 0 {
return Transaction{}, Error{int(err)}
}
t := &transaction{outt, d}
runtime.SetFinalizer(t, (*transaction).destroy)
return Transaction{t}, nil
}
// RebootWorker is a wrapper around fdb_database_reboot_worker and allows to reboot processes
// from the go bindings. If a suspendDuration > 0 is provided the rebooted process will be
// suspended for suspendDuration seconds. If checkFile is set to true the process will check
// if the data directory is writeable by creating a validation file. The address must be a
// process address is the form of IP:Port pair.
func (d Database) RebootWorker(address string, checkFile bool, suspendDuration int) error {
t := &futureInt64{
future: newFuture(C.fdb_database_reboot_worker(
d.ptr,
byteSliceToPtr([]byte(address)),
C.int(len(address)),
C.fdb_bool_t(boolToInt(checkFile)),
C.int(suspendDuration),
),
),
}
dbVersion, err := t.Get()
if dbVersion == 0 {
return errors.New("failed to send reboot process request")
}
return err
}
func retryable(wrapped func() (interface{}, error), onError func(Error) FutureNil) (ret interface{}, e error) {
for {
ret, e = wrapped()
// No error means success!
if e == nil {
return
}
// Check if the error chain contains an
// fdb.Error
var ep Error
if errors.As(e, &ep) {
e = onError(ep).Get()
}
// If OnError returns an error, then it's not
// retryable; otherwise take another pass at things
if e != nil {
return
}
}
}
// Transact runs a caller-provided function inside a retry loop, providing it
// with a newly created Transaction. After the function returns, the Transaction
// will be committed automatically. Any error during execution of the function
// (by panic or return) or the commit will cause the function and commit to be
// retried or, if fatal, return the error to the caller.
//
// When working with Future objects in a transactional function, you may either
// explicitly check and return error values using Get, or call MustGet. Transact
// will recover a panicked Error and either retry the transaction or return the
// error.
//
// The transaction is retried if the error is or wraps a retryable Error.
// The error is unwrapped.
//
// Do not return Future objects from the function provided to Transact. The
// Transaction created by Transact may be finalized at any point after Transact
// returns, resulting in the cancellation of any outstanding
// reads. Additionally, any errors returned or panicked by the Future will no
// longer be able to trigger a retry of the caller-provided function.
//
// See the Transactor interface for an example of using Transact with
// Transaction and Database objects.
func (d Database) Transact(f func(Transaction) (interface{}, error)) (interface{}, error) {
tr, e := d.CreateTransaction()
// Any error here is non-retryable
if e != nil {
return nil, e
}
wrapped := func() (ret interface{}, e error) {
defer panicToError(&e)
ret, e = f(tr)
if e == nil {
e = tr.Commit().Get()
}
return
}
return retryable(wrapped, tr.OnError)
}
// ReadTransact runs a caller-provided function inside a retry loop, providing
// it with a newly created Transaction (as a ReadTransaction). Any error during
// execution of the function (by panic or return) will cause the function to be
// retried or, if fatal, return the error to the caller.
//
// When working with Future objects in a read-only transactional function, you
// may either explicitly check and return error values using Get, or call
// MustGet. ReadTransact will recover a panicked Error and either retry the
// transaction or return the error.
//
// The transaction is retried if the error is or wraps a retryable Error.
// The error is unwrapped.
//
// Do not return Future objects from the function provided to ReadTransact. The
// Transaction created by ReadTransact may be finalized at any point after
// ReadTransact returns, resulting in the cancellation of any outstanding
// reads. Additionally, any errors returned or panicked by the Future will no
// longer be able to trigger a retry of the caller-provided function.
//
// See the ReadTransactor interface for an example of using ReadTransact with
// Transaction, Snapshot and Database objects.
func (d Database) ReadTransact(f func(ReadTransaction) (interface{}, error)) (interface{}, error) {
tr, e := d.CreateTransaction()
// Any error here is non-retryable
if e != nil {
return nil, e
}
wrapped := func() (ret interface{}, e error) {
defer panicToError(&e)
ret, e = f(tr)
if e == nil {
e = tr.Commit().Get()
}
return
}
return retryable(wrapped, tr.OnError)
}
// Options returns a DatabaseOptions instance suitable for setting options
// specific to this database.
func (d Database) Options() DatabaseOptions {
return DatabaseOptions{d.database}
}
// LocalityGetBoundaryKeys returns a slice of keys that fall within the provided
// range. Each key is located at the start of a contiguous range stored on a
// single server.
//
// If limit is non-zero, only the first limit keys will be returned. In large
// databases, the number of boundary keys may be large. In these cases, a
// non-zero limit should be used, along with multiple calls to
// LocalityGetBoundaryKeys.
//
// If readVersion is non-zero, the boundary keys as of readVersion will be
// returned.
func (d Database) LocalityGetBoundaryKeys(er ExactRange, limit int, readVersion int64) ([]Key, error) {
tr, e := d.CreateTransaction()
if e != nil {
return nil, e
}
if readVersion != 0 {
tr.SetReadVersion(readVersion)
}
tr.Options().SetReadSystemKeys()
tr.Options().SetLockAware()
bk, ek := er.FDBRangeKeys()
ffer := KeyRange{
append(Key("\xFF/keyServers/"), bk.FDBKey()...),
append(Key("\xFF/keyServers/"), ek.FDBKey()...),
}
kvs, e := tr.Snapshot().GetRange(ffer, RangeOptions{Limit: limit}).GetSliceWithError()
if e != nil {
return nil, e
}
size := len(kvs)
if limit != 0 && limit < size {
size = limit
}
boundaries := make([]Key, size)
for i := 0; i < size; i++ {
boundaries[i] = kvs[i].Key[13:]
}
return boundaries, nil
}