Skip to content

Commit

Permalink
refactor(sync-v2): implement P2PStorage and P2PVertexHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed May 8, 2024
1 parent f28c610 commit 197c056
Show file tree
Hide file tree
Showing 14 changed files with 290 additions and 104 deletions.
6 changes: 4 additions & 2 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ def _initialize_components_full_verification(self) -> None:
# TODO: deal with invalid tx
tx._update_parents_children_metadata()

if tx.can_validate_full():
if self.tx_storage.can_validate_full(tx):
tx.update_initial_metadata()
if tx.is_genesis:
assert tx.validate_checkpoint(self.checkpoints)
Expand Down Expand Up @@ -940,7 +940,8 @@ def on_new_tx(
quiet: bool = False,
fails_silently: bool = True,
propagate_to_peers: bool = True,
reject_locked_reward: bool = True
reject_locked_reward: bool = True,
is_sync_v2: bool = False,
) -> bool:
""" New method for adding transactions or blocks that steps the validation state machine.
Expand All @@ -955,6 +956,7 @@ def on_new_tx(
fails_silently=fails_silently,
propagate_to_peers=propagate_to_peers,
reject_locked_reward=reject_locked_reward,
is_sync_v2=is_sync_v2,
)

def has_sync_version_capability(self) -> bool:
Expand Down
88 changes: 40 additions & 48 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient, StreamingError
from hathor.p2p.sync_v2.mempool import SyncMempoolManager
from hathor.p2p.sync_v2.p2p_storage import P2PStorage
from hathor.p2p.sync_v2.p2p_vertex_handler import P2PVertexHandler
from hathor.p2p.sync_v2.payloads import BestBlockPayload, GetNextBlocksPayload, GetTransactionsBFSPayload
from hathor.p2p.sync_v2.streamers import (
DEFAULT_STREAMING_LIMIT,
Expand All @@ -46,7 +48,6 @@

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol
from hathor.transaction.storage import TransactionStorage

logger = get_logger()

Expand Down Expand Up @@ -84,7 +85,14 @@ class NodeBlockSync(SyncAgent):
"""
name: str = 'node-block-sync'

def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None:
def __init__(
self,
*,
protocol: 'HathorProtocol',
reactor: Reactor,
p2p_storage: P2PStorage,
p2p_vertex_handler: P2PVertexHandler,
) -> None:
"""
:param protocol: Protocol of the connection.
:type protocol: HathorProtocol
Expand All @@ -95,7 +103,8 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None:
self._settings = get_global_settings()
self.protocol = protocol
self.manager = protocol.node
self.tx_storage: 'TransactionStorage' = protocol.node.tx_storage
self.p2p_storage = p2p_storage
self.p2p_vertex_handler = p2p_vertex_handler
self.state = PeerState.UNKNOWN

self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT
Expand Down Expand Up @@ -167,9 +176,7 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None:
def get_status(self) -> dict[str, Any]:
""" Return the status of the sync.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
tips = self.tx_storage.indexes.mempool_tips.get()
tips = self.p2p_storage.get_mempool_tips()
tips_limited, tips_has_more = collect_n(iter(tips), MAX_MEMPOOL_STATUS_TIPS)
res = {
'is_enabled': self.is_sync_enabled(),
Expand Down Expand Up @@ -336,18 +343,15 @@ def run_sync_mempool(self) -> Generator[Any, Any, None]:

def get_my_best_block(self) -> _HeightInfo:
"""Return my best block info."""
bestblock = self.tx_storage.get_best_block()
meta = bestblock.get_metadata()
assert meta.validation.is_fully_connected()
return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash)
best_block = self.p2p_storage.get_local_best_block()
return _HeightInfo(height=best_block.static_metadata.height, id=best_block.hash)

@inlineCallbacks
def run_sync_blocks(self) -> Generator[Any, Any, bool]:
"""Async step of the block syncing phase. Return True if we already have all other peer's blocks.
Notice that we might already have all other peer's blocks while the other peer is still syncing.
"""
assert self.tx_storage.indexes is not None
self.state = PeerState.SYNCING_BLOCKS

# Get my best block.
Expand All @@ -370,7 +374,8 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
# Not synced but same blockchain?
if self.peer_best_block.height <= my_best_block.height:
# Is peer behind me at the same blockchain?
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
common_block_hash = self.p2p_storage.get_local_block_by_height(self.peer_best_block.height)

if common_block_hash == self.peer_best_block.id:
# If yes, nothing to sync from this peer.
if not self.is_synced():
Expand Down Expand Up @@ -448,15 +453,13 @@ def send_get_tips(self) -> None:
def handle_get_tips(self, _payload: str) -> None:
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
if self._is_streaming:
self.log.warn('can\'t send while streaming') # XXX: or can we?
self.send_message(ProtocolMessages.MEMPOOL_END)
return
self.log.debug('handle_get_tips')
# TODO Use a streaming of tips
for tx_id in self.tx_storage.indexes.mempool_tips.get():
for tx_id in self.p2p_storage.get_mempool_tips():
self.send_tips(tx_id)
self.log.debug('tips end')
self.send_message(ProtocolMessages.TIPS_END)
Expand All @@ -477,7 +480,9 @@ def handle_tips(self, payload: str) -> None:
data = [bytes.fromhex(x) for x in data]
# filter-out txs we already have
try:
self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id))
self._receiving_tips.extend(
VertexId(tx_id) for tx_id in data if not self.p2p_storage.local_partial_vertex_exists(tx_id)
)
except ValueError:
self.protocol.send_error_and_close_connection('Invalid trasaction ID received')
# XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol
Expand Down Expand Up @@ -542,12 +547,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) ->
assert self.protocol.state is not None
self.protocol.state.send_message(cmd, payload)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
""" Return true if the vertex exists no matter its validation state.
"""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

