This repository has been archived by the owner on Jun 25, 2024. It is now read-only.
forked from lightninglabs/pool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metadata.go
182 lines (154 loc) · 5.3 KB
/
metadata.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package clientdb
import (
"crypto/rand"
"errors"
"fmt"
"github.com/btcsuite/btcwallet/wtxmgr"
"github.com/getvoltage/pool/clientdb/migrations"
"go.etcd.io/bbolt"
)
// migration is a function which takes a prior outdated version of the database
// instance and mutates the key/bucket structure to arrive at a more up-to-date
// version of the database.
type migration func(tx *bbolt.Tx) error
var (
// metadataBucketKey stores all the metadata concerning the state of the
// database.
metadataBucketKey = []byte("metadata")
// dbVersionKey is the key used for storing/retrieving the current
// database version.
dbVersionKey = []byte("version")
// lockIDKey is the database key used for storing/retrieving the global
// lock ID to use when leasing outputs from the backing lnd node's
// wallet. This is mostly required so that calls to LeaseOutput are
// idempotent when attempting to lease an output we already have a lease
// for.
lockIDKey = []byte("lock-id")
// ErrDBReversion is returned when detecting an attempt to revert to a
// prior database version.
ErrDBReversion = errors.New("cannot revert to prior version")
// dbVersions is storing all versions of database. If current version
// of database don't match with latest version this list will be used
// for retrieving all migration function that are need to apply to the
// current db.
dbVersions = []migration{
migrations.AddInitialOrderTimestamps,
}
latestDBVersion = uint32(len(dbVersions))
)
// getDBVersion retrieves the current database version.
func getDBVersion(bucket *bbolt.Bucket) (uint32, error) {
versionBytes := bucket.Get(dbVersionKey)
if versionBytes == nil {
return 0, errors.New("database version not found")
}
return byteOrder.Uint32(versionBytes), nil
}
// setDBVersion updates the current database version.
func setDBVersion(bucket *bbolt.Bucket, version uint32) error {
var b [4]byte
byteOrder.PutUint32(b[:], version)
return bucket.Put(dbVersionKey, b[:])
}
// getBucket retrieves the bucket with the given key.
func getBucket(tx *bbolt.Tx, key []byte) (*bbolt.Bucket, error) {
bucket := tx.Bucket(key)
if bucket == nil {
return nil, fmt.Errorf("bucket \"%v\" does not exist",
string(key))
}
return bucket, nil
}
// getNestedBucket retrieves the nested bucket with the given key found within
// the given bucket. If the bucket does not exist and `create` is true, then the
// bucket is created.
func getNestedBucket(bucket *bbolt.Bucket, key []byte,
create bool) (*bbolt.Bucket, error) {
nestedBucket := bucket.Bucket(key)
if nestedBucket == nil && create {
return bucket.CreateBucketIfNotExists(key)
}
if nestedBucket == nil {
return nil, fmt.Errorf("nested bucket \"%v\" does not exist",
string(key))
}
return nestedBucket, nil
}
// syncVersions function is used for safe db version synchronization. It
// applies migration functions to the current database and recovers the
// previous state of db if at least one error/panic appeared during migration.
func syncVersions(db *bbolt.DB) error {
var currentVersion uint32
err := db.View(func(tx *bbolt.Tx) error {
metadata, err := getBucket(tx, metadataBucketKey)
if err != nil {
return err
}
currentVersion, err = getDBVersion(metadata)
return err
})
if err != nil {
return err
}
log.Infof("Checking for schema update: latest_version=%v, "+
"db_version=%v", latestDBVersion, currentVersion)
switch {
// If the database reports a higher version that we are aware of, the
// user is probably trying to revert to a prior version of lnd. We fail
// here to prevent reversions and unintended corruption.
case currentVersion > latestDBVersion:
log.Errorf("Refusing to revert from db_version=%d to "+
"lower version=%d", currentVersion,
latestDBVersion)
return ErrDBReversion
// If the current database version matches the latest version number,
// then we don't need to perform any migrations.
case currentVersion == latestDBVersion:
return nil
}
log.Infof("Performing database schema migration")
// Otherwise we execute the migrations serially within a single database
// transaction to ensure the migration is atomic.
return db.Update(func(tx *bbolt.Tx) error {
for v := currentVersion; v < latestDBVersion; v++ {
log.Infof("Applying migration #%v", v+1)
migration := dbVersions[v]
if err := migration(tx); err != nil {
log.Infof("Unable to apply migration #%v", v+1)
return err
}
}
metadata, err := getBucket(tx, metadataBucketKey)
if err != nil {
return err
}
return setDBVersion(metadata, latestDBVersion)
})
}
// storeRandomLockID generates a random lock ID backed by the system's CSPRNG
// and stores it under the metadata bucket.
func storeRandomLockID(metadata *bbolt.Bucket) error {
var lockID wtxmgr.LockID
if _, err := rand.Read(lockID[:]); err != nil {
return err
}
return metadata.Put(lockIDKey, lockID[:])
}
// LockID retrieves the database's global lock ID used to lease outputs from the
// backing lnd node's wallet.
func (db *DB) LockID() (wtxmgr.LockID, error) {
var lockID wtxmgr.LockID
err := db.View(func(tx *bbolt.Tx) error {
metadata, err := getBucket(tx, metadataBucketKey)
if err != nil {
return err
}
lockIDBytes := metadata.Get(lockIDKey)
if lockIDBytes == nil {
return errors.New("lock ID not found")
}
copy(lockID[:], lockIDBytes)
return nil
})
return lockID, err
}