Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor mutiproc prevalidation #15166

Merged
merged 7 commits into from
May 12, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 2 additions & 13 deletions chia/consensus/multiprocess_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ async def pre_validate_blocks_multiprocessing(
prev_b: Optional[BlockRecord] = None
# Collects all the recent blocks (up to the previous sub-epoch)
recent_blocks: Dict[bytes32, BlockRecord] = {}
recent_blocks_compressed: Dict[bytes32, BlockRecord] = {}
num_sub_slots_found = 0
num_blocks_seen = 0
if blocks[0].height > 0:
Expand All @@ -211,9 +210,6 @@ async def pre_validate_blocks_multiprocessing(
or num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS
or num_sub_slots_found < num_sub_slots_to_look_for
) and curr.height > 0:
if num_blocks_seen < constants.NUMBER_OF_TIMESTAMPS or num_sub_slots_found < num_sub_slots_to_look_for:
recent_blocks_compressed[curr.header_hash] = curr

if curr.first_in_sub_slot:
assert curr.finished_challenge_slot_hashes is not None
num_sub_slots_found += len(curr.finished_challenge_slot_hashes)
Expand All @@ -222,7 +218,6 @@ async def pre_validate_blocks_multiprocessing(
num_blocks_seen += 1
curr = block_records.block_record(curr.prev_hash)
recent_blocks[curr.header_hash] = curr
recent_blocks_compressed[curr.header_hash] = curr
block_record_was_present = []
for block in blocks:
block_record_was_present.append(block_records.contains_block(block.header_hash))
Expand Down Expand Up @@ -282,10 +277,8 @@ async def pre_validate_blocks_multiprocessing(
if not block_records.contains_block(block_rec.header_hash):
block_records.add_block_record(block_rec) # Temporarily add block to dict
recent_blocks[block_rec.header_hash] = block_rec
recent_blocks_compressed[block_rec.header_hash] = block_rec
else:
recent_blocks[block_rec.header_hash] = block_records.block_record(block_rec.header_hash)
recent_blocks_compressed[block_rec.header_hash] = block_records.block_record(block_rec.header_hash)
prev_b = block_rec
diff_ssis.append((difficulty, sub_slot_iters))

Expand All @@ -295,19 +288,15 @@ async def pre_validate_blocks_multiprocessing(
if not block_record_was_present[i]:
block_records.remove_block_record(block.header_hash)

recent_sb_compressed_pickled = {bytes(k): bytes(v) for k, v in recent_blocks_compressed.items()}
npc_results_pickled = {}
for k, v in npc_results.items():
npc_results_pickled[k] = bytes(v)
futures = []
# Pool of workers to validate blocks concurrently
recent_blocks_bytes = {bytes(k): bytes(v) for k, v in recent_blocks.items()} # convert to bytes
for i in range(0, len(blocks), batch_size):
end_i = min(i + batch_size, len(blocks))
blocks_to_validate = blocks[i:end_i]
if any([len(block.finished_sub_slots) > 0 for block in blocks_to_validate]):
final_pickled = {bytes(k): bytes(v) for k, v in recent_blocks.items()}
else:
final_pickled = recent_sb_compressed_pickled
b_pickled: Optional[List[bytes]] = None
hb_pickled: Optional[List[bytes]] = None
previous_generators: List[Optional[bytes]] = []
Expand Down Expand Up @@ -349,7 +338,7 @@ async def pre_validate_blocks_multiprocessing(
pool,
batch_pre_validate_blocks,
constants,
final_pickled,
recent_blocks_bytes,
b_pickled,
hb_pickled,
previous_generators,
Expand Down