@inlineCallbacks
def find_best_common_block(self,
my_best_block: _HeightInfo,
Expand Down Expand Up @@ -590,13 +589,11 @@ def find_best_common_block(self,
for info in block_info_list:
try:
# We must check only fully validated transactions.
blk = self.tx_storage.get_transaction(info.id)
block = self.p2p_storage.get_local_block(info.id)
except TransactionDoesNotExist:
hi = info
else:
assert blk.get_metadata().validation.is_fully_connected()
assert isinstance(blk, Block)
assert info.height == blk.get_height()
assert info.height == block.static_metadata.height
lo = info
break

Expand All @@ -608,12 +605,12 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G
"""This method is called when a block and its transactions are downloaded."""
# Note: Any vertex and block could have already been added by another concurrent syncing peer.
for tx in vertex_list:
if not self.tx_storage.transaction_exists(tx.hash):
self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False)
if not self.p2p_storage.local_vertex_exists(tx.hash):
self.p2p_vertex_handler.handle_new_vertex(tx, propagate_to_peers=False, fails_silently=False)
yield deferLater(self.reactor, 0, lambda: None)

if not self.tx_storage.transaction_exists(blk.hash):
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
if not self.p2p_storage.local_vertex_exists(blk.hash):
self.p2p_vertex_handler.handle_new_vertex(blk, propagate_to_peers=False, fails_silently=False)

def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]:
""" Returns the peer's block hashes in the given heights.
Expand All @@ -633,18 +630,17 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
def handle_get_peer_block_hashes(self, payload: str) -> None:
""" Handle a GET-PEER-BLOCK-HASHES message.
"""
assert self.tx_storage.indexes is not None
heights = json.loads(payload)
if len(heights) > 20:
self.log.info('too many heights', heights_qty=len(heights))
self.protocol.send_error_and_close_connection('GET-PEER-BLOCK-HASHES: too many heights')
return
data = []
for h in heights:
blk_hash = self.tx_storage.indexes.height.get(h)
blk_hash = self.p2p_storage.get_block_by_height(h)
if blk_hash is None:
break
blk = self.tx_storage.get_transaction(blk_hash)
blk = self.p2p_storage.get_vertex(blk_hash)
if blk.get_metadata().voided_by:
break
data.append((h, blk_hash.hex()))
Expand Down Expand Up @@ -695,7 +691,7 @@ def handle_get_next_blocks(self, payload: str) -> None:
def _validate_block(self, _hash: VertexId) -> Optional[Block]:
"""Validate block given in the GET-NEXT-BLOCKS and GET-TRANSACTIONS-BFS messages."""
try:
blk = self.tx_storage.get_transaction(_hash)
blk = self.p2p_storage.get_vertex(_hash)
except TransactionDoesNotExist:
self.log.debug('requested block not found', blk_id=_hash.hex())
self.send_message(ProtocolMessages.NOT_FOUND, _hash.hex())
Expand Down Expand Up @@ -772,7 +768,6 @@ def handle_blocks(self, payload: str) -> None:
if not isinstance(blk, Block):
# Not a block. Punish peer?
return
blk.storage = self.tx_storage

assert self._blk_streaming_client is not None
self._blk_streaming_client.handle_blocks(blk)
Expand Down Expand Up @@ -833,7 +828,7 @@ def send_get_best_block(self) -> None:
def handle_get_best_block(self, _payload: str) -> None:
""" Handle a GET-BEST-BLOCK message.
"""
best_block = self.tx_storage.get_best_block()
best_block = self.p2p_storage.get_best_block()
meta = best_block.get_metadata()
assert meta.validation.is_fully_connected()
payload = BestBlockPayload(
Expand Down Expand Up @@ -946,7 +941,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None:
start_from_txs = []
for start_from_hash in data.start_from:
try:
tx = self.tx_storage.get_transaction(start_from_hash)
tx = self.p2p_storage.get_vertex(start_from_hash)
except TransactionDoesNotExist:
# In case the tx does not exist we send a NOT-FOUND message
self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex())
Expand Down Expand Up @@ -1023,7 +1018,6 @@ def handle_transaction(self, payload: str) -> None:
self.log.warn('not a transaction', hash=tx.hash_hex)
# Not a transaction. Punish peer?
return
tx.storage = self.tx_storage

assert self._tx_streaming_client is not None
self._tx_streaming_client.handle_transaction(tx)
Expand All @@ -1037,14 +1031,14 @@ def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]:
self.log.debug('tx in cache', tx=tx_id.hex())
return tx
try:
tx = self.tx_storage.get_transaction(tx_id)
return self.p2p_storage.get_local_vertex(tx_id)
except TransactionDoesNotExist:
tx = yield self.get_data(tx_id, 'mempool')
assert tx is not None
if tx.hash != tx_id:
self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} hash mismatch')
raise
return tx
return tx

def get_data(self, tx_id: bytes, origin: str) -> Deferred[BaseTransaction]:
""" Async method to request a tx by id.
Expand Down Expand Up @@ -1107,7 +1101,7 @@ def handle_get_data(self, payload: str) -> None:
origin = data.get('origin', '')
# self.log.debug('handle_get_data', payload=hash_hex)
try:
tx = self.protocol.node.tx_storage.get_transaction(bytes.fromhex(txid_hex))
tx = self.p2p_storage.get_vertex(bytes.fromhex(txid_hex))
self.send_data(tx, origin=origin)
except TransactionDoesNotExist:
# In case the tx does not exist we send a NOT-FOUND message
Expand Down Expand Up @@ -1142,24 +1136,22 @@ def handle_data(self, payload: str) -> None:
return

