/
reconcile.go
executable file
·130 lines (88 loc) · 4.27 KB
/
reconcile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package ffldb
import (
"fmt"
"hash/crc32"
database "git.parallelcoin.io/dev/9/pkg/db"
cl "git.parallelcoin.io/dev/9/pkg/util/cl"
)
// The serialized write cursor location format is:
// [0:4] Block file (4 bytes)
// [4:8] File offset (4 bytes)
// [8:12] Castagnoli CRC-32 checksum (4 bytes)
// serializeWriteRow serialize the current block file and offset where new will be written into a format suitable for storage into the metadata.
func serializeWriteRow(
curBlockFileNum, curFileOffset uint32) []byte {
var serializedRow [12]byte
byteOrder.PutUint32(serializedRow[0:4], curBlockFileNum)
byteOrder.PutUint32(serializedRow[4:8], curFileOffset)
checksum := crc32.Checksum(serializedRow[:8], castagnoli)
byteOrder.PutUint32(serializedRow[8:12], checksum)
return serializedRow[:]
}
// deserializeWriteRow deserializes the write cursor location stored in the metadata. Returns ErrCorruption if the checksum of the entry doesn't match.
func deserializeWriteRow(
writeRow []byte) (uint32, uint32, error) {
// Ensure the checksum matches. The checksum is at the end.
gotChecksum := crc32.Checksum(writeRow[:8], castagnoli)
wantChecksumBytes := writeRow[8:12]
wantChecksum := byteOrder.Uint32(wantChecksumBytes)
if gotChecksum != wantChecksum {
str := fmt.Sprintf("metadata for write cursor does not match "+
"the expected checksum - got %d, want %d", gotChecksum,
wantChecksum)
return 0, 0, makeDbErr(database.ErrCorruption, str, nil)
}
fileNum := byteOrder.Uint32(writeRow[0:4])
fileOffset := byteOrder.Uint32(writeRow[4:8])
return fileNum, fileOffset, nil
}
// reconcileDB reconciles the metadata with the flat block files on disk. It will also initialize the underlying database if the create flag is set.
func reconcileDB(
pdb *db, create bool) (database.DB, error) {
// Perform initial internal bucket and value creation during database creation.
if create {
if err := initDB(pdb.cache.ldb); err != nil {
return nil, err
}
}
// Load the current write cursor position from the metadata.
var curFileNum, curOffset uint32
err := pdb.View(func(tx database.Tx) error {
writeRow := tx.Metadata().Get(writeLocKeyName)
if writeRow == nil {
str := "write cursor does not exist"
return makeDbErr(database.ErrCorruption, str, nil)
}
var err error
curFileNum, curOffset, err = deserializeWriteRow(writeRow)
return err
})
if err != nil {
return nil, err
}
// When the write cursor position found by scanning the block files on disk is AFTER the position the metadata believes to be true, truncate the files on disk to match the metadata. This can be a fairly common occurrence in unclean shutdown scenarios while the block files are in the middle of being written. Since the metadata isn't updated until after the block data is written, this is effectively just a rollback to the known good point before the unclean shutdown.
wc := pdb.store.writeCursor
if wc.curFileNum > curFileNum || (wc.curFileNum == curFileNum &&
wc.curOffset > curOffset) {
log <- cl.Inf("Detected unclean shutdown - Repairing...")
log <- cl.Debugf{
"Metadata claims file %d, offset %d. Block data is at file %d, offset %d",
curFileNum,
curOffset,
wc.curFileNum,
wc.curOffset,
}
pdb.store.handleRollback(curFileNum, curOffset)
log <- cl.Inf("Database sync complete")
}
// When the write cursor position found by scanning the block files on disk is BEFORE the position the metadata believes to be true, return a corruption error. Since sync is called after each block is written and before the metadata is updated, this should only happen in the case of missing, deleted, or truncated block files, which generally is not an easily recoverable scenario. In the future, it might be possible to rescan and rebuild the metadata from the block files, however, that would need to happen with coordination from a higher layer since it could invalidate other metadata.
if wc.curFileNum < curFileNum || (wc.curFileNum == curFileNum &&
wc.curOffset < curOffset) {
str := fmt.Sprintf("metadata claims file %d, offset %d, but "+
"block data is at file %d, offset %d", curFileNum,
curOffset, wc.curFileNum, wc.curOffset)
log <- cl.Warn{"***Database corruption detected***:", str}
return nil, makeDbErr(database.ErrCorruption, str, nil)
}
return pdb, nil
}