diff --git a/hathor/manager.py b/hathor/manager.py index 9fd823ca5..1fae09d2d 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -457,7 +457,7 @@ def _initialize_components_full_verification(self) -> None: # TODO: deal with invalid tx tx._update_parents_children_metadata() - if tx.can_validate_full(): + if self.tx_storage.can_validate_full(tx): tx.update_initial_metadata() if tx.is_genesis: assert tx.validate_checkpoint(self.checkpoints) @@ -940,7 +940,8 @@ def on_new_tx( quiet: bool = False, fails_silently: bool = True, propagate_to_peers: bool = True, - reject_locked_reward: bool = True + reject_locked_reward: bool = True, + is_sync_v2: bool = False, ) -> bool: """ New method for adding transactions or blocks that steps the validation state machine. @@ -955,6 +956,7 @@ def on_new_tx( fails_silently=fails_silently, propagate_to_peers=propagate_to_peers, reject_locked_reward=reject_locked_reward, + is_sync_v2=is_sync_v2, ) def has_sync_version_capability(self) -> bool: diff --git a/hathor/p2p/sync_v2/agent.py b/hathor/p2p/sync_v2/agent.py index 780e84f41..c98cdad3c 100644 --- a/hathor/p2p/sync_v2/agent.py +++ b/hathor/p2p/sync_v2/agent.py @@ -29,6 +29,8 @@ from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_v2.blockchain_streaming_client import BlockchainStreamingClient, StreamingError from hathor.p2p.sync_v2.mempool import SyncMempoolManager +from hathor.p2p.sync_v2.p2p_storage import P2PStorage +from hathor.p2p.sync_v2.p2p_vertex_handler import P2PVertexHandler from hathor.p2p.sync_v2.payloads import BestBlockPayload, GetNextBlocksPayload, GetTransactionsBFSPayload from hathor.p2p.sync_v2.streamers import ( DEFAULT_STREAMING_LIMIT, @@ -46,7 +48,6 @@ if TYPE_CHECKING: from hathor.p2p.protocol import HathorProtocol - from hathor.transaction.storage import TransactionStorage logger = get_logger() @@ -84,7 +85,14 @@ class NodeBlockSync(SyncAgent): """ name: str = 'node-block-sync' - def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None: + def __init__( + self, + *, + protocol: 'HathorProtocol', + reactor: Reactor, + p2p_storage: P2PStorage, + p2p_vertex_handler: P2PVertexHandler, + ) -> None: """ :param protocol: Protocol of the connection. :type protocol: HathorProtocol @@ -95,7 +103,8 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None: self._settings = get_global_settings() self.protocol = protocol self.manager = protocol.node - self.tx_storage: 'TransactionStorage' = protocol.node.tx_storage + self.p2p_storage = p2p_storage + self.p2p_vertex_handler = p2p_vertex_handler self.state = PeerState.UNKNOWN self.DEFAULT_STREAMING_LIMIT = DEFAULT_STREAMING_LIMIT @@ -167,9 +176,7 @@ def __init__(self, protocol: 'HathorProtocol', reactor: Reactor) -> None: def get_status(self) -> dict[str, Any]: """ Return the status of the sync. 
""" - assert self.tx_storage.indexes is not None - assert self.tx_storage.indexes.mempool_tips is not None - tips = self.tx_storage.indexes.mempool_tips.get() + tips = self.p2p_storage.get_mempool_tips() tips_limited, tips_has_more = collect_n(iter(tips), MAX_MEMPOOL_STATUS_TIPS) res = { 'is_enabled': self.is_sync_enabled(), @@ -336,10 +343,8 @@ def run_sync_mempool(self) -> Generator[Any, Any, None]: def get_my_best_block(self) -> _HeightInfo: """Return my best block info.""" - bestblock = self.tx_storage.get_best_block() - meta = bestblock.get_metadata() - assert meta.validation.is_fully_connected() - return _HeightInfo(height=bestblock.get_height(), id=bestblock.hash) + best_block = self.p2p_storage.get_local_best_block() + return _HeightInfo(height=best_block.static_metadata.height, id=best_block.hash) @inlineCallbacks def run_sync_blocks(self) -> Generator[Any, Any, bool]: @@ -347,7 +352,6 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: Notice that we might already have all other peer's blocks while the other peer is still syncing. """ - assert self.tx_storage.indexes is not None self.state = PeerState.SYNCING_BLOCKS # Get my best block. @@ -370,7 +374,8 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]: # Not synced but same blockchain? if self.peer_best_block.height <= my_best_block.height: # Is peer behind me at the same blockchain? - common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height) + common_block_hash = self.p2p_storage.get_local_block_by_height(self.peer_best_block.height) + if common_block_hash == self.peer_best_block.id: # If yes, nothing to sync from this peer. if not self.is_synced(): @@ -448,15 +453,13 @@ def send_get_tips(self) -> None: def handle_get_tips(self, _payload: str) -> None: """ Handle a GET-TIPS message. """ - assert self.tx_storage.indexes is not None - assert self.tx_storage.indexes.mempool_tips is not None if self._is_streaming: self.log.warn('can\'t send while streaming') # XXX: or can we? self.send_message(ProtocolMessages.MEMPOOL_END) return self.log.debug('handle_get_tips') # TODO Use a streaming of tips - for tx_id in self.tx_storage.indexes.mempool_tips.get(): + for tx_id in self.p2p_storage.get_mempool_tips(): self.send_tips(tx_id) self.log.debug('tips end') self.send_message(ProtocolMessages.TIPS_END) @@ -477,7 +480,9 @@ def handle_tips(self, payload: str) -> None: data = [bytes.fromhex(x) for x in data] # filter-out txs we already have try: - self._receiving_tips.extend(VertexId(tx_id) for tx_id in data if not self.partial_vertex_exists(tx_id)) + self._receiving_tips.extend( + VertexId(tx_id) for tx_id in data if not self.p2p_storage.local_partial_vertex_exists(tx_id) + ) except ValueError: self.protocol.send_error_and_close_connection('Invalid trasaction ID received') # XXX: it's OK to do this *after* the extend because the payload is limited by the line protocol @@ -542,12 +547,6 @@ def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> assert self.protocol.state is not None self.protocol.state.send_message(cmd, payload) - def partial_vertex_exists(self, vertex_id: VertexId) -> bool: - """ Return true if the vertex exists no matter its validation state. 
- """ - with self.tx_storage.allow_partially_validated_context(): - return self.tx_storage.transaction_exists(vertex_id) - @inlineCallbacks def find_best_common_block(self, my_best_block: _HeightInfo, @@ -590,13 +589,11 @@ def find_best_common_block(self, for info in block_info_list: try: # We must check only fully validated transactions. - blk = self.tx_storage.get_transaction(info.id) + block = self.p2p_storage.get_local_block(info.id) except TransactionDoesNotExist: hi = info else: - assert blk.get_metadata().validation.is_fully_connected() - assert isinstance(blk, Block) - assert info.height == blk.get_height() + assert info.height == block.static_metadata.height lo = info break @@ -608,12 +605,12 @@ def on_block_complete(self, blk: Block, vertex_list: list[BaseTransaction]) -> G """This method is called when a block and its transactions are downloaded.""" # Note: Any vertex and block could have already been added by another concurrent syncing peer. for tx in vertex_list: - if not self.tx_storage.transaction_exists(tx.hash): - self.manager.on_new_tx(tx, propagate_to_peers=False, fails_silently=False) + if not self.p2p_storage.local_vertex_exists(tx.hash): + self.p2p_vertex_handler.handle_new_vertex(tx, propagate_to_peers=False, fails_silently=False) yield deferLater(self.reactor, 0, lambda: None) - if not self.tx_storage.transaction_exists(blk.hash): - self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False) + if not self.p2p_storage.local_vertex_exists(blk.hash): + self.p2p_vertex_handler.handle_new_vertex(blk, propagate_to_peers=False, fails_silently=False) def get_peer_block_hashes(self, heights: list[int]) -> Deferred[list[_HeightInfo]]: """ Returns the peer's block hashes in the given heights. @@ -633,7 +630,6 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None: def handle_get_peer_block_hashes(self, payload: str) -> None: """ Handle a GET-PEER-BLOCK-HASHES message. """ - assert self.tx_storage.indexes is not None heights = json.loads(payload) if len(heights) > 20: self.log.info('too many heights', heights_qty=len(heights)) @@ -641,10 +637,10 @@ def handle_get_peer_block_hashes(self, payload: str) -> None: return data = [] for h in heights: - blk_hash = self.tx_storage.indexes.height.get(h) + blk_hash = self.p2p_storage.get_block_by_height(h) if blk_hash is None: break - blk = self.tx_storage.get_transaction(blk_hash) + blk = self.p2p_storage.get_vertex(blk_hash) if blk.get_metadata().voided_by: break data.append((h, blk_hash.hex())) @@ -695,7 +691,7 @@ def handle_get_next_blocks(self, payload: str) -> None: def _validate_block(self, _hash: VertexId) -> Optional[Block]: """Validate block given in the GET-NEXT-BLOCKS and GET-TRANSACTIONS-BFS messages.""" try: - blk = self.tx_storage.get_transaction(_hash) + blk = self.p2p_storage.get_vertex(_hash) except TransactionDoesNotExist: self.log.debug('requested block not found', blk_id=_hash.hex()) self.send_message(ProtocolMessages.NOT_FOUND, _hash.hex()) @@ -772,7 +768,6 @@ def handle_blocks(self, payload: str) -> None: if not isinstance(blk, Block): # Not a block. Punish peer? return - blk.storage = self.tx_storage assert self._blk_streaming_client is not None self._blk_streaming_client.handle_blocks(blk) @@ -833,7 +828,7 @@ def send_get_best_block(self) -> None: def handle_get_best_block(self, _payload: str) -> None: """ Handle a GET-BEST-BLOCK message. 
""" - best_block = self.tx_storage.get_best_block() + best_block = self.p2p_storage.get_best_block() meta = best_block.get_metadata() assert meta.validation.is_fully_connected() payload = BestBlockPayload( @@ -946,7 +941,7 @@ def handle_get_transactions_bfs(self, payload: str) -> None: start_from_txs = [] for start_from_hash in data.start_from: try: - tx = self.tx_storage.get_transaction(start_from_hash) + tx = self.p2p_storage.get_vertex(start_from_hash) except TransactionDoesNotExist: # In case the tx does not exist we send a NOT-FOUND message self.log.debug('requested start_from_hash not found', start_from_hash=start_from_hash.hex()) @@ -1023,7 +1018,6 @@ def handle_transaction(self, payload: str) -> None: self.log.warn('not a transaction', hash=tx.hash_hex) # Not a transaction. Punish peer? return - tx.storage = self.tx_storage assert self._tx_streaming_client is not None self._tx_streaming_client.handle_transaction(tx) @@ -1037,14 +1031,14 @@ def get_tx(self, tx_id: bytes) -> Generator[Deferred, Any, BaseTransaction]: self.log.debug('tx in cache', tx=tx_id.hex()) return tx try: - tx = self.tx_storage.get_transaction(tx_id) + return self.p2p_storage.get_local_vertex(tx_id) except TransactionDoesNotExist: tx = yield self.get_data(tx_id, 'mempool') assert tx is not None if tx.hash != tx_id: self.protocol.send_error_and_close_connection(f'DATA mempool {tx_id.hex()} hash mismatch') raise - return tx + return tx def get_data(self, tx_id: bytes, origin: str) -> Deferred[BaseTransaction]: """ Async method to request a tx by id. @@ -1107,7 +1101,7 @@ def handle_get_data(self, payload: str) -> None: origin = data.get('origin', '') # self.log.debug('handle_get_data', payload=hash_hex) try: - tx = self.protocol.node.tx_storage.get_transaction(bytes.fromhex(txid_hex)) + tx = self.p2p_storage.get_vertex(bytes.fromhex(txid_hex)) self.send_data(tx, origin=origin) except TransactionDoesNotExist: # In case the tx does not exist we send a NOT-FOUND message @@ -1142,24 +1136,22 @@ def handle_data(self, payload: str) -> None: return assert tx is not None - if self.protocol.node.tx_storage.get_genesis(tx.hash): + if self.p2p_storage.get_genesis(tx.hash): # We just got the data of a genesis tx/block. What should we do? # Will it reduce peer reputation score? return - tx.storage = self.protocol.node.tx_storage - - if self.partial_vertex_exists(tx.hash): + if self.p2p_storage.local_partial_vertex_exists(tx.hash): # transaction already added to the storage, ignore it # XXX: maybe we could add a hash blacklist and punish peers propagating known bad txs - self.manager.tx_storage.compare_bytes_with_local_tx(tx) + self.p2p_storage.compare_bytes_with_local_tx(tx) return else: # If we have not requested the data, it is a new transaction being propagated # in the network, thus, we propagate it as well. 
- if tx.can_validate_full(): + if self.p2p_storage.local_can_validate_full(tx): self.log.debug('tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) - self.manager.on_new_tx(tx, propagate_to_peers=True) + self.p2p_vertex_handler.handle_new_vertex(tx, propagate_to_peers=True) else: self.log.debug('skipping tx received in real time from peer', tx=tx.hash_hex, peer=self.protocol.get_peer_id()) diff --git a/hathor/p2p/sync_v2/blockchain_streaming_client.py b/hathor/p2p/sync_v2/blockchain_streaming_client.py index a08b305de..f7d7acfc2 100644 --- a/hathor/p2p/sync_v2/blockchain_streaming_client.py +++ b/hathor/p2p/sync_v2/blockchain_streaming_client.py @@ -27,7 +27,6 @@ from hathor.p2p.sync_v2.streamers import StreamEnd from hathor.transaction import Block from hathor.transaction.exceptions import HathorError -from hathor.types import VertexId if TYPE_CHECKING: from hathor.p2p.sync_v2.agent import NodeBlockSync, _HeightInfo @@ -39,7 +38,6 @@ class BlockchainStreamingClient: def __init__(self, sync_agent: 'NodeBlockSync', start_block: '_HeightInfo', end_block: '_HeightInfo') -> None: self.sync_agent = sync_agent self.protocol = self.sync_agent.protocol - self.tx_storage = self.sync_agent.tx_storage self.manager = self.sync_agent.manager self.log = logger.new(peer=self.protocol.get_short_peer_id()) @@ -75,11 +73,6 @@ def fails(self, reason: 'StreamingError') -> None: """Fail the execution by resolving the deferred with an error.""" self._deferred.errback(reason) - def partial_vertex_exists(self, vertex_id: VertexId) -> bool: - """Return true if the vertex exists no matter its validation state.""" - with self.tx_storage.allow_partially_validated_context(): - return self.tx_storage.transaction_exists(vertex_id) - def handle_blocks(self, blk: Block) -> None: """This method is called by the sync agent when a BLOCKS message is received.""" if self._deferred.called: @@ -105,7 +98,7 @@ def handle_blocks(self, blk: Block) -> None: # Check for repeated blocks. is_duplicated = False - if self.partial_vertex_exists(blk.hash): + if self.sync_agent.p2p_storage.local_partial_vertex_exists(blk.hash): # We reached a block we already have. Skip it. 
self._blk_repeated += 1 is_duplicated = True @@ -130,9 +123,13 @@ def handle_blocks(self, blk: Block) -> None: else: self.log.debug('block received', blk_id=blk.hash.hex()) - if blk.can_validate_full(): + if self.sync_agent.p2p_storage.local_can_validate_full(blk): try: - self.manager.on_new_tx(blk, propagate_to_peers=False, fails_silently=False) + self.sync_agent.p2p_vertex_handler.handle_new_vertex( + blk, + propagate_to_peers=False, + fails_silently=False + ) except HathorError: self.fails(InvalidVertexError(blk.hash.hex())) return diff --git a/hathor/p2p/sync_v2/factory.py b/hathor/p2p/sync_v2/factory.py index 71f17dd87..dbb4199b1 100644 --- a/hathor/p2p/sync_v2/factory.py +++ b/hathor/p2p/sync_v2/factory.py @@ -18,6 +18,8 @@ from hathor.p2p.sync_agent import SyncAgent from hathor.p2p.sync_factory import SyncAgentFactory from hathor.p2p.sync_v2.agent import NodeBlockSync +from hathor.p2p.sync_v2.p2p_storage import P2PStorage +from hathor.p2p.sync_v2.p2p_vertex_handler import P2PVertexHandler from hathor.reactor import ReactorProtocol as Reactor if TYPE_CHECKING: @@ -29,4 +31,12 @@ def __init__(self, connections: ConnectionsManager): self.connections = connections def create_sync_agent(self, protocol: 'HathorProtocol', reactor: Reactor) -> SyncAgent: - return NodeBlockSync(protocol, reactor=reactor) + p2p_storage = P2PStorage(tx_storage=protocol.node.tx_storage) + p2p_vertex_handler = P2PVertexHandler(manager=protocol.node) + + return NodeBlockSync( + protocol=protocol, + reactor=reactor, + p2p_storage=p2p_storage, + p2p_vertex_handler=p2p_vertex_handler, + ) diff --git a/hathor/p2p/sync_v2/mempool.py b/hathor/p2p/sync_v2/mempool.py index d4eb7bfe6..422e00b7c 100644 --- a/hathor/p2p/sync_v2/mempool.py +++ b/hathor/p2p/sync_v2/mempool.py @@ -36,7 +36,6 @@ def __init__(self, sync_agent: 'NodeBlockSync'): # Shortcuts. self.sync_agent = sync_agent self.manager = self.sync_agent.manager - self.tx_storage = self.manager.tx_storage self.reactor = self.sync_agent.reactor self._deferred: Optional[Deferred[bool]] = None @@ -87,7 +86,7 @@ def _unsafe_run(self) -> Generator[Deferred, Any, bool]: if not self.missing_tips: # No missing tips? Let's get them! tx_hashes: list[bytes] = yield self.sync_agent.get_tips() - self.missing_tips.update(h for h in tx_hashes if not self.tx_storage.transaction_exists(h)) + self.missing_tips.update(h for h in tx_hashes if not self.sync_agent.p2p_storage.local_vertex_exists(h)) while self.missing_tips: self.log.debug('We have missing tips! 
Let\'s start!', missing_tips=[x.hex() for x in self.missing_tips]) @@ -124,14 +123,14 @@ def _next_missing_dep(self, tx: BaseTransaction) -> Optional[bytes]: """Get the first missing dependency found of tx.""" assert not tx.is_block for txin in tx.inputs: - if not self.tx_storage.transaction_exists(txin.tx_id): + if not self.sync_agent.p2p_storage.local_vertex_exists(txin.tx_id): return txin.tx_id for parent in tx.parents: - if not self.tx_storage.transaction_exists(parent): + if not self.sync_agent.p2p_storage.local_vertex_exists(parent): return parent return None def _add_tx(self, tx: BaseTransaction) -> None: """Add tx to the DAG.""" self.missing_tips.discard(tx.hash) - self.manager.on_new_tx(tx) + self.sync_agent.p2p_vertex_handler.handle_new_vertex(tx) diff --git a/hathor/p2p/sync_v2/p2p_storage.py b/hathor/p2p/sync_v2/p2p_storage.py new file mode 100644 index 000000000..2994ebc6d --- /dev/null +++ b/hathor/p2p/sync_v2/p2p_storage.py @@ -0,0 +1,118 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.transaction import Block, Vertex +from hathor.transaction.storage import TransactionStorage +from hathor.types import VertexId + + +class P2PStorage: + """ + This class represents a single point of contact for sync-v2 interacting with the storage and indexes. + Every time sync-v2 needs to retrieve some info from the storage, it should call one of the methods in this class. + + It is basically a forward to storage methods, however it introduces the concept of a "local" version for some + methods. The local version of a method returns data based on a single peer's perspective, while non-local methods + return data directly from the storage. + + This class is to be used with synchronous sync-v2. Every time a new vertex is received, it is synchronously + handled, and therefore the next vertex is only received after the previous vertex has been handled and saved in the + storage. This means that in this class, every local method is simply a forward to the respective non-local method. + + Generally, local methods should be called every time the agent needs to retrieve data for its own downloading + process. Conversely, non-local methods should be called when the agent is sending data to another peer. + + The `AsyncP2PStorage` subclass deals with asynchronous sync-v2, implementing special handling for local methods. 
+ """ + __slots__ = ('_tx_storage', '_mempool_tips_index', '_height_index') + + def __init__(self, *, tx_storage: TransactionStorage) -> None: + assert tx_storage.indexes is not None + assert tx_storage.indexes.mempool_tips is not None + assert tx_storage.indexes.height is not None + self._tx_storage = tx_storage + self._mempool_tips_index = tx_storage.indexes.mempool_tips + self._height_index = tx_storage.indexes.height + + def get_mempool_tips(self) -> set[VertexId]: + return self._mempool_tips_index.get() + + def get_local_best_block(self) -> Block: + return self.get_best_block() + + def get_best_block(self) -> Block: + return self._tx_storage.get_best_block() + + def get_local_block_by_height(self, height: int) -> VertexId | None: + return self.get_block_by_height(height) + + def get_block_by_height(self, height: int) -> VertexId | None: + return self._height_index.get(height) + + def local_partial_vertex_exists(self, vertex_id: VertexId) -> bool: + """Return true if the vertex exists no matter its validation state.""" + with self._tx_storage.allow_partially_validated_context(): + return self._tx_storage.transaction_exists(vertex_id) + + def local_vertex_exists(self, vertex_id: VertexId) -> bool: + return self._tx_storage.transaction_exists(vertex_id) + + def get_genesis(self, vertex_id: VertexId) -> Vertex | None: + return self._tx_storage.get_genesis(vertex_id) + + def compare_bytes_with_local_tx(self, tx: Vertex) -> bool: + return self._tx_storage.compare_bytes_with_local_tx(tx) + + def get_local_vertex(self, vertex_id: VertexId) -> Vertex: + return self.get_vertex(vertex_id) + + def get_vertex(self, vertex_id: VertexId) -> Vertex: + return self._tx_storage.get_vertex(vertex_id) + + def get_local_block(self, block_id: VertexId) -> Block: + return self.get_block(block_id) + + def get_block(self, block_id: VertexId) -> Block: + return self._tx_storage.get_block(block_id) + + def get_parent_block(self, block: Block) -> Block: + return self._tx_storage.get_parent_block(block) + + def get_best_block_tips(self) -> list[VertexId]: + return self._tx_storage.get_best_block_tips() + + def local_can_validate_full(self, vertex: Vertex) -> bool: + return self._tx_storage.can_validate_full(vertex) + + +class AsyncP2PStorage(P2PStorage): + """ + This class represents a single point of contact for sync-v2 interacting with the storage and indexes. + Every time sync-v2 needs to retrieve some info from the storage, it should call one of the methods in this class. + + It is basically a forward to storage methods, however it introduces the concept of a "local" version for some + methods. The local version of a method returns data based on a single peer's perspective, while non-local methods + return data directly from the storage. + + This class is to be used with asynchronous sync-v2. It helps progress the sync without waiting for handling of + vertices, therefore there is a set of handled vertices already saved in the storage, and a set of not-yet-handled + vertices that are only local to this agent. This means that every local method is implemented by checking not only + the storage, but also vertices that are still waiting to be handled, in memory. + + Generally, local methods should be called every time the agent needs to retrieve data for its own downloading + process. Conversely, non-local methods should be called when the agent is sending data to another peer. + + The `P2PStorage` superclass deals with synchronous sync-v2. 
+ """ + # TODO: To be implemented diff --git a/hathor/p2p/sync_v2/p2p_vertex_handler.py b/hathor/p2p/sync_v2/p2p_vertex_handler.py new file mode 100644 index 000000000..f0adad536 --- /dev/null +++ b/hathor/p2p/sync_v2/p2p_vertex_handler.py @@ -0,0 +1,43 @@ +# Copyright 2024 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.manager import HathorManager +from hathor.transaction import BaseTransaction + + +class P2PVertexHandler: + """ + This class represents a single point of contact for sync-v2 handling of new vertices. + Every time a new vertex is received by sync-v2, it should call `handle_new_vertex()` to send the vertex to the rest + of the pipeline (i.e. verification, consensus, saving, propagation, etc). + This class handles vertices synchronously. + """ + __slots__ = ('_manager',) + + def __init__(self, *, manager: HathorManager) -> None: + self._manager = manager + + def handle_new_vertex( + self, + vertex: BaseTransaction, + *, + fails_silently: bool = True, + propagate_to_peers: bool = True, + ) -> bool: + return self._manager.on_new_tx( + vertex, + fails_silently=fails_silently, + propagate_to_peers=propagate_to_peers, + is_sync_v2=True + ) diff --git a/hathor/p2p/sync_v2/streamers.py b/hathor/p2p/sync_v2/streamers.py index df11131ba..5247bf417 100644 --- a/hathor/p2p/sync_v2/streamers.py +++ b/hathor/p2p/sync_v2/streamers.py @@ -68,7 +68,6 @@ def __str__(self): class _StreamingServerBase: def __init__(self, sync_agent: 'NodeBlockSync', *, limit: int = DEFAULT_STREAMING_LIMIT): self.sync_agent = sync_agent - self.tx_storage = self.sync_agent.tx_storage self.protocol: 'HathorProtocol' = sync_agent.protocol assert self.protocol.transport is not None @@ -232,7 +231,12 @@ def __init__(self, assert tx.get_metadata().first_block == self.first_block.hash self.current_block: Optional[Block] = self.first_block - self.bfs = BFSOrderWalk(self.tx_storage, is_dag_verifications=True, is_dag_funds=True, is_left_to_right=False) + self.bfs = BFSOrderWalk( + self.sync_agent.p2p_storage, + is_dag_verifications=True, + is_dag_funds=True, + is_left_to_right=False + ) self.iter = self.get_iter() def _stop_streaming_server(self, response_code: StreamEnd) -> None: @@ -296,7 +300,7 @@ def send_next(self) -> None: # Check if tx is confirmed by the `self.current_block` or any next block. 
assert cur_metadata.first_block is not None assert self.current_block is not None - first_block = self.tx_storage.get_transaction(cur_metadata.first_block) + first_block = self.sync_agent.p2p_storage.get_vertex(cur_metadata.first_block) if not_none(first_block.get_metadata().height) < not_none(self.current_block.get_metadata().height): self.log.debug('skipping tx: out of current block') self.bfs.skip_neighbors(cur) diff --git a/hathor/p2p/sync_v2/transaction_streaming_client.py b/hathor/p2p/sync_v2/transaction_streaming_client.py index d1b068222..1673c4767 100644 --- a/hathor/p2p/sync_v2/transaction_streaming_client.py +++ b/hathor/p2p/sync_v2/transaction_streaming_client.py @@ -44,7 +44,6 @@ def __init__(self, limit: int) -> None: self.sync_agent = sync_agent self.protocol = self.sync_agent.protocol - self.tx_storage = self.sync_agent.tx_storage self.manager = self.sync_agent.manager self.reactor = self.manager.reactor @@ -194,7 +193,7 @@ def _process_transaction(self, tx: BaseTransaction) -> Generator[Any, Any, None] def _update_dependencies(self, tx: BaseTransaction) -> None: """Update _existing_deps and _waiting_for with the dependencies.""" for dep in tx.get_all_dependencies(): - if self.tx_storage.transaction_exists(dep) or dep in self._db: + if self.sync_agent.p2p_storage.local_vertex_exists(dep) or dep in self._db: self._existing_deps.add(dep) else: self._waiting_for.add(dep) diff --git a/hathor/transaction/base_transaction.py b/hathor/transaction/base_transaction.py index 31e49d1db..7c9ac9ea8 100644 --- a/hathor/transaction/base_transaction.py +++ b/hathor/transaction/base_transaction.py @@ -453,29 +453,6 @@ def add_address_from_output(output: 'TxOutput') -> None: return addresses - def can_validate_full(self) -> bool: - """ Check if this transaction is ready to be fully validated, either all deps are full-valid or one is invalid. - """ - assert self.storage is not None - assert self._hash is not None - if self.is_genesis: - return True - deps = self.get_all_dependencies() - all_exist = True - all_valid = True - # either they all exist and are fully valid - for dep in deps: - meta = self.storage.get_metadata(dep) - if meta is None: - all_exist = False - continue - if not meta.validation.is_fully_connected(): - all_valid = False - if meta.validation.is_invalid(): - # or any of them is invalid (which would make this one invalid too) - return True - return all_exist and all_valid - def set_validation(self, validation: ValidationState) -> None: """ This method will set the internal validation state AND the appropriate voided_by marker. 
diff --git a/hathor/transaction/storage/__init__.py b/hathor/transaction/storage/__init__.py index e46ff6035..4fbdd6ae7 100644 --- a/hathor/transaction/storage/__init__.py +++ b/hathor/transaction/storage/__init__.py @@ -15,6 +15,7 @@ from hathor.transaction.storage.cache_storage import TransactionCacheStorage from hathor.transaction.storage.memory_storage import TransactionMemoryStorage from hathor.transaction.storage.transaction_storage import TransactionStorage +from hathor.transaction.storage.vertex_storage_protocol import VertexStorageProtocol try: from hathor.transaction.storage.rocksdb_storage import TransactionRocksDBStorage @@ -26,4 +27,5 @@ 'TransactionMemoryStorage', 'TransactionCacheStorage', 'TransactionRocksDBStorage', + 'VertexStorageProtocol' ] diff --git a/hathor/transaction/storage/transaction_storage.py b/hathor/transaction/storage/transaction_storage.py index 16244256b..654ab1362 100644 --- a/hathor/transaction/storage/transaction_storage.py +++ b/hathor/transaction/storage/transaction_storage.py @@ -1191,6 +1191,27 @@ def iter_all_raw_metadata(self) -> Iterator[tuple[VertexId, dict[str, Any]]]: """ raise NotImplementedError + def can_validate_full(self, vertex: BaseTransaction) -> bool: + """ Check if this transaction is ready to be fully validated, either all deps are full-valid or one is invalid. + """ + if vertex.is_genesis: + return True + deps = vertex.get_all_dependencies() + all_exist = True + all_valid = True + # either they all exist and are fully valid + for dep in deps: + meta = self.get_metadata(dep) + if meta is None: + all_exist = False + continue + if not meta.validation.is_fully_connected(): + all_valid = False + if meta.validation.is_invalid(): + # or any of them is invalid (which would make this one invalid too) + return True + return all_exist and all_valid + class BaseTransactionStorage(TransactionStorage): indexes: Optional[IndexesManager] diff --git a/hathor/transaction/storage/traversal.py b/hathor/transaction/storage/traversal.py index d88b47b9d..db5a0e8a1 100644 --- a/hathor/transaction/storage/traversal.py +++ b/hathor/transaction/storage/traversal.py @@ -21,7 +21,7 @@ if TYPE_CHECKING: from hathor.transaction import BaseTransaction # noqa: F401 - from hathor.transaction.storage import TransactionStorage # noqa: F401 + from hathor.transaction.storage import VertexStorageProtocol # noqa: F401 from hathor.types import VertexId @@ -47,8 +47,8 @@ class GenericWalk(ABC): """ seen: set['VertexId'] - def __init__(self, storage: 'TransactionStorage', *, is_dag_funds: bool = False, - is_dag_verifications: bool = False, is_left_to_right: bool = True): + def __init__(self, storage: 'VertexStorageProtocol', *, is_dag_funds: bool = False, + is_dag_verifications: bool = False, is_left_to_right: bool = True) -> None: """ If `is_left_to_right` is `True`, we walk in the direction of the unverified transactions. Otherwise, we walk in the direction of the genesis. 
@@ -112,7 +112,7 @@ def add_neighbors(self, tx: 'BaseTransaction') -> None: for _hash in it: if _hash not in self.seen: self.seen.add(_hash) - neighbor = self.storage.get_transaction(_hash) + neighbor = self.storage.get_vertex(_hash) self._push_visit(neighbor) def skip_neighbors(self, tx: 'BaseTransaction') -> None: @@ -155,8 +155,14 @@ class BFSTimestampWalk(GenericWalk): """ _to_visit: list[HeapItem] - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, storage: 'VertexStorageProtocol', *, is_dag_funds: bool = False, + is_dag_verifications: bool = False, is_left_to_right: bool = True) -> None: + super().__init__( + storage, + is_dag_funds=is_dag_funds, + is_dag_verifications=is_dag_verifications, + is_left_to_right=is_left_to_right + ) self._to_visit = [] def _is_empty(self) -> bool: @@ -179,8 +185,14 @@ class BFSOrderWalk(GenericWalk): """ _to_visit: deque['BaseTransaction'] - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, storage: 'VertexStorageProtocol', *, is_dag_funds: bool = False, + is_dag_verifications: bool = False, is_left_to_right: bool = True) -> None: + super().__init__( + storage, + is_dag_funds=is_dag_funds, + is_dag_verifications=is_dag_verifications, + is_left_to_right=is_left_to_right + ) self._to_visit = deque() def _is_empty(self) -> bool: @@ -198,8 +210,14 @@ class DFSWalk(GenericWalk): """ _to_visit: list['BaseTransaction'] - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) + def __init__(self, storage: 'VertexStorageProtocol', *, is_dag_funds: bool = False, + is_dag_verifications: bool = False, is_left_to_right: bool = True) -> None: + super().__init__( + storage, + is_dag_funds=is_dag_funds, + is_dag_verifications=is_dag_verifications, + is_left_to_right=is_left_to_right + ) self._to_visit = [] def _is_empty(self) -> bool: diff --git a/hathor/vertex_handler/vertex_handler.py b/hathor/vertex_handler/vertex_handler.py index e375873c7..4761015ff 100644 --- a/hathor/vertex_handler/vertex_handler.py +++ b/hathor/vertex_handler/vertex_handler.py @@ -75,6 +75,7 @@ def on_new_vertex( fails_silently: bool = True, propagate_to_peers: bool = True, reject_locked_reward: bool = True, + is_sync_v2: bool = False, ) -> bool: """Method for adding vertices (transactions or blocks) that steps the validation state machine, synchronously. @@ -83,6 +84,9 @@ def on_new_vertex( :param fails_silently: if False will raise an exception when tx cannot be added :param propagate_to_peers: if True will relay the tx to other peers if it is accepted """ + if is_sync_v2: + assert vertex.storage is None, 'sync-v2 should never set a storage in the vertex' + is_pre_valid = self._pre_validate_vertex(vertex, fails_silently=fails_silently) if not is_pre_valid: return False
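Reviewer note: the hunks above spread the new wiring across several files (factory, agent, storage wrapper, vertex handler). The sketch below only gathers that wiring in one place; it mirrors `SyncV2Factory.create_sync_agent` and the new constructors shown in the diff and is not an additional change.

# Sketch only: how a sync-v2 agent is assembled after this patch (mirrors SyncV2Factory).
from typing import TYPE_CHECKING

from hathor.p2p.sync_v2.agent import NodeBlockSync
from hathor.p2p.sync_v2.p2p_storage import P2PStorage
from hathor.p2p.sync_v2.p2p_vertex_handler import P2PVertexHandler
from hathor.reactor import ReactorProtocol as Reactor

if TYPE_CHECKING:
    from hathor.p2p.protocol import HathorProtocol


def create_sync_agent(protocol: 'HathorProtocol', reactor: Reactor) -> NodeBlockSync:
    # P2PStorage wraps the node's TransactionStorage plus its mempool-tips and height
    # indexes, so the agent no longer reaches into tx_storage or its indexes directly.
    p2p_storage = P2PStorage(tx_storage=protocol.node.tx_storage)
    # P2PVertexHandler forwards every received vertex to HathorManager.on_new_tx() with
    # is_sync_v2=True, which VertexHandler.on_new_vertex() uses to assert that sync-v2
    # never sets a storage on the vertex.
    p2p_vertex_handler = P2PVertexHandler(manager=protocol.node)
    return NodeBlockSync(
        protocol=protocol,
        reactor=reactor,
        p2p_storage=p2p_storage,
        p2p_vertex_handler=p2p_vertex_handler,
    )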
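The body of `AsyncP2PStorage` is left as `# TODO: To be implemented`. Purely to illustrate the pattern its docstring describes (local methods also consult vertices that were downloaded but not yet handled, kept in memory, before falling back to the storage), here is a hypothetical sketch; the class name, the `_unhandled_vertices` attribute, and the choice of overrides are assumptions of this note, not part of the patch.

# Hypothetical sketch, not part of this patch: the real AsyncP2PStorage is still a TODO.
from hathor.p2p.sync_v2.p2p_storage import P2PStorage
from hathor.transaction import Vertex
from hathor.transaction.storage import TransactionStorage
from hathor.types import VertexId


class AsyncP2PStorageSketch(P2PStorage):
    __slots__ = ('_unhandled_vertices',)

    def __init__(self, *, tx_storage: TransactionStorage) -> None:
        super().__init__(tx_storage=tx_storage)
        # Vertices received from this peer that have not yet gone through the vertex handler.
        self._unhandled_vertices: dict[VertexId, Vertex] = {}

    def local_vertex_exists(self, vertex_id: VertexId) -> bool:
        # A "local" existence check also sees not-yet-handled vertices, so this agent's
        # download can keep progressing without waiting for handling to finish.
        return vertex_id in self._unhandled_vertices or super().local_vertex_exists(vertex_id)

    def get_local_vertex(self, vertex_id: VertexId) -> Vertex:
        # Prefer the in-memory copy; fall back to the storage for already-handled vertices.
        unhandled = self._unhandled_vertices.get(vertex_id)
        if unhandled is not None:
            return unhandled
        return super().get_local_vertex(vertex_id)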