forked from Chia-Network/chia-blockchain
/
wallet_node.py
1705 lines (1510 loc) · 81.1 KB
/
wallet_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from __future__ import annotations
import asyncio
import contextlib
import dataclasses
import logging
import multiprocessing
import random
import sys
import time
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any, AsyncIterator, ClassVar, Dict, List, Optional, Set, Tuple, Union, cast
import aiosqlite
from chik_rs import AugSchemeMPL, G1Element, G2Element, PrivateKey
from packaging.version import Version
from chik.consensus.blockchain import AddBlockResult
from chik.consensus.constants import ConsensusConstants
from chik.daemon.keychain_proxy import KeychainProxy, connect_to_keychain_and_validate, wrap_local_keychain
from chik.full_node.full_node_api import FullNodeAPI
from chik.protocols.full_node_protocol import RequestProofOfWeight, RespondProofOfWeight
from chik.protocols.protocol_message_types import ProtocolMessageTypes
from chik.protocols.wallet_protocol import (
CoinState,
CoinStateUpdate,
NewPeakWallet,
RegisterForCoinUpdates,
RequestBlockHeader,
RequestChildren,
RespondBlockHeader,
RespondChildren,
RespondToCoinUpdates,
SendTransaction,
)
from chik.rpc.rpc_server import StateChangedProtocol, default_get_connections
from chik.server.node_discovery import WalletPeers
from chik.server.outbound_message import Message, NodeType, make_msg
from chik.server.server import ChikServer
from chik.server.ws_connection import WSChikConnection
from chik.types.blockchain_format.coin import Coin
from chik.types.blockchain_format.sized_bytes import bytes32
from chik.types.header_block import HeaderBlock
from chik.types.mempool_inclusion_status import MempoolInclusionStatus
from chik.types.spend_bundle import SpendBundle
from chik.types.weight_proof import WeightProof
from chik.util.config import lock_and_load_config, process_config_start_method, save_config
from chik.util.db_wrapper import manage_connection
from chik.util.errors import KeychainIsEmpty, KeychainIsLocked, KeychainKeyNotFound, KeychainProxyConnectionFailure
from chik.util.hash import std_hash
from chik.util.ints import uint16, uint32, uint64, uint128
from chik.util.keychain import Keychain
from chik.util.misc import to_batches
from chik.util.path import path_from_root
from chik.util.profiler import mem_profile_task, profile_task
from chik.util.streamable import Streamable, streamable
from chik.wallet.puzzles.clawback.metadata import AutoClaimSettings
from chik.wallet.transaction_record import TransactionRecord
from chik.wallet.util.new_peak_queue import NewPeakItem, NewPeakQueue, NewPeakQueueTypes
from chik.wallet.util.peer_request_cache import PeerRequestCache, can_use_peer_request_cache
from chik.wallet.util.wallet_sync_utils import (
PeerRequestException,
fetch_header_blocks_in_range,
request_and_validate_additions,
request_and_validate_removals,
request_header_blocks,
sort_coin_states,
subscribe_to_coin_updates,
subscribe_to_phs,
)
from chik.wallet.util.wallet_types import CoinType, WalletType
from chik.wallet.wallet_state_manager import WalletStateManager
from chik.wallet.wallet_weight_proof_handler import WalletWeightProofHandler, get_wp_fork_point
def get_wallet_db_path(root_path: Path, config: Dict[str, Any], key_fingerprint: str) -> Path:
    """
    Build the path of the wallet database file.

    Substitutes the selected network and the key's fingerprint into the configured
    ``database_path`` template and upgrades legacy version markers to "v2_r1".
    """
    template: str = config["database_path"]
    db_filename = template.replace("CHALLENGE", config["selected_network"]).replace("KEY", key_fingerprint)
    # "v2_r1" is the current wallet db version identifier; rewrite older markers.
    if "v2_r1" not in db_filename:
        db_filename = db_filename.replace("v2", "v2_r1").replace("v1", "v2_r1")
    return path_from_root(root_path, db_filename)
@streamable
@dataclasses.dataclass(frozen=True)
class Balance(Streamable):
    """Aggregated balance totals for a single wallet (cached per wallet id in
    WalletNode._balance_cache and served to the UI/RPC)."""
    confirmed_wallet_balance: uint128 = uint128(0)  # total of confirmed coins
    unconfirmed_wallet_balance: uint128 = uint128(0)  # confirmed total adjusted for pending transactions
    spendable_balance: uint128 = uint128(0)  # portion currently spendable
    pending_change: uint64 = uint64(0)  # change expected back from pending transactions
    max_send_amount: uint128 = uint128(0)  # largest amount sendable in one transaction
    unspent_coin_count: uint32 = uint32(0)
    pending_coin_removal_count: uint32 = uint32(0)
