Skip to content

Commit

Permalink
Simplify rewind/commit control flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanofsky committed Mar 27, 2019
1 parent e6c6654 commit 8bb65ce
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 49 deletions.
59 changes: 28 additions & 31 deletions src/index/base.cpp
Expand Up @@ -95,21 +95,19 @@ void BaseIndex::ThreadSync()
int64_t last_locator_write_time = 0;
while (true) {
if (m_interrupt) {
m_best_block_index = pindex;
Commit();
InternalCommit(pindex);
return;
}

{
LOCK(cs_main);
const CBlockIndex* pindex_next = NextSyncBlock(pindex);
if (!pindex_next) {
m_best_block_index = pindex;
InternalCommit(pindex);
m_synced = true;
Commit();
break;
}
if (pindex_next->pprev != pindex && !Rewind(pindex, pindex_next->pprev)) {
if (pindex_next->pprev != pindex && !InternalCommit(pindex_next->pprev, /* rewind= */ true)) {
FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
Expand All @@ -125,9 +123,8 @@ void BaseIndex::ThreadSync()
}

if (last_locator_write_time + SYNC_LOCATOR_WRITE_INTERVAL < current_time) {
m_best_block_index = pindex;
InternalCommit(pindex);
last_locator_write_time = current_time;
Commit();
}

CBlock block;
Expand All @@ -151,35 +148,35 @@ void BaseIndex::ThreadSync()
}
}

bool BaseIndex::Commit()
bool BaseIndex::InternalCommit(const CBlockIndex* new_tip, bool rewind)
{
const CBlockIndex* best_block = m_best_block_index.load();
CDBBatch batch(GetDB());
if (!Commit(batch) || !GetDB().WriteBatch(batch)) {
return error("%s: Failed to commit latest %s state", __func__, GetName());
if (new_tip) {
if (rewind) {
assert(best_block->GetAncestor(new_tip->nHeight) == new_tip);
if (!Rewind(batch, best_block, new_tip)) {
return error("%s: Failed to rewind %s tip", __func__, GetName());
}
}
best_block = new_tip;
}
return true;
}

bool BaseIndex::Commit(CDBBatch& batch)
{
LOCK(cs_main);
GetDB().WriteBestBlock(batch, chainActive.GetLocator(m_best_block_index));
return true;
}
if (!Commit(batch)) {
return error("%s: Failed to commit latest %s state", __func__, GetName());
}

bool BaseIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
{
assert(current_tip == m_best_block_index);
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);

// In the case of a reorg, ensure persisted block locator is not stale.
m_best_block_index = new_tip;
if (!Commit()) {
// If commit fails, revert the best block index to avoid corruption.
m_best_block_index = current_tip;
return false;
CBlockLocator locator;
{
LOCK(cs_main);
locator = chainActive.GetLocator(best_block);
}
GetDB().WriteBestBlock(batch, locator);

if (!GetDB().WriteBatch(batch)) {
return error("%s: Failed to flush latest %s state", __func__, GetName());
}
m_best_block_index = best_block;
return true;
}

Expand Down Expand Up @@ -210,7 +207,7 @@ void BaseIndex::BlockConnected(const std::shared_ptr<const CBlock>& block, const
best_block_index->GetBlockHash().ToString());
return;
}
if (best_block_index != pindex->pprev && !Rewind(best_block_index, pindex->pprev)) {
if (best_block_index != pindex->pprev && !InternalCommit(pindex->pprev, /* rewind= */ true )) {
FatalError("%s: Failed to rewind index %s to a previous chain tip",
__func__, GetName());
return;
Expand Down Expand Up @@ -259,7 +256,7 @@ void BaseIndex::ChainStateFlushed(const CBlockLocator& locator)
return;
}

Commit();
InternalCommit();
}

bool BaseIndex::BlockUntilSyncedToCurrentChain()
Expand Down
15 changes: 7 additions & 8 deletions src/index/base.h
Expand Up @@ -55,27 +55,26 @@ class BaseIndex : public CValidationInterface
void ThreadSync();

/// Write the current chain block locator and other index state to the DB.
bool Commit();
bool InternalCommit(const CBlockIndex* new_tip = nullptr, bool rewind = false);

protected:
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex,
const std::vector<CTransactionRef>& txn_conflicted) override;

void ChainStateFlushed(const CBlockLocator& locator) override;

protected:
/// Initialize internal state from the database and block index.
virtual bool Init();

/// Write update index entries for a newly connected block.
virtual bool WriteBlock(const CBlock& block, const CBlockIndex* pindex) { return true; }

/// Virtual method called internally that can be overridden to atomically
/// commit more index state.
virtual bool Commit(CDBBatch& batch);

/// Rewind index to an earlier chain tip during a chain reorg. The tip must
/// Rewind index to an earlier chain tip during a chain reorg. The tip will
/// be an ancestor of the current best block.
virtual bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip);
virtual bool Rewind(CDBBatch& batch, const CBlockIndex* current_tip, const CBlockIndex* new_tip) { return true; }

/// Save additional state after new blocks are added or rewound.
virtual bool Commit(CDBBatch& batch) { return true; }

virtual DB& GetDB() const = 0;

Expand Down
15 changes: 6 additions & 9 deletions src/index/blockfilterindex.cpp
Expand Up @@ -133,7 +133,7 @@ bool BlockFilterIndex::Commit(CDBBatch& batch)
}

batch.Write(DB_FILTER_POS, pos);
return BaseIndex::Commit(batch);
return true;
}

bool BlockFilterIndex::ReadFilterFromDisk(const FlatFilePos& pos, BlockFilter& filter) const
Expand Down Expand Up @@ -271,11 +271,10 @@ static bool CopyHeightIndexToHashIndex(CDBIterator& db_it, CDBBatch& batch,
return true;
}

bool BlockFilterIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip)
bool BlockFilterIndex::Rewind(CDBBatch& batch, const CBlockIndex* current_tip, const CBlockIndex* new_tip)
{
assert(current_tip->GetAncestor(new_tip->nHeight) == new_tip);

CDBBatch batch(*m_db);
std::unique_ptr<CDBIterator> db_it(m_db->NewIterator());

// During a reorg, we need to copy all filters for blocks that are getting disconnected from the
Expand All @@ -285,13 +284,11 @@ bool BlockFilterIndex::Rewind(const CBlockIndex* current_tip, const CBlockIndex*
return false;
}

// The latest filter position gets written in Commit by the call to the BaseIndex::Rewind.
// But since this creates new references to the filter, the position should get updated here
// atomically as well in case Commit fails.
// The latest filter position also gets written later in BlockFilterIndex::Commit.
// But since rewinding creates new references to the filter, the position should get updated here
// as well in case Commit fails.
batch.Write(DB_FILTER_POS, m_next_filter_pos);
if (!m_db->WriteBatch(batch)) return false;

return BaseIndex::Rewind(current_tip, new_tip);
return true;
}

static bool LookupOne(const CDBWrapper& db, const CBlockIndex* block_index, DBVal& result)
Expand Down
2 changes: 1 addition & 1 deletion src/index/blockfilterindex.h
Expand Up @@ -37,7 +37,7 @@ class BlockFilterIndex final : public BaseIndex

bool WriteBlock(const CBlock& block, const CBlockIndex* pindex) override;

bool Rewind(const CBlockIndex* current_tip, const CBlockIndex* new_tip) override;
bool Rewind(CDBBatch& batch, const CBlockIndex* current_tip, const CBlockIndex* new_tip) override;

BaseIndex::DB& GetDB() const override { return *m_db; }

Expand Down

0 comments on commit 8bb65ce

Please sign in to comment.