From 930d1144ef87528a723c1a1b2e4e1ab95abd15ac Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Mon, 28 Aug 2023 19:46:59 +0300 Subject: [PATCH 01/14] Delete files on unsubscribe. --- chia/data_layer/data_layer.py | 17 +++++++++- tests/core/data_layer/test_data_rpc.py | 43 ++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 9b7ded2bd8fb..79950dead4bc 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -35,7 +35,12 @@ ) from chia.data_layer.data_layer_wallet import DataLayerWallet, Mirror, SingletonRecord, verify_offer from chia.data_layer.data_store import DataStore -from chia.data_layer.download_data import insert_from_delta_file, write_files_for_root +from chia.data_layer.download_data import ( + get_delta_filename, + get_full_tree_filename, + insert_from_delta_file, + write_files_for_root, +) from chia.rpc.rpc_server import StateChangedProtocol, default_get_connections from chia.rpc.wallet_rpc_client import WalletRpcClient from chia.server.outbound_message import NodeType @@ -527,10 +532,20 @@ async def unsubscribe(self, tree_id: bytes32) -> None: subscriptions = await self.get_subscriptions() if tree_id not in (subscription.tree_id for subscription in subscriptions): raise RuntimeError("No subscription found for the given tree_id.") + generation = await self.data_store.get_tree_generation(tree_id) + all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) + filenames: List[str] = [] + for root in all_roots: + filenames.append(get_full_tree_filename(tree_id, root.node_hash, root.generation)) + filenames.append(get_delta_filename(tree_id, root.node_hash, root.generation)) async with self.subscription_lock: await self.data_store.unsubscribe(tree_id) await self.wallet_rpc.dl_stop_tracking(tree_id) self.log.info(f"Unsubscribed to {tree_id}") + for filename in filenames: + file_path = self.server_files_location.joinpath(filename) + if os.path.exists(file_path): + os.remove(file_path) async def get_subscriptions(self) -> List[Subscription]: async with self.subscription_lock: diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index aae21142eb33..262e754a9722 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -23,6 +23,7 @@ from chia.data_layer.data_layer_errors import OfferIntegrityError from chia.data_layer.data_layer_util import OfferStore, Status, StoreProofs from chia.data_layer.data_layer_wallet import DataLayerWallet, verify_offer +from chia.data_layer.download_data import get_delta_filename, get_full_tree_filename from chia.rpc.data_layer_rpc_api import DataLayerRpcApi from chia.rpc.data_layer_rpc_client import DataLayerRpcClient from chia.rpc.wallet_rpc_api import WalletRpcApi @@ -73,6 +74,7 @@ async def init_data_layer_service( config["data_layer"]["port"] = 0 config["data_layer"]["rpc_port"] = 0 config["data_layer"]["database_path"] = str(db_path.joinpath("db.sqlite")) + config["data_layer"]["manage_data_interval"] = 5 save_config(bt.root_path, "config.yaml", config) service = create_data_layer_service( root_path=bt.root_path, config=config, wallet_service=wallet_service, downloaders=[], uploaders=[] @@ -2016,3 +2018,44 @@ async def test_issue_15955_deadlock( await asyncio.gather( *(asyncio.create_task(data_layer.get_value(store_id=tree_id, key=key)) for _ in range(10)) ) + + +@pytest.mark.asyncio +async def test_unsubscribe_removes_files( + self_hostname: str, + one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices, + tmp_path: Path, +) -> None: + wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( + self_hostname, one_wallet_and_one_simulator_services + ) + async with init_data_layer(wallet_rpc_port=wallet_rpc_port, bt=bt, db_path=tmp_path) as data_layer: + data_rpc_api = DataLayerRpcApi(data_layer) + res = await data_rpc_api.create_data_store({}) + root_hashes: List[bytes32] = [] + assert res is not None + store_id = bytes32.from_hexstr(res["id"]) + await farm_block_check_singelton(data_layer, full_node_api, ph, store_id) + + for batch_count in range(10): + key = batch_count.to_bytes(2, "big") + value = batch_count.to_bytes(2, "big") + changelist = [{"action": "insert", "key": key.hex(), "value": value.hex()}] + res = await data_rpc_api.batch_update({"id": store_id.hex(), "changelist": changelist}) + update_tx_rec = res["tx_id"] + await farm_block_with_spend(full_node_api, ph, update_tx_rec, wallet_rpc_api) + await asyncio.sleep(10) + root_hash = await data_rpc_api.get_root({"id": store_id.hex()}) + root_hashes.append(root_hash["hash"]) + + with os.scandir(data_layer.server_files_location) as entries: + filenames = {entry.name for entry in entries} + assert len(filenames) == 20 + for generation, hash in enumerate(root_hashes): + assert get_delta_filename(store_id, hash, generation + 1) in filenames + assert get_full_tree_filename(store_id, hash, generation + 1) in filenames + + res = await data_rpc_api.unsubscribe(request={"id": store_id.hex()}) + with os.scandir(data_layer.server_files_location) as entries: + filenames = {entry.name for entry in entries} + assert len(filenames) == 0 From 82c561aee679f8d8ec5834f1552e017943a8ffc7 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Mon, 28 Aug 2023 19:55:25 +0300 Subject: [PATCH 02/14] Handle none case. --- chia/data_layer/data_layer.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 79950dead4bc..46b31c5b03f9 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -536,8 +536,9 @@ async def unsubscribe(self, tree_id: bytes32) -> None: all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) filenames: List[str] = [] for root in all_roots: - filenames.append(get_full_tree_filename(tree_id, root.node_hash, root.generation)) - filenames.append(get_delta_filename(tree_id, root.node_hash, root.generation)) + root_hash = root.node_hash if root.node_hash is not None else self.none_bytes + filenames.append(get_full_tree_filename(tree_id, root_hash, root.generation)) + filenames.append(get_delta_filename(tree_id, root_hash, root.generation)) async with self.subscription_lock: await self.data_store.unsubscribe(tree_id) await self.wallet_rpc.dl_stop_tracking(tree_id) From 6fe2249f7923930d00d4336c7adfaa8a3ed19495 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 01:12:21 +0300 Subject: [PATCH 03/14] Fix test. v --- chia/data_layer/data_layer.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 46b31c5b03f9..4d7ace536183 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -532,13 +532,14 @@ async def unsubscribe(self, tree_id: bytes32) -> None: subscriptions = await self.get_subscriptions() if tree_id not in (subscription.tree_id for subscription in subscriptions): raise RuntimeError("No subscription found for the given tree_id.") - generation = await self.data_store.get_tree_generation(tree_id) - all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) filenames: List[str] = [] - for root in all_roots: - root_hash = root.node_hash if root.node_hash is not None else self.none_bytes - filenames.append(get_full_tree_filename(tree_id, root_hash, root.generation)) - filenames.append(get_delta_filename(tree_id, root_hash, root.generation)) + if await self.tree_id_exists(tree_id): + generation = await self.data_store.get_tree_generation(tree_id) + all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) + for root in all_roots: + root_hash = root.node_hash if root.node_hash is not None else self.none_bytes + filenames.append(get_full_tree_filename(tree_id, root_hash, root.generation)) + filenames.append(get_delta_filename(tree_id, root_hash, root.generation)) async with self.subscription_lock: await self.data_store.unsubscribe(tree_id) await self.wallet_rpc.dl_stop_tracking(tree_id) From a47dde48f4b0cd5f40f420b280c7f9d608afdcc1 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 01:24:32 +0300 Subject: [PATCH 04/14] typo --- chia/data_layer/data_layer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 4d7ace536183..f90364820d5b 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -533,7 +533,7 @@ async def unsubscribe(self, tree_id: bytes32) -> None: if tree_id not in (subscription.tree_id for subscription in subscriptions): raise RuntimeError("No subscription found for the given tree_id.") filenames: List[str] = [] - if await self.tree_id_exists(tree_id): + if await self.data_store.tree_id_exists(tree_id): generation = await self.data_store.get_tree_generation(tree_id) all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) for root in all_roots: From 885ac796695b3bed0cb3a132994584b625c28139 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 14:50:50 +0300 Subject: [PATCH 05/14] Lint. --- chia/data_layer/data_layer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index f90364820d5b..dc87509bc646 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -534,7 +534,7 @@ async def unsubscribe(self, tree_id: bytes32) -> None: raise RuntimeError("No subscription found for the given tree_id.") filenames: List[str] = [] if await self.data_store.tree_id_exists(tree_id): - generation = await self.data_store.get_tree_generation(tree_id) + generation = await self.data_store.get_tree_generation(tree_id) all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) for root in all_roots: root_hash = root.node_hash if root.node_hash is not None else self.none_bytes From 56f5f5ba2f92d7ff1b3669f1bd394127aadb76e2 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 18:19:39 +0300 Subject: [PATCH 06/14] Update chia/data_layer/data_layer.py Co-authored-by: Kyle Altendorf --- chia/data_layer/data_layer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index dc87509bc646..86908ce716a7 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -546,8 +546,10 @@ async def unsubscribe(self, tree_id: bytes32) -> None: self.log.info(f"Unsubscribed to {tree_id}") for filename in filenames: file_path = self.server_files_location.joinpath(filename) - if os.path.exists(file_path): - os.remove(file_path) + try: + file_path.unlink() + except FileNotFoundError: + pass async def get_subscriptions(self) -> List[Subscription]: async with self.subscription_lock: From 289f44c9c27de97f485eeda256e69f447dc8186f Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 18:22:44 +0300 Subject: [PATCH 07/14] Apply suggestions from code review Co-authored-by: Kyle Altendorf --- tests/core/data_layer/test_data_rpc.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 262e754a9722..44ab2ab1d310 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -2037,7 +2037,8 @@ async def test_unsubscribe_removes_files( store_id = bytes32.from_hexstr(res["id"]) await farm_block_check_singelton(data_layer, full_node_api, ph, store_id) - for batch_count in range(10): + update_count = 10 + for batch_count in range(update_count): key = batch_count.to_bytes(2, "big") value = batch_count.to_bytes(2, "big") changelist = [{"action": "insert", "key": key.hex(), "value": value.hex()}] @@ -2048,9 +2049,8 @@ async def test_unsubscribe_removes_files( root_hash = await data_rpc_api.get_root({"id": store_id.hex()}) root_hashes.append(root_hash["hash"]) - with os.scandir(data_layer.server_files_location) as entries: - filenames = {entry.name for entry in entries} - assert len(filenames) == 20 + filenames = {path.name for path in data_layer.server_files_location.iterdir()} + assert len(filenames) == 2 * update_count for generation, hash in enumerate(root_hashes): assert get_delta_filename(store_id, hash, generation + 1) in filenames assert get_full_tree_filename(store_id, hash, generation + 1) in filenames From ce8ba3ad19fd4965fb0eb26058bd27d83cb77fd1 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 18:32:21 +0300 Subject: [PATCH 08/14] rewiew comments. --- tests/core/data_layer/test_data_rpc.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 44ab2ab1d310..0d15e27e76c4 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -2050,12 +2050,11 @@ async def test_unsubscribe_removes_files( root_hashes.append(root_hash["hash"]) filenames = {path.name for path in data_layer.server_files_location.iterdir()} - assert len(filenames) == 2 * update_count - for generation, hash in enumerate(root_hashes): - assert get_delta_filename(store_id, hash, generation + 1) in filenames - assert get_full_tree_filename(store_id, hash, generation + 1) in filenames + assert len(filenames) == 2 * update_count + for generation, hash in enumerate(root_hashes): + assert get_delta_filename(store_id, hash, generation + 1) in filenames + assert get_full_tree_filename(store_id, hash, generation + 1) in filenames res = await data_rpc_api.unsubscribe(request={"id": store_id.hex()}) - with os.scandir(data_layer.server_files_location) as entries: - filenames = {entry.name for entry in entries} - assert len(filenames) == 0 + filenames = {path.name for path in data_layer.server_files_location.iterdir()} + assert len(filenames) == 0 From d782c1b914196fc172b52f4a7614e76aa3696124 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 18:45:05 +0300 Subject: [PATCH 09/14] review --- tests/core/data_layer/test_data_rpc.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 0d15e27e76c4..12ffeac519c1 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -66,6 +66,7 @@ async def init_data_layer_service( bt: BlockTools, db_path: Path, wallet_service: Optional[Service[WalletNode, WalletNodeAPI]] = None, + manage_data_interval: int = 5, ) -> AsyncIterator[Service[DataLayer, DataLayerAPI]]: config = bt.config config["data_layer"]["wallet_peer"]["port"] = int(wallet_rpc_port) @@ -74,7 +75,7 @@ async def init_data_layer_service( config["data_layer"]["port"] = 0 config["data_layer"]["rpc_port"] = 0 config["data_layer"]["database_path"] = str(db_path.joinpath("db.sqlite")) - config["data_layer"]["manage_data_interval"] = 5 + config["data_layer"]["manage_data_interval"] = manage_data_interval save_config(bt.root_path, "config.yaml", config) service = create_data_layer_service( root_path=bt.root_path, config=config, wallet_service=wallet_service, downloaders=[], uploaders=[] @@ -2029,7 +2030,10 @@ async def test_unsubscribe_removes_files( wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( self_hostname, one_wallet_and_one_simulator_services ) - async with init_data_layer(wallet_rpc_port=wallet_rpc_port, bt=bt, db_path=tmp_path) as data_layer: + manage_data_interval = 5 + async with init_data_layer( + wallet_rpc_port=wallet_rpc_port, bt=bt, db_path=tmp_path, manage_data_interval=manage_data_interval + ) as data_layer: data_rpc_api = DataLayerRpcApi(data_layer) res = await data_rpc_api.create_data_store({}) root_hashes: List[bytes32] = [] @@ -2045,7 +2049,7 @@ async def test_unsubscribe_removes_files( res = await data_rpc_api.batch_update({"id": store_id.hex(), "changelist": changelist}) update_tx_rec = res["tx_id"] await farm_block_with_spend(full_node_api, ph, update_tx_rec, wallet_rpc_api) - await asyncio.sleep(10) + await asyncio.sleep(manage_data_interval * 2) root_hash = await data_rpc_api.get_root({"id": store_id.hex()}) root_hashes.append(root_hash["hash"]) From 870ae315536a8ec645a4f91a5c3d76fde4bf04bb Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 29 Aug 2023 18:55:33 +0300 Subject: [PATCH 10/14] Update test_data_rpc.py --- tests/core/data_layer/test_data_rpc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 12ffeac519c1..51d6fdbf99c8 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -94,8 +94,11 @@ async def init_data_layer( bt: BlockTools, db_path: Path, wallet_service: Optional[Service[WalletNode, WalletNodeAPI]] = None, + manage_data_interval: int = 5 ) -> AsyncIterator[DataLayer]: - async with init_data_layer_service(wallet_rpc_port, bt, db_path, wallet_service) as data_layer_service: + async with init_data_layer_service( + wallet_rpc_port, bt, db_path, wallet_service, manage_data_interval + ) as data_layer_service: yield data_layer_service._api.data_layer From c2e326d6251ec183d9ba4c34459b043260f0dd72 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Wed, 30 Aug 2023 00:33:31 +0300 Subject: [PATCH 11/14] Lint. --- tests/core/data_layer/test_data_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 51d6fdbf99c8..974d6f1e1dd9 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -94,7 +94,7 @@ async def init_data_layer( bt: BlockTools, db_path: Path, wallet_service: Optional[Service[WalletNode, WalletNodeAPI]] = None, - manage_data_interval: int = 5 + manage_data_interval: int = 5, ) -> AsyncIterator[DataLayer]: async with init_data_layer_service( wallet_rpc_port, bt, db_path, wallet_service, manage_data_interval From d0aa8662005177d67acdf902bfb2be16a8092bbe Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 5 Sep 2023 20:02:33 +0300 Subject: [PATCH 12/14] Fix conflict. --- tests/core/data_layer/test_data_rpc.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index e712ecef4018..4aa9e0fe0b13 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -2058,8 +2058,11 @@ async def test_unsubscribe_removes_files( self_hostname: str, one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices, tmp_path: Path, - manage_data_interval = 5, ) -> None: + wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( + self_hostname, one_wallet_and_one_simulator_services + ) + manage_data_interval = 5 async with init_data_layer( wallet_rpc_port=wallet_rpc_port, bt=bt, db_path=tmp_path, manage_data_interval=manage_data_interval ) as data_layer: From 052ce3fe2cac313387b0df93d723fb633386d315 Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Tue, 5 Sep 2023 22:10:32 +0300 Subject: [PATCH 13/14] Add retain option. --- chia/cmds/data.py | 4 +++- chia/cmds/data_funcs.py | 3 ++- chia/data_layer/data_layer.py | 4 ++-- chia/rpc/data_layer_rpc_api.py | 3 ++- chia/rpc/data_layer_rpc_client.py | 4 ++-- tests/core/data_layer/test_data_rpc.py | 6 ++++-- 6 files changed, 15 insertions(+), 9 deletions(-) diff --git a/chia/cmds/data.py b/chia/cmds/data.py index 6fb828d9f2c5..de900a91af17 100644 --- a/chia/cmds/data.py +++ b/chia/cmds/data.py @@ -253,14 +253,16 @@ def remove_subscription( @create_data_store_id_option() @create_rpc_port_option() @options.create_fingerprint() +@click.option("--retain", is_flag=True, help="Retain .dat files") def unsubscribe( id: str, data_rpc_port: int, fingerprint: Optional[int], + retain: bool, ) -> None: from chia.cmds.data_funcs import unsubscribe_cmd - run(unsubscribe_cmd(rpc_port=data_rpc_port, store_id=id, fingerprint=fingerprint)) + run(unsubscribe_cmd(rpc_port=data_rpc_port, store_id=id, fingerprint=fingerprint, retain=retain)) @data_cmd.command( diff --git a/chia/cmds/data_funcs.py b/chia/cmds/data_funcs.py index 571a46fcb2b4..a63f0742e005 100644 --- a/chia/cmds/data_funcs.py +++ b/chia/cmds/data_funcs.py @@ -130,10 +130,11 @@ async def unsubscribe_cmd( rpc_port: Optional[int], store_id: str, fingerprint: Optional[int], + retain: bool, ) -> None: store_id_bytes = bytes32.from_hexstr(store_id) async with get_client(rpc_port=rpc_port, fingerprint=fingerprint) as (client, _): - res = await client.unsubscribe(store_id=store_id_bytes) + res = await client.unsubscribe(store_id=store_id_bytes, retain=retain) print(json.dumps(res, indent=4, sort_keys=True)) diff --git a/chia/data_layer/data_layer.py b/chia/data_layer/data_layer.py index 7e84e576c52b..68b52012fcc1 100644 --- a/chia/data_layer/data_layer.py +++ b/chia/data_layer/data_layer.py @@ -540,12 +540,12 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> None async with self.subscription_lock: await self.data_store.remove_subscriptions(store_id, parsed_urls) - async def unsubscribe(self, tree_id: bytes32) -> None: + async def unsubscribe(self, tree_id: bytes32, retain_files: bool) -> None: subscriptions = await self.get_subscriptions() if tree_id not in (subscription.tree_id for subscription in subscriptions): raise RuntimeError("No subscription found for the given tree_id.") filenames: List[str] = [] - if await self.data_store.tree_id_exists(tree_id): + if await self.data_store.tree_id_exists(tree_id) and not retain_files: generation = await self.data_store.get_tree_generation(tree_id) all_roots = await self.data_store.get_roots_between(tree_id, 1, generation + 1) for root in all_roots: diff --git a/chia/rpc/data_layer_rpc_api.py b/chia/rpc/data_layer_rpc_api.py index b3e3b3367df8..3652357dae05 100644 --- a/chia/rpc/data_layer_rpc_api.py +++ b/chia/rpc/data_layer_rpc_api.py @@ -282,12 +282,13 @@ async def unsubscribe(self, request: Dict[str, Any]) -> EndpointResult: unsubscribe from singleton """ store_id = request.get("id") + retain_files = request.get("retain", False) if store_id is None: raise Exception("missing store id in request") if self.service is None: raise Exception("Data layer not created") store_id_bytes = bytes32.from_hexstr(store_id) - await self.service.unsubscribe(store_id_bytes) + await self.service.unsubscribe(store_id_bytes, retain_files) return {} async def subscriptions(self, request: Dict[str, Any]) -> EndpointResult: diff --git a/chia/rpc/data_layer_rpc_client.py b/chia/rpc/data_layer_rpc_client.py index e43c21bdf32e..e0f21b155c47 100644 --- a/chia/rpc/data_layer_rpc_client.py +++ b/chia/rpc/data_layer_rpc_client.py @@ -70,8 +70,8 @@ async def remove_subscriptions(self, store_id: bytes32, urls: List[str]) -> Dict response = await self.fetch("remove_subscriptions", {"id": store_id.hex(), "urls": urls}) return response - async def unsubscribe(self, store_id: bytes32) -> Dict[str, Any]: - response = await self.fetch("unsubscribe", {"id": store_id.hex()}) + async def unsubscribe(self, store_id: bytes32, retain: bool) -> Dict[str, Any]: + response = await self.fetch("unsubscribe", {"id": store_id.hex(), "retain": retain}) return response async def add_missing_files( diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index 4aa9e0fe0b13..b5dacee1e223 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -2053,11 +2053,13 @@ async def test_issue_15955_deadlock( ) +@pytest.mark.parametrize("retain", [True, False]) @pytest.mark.asyncio async def test_unsubscribe_removes_files( self_hostname: str, one_wallet_and_one_simulator_services: SimulatorsAndWalletsServices, tmp_path: Path, + retain: bool, ) -> None: wallet_rpc_api, full_node_api, wallet_rpc_port, ph, bt = await init_wallet_and_node( self_hostname, one_wallet_and_one_simulator_services @@ -2091,9 +2093,9 @@ async def test_unsubscribe_removes_files( assert get_delta_filename(store_id, hash, generation + 1) in filenames assert get_full_tree_filename(store_id, hash, generation + 1) in filenames - res = await data_rpc_api.unsubscribe(request={"id": store_id.hex()}) + res = await data_rpc_api.unsubscribe(request={"id": store_id.hex(), "retain": retain}) filenames = {path.name for path in data_layer.server_files_location.iterdir()} - assert len(filenames) == 0 + assert len(filenames) == 2 * update_count if retain else 0 @pytest.mark.parametrize(argnames="layer", argvalues=list(InterfaceLayer)) From 821d25d365326c5ff29d997ede3fd786d79b28fb Mon Sep 17 00:00:00 2001 From: Florin Chirica Date: Wed, 6 Sep 2023 01:30:12 +0300 Subject: [PATCH 14/14] Fix test. --- tests/core/data_layer/test_data_rpc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/data_layer/test_data_rpc.py b/tests/core/data_layer/test_data_rpc.py index b5dacee1e223..99545131c244 100644 --- a/tests/core/data_layer/test_data_rpc.py +++ b/tests/core/data_layer/test_data_rpc.py @@ -2095,7 +2095,7 @@ async def test_unsubscribe_removes_files( res = await data_rpc_api.unsubscribe(request={"id": store_id.hex(), "retain": retain}) filenames = {path.name for path in data_layer.server_files_location.iterdir()} - assert len(filenames) == 2 * update_count if retain else 0 + assert len(filenames) == (2 * update_count if retain else 0) @pytest.mark.parametrize(argnames="layer", argvalues=list(InterfaceLayer))