@dataclasses.dataclass
class WalletNode:
    """The wallet's network node: owns the key/keychain access, the WalletStateManager,
    full-node peer connections, sync state and background tasks (subscription processing,
    failed-state retry, transaction resend)."""
    if TYPE_CHECKING:
        from chik.rpc.rpc_server import RpcServiceProtocol
        # Static assertion that WalletNode satisfies the RPC service protocol.
        _protocol_check: ClassVar[RpcServiceProtocol] = cast("WalletNode", None)
    config: Dict[str, Any]
    root_path: Path
    constants: ConsensusConstants
    local_keychain: Optional[Keychain] = None
    log: logging.Logger = logging.getLogger(__name__)
    # Sync data
    state_changed_callback: Optional[StateChangedProtocol] = None
    _wallet_state_manager: Optional[WalletStateManager] = None
    _weight_proof_handler: Optional[WalletWeightProofHandler] = None
    _server: Optional[ChikServer] = None
    sync_task: Optional[asyncio.Task[None]] = None
    logged_in_fingerprint: Optional[int] = None
    logged_in: bool = False
    _keychain_proxy: Optional[KeychainProxy] = None
    # Cached Balance per wallet id, cleared on shutdown
    _balance_cache: Dict[int, Balance] = dataclasses.field(default_factory=dict)
    # Peers that we have long synced to
    synced_peers: Set[bytes32] = dataclasses.field(default_factory=set)
    wallet_peers: Optional[WalletPeers] = None
    # Per-peer request cache; an entry is dropped when its peer disconnects
    peer_caches: Dict[bytes32, PeerRequestCache] = dataclasses.field(default_factory=dict)
    validation_semaphore: Optional[asyncio.Semaphore] = None
    local_node_synced: bool = False
    LONG_SYNC_THRESHOLD: int = 300
    last_wallet_tx_resend_time: int = 0
    # Duration in seconds
    coin_state_retry_seconds: int = 10
    wallet_tx_resend_timeout_secs: int = 1800
    _new_peak_queue: Optional[NewPeakQueue] = None
    _shut_down: bool = False
    _process_new_subscriptions_task: Optional[asyncio.Task[None]] = None
    _retry_failed_states_task: Optional[asyncio.Task[None]] = None
    _secondary_peer_sync_task: Optional[asyncio.Task[None]] = None
    # Per-peer hashes of transaction messages already sent and not yet acknowledged
    _tx_messages_in_progress: Dict[bytes32, List[bytes32]] = dataclasses.field(default_factory=dict)
    @contextlib.asynccontextmanager
    async def manage(self) -> AsyncIterator[None]:
        """Lifecycle context manager: start the node on enter; on exit signal shutdown
        (_close) and then wait for all tasks/connections to finish (_await_closed)."""
        await self._start()
        try:
            yield
        finally:
            self._close()
            await self._await_closed()
@property
def keychain_proxy(self) -> KeychainProxy:
# This is a stop gap until the class usage is refactored such the values of
# integral attributes are known at creation of the instance.
if self._keychain_proxy is None:
raise RuntimeError("keychain proxy not assigned")
return self._keychain_proxy
@property
def wallet_state_manager(self) -> WalletStateManager:
# This is a stop gap until the class usage is refactored such the values of
# integral attributes are known at creation of the instance.
if self._wallet_state_manager is None:
raise RuntimeError("wallet state manager not assigned")
return self._wallet_state_manager
@property
def server(self) -> ChikServer:
# This is a stop gap until the class usage is refactored such the values of
# integral attributes are known at creation of the instance.
if self._server is None:
raise RuntimeError("server not assigned")
return self._server
@property
def new_peak_queue(self) -> NewPeakQueue:
# This is a stop gap until the class usage is refactored such the values of
# integral attributes are known at creation of the instance.
if self._new_peak_queue is None:
raise RuntimeError("new peak queue not assigned")
return self._new_peak_queue
    def get_connections(self, request_node_type: Optional[NodeType]) -> List[Dict[str, Any]]:
        """Return connection info dicts for the RPC layer, optionally filtered by node type."""
        return default_get_connections(server=self.server, request_node_type=request_node_type)
async def ensure_keychain_proxy(self) -> KeychainProxy:
if self._keychain_proxy is None:
if self.local_keychain:
self._keychain_proxy = wrap_local_keychain(self.local_keychain, log=self.log)
else:
self._keychain_proxy = await connect_to_keychain_and_validate(self.root_path, self.log)
if not self._keychain_proxy:
raise KeychainProxyConnectionFailure()
return self._keychain_proxy
def get_cache_for_peer(self, peer: WSChikConnection) -> PeerRequestCache:
if peer.peer_node_id not in self.peer_caches:
self.peer_caches[peer.peer_node_id] = PeerRequestCache()
return self.peer_caches[peer.peer_node_id]
def rollback_request_caches(self, reorg_height: int) -> None:
# Everything after reorg_height should be removed from the cache
for cache in self.peer_caches.values():
cache.clear_after_height(reorg_height)
    async def get_key_for_fingerprint(self, fingerprint: Optional[int]) -> Optional[PrivateKey]:
        """Fetch the private key for a fingerprint via the keychain proxy.

        Returns the first private key when fingerprint is None. Returns None (after
        logging a warning) when the keychain is empty, locked, or the key is missing.
        Re-raises KeychainProxyConnectionFailure so callers can decide to abort.
        """
        try:
            keychain_proxy = await self.ensure_keychain_proxy()
            # Returns first private key if fingerprint is None
            key = await keychain_proxy.get_key_for_fingerprint(fingerprint)
        except KeychainIsEmpty:
            self.log.warning("No keys present. Create keys with the UI, or with the 'chik keys' program.")
            return None
        except KeychainKeyNotFound:
            self.log.warning(f"Key not found for fingerprint {fingerprint}")
            return None
        except KeychainIsLocked:
            self.log.warning("Keyring is locked")
            return None
        except KeychainProxyConnectionFailure as e:
            tb = traceback.format_exc()
            self.log.error(f"Missing keychain_proxy: {e} {tb}")
            raise  # Re-raise so that the caller can decide whether to continue or abort
        return key
async def get_private_key(self, fingerprint: Optional[int]) -> Optional[PrivateKey]:
"""
Attempt to get the private key for the given fingerprint. If the fingerprint is None,
get_key_for_fingerprint() will return the first private key. Similarly, if a key isn't
returned for the provided fingerprint, the first key will be returned.
"""
key: Optional[PrivateKey] = await self.get_key_for_fingerprint(fingerprint)
if key is None and fingerprint is not None:
key = await self.get_key_for_fingerprint(None)
if key is not None:
self.log.info(f"Using first key found (fingerprint: {key.get_g1().get_fingerprint()})")
return key
    def set_resync_on_startup(self, fingerprint: int, enabled: bool = True) -> None:
        """Persist (or clear) the reset_sync_for_fingerprint flag in config.yaml.

        When set, the next start for this fingerprint wipes and re-syncs the wallet DB
        (see reset_sync_db / _start_with_fingerprint). Disabling only removes the flag
        when it currently points at this fingerprint.
        """
        with lock_and_load_config(self.root_path, "config.yaml") as config:
            if enabled is True:
                config["wallet"]["reset_sync_for_fingerprint"] = fingerprint
                self.log.info("Enabled resync for wallet fingerprint: %s", fingerprint)
            else:
                self.log.debug(
                    "Trying to disable resync: %s [%s]", fingerprint, config["wallet"].get("reset_sync_for_fingerprint")
                )
                if config["wallet"].get("reset_sync_for_fingerprint") == fingerprint:
                    del config["wallet"]["reset_sync_for_fingerprint"]
                    self.log.info("Disabled resync for wallet fingerprint: %s", fingerprint)
            save_config(self.root_path, "config.yaml", config)
