Skip to content

Commit

Permalink
Fix use of asyncio events. Minor mypy typing package additions. CI is…
Browse files Browse the repository at this point in the history
… currently broken as the developer of pysqlite3-binary deleted the 0.4.7 releases and seems to have replaced them with 0.4.7.post3 which breaks our requirements install.
  • Loading branch information
rt121212121 committed Apr 11, 2022
1 parent cb8b067 commit 86ffa66
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .azure-pipelines/run_mypy_static_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ steps:
architecture: x64
- script: |
python3 -m pip install mypy
python3 -m pip install types-certifi types-pkg_resources types-python-dateutil types-requests
python3 -m pip install types-certifi types-pkg_resources types-python-dateutil types-requests types-attrs
# These PyQt5 stubs are not updated often enough. Even the versions at the tip at the time of
# this commit are not good enough and have gaps.
pip install git+https://github.com/python-qt-tools/PyQt5-stubs.git@166af25fbe0886f95ef0b1a1b57bbdc893e9144d
Expand Down
2 changes: 1 addition & 1 deletion contrib/scripts/run_mypy.bat
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SET TLD=%ScriptDir%\..\..
echo %ScriptDir%
cd %ScriptDir%

py -m pip install types-certifi types-pkg_resources types-python-dateutil types-requests
py -m pip install types-certifi types-pkg_resources types-python-dateutil types-requests types-attrs
py -m pip install git+https://github.com/python-qt-tools/PyQt5-stubs.git@166af25fbe0886f95ef0b1a1b57bbdc893e9144d
py -m mypy --install-types --non-interactive
py -m mypy --config=%TLD%\mypy.ini %TLD%\electrumsv --python-version 3.10
7 changes: 7 additions & 0 deletions electrumsv/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(self) -> None:
app_state.read_headers()

# Events
self.new_server_ready_event = app_state.async_.event()
self.new_server_connection_event = app_state.async_.event()
self._shutdown_complete_event = app_state.async_.event()

Expand Down Expand Up @@ -234,6 +235,9 @@ async def _monitor_chain_tip_task_async(self, server_key: ServerAccountKey) \
# be indexing server capable).
server_state.connection_event.set()

self.new_server_ready_event.set()
self.new_server_ready_event.clear()

