Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions src/aleph/chains/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,14 +290,14 @@ async def fetcher(self, config: Config):
def _broadcast_content(
config, contract, web3: Web3, account, gas_price, nonce, content
):
tx = contract.functions.doEmit(content).buildTransaction(
tx = contract.functions.doEmit(content).build_transaction(
{
"chainId": config.ethereum.chain_id.value,
"gasPrice": gas_price,
"nonce": nonce,
}
)
signed_tx = account.signTransaction(tx)
signed_tx = account.sign_transaction(tx)
return web3.eth.send_raw_transaction(signed_tx.rawTransaction)

async def packer(self, config: Config):
Expand All @@ -310,7 +310,7 @@ async def packer(self, config: Config):

LOGGER.info("Ethereum Connector set up with address %s" % address)
i = 0
gas_price = web3.eth.generateGasPrice()
gas_price = web3.eth.generate_gas_price()
while True:
with self.session_factory() as session:

Expand All @@ -320,27 +320,29 @@ async def packer(self, config: Config):
) > 1000:
await asyncio.sleep(30)
continue
gas_price = web3.eth.generateGasPrice()
gas_price = web3.eth.generate_gas_price()

if i >= 100:
await asyncio.sleep(30) # wait three (!!) blocks
gas_price = web3.eth.generateGasPrice()
gas_price = web3.eth.generate_gas_price()
i = 0

if gas_price > config.ethereum.max_gas_price.value:
# gas price too high, wait a bit and retry.
await asyncio.sleep(60)
continue

nonce = web3.eth.getTransactionCount(account.address)
nonce = web3.eth.get_transaction_count(account.address)

messages = list(
get_unconfirmed_messages(
session=session, limit=10000, chain=Chain.ETH
)
)

if len(messages):
if messages:
LOGGER.info("Chain sync: %d unconfirmed messages")

# This function prepares a chain data file and makes it downloadable from the node.
content = await self.chain_data_service.get_chaindata(
session=session, messages=messages, bulk_threshold=200
Expand All @@ -358,7 +360,7 @@ async def packer(self, config: Config):
nonce,
content,
)
LOGGER.info("Broadcasted %r on %s" % (response, CHAIN_NAME))
LOGGER.info("Broadcast %r on %s" % (response, CHAIN_NAME))

await asyncio.sleep(config.ethereum.commit_delay.value)
i += 1
35 changes: 18 additions & 17 deletions src/aleph/db/accessors/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ def make_matching_messages_query(
MessageDb.content["type"].astext.in_(content_types)
)
if tags:
select_stmt = select_stmt.where(MessageDb.content["content"]["tags"].contains(tags))
select_stmt = select_stmt.where(
MessageDb.content["content"]["tags"].contains(tags)
)
if channels:
select_stmt = select_stmt.where(MessageDb.channel.in_(channels))

Expand Down Expand Up @@ -233,23 +235,22 @@ def get_unconfirmed_messages(
session: DbSession, limit: int = 100, chain: Optional[Chain] = None
) -> Iterable[MessageDb]:

where_clause = message_confirmations.c.item_hash == MessageDb.item_hash
if chain:
where_clause = where_clause & (ChainTxDb.chain == chain)

# (MessageDb.item_hash,
# MessageDb.message_type,
# MessageDb.chain,
# MessageDb.sender,
# MessageDb.signature,
# MessageDb.item_type,
# MessageDb.item_content,
# # TODO: exclude content field
# MessageDb.content,
# MessageDb.time,
# MessageDb.channel,)
if chain is None:
select_message_confirmations = select(message_confirmations.c.item_hash).where(
message_confirmations.c.item_hash == MessageDb.item_hash
)
else:
select_message_confirmations = (
select(message_confirmations.c.item_hash)
.join(ChainTxDb, message_confirmations.c.tx_hash == ChainTxDb.hash)
.where(
(message_confirmations.c.item_hash == MessageDb.item_hash)
& (ChainTxDb.chain == chain)
)
)

select_stmt = select(MessageDb).where(
~select(message_confirmations.c.item_hash).where(where_clause).exists()
~select_message_confirmations.exists()
)

return (session.execute(select_stmt.limit(limit))).scalars()
Expand Down