/
rewind.go
344 lines (301 loc) · 11.5 KB
/
rewind.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
// Copyright (c) 2019, The Decred-Next developers
// See LICENSE for details.
package dcrpg
// Deletion of all data for a certain block (identified by hash), is performed
// using the following relationships:
//
// blocks -> hash identifies: votes, misses, tickets, transactions
// -> txdbids & stxdbids identifies: transactions
// -> use previous_hash to continue to parent block
//
// transactions -> vin_db_ids identifies: vins, addresses where is_funding=false
// -> vout_db_ids identifies vouts, addresses where is_funding=true
//
// tickets -> purchase_tx_db_id identifies the corresponding txn (but rows may
// be removed directly by block hash)
//
// addresses -> tx_vin_vout_row_id where is_funding=true corresponds to transactions.vout_db_ids
// -> tx_vin_vout_row_id where is_funding=false corresponds to transactions.vin_db_ids
//
// For example, REMOVAL of a block's data could be performed in the following
// manner, where [] indicates primary key/row ID lookup:
// 1. vin_DB_IDs = transactions[blocks.txdbids].vin_db_ids
// 2. Remove vins[vin_DB_IDs]
// 3. vout_DB_IDs = transactions[blocks.txdbids].vout_db_ids
// 4. Remove vouts[vout_DB_IDs]
// 5. Remove addresses WHERE tx_vin_vout_row_id=vout_DB_IDs AND is_funding=true
// 6. Remove addresses WHERE tx_vin_vout_row_id=vin_DB_IDs AND is_funding=false
// 7. Repeat 1-6 for blocks.stxdbids (instead of blocks.txdbids)
// 8. Remove tickets where purchase_tx_db_id = blocks.stxdbids
// OR Remove tickets by block_hash
// 9. Remove votes by block_hash
// 10. Remove misses by block_hash
// 11. Remove transactions[txdbids] and transactions[stxdbids]
//
// Use DeleteBlockData to delete all data across these tables for a certain block.
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/Decred-Next/dcrndata/db/dbtypes/v8"
"github.com/Decred-Next/dcrndata/db/dcrpg/v8/internal"
)
func deleteMissesForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteMisses, "failed to delete misses", hash)
}
func deleteVotesForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteVotes, "failed to delete votes", hash)
}
func deleteTicketsForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteTicketsSimple, "failed to delete tickets", hash)
}
func deleteTransactionsForBlock(dbTx *sql.Tx, hash string) (txRowIds []int64, err error) {
var rows *sql.Rows
rows, err = dbTx.Query(internal.DeleteTransactionsSimple, hash)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var id int64
if err = rows.Scan(&id); err != nil {
return nil, err
}
txRowIds = append(txRowIds, id)
}
if err = rows.Err(); err != nil {
return nil, err
}
return
}
func deleteVoutsForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteVouts, "failed to delete vouts", hash)
}
func deleteVoutsForBlockSubQry(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteVoutsSubQry, "failed to delete vouts", hash)
}
func deleteVinsForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteVins, "failed to delete vins", hash)
}
func deleteVinsForBlockSubQry(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteVinsSubQry, "failed to delete vins", hash)
}
func deleteAddressesForBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteAddresses, "failed to delete addresses", hash)
}
func deleteAddressesForBlockSubQry(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteAddressesSubQry, "failed to delete addresses", hash)
}
func deleteBlock(dbTx SqlExecutor, hash string) (rowsDeleted int64, err error) {
return sqlExec(dbTx, internal.DeleteBlock, "failed to delete block", hash)
}
func deleteBlockFromChain(dbTx *sql.Tx, hash string) (err error) {
// Delete the row from block_chain where this_hash is the specified hash,
// returning the previous block hash in the chain.
var prevHash string
err = dbTx.QueryRow(internal.DeleteBlockFromChain, hash).Scan(&prevHash)
if err != nil {
// If a row with this_hash was not found, and thus prev_hash is not set,
// attempt to locate a row with next_hash set to the hash of this block,
// and set it to the empty string.
if err == sql.ErrNoRows {
err = UpdateBlockNextByNextHash(dbTx, hash, "")
}
return
}
// For any row where next_hash is the prev_hash of the removed row, set
// next_hash to and empty string since that block is no longer in the chain.
return UpdateBlockNextByHash(dbTx, prevHash, "")
}
// RetrieveTxsBlocksAboveHeight returns all distinct mainchain block heights and
// hashes referenced in the transactions table above the given height.
func RetrieveTxsBlocksAboveHeight(ctx context.Context, db *sql.DB, height int64) (heights []int64, hashes []string, err error) {
var rows *sql.Rows
rows, err = db.QueryContext(ctx, internal.SelectTxsBlocksAboveHeight, height)
if err != nil {
return
}
for rows.Next() {
var height int64
var hash string
if err = rows.Scan(&height, &hash); err != nil {
return nil, nil, err
}
heights = append(heights, height)
hashes = append(hashes, hash)
}
return
}
// RetrieveTxsBestBlockMainchain returns the best mainchain block's height from
// the transactions table. If the table is empty, a height of -1, an empty hash
// string, and a nil error are returned
func RetrieveTxsBestBlockMainchain(ctx context.Context, db *sql.DB) (height int64, hash string, err error) {
err = db.QueryRowContext(ctx, internal.SelectTxsBestBlock).Scan(&height, &hash)
if err == sql.ErrNoRows {
err = nil
height = -1
}
return
}
// DeleteBlockData removes all data for the specified block from every table.
// Data are removed from tables in the following order: vins, vouts, addresses,
// transactions, tickets, votes, misses, blocks, block_chain.
// WARNING: When no indexes are present, these queries are VERY SLOW.
func DeleteBlockData(ctx context.Context, db *sql.DB, hash string) (res dbtypes.DeletionSummary, err error) {
// The data purge is an all or nothing operation (no partial removal of
// data), so use a common sql.Tx for all deletions, and Commit in this
// function rather after each deletion.
var dbTx *sql.Tx
dbTx, err = db.BeginTx(ctx, nil)
if err != nil {
err = fmt.Errorf("failed to start new DB transaction: %v", err)
return
}
res.Timings = new(dbtypes.DeletionSummary)
start := time.Now()
if res.Vins, err = deleteVinsForBlockSubQry(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteVinsForBlockSubQry failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Vins = time.Since(start).Nanoseconds()
start = time.Now()
if res.Vouts, err = deleteVoutsForBlockSubQry(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteVoutsForBlockSubQry failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Vouts = time.Since(start).Nanoseconds()
start = time.Now()
if res.Addresses, err = deleteAddressesForBlockSubQry(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteAddressesForBlockSubQry failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Addresses = time.Since(start).Nanoseconds()
// Deleting transactions rows follow deletion of vins, vouts, and addresses
// rows since the transactions table is used to identify the vin and vout DB
// row IDs for a transaction.
start = time.Now()
var txIDsRemoved []int64
if txIDsRemoved, err = deleteTransactionsForBlock(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteTransactionsForBlock failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
var voutsReset int64
voutsReset, err = resetSpendingForVoutsByTxRowID(dbTx, txIDsRemoved)
if err != nil {
err = fmt.Errorf(`resetSpendingForVoutsByTxRowID failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
if voutsReset != int64(len(txIDsRemoved)) {
log.Warnf(`resetSpendingForVoutsByTxRowID reset %d rows, expected %d`,
voutsReset, len(txIDsRemoved))
}
log.Tracef("Reset spend_tx_row_id for %d vouts.", voutsReset)
res.Transactions = int64(len(txIDsRemoved))
res.Timings.Transactions = time.Since(start).Nanoseconds()
start = time.Now()
if res.Tickets, err = deleteTicketsForBlock(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteTicketsForBlock failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Tickets = time.Since(start).Nanoseconds()
start = time.Now()
if res.Votes, err = deleteVotesForBlock(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteVotesForBlock failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Votes = time.Since(start).Nanoseconds()
start = time.Now()
if res.Misses, err = deleteMissesForBlock(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteMissesForBlock failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Misses = time.Since(start).Nanoseconds()
start = time.Now()
if res.Blocks, err = deleteBlock(dbTx, hash); err != nil {
err = fmt.Errorf(`deleteBlock failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
return
}
res.Timings.Blocks = time.Since(start).Nanoseconds()
if res.Blocks != 1 {
log.Errorf("Expected to delete 1 row of blocks table; actually removed %d.",
res.Blocks)
}
err = deleteBlockFromChain(dbTx, hash)
switch err {
case sql.ErrNoRows:
// Just warn but do not return the error.
err = nil
log.Warnf("Block with hash %s not found in block_chain table.", hash)
case nil:
// Great. Go on to Commit.
default: // err != nil && err != sql.ErrNoRows
// Do not return an error if deleteBlockFromChain just did not delete
// exactly 1 row. Commit and be done.
if strings.HasPrefix(err.Error(), notOneRowErrMsg) {
log.Warnf("deleteBlockFromChain: %v", err)
err = dbTx.Commit()
} else {
err = fmt.Errorf(`deleteBlockFromChain failed with "%v". Rollback: %v`,
err, dbTx.Rollback())
}
return
}
err = dbTx.Commit()
return
}
// DeleteBestBlock removes all data for the best block in the DB from every
// table via DeleteBlockData. The returned height and hash are for the best
// block after successful data removal, or the initial best block if removal
// fails as indicated by a non-nil error value.
func DeleteBestBlock(ctx context.Context, db *sql.DB) (res dbtypes.DeletionSummary, height int64, hash string, err error) {
height, hash, err = RetrieveBestBlock(ctx, db)
if err != nil {
return
}
res, err = DeleteBlockData(ctx, db, hash)
if err != nil {
return
}
height, hash, err = RetrieveBestBlock(ctx, db)
if err != nil {
return
}
err = SetDBBestBlock(db, hash, height)
return
}
// DeleteBlocks removes all data for the N best blocks in the DB from every
// table via repeated calls to DeleteBestBlock.
func DeleteBlocks(ctx context.Context, N int64, db *sql.DB) (res []dbtypes.DeletionSummary, height int64, hash string, err error) {
// If N is less than 1, get the current best block height and hash, then
// return.
if N < 1 {
height, hash, err = RetrieveBestBlock(ctx, db)
return
}
for i := int64(0); i < N; i++ {
var resi dbtypes.DeletionSummary
resi, height, hash, err = DeleteBestBlock(ctx, db)
if err != nil {
return
}
res = append(res, resi)
if hash == "" {
break
}
if (i%100 == 0 && i > 0) || i == N-1 {
log.Debugf("Removed data for %d blocks.", i+1)
}
}
return
}