From 29f19e1e7bf12c21e1d36d77778a5ff3de8784ca Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 25 Apr 2023 17:15:54 +0200 Subject: [PATCH] Fix: make Ethereum sync compatible with web3 6.0 Problem: camel case functions were removed from the web3 library. Solution: use the snake case variants. In addition, fixed the query to get unconfirmed messages that was missing a join when the chain is specified. --- src/aleph/chains/ethereum.py | 18 ++++++++------- src/aleph/db/accessors/messages.py | 35 +++++++++++++++--------------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/aleph/chains/ethereum.py b/src/aleph/chains/ethereum.py index db135e716..6f8b8a10d 100644 --- a/src/aleph/chains/ethereum.py +++ b/src/aleph/chains/ethereum.py @@ -290,14 +290,14 @@ async def fetcher(self, config: Config): def _broadcast_content( config, contract, web3: Web3, account, gas_price, nonce, content ): - tx = contract.functions.doEmit(content).buildTransaction( + tx = contract.functions.doEmit(content).build_transaction( { "chainId": config.ethereum.chain_id.value, "gasPrice": gas_price, "nonce": nonce, } ) - signed_tx = account.signTransaction(tx) + signed_tx = account.sign_transaction(tx) return web3.eth.send_raw_transaction(signed_tx.rawTransaction) async def packer(self, config: Config): @@ -310,7 +310,7 @@ async def packer(self, config: Config): LOGGER.info("Ethereum Connector set up with address %s" % address) i = 0 - gas_price = web3.eth.generateGasPrice() + gas_price = web3.eth.generate_gas_price() while True: with self.session_factory() as session: @@ -320,11 +320,11 @@ async def packer(self, config: Config): ) > 1000: await asyncio.sleep(30) continue - gas_price = web3.eth.generateGasPrice() + gas_price = web3.eth.generate_gas_price() if i >= 100: await asyncio.sleep(30) # wait three (!!) blocks - gas_price = web3.eth.generateGasPrice() + gas_price = web3.eth.generate_gas_price() i = 0 if gas_price > config.ethereum.max_gas_price.value: @@ -332,7 +332,7 @@ async def packer(self, config: Config): await asyncio.sleep(60) continue - nonce = web3.eth.getTransactionCount(account.address) + nonce = web3.eth.get_transaction_count(account.address) messages = list( get_unconfirmed_messages( @@ -340,7 +340,9 @@ async def packer(self, config: Config): ) ) - if len(messages): + if messages: + LOGGER.info("Chain sync: %d unconfirmed messages") + # This function prepares a chain data file and makes it downloadable from the node. content = await self.chain_data_service.get_chaindata( session=session, messages=messages, bulk_threshold=200 @@ -358,7 +360,7 @@ async def packer(self, config: Config): nonce, content, ) - LOGGER.info("Broadcasted %r on %s" % (response, CHAIN_NAME)) + LOGGER.info("Broadcast %r on %s" % (response, CHAIN_NAME)) await asyncio.sleep(config.ethereum.commit_delay.value) i += 1 diff --git a/src/aleph/db/accessors/messages.py b/src/aleph/db/accessors/messages.py index 0350591ad..b474cb0b1 100644 --- a/src/aleph/db/accessors/messages.py +++ b/src/aleph/db/accessors/messages.py @@ -104,7 +104,9 @@ def make_matching_messages_query( MessageDb.content["type"].astext.in_(content_types) ) if tags: - select_stmt = select_stmt.where(MessageDb.content["content"]["tags"].contains(tags)) + select_stmt = select_stmt.where( + MessageDb.content["content"]["tags"].contains(tags) + ) if channels: select_stmt = select_stmt.where(MessageDb.channel.in_(channels)) @@ -233,23 +235,22 @@ def get_unconfirmed_messages( session: DbSession, limit: int = 100, chain: Optional[Chain] = None ) -> Iterable[MessageDb]: - where_clause = message_confirmations.c.item_hash == MessageDb.item_hash - if chain: - where_clause = where_clause & (ChainTxDb.chain == chain) - - # (MessageDb.item_hash, - # MessageDb.message_type, - # MessageDb.chain, - # MessageDb.sender, - # MessageDb.signature, - # MessageDb.item_type, - # MessageDb.item_content, - # # TODO: exclude content field - # MessageDb.content, - # MessageDb.time, - # MessageDb.channel,) + if chain is None: + select_message_confirmations = select(message_confirmations.c.item_hash).where( + message_confirmations.c.item_hash == MessageDb.item_hash + ) + else: + select_message_confirmations = ( + select(message_confirmations.c.item_hash) + .join(ChainTxDb, message_confirmations.c.tx_hash == ChainTxDb.hash) + .where( + (message_confirmations.c.item_hash == MessageDb.item_hash) + & (ChainTxDb.chain == chain) + ) + ) + select_stmt = select(MessageDb).where( - ~select(message_confirmations.c.item_hash).where(where_clause).exists() + ~select_message_confirmations.exists() ) return (session.execute(select_stmt.limit(limit))).scalars()