-
Notifications
You must be signed in to change notification settings - Fork 669
/
state.go
266 lines (233 loc) · 8.6 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package atomic
import (
"bytes"
"errors"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/linkeddb"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/hashing"
"github.com/ava-labs/avalanchego/utils/set"
)
// errDuplicatedOperation is returned by SetValue when the key is already
// present, and by RemoveValue when the key has already been marked as removed.
var errDuplicatedOperation = errors.New("duplicated operation on provided value")
// dbElement is the record stored in the value database for a single key.
// NOTE(review): the fields carry `serialize` tags, so their order is presumably
// part of the encoded format -- confirm with the codec before reordering.
type dbElement struct {
// Present reports whether the element actually exists.
// If set to false, this record is only a removal marker: the key was
// removed before it was ever added, so when the element is later added to
// the shared memory, it will be immediately removed.
// If set to true, then this element will be removed normally when remove is
// called.
Present bool `serialize:"true"`
// Value is the body of this element.
Value []byte `serialize:"true"`
// Traits are a collection of features that can be used to lookup this
// element.
Traits [][]byte `serialize:"true"`
}
// state is used to maintain a mapping from keys to dbElements and an index that
// maps traits to the keys with those traits.
// The two databases are updated together by SetValue and RemoveValue.
type state struct {
// valueDB contains a mapping from key to the corresponding serialized
// dbElement, including removal markers for keys that were removed before
// being added.
valueDB database.Database
// indexDB stores the trait -> key mappings.
// To get this mapping, we construct a prefixdb using the trait as a prefix
// and then construct a linkeddb on the result.
// The linkeddb contains the keys that the trait maps to as the key and map
// to nil values.
indexDB database.Database
}
// Value looks up [key] in the value database and returns it as an Element.
//
// Any error produced while loading the entry is returned unchanged. An entry
// that only exists as a removal marker (Present == false) is reported as
// [database.ErrNotFound], the same as a key that was never written.
func (s *state) Value(key []byte) (*Element, error) {
	elem, err := s.loadValue(key)
	switch {
	case err != nil:
		return nil, err
	case !elem.Present:
		// The key is recorded, but only as "removed before it was added".
		return nil, database.ErrNotFound
	}

	result := &Element{
		Key:    key,
		Value:  elem.Value,
		Traits: elem.Traits,
	}
	return result, nil
}
// SetValue places the element [e] into the state and maps each of the traits of
// the element to its key, so that the element can be looked up by any of its
// traits.
//
// If the key was preemptively marked as deleted (see RemoveValue), then the
// two operations cancel out: the removal marker is deleted and the element is
// not stored.
//
// Returns errDuplicatedOperation if the key is already present, and propagates
// any unexpected database or codec error.
func (s *state) SetValue(e *Element) error {
	value, err := s.loadValue(e.Key)
	if err == nil {
		// The key was already registered with the state.
		if !value.Present {
			// This was previously optimistically deleted from the database, so
			// it should be immediately removed.
			return s.valueDB.Delete(e.Key)
		}

		// This key was written twice, which is invalid.
		return errDuplicatedOperation
	}
	if !errors.Is(err, database.ErrNotFound) {
		// An unexpected error occurred, so we should propagate that error.
		return err
	}

	// Serialize the element before touching the index, so that a marshalling
	// failure cannot leave trait index entries that point at a key with no
	// corresponding value.
	valueBytes, err := codecManager.Marshal(codecVersion, &dbElement{
		Present: true,
		Value:   e.Value,
		Traits:  e.Traits,
	})
	if err != nil {
		return err
	}

	// Register the key under each of its traits.
	for _, trait := range e.Traits {
		traitDB := prefixdb.New(trait, s.indexDB)
		traitList := linkeddb.NewDefault(traitDB)
		if err := traitList.Put(e.Key, nil); err != nil {
			return err
		}
	}

	return s.valueDB.Put(e.Key, valueBytes)
}
// RemoveValue removes [key] from the state.
// This operation removes the element associated with the key from both the
// valueDB and the indexDB.
// If [key] has already been marked as removed, errDuplicatedOperation is
// returned as the same element cannot be removed twice.
// If [key] is not present in the db, then it is marked as deleted so that it
// will be removed immediately when it gets added in the future.
//
// This ensures that we can consume a UTXO before it has been added into shared
// memory in bootstrapping.
// Ex. P-Chain attempts to consume atomic UTXO from the C-Chain in block 100.
// P-Chain executes before the C-Chain, so when bootstrapping it must be able to
// verify and accept this block before the node has processed the block on the
// C-Chain where the atomic UTXO is added to shared memory.
// Additionally, when the C-Chain actually does add the atomic UTXO to shared
// memory, SetValue must handle the case that the atomic UTXO was marked as
// deleted before it was actually added.
// To do this, the node essentially adds a tombstone marker when the P-Chain
// consumes the non-existent UTXO, which is deleted when the C-Chain actually
// adds the atomic UTXO to shared memory.
//
// This implies that chains interacting with shared memory must be able to
// generate their chain state without actually performing the read of shared
// memory. Shared memory should only be used to verify that the transition
// being performed is valid. That ensures that such verification can be skipped
// during bootstrapping. It is up to the chain to ensure this based on the
// current engine state.
func (s *state) RemoveValue(key []byte) error {
	value, err := s.loadValue(key)
	if err != nil {
		if !errors.Is(err, database.ErrNotFound) {
			// An unexpected error occurred, so we should propagate that error.
			return err
		}

		// The value doesn't exist, so we should optimistically delete it by
		// writing a tombstone marker (Present == false) under [key].
		dbElem := dbElement{Present: false}
		valueBytes, err := codecManager.Marshal(codecVersion, &dbElem)
		if err != nil {
			return err
		}
		return s.valueDB.Put(key, valueBytes)
	}

	// Don't allow the removal of something that was already removed.
	if !value.Present {
		return errDuplicatedOperation
	}

	// Remove [key] from the indexDB for each trait that has indexed this key.
	for _, trait := range value.Traits {
		traitDB := prefixdb.New(trait, s.indexDB)
		traitList := linkeddb.NewDefault(traitDB)
		if err := traitList.Delete(key); err != nil {
			return err
		}
	}
	return s.valueDB.Delete(key)
}
// loadValue fetches the raw bytes stored under [key] in the value database and
// decodes them into a dbElement.
//
// If the database read fails (including when the key is absent), a nil element
// and that error are returned. If decoding fails, the decode error is returned
// together with the element that was being decoded into.
func (s *state) loadValue(key []byte) (*dbElement, error) {
	raw, err := s.valueDB.Get(key)
	if err != nil {
		return nil, err
	}

	// The key was in the database, so decode what was stored there.
	elem := new(dbElement)
	_, err = codecManager.Unmarshal(raw, elem)
	return elem, err
}
// getKeys collects up to [limit] distinct keys that possess at least one of
// the given [traits], resuming from the position [startTrait]/[startKey].
//
// It returns the collected keys along with the [lastTrait] and [lastKey] that
// were reached, which callers can pass back in as the next start position to
// paginate. Note that [traits] is sorted in place by this call.
func (s *state) getKeys(traits [][]byte, startTrait, startKey []byte, limit int) ([][]byte, []byte, []byte, error) {
	var (
		// seen is shared by pointer with appendTraitKeys so that a key which
		// possesses several of the requested traits is only reported once.
		seen      = set.Set[ids.ID]{}
		keys      [][]byte
		lastTrait = startTrait
		lastKey   = startKey
	)

	// Walk the traits in sorted order so that the pagination position is
	// well defined.
	utils.SortBytes(traits)
	for _, trait := range traits {
		order := bytes.Compare(trait, startTrait)
		if order < 0 {
			// This trait sorts before the start position, so it was fully
			// handled by an earlier page.
			continue
		}
		if order > 0 {
			// [startKey] only applies within [startTrait]; every later trait
			// is read from its beginning.
			startKey = nil
		}

		lastTrait = trait

		// Gather the not-yet-seen keys that possess [trait].
		var err error
		lastKey, err = s.appendTraitKeys(&keys, &seen, &limit, trait, startKey)
		if err != nil {
			return nil, nil, nil, err
		}

		// Stop as soon as the page is full.
		if limit == 0 {
			break
		}
	}

	// Hand back the keys found and the position at which iteration stopped.
	return keys, lastTrait, lastKey, nil
}
// appendTraitKeys walks the index of [trait], beginning at [startKey], and
// appends each key not already recorded in [keySet] to [keys], decrementing
// [limit] for every key appended. It stops when the index is exhausted or
// [limit] is no longer positive.
//
// The returned key is the last one the iterator visited (whether or not it was
// appended), or [startKey] if nothing was visited. The returned error is the
// iterator's error, if any.
func (s *state) appendTraitKeys(keys *[][]byte, keySet *set.Set[ids.ID], limit *int, trait, startKey []byte) ([]byte, error) {
	// The keys possessing [trait] are stored as a linkeddb layered on top of a
	// prefixdb that is namespaced by the trait itself.
	traitList := linkeddb.NewDefault(prefixdb.New(trait, s.indexDB))

	iter := traitList.NewIteratorWithStart(startKey)
	defer iter.Release()

	lastKey := startKey
	for iter.Next() && *limit > 0 {
		current := iter.Key()
		lastKey = current

		// Keys are de-duplicated by their hash, so an element possessing
		// several of the requested traits is only appended once.
		id := hashing.ComputeHash256Array(current)
		if !keySet.Contains(id) {
			keySet.Add(id)
			*keys = append(*keys, current)
			*limit--
		}
	}
	return lastKey, iter.Error()
}