# TODO(1.4.0) Servers. This establishes a header-only websocket to the server, however if
# this is used as an indexing server it will also have a general account websocket to
# the server which also sends header events. We should modify this to receive headers
Expand Down Expand Up @@ -384,6 +388,9 @@ async def _maintain_connection(self, context: MainLoopContext, server_key: Serve
except ServiceUnavailableError:
logger.error("Server unavailable: %s", server_key)
# TODO(1.4.0) Servers. Connection retrying should have some logic to it.
# - Ideally we would try with a backing off delay, eventually where that
# delay becomes very large. The user might be able to go into the UI
# and force a reconnection.
await asyncio.sleep(20)
finally:
del self.connected_header_server_states[server_key]
Expand Down
1 change: 1 addition & 0 deletions electrumsv/network_support/general_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ async def process_incoming_peer_channel_messages_async(state: ServerConnectionSt
logger.debug("Writing %d pushdata matches to the database", len(rows))
await state.wallet_data.create_pushdata_matches_async(rows)
state.tip_filter_new_matches_event.set()
state.tip_filter_new_matches_event.clear()
elif purpose == ServerPeerChannelFlag.MAPI_BROADCAST_CALLBACK:
if not isinstance(message["payload"], dict):
# TODO(1.4.0) Servers. Unreliable server (peer channel message) show user.
Expand Down
23 changes: 10 additions & 13 deletions electrumsv/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2855,6 +2855,7 @@ async def obtain_transactions_async(self, account_id: int, keys: List[Tuple[byte
# already missing transactions (the `TransactionImportFlag.PROMPTED` check).
if len(missing_tx_hashes) or import_flags & TransactionImportFlag.PROMPTED:
self._check_missing_transactions_event.set()
self._check_missing_transactions_event.clear()
return missing_tx_hashes

async def _obtain_transactions_worker_async(self) -> None:
Expand Down Expand Up @@ -2939,7 +2940,6 @@ async def _obtain_transactions_worker_async(self) -> None:
# To get here there must not have been any further missing transactions.
self._logger.debug("Waiting for more missing transactions")
await self._check_missing_transactions_event.wait()
self._check_missing_transactions_event.clear()

async def _obtain_merkle_proofs_worker_async(self) -> None:
"""
Expand Down Expand Up @@ -3026,7 +3026,6 @@ async def _obtain_merkle_proofs_worker_async(self) -> None:
# To get here there must not have been any further missing transactions.
self._logger.debug("Waiting for more missing merkle proofs")
await self._check_missing_proofs_event.wait()
self._check_missing_proofs_event.clear()

async def fetch_raw_transaction_async(self, tx_hash: bytes, account: AbstractAccount) -> bytes:
"""Selects a suitable server and requests the raw transaction.
Expand Down Expand Up @@ -3960,10 +3959,6 @@ async def _manage_server_connections_async(self) -> None:
self._logger.debug("Picking an indexing server, candidates: %s",
indexing_server_candidates)
while True:
# TODO(1.4.0) Servers. Picking an indexing server should possibly wait for
# servers being synced to sync if they are doing so in a timely
# fashion. This would give more chance of options, and not just
whatever has synced.
server_candidates = list[tuple[ServerAccountKey, NewServer]]()
for server_key, server in indexing_server_candidates:
if self._network.is_header_server_ready(server_key):
Expand All @@ -3973,7 +3968,7 @@ async def _manage_server_connections_async(self) -> None:
break
self._logger.debug("Waiting for valid indexing server, candidates: %s",
indexing_server_candidates)
await self._network.new_server_connection_event.wait()
await self._network.new_server_ready_event.wait()

chosen_servers.append((server, { ServerCapability.TIP_FILTER }))
new_indexing_server_id = server.server_id
Expand All @@ -3989,6 +3984,7 @@ async def _manage_server_connections_async(self) -> None:
f"id={account_row.indexer_server_id}")

indexing_server_key = ServerAccountKey(server.url, server.server_type, None)
# We may already know the server should be ready from server selection.
await self._network.wait_until_header_server_is_ready_async(indexing_server_key)
await self._set_indexing_server(indexing_server_key,
ServerSwitchReason.INITIALISATION)
Expand Down Expand Up @@ -4115,12 +4111,7 @@ async def _consume_tip_filter_matches_async(self, state: ServerConnectionState)
"""
Process tip filter matches received from a server.
"""
# Initial check for processable rows.
state.tip_filter_new_matches_event.set()
while True:
await state.tip_filter_new_matches_event.wait()
state.tip_filter_new_matches_event.clear()

rows_by_account_id = dict[int, list[PushDataMatchMetadataRow]]()
metadata_rows = self.data.read_pushdata_match_metadata()
for metadata_row in metadata_rows:
Expand All @@ -4140,6 +4131,8 @@ async def _consume_tip_filter_matches_async(self, state: ServerConnectionState)
len(obtain_transaction_keys), account_id, obtain_transaction_keys)
await self.obtain_transactions_async(account_id, obtain_transaction_keys)

await state.tip_filter_new_matches_event.wait()

def _register_spent_outputs_to_monitor(self, spent_outpoints: list[Outpoint]) -> None:
"""
Call this to start monitoring outpoints when the wallet needs to know if they are mined.
Expand Down Expand Up @@ -4253,6 +4246,8 @@ async def _process_received_spent_output_mined_event(self, spending_tx_hash: byt
spending_tx_hash, block_hash, None, None, TxFlags.STATE_CLEARED)

self._check_missing_proofs_event.set()
self._check_missing_proofs_event.clear()

# TODO(1.4.0) Technical debt. This is a thing we did to ensure the UI was up to date
# when the script hash stuff was the way we did it, we should probably still do it.
# NOTE 1, 1 are just placeholder arguments to save having to change the ui callback signal
Expand Down Expand Up @@ -4443,9 +4438,11 @@ async def _set_indexing_server(self, server_key: ServerAccountKey,
assert self.indexing_server_state.chain is not None
await self.reorg_check_main_chain(server_chain_before, self.indexing_server_state.chain)

self._indexing_server_ready_event.set()
self._indexing_server_ready_event.clear()

self._network.trigger_callback(NetworkEventNames.GENERIC_STATUS)

self._indexing_server_ready_event.set()

# TODO(1.4.0) Servers. This is no longer used. We will not be switching filtering or peer
# channel servers unless the user manually makes it happen. This needs to be factored
Expand Down

0 comments on commit 86ffa66

Please sign in to comment.