-
Notifications
You must be signed in to change notification settings - Fork 1
/
blockio.go
773 lines (695 loc) · 27.5 KB
/
blockio.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
// Copyright (c) 2015-2016 The btcsuite developers
// Copyright (c) 2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.
// This file contains the implementation functions for reading, writing, and
// otherwise working with the flat files that house the actual blocks.
package ffldb
import (
"container/list"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"os"
"path/filepath"
"sync"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/database"
"github.com/decred/dcrd/wire"
)
const (
// blockFilenameTemplate is the fmt pattern used for naming block files.
// The single verb is the block file number, zero padded to 9 digits.
//
// The protocol encodes block height as int32, so max number of blocks
// is 2^31. Max block size per the protocol is 32MiB per block. So the
// theoretical max at the time this comment was written is 64PiB
// (pebibytes). With files @ 512MiB each, this would require a maximum
// of 134,217,728 files. Thus, choose 9 digits of precision for the
// filenames. An additional benefit is 9 digits provides 10^9 files @
// 512MiB each for a total of ~476.84PiB (roughly 7.4 times the current
// theoretical max), so there is room for the max block size to grow in
// the future.
blockFilenameTemplate = "%09d.fdb"
// maxOpenFiles is the max number of open files to maintain in the
// open blocks cache. Note that this does not include the current
// write file, so there will typically be one more than this value open.
maxOpenFiles = 25
// maxBlockFileSize is the maximum size for each file used to store
// blocks.
//
// NOTE: The current code uses uint32 for all offsets, so this value
// must be less than 2^32 (4 GiB). This is also why it's a typed
// constant.
maxBlockFileSize uint32 = 512 * 1024 * 1024 // 512 MiB
// blockLocSize is the number of bytes in the serialized block location
// data that is stored in the block index.
//
// The serialized block location format is:
//
// [0:4] Block file (4 bytes)
// [4:8] File offset (4 bytes)
// [8:12] Block length (4 bytes)
blockLocSize = 12
)
var (
// castagnoli is the CRC-32 lookup table built from the Castagnoli
// polynomial. It is used to compute the checksum appended to each
// block record on write and to verify that checksum on read.
castagnoli = crc32.MakeTable(crc32.Castagnoli)
)
// filer is an interface which acts very similar to a *os.File and is typically
// implemented by it. It exists so the test code can provide mock files for
// properly testing corruption and file system issues.
//
// It covers exactly the operations the block store performs on a block file:
// positional reads and writes, truncation, syncing, and closing.
type filer interface {
io.Closer
io.WriterAt
io.ReaderAt
// Truncate changes the size of the file. It is used when rolling back
// partially written data.
Truncate(size int64) error
// Sync flushes the file contents to stable storage.
Sync() error
}
// lockableFile represents a block file on disk that has been opened for either
// read or read/write access. It also contains a read-write mutex to support
// multiple concurrent readers.
//
// Readers hold the read lock for the duration of a read, while the code that
// closes or swaps the underlying handle takes the write lock so the handle is
// never closed out from under an in-progress read.
type lockableFile struct {
sync.RWMutex
// file is the underlying handle. It is nil for the write cursor file
// when no write file is currently open.
file filer
}
// writeCursor represents the current file and offset of the block file on disk
// for performing all writes. It also contains a read-write mutex to support
// multiple concurrent readers which can reuse the file handle.
type writeCursor struct {
sync.RWMutex
// curFile is the current block file that will be appended to when
// writing new blocks. The wrapper itself is always non-nil once the
// store is created by newBlockStore; its inner file handle is nil until
// a write file is actually opened.
curFile *lockableFile
// curFileNum is the current block file number and is used to allow
// readers to use the same open file handle.
curFileNum uint32
// curOffset is the offset in the current write block file where the
// next new block will be written.
curOffset uint32
}
// blockStore houses information used to handle reading and writing blocks (and
// part of blocks) into flat files with support for multiple concurrent readers.
type blockStore struct {
// maxBlockFileSize is the maximum size for each file used to store
// blocks. It is defined on the store so the whitebox tests can
// override the value.
maxBlockFileSize uint32
// network is the specific network to use in the flat files for each
// block. It is written at the start of every block record and checked
// again when the record is read back.
network wire.CurrencyNet
// basePath is the base path used for the flat block files and metadata.
basePath string
// The following fields are related to the flat files which hold the
// actual blocks. The number of open files is limited by maxOpenFiles.
//
// obfMutex protects concurrent access to the openBlockFiles map. It is
// a RWMutex so multiple readers can simultaneously access open files.
//
// openBlockFiles houses the open file handles for existing block files
// which have been opened read-only along with an individual RWMutex.
// This scheme allows multiple concurrent readers to the same file while
// preventing the file from being closed out from under them.
//
// lruMutex protects concurrent access to the least recently used list
// and lookup map.
//
// openBlocksLRU tracks how the open files are referenced by pushing the
// most recently used files to the front of the list thereby trickling
// the least recently used files to the end of the list. When a file
// needs to be closed due to exceeding the max number of allowed open
// files, the one at the end of the list is closed.
//
// fileNumToLRUElem is a mapping between a specific block file number
// and the associated list element on the least recently used list.
//
// Thus, with the combination of these fields, the database supports
// concurrent non-blocking reads across multiple and individual files
// along with intelligently limiting the number of open file handles by
// closing the least recently used files as needed.
//
// NOTE: The locking order used throughout is well-defined and MUST be
// followed. Failure to do so could lead to deadlocks. In particular,
// the locking order is as follows:
// 1) obfMutex
// 2) lruMutex
// 3) writeCursor mutex
// 4) specific file mutexes
//
// None of the mutexes are required to be locked at the same time, and
// often aren't. However, if they are to be locked simultaneously, they
// MUST be locked in the order previously specified.
//
// Due to the high performance and multi-read concurrency requirements,
// write locks should only be held for the minimum time necessary.
obfMutex sync.RWMutex
lruMutex sync.Mutex
openBlocksLRU *list.List // Contains uint32 block file numbers.
fileNumToLRUElem map[uint32]*list.Element
openBlockFiles map[uint32]*lockableFile
// writeCursor houses the state for the current file and location that
// new blocks are written to.
writeCursor *writeCursor
// These functions are set to openFile, openWriteFile, and deleteFile by
// default, but are exposed here to allow the whitebox tests to replace
// them when working with mock files.
openFileFunc func(fileNum uint32) (*lockableFile, error)
openWriteFileFunc func(fileNum uint32) (filer, error)
deleteFileFunc func(fileNum uint32) error
}
// blockLocation identifies a particular block file and location.
type blockLocation struct {
// blockFileNum is the number of the flat file that holds the record.
blockFileNum uint32
// fileOffset is the offset within that file where the record starts,
// i.e. the position of the leading network bytes.
fileOffset uint32
// blockLen is the length of the entire record as produced by
// writeBlock: the serialized block plus the 12 bytes of network, block
// length, and checksum that surround it.
blockLen uint32
}
// deserializeBlockLoc decodes a block location from its serialized form as
// stored in the block index metadata for each block.
//
// The layout of the serialized data is:
//
//	[0:4]  Block file   (4 bytes)
//	[4:8]  File offset  (4 bytes)
//	[8:12] Block length (4 bytes)
//
// The caller MUST supply at least blockLocSize bytes.  The length is not
// validated here and undersized input is expected to panic.  Skipping the
// check is deliberate: the data always originates from the block index, which
// carries its own checksum to detect corruption.
func deserializeBlockLoc(serializedLoc []byte) blockLocation {
	var loc blockLocation
	loc.blockFileNum = byteOrder.Uint32(serializedLoc[0:4])
	loc.fileOffset = byteOrder.Uint32(serializedLoc[4:8])
	loc.blockLen = byteOrder.Uint32(serializedLoc[8:12])
	return loc
}
// serializeBlockLoc encodes the passed block location into the 12-byte form
// that is stored in the block index metadata for each block.
//
// The layout of the returned data is:
//
//	[0:4]  Block file   (4 bytes)
//	[4:8]  File offset  (4 bytes)
//	[8:12] Block length (4 bytes)
func serializeBlockLoc(loc blockLocation) []byte {
	buf := make([]byte, 12)
	byteOrder.PutUint32(buf[:4], loc.blockFileNum)
	byteOrder.PutUint32(buf[4:8], loc.fileOffset)
	byteOrder.PutUint32(buf[8:], loc.blockLen)
	return buf
}
// blockFilePath returns the path of the flat block file with the provided
// number inside the database directory dbPath.  The file name is produced
// from blockFilenameTemplate.
func blockFilePath(dbPath string, fileNum uint32) string {
	return filepath.Join(dbPath, fmt.Sprintf(blockFilenameTemplate, fileNum))
}
// openWriteFile opens the flat file with the passed number in read/write
// mode, creating it when it does not exist yet.  It is typically used for the
// current file that receives all newly appended data.
//
// In contrast to openFile, the returned handle is not recorded anywhere by
// this function, so it takes no part in least recently used tracking and does
// not count towards the maxOpenFiles limit.
//
// A failure to open the file is reported as database.ErrDriverSpecific.
func (s *blockStore) openWriteFile(fileNum uint32) (filer, error) {
	// Read-write access is required so the file can be appended to.
	const flags = os.O_RDWR | os.O_CREATE

	path := blockFilePath(s.basePath, fileNum)
	f, openErr := os.OpenFile(path, flags, 0666)
	if openErr == nil {
		return f, nil
	}

	msg := fmt.Sprintf("failed to open file %q: %v", path, openErr)
	return nil, makeDbErr(database.ErrDriverSpecific, msg, openErr)
}
// openFile returns a read-only file handle for the passed flat file number.
// The function also keeps track of the open files, performs least recently
// used tracking, and limits the number of open files to maxOpenFiles by closing
// the least recently used file as needed.
//
// A failure to open the file is returned as database.ErrDriverSpecific and
// leaves the tracking state untouched.
//
// This function MUST be called with the overall files mutex (s.obfMutex) locked
// for WRITES.
func (s *blockStore) openFile(fileNum uint32) (*lockableFile, error) {
// Open the appropriate file as read-only.
filePath := blockFilePath(s.basePath, fileNum)
file, err := os.Open(filePath)
if err != nil {
return nil, makeDbErr(database.ErrDriverSpecific, err.Error(),
err)
}
blockFile := &lockableFile{file: file}
// Close the least recently used file if tracking the newly opened file
// would exceed the max allowed open files. This is not done until after
// the file open so that, in case the file fails to open, there is no
// need to close any files.
//
// The LRU mutex is required here to protect against modifications
// happening as already open files are read from and shuffled to the
// front of the list.
//
// Also, add the file that was just opened to the front of the least
// recently used list to indicate it is the most recently used file and
// therefore should be closed last.
s.lruMutex.Lock()
lruList := s.openBlocksLRU
if lruList.Len() >= maxOpenFiles {
// The back of the list is the least recently used file number.
lruFileNum := lruList.Remove(lruList.Back()).(uint32)
oldBlockFile := s.openBlockFiles[lruFileNum]
// Close the old file under the write lock for the file in case
// any readers are currently reading from it so it's not closed
// out from under them. The close error is intentionally ignored
// since the handle is being discarded either way.
oldBlockFile.Lock()
_ = oldBlockFile.file.Close()
oldBlockFile.Unlock()
// Drop every reference to the evicted file.
delete(s.openBlockFiles, lruFileNum)
delete(s.fileNumToLRUElem, lruFileNum)
}
s.fileNumToLRUElem[fileNum] = lruList.PushFront(fileNum)
s.lruMutex.Unlock()
// Store a reference to it in the open block files map.
s.openBlockFiles[fileNum] = blockFile
return blockFile, nil
}
// deleteFile removes the flat block file with the passed number from disk.
// The caller is responsible for making sure the file has already been closed
// and for performing any other state cleanup that is needed.
//
// A removal failure is reported as database.ErrDriverSpecific.
func (s *blockStore) deleteFile(fileNum uint32) error {
	err := os.Remove(blockFilePath(s.basePath, fileNum))
	if err == nil {
		return nil
	}
	return makeDbErr(database.ErrDriverSpecific, err.Error(), err)
}
// blockFile attempts to return an existing file handle for the passed flat file
// number if it is already open as well as marking it as most recently used. It
// will also open the file when it's not already open subject to the rules
// described in openFile.
//
// The lookup proceeds in three stages: the current write file, the map of open
// read-only files under the shared (read) lock, and finally the same map under
// the exclusive (write) lock, opening the file if it is still missing.
//
// NOTE: The returned block file will already have the read lock acquired and
// the caller MUST call .RUnlock() to release it once it has finished all read
// operations. This is necessary because otherwise it would be possible for a
// separate goroutine to close the file after it is returned from here, but
// before the caller has acquired a read lock.
func (s *blockStore) blockFile(fileNum uint32) (*lockableFile, error) {
// When the requested block file is open for writes, return it.
wc := s.writeCursor
wc.RLock()
if fileNum == wc.curFileNum && wc.curFile.file != nil {
obf := wc.curFile
// Take the file read lock before releasing the write cursor lock
// so the handle can't be swapped out in between.
obf.RLock()
wc.RUnlock()
return obf, nil
}
wc.RUnlock()
// Try to return an open file under the overall files read lock.
s.obfMutex.RLock()
if obf, ok := s.openBlockFiles[fileNum]; ok {
// Mark the file as most recently used.
s.lruMutex.Lock()
s.openBlocksLRU.MoveToFront(s.fileNumToLRUElem[fileNum])
s.lruMutex.Unlock()
obf.RLock()
s.obfMutex.RUnlock()
return obf, nil
}
s.obfMutex.RUnlock()
// Since the file isn't open already, need to check the open block files
// map again under write lock in case multiple readers got here and a
// separate one is already opening the file.
//
// Note that this path does not touch the LRU list; the file was only
// just added to the front of it by whichever goroutine opened it.
s.obfMutex.Lock()
if obf, ok := s.openBlockFiles[fileNum]; ok {
obf.RLock()
s.obfMutex.Unlock()
return obf, nil
}
// The file isn't open, so open it while potentially closing the least
// recently used one as needed.
obf, err := s.openFileFunc(fileNum)
if err != nil {
s.obfMutex.Unlock()
return nil, err
}
obf.RLock()
s.obfMutex.Unlock()
return obf, nil
}
// writeData is a helper for writeBlock that writes the provided data at the
// current write offset and then moves the write cursor forward.  The
// fieldName parameter only serves to produce a more descriptive error message
// when the write fails.
//
// The write cursor is advanced by the number of bytes actually written, which
// also holds when the write fails part way through.
//
// NOTE: This function MUST be called with the write cursor current file lock
// held and must only be called during a write transaction so it is effectively
// locked for writes.  Also, the write cursor current file must NOT be nil.
func (s *blockStore) writeData(data []byte, fieldName string) error {
	cursor := s.writeCursor
	startOffset := cursor.curOffset

	written, err := cursor.curFile.file.WriteAt(data, int64(startOffset))
	cursor.curOffset = startOffset + uint32(written)
	if err == nil {
		return nil
	}

	// Report the offset the write started at rather than the advanced one.
	msg := fmt.Sprintf("failed to write %s to file %d at "+
		"offset %d: %v", fieldName, cursor.curFileNum,
		startOffset, err)
	return makeDbErr(database.ErrDriverSpecific, msg, err)
}
// writeBlock appends the specified raw block bytes to the store's write cursor
// location and increments it accordingly. When the block would exceed the max
// file size for the current flat file, this function will close the current
// file, create the next file, update the write cursor, and write the block to
// the new file.
//
// The write cursor will also be advanced the number of bytes actually written
// in the event of failure.
//
// On success the returned location points at the start of the record and its
// blockLen is the length of the full record, i.e. the raw block plus the 12
// bytes of network, block length, and checksum.
//
// Format: <network><block length><serialized block><checksum>
func (s *blockStore) writeBlock(rawBlock []byte) (blockLocation, error) {
// Compute how many bytes will be written:
// 4 bytes for the block network + 4 bytes for the block length +
// length of raw block + 4 bytes for checksum.
blockLen := uint32(len(rawBlock))
fullLen := blockLen + 12
// Move to the next block file if adding the new block would exceed the
// max allowed size for the current block file. Also detect overflow
// to be paranoid, even though it isn't possible currently, numbers
// might change in the future to make it possible.
//
// NOTE: The writeCursor.curOffset field isn't protected by the mutex
// since it's only read/changed during this function which can only be
// called during a write transaction, of which there can be only one at
// a time.
wc := s.writeCursor
finalOffset := wc.curOffset + fullLen
if finalOffset < wc.curOffset || finalOffset > s.maxBlockFileSize {
// This is done under the write cursor lock since the curFileNum
// field is accessed elsewhere by readers.
//
// Close the current write file to force a read-only reopen
// with LRU tracking. The close is done under the write lock
// for the file to prevent it from being closed out from under
// any readers currently reading from it.
wc.Lock()
wc.curFile.Lock()
if wc.curFile.file != nil {
_ = wc.curFile.file.Close()
wc.curFile.file = nil
}
wc.curFile.Unlock()
// Start writes into next file.
wc.curFileNum++
wc.curOffset = 0
wc.Unlock()
}
// All writes are done under the write lock for the file to ensure any
// readers are finished and blocked first.
wc.curFile.Lock()
defer wc.curFile.Unlock()
// Open the current file if needed. This will typically only be the
// case when moving to the next file to write to or on initial database
// load. However, it might also be the case if rollbacks happened after
// file writes started during a transaction commit.
if wc.curFile.file == nil {
file, err := s.openWriteFileFunc(wc.curFileNum)
if err != nil {
return blockLocation{}, err
}
wc.curFile.file = file
}
// Currency network.
//
// Remember where the record starts and feed every field that is written
// into the hasher so the checksum covers everything that precedes it.
origOffset := wc.curOffset
hasher := crc32.New(castagnoli)
var scratch [4]byte
byteOrder.PutUint32(scratch[:], uint32(s.network))
if err := s.writeData(scratch[:], "network"); err != nil {
return blockLocation{}, err
}
_, _ = hasher.Write(scratch[:])
// Block length.
byteOrder.PutUint32(scratch[:], blockLen)
if err := s.writeData(scratch[:], "block length"); err != nil {
return blockLocation{}, err
}
_, _ = hasher.Write(scratch[:])
// Serialized block.
if err := s.writeData(rawBlock[:], "block"); err != nil {
return blockLocation{}, err
}
_, _ = hasher.Write(rawBlock)
// Castagnoli CRC-32 as a checksum of all the previous.
if err := s.writeData(hasher.Sum(nil), "checksum"); err != nil {
return blockLocation{}, err
}
// Note that blockLen in the location is the full record length, not
// just the length of the raw block.
loc := blockLocation{
blockFileNum: wc.curFileNum,
fileOffset: origOffset,
blockLen: fullLen,
}
return loc, nil
}
// readBlock reads the specified block record and returns the serialized block.
// It ensures the integrity of the block data by checking that the serialized
// network matches the current network associated with the block store and
// comparing the calculated checksum against the one stored in the flat file.
// This function also automatically handles all file management such as opening
// and closing files as necessary to stay within the maximum allowed open files
// limit.
//
// The hash is only used to identify the block in error messages.  The
// location's blockLen is the length of the whole record, including the 12
// bytes of network, block length, and checksum.
//
// Returns ErrDriverSpecific if the data fails to read for any reason or the
// record was written for a different network, and ErrCorruption if the
// location describes a record that is too short to be valid or the checksum of
// the read data doesn't match the checksum read from the file.
//
// Format: <network><block length><serialized block><checksum>
func (s *blockStore) readBlock(hash *chainhash.Hash, loc blockLocation) ([]byte, error) {
	// Every record carries 4 bytes of network, 4 bytes of block length, and
	// a 4 byte checksum around the serialized block.  Reject locations that
	// claim less than that up front since the slicing below would otherwise
	// panic instead of reporting the problem.
	const minRecordLen = 12
	if loc.blockLen < minRecordLen {
		str := fmt.Sprintf("block data for block %s has invalid "+
			"length %d - must be at least %d bytes", hash,
			loc.blockLen, minRecordLen)
		return nil, makeDbErr(database.ErrCorruption, str, nil)
	}

	// Get the referenced block file handle opening the file as needed.  The
	// function also handles closing files as needed to avoid going over the
	// max allowed open files.
	blockFile, err := s.blockFile(loc.blockFileNum)
	if err != nil {
		return nil, err
	}

	// The file is returned with its read lock held, so release it as soon
	// as the read has completed.
	serializedData := make([]byte, loc.blockLen)
	n, err := blockFile.file.ReadAt(serializedData, int64(loc.fileOffset))
	blockFile.RUnlock()
	if err != nil {
		str := fmt.Sprintf("failed to read block %s from file %d, "+
			"offset %d: %v", hash, loc.blockFileNum, loc.fileOffset,
			err)
		return nil, makeDbErr(database.ErrDriverSpecific, str, err)
	}

	// Calculate the checksum of the read data and ensure it matches the
	// serialized checksum.  This will detect any data corruption in the
	// flat file without having to do much more expensive merkle root
	// calculations on the loaded block.
	//
	// The checksum is stored exactly as produced by the hasher's Sum in
	// writeBlock, hence it is decoded as big endian here.
	serializedChecksum := binary.BigEndian.Uint32(serializedData[n-4:])
	calculatedChecksum := crc32.Checksum(serializedData[:n-4], castagnoli)
	if serializedChecksum != calculatedChecksum {
		str := fmt.Sprintf("block data for block %s checksum "+
			"does not match - got %x, want %x", hash,
			calculatedChecksum, serializedChecksum)
		return nil, makeDbErr(database.ErrCorruption, str, nil)
	}

	// The network associated with the block must match the current active
	// network, otherwise somebody probably put the block files for the
	// wrong network in the directory.
	serializedNet := byteOrder.Uint32(serializedData[:4])
	if serializedNet != uint32(s.network) {
		str := fmt.Sprintf("block data for block %s is for the "+
			"wrong network - got %d, want %d", hash, serializedNet,
			uint32(s.network))
		return nil, makeDbErr(database.ErrDriverSpecific, str, nil)
	}

	// The raw block excludes the network, length of the block, and
	// checksum.
	return serializedData[8 : n-4], nil
}
// readBlockRegion reads numBytes of data starting at the provided offset
// within the block stored at the given location.  The offset is relative to
// the start of the serialized block rather than the start of the block record
// that wraps it.  All file management, such as opening and closing files to
// stay within the maximum allowed open files limit, is handled automatically.
//
// Returns ErrDriverSpecific if the data fails to read for any reason.
func (s *blockStore) readBlockRegion(loc blockLocation, offset, numBytes uint32) ([]byte, error) {
	// Obtain the handle for the referenced block file, which is opened on
	// demand and comes back with its read lock already held.
	obf, err := s.blockFile(loc.blockFileNum)
	if err != nil {
		return nil, err
	}

	// The record starts with 4 bytes of network followed by 4 bytes of
	// block length, so skip those to translate the block-relative offset
	// into a position within the file.
	const recordHeaderLen = 8
	start := loc.fileOffset + recordHeaderLen + offset

	region := make([]byte, numBytes)
	_, readErr := obf.file.ReadAt(region, int64(start))
	obf.RUnlock()
	if readErr == nil {
		return region, nil
	}

	msg := fmt.Sprintf("failed to read region from block file %d, "+
		"offset %d, len %d: %v", loc.blockFileNum, start,
		numBytes, readErr)
	return nil, makeDbErr(database.ErrDriverSpecific, msg, readErr)
}
// syncBlocks performs a file system sync on the flat file associated with the
// store's current write cursor.  Calling it while no write file is open is
// safe and simply does nothing.
//
// It is used when flushing cached metadata updates to disk to make sure all
// block data is fully written before the metadata is updated, which allows the
// metadata and block data to be properly reconciled in failure scenarios.
//
// A sync failure is reported as database.ErrDriverSpecific.
func (s *blockStore) syncBlocks() error {
	cursor := s.writeCursor
	cursor.RLock()
	defer cursor.RUnlock()

	cur := cursor.curFile
	cur.RLock()
	defer cur.RUnlock()

	// Without a current write file there is nothing to flush.
	if cur.file == nil {
		return nil
	}

	err := cur.file.Sync()
	if err == nil {
		return nil
	}

	msg := fmt.Sprintf("failed to sync file %d: %v", cursor.curFileNum,
		err)
	return makeDbErr(database.ErrDriverSpecific, msg, err)
}
// handleRollback rolls the block files on disk back to the provided file number
// and offset. This involves potentially deleting and truncating the files that
// were partially written.
//
// There are effectively two scenarios to consider here:
// 1) Transient write failures from which recovery is possible
// 2) More permanent failures such as hard disk death and/or removal
//
// In either case, the write cursor will be repositioned to the old block file
// offset regardless of any other errors that occur while attempting to undo
// writes.
//
// For the first scenario, this will lead to any data which failed to be undone
// being overwritten and thus behaves as desired as the system continues to run.
//
// For the second scenario, the metadata which stores the current write cursor
// position within the block files will not have been updated yet and thus if
// the system eventually recovers (perhaps the hard drive is reconnected), it
// will also lead to any data which failed to be undone being overwritten and
// thus behaves as desired.
//
// Therefore, any errors are simply logged at a warning level rather than being
// returned since there is nothing more that could be done about it anyways.
func (s *blockStore) handleRollback(oldBlockFileNum, oldBlockOffset uint32) {
// Grab the write cursor mutex since it is modified throughout this
// function.
wc := s.writeCursor
wc.Lock()
defer wc.Unlock()
// Nothing to do if the rollback point is the same as the current write
// cursor.
if wc.curFileNum == oldBlockFileNum && wc.curOffset == oldBlockOffset {
return
}
// Regardless of any failures that happen below, reposition the write
// cursor to the old block file and offset. This deferred function runs
// before the deferred unlock above, so the update still happens under
// the write cursor lock.
defer func() {
wc.curFileNum = oldBlockFileNum
wc.curOffset = oldBlockOffset
}()
log.Debugf("ROLLBACK: Rolling back to file %d, offset %d",
oldBlockFileNum, oldBlockOffset)
// Close the current write file if it needs to be deleted. Then delete
// all files that are newer than the provided rollback file while
// also moving the write cursor file backwards accordingly.
if wc.curFileNum > oldBlockFileNum {
wc.curFile.Lock()
if wc.curFile.file != nil {
_ = wc.curFile.file.Close()
wc.curFile.file = nil
}
wc.curFile.Unlock()
}
for ; wc.curFileNum > oldBlockFileNum; wc.curFileNum-- {
if err := s.deleteFileFunc(wc.curFileNum); err != nil {
log.Warnf("ROLLBACK: Failed to delete block file "+
"number %d: %v", wc.curFileNum, err)
return
}
}
// Open the file for the current write cursor if needed.
wc.curFile.Lock()
if wc.curFile.file == nil {
obf, err := s.openWriteFileFunc(wc.curFileNum)
if err != nil {
wc.curFile.Unlock()
log.Warnf("ROLLBACK: %v", err)
return
}
wc.curFile.file = obf
}
// Truncate the file to the provided rollback offset.
if err := wc.curFile.file.Truncate(int64(oldBlockOffset)); err != nil {
wc.curFile.Unlock()
log.Warnf("ROLLBACK: Failed to truncate file %d: %v",
wc.curFileNum, err)
return
}
// Sync the file to disk. The file lock is released before the result
// is inspected since only logging remains.
err := wc.curFile.file.Sync()
wc.curFile.Unlock()
if err != nil {
log.Warnf("ROLLBACK: Failed to sync file %d: %v",
wc.curFileNum, err)
return
}
}
// scanBlockFiles probes the database directory for flat block files, starting
// at file number 0 and counting upwards until a file can no longer be stat'd.
// It returns the number of the last file found along with that file's length,
// or -1 and 0 when there are no block files at all.
//
// The resulting position is considered the current write cursor, which is
// also stored in the metadata.  Comparing the two makes it possible to detect
// unexpected shutdowns in the middle of writes so the block files can be
// reconciled.
func scanBlockFiles(dbPath string) (int, uint32) {
	lastFile, fileLen := -1, uint32(0)
	for next := 0; ; next++ {
		info, err := os.Stat(blockFilePath(dbPath, uint32(next)))
		if err != nil {
			// The first file that can't be stat'd ends the scan.
			break
		}
		lastFile, fileLen = next, uint32(info.Size())
	}

	log.Tracef("Scan found latest block file #%d with length %d", lastFile,
		fileLen)
	return lastFile, fileLen
}
// newBlockStore returns a block store for the flat files under basePath with
// every field initialized.  The write cursor is positioned at the end of the
// most recent block file found on disk, or at the start of file 0 when no
// block files exist yet.  No write file is opened at this point.
func newBlockStore(basePath string, network wire.CurrencyNet) *blockStore {
	// Determine the write cursor position from the viewpoint of the block
	// files on disk by locating the end of the latest block file.
	fileNum, fileOff := scanBlockFiles(basePath)
	if fileNum == -1 {
		fileNum, fileOff = 0, 0
	}

	cursor := &writeCursor{
		curFile:    &lockableFile{},
		curFileNum: uint32(fileNum),
		curOffset:  fileOff,
	}
	store := &blockStore{
		network:          network,
		basePath:         basePath,
		maxBlockFileSize: maxBlockFileSize,
		openBlockFiles:   make(map[uint32]*lockableFile),
		openBlocksLRU:    list.New(),
		fileNumToLRUElem: make(map[uint32]*list.Element),
		writeCursor:      cursor,
	}

	// Default to the real file system implementations.  The whitebox tests
	// replace these to work with mock files.
	store.openFileFunc = store.openFile
	store.openWriteFileFunc = store.openWriteFile
	store.deleteFileFunc = store.deleteFile
	return store
}