assert tx is not None
if self.protocol.node.tx_storage.get_genesis(tx.hash):
if self.p2p_storage.get_genesis(tx.hash):
# We just got the data of a genesis tx/block. What should we do?
# Will it reduce peer reputation score?
return

tx.storage = self.protocol.node.tx_storage

if self.partial_vertex_exists(tx.hash):
if self.p2p_storage.local_partial_vertex_exists(tx.hash):
# transaction already added to the storage, ignore it
# XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs
self.manager.tx_storage.compare_bytes_with_local_tx(tx)
self.p2p_storage.compare_bytes_with_local_tx(tx)
return
else:
# If we have not requested the data, it is a new transaction being propagated
# in the network, thus, we propagate it as well.
if tx.can_validate_full():
if self.p2p_storage.local_can_validate_full(tx):
self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id())
self.manager.on_new_tx(tx, propagate_to_peers=True)
self.p2p_vertex_handler.handle_new_vertex(tx, propagate_to_peers=True)
else:
self.log.debug('skipping tx received in real time from peer',
tx=tx.hash_hex, peer=self.protocol.get_peer_id())
17 changes: 7 additions & 10 deletions hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from hathor.p2p.sync_v2.streamers import StreamEnd
from hathor.transaction import Block
from hathor.transaction.exceptions import HathorError
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo
Expand All @@ -39,7 +38,6 @@ class BlockchainStreamingClient:
def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None:
self.sync_agent = sync_agent
self.protocol = self.sync_agent.protocol
self.tx_storage = self.sync_agent.tx_storage
self.manager = self.sync_agent.manager

self.log = logger.new(peer=self.protocol.get_short_peer_id())
Expand Down Expand Up @@ -75,11 +73,6 @@ def fails(self, reason: 'StreamingError') -> None:
"""Fail the execution by resolving the deferred with an error."""
self._deferred.errback(reason)

def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
"""Return true if the vertex exists no matter its validation state."""
with self.tx_storage.allow_partially_validated_context():
return self.tx_storage.transaction_exists(vertex_id)

def handle_blocks(self, blk: Block) -> None:
"""This method is called by the sync agent when a BLOCKS message is received."""
if self._deferred.called:
Expand All @@ -105,7 +98,7 @@ def handle_blocks(self, blk: Block) -> None:

# Check for repeated blocks.
is_duplicated = False
if self.partial_vertex_exists(blk.hash):
if self.sync_agent.p2p_storage.local_partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
Expand All @@ -130,9 +123,13 @@ def handle_blocks(self, blk: Block) -> None:
else:
self.log.debug('block received', blk_id=blk.hash.hex())

if blk.can_validate_full():
if self.sync_agent.p2p_storage.local_can_validate_full(blk):
try:
self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False)
self.sync_agent.p2p_vertex_handler.handle_new_vertex(
blk,
propagate_to_peers=False,
fails_silently=False
)
except HathorError:
self.fails(InvalidVertexError(blk.hash.hex()))
return
Expand Down
12 changes: 11 additions & 1 deletion hathor/p2p/sync_v2/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_v2.agent import NodeBlockSync
from hathor.p2p.sync_v2.p2p_storage import P2PStorage
from hathor.p2p.sync_v2.p2p_vertex_handler import P2PVertexHandler
from hathor.reactor import ReactorProtocol as Reactor

if TYPE_CHECKING:
Expand All @@ -29,4 +31,12 @@ def __init__(self, connections: ConnectionsManager):
self.connections = connections

def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent:
return NodeBlockSync(protocol, reactor=reactor)
p2p_storage = P2PStorage(tx_storage=protocol.node.tx_storage)
p2p_vertex_handler = P2PVertexHandler(manager=protocol.node)

return NodeBlockSync(
protocol=protocol,
reactor=reactor,
p2p_storage=p2p_storage,
p2p_vertex_handler=p2p_vertex_handler,
)

0 comments on commit 197c056

Please sign in to comment.