Skip to content

Commit

Permalink
Minor refactoring to the main server concept and reorging.
Browse files Browse the repository at this point in the history
  • Loading branch information
rt121212121 committed Apr 11, 2022
1 parent 45c0bd0 commit cb8b067
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 145 deletions.
6 changes: 6 additions & 0 deletions electrumsv/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ class NetworkServerType(IntEnum):
GENERAL = 2


class ServerSwitchReason(IntEnum):
'''The reason the main server was changed.'''
NONE = 0
INITIALISATION = 1


API_SERVER_TYPES = { NetworkServerType.MERCHANT_API, NetworkServerType.GENERAL }


Expand Down
4 changes: 2 additions & 2 deletions electrumsv/gui/qt/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ def _update_network_status(self) -> None:
text = _("Synchronizing...")
text += f' {response_count:,d}/{request_count:,d}'
else:
if self._wallet.main_server is not None:
server_chain_tip = self._wallet.main_server.tip_header
if self._wallet.indexing_server_state is not None:
server_chain_tip = self._wallet.indexing_server_state.tip_header
server_height = server_chain_tip.height if server_chain_tip else 0
server_lag = self.network.get_local_height() - server_height
if server_height == 0:
Expand Down
64 changes: 29 additions & 35 deletions electrumsv/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from aiohttp import ClientSession
import bitcoinx
from bitcoinx import Chain, double_sha256, hex_str_to_hash, Header, Headers, MissingHeader
from collections import defaultdict
import dataclasses
import concurrent.futures
import time
from enum import IntEnum
from io import BytesIO
from typing import Any, cast, Iterable, Optional, TYPE_CHECKING

Expand Down Expand Up @@ -58,13 +56,6 @@
MAX_CONCEIVABLE_REORG_DEPTH = 500


class SwitchReason(IntEnum):
'''The reason the main server was changed.'''
disconnected = 0
lagging = 1
user_set = 2


def future_callback(future: concurrent.futures.Future[None]) -> None:
if future.cancelled():
return
Expand Down Expand Up @@ -92,8 +83,6 @@ def __init__(self) -> None:
# Events
self.new_server_connection_event = app_state.async_.event()
self._shutdown_complete_event = app_state.async_.event()
self.servers_synced_events: defaultdict[ServerAccountKey, asyncio.Event] = \
defaultdict(app_state.async_.event)