def set_auto_claim(self, auto_claim_config: AutoClaimSettings) -> Dict[str, Any]:
if auto_claim_config.batch_size < 1:
auto_claim_config = dataclasses.replace(auto_claim_config, batch_size=uint16(50))
auto_claim_config_json = auto_claim_config.to_json_dict()
if "auto_claim" not in self.config or self.config["auto_claim"] != auto_claim_config_json:
# Update in memory config
self.config["auto_claim"] = auto_claim_config_json
# Update config file
with lock_and_load_config(self.root_path, "config.yaml") as config:
config["wallet"]["auto_claim"] = self.config["auto_claim"]
save_config(self.root_path, "config.yaml", config)
return auto_claim_config.to_json_dict()
    async def reset_sync_db(self, db_path: Union[Path, str], fingerprint: int) -> bool:
        """Wipe synced state from the wallet DB so it is re-synced from scratch.

        Safety check first: if the DB contains a table that is neither in the known
        schema nor matched by an ignore prefix, abort without touching anything
        (likely a newer schema / missing migration). The deletions run in a single
        transaction which is rolled back on error. The resync config flag is cleared
        in all cases. Returns True when the wipe was committed.
        """
        conn: aiosqlite.Connection
        # are not part of core wallet tables, but might appear later
        ignore_tables = {"lineage_proofs_", "sqlite_", "MIGRATED_VALID_TIMES_TXS", "MIGRATED_VALID_TIMES_TRADES"}
        # The full known wallet schema; anything else must match an ignore prefix.
        required_tables = [
            "coin_record",
            "transaction_record",
            "derivation_paths",
            "users_wallets",
            "users_nfts",
            "action_queue",
            "all_notification_ids",
            "key_val_store",
            "trade_records",
            "trade_record_times",
            "tx_times",
            "pool_state_transitions",
            "singleton_records",
            "mirrors",
            "launchers",
            "interested_coins",
            "interested_puzzle_hashes",
            "unacknowledged_asset_tokens",
            "coin_of_interest_to_trade_record",
            "notifications",
            "retry_store",
            "unacknowledged_asset_token_states",
            "vc_records",
            "vc_proofs",
        ]
        async with manage_connection(db_path) as conn:
            self.log.info("Resetting wallet sync data...")
            rows = list(await conn.execute_fetchall("SELECT name FROM sqlite_master WHERE type='table'"))
            names = {x[0] for x in rows}
            names = names - set(required_tables)
            for name in names:
                for ignore_name in ignore_tables:
                    if name.startswith(ignore_name):
                        break
                else:
                    # for/else: no ignore prefix matched -> unknown table, abort untouched.
                    self.log.error(
                        f"Mismatch in expected schema to reset, found unexpected table: {name}. "
                        "Please check if you've run all migration scripts."
                    )
                    return False
            await conn.execute("BEGIN")
            commit = True
            tables = [row[0] for row in rows]
            try:
                if "coin_record" in tables:
                    await conn.execute("DELETE FROM coin_record")
                if "interested_coins" in tables:
                    await conn.execute("DELETE FROM interested_coins")
                if "interested_puzzle_hashes" in tables:
                    await conn.execute("DELETE FROM interested_puzzle_hashes")
                if "key_val_store" in tables:
                    await conn.execute("DELETE FROM key_val_store")
                if "users_nfts" in tables:
                    await conn.execute("DELETE FROM users_nfts")
            except aiosqlite.Error:
                self.log.exception("Error resetting sync tables")
                commit = False
            finally:
                try:
                    if commit:
                        self.log.info("Reset wallet sync data completed.")
                        await conn.execute("COMMIT")
                    else:
                        self.log.info("Reverting reset resync changes")
                        await conn.execute("ROLLBACK")
                except aiosqlite.Error:
                    self.log.exception("Error finishing reset resync db")
                # disable the resync in any case
                self.set_resync_on_startup(fingerprint, False)
            return commit
    async def _start(self) -> None:
        """Start the node using the last-used (or first available) key fingerprint."""
        await self._start_with_fingerprint()
    async def _start_with_fingerprint(
        self,
        fingerprint: Optional[int] = None,
    ) -> bool:
        """Initialize and start the wallet for the given key fingerprint.

        Falls back to the last used fingerprint, then (via get_private_key) to the
        first key in the keychain. Creates the WalletStateManager, starts the
        background tasks and populates the balance caches. Returns False and logs
        out when no usable key can be found.
        """
        # Makes sure the coin_state_updates get higher priority than new_peak messages.
        # Delayed instantiation until here to avoid errors.
        # got Future <Future pending> attached to a different loop
        self._new_peak_queue = NewPeakQueue(inner_queue=asyncio.PriorityQueue())
        if not fingerprint:
            fingerprint = self.get_last_used_fingerprint()
        multiprocessing_start_method = process_config_start_method(config=self.config, log=self.log)
        multiprocessing_context = multiprocessing.get_context(method=multiprocessing_start_method)
        self._weight_proof_handler = WalletWeightProofHandler(self.constants, multiprocessing_context)
        self.synced_peers = set()
        private_key = await self.get_private_key(fingerprint)
        if private_key is None:
            self.log_out()
            return False
        # override with private key fetched in case it's different from what was passed
        if fingerprint is None:
            fingerprint = private_key.get_g1().get_fingerprint()
        if self.config.get("enable_profiler", False):
            if sys.getprofile() is not None:
                self.log.warning("not enabling profiler, getprofile() is already set")
            else:
                asyncio.create_task(profile_task(self.root_path, "wallet", self.log))
        if self.config.get("enable_memory_profiler", False):
            asyncio.create_task(mem_profile_task(self.root_path, "wallet", self.log))
        path: Path = get_wallet_db_path(self.root_path, self.config, str(fingerprint))
        path.parent.mkdir(parents=True, exist_ok=True)
        # A requested resync wipes the synced state before the state manager opens the DB.
        if self.config.get("reset_sync_for_fingerprint") == fingerprint:
            await self.reset_sync_db(path, fingerprint)
        self._wallet_state_manager = await WalletStateManager.create(
            private_key,
            self.config,
            path,
            self.constants,
            self.server,
            self.root_path,
            self,
        )
        if self.state_changed_callback is not None:
            self.wallet_state_manager.set_callback(self.state_changed_callback)
        self.last_wallet_tx_resend_time = int(time.time())
        self.wallet_tx_resend_timeout_secs = self.config.get("tx_resend_timeout_secs", 60 * 60)
        self.wallet_state_manager.set_pending_callback(self._pending_tx_handler)
        self._shut_down = False
        # Background tasks: subscription/new-peak processing and failed coin-state retry.
        self._process_new_subscriptions_task = asyncio.create_task(self._process_new_subscriptions())
        self._retry_failed_states_task = asyncio.create_task(self._retry_failed_states())
        self.sync_event = asyncio.Event()
        self.log_in(private_key)
        self.wallet_state_manager.state_changed("sync_changed")
        # Populate the balance caches for all wallets
        async with self.wallet_state_manager.lock:
            for wallet_id in self.wallet_state_manager.wallets:
                await self._update_balance_cache(wallet_id)
        # Make sure enough derivation paths exist before syncing starts.
        async with self.wallet_state_manager.puzzle_store.lock:
            index = await self.wallet_state_manager.puzzle_store.get_last_derivation_path()
            if index is None or index < self.wallet_state_manager.initial_num_public_keys - 1:
                await self.wallet_state_manager.create_more_puzzle_hashes(from_zero=True)
        if self.wallet_peers is None:
            self.initialize_wallet_peers()
        return True
