From afe143e43e4d2a432bb95293e7a848b5e4027fba Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Fri, 1 Mar 2019 14:49:02 -0800
Subject: [PATCH 1/7] Fix pruning with consecutive finished dependencies

---
 .../test_ordered_task_preparation.py | 22 +++++++++++++++++++
 trinity/_utils/datastructures.py     |  8 ++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/tests/core/task-queue-utils/test_ordered_task_preparation.py b/tests/core/task-queue-utils/test_ordered_task_preparation.py
index f70e58eb3d..5eeb538f02 100644
--- a/tests/core/task-queue-utils/test_ordered_task_preparation.py
+++ b/tests/core/task-queue-utils/test_ordered_task_preparation.py
@@ -114,6 +114,28 @@ async def test_pruning():
         ti.register_tasks((25, ))
 
 
+@pytest.mark.asyncio
+async def test_pruning_consecutive_finished_deps():
+    ti = OrderedTaskPreparation(NoPrerequisites, identity, lambda x: x - 1, max_depth=2)
+    ti.set_finished_dependency(3)
+    ti.set_finished_dependency(4)
+    ti.register_tasks((5, 6))
+
+    assert 3 in ti._tasks
+    assert 4 in ti._tasks
+
+    # trigger pruning by requesting the ready tasks through 6, then "finishing" them
+    # by requesting the next batch of ready tasks (7)
+    completed = await wait(ti.ready_tasks())
+    assert completed == (5, 6)
+    ti.register_tasks((7, ))
+    completed = await wait(ti.ready_tasks())
+    assert completed == (7, )
+
+    assert 3 not in ti._tasks
+    assert 4 in ti._tasks
+
+
 @pytest.mark.asyncio
 async def test_wait_forever():
     ti = OrderedTaskPreparation(OnePrereq, identity, lambda x: x - 1)
diff --git a/trinity/_utils/datastructures.py b/trinity/_utils/datastructures.py
index 1fa6b69015..039f355bcf 100644
--- a/trinity/_utils/datastructures.py
+++ b/trinity/_utils/datastructures.py
@@ -511,7 +511,13 @@ def set_finished_dependency(self, finished_task: TTask) -> None:
         )
         self._tasks[task_id] = completed
         self._declared_finished.add(task_id)
-        self._roots.add(task_id, self._dependency_of(finished_task))
+
+        dependency_id = self._dependency_of(finished_task)
+        self._roots.add(task_id, dependency_id)
+        if dependency_id in self._tasks:
+            # the parent is already tracked, so link this task as its dependent to keep pruning intact
+            self._dependencies[dependency_id].add(task_id)
+
         # note that this task is intentionally *not* added to self._unready
 
     def register_tasks(self, tasks: Tuple[TTask, ...], ignore_duplicates: bool = False) -> None:
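
For context on the fix above: with two consecutive finished dependencies, the later one was
never recorded as a dependent of the earlier one, so the prune walk could not advance past the
first finished task. A runnable sketch of the now-working scenario, assuming only the
OrderedTaskPreparation API exercised by the new test; the empty prerequisite enum mirrors the
"empty enum" the header syncer passes in patch 4:

    import asyncio
    from enum import Enum

    from trinity._utils.datastructures import OrderedTaskPreparation


    class NoPrerequisites(Enum):
        # no prep work for tasks; they only need to be linearized
        pass


    async def demo() -> None:
        ti = OrderedTaskPreparation(
            NoPrerequisites,
            lambda task: task,      # each task is its own id
            lambda task: task - 1,  # each task depends on its predecessor
            max_depth=2,
        )
        # consecutive finished dependencies: before this patch, 4 was never
        # recorded as depending on 3, so pruning could never walk past 3
        ti.set_finished_dependency(3)
        ti.set_finished_dependency(4)
        ti.register_tasks((5, 6))
        completed = await asyncio.wait_for(ti.ready_tasks(), timeout=1)
        assert completed == (5, 6)


    asyncio.run(demo())
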
From 20c7f598909f2d130b239ff3799d45c84cca3b2b Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Thu, 7 Mar 2019 13:00:15 -0800
Subject: [PATCH 2/7] Hunt for bug when adding RootTracker node

---
 tests/core/utils/test_root_tracker.py | 51 +++++++++++++++++++++------
 trinity/_utils/tree_root.py           | 10 +++++-
 2 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/tests/core/utils/test_root_tracker.py b/tests/core/utils/test_root_tracker.py
index 2695f0a25c..9115367a3a 100644
--- a/tests/core/utils/test_root_tracker.py
+++ b/tests/core/utils/test_root_tracker.py
@@ -78,7 +78,7 @@ def test_out_of_order_line(insertion_order):
 
 
 @given(st.lists(st.integers(min_value=0, max_value=9)))
-def test_prune_reinsert_root_tracking(element_flipping):
+def test_prune_reinsert_root_tracking_linear(element_flipping):
     tracker = RootTracker()
 
     present = set()
@@ -105,15 +105,47 @@ def test_prune_reinsert_root_tracking(element_flipping):
 FULL_BINARY_TREE = [(layer, column) for layer in [0, 1, 2, 3] for column in range(2**layer)]
 
 
+def binary_parent(node):
+    return (node[0] - 1, node[1] // 2)
+
+
+# only use the first 3 layers of the tree
+@given(st.lists(
+    st.integers(min_value=0, max_value=6),
+    min_size=3,
+))
+#@settings(max_examples=20000)
+def test_prune_reinsert_root_tracking_binary_tree(element_flipping):
+    tracker = RootTracker()
+
+    present = set()
+    for node_id in element_flipping:
+        node = FULL_BINARY_TREE[node_id]
+        if node in present:
+            prune_root_id, _ = tracker.get_root(node)
+            tracker.prune(prune_root_id)
+            present.remove(prune_root_id)
+        else:
+            tracker.add(node, binary_parent(node))
+            present.add(node)
+
+    # validate all the present nodes have valid roots
+    for test_node in present:
+        root_node, depth = tracker.get_root(test_node)
+
+        # make sure parent is *not* present
+        assert binary_parent(root_node) not in present
+
+        # make sure depth is correct
+        assert depth == test_node[0] - root_node[0]
+
+
 @given(st.permutations(FULL_BINARY_TREE))
 def test_full_branching(insertion_order):
     """Test full binary tree, in random order"""
-    def parent(node):
-        return (node[0] - 1, node[1] // 2)
-
     tracker = RootTracker()
     for node in insertion_order:
-        tracker.add(node, parent(node))
+        tracker.add(node, binary_parent(node))
 
     # prune all the way to the leaf of (3, 0)
     for num_prunings in range(3):
@@ -136,22 +168,19 @@ def subset_and_order(draw):
 def test_sparse_branching(test_data):
     nodes_to_insert, prune_order = test_data
 
-    def parent(node):
-        return (node[0] - 1, node[1] // 2)
-
     def get_expected_root(node, present_nodes):
         expected_depth = 0
         expected_root = node
-        parent_node = parent(node)
+        parent_node = binary_parent(node)
         while parent_node in present_nodes:
             expected_depth += 1
             expected_root = parent_node
-            parent_node = parent(parent_node)
+            parent_node = binary_parent(parent_node)
         return expected_root, expected_depth
 
     tracker = RootTracker()
     for node in nodes_to_insert:
-        tracker.add(node, parent(node))
+        tracker.add(node, binary_parent(node))
 
     # verify parent and depth of partially-built tree
     for node in nodes_to_insert:
diff --git a/trinity/_utils/tree_root.py b/trinity/_utils/tree_root.py
index 4396f3c50f..9ea0608a76 100644
--- a/trinity/_utils/tree_root.py
+++ b/trinity/_utils/tree_root.py
@@ -323,7 +323,15 @@ def prune(self, prune_off_id: TNodeID) -> None:
 
     def _get_new_root(self, node_id: TNodeID, parent_id: TNodeID) -> Tuple[TreeRoot[TNodeID], int]:
         if self._tree.has_parent(node_id):
-            parent_root = self._roots[parent_id]
+            try:
+                parent_root = self._roots[parent_id]
+            except KeyError as e:
+                tree_parent = self._tree.parent_of(node_id)
+                raise ValidationError(
+                    f"When adding node {node_id} with parent {parent_id}, the tree says that "
+                    f"parent {tree_parent} is present, but the parent is missing from roots."
+                ) from e
+
             if len(self._tree.children_of(parent_id)) > 1:
                 node_root = TreeRoot(node_id)
                 node_root.extend(parent_root, 0)
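
The binary_parent helper factored out above encodes the usual heap-style indexing for the
(layer, column) node ids in FULL_BINARY_TREE: node (layer, column) has children
(layer + 1, 2 * column) and (layer + 1, 2 * column + 1), so the parent is recovered with
integer division. A quick illustration:

    def binary_parent(node):
        return (node[0] - 1, node[1] // 2)

    assert binary_parent((3, 5)) == (2, 2)  # (3, 5) is the right child of (2, 2)
    assert binary_parent((1, 1)) == (0, 0)  # both layer-1 nodes hang off the single root
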
From e4b2ec77fb666ba6ce736768d0858028cb60dcf1 Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Thu, 7 Mar 2019 15:00:59 -0800
Subject: [PATCH 3/7] Minor logging improvement and code consolidation

---
 trinity/_utils/tree_root.py | 3 +--
 trinity/sync/full/chain.py  | 3 ++-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/trinity/_utils/tree_root.py b/trinity/_utils/tree_root.py
index 9ea0608a76..cfe9f0263d 100644
--- a/trinity/_utils/tree_root.py
+++ b/trinity/_utils/tree_root.py
@@ -335,10 +335,9 @@ def _get_new_root(self, node_id: TNodeID, parent_id: TNodeID) -> Tuple[TreeRoot[
             if len(self._tree.children_of(parent_id)) > 1:
                 node_root = TreeRoot(node_id)
                 node_root.extend(parent_root, 0)
-                original_depth = self._original_depth_to_root[parent_id] + 1
             else:
                 node_root = parent_root
-                original_depth = self._original_depth_to_root[parent_id] + 1
+            original_depth = self._original_depth_to_root[parent_id] + 1
         else:
             node_root = TreeRoot(node_id)
             original_depth = 0
diff --git a/trinity/sync/full/chain.py b/trinity/sync/full/chain.py
index e77c0ac1e5..69700db5f9 100644
--- a/trinity/sync/full/chain.py
+++ b/trinity/sync/full/chain.py
@@ -539,11 +539,12 @@ async def _display_stats(self) -> None:
         while self.is_operational:
             await self.sleep(5)
             self.logger.debug(
-                "(in progress, queued, max size) of bodies, receipts: %r",
+                "(in progress, queued, max size) of bodies, receipts: %r. Write capacity? %s",
                 [(q.num_in_progress(), len(q), q._maxsize) for q in (
                     self._block_body_tasks,
                     self._receipt_tasks,
                 )],
+                "yes" if self._db_buffer_capacity.is_set() else "no",
             )
 
             stats = self.tracker.report()
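
The new "Write capacity?" field reports an asyncio.Event that the syncer uses as a
backpressure gate: producers await it, and the DB side sets or clears it as its queue drains
or fills. A minimal sketch of that gating pattern, with illustrative names rather than
trinity's:

    import asyncio


    async def main() -> None:
        write_capacity = asyncio.Event()
        write_capacity.set()  # start with capacity

        async def enqueue_block(block: int) -> None:
            await write_capacity.wait()  # blocks the producer while the DB is saturated
            print("queued block", block)

        # the same check the new log line performs:
        print("Write capacity?", "yes" if write_capacity.is_set() else "no")
        await enqueue_block(1)


    asyncio.run(main())
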
From 2b8de3cc732e414b4926ccacf94692c4214cdcf8 Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Thu, 7 Mar 2019 15:09:51 -0800
Subject: [PATCH 4/7] Restart skeleton sync if out-of-order header found

---
 trinity/sync/common/headers.py | 26 +++++++++++++++++++++-----
 trinity/sync/full/chain.py     |  4 +++-
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/trinity/sync/common/headers.py b/trinity/sync/common/headers.py
index e2be7c0274..382069387c 100644
--- a/trinity/sync/common/headers.py
+++ b/trinity/sync/common/headers.py
@@ -728,7 +728,20 @@ def __init__(self,
         self._chain = chain
         self._peer_pool = peer_pool
         self._tip_monitor = self.tip_monitor_class(peer_pool, token=self.cancel_token)
+        self._last_target_header_hash: Hash32 = None
         self._skeleton: SkeletonSyncer[TChainPeer] = None
+
+        # Track if there is capacity for syncing more headers
+        self._buffer_capacity = asyncio.Event()
+
+        self._reset_buffer()
+
+    async def clear_buffer(self) -> None:
+        if self._skeleton is not None:
+            await self._skeleton.cancel()
+        self._reset_buffer()
+
+    def _reset_buffer(self) -> None:
         # stitch together headers as they come in
         self._stitcher = OrderedTaskPreparation(
             # we don't have to do any prep work on the headers, just linearize them, so empty enum
@@ -741,12 +754,15 @@ def __init__(self,
         )
         # When downloading the headers into the gaps left by the syncer, they must be linearized
         # by the stitcher
-        self._meat = HeaderMeatSyncer(chain, peer_pool, self._stitcher, token)
-        self._last_target_header_hash: Hash32 = None
+        self._meat = HeaderMeatSyncer(
+            self._chain,
+            self._peer_pool,
+            self._stitcher,
+            self.cancel_token,
+        )
 
-        # Track if there is capacity for syncing more headers
-        self._buffer_capacity = asyncio.Event()
-        self._buffer_capacity.set()  # start with capacity
+        # Queue has reset, so always start with capacity
+        self._buffer_capacity.set()
 
     async def new_sync_headers(
             self,
diff --git a/trinity/sync/full/chain.py b/trinity/sync/full/chain.py
index 69700db5f9..cd66339324 100644
--- a/trinity/sync/full/chain.py
+++ b/trinity/sync/full/chain.py
@@ -476,7 +476,9 @@ async def _launch_prerequisite_tasks(self) -> None:
                 )
             except HeaderNotFound:
                 await self._log_header_link_failure(headers[0])
-                raise
+                await self._header_syncer.clear_buffer()
+                # wait for new headers to come back in from a restarted skeleton sync
+                continue
 
             # This appears to be a fork, since the parent header is persisted,
             self.logger.info(
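
The shape of the recovery this patch introduces: instead of raising out of the sync loop when
a header batch cannot be linked to the local chain, the consumer discards the upstream buffer
and keeps looping. A sketch using hypothetical names (stream, can_link, persist), not
trinity's real call signatures:

    async def consume(syncer, db):
        async for batch in syncer.stream():
            if not db.can_link(batch[0]):
                # unlinkable batch: dump buffered headers and restart the
                # skeleton sync, then wait for a fresh batch instead of raising
                await syncer.clear_buffer()
                continue
            db.persist(batch)
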
From f21047c7a3d20a57a8d26a6d9256950896752cf9 Mon Sep 17 00:00:00 2001
From: Jason Carver
Date: Thu, 7 Mar 2019 16:43:47 -0800
Subject: [PATCH 5/7] Resize sync buffers

- The DB buffer should be smaller than the body request buffer,
  otherwise we'll never know if we have hit our maximum I/O throughput
---
 trinity/sync/full/chain.py     | 5 +++--
 trinity/sync/full/constants.py | 9 +++++----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/trinity/sync/full/chain.py b/trinity/sync/full/chain.py
index cd66339324..cc460a0d7a 100644
--- a/trinity/sync/full/chain.py
+++ b/trinity/sync/full/chain.py
@@ -62,6 +62,7 @@
 from trinity.sync.common.peers import WaitingPeers
 from trinity.sync.full.constants import (
     HEADER_QUEUE_SIZE_TARGET,
+    BLOCK_QUEUE_SIZE_TARGET,
 )
 from trinity._utils.datastructures import (
     MissingDependency,
@@ -82,7 +83,7 @@
 ]
 
 # How big should the pending request queue get, as a multiple of the largest request size
-REQUEST_BUFFER_MULTIPLIER = 8
+REQUEST_BUFFER_MULTIPLIER = 16
 
 
 class BaseBodyChainSyncer(BaseService, PeerSubscriber):
@@ -583,7 +584,7 @@ async def _persist_ready_blocks(self) -> None:
         while self.is_operational:
             # This tracker waits for all prerequisites to be complete, and returns headers in
             # order, so that each header's parent is already persisted.
-            get_completed_coro = self._block_persist_tracker.ready_tasks(HEADER_QUEUE_SIZE_TARGET)
+            get_completed_coro = self._block_persist_tracker.ready_tasks(BLOCK_QUEUE_SIZE_TARGET)
             completed_headers = await self.wait(get_completed_coro)
 
             if self._block_persist_tracker.has_ready_tasks():
diff --git a/trinity/sync/full/constants.py b/trinity/sync/full/constants.py
index 331a9c112a..da4109dfa9 100644
--- a/trinity/sync/full/constants.py
+++ b/trinity/sync/full/constants.py
@@ -11,7 +11,8 @@
 # BUFFER_SECONDS = 30
 # (this should allow plenty of time for peers to fill in the buffer during db writes)
 #
-# MARGIN = 10
-# (better to have a buffer that's too big than to artificially constrain performance)
-#
-HEADER_QUEUE_SIZE_TARGET = 60000
+HEADER_QUEUE_SIZE_TARGET = 6000
+
+# How many blocks to persist at a time
+# Only need a few seconds of buffer on the DB write side.
+BLOCK_QUEUE_SIZE_TARGET = 1000
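
The resized constants stay consistent with the rate-and-time comments kept in constants.py,
if one assumes the ~200 headers/second estimate implied by the old numbers (the rate itself
is an inference, not stated in the file):

    EXPECTED_HEADERS_PER_SECOND = 200  # implied: 200 * 30 * 10 == 60000, the old target
    BUFFER_SECONDS = 30
    MARGIN = 10  # dropped by this patch

    HEADER_QUEUE_SIZE_TARGET = EXPECTED_HEADERS_PER_SECOND * BUFFER_SECONDS  # 6000

The ordering the commit message asks for also holds: with an illustrative largest body
request of 128 blocks, REQUEST_BUFFER_MULTIPLIER = 16 yields a request buffer of 2048 queued
blocks, comfortably above BLOCK_QUEUE_SIZE_TARGET = 1000, so a saturated DB queue becomes
visible before the request queue runs dry.
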
" - "Instead at #%d, our header is %s, whose parent is %s, with canonical tip %s", + "Instead at #%d, our header is %s, whose parent is %s, with canonical tip %s. ", + "The highest received header is %d.", first_header, block_num, local_header, local_parent, canonical_tip, + highest_block_num, ) async def _display_stats(self) -> None: From bf26e307d92beae32fe3ee47552cfc147e6b8400 Mon Sep 17 00:00:00 2001 From: Jason Carver Date: Thu, 7 Mar 2019 16:50:22 -0800 Subject: [PATCH 7/7] lint roll --- tests/core/utils/test_root_tracker.py | 1 - trinity/sync/common/headers.py | 11 +++++++++++ trinity/sync/full/chain.py | 12 +++++++++--- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/tests/core/utils/test_root_tracker.py b/tests/core/utils/test_root_tracker.py index 9115367a3a..a11e825b17 100644 --- a/tests/core/utils/test_root_tracker.py +++ b/tests/core/utils/test_root_tracker.py @@ -114,7 +114,6 @@ def binary_parent(node): st.integers(min_value=0, max_value=6), min_size=3, )) -#@settings(max_examples=20000) def test_prune_reinsert_root_tracking_binary_tree(element_flipping): tracker = RootTracker() diff --git a/trinity/sync/common/headers.py b/trinity/sync/common/headers.py index 382069387c..698ea7896a 100644 --- a/trinity/sync/common/headers.py +++ b/trinity/sync/common/headers.py @@ -492,6 +492,17 @@ async def new_sync_headers( if False: yield + @abstractmethod + async def clear_buffer(self) -> None: + """ + Whatever headers have been received until now, dump them all and restart. + This is a last resort, to be used only when a consumer seems to receive + headers out of other and decides they have no other option besides reset. + + It wastes a lot of previously completed work. + """ + pass + @abstractmethod def get_target_header_hash(self) -> Hash32: pass diff --git a/trinity/sync/full/chain.py b/trinity/sync/full/chain.py index 80365010c8..228a96ef67 100644 --- a/trinity/sync/full/chain.py +++ b/trinity/sync/full/chain.py @@ -456,7 +456,10 @@ async def _launch_prerequisite_tasks(self) -> None: tasks as they become available. """ get_headers_coro = self._header_syncer.new_sync_headers(HEADER_QUEUE_SIZE_TARGET) + + # Track the highest registered block header by number, purely for stats/logging highest_block_num = -1 + async for headers in self.wait_iter(get_headers_coro): try: # We might end up with duplicates that can be safely ignored. @@ -477,9 +480,12 @@ async def _launch_prerequisite_tasks(self) -> None: self.db.coro_get_block_header_by_hash(headers[0].parent_hash) ) except HeaderNotFound: - await self._log_header_link_failure(headers[0], highest_block_num) + await self._log_missing_parent(headers[0], highest_block_num) + + # Nowhere to go from here, reset and try again await self._header_syncer.clear_buffer() - # wait for new headers to come back in from a restarted skeleton sync + + # Don't try to process `headers`, wait for new ones to come in continue # This appears to be a fork, since the parent header is persisted, @@ -510,7 +516,7 @@ async def _launch_prerequisite_tasks(self) -> None: highest_block_num = max(headers[-1].block_number, highest_block_num) - async def _log_header_link_failure(self, first_header: BlockHeader, highest_block_num) -> None: + async def _log_missing_parent(self, first_header: BlockHeader, highest_block_num: int) -> None: self.logger.warning("Parent missing for header %r, restarting header sync", first_header) block_num = first_header.block_number try: