Skip to content

Commit

Permalink
DL batch upsert optimization.
Browse files Browse the repository at this point in the history
  • Loading branch information
fchirica committed May 9, 2024
1 parent 480bba5 commit 3ae341d
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,22 @@ async def get_leaf_at_minimum_height(self, root_hash: bytes32) -> TerminalNode:
elif isinstance(node, TerminalNode):
return node

async def batch_upsert(
self,
tree_id: bytes32,
hash: bytes32,
to_update_hashes: Set[bytes32],
pending_upsert_new_hashes: Dict[bytes32, bytes32],
) -> bytes32:
if hash not in to_update_hashes:
return hash
node = await self.get_node(hash)
if isinstance(node, TerminalNode):
return pending_upsert_new_hashes[hash]
new_left_hash = await self.batch_upsert(tree_id, node.left_hash, to_update_hashes, pending_upsert_new_hashes)
new_right_hash = await self.batch_upsert(tree_id, node.right_hash, to_update_hashes, pending_upsert_new_hashes)
return await self._insert_internal_node(new_left_hash, new_right_hash)

async def insert_batch(
self,
tree_id: bytes32,
Expand Down Expand Up @@ -1418,6 +1434,7 @@ async def insert_batch(
first_action[hash] = change["action"]

pending_autoinsert_hashes: List[bytes32] = []
pending_upsert_new_hashes: Dict[bytes32, bytes32] = {}
for change in changelist:
if change["action"] == "insert":
key = change["key"]
Expand Down Expand Up @@ -1462,13 +1479,42 @@ async def insert_batch(
elif change["action"] == "upsert":
key = change["key"]
new_value = change["value"]
if key_hash_frequency[hash] == 1 and enable_batch_autoinsert:
old_node: Optional[Node] = None
try:
old_node = await self.get_node_by_key(key=key, tree_id=tree_id)
except KeyNotFoundError:
pass
if old_node is not None:
terminal_node_hash = await self._insert_terminal_node(key, new_value)
pending_upsert_new_hashes[old_node.hash] = terminal_node_hash
continue
insert_result = await self.upsert(
key, new_value, tree_id, True, Status.COMMITTED, root=latest_local_root
)
latest_local_root = insert_result.root
else:
raise Exception(f"Operation in batch is not insert or delete: {change}")

if len(pending_upsert_new_hashes) > 0:
to_update_hashes: Set[bytes32] = set()
for hash in pending_upsert_new_hashes.keys():
while True:
if hash in to_update_hashes:
break
to_update_hashes.add(hash)
node = await self._get_one_ancestor(hash, tree_id)
if node is None:
break
hash = node.hash
new_root_hash = await self.batch_upsert(
tree_id,
latest_local_root.node_hash,
to_update_hashes,
pending_upsert_new_hashes,
)
latest_local_root = await self._insert_root(tree_id, new_root_hash, Status.COMMITTED)

# Start with the leaf nodes and pair them to form new nodes at the next level up, repeating this process
# in a bottom-up fashion until a single root node remains. This constructs a balanced tree from the leaves.
while len(pending_autoinsert_hashes) > 1:
Expand Down

0 comments on commit 3ae341d

Please sign in to comment.