def _close(self) -> None:
self.log.info("self._close")
self.log_out()
self._shut_down = True
if self._weight_proof_handler is not None:
self._weight_proof_handler.cancel_weight_proof_tasks()
if self._process_new_subscriptions_task is not None:
self._process_new_subscriptions_task.cancel()
if self._retry_failed_states_task is not None:
self._retry_failed_states_task.cancel()
if self._secondary_peer_sync_task is not None:
self._secondary_peer_sync_task.cancel()
    async def _await_closed(self, shutting_down: bool = True) -> None:
        """Wait for shutdown to complete: close connections, peers, state manager and caches.

        With shutting_down=False (e.g. switching keys) the keychain proxy is kept alive.
        """
        self.log.info("self._await_closed")
        if self._server is not None:
            await self.server.close_all_connections()
        if self.wallet_peers is not None:
            await self.wallet_peers.ensure_is_closed()
        if self._wallet_state_manager is not None:
            await self.wallet_state_manager._await_closed()
            self._wallet_state_manager = None
        if shutting_down and self._keychain_proxy is not None:
            proxy = self._keychain_proxy
            self._keychain_proxy = None
            await proxy.close()
            await asyncio.sleep(0.5)  # https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
        self.wallet_peers = None
        self._balance_cache = {}
    def _set_state_changed_callback(self, callback: StateChangedProtocol) -> None:
        """Register the RPC state-changed callback, forwarding it to the state manager if present."""
        self.state_changed_callback = callback
        if self._wallet_state_manager is not None:
            self.wallet_state_manager.set_callback(self.state_changed_callback)
            self.wallet_state_manager.set_pending_callback(self._pending_tx_handler)
    def _pending_tx_handler(self) -> None:
        """Kick off a fire-and-forget resend of pending transactions (no-op before start)."""
        if self._wallet_state_manager is None:
            return None
        asyncio.create_task(self._resend_queue())
