Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validation: sync chainstate to disk after syncing to tip #15218

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/init.cpp
Expand Up @@ -139,6 +139,9 @@ static constexpr bool DEFAULT_REST_ENABLE{false};
static constexpr bool DEFAULT_I2P_ACCEPT_INCOMING{true};
static constexpr bool DEFAULT_STOPAFTERBLOCKIMPORT{false};

//! Check if initial sync is done with no change in block height or queued downloads every 30s
andrewtoth marked this conversation as resolved.
Show resolved Hide resolved
static constexpr auto SYNC_CHECK_INTERVAL{30s};

#ifdef WIN32
// Win32 LevelDB doesn't use filedescriptors, and the ones used for
// accessing block files don't count towards the fd_set size limit
Expand Down Expand Up @@ -1121,6 +1124,44 @@ bool AppInitLockDataDirectory()
return true;
}

/**
* Once initial block sync is finished and no change in block height or queued downloads,
* sync utxo state to protect against data loss
*/
static void SyncCoinsTipAfterChainSync(const NodeContext& node)
{
LOCK(node.chainman->GetMutex());
if (node.chainman->IsInitialBlockDownload()) {
LogDebug(BCLog::COINDB, "Node is still in IBD, rescheduling chainstate disk sync...\n");
node.scheduler->scheduleFromNow([&node] {
SyncCoinsTipAfterChainSync(node);
}, SYNC_CHECK_INTERVAL);
return;
}

static auto last_chain_height{-1};
const auto current_height{node.chainman->ActiveHeight()};
if (last_chain_height != current_height) {
LogDebug(BCLog::COINDB, "Chain height updated since last check, rescheduling chainstate disk sync...\n");
last_chain_height = current_height;
node.scheduler->scheduleFromNow([&node] {
SyncCoinsTipAfterChainSync(node);
}, SYNC_CHECK_INTERVAL);
return;
}

if (node.peerman->GetNumberOfPeersWithValidatedDownloads() > 0) {
LogDebug(BCLog::COINDB, "Still downloading blocks from peers, rescheduling chainstate disk sync...\n");
node.scheduler->scheduleFromNow([&node] {
SyncCoinsTipAfterChainSync(node);
}, SYNC_CHECK_INTERVAL);
return;
}

LogDebug(BCLog::COINDB, "Finished syncing to tip, syncing chainstate to disk\n");
node.chainman->ActiveChainstate().CoinsTip().Sync();
}

bool AppInitInterfaces(NodeContext& node)
{
node.chain = node.init->makeChain();
Expand Down Expand Up @@ -1995,6 +2036,10 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
StartupNotify(args);
#endif

node.scheduler->scheduleFromNow([&node] {
SyncCoinsTipAfterChainSync(node);
}, SYNC_CHECK_INTERVAL);

return true;
}

Expand Down
7 changes: 7 additions & 0 deletions src/net_processing.cpp
Expand Up @@ -527,6 +527,7 @@ class PeerManagerImpl final : public PeerManager
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex, !m_recent_confirmed_transactions_mutex, !m_most_recent_block_mutex, !m_headers_presync_mutex, g_msgproc_mutex);
void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) override;
ServiceFlags GetDesirableServiceFlags(ServiceFlags services) const override;
int GetNumberOfPeersWithValidatedDownloads() const override EXCLUSIVE_LOCKS_REQUIRED(::cs_main);

private:
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
Expand Down Expand Up @@ -1796,6 +1797,12 @@ bool PeerManagerImpl::GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) c
return true;
}

int PeerManagerImpl::GetNumberOfPeersWithValidatedDownloads() const
{
AssertLockHeld(m_chainman.GetMutex());
return m_peers_downloading_from;
}

void PeerManagerImpl::AddToCompactExtraTransactions(const CTransactionRef& tx)
{
if (m_opts.max_extra_txs <= 0)
Expand Down
3 changes: 3 additions & 0 deletions src/net_processing.h
Expand Up @@ -133,6 +133,9 @@ class PeerManager : public CValidationInterface, public NetEventsInterface
* we do not have a confirmed set of service flags.
*/
virtual ServiceFlags GetDesirableServiceFlags(ServiceFlags services) const = 0;

/** Get number of peers from which we're downloading blocks */
virtual int GetNumberOfPeersWithValidatedDownloads() const EXCLUSIVE_LOCKS_REQUIRED(::cs_main) = 0;
};

