forked from stellar/go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
207 lines (177 loc) · 6.62 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// Package verify provides helpers used for verifying if the ingested data is
// correct.
package verify
import (
"bytes"
"encoding/base64"
"io"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
// TransformLedgerEntryFunction is a function that transforms ledger entry
// into a form that should be compared to checkpoint state. It can be also used
// to decide if the given entry should be ignored during verification.
// Sometimes the application needs only specific type entries or specific fields
// for a given entry type. Use this function to create a common form of an entry
// that will be used for equality check.
type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry)
// StateVerifier verifies if ledger entries provided by Add method are the same
// as in the checkpoint ledger entries provided by CheckpointChangeReader.
// The algorithm works in the following way:
// 0. Develop TransformFunction. It should remove all fields and objects not
// stored in your app. For example, if you only store accounts, all other
// ledger entry types should be ignored (return ignore = true).
// 1. In a loop, get entries from history archive by calling GetEntries()
// and Write() your version of entries found in the batch (in any order).
// 2. When GetEntries() return no more entries, call Verify with a number of
// entries in your storage (to find if some extra entires exist in your
// storage).
// Functions will return StateError type if state is found to be incorrect.
// It's user responsibility to call `StateReader.Close()` when reading is done.
// Check Horizon for an example how to use this tool.
type StateVerifier struct {
StateReader ingest.ChangeReader
// TransformFunction transforms (or ignores) ledger entries streamed from
// checkpoint buckets to match the form added by `Write`. Read
// TransformLedgerEntryFunction godoc for more information.
TransformFunction TransformLedgerEntryFunction
readEntries int
readingDone bool
currentEntries map[string]xdr.LedgerEntry
}
// GetLedgerKeys returns up to `count` ledger keys from history buckets
// storing actual entries in cache to compare in Write.
func (v *StateVerifier) GetLedgerKeys(count int) ([]xdr.LedgerKey, error) {
err := v.checkUnreadEntries()
if err != nil {
return nil, err
}
keys := make([]xdr.LedgerKey, 0, count)
v.currentEntries = make(map[string]xdr.LedgerEntry)
for count > 0 {
entryChange, err := v.StateReader.Read()
if err != nil {
if err == io.EOF {
v.readingDone = true
return keys, nil
}
return keys, err
}
entry := *entryChange.Post
if v.TransformFunction != nil {
ignore, _ := v.TransformFunction(entry)
if ignore {
continue
}
}
ledgerKey := entry.LedgerKey()
key, err := xdr.MarshalBase64(ledgerKey)
if err != nil {
return keys, errors.Wrap(err, "Error marshaling ledgerKey")
}
keys = append(keys, ledgerKey)
entry.Normalize()
v.currentEntries[key] = entry
count--
v.readEntries++
}
return keys, nil
}
// Write compares the entry with entries in the latest batch of entries fetched
// using `GetEntries`. Entries don't need to follow the order in entries returned
// by `GetEntries`.
// Warning: Write will call Normalize() on `entry` that can modify it!
// Any `StateError` returned by this method indicates invalid state!
func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
actualEntry := entry.Normalize()
actualEntryMarshaled, err := actualEntry.MarshalBinary()
if err != nil {
return errors.Wrap(err, "Error marshaling actualEntry")
}
key, err := xdr.MarshalBase64(actualEntry.LedgerKey())
if err != nil {
return errors.Wrap(err, "Error marshaling ledgerKey")
}
expectedEntry, exist := v.currentEntries[key]
if !exist {
return ingest.NewStateError(errors.Errorf(
"Cannot find entry in currentEntries map: %s (key = %s)",
base64.StdEncoding.EncodeToString(actualEntryMarshaled),
key,
))
}
delete(v.currentEntries, key)
preTransformExpectedEntry := expectedEntry
preTransformExpectedEntryMarshaled, err := preTransformExpectedEntry.MarshalBinary()
if err != nil {
return errors.Wrap(err, "Error marshaling preTransformExpectedEntry")
}
if v.TransformFunction != nil {
var ignore bool
ignore, expectedEntry = v.TransformFunction(expectedEntry)
// Extra check: if entry was ignored in GetEntries, it shouldn't be
// ignored here.
if ignore {
return errors.Errorf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly TransformFunction is buggy.",
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
)
}
}
expectedEntryMarshaled, err := expectedEntry.MarshalBinary()
if err != nil {
return errors.Wrap(err, "Error marshaling expectedEntry")
}
if !bytes.Equal(actualEntryMarshaled, expectedEntryMarshaled) {
return ingest.NewStateError(errors.Errorf(
"Entry does not match the fetched entry. Expected: %s (pretransform = %s), actual: %s",
base64.StdEncoding.EncodeToString(expectedEntryMarshaled),
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
base64.StdEncoding.EncodeToString(actualEntryMarshaled),
))
}
return nil
}
// Verify should be run after all GetEntries/Write calls. If there were no errors
// so far it means that all entries present in history buckets matches the entries
// in application storage. However, it's still possible that state is invalid when:
// * Not all entries have been read from history buckets (ex. due to a bug).
// * Some entries were not compared using Write.
// * There are some extra entries in application storage not present in history
// buckets.
// Any `StateError` returned by this method indicates invalid state!
func (v *StateVerifier) Verify(countAll int) error {
err := v.checkUnreadEntries()
if err != nil {
return err
}
if !v.readingDone {
return errors.New("There are unread entries in state reader. Process all entries before calling Verify.")
}
if v.readEntries != countAll {
return ingest.NewStateError(errors.Errorf(
"Number of entries read using GetEntries (%d) does not match number of entries in your storage (%d).",
v.readEntries,
countAll,
))
}
return nil
}
func (v *StateVerifier) checkUnreadEntries() error {
if len(v.currentEntries) > 0 {
var entry xdr.LedgerEntry
for _, e := range v.currentEntries {
entry = e
break
}
// Ignore error as StateError below is more important
entryString, _ := xdr.MarshalBase64(entry)
return ingest.NewStateError(errors.Errorf(
"Entries (%d) not found locally, example: %s",
len(v.currentEntries),
entryString,
))
}
return nil
}