async def _resend_queue(self) -> None:
if self._shut_down or self._server is None or self._wallet_state_manager is None:
return None
for msg, sent_peers in await self._messages_to_resend():
if self._shut_down or self._server is None or self._wallet_state_manager is None:
return None
full_nodes = self.server.get_connections(NodeType.FULL_NODE)
for peer in full_nodes:
if peer.peer_node_id in sent_peers:
continue
msg_name: bytes32 = std_hash(msg.data)
if (
peer.peer_node_id in self._tx_messages_in_progress
and msg_name in self._tx_messages_in_progress[peer.peer_node_id]
):
continue
self.log.debug(f"sending: {msg}")
await peer.send_message(msg)
self._tx_messages_in_progress.setdefault(peer.peer_node_id, [])
self._tx_messages_in_progress[peer.peer_node_id].append(msg_name)
async def _messages_to_resend(self) -> List[Tuple[Message, Set[bytes32]]]:
if self._wallet_state_manager is None or self._shut_down:
return []
messages: List[Tuple[Message, Set[bytes32]]] = []
current_time = int(time.time())
retry_accepted_txs = False
if self.last_wallet_tx_resend_time < current_time - self.wallet_tx_resend_timeout_secs:
self.last_wallet_tx_resend_time = current_time
retry_accepted_txs = True
records: List[TransactionRecord] = await self.wallet_state_manager.tx_store.get_not_sent(
include_accepted_txs=retry_accepted_txs
)
for record in records:
if record.spend_bundle is None:
continue
msg = make_msg(ProtocolMessageTypes.send_transaction, SendTransaction(record.spend_bundle))
already_sent = set()
for peer, status, _ in record.sent_to:
if status == MempoolInclusionStatus.SUCCESS.value:
already_sent.add(bytes32.from_hexstr(peer))
messages.append((msg, already_sent))
return messages
async def _retry_failed_states(self) -> None:
while not self._shut_down:
try:
await asyncio.sleep(self.coin_state_retry_seconds)
if self.wallet_state_manager is None:
continue
states_to_retry = await self.wallet_state_manager.retry_store.get_all_states_to_retry()
for state, peer_id, fork_height in states_to_retry:
matching_peer = tuple(
p for p in self.server.get_connections(NodeType.FULL_NODE) if p.peer_node_id == peer_id
)
if len(matching_peer) == 0:
try:
peer = self.get_full_node_peer()
self.log.info(
f"disconnected from peer {peer_id}, state will retry with {peer.peer_node_id}"
)
except ValueError:
self.log.info(f"disconnected from all peers, cannot retry state: {state}")
continue
else:
peer = matching_peer[0]
async with self.wallet_state_manager.db_wrapper.writer():
self.log.info(f"retrying coin_state: {state}")
await self.wallet_state_manager.add_coin_states(
[state], peer, None if fork_height == 0 else fork_height
)
except asyncio.CancelledError:
self.log.info("Retry task cancelled, exiting.")
raise
    async def _process_new_subscriptions(self) -> None:
        """Background task: drain the new-peak priority queue until shutdown.

        Items are processed in priority order: coin-id subscriptions first, then
        puzzle-hash subscriptions, then full-node state updates, then new peaks.
        On an unexpected error, the peer involved (if any) is disconnected.
        """
        while not self._shut_down:
            # Here we process four types of messages in the queue, where the first one has higher priority (lower
            # number in the queue), and priority decreases for each type.
            peer: Optional[WSChikConnection] = None
            item: Optional[NewPeakItem] = None
            try:
                peer, item = None, None
                item = await self.new_peak_queue.get()
                assert item is not None
                if item.item_type == NewPeakQueueTypes.COIN_ID_SUBSCRIPTION:
                    self.log.debug("Pulled from queue: %s %s", item.item_type.name, item.data)
                    # Subscriptions are the highest priority, because we don't want to process any more peaks or
                    # state updates until we are sure that we subscribed to everything that we need to. Otherwise,
                    # we might not be able to process some state.
                    coin_ids: List[bytes32] = item.data
                    for peer in self.server.get_connections(NodeType.FULL_NODE):
                        coin_states: List[CoinState] = await subscribe_to_coin_updates(coin_ids, peer, 0)
                        if len(coin_states) > 0:
                            async with self.wallet_state_manager.lock:
                                await self.add_states_from_peer(coin_states, peer)
                elif item.item_type == NewPeakQueueTypes.PUZZLE_HASH_SUBSCRIPTION:
                    self.log.debug("Pulled from queue: %s %s", item.item_type.name, item.data)
                    puzzle_hashes: List[bytes32] = item.data
                    for peer in self.server.get_connections(NodeType.FULL_NODE):
                        # Puzzle hash subscription
                        coin_states = await subscribe_to_phs(puzzle_hashes, peer, 0)
                        if len(coin_states) > 0:
                            async with self.wallet_state_manager.lock:
                                await self.add_states_from_peer(coin_states, peer)
                elif item.item_type == NewPeakQueueTypes.FULL_NODE_STATE_UPDATED:
                    # Note: this can take a while when we have a lot of transactions. We want to process these
                    # before new_peaks, since new_peak_wallet requires that we first obtain the state for that peak.
                    self.log.debug("Pulled from queue: %s %s", item.item_type.name, item.data[0])
                    coin_state_update = item.data[0]
                    peer = item.data[1]
                    assert peer is not None
                    await self.state_update_received(coin_state_update, peer)
                elif item.item_type == NewPeakQueueTypes.NEW_PEAK_WALLET:
                    self.log.debug("Pulled from queue: %s %s", item.item_type.name, item.data[0])
                    # This can take a VERY long time, because it might trigger a long sync. It is OK if we miss some
                    # subscriptions or state updates, since all subscriptions and state updates will be handled by
                    # long_sync (up to the target height).
                    new_peak = item.data[0]
                    peer = item.data[1]
                    assert peer is not None
                    await self.new_peak_wallet(new_peak, peer)
                    # Check if any coin needs auto spending
                    if self.config.get("auto_claim", {}).get("enabled", False):
                        await self.wallet_state_manager.auto_claim_coins()
                else:
                    self.log.debug("Pulled from queue: UNKNOWN %s", item.item_type)
                    assert False
            except asyncio.CancelledError:
                self.log.info("Queue task cancelled, exiting.")
                raise
            except Exception as e:
                self.log.error(f"Exception handling {item}, {e} {traceback.format_exc()}")
                # Drop the peer the failing item came from; it may be misbehaving.
                if peer is not None:
                    await peer.close(9999)
def log_in(self, sk: PrivateKey) -> None:
self.logged_in_fingerprint = sk.get_g1().get_fingerprint()
self.logged_in = True
self.log.info(f"Wallet is logged in using key with fingerprint: {self.logged_in_fingerprint}")
try:
self.update_last_used_fingerprint()
except Exception:
self.log.exception("Non-fatal: Unable to update last used fingerprint.")
def log_out(self) -> None:
self.logged_in_fingerprint = None
self.logged_in = False
    def update_last_used_fingerprint(self) -> None:
        """Persist the currently logged-in fingerprint to the marker file on disk.

        Must only be called while logged in (asserts a fingerprint is set).
        """
        fingerprint = self.logged_in_fingerprint
        assert fingerprint is not None
        path = self.get_last_used_fingerprint_path()
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(str(fingerprint))
        self.log.info(f"Updated last used fingerprint: {fingerprint}")
