Commit 1b4db83

Merge pull request #347 from carver/fix-prune-key-error

Fix pruning with consecutive finished dependencies

carver committed Mar 8, 2019 (2 parents: 3ad7f02 + bf26e30)

(This repository has been archived by the owner on Jul 1, 2021. It is now read-only.)

Showing 7 changed files with 138 additions and 32 deletions.
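The bug in brief: OrderedTaskPreparation prunes tasks once they fall max_depth behind the finished frontier. When two consecutive finished dependencies were registered, the two tasks were never linked as parent and dependent, which left the internal bookkeeping inconsistent and made a later prune raise a KeyError (hence the branch name fix-prune-key-error). A minimal sketch of the triggering call pattern, distilled from the new test below; identity and the empty NoPrerequisites enum mirror the test helpers and are not library exports:

from enum import Enum

from trinity._utils.datastructures import OrderedTaskPreparation


class NoPrerequisites(Enum):
    # empty enum: tasks need no per-task prep work, only linearization
    pass


def identity(task):
    return task


# Two consecutive finished dependencies: 4's dependency (4 - 1 == 3) is itself
# a finished dependency. Before this fix, the 3 <- 4 edge was never recorded,
# so pruning old tasks later raised a KeyError.
ti = OrderedTaskPreparation(NoPrerequisites, identity, lambda x: x - 1, max_depth=2)
ti.set_finished_dependency(3)
ti.set_finished_dependency(4)
ti.register_tasks((5, 6))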
22 changes: 22 additions & 0 deletions tests/core/task-queue-utils/test_ordered_task_preparation.py
@@ -114,6 +114,28 @@ async def test_pruning():
ti.register_tasks((25, ))


@pytest.mark.asyncio
async def test_pruning_consecutive_finished_deps():
ti = OrderedTaskPreparation(NoPrerequisites, identity, lambda x: x - 1, max_depth=2)
ti.set_finished_dependency(3)
ti.set_finished_dependency(4)
ti.register_tasks((5, 6))

assert 3 in ti._tasks
assert 4 in ti._tasks

# trigger pruning by requesting the ready tasks through 6, then "finishing" them
# by requesting the next batch of ready tasks (7)
completed = await wait(ti.ready_tasks())
assert completed == (5, 6)
ti.register_tasks((7, ))
completed = await wait(ti.ready_tasks())
assert completed == (7, )

assert 3 not in ti._tasks
assert 4 in ti._tasks


@pytest.mark.asyncio
async def test_wait_forever():
ti = OrderedTaskPreparation(OnePrereq, identity, lambda x: x - 1)
Expand Down
50 changes: 39 additions & 11 deletions tests/core/utils/test_root_tracker.py
@@ -78,7 +78,7 @@ def test_out_of_order_line(insertion_order):


@given(st.lists(st.integers(min_value=0, max_value=9)))
def test_prune_reinsert_root_tracking(element_flipping):
def test_prune_reinsert_root_tracking_linear(element_flipping):
tracker = RootTracker()

present = set()
@@ -105,15 +105,46 @@ def test_prune_reinsert_root_tracking(element_flipping):
FULL_BINARY_TREE = [(layer, column) for layer in [0, 1, 2, 3] for column in range(2**layer)]


def binary_parent(node):
return (node[0] - 1, node[1] // 2)
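A quick illustration of the (layer, column) encoding (an editorial aside, not part of the committed test file): a node's parent sits one layer up, at half its column index.

assert binary_parent((3, 5)) == (2, 2)  # layer-3, column-5 hangs under layer-2, column-2
assert binary_parent((1, 1)) == (0, 0)  # both layer-1 nodes share the layer-0 root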


# only use the first 3 layers of the tree
@given(st.lists(
st.integers(min_value=0, max_value=6),
min_size=3,
))
def test_prune_reinsert_root_tracking_binary_tree(element_flipping):
tracker = RootTracker()

present = set()
for node_id in element_flipping:
node = FULL_BINARY_TREE[node_id]
if node in present:
prune_root_id, _ = tracker.get_root(node)
tracker.prune(prune_root_id)
present.remove(prune_root_id)
else:
tracker.add(node, binary_parent(node))
present.add(node)

# validate all the present nodes have valid roots
for test_node in present:
root_node, depth = tracker.get_root(test_node)

# make sure parent is *not* present
assert binary_parent(root_node) not in present

# make sure depth is correct
assert depth == test_node[0] - root_node[0]


@given(st.permutations(FULL_BINARY_TREE))
def test_full_branching(insertion_order):
"""Test full binary tree, in random order"""
def parent(node):
return (node[0] - 1, node[1] // 2)

tracker = RootTracker()
for node in insertion_order:
tracker.add(node, parent(node))
tracker.add(node, binary_parent(node))

# prune all the way to the leaf of (3, 0)
for num_prunings in range(3):
@@ -136,22 +167,19 @@ def subset_and_order(draw):
def test_sparse_branching(test_data):
nodes_to_insert, prune_order = test_data

def parent(node):
return (node[0] - 1, node[1] // 2)

def get_expected_root(node, present_nodes):
expected_depth = 0
expected_root = node
parent_node = parent(node)
parent_node = binary_parent(node)
while parent_node in present_nodes:
expected_depth += 1
expected_root = parent_node
parent_node = parent(parent_node)
parent_node = binary_parent(parent_node)
return expected_root, expected_depth

tracker = RootTracker()
for node in nodes_to_insert:
tracker.add(node, parent(node))
tracker.add(node, binary_parent(node))

# verify parent and depth of partially-built tree
for node in nodes_to_insert:
8 changes: 7 additions & 1 deletion trinity/_utils/datastructures.py
@@ -511,7 +511,13 @@ def set_finished_dependency(self, finished_task: TTask) -> None:
)
self._tasks[task_id] = completed
self._declared_finished.add(task_id)
self._roots.add(task_id, self._dependency_of(finished_task))

dependency_id = self._dependency_of(finished_task)
self._roots.add(task_id, dependency_id)
if dependency_id in self._tasks:
# The parent of this finished task is already tracked, so record this task as one of its dependents
self._dependencies[dependency_id].add(task_id)

# note that this task is intentionally *not* added to self._unready

def register_tasks(self, tasks: Tuple[TTask, ...], ignore_duplicates: bool = False) -> None:
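With that one-line addition, a finished dependency whose parent is also tracked now appears in its parent's dependents set. A hedged check of the restored invariant, continuing the sketch from the top of this page (_dependencies is internal state, read here only for illustration, assuming it maps a task id to the set of ids that depend on it, as the new line implies):

ti = OrderedTaskPreparation(NoPrerequisites, identity, lambda x: x - 1, max_depth=2)
ti.set_finished_dependency(3)
ti.set_finished_dependency(4)
assert 4 in ti._dependencies[3]  # the 3 <- 4 edge now exists for pruning to follow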
13 changes: 10 additions & 3 deletions trinity/_utils/tree_root.py
@@ -323,14 +323,21 @@ def prune(self, prune_off_id: TNodeID) -> None:

def _get_new_root(self, node_id: TNodeID, parent_id: TNodeID) -> Tuple[TreeRoot[TNodeID], int]:
if self._tree.has_parent(node_id):
parent_root = self._roots[parent_id]
try:
parent_root = self._roots[parent_id]
except KeyError as e:
tree_parent = self._tree.parent_of(node_id)
raise ValidationError(
f"When adding node {node_id} with parent {parent_id}, The tree says that "
f"parent {tree_parent} is present, but the parent is missing from roots."
) from e

if len(self._tree.children_of(parent_id)) > 1:
node_root = TreeRoot(node_id)
node_root.extend(parent_root, 0)
original_depth = self._original_depth_to_root[parent_id] + 1
else:
node_root = parent_root
original_depth = self._original_depth_to_root[parent_id] + 1
original_depth = self._original_depth_to_root[parent_id] + 1
else:
node_root = TreeRoot(node_id)
original_depth = 0
37 changes: 32 additions & 5 deletions trinity/sync/common/headers.py
@@ -492,6 +492,17 @@ async def new_sync_headers(
if False:
yield

@abstractmethod
async def clear_buffer(self) -> None:
"""
Whatever headers have been received until now, dump them all and restart.
This is a last resort, to be used only when a consumer seems to receive
headers out of order and decides they have no other option besides resetting.
It wastes a lot of previously completed work.
"""
pass

@abstractmethod
def get_target_header_hash(self) -> Hash32:
pass
@@ -728,7 +739,20 @@ def __init__(self,
self._chain = chain
self._peer_pool = peer_pool
self._tip_monitor = self.tip_monitor_class(peer_pool, token=self.cancel_token)
self._last_target_header_hash: Hash32 = None
self._skeleton: SkeletonSyncer[TChainPeer] = None

# Track if there is capacity for syncing more headers
self._buffer_capacity = asyncio.Event()

self._reset_buffer()

async def clear_buffer(self) -> None:
if self._skeleton is not None:
await self._skeleton.cancel()
self._reset_buffer()

def _reset_buffer(self) -> None:
# stitch together headers as they come in
self._stitcher = OrderedTaskPreparation(
# we don't have to do any prep work on the headers, just linearize them, so empty enum
@@ -741,12 +765,15 @@ def __init__(self,
)
# When downloading the headers into the gaps left by the syncer, they must be linearized
# by the stitcher
self._meat = HeaderMeatSyncer(chain, peer_pool, self._stitcher, token)
self._last_target_header_hash: Hash32 = None
self._meat = HeaderMeatSyncer(
self._chain,
self._peer_pool,
self._stitcher,
self.cancel_token,
)

# Track if there is capacity for syncing more headers
self._buffer_capacity = asyncio.Event()
self._buffer_capacity.set() # start with capacity
# Queue has reset, so always start with capacity
self._buffer_capacity.set()

async def new_sync_headers(
self,
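The stitcher's constructor arguments are elided in the hunk above. A hypothetical reconstruction, based on the OrderedTaskPreparation signature exercised in the tests and on the comments in this hunk (the two extractor lambdas are assumptions, not the verbatim source):

from trinity._utils.datastructures import OrderedTaskPreparation

# NoPrerequisites: an empty Enum, as in the tests at the top of this page
stitcher = OrderedTaskPreparation(
    NoPrerequisites,
    lambda header: header.hash,         # assumed: identify each header by its hash
    lambda header: header.parent_hash,  # assumed: each header depends on its parent
)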
31 changes: 23 additions & 8 deletions trinity/sync/full/chain.py
@@ -62,6 +62,7 @@
from trinity.sync.common.peers import WaitingPeers
from trinity.sync.full.constants import (
HEADER_QUEUE_SIZE_TARGET,
BLOCK_QUEUE_SIZE_TARGET,
)
from trinity._utils.datastructures import (
MissingDependency,
@@ -82,7 +83,7 @@
]

# How big should the pending request queue get, as a multiple of the largest request size
REQUEST_BUFFER_MULTIPLIER = 8
REQUEST_BUFFER_MULTIPLIER = 16
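For scale, a hypothetical worked example (the actual largest-request constant lives outside this diff): if the largest request fetches 128 block bodies, the pending queue now targets 16 * 128 = 2048 entries, up from 8 * 128 = 1024 before this commit.

LARGEST_REQUEST_SIZE = 128      # hypothetical; the real constant is not in this diff
REQUEST_BUFFER_MULTIPLIER = 16  # doubled from 8 in this commit
queue_target = REQUEST_BUFFER_MULTIPLIER * LARGEST_REQUEST_SIZE  # 2048, was 1024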


class BaseBodyChainSyncer(BaseService, PeerSubscriber):
@@ -455,6 +456,10 @@ async def _launch_prerequisite_tasks(self) -> None:
tasks as they become available.
"""
get_headers_coro = self._header_syncer.new_sync_headers(HEADER_QUEUE_SIZE_TARGET)

# Track the highest registered block header by number, purely for stats/logging
highest_block_num = -1

async for headers in self.wait_iter(get_headers_coro):
try:
# We might end up with duplicates that can be safely ignored.
@@ -475,8 +480,13 @@
self.db.coro_get_block_header_by_hash(headers[0].parent_hash)
)
except HeaderNotFound:
await self._log_header_link_failure(headers[0])
raise
await self._log_missing_parent(headers[0], highest_block_num)

# Nowhere to go from here, reset and try again
await self._header_syncer.clear_buffer()

# Don't try to process `headers`, wait for new ones to come in
continue

# This appears to be a fork, since the parent header is persisted,
self.logger.info(
@@ -504,8 +514,10 @@
# Don't race ahead of the database: block when the persistence queue is too long
await self._db_buffer_capacity.wait()

async def _log_header_link_failure(self, first_header: BlockHeader) -> None:
self.logger.info("Unable to find parent in our database for %r", first_header)
highest_block_num = max(headers[-1].block_number, highest_block_num)

async def _log_missing_parent(self, first_header: BlockHeader, highest_block_num: int) -> None:
self.logger.warning("Parent missing for header %r, restarting header sync", first_header)
block_num = first_header.block_number
try:
local_header = await self.db.coro_get_canonical_block_header_by_number(block_num)
@@ -527,23 +539,26 @@ async def _log_header_link_failure(self, first_header: BlockHeader) -> None:

self.logger.debug(
"Header syncer returned header %s, which is not in our DB. "
"Instead at #%d, our header is %s, whose parent is %s, with canonical tip %s",
"Instead at #%d, our header is %s, whose parent is %s, with canonical tip %s. ",
"The highest received header is %d.",
first_header,
block_num,
local_header,
local_parent,
canonical_tip,
highest_block_num,
)

async def _display_stats(self) -> None:
while self.is_operational:
await self.sleep(5)
self.logger.debug(
"(in progress, queued, max size) of bodies, receipts: %r",
"(in progress, queued, max size) of bodies, receipts: %r. Write capacity? %s",
[(q.num_in_progress(), len(q), q._maxsize) for q in (
self._block_body_tasks,
self._receipt_tasks,
)],
"yes" if self._db_buffer_capacity.is_set() else "no",
)

stats = self.tracker.report()
@@ -580,7 +595,7 @@ async def _persist_ready_blocks(self) -> None:
while self.is_operational:
# This tracker waits for all prerequisites to be complete, and returns headers in
# order, so that each header's parent is already persisted.
get_completed_coro = self._block_persist_tracker.ready_tasks(HEADER_QUEUE_SIZE_TARGET)
get_completed_coro = self._block_persist_tracker.ready_tasks(BLOCK_QUEUE_SIZE_TARGET)
completed_headers = await self.wait(get_completed_coro)

if self._block_persist_tracker.has_ready_tasks():
9 changes: 5 additions & 4 deletions trinity/sync/full/constants.py
@@ -11,7 +11,8 @@
# BUFFER_SECONDS = 30
# (this should allow plenty of time for peers to fill in the buffer during db writes)
#
# MARGIN = 10
# (better to have a buffer that's too big than to artificially constrain performance)
#
HEADER_QUEUE_SIZE_TARGET = 60000
HEADER_QUEUE_SIZE_TARGET = 6000

# How many blocks to persist at a time
# Only need a few seconds of buffer on the DB write side.
BLOCK_QUEUE_SIZE_TARGET = 1000
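An editorial reading of the sizing math (the rate constant is elided above, so the figure here is an assumption): removing the MARGIN = 10 factor takes the header target from 60000 down to 6000, which corresponds to roughly 200 headers per second over the 30-second buffer.

BUFFER_SECONDS = 30               # from the comment above
ASSUMED_HEADERS_PER_SECOND = 200  # hypothetical rate implied by 6000 / 30
HEADER_QUEUE_SIZE_TARGET = ASSUMED_HEADERS_PER_SECOND * BUFFER_SECONDS  # 6000
# With the removed MARGIN of 10: 6000 * 10 == 60000, the previous value.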