# Add an wallet, remove an wallet, or redo all wallet verifications
self._wallets: set[Wallet] = set()
Expand Down Expand Up @@ -146,7 +135,7 @@ async def _find_any_common_header_async(self, server_state: HeaderServerState,
fork.
If two ESV Reference Servers are on the same chain then this function will only
be called once per distinct chain (see `_synchronize_initial_headers_async`)"""
be called once per distinct chain (see `_synchronise_headers_for_server_tip`)"""
# start with step = 16 to cut down on network round trips for the first initial header sync
step = 16
height_to_test = server_tip.height
Expand All @@ -161,9 +150,12 @@ async def _find_any_common_header_async(self, server_state: HeaderServerState,
height_to_test -= step
step = step * 4 # keep doubling the interval until we hit a common header

async def _synchronize_initial_headers_async(self, server_state: HeaderServerState) \
async def _synchronise_headers_for_server_tip(self, server_state: HeaderServerState) \
-> tuple[Header, Chain]:
"""
Identify the chain tip on the remote server, synchronise to that tip and then return
to the caller.
NOTE: requesting batched headers by height works because the headers at these heights are
on the longest chain **for that instance of the ElectrumSV-Reference-Server**
(emphasis added). For example there could be two persisting forks but requesting batched
Expand All @@ -180,9 +172,9 @@ async def _synchronize_initial_headers_async(self, server_state: HeaderServerSta
tip_header.height + 1)]
await self._request_and_connect_headers_at_heights_async(server_state, heights)

server_tip = tip_header
header, server_chain = cast(Headers, app_state.headers).lookup(server_tip.hash)
return server_tip, server_chain
server_tip_header = tip_header
header, server_chain = cast(Headers, app_state.headers).lookup(server_tip_header.hash)
return server_tip_header, server_chain

async def _connect_tip_and_maybe_backfill(self, server_state: HeaderServerState,
new_tip: TipResponse) -> None:
Expand Down Expand Up @@ -213,29 +205,28 @@ async def _monitor_chain_tip_task_async(self, server_key: ServerAccountKey) \
will affect wallet state).
raises `ServiceUnavailableError` via:
- `main_server._synchronize_initial_headers_async` or
- `main_server._synchronise_headers_for_server_tip` or
- `main_server.subscribe_to_headers` or
- `self._connect_tip_and_maybe_backfill`
"""
# Already obsolete.
if server_key not in self.connected_header_server_states:
return

# Will not proceed past this point until we have all the headers up to and including
# the servers tip header.
server_state = self.connected_header_server_states[server_key]
server_tip_header, server_chain = \
await self._synchronise_headers_for_server_tip(server_state)

# Will not proceed past this point until initial headers sync completes
server_tip, server_chain = \
await self._synchronize_initial_headers_async(server_state)

server_state.tip_header = server_tip
server_state.tip_header = server_tip_header
server_state.chain = server_chain

for wallet in self._wallets:
if wallet.main_server is not None and \
wallet.main_server.server_key.url == server_state.server_key.url:
if wallet.indexing_server_state is not None and \
wallet.indexing_server_state.server_key == server_state.server_key:
wallet.update_main_server_tip_and_chain(server_state.tip_header, server_state.chain)

self.servers_synced_events[server_key].set()

server_metadata = self._server_connectivity_metadata[server_key]
server_metadata.last_try = time.time()

Expand All @@ -261,8 +252,7 @@ async def _monitor_chain_tip_task_async(self, server_key: ServerAccountKey) \
server_state.chain = server_chain

for wallet in self._wallets:
if wallet.main_server is not None and \
wallet.main_server.server_key.url == server_state.server_key.url:
if wallet.indexing_server_state is server_state:
await wallet.reorg_check_main_chain(server_chain_before, server_chain)
wallet.update_main_server_tip_and_chain(server_state.tip_header,
server_state.chain)
Expand Down Expand Up @@ -295,6 +285,7 @@ def register_wallet_server(self, server_key: ServerAccountKey) -> None:
A wallet is notifying the network of header-capable servers that they know of. There may
be some overlap, but this is okay. The networking logic filters out known servers.
"""
logger.debug("Queueing wallet header server %s", server_key)
self._new_server_queue.put_nowait(server_key)

async def _main_loop_async(self, context: MainLoopContext) -> None:
Expand All @@ -317,6 +308,7 @@ async def _main_loop_async(self, context: MainLoopContext) -> None:
ideal_url = url.strip().lower()
assert url == ideal_url, \
f"Skipped bad server with strange url '{url}' != '{ideal_url}'"
assert url.endswith("/"), f"All server urls must have trailing slash '{url}'"

server_key = ServerAccountKey(url, server_type, None)
for capability_name in hardcoded_server_config.get("capabilities", []):
Expand All @@ -325,6 +317,7 @@ async def _main_loop_async(self, context: MainLoopContext) -> None:
logger.error("Server '%s' has invalid capability '%s'", url,
capability_name)
elif capability_value == ServerCapability.HEADERS:
logger.debug("Queuing initial header server %s", server_key)
self._new_server_queue.put_nowait(server_key)

while self._main_loop_context is context:
Expand All @@ -345,10 +338,12 @@ async def _main_loop_async(self, context: MainLoopContext) -> None:
future.cancel()
self._shutdown_complete_event.set()

def get_header_server_state(self, server_key: ServerAccountKey) -> HeaderServerState:
return self.connected_header_server_states[server_key]

def is_header_server_ready(self, server_key: ServerAccountKey) -> bool:
if self.servers_synced_events[server_key].is_set():
return self.connected_header_server_states.get(server_key) is not None
return False
server_state = self.connected_header_server_states.get(server_key)
return server_state is not None and server_state.connection_event.is_set()

async def wait_until_header_server_is_ready_async(self, server_key: ServerAccountKey) -> None:
"""
Expand Down Expand Up @@ -404,7 +399,7 @@ async def shutdown_wait(self) -> None:
logger.warning('stopped')

def is_connected(self) -> bool:
return all([wallet.main_server is not None for wallet in self._wallets])
return all([wallet.indexing_server_state is not None for wallet in self._wallets])

# def is_server_disabled(self, url: str, server_type: NetworkServerType) -> bool:
# """
Expand Down Expand Up @@ -440,7 +435,6 @@ async def _request_and_connect_headers_at_heights_async(self, server_state: Head
logger.debug("Fetching %s headers from start height: %s", count, min_height)
header_array = await get_batched_headers_by_height_async(server_state,
self.aiohttp_session, min_height, count)
print(header_array)
stream = BytesIO(header_array)

count_of_raw_headers = len(header_array) // 80
Expand All @@ -463,9 +457,9 @@ def auto_connect(self) -> bool:

def status(self) -> dict[str, Any]:
return {
# 'server': str(self.main_server.base_url),
# 'server': str(self.indexing_server_state.base_url),
'blockchain_height': self.get_local_height(),
# 'server_height': self.main_server.tip.height,
# 'server_height': self.indexing_server_state.tip_header.height,
'spv_nodes': len(self._known_header_server_keys),
'connected': self.is_connected(),
'auto_connect': self.auto_connect(),
Expand Down
2 changes: 1 addition & 1 deletion electrumsv/tests/test_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ async def test_reorg(mock_app_state, tmp_storage) -> None:

wallet = Wallet(tmp_storage)
wallet.try_get_mapi_proofs = try_get_mapi_proofs_mock
wallet.main_server = MockHeadersClient()
wallet.indexing_server_state = MockHeadersClient()
masterkey_row = wallet.create_masterkey_from_keystore(child_keystore)

raw_account_row = AccountRow(-1, masterkey_row.masterkey_id, ScriptType.P2PKH, '...',
Expand Down

0 comments on commit cb8b067

Please sign in to comment.