def get_last_used_fingerprint(self) -> Optional[int]:
fingerprint: Optional[int] = None
try:
path = self.get_last_used_fingerprint_path()
if path.exists():
fingerprint = int(path.read_text().strip())
except Exception:
self.log.exception("Non-fatal: Unable to read last used fingerprint.")
return fingerprint
def get_last_used_fingerprint_path(self) -> Path:
db_path: Path = path_from_root(self.root_path, self.config["database_path"])
fingerprint_path = db_path.parent / "last_used_fingerprint"
return fingerprint_path
    def set_server(self, server: ChikServer) -> None:
        """Attach the network server and set up peer discovery/connection handling."""
        self._server = server
        self.initialize_wallet_peers()
    def initialize_wallet_peers(self) -> None:
        """Install the on_connect handler and start WalletPeers discovery.

        Peer discovery is only started once, and only when connecting to unknown
        peers is enabled and we are not in testing mode.
        """
        self.server.on_connect = self.on_connect
        network_name = self.config["selected_network"]
        try:
            default_port = self.config["network_overrides"]["config"][network_name]["default_full_node_port"]
        except KeyError:
            self.log.info("Default port field not found in config.")
            default_port = None
        connect_to_unknown_peers = self.config.get("connect_to_unknown_peers", True)
        testing = self.config.get("testing", False)
        if self.wallet_peers is None and connect_to_unknown_peers and not testing:
            self.wallet_peers = WalletPeers(
                self.server,
                self.config["target_peer_count"],
                self.root_path / Path(self.config.get("wallet_peers_file_path", "wallet/db/wallet_peers.dat")),
                self.config["introducer_peer"],
                self.config.get("dns_servers", ["dns-introducer.chiknetwork.com"]),
                self.config["peer_connect_interval"],
                network_name,
                default_port,
                self.log,
            )
            # Fire-and-forget: discovery runs for the lifetime of the node.
            asyncio.create_task(self.wallet_peers.start())
async def on_disconnect(self, peer: WSChikConnection) -> None:
if self.is_trusted(peer):
self.local_node_synced = False
self.initialize_wallet_peers()
if peer.peer_node_id in self.peer_caches:
self.peer_caches.pop(peer.peer_node_id)
if peer.peer_node_id in self.synced_peers:
self.synced_peers.remove(peer.peer_node_id)
if peer.peer_node_id in self._tx_messages_in_progress:
del self._tx_messages_in_progress[peer.peer_node_id]
self.wallet_state_manager.state_changed("close_connection")
    async def on_connect(self, peer: WSChikConnection) -> None:
        """Handle a newly connected peer: version-gate, resend pending txs, notify peers store."""
        if self._wallet_state_manager is None:
            return None
        if peer.protocol_version < Version("0.0.33"):
            self.log.info("Disconnecting, full node running old software")
            await peer.close()
            # NOTE(review): no early return here, so the remaining setup still runs for the
            # closed peer — mirrors upstream; confirm whether a `return` was intended.
        trusted = self.is_trusted(peer)
        # Extra untrusted peers are unnecessary while our trusted local node is synced.
        if not trusted and self.local_node_synced:
            await peer.close()
        if peer.peer_node_id in self.synced_peers:
            self.synced_peers.remove(peer.peer_node_id)
        self.log.info(f"Connected peer {peer.get_peer_info()} is trusted: {trusted}")
        messages_peer_ids = await self._messages_to_resend()
        self.wallet_state_manager.state_changed("add_connection")
        # Re-broadcast pending transactions, skipping ones this peer already accepted.
        for msg, peer_ids in messages_peer_ids:
            if peer.peer_node_id in peer_ids:
                continue
            await peer.send_message(msg)
        if self.wallet_peers is not None:
            await self.wallet_peers.on_connect(peer)
    async def perform_atomic_rollback(self, fork_height: int, cache: Optional[PeerRequestCache] = None) -> None:
        """Roll back wallet state past fork_height inside a single DB write transaction.

        Clears request caches above the fork (all peers, or just the given cache).
        On success, removed wallets are dropped and pending transactions are queued
        for resend; on failure the exception is re-raised after logging.
        """
        self.log.info(f"perform_atomic_rollback to {fork_height}")
        # this is to start a write transaction
        async with self.wallet_state_manager.db_wrapper.writer():
            try:
                removed_wallet_ids = await self.wallet_state_manager.reorg_rollback(fork_height)
                await self.wallet_state_manager.blockchain.set_finished_sync_up_to(fork_height, in_rollback=True)
                if cache is None:
                    self.rollback_request_caches(fork_height)
                else:
                    cache.clear_after_height(fork_height)
            except Exception as e:
                tb = traceback.format_exc()
                self.log.error(f"Exception while perform_atomic_rollback: {e} {tb}")
                raise
            else:
                await self.wallet_state_manager.blockchain.clean_block_records()
                for wallet_id in removed_wallet_ids:
                    self.wallet_state_manager.wallets.pop(wallet_id)
        # this has to be called *after* the transaction commits, otherwise it
        # won't see the changes (since we spawn a new task to handle potential
        # resends)
        self._pending_tx_handler()
