From 0e9fb809decc5c1e889e32be48007d8f4a8debde Mon Sep 17 00:00:00 2001 From: Jason Carver Date: Tue, 3 Dec 2019 16:33:58 -0800 Subject: [PATCH] Cleaner TxPool shutdown --- trinity/components/builtin/tx_pool/pool.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/trinity/components/builtin/tx_pool/pool.py b/trinity/components/builtin/tx_pool/pool.py index e890a9cd0f..0d5b7a06ce 100644 --- a/trinity/components/builtin/tx_pool/pool.py +++ b/trinity/components/builtin/tx_pool/pool.py @@ -118,7 +118,8 @@ async def _process_transactions(self) -> None: buffer: List[SignedTransactionAPI] = [] # wait for there to be items available on the queue. - buffer.extend(await self._internal_queue.get()) + transactions = await self.wait(self._internal_queue.get()) + buffer.extend(transactions) # continue to pull items from the queue synchronously until the # queue is either empty or we hit a sufficient size to justify @@ -131,7 +132,8 @@ async def _process_transactions(self) -> None: # Now that the queue is either empty or we have an adequate number # to send to our peers, broadcast them to the appropriate peers. for batch in partition_all(BATCH_HIGH_WATER, buffer): - for receiving_peer in await self._peer_pool.get_peers(): + peers = await self.wait(self._peer_pool.get_peers()) + for receiving_peer in peers: filtered_tx = self._filter_tx_for_peer(receiving_peer, batch) if len(filtered_tx) == 0: self.logger.debug2(