#endif // BITCOIN_NET_PROCESSING_H
145 changes: 145 additions & 0 deletions test/functional/feature_sync_coins_tip_after_chain_sync.py
@@ -0,0 +1,145 @@
#!/usr/bin/env python3
# Copyright (c) 2024- The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""
Test SyncCoinsTipAfterChainSync logic
"""


from test_framework.blocktools import create_block, create_coinbase
from test_framework.messages import (
MSG_BLOCK,
MSG_TYPE_MASK,
)
from test_framework.p2p import (
CBlockHeader,
msg_block,
msg_headers,
P2PDataStore,
)
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
)


class P2PBlockDelay(P2PDataStore):
def __init__(self, delay_block):
self.delay_block = delay_block
super().__init__()

def on_getdata(self, message):
for inv in message.inv:
self.getdata_requests.append(inv.hash)
if (inv.type & MSG_TYPE_MASK) == MSG_BLOCK:
if inv.hash != self.delay_block:
self.send_message(msg_block(self.block_store[inv.hash]))

def on_getheaders(self, message):
pass

def send_delayed(self):
self.send_message(msg_block(self.block_store[self.delay_block]))


SYNC_CHECK_INTERVAL = 30


class SyncCoinsTipAfterChainSyncTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 1
# Set maxtipage to 1 to get us out of IBD after 1 block past our mocktime
self.extra_args = [[f"-maxtipage=1"]]

def run_test(self):
NUM_BLOCKS = 3
node = self.nodes[0]
tip = int(node.getbestblockhash(), 16)
blocks = []
height = 1
block_time = node.getblock(node.getbestblockhash())["time"] + 1
# Set mock time to 2 past block time, so second block will exit IBD
node.setmocktime(block_time + 2)

# Prepare blocks without sending them to the node
block_dict = {}
for _ in range(NUM_BLOCKS):
blocks.append(create_block(tip, create_coinbase(height), block_time))
blocks[-1].solve()
tip = blocks[-1].sha256
block_time += 1
height += 1
block_dict[blocks[-1].sha256] = blocks[-1]
delay_block = blocks[-1].sha256

# Create peer which will not automatically send last block
peer = node.add_outbound_p2p_connection(
P2PBlockDelay(delay_block),
p2p_idx=1,
connection_type="outbound-full-relay",
)
peer.block_store = block_dict

self.log.info(
"Send headers message for first block, verify it won't sync because node is still in IBD"
)
headers_message = msg_headers()
headers_message.headers = [CBlockHeader(blocks[0])]
peer.send_message(headers_message)
peer.sync_with_ping()
assert_equal(node.getblockchaininfo()["initialblockdownload"], True)
with node.assert_debug_log(
["Node is still in IBD, rescheduling chainstate disk sync..."]
):
node.mockscheduler(SYNC_CHECK_INTERVAL)

self.log.info(
"Send headers message for second block, verify it won't sync because node height has changed"
)
headers_message.headers = [CBlockHeader(blocks[1])]
peer.send_message(headers_message)
peer.sync_with_ping()
assert_equal(node.getblockchaininfo()["initialblockdownload"], False)
with node.assert_debug_log(
[
"Chain height updated since last check, rescheduling chainstate disk sync..."
]
):
node.mockscheduler(SYNC_CHECK_INTERVAL)

self.log.info(
"Send headers message for last block, verify it won't sync because node is still downloading the block"
)
headers_message.headers = [CBlockHeader(blocks[2])]
peer.send_message(headers_message)
peer.sync_with_ping()
with node.assert_debug_log(
[
"Still downloading blocks from peers, rescheduling chainstate disk sync..."
]
):
node.mockscheduler(SYNC_CHECK_INTERVAL)

self.log.info(
"Send last block, verify it won't sync because node height has changed"
)
peer.send_delayed()
peer.sync_with_ping()
with node.assert_debug_log(
[
"Chain height updated since last check, rescheduling chainstate disk sync..."
]
):
node.mockscheduler(SYNC_CHECK_INTERVAL)

self.log.info("Verify node syncs chainstate to disk on next scheduler update")
with node.assert_debug_log(
["Finished syncing to tip, syncing chainstate to disk"]
):
node.mockscheduler(SYNC_CHECK_INTERVAL)


if __name__ == "__main__":
SyncCoinsTipAfterChainSyncTest().main()
1 change: 1 addition & 0 deletions test/functional/test_runner.py
Expand Up @@ -362,6 +362,7 @@
'feature_addrman.py',
'feature_asmap.py',
'feature_fastprune.py',
'feature_sync_coins_tip_after_chain_sync.py',
'mempool_unbroadcast.py',
'mempool_compatibility.py',
'mempool_accept_wtxid.py',
Expand Down