Skip to content

Commit

Permalink
improve mempool reorg logic when the peak is a non-transaction block
Browse files Browse the repository at this point in the history
  • Loading branch information
arvidn committed Jan 20, 2024
1 parent 84f3e3d commit 26c0a51
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
18 changes: 18 additions & 0 deletions chia/consensus/blockchain.py
Expand Up @@ -194,6 +194,24 @@ def get_peak(self) -> Optional[BlockRecord]:
return None
return self.height_to_block_record(self._peak_height)

def get_tx_peak(self) -> Optional[BlockRecord]:
"""
Return the most recent transaction block. i.e. closest to the peak of the blockchain
Requires the blockchain to be initialized and there to be a peak set
"""

if self._peak_height is None:
return None
tx_height = self._peak_height
tx_peak = self.height_to_block_record(tx_height)
while not tx_peak.is_transaction_block:
if tx_height == 0:
return None
tx_height = uint32(tx_height - 1)
tx_peak = self.height_to_block_record(tx_height)

return tx_peak

async def get_full_peak(self) -> Optional[FullBlock]:
if self._peak_height is None:
return None
Expand Down
7 changes: 3 additions & 4 deletions chia/full_node/full_node.py
Expand Up @@ -316,7 +316,7 @@ async def manage(self) -> AsyncIterator[None]:
f"time taken: {int(time_taken)}s"
)
async with self.blockchain.priority_mutex.acquire(priority=BlockchainMutexPriority.high):
pending_tx = await self.mempool_manager.new_peak(peak, None)
pending_tx = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), None)
assert len(pending_tx) == 0 # no pending transactions when starting up

full_peak: Optional[FullBlock] = await self.blockchain.get_full_peak()
Expand Down Expand Up @@ -1544,9 +1544,8 @@ async def peak_post_processing(

# Update the mempool (returns successful pending transactions added to the mempool)
spent_coins: List[bytes32] = [coin_id for coin_id, _ in state_change_summary.removals]
mempool_new_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]] = await self.mempool_manager.new_peak(
self.blockchain.get_peak(), spent_coins
)
mempool_new_peak_result: List[Tuple[SpendBundle, NPCResult, bytes32]]
mempool_new_peak_result = await self.mempool_manager.new_peak(self.blockchain.get_tx_peak(), spent_coins)

# Check if we detected a spent transaction, to load up our generator cache
if block.transactions_generator is not None and self.full_node_store.previous_generator is None:
Expand Down
5 changes: 5 additions & 0 deletions chia/full_node/mempool_manager.py
Expand Up @@ -620,6 +620,11 @@ async def new_peak(
) -> List[Tuple[SpendBundle, NPCResult, bytes32]]:
"""
Called when a new peak is available, we try to recreate a mempool for the new tip.
new_peak should always be the most recent *transaction* block of the chain. Since
the mempool cannot traverse the chain to find the most recent transaction block,
we wouldn't be able to detect, and correctly update the mempool, if we saw a
non-transaction block on a fork. self.peak must always be set to a transaction
block.
"""
if new_peak is None:
return []
Expand Down
73 changes: 73 additions & 0 deletions tests/blockchain/test_blockchain.py
Expand Up @@ -15,10 +15,12 @@

from chia.consensus.block_body_validation import ForkInfo
from chia.consensus.block_header_validation import validate_finished_header_block
from chia.consensus.block_record import BlockRecord
from chia.consensus.block_rewards import calculate_base_farmer_reward
from chia.consensus.blockchain import AddBlockResult, Blockchain
from chia.consensus.coinbase import create_farmer_coin
from chia.consensus.constants import ConsensusConstants
from chia.consensus.full_block_to_block_record import block_to_block_record
from chia.consensus.multiprocess_validation import PreValidationResult
from chia.consensus.pot_iterations import is_overflow_block
from chia.full_node.bundle_tools import detect_potential_template_generator
Expand Down Expand Up @@ -3101,6 +3103,12 @@ async def test_invalid_agg_sig(self, empty_blockchain, bt):
assert preval_results[0].error == Err.BAD_AGGREGATE_SIGNATURE.value


def maybe_header_hash(block: Optional[BlockRecord]) -> Optional[bytes32]:
if block is None:
return None
return block.header_hash


class TestReorgs:
@pytest.mark.anyio
async def test_basic_reorg(self, empty_blockchain, bt):
Expand All @@ -3121,6 +3129,46 @@ async def test_basic_reorg(self, empty_blockchain, bt):
await _validate_and_add_block(b, reorg_block)
assert b.get_peak().height == 16

@pytest.mark.anyio
async def test_get_tx_peak_reorg(self, empty_blockchain, bt, consensus_mode: ConsensusMode):
b = empty_blockchain

if consensus_mode == ConsensusMode.PLAIN:
reorg_point = 13
else:
reorg_point = 12
blocks = bt.get_consecutive_blocks(reorg_point)

last_tx_block: Optional[bytes32] = None
for block in blocks:
assert maybe_header_hash(b.get_tx_peak()) == last_tx_block
await _validate_and_add_block(b, block)
if block.is_transaction_block():
last_tx_block = block.header_hash
assert b.get_peak().height == reorg_point - 1
assert maybe_header_hash(b.get_tx_peak()) == last_tx_block

reorg_last_tx_block: Optional[bytes32] = None

blocks_reorg_chain = bt.get_consecutive_blocks(7, blocks[:10], seed=b"2")
assert blocks_reorg_chain[reorg_point].is_transaction_block() is False
for reorg_block in blocks_reorg_chain:
if reorg_block.height < 10:
await _validate_and_add_block(b, reorg_block, expected_result=AddBlockResult.ALREADY_HAVE_BLOCK)
elif reorg_block.height < reorg_point:
await _validate_and_add_block(b, reorg_block, expected_result=AddBlockResult.ADDED_AS_ORPHAN)
elif reorg_block.height >= reorg_point:
await _validate_and_add_block(b, reorg_block)

if reorg_block.is_transaction_block():
reorg_last_tx_block = reorg_block.header_hash
if reorg_block.height >= reorg_point:
last_tx_block = reorg_last_tx_block

assert maybe_header_hash(b.get_tx_peak()) == last_tx_block

assert b.get_peak().height == 16

@pytest.mark.anyio
@pytest.mark.parametrize("light_blocks", [True, False])
async def test_long_reorg(
Expand Down Expand Up @@ -3693,3 +3741,28 @@ async def test_reorg_flip_flop(empty_blockchain, bt):

for block in chain_b[40:]:
await _validate_and_add_block(b, block)


async def test_get_tx_peak(default_400_blocks, empty_blockchain):
bc = empty_blockchain
test_blocks = default_400_blocks[:100]

res = await bc.pre_validate_blocks_multiprocessing(test_blocks, {}, validate_signatures=False)

last_tx_block: Optional[FullBlock] = None
for b, prevalidation_res in zip(test_blocks, res):
assert bc.get_tx_peak() == last_tx_block
res, err, state = await bc.add_block(b, prevalidation_res)
assert err is None

if b.is_transaction_block():
block_record = block_to_block_record(
bc.constants,
bc,
prevalidation_res.required_iters,
b,
None,
)
last_tx_block = block_record

assert bc.get_tx_peak() == last_tx_block

0 comments on commit 26c0a51

Please sign in to comment.