async def long_sync(
    self,
    target_height: uint32,
    full_node: WSChikConnection,
    fork_height: int,
    *,
    rollback: bool,
) -> None:
    """
    Sync algorithm:
    - Download and verify weight proof (if not trusted)
    - Roll back anything after the fork point (if rollback=True)
    - Subscribe to all puzzle_hashes over and over until there are no more updates
    - Subscribe to all coin_ids over and over until there are no more updates
    - rollback=False means that we are just double-checking with this peer to make sure we don't have any
      missing transactions, so we don't need to rollback

    Args:
        target_height: height recorded as "finished sync up to" once the whole sync completes.
        full_node: the peer to sync against.
        fork_height: height of the fork point; with delta sync enabled this is
            also the minimum height for subscriptions.
        rollback: whether to atomically roll back to ``fork_height`` before syncing.
    """

    def is_new_state_update(cs: CoinState) -> bool:
        # A state is worth (re)processing if it has no heights at all, or if
        # either its spent or created height is at/after the fork point.
        if cs.spent_height is None and cs.created_height is None:
            return True
        if cs.spent_height is not None and cs.spent_height >= fork_height:
            return True
        if cs.created_height is not None and cs.created_height >= fork_height:
            return True
        return False

    trusted: bool = self.is_trusted(full_node)
    self.log.info(f"Starting sync trusted: {trusted} to peer {full_node.peer_info.host}")
    start_time = time.time()

    if rollback:
        # we should clear all peers since this is a full rollback
        await self.perform_atomic_rollback(fork_height)
        await self.update_ui()

    # We only process new state updates to avoid slow reprocessing. We set the sync height after adding
    # Things, so we don't have to reprocess these later. There can be many things in ph_update_res.
    # With delta sync, only subscribe for updates at/after the fork; otherwise ask from genesis (0).
    use_delta_sync = self.config.get("use_delta_sync", False)
    min_height_for_subscriptions = fork_height if use_delta_sync else 0
    already_checked_ph: Set[bytes32] = set()
    while not self._shut_down:
        # Deriving more puzzle hashes may grow the subscription set, so loop
        # until a pass produces no unchecked puzzle hashes.
        await self.wallet_state_manager.create_more_puzzle_hashes()
        all_puzzle_hashes = await self.get_puzzle_hashes_to_subscribe()
        not_checked_puzzle_hashes = set(all_puzzle_hashes) - already_checked_ph
        if not_checked_puzzle_hashes == set():
            break
        for batch in to_batches(not_checked_puzzle_hashes, 1000):
            ph_update_res: List[CoinState] = await subscribe_to_phs(
                batch.entries, full_node, min_height_for_subscriptions
            )
            ph_update_res = list(filter(is_new_state_update, ph_update_res))
            if not await self.add_states_from_peer(ph_update_res, full_node):
                # If something goes wrong, abort sync
                return
        already_checked_ph.update(not_checked_puzzle_hashes)

    self.log.info(f"Successfully subscribed and updated {len(already_checked_ph)} puzzle hashes")

    # The number of coin id updates are usually going to be significantly less than ph updates, so we can
    # sync from 0 every time.
    already_checked_coin_ids: Set[bytes32] = set()
    while not self._shut_down:
        all_coin_ids = await self.get_coin_ids_to_subscribe()
        not_checked_coin_ids = set(all_coin_ids) - already_checked_coin_ids
        if not_checked_coin_ids == set():
            break
        for batch in to_batches(not_checked_coin_ids, 1000):
            c_update_res: List[CoinState] = await subscribe_to_coin_updates(
                batch.entries, full_node, min_height_for_subscriptions
            )
            if not await self.add_states_from_peer(c_update_res, full_node):
                # If something goes wrong, abort sync
                return
        already_checked_coin_ids.update(not_checked_coin_ids)
    self.log.info(f"Successfully subscribed and updated {len(already_checked_coin_ids)} coin ids")

    # Only update this fully when the entire sync has completed
    await self.wallet_state_manager.blockchain.set_finished_sync_up_to(target_height)

    if trusted:
        self.local_node_synced = True
        self.wallet_state_manager.state_changed("new_block")

    self.synced_peers.add(full_node.peer_node_id)
    await self.update_ui()

    self.log.info(f"Sync (trusted: {trusted}) duration was: {time.time() - start_time}")
