
Download all data in queues (instead of batches) while fast & regular syncing #1226

Merged: 5 commits merged into ethereum:master on Sep 10, 2018

Conversation

@carver (Contributor) commented Aug 30, 2018

What was wrong?

Requests for block bodies and receipts were batched together, so a single slow peer could dramatically slow down the whole process.

How was it fixed?

Roughly: put everything into queues so the downloads can run independently, and one peer isn't left waiting on another to finish.

Numbers vary widely, but I've seen as high as 1500 blocks imported in 5 s.
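The core idea can be sketched with a toy asyncio example (all names here are hypothetical, not trinity's actual API): two "peers" of very different speeds drain a shared task queue, and the fast peer keeps making progress instead of waiting on the slow one to finish a batch.

```python
import asyncio

async def drain(name, queue, results, delay):
    # Hypothetical peer worker: each peer pulls from the shared queue at
    # its own pace, so a slow peer only delays its own items.
    while True:
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        await asyncio.sleep(delay)  # simulate this peer's network latency
        results.append((name, item))

async def main():
    queue = asyncio.Queue()
    for block_number in range(6):
        queue.put_nowait(block_number)
    results: list = []
    # Fast and slow peers run independently instead of being batched
    # together, so the fast peer is never stuck behind the slow one.
    await asyncio.gather(
        drain("fast-peer", queue, results, 0.001),
        drain("slow-peer", queue, results, 0.1),
    )
    return results

results = asyncio.run(main())
print(len(results))  # 6: every queued block was downloaded
```

In this sketch the fast peer ends up handling most of the items, which mirrors the intended behavior of the queue-based sync.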

TODO:

  • hunt down an occasional RuntimeError: Cannot restart a service that has already been cancelled
  • manually test
  • overnight manual test
  • make code more readable
  • linting
  • clean commit history

Cute Animal Picture

put a cute animal picture link inside the parentheses

header_stats = stats['BlockHeaders']
assert header_stats['count'] == idx
assert header_stats['items'] == idx
assert header_stats['timeouts'] == 0
Member:

You might rebase this on top of #1225 so that we don't conflict here and also the stats available in that branch are easier to get at and probably better.

@carver (Contributor, Author) commented Aug 30, 2018

You might rebase this on top of #1225

Yeah, I just noticed that conflict.

also the stats available in that branch are easier to get at and probably better.

At first glance, I agree. I'll review it now.

@carver (Contributor, Author) commented Sep 1, 2018

Ok, getting closer. I think most of what's left (before removing the WIP tag) is linting and maybe some minor refactors for readability. Plus whatever the full CI run digs up.

@carver (Contributor, Author) commented Sep 4, 2018

I'm in the process of pulling most of these pieces out into other PRs. I'll rebase this after the others are merged:

@carver (Contributor, Author) commented Sep 7, 2018

I rewrote most of trinity/sync/full/chain.py to fit the new model. So much has changed that it's not really worth looking at the diff on that file, just read the whole new file at: https://github.com/carver/py-evm/blob/block-data-queues/trinity/sync/full/chain.py

In my tests, we seem to be able to download bodies and receipts so fast that the new bottleneck is downloading the headers. You may see this yourself if you find something like this in the debug logs:

(in progress, queued, max size) of headers, bodies, receipts: [(0, 0, 1536), (320, 320, 512), (576, 576, 1024)]

That means that there are no new headers from which to start downloading bodies and receipts. All of the current download tasks (320 and 576, for bodies and receipts respectively) currently have outstanding requests to peers.
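A small helper (hypothetical, not part of this PR) makes the reading of that log line concrete: headers are the bottleneck when the header queue is completely idle while the downstream body and receipt queues still have spare capacity.

```python
def header_starved(stats):
    # stats: [(in_progress, queued, max_size), ...] for headers, bodies,
    # receipts, matching the debug log format shown above. This check and
    # its name are illustrative assumptions, not trinity code.
    (h_prog, h_queued, _), (b_prog, b_queued, b_max), (r_prog, r_queued, r_max) = stats
    headers_idle = h_prog == 0 and h_queued == 0
    downstream_has_room = b_queued < b_max and r_queued < r_max
    return headers_idle and downstream_has_room

# The exact stats from the log line above: header download is starved.
print(header_starved([(0, 0, 1536), (320, 320, 512), (576, 576, 1024)]))  # True
```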

This problem is similar to #1218 -- we probably have a peer with low header throughput, and we are stuck syncing from them until we give up, time out, or catch up. Some changes to make header sync look even more like body/receipt downloads might help (always picking the peer with the best throughput on every new request).

Edit: or do what gsalgado said is happening in geth: fetch every 128th header from your "lead" peer, and fill in the rest from other peers.
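The geth-style skeleton approach mentioned above can be sketched like this (a hypothetical illustration, not geth's or trinity's actual code): the lead peer supplies every 128th header as an anchor, and the gaps between anchors become independent fill tasks for the other peers.

```python
def skeleton_fill_ranges(start, end, stride=128):
    # Hypothetical sketch of skeleton header sync: the "lead" peer
    # supplies every `stride`-th header (the anchors); the gaps between
    # anchors are independent fill tasks parceled out to other peers.
    anchors = list(range(start, end + 1, stride))
    gaps = [
        (lo + 1, min(lo + stride - 1, end))
        for lo in anchors
        if lo + 1 <= end
    ]
    return anchors, gaps

anchors, gaps = skeleton_fill_ranges(0, 384)
print(anchors)  # [0, 128, 256, 384]
print(gaps)     # [(1, 127), (129, 255), (257, 383)]
```

Because each gap can be verified against its bounding anchors, a slow fill peer only stalls its own range rather than the whole header pipeline.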

@carver carver changed the title [WIP] Block data queues Download all data in parallel while fast & regular syncing Sep 7, 2018
@carver carver changed the title Download all data in parallel while fast & regular syncing Download all data in queues (instead of batches) while fast & regular syncing Sep 7, 2018
@carver carver requested a review from gsalgado September 7, 2018 23:51
@pipermerriam (Member) left a comment:

Giving this approval as I think the approach is generally sound and rather than futz with it in theory I'd rather get it merged so we can run it and shake out whatever problems may be hiding that way.

trinity/sync/full/chain.py
self._block_body_tasks = TaskQueue(MAX_BODIES_FETCH * 4, attrgetter('block_number'))

# TODO move this to BaseService if it gets broader usage and/or stays stable for long enough
def delayed_run_func(self, func: Callable[[], None], delay: float) -> None:
Member:

This seems like it should be using loop.call_later() or call_at
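For reference, `loop.call_later()` schedules a plain callback after a delay without spawning a coroutine, which is what the reviewer is suggesting instead of a hand-rolled `delayed_run_func`. A minimal self-contained sketch:

```python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    results = []
    # call_later(delay, callback, *args) runs the callback on the loop
    # after `delay` seconds; no wrapper coroutine or lambda is needed
    # because extra positional args are passed through.
    loop.call_later(0.01, results.append, "delayed")
    await asyncio.sleep(0.05)
    return results

out = asyncio.run(main())
print(out)  # ['delayed']
```

`call_at()` is the absolute-time variant; both return a `TimerHandle` that can be used to cancel the pending callback.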

headers = tuple(concat(all_missing_headers))
self._mark_body_download_complete(batch_id, completed_headers + trivial_headers)
except BaseP2PError:
self._block_body_tasks.complete(batch_id, trivial_headers)
Member:
Could this go in a finally statement so that it doesn't have to be present in all except blocks?
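The shape the reviewer is suggesting looks roughly like this (all names here are hypothetical stand-ins, not the PR's actual helpers): the completion bookkeeping runs once in `finally` instead of being repeated in every `except` block.

```python
def complete_batch(completed, batch_id, download, mark_complete):
    # Illustrative shape of the suggestion: run the completion
    # bookkeeping in `finally` once, so every exit path -- success or
    # any exception -- marks the batch complete without duplication.
    try:
        download()
    except Exception:
        pass  # each error path can still log or re-raise as needed
    finally:
        mark_complete(batch_id)
        completed.append(batch_id)

def failing_download():
    raise ValueError("peer returned a bad body")

done: list = []
complete_batch(done, 7, failing_download, lambda batch_id: None)
print(done)  # [7]: batch marked complete even though the download failed
```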

@carver (Contributor, Author):

Maybe, let's see if I can work it out (right now, it's being called separately in _mark_body_download_complete())

self._mark_body_download_complete(batch_id, completed_headers + trivial_headers)
except BaseP2PError:
self._block_body_tasks.complete(batch_id, trivial_headers)
self.logger.debug("Problem downloading body from peer, dropping...", exc_info=True)
Member:
This blanket catch of any p2p exception stands out to me as potentially problematic. What do you think about adding an INFO level statement so that we can zero in on being more explicit about what we catch?


self.logger.debug("Got block bodies batch for %d headers", len(all_headers))
return block_bodies_by_key
def _mark_body_download_complete(
Member:
Is this passthrough function just for readability?

@carver (Contributor, Author):
It's an unfortunate side-effect of the two classes being built with inheritance. They have to register that the download is complete in a different OrderedTaskPreparation object, with a different prerequisite.

self._block_body_tasks.complete(batch_id, trivial_headers)
self.logger.debug("Problem downloading body from peer, dropping...", exc_info=True)
except Exception:
self._block_body_tasks.complete(batch_id, trivial_headers)
Member:
This could/should use self._mark_body_download_complete?

trinity/sync/full/chain.py
self.run_task(self._launch_prerequisite_tasks())
self.run_task(self._assign_receipt_download_to_peers())
self.run_task(self._assign_body_download_to_peers())
self.run_task(self._persist_ready_blocks())
Member:
So in theory, most of these background tasks are required for the service to be operational/functional. Using run_task allows for them to exit at which point the client will keep running but sync will stop working. Thoughts on how we can address this? Something like BaseService.run_daemon_task()?
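One possible shape for the suggested `run_daemon_task()` (a minimal sketch, not trinity's actual `BaseService`): wrap the essential task so that if it ever finishes, the whole service is cancelled rather than left running with sync silently broken.

```python
import asyncio

class MiniService:
    # Minimal stand-in for a service class; `cancel` here just flips a
    # flag, where a real service would tear down its other tasks too.
    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

    def run_daemon_task(self, coro):
        async def _daemon_wrapper():
            try:
                await coro
            finally:
                # An essential background task exited: stop the service
                # instead of limping along with sync stopped.
                self.cancel()
        return asyncio.ensure_future(_daemon_wrapper())

async def main():
    service = MiniService()
    async def short_lived_daemon():
        await asyncio.sleep(0)
    await service.run_daemon_task(short_lived_daemon())
    return service.cancelled

service_cancelled = asyncio.run(main())
print(service_cancelled)  # True: the daemon exiting cancelled the service
```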

vm_class = self.chain.get_vm_class(header)
block_class = vm_class.get_block_class()
# We don't need to use our block transactions here because persist_block() doesn't do
# anything with them as it expects them to have been persisted already.
Member:
I don't think this comment is accurate:

py-evm/eth/db/chain.py

Lines 233 to 243 in de4809f

for header in new_canonical_headers:
    if header.hash == block.hash:
        # Most of the time this is called to persist a block whose parent is the current
        # head, so we optimize for that and read the tx hashes from the block itself. This
        # is specially important during a fast sync.
        tx_hashes = [tx.hash for tx in block.transactions]
    else:
        tx_hashes = self.get_block_transaction_hashes(header)
    for index, transaction_hash in enumerate(tx_hashes):
        self._add_transaction_to_canonical_chain(transaction_hash, header, index)

By not including the transactions, we are failing to populate the canonical transaction hash to block lookups right?

@carver (Contributor, Author):

Hm, I haven't looked deeply (it was already there), but at first glance I think you're right.

@carver carver merged commit 6432b67 into ethereum:master Sep 10, 2018
@carver carver deleted the block-data-queues branch September 10, 2018 22:41
@@ -142,6 +142,25 @@ def run_task(self, awaitable: Awaitable[Any]) -> None:
self.logger.trace("Task %s finished with no errors", awaitable)
self._tasks.add(asyncio.ensure_future(_run_task_wrapper()))

def run_daemon_task(self, awaitable: Awaitable[Any]) -> None:
Member:
I was looking at this and the run_task API, and I think we should update them as follows.

def run_task(self, fn, *args):
    @functools.wraps(fn)
    async def _run_task_wrapper():
        ...
    ...

By doing it this way we'll get intelligible stacktraces when tasks are not properly cleaned up which should make finding the offending code a lot easier.
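Filled in, the suggestion might look like this (a sketch under the stated assumptions, not trinity's final implementation): because `run_task` now receives the coroutine *function* plus its args, `functools.wraps` can copy that function's name onto the wrapper, so leaked-task reports name the offending code.

```python
import asyncio
import functools

def run_task(fn, *args):
    # Sketch of the suggested API: take the coroutine function and args
    # rather than an already-created awaitable, so wraps() can preserve
    # the original name for intelligible stack traces.
    @functools.wraps(fn)
    async def _run_task_wrapper():
        await fn(*args)
    return asyncio.ensure_future(_run_task_wrapper())

async def fetch_bodies(count):
    # Hypothetical task standing in for a real sync coroutine.
    await asyncio.sleep(0)
    return count

async def main():
    task = run_task(fetch_bodies, 3)
    await task
    # The wrapper's coroutine carries the wrapped function's name.
    return task.get_coro().__name__

wrapped_name = asyncio.run(main())
print(wrapped_name)  # fetch_bodies
```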

@@ -238,7 +235,10 @@ def delayed_run_func(self, func: Callable[[], None], delay: float) -> None:
# peer returned no results, wait a while before trying again
delay = self.EMPTY_PEER_RESPONSE_PENALTY
self.logger.debug("Pausing %s for %.1fs, for sending 0 block bodies", peer, delay)
self.delayed_run_func(lambda: self._body_peers.put_nowait(peer), delay)
loop = self.get_event_loop()
loop.call_later(delay, partial(self._body_peers.put_nowait, peer))
Member:
I think that we need to wrap call_later in a BaseService.call_later that retains the return value, which I believe is the only way we can issue a cancellation when the service shuts down.
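`loop.call_later()` returns a `TimerHandle` with a `cancel()` method, so a wrapper only needs to keep those handles around. A minimal sketch of the idea (hypothetical names, not trinity's `BaseService`):

```python
import asyncio

class MiniService:
    # Sketch: retain each TimerHandle from call_later so that pending
    # callbacks can be cancelled when the service is torn down.
    def __init__(self):
        self._timers = []

    def call_later(self, delay, callback, *args):
        handle = asyncio.get_running_loop().call_later(delay, callback, *args)
        self._timers.append(handle)
        return handle

    def cancel(self):
        for handle in self._timers:
            handle.cancel()

async def main():
    service = MiniService()
    fired = []
    service.call_later(0.01, fired.append, "requeued-peer")
    service.cancel()  # shutting down: the pending callback never fires
    await asyncio.sleep(0.05)
    return fired

fired = asyncio.run(main())
print(fired)  # []: the scheduled callback was cancelled before it ran
```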