async def add_states_from_peer(
    self,
    items_input: List[CoinState],
    peer: WSChikConnection,
    fork_height: Optional[uint32] = None,
    height: Optional[uint32] = None,
) -> bool:
    """
    Apply a batch of coin states received from ``peer`` to the wallet state manager.

    Trusted peers' states are applied without validation; untrusted peers'
    states are validated in parallel (bounded by a semaphore) and applied
    serially, or parked in the race cache when a fork height is given.

    Args:
        items_input: coin states to process (deduplicated and spam-filtered here).
        peer: originating connection; used for trust checks and validation requests.
        fork_height: fork point reported alongside the states, if any.
        height: peak height reported alongside the states, if any.

    Returns:
        True when all states were processed and the peer is still connected;
        False on failure, shutdown, or disconnect.
    """
    # Adds the state to the wallet state manager. If the peer is trusted, we do not validate. If the peer is
    # untrusted we do, but we might not add the state, since we need to receive the new_peak message as well.
    assert self._wallet_state_manager is not None
    trusted = self.is_trusted(peer)
    # Validate states in parallel, apply serial
    # TODO: optimize fetching
    if self.validation_semaphore is None:
        self.validation_semaphore = asyncio.Semaphore(10)

    # Rollback is handled in wallet_short_sync_backtrack for untrusted peers, so we don't need to do it here.
    # Also it's not safe to rollback, an untrusted peer can give us old fork point and make our TX disappear.
    # wallet_short_sync_backtrack can safely rollback because we validated the weight for the new peak so we
    # know the peer is telling the truth about the reorg.

    # If there is a fork, we need to ensure that we roll back in trusted mode to properly handle reorgs
    cache: PeerRequestCache = self.get_cache_for_peer(peer)
    if (
        trusted
        and fork_height is not None
        and height is not None
        and fork_height != height - 1
        and peer.peer_node_id in self.synced_peers
    ):
        # only one peer told us to rollback so only clear for that peer
        await self.perform_atomic_rollback(fork_height, cache=cache)
    else:
        if fork_height is not None:
            # only one peer told us to rollback so only clear for that peer
            cache.clear_after_height(fork_height)
            self.log.info(f"clear_after_height {fork_height} for peer {peer}")
            if not trusted:
                # Rollback race_cache not in clear_after_height to avoid applying rollbacks from new peak processing
                cache.rollback_race_cache(fork_height=fork_height)

    all_tasks: List[asyncio.Task[None]] = []
    target_concurrent_tasks: int = 30

    # Ensure the list is sorted
    unique_items = set(items_input)
    before = len(unique_items)
    items = await self.wallet_state_manager.filter_spam(sort_coin_states(unique_items))
    num_filtered = before - len(items)
    if num_filtered > 0:
        self.log.info(f"Filtered {num_filtered} spam transactions")

    async def validate_and_add(inner_states: List[CoinState], inner_idx_start: int) -> None:
        # NOTE: closes over `updated_coin_states`, which is assigned below —
        # safe because tasks running this are only created after that assignment.
        try:
            assert self.validation_semaphore is not None
            async with self.validation_semaphore:
                valid_states = [
                    inner_state
                    for inner_state in inner_states
                    if await self.validate_received_state_from_peer(inner_state, peer, cache, fork_height)
                ]
                if len(valid_states) > 0:
                    async with self.wallet_state_manager.db_wrapper.writer():
                        self.log.info(
                            f"new coin state received ({inner_idx_start}-"
                            f"{inner_idx_start + len(inner_states) - 1}/ {len(updated_coin_states)})"
                        )
                        await self.wallet_state_manager.add_coin_states(valid_states, peer, fork_height)
        except Exception as e:
            tb = traceback.format_exc()
            # Failures during shutdown/disconnect are expected; log them quietly.
            log_level = logging.DEBUG if peer.closed or self._shut_down else logging.ERROR
            self.log.log(log_level, f"validate_and_add failed - exception: {e}, traceback: {tb}")

    # Keep chunk size below 1000 just in case, windows has sqlite limits of 999 per query
    # Untrusted has a smaller batch size since validation has to happen which takes a while
    chunk_size: int = 900 if trusted else 10

    # Split into reorged states (no created height) and regular updates.
    reorged_coin_states = []
    updated_coin_states = []
    for coin_state in items:
        if coin_state.created_height is None:
            reorged_coin_states.append(coin_state)
        else:
            updated_coin_states.append(coin_state)

    # Reorged coin states don't require any validation in untrusted mode, so we can just always apply them upfront
    # instead of adding them to the race cache in untrusted mode.
    for batch in to_batches(reorged_coin_states, chunk_size):
        self.log.info(f"Process reorged states: ({len(batch.entries)} / {len(reorged_coin_states)})")
        if not await self.wallet_state_manager.add_coin_states(batch.entries, peer, fork_height):
            self.log.debug("Processing reorged states failed")
            return False

    idx = 1
    for batch in to_batches(updated_coin_states, chunk_size):
        if self._server is None:
            self.log.error("No server")
            await asyncio.gather(*all_tasks)
            return False
        if peer.peer_node_id not in self.server.all_connections:
            self.log.error(f"Disconnected from peer {peer.peer_node_id} host {peer.peer_info.host}")
            await asyncio.gather(*all_tasks)
            return False
        if trusted:
            # Trusted: apply directly inside a write transaction, no validation.
            async with self.wallet_state_manager.db_wrapper.writer():
                self.log.info(
                    f"new coin state received ({idx}-{idx + len(batch.entries) - 1}/ {len(updated_coin_states)})"
                )
                if not await self.wallet_state_manager.add_coin_states(batch.entries, peer, fork_height):
                    return False
        else:
            if fork_height is not None:
                # Untrusted with a fork height: defer to the race cache until
                # the corresponding new_peak arrives.
                cache.add_states_to_race_cache(batch.entries)
            else:
                # Untrusted without a fork height: validate in background tasks,
                # throttled to target_concurrent_tasks at a time.
                while len(all_tasks) >= target_concurrent_tasks:
                    all_tasks = [task for task in all_tasks if not task.done()]
                    await asyncio.sleep(0.1)
                    if self._shut_down:
                        self.log.info("Terminating receipt and validation due to shut down request")
                        await asyncio.gather(*all_tasks)
                        return False
                all_tasks.append(asyncio.create_task(validate_and_add(batch.entries, idx)))
        idx += len(batch.entries)

    # Snapshot connectivity before awaiting outstanding tasks, then re-check after.
    still_connected = self._server is not None and peer.peer_node_id in self.server.all_connections
    await asyncio.gather(*all_tasks)
    await self.update_ui()
    return still_connected and self._server is not None and peer.peer_node_id in self.server.all_connections
def is_timestamp_in_sync(self, timestamp: uint64) -> bool:
    """
    Return True when ``timestamp`` is within 600 seconds of the current time.

    When the "testing" config flag is set, the check always passes.
    """
    if self.config.get("testing", False):
        return True
    # A peak timestamp more than 10 minutes old means we are not in sync yet.
    return uint64(time.time()) - timestamp < 600
def is_trusted(self, peer: WSChikConnection) -> bool:
    """Return True when ``peer`` matches the config's "trusted_peers" entries."""
    trusted_peers_config = self.config.get("trusted_peers", {})
    return self.server.is_trusted_peer(peer, trusted_peers_config)
async def state_update_received(self, request: CoinStateUpdate, peer: WSChikConnection) -> None:
    """
    Handle a pushed CoinStateUpdate from ``peer``.

    Logs every included coin state, then applies the whole batch via
    add_states_from_peer while holding the wallet state manager lock.
    """
    # This gets called every time there is a new coin or puzzle hash change in the DB
    # that is of interest to this wallet. It is not guaranteed to come for every height. This message is guaranteed
    # to come before the corresponding new_peak for each height. We handle this differently for trusted and
    # untrusted peers. For trusted, we always process the state, and we process reorgs as well.
    for coin in request.items:
        self.log.info(f"request coin: {coin.coin.name().hex()}{coin}")

    async with self.wallet_state_manager.lock:
        await self.add_states_from_peer(request.items, peer, request.fork_height, request.height)
def get_full_node_peer(self) -> WSChikConnection:
    """
    Get a full node, preferring synced & trusted > synced & untrusted > unsynced & trusted > unsynced & untrusted

    Raises:
        ValueError: when no full node peer is currently connected.
    """
    ranked_peers = self.get_full_node_peers_in_order()
    if not ranked_peers:
        raise ValueError("No peer connected")
    return ranked_peers[0]
def get_full_node_peers_in_order(self) -> List[WSChikConnection]:
"""