feat: implement devnet3 committee aggregation spec #552
Conversation
Signed-off-by: grapebaba <grapebaba@grapebabadeMacBook-Pro.local>
Pull request overview
Implements committee-aggregation behavior across node, fork choice, and networking layers, aligning gossip topics and fork-choice processing with the committee aggregation spec.
Changes:
- Adds subnet-aware attestation gossip topics and introduces aggregated attestation gossip.
- Refactors fork choice to ingest aggregated payloads, aggregate committee signatures, and prune stale attestation data.
- Updates node/validator interval scheduling and subscriptions to support aggregation flow and future-block caching.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pkgs/types/src/utils.zig | Adds subnet-id helper, integer-based slot justification logic, and extends ChainSpec with committee count. |
| pkgs/types/src/state.zig | Adjusts finalization targeting loop logic using updated justifiable-slot checks. |
| pkgs/types/src/lib.zig | Re-exports new aggregation-related types/utilities. |
| pkgs/types/src/attestation.zig | Introduces SignedAggregatedAttestation and JSON helpers. |
| pkgs/spectest/src/runner/fork_choice_runner.zig | Extends spectest runner to capture/verify block attestations and align interval advancement with updated API. |
| pkgs/node/src/validator_client.zig | Makes attestations subnet-aware and adds optional aggregation production. |
| pkgs/node/src/utils.zig | Adds a wrapper for subnet-id computation. |
| pkgs/node/src/testing.zig | Sets default committee count in test contexts. |
| pkgs/node/src/node.zig | Adds aggregation gossip handling, future block caching, per-interval ticking loop, and subnet-based subscriptions. |
| pkgs/node/src/forkchoice.zig | Refactors fork choice to process aggregated payloads, aggregate signatures, update interval semantics, and prune stale data. |
| pkgs/node/src/constants.zig | Updates intervals-per-slot to 5 to match new interval schedule. |
| pkgs/node/src/chain.zig | Wires aggregator mode, pending gossip queues, proposer-head selection, payload storage, and stale data pruning. |
| pkgs/network/src/mock.zig | Updates tests for new topic structure. |
| pkgs/network/src/lib.zig | Re-exports new gossip-topic kinds and attestation gossip wrapper. |
| pkgs/network/src/interface.zig | Reworks gossip topics into subnet-aware structured topics and adds aggregation gossip handling/serialization. |
| pkgs/network/src/ethlibp2p.zig | Decodes subnet-aware attestation topics, adds aggregation decoding, and subscribes to all attestation subnets. |
| pkgs/configs/src/lib.zig | Sets default committee count for custom chain configs when omitted. |
| pkgs/configs/src/configs/mainnet.zig | Adds committee count to mainnet ChainSpec. |
| pkgs/cli/test/integration.zig | Enables aggregator mode in integration test run. |
| pkgs/cli/src/node.zig | Plumbs aggregator mode into node setup and ENR fields. |
| pkgs/cli/src/main.zig | Adds CLI flag for aggregator mode and threads through networking params. |
| leanSpec | Bumps leanSpec submodule pointer. |
pkgs/types/src/utils.zig
Outdated
```zig
const json = std.json;

pub fn computeSubnetId(validator_id: ValidatorIndex, committee_count: u64) u64 {
    if (committee_count == 0) return 0;
```
Returning subnet 0 when committee_count == 0 silently masks a configuration error and can cause attestations to be published/subscribed on the wrong subnet. Prefer making this an error-returning API (e.g. !u64 with error.InvalidCommitteeCount) or asserting committee_count > 0 so callers must handle the misconfiguration explicitly.
```diff
- if (committee_count == 0) return 0;
+ std.debug.assert(committee_count > 0);
```
```zig
var start_interval: isize = self.last_interval + 1;
if (start_interval < 1) start_interval = 1;
if (start_interval > itime_intervals) start_interval = itime_intervals;

if (self.validator) |*validator| {
    // we also tick validator per interval in case it would
    // need to sync its future duties when its an independent validator
    var validator_output = validator.onInterval(interval) catch |e| {
        self.logger.err("error ticking validator to time(intervals)={d} err={any}", .{ interval, e });

var current_interval: isize = start_interval;
while (current_interval <= itime_intervals) : (current_interval += 1) {
    const interval: usize = @intCast(current_interval);
```
If itime_intervals is 0, start_interval becomes 0 (due to the start_interval > itime_intervals clamp), and the loop runs with interval == 0. That reintroduces interval-0 processing despite the earlier run() catch-up guard (current_interval > 0). Consider an early return for itime_intervals < 1, or ensure the clamp never drops below 1 (e.g. clamp to [1, itime_intervals] only when itime_intervals >= 1).
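For illustration, the suggested guard amounts to the following (a Python sketch with hypothetical names, not the actual node.zig API):

```python
def interval_range(last_interval: int, itime_intervals: int) -> range:
    # Early return keeps interval 0 from ever being processed when
    # there is nothing to catch up on (itime_intervals < 1).
    if itime_intervals < 1:
        return range(0)
    # Clamp the start to [1, itime_intervals] only for non-empty ranges.
    start = max(last_interval + 1, 1)
    return range(start, itime_intervals + 1)
```

With this shape, `interval_range(-1, 0)` yields nothing instead of re-running interval 0.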
```zig
const participants = ctx.allocator.alloc(u64, indices.items.len) catch |err| {
    std.debug.print(
        "fixture {s} case {s}{}: failed to allocate participants ({s})\n",
        .{ fixture_path, case_name, formatStep(step_index), @errorName(err) },
    );
    return FixtureError.InvalidFixture;
};
for (indices.items, 0..) |idx, i| {
    participants[i] = @intCast(idx);
}
ctx.block_attestations.append(ctx.allocator, .{
    .participants = participants,
    .attestation_slot = aggregated_attestation.data.slot,
    .target_slot = aggregated_attestation.data.target.slot,
}) catch |err| {
    std.debug.print(
        "fixture {s} case {s}{}: failed to record block attestation ({s})\n",
        .{ fixture_path, case_name, formatStep(step_index), @errorName(err) },
    );
    return FixtureError.InvalidFixture;
};
```
If append fails, participants is leaked. Add an errdefer ctx.allocator.free(participants); after allocation (or free in the error path). Also, verifyBlockAttestations sorts expected participants before comparing, but here the stored participants are not sorted—sorting before storing (or sorting during verification for both sides) will avoid order-dependent false mismatches.
pkgs/node/src/forkchoice.zig
Outdated
```zig
if (self.latest_new_aggregated_payloads.count() > 0) {
    var it = self.latest_new_aggregated_payloads.iterator();
    while (it.next()) |entry| {
        const sig_key = entry.key_ptr.*;
        var list = entry.value_ptr.*;

        const gop = try self.latest_known_aggregated_payloads.getOrPut(sig_key);
        if (!gop.found_existing) {
            gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator);
        }

        try self.attestations.put(validator_id, attestation_tracker);
        for (list.items) |stored| {
            try gop.value_ptr.append(stored);
        }
        list.deinit();
    }
    self.latest_new_aggregated_payloads.clearAndFree();
}
```
This is doing a by-value move of stored items (which include proof ownership) from latest_new_aggregated_payloads to latest_known_aggregated_payloads. If any append fails mid-loop, some proofs will have been copied into latest_known_* while still remaining in latest_new_*, leading to double-ownership and eventual double-free during pruning/deinit. To make this safe, ensure capacity up-front (so appends cannot allocate) and/or remove/move items from the source list as you append (e.g., pop/swap-remove), and only clear the source map after a fully successful transfer.
pkgs/node/src/forkchoice.zig
Outdated
```zig
}

pub fn aggregateCommitteeSignatures(self: *Self, state_opt: ?*const types.BeamState) ![]types.SignedAggregatedAttestation {
    const state = state_opt orelse return &[_]types.SignedAggregatedAttestation{};
```
This function sometimes returns a non-allocator-owned slice (&[_]T{}) and sometimes returns an allocator-owned slice (toOwnedSlice()). That makes ownership ambiguous and easy to misuse (callers commonly free() the result). Prefer always returning allocator-owned memory (including len == 0, e.g., allocate a zero-length slice), or change the return type/contracts so callers never free it.
```diff
- const state = state_opt orelse return &[_]types.SignedAggregatedAttestation{};
+ const state = state_opt orelse {
+     return try self.allocator.alloc(types.SignedAggregatedAttestation, 0);
+ };
```
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 7 comments.
pkgs/types/src/utils.zig
Outdated
```zig
const sqrt_delta = isqrt(delta);
if (sqrt_delta * sqrt_delta == delta) {
    return true;
}
const delta_x2_x: f32 = @mod(std.math.pow(f32, delta + 0.25, 0.5), 1);
if (delta_x2_x == 0.5) {

if (sqrt_delta * (sqrt_delta + 1) == delta) {
    return true;
}
```
These multiplications can overflow u64 for large slots, causing incorrect justifiability results (and potentially differing behavior between debug/release). Consider rewriting the perfect-square / k(k+1) checks using division-based comparisons (e.g., compare via delta / sqrt_delta and remainder) or using a wide/checked multiply helper so the equality test can’t overflow.
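The division-based rewrite suggested here is easy to sanity-check; a minimal Python sketch of both checks (using `math.isqrt` as the exact integer square root — names are illustrative, not the codebase's API):

```python
import math

def is_perfect_square(delta: int) -> bool:
    # delta == s*s without a widening multiply:
    # delta % s == 0 and delta // s == s.
    if delta == 0:
        return True
    s = math.isqrt(delta)
    return delta % s == 0 and delta // s == s

def is_pronic(delta: int) -> bool:
    # True when delta == k * (k + 1) for some k. If so, isqrt(delta) == k,
    # so delta % k == 0 and delta // k == k + 1; no overflow-prone product.
    if delta == 0:
        return True  # 0 == 0 * 1
    s = math.isqrt(delta)
    return delta % s == 0 and delta // s == s + 1
```

Python integers are arbitrary-precision, so this only demonstrates the algebra; the point for Zig is that the quotient/remainder form never forms `sqrt_delta * sqrt_delta` in u64.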
```zig
for (validator_indices.items) |validator_index| {
    const sig_key = SignatureKey{
        .validator_id = @intCast(validator_index),
        .data_root = data_root,
    };
    var cloned_proof: types.AggregatedSignatureProof = undefined;
    try types.sszClone(self.allocator, types.AggregatedSignatureProof, signed_aggregation.proof, &cloned_proof);

    // Identify gossip signatures that are at or before the finalized slot
    var gossip_it = self.gossip_signatures.iterator();
    while (gossip_it.next()) |entry| {
        if (entry.value_ptr.slot <= finalized_slot) {
            try gossip_keys_to_remove.append(entry.key_ptr.*);

    const gop = try self.latest_new_aggregated_payloads.getOrPut(sig_key);
    if (!gop.found_existing) {
        gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator);
    }
    try gop.value_ptr.append(.{
        .slot = signed_aggregation.data.slot,
        .proof = cloned_proof,
    });
}
```
If getOrPut or append fails after cloned_proof is cloned, cloned_proof leaks. Add an errdefer cloned_proof.deinit(); immediately after cloning (and cancel it only once ownership is transferred into the list).
```zig
const gop = try self.latest_new_aggregated_payloads.getOrPut(sig_key);
if (!gop.found_existing) {
    gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator);
}
var cloned_proof: types.AggregatedSignatureProof = undefined;
try types.sszClone(self.allocator, types.AggregatedSignatureProof, proof, &cloned_proof);
try gop.value_ptr.append(.{
    .slot = agg_att.data.slot,
    .proof = cloned_proof,
});
```
Same ownership issue here: if append fails after cloning cloned_proof, it leaks. Add errdefer cloned_proof.deinit(); after the clone, and only release it once the list owns the proof.
```zig
const participants = ctx.allocator.alloc(u64, indices.items.len) catch |err| {
    std.debug.print(
        "fixture {s} case {s}{}: failed to allocate participants ({s})\n",
        .{ fixture_path, case_name, formatStep(step_index), @errorName(err) },
    );
    return FixtureError.InvalidFixture;
};
for (indices.items, 0..) |idx, i| {
    participants[i] = @intCast(idx);
}
ctx.block_attestations.append(ctx.allocator, .{
    .participants = participants,
    .attestation_slot = aggregated_attestation.data.slot,
    .target_slot = aggregated_attestation.data.target.slot,
}) catch |err| {
    std.debug.print(
        "fixture {s} case {s}{}: failed to record block attestation ({s})\n",
        .{ fixture_path, case_name, formatStep(step_index), @errorName(err) },
    );
    return FixtureError.InvalidFixture;
};
```
On append failure, participants is leaked because it is not freed before returning. Add an errdefer ctx.allocator.free(participants); right after allocation (and cancel it once the entry is successfully appended), or free it in the catch block before returning.
pkgs/network/src/ethlibp2p.zig
Outdated
```zig
for (std.enums.values(interface.GossipTopicKind)) |kind| {
    switch (kind) {
        .attestation => {
            const count = self.params.attestation_committee_count;
```
attestation_committee_count is u64, but for (0..count) typically expects a usize range. This can be a compile error (or force undesirable implicit casts depending on Zig version). Cast once to usize with bounds checking (and consider validating that count fits into u32 too since subnet ids are u32).
```diff
- const count = self.params.attestation_committee_count;
+ const count_u64 = self.params.attestation_committee_count;
+ // Ensure the attestation committee count can be represented as both u32 (for subnet ids)
+ // and usize (for use as a range bound) before iterating.
+ if (count_u64 > std.math.maxInt(u32)) {
+     return error.AttestationCommitteeCountTooLarge;
+ }
+ const count = std.math.cast(usize, count_u64) orelse return error.AttestationCommitteeCountTooLarge;
```
pkgs/types/src/utils.zig
Outdated
```zig
pub fn computeSubnetId(validator_id: ValidatorIndex, committee_count: u64) u64 {
    if (committee_count == 0) return 0;
    return validator_id % committee_count;
```
The subnet id is later cast to u32 in multiple places (gossip topics use u32), which can trap if committee_count exceeds maxInt(u32) and makes the API easy to misuse. Consider returning u32 here (and taking committee_count: u32), or explicitly validating/capping committee_count and returning an error when it can’t be represented as a u32.
```diff
- return validator_id % committee_count;
+ const max_u32_committee: u64 = std.math.maxInt(u32);
+ const effective_committee: u64 = if (committee_count > max_u32_committee)
+     max_u32_committee
+ else
+     committee_count;
+ return validator_id % effective_committee;
```
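For comparison, the error-returning shape (which a later revision of this PR adopts instead of silently capping) can be sketched in Python; this is illustrative only:

```python
U32_MAX = 2**32 - 1

def compute_subnet_id(validator_id: int, committee_count: int) -> int:
    # Surface misconfiguration instead of capping or returning subnet 0:
    # subnet ids must fit in u32 and a zero committee count is invalid.
    if committee_count == 0 or committee_count > U32_MAX:
        raise ValueError("invalid attestation_committee_count")
    return validator_id % committee_count
```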
```zig
std.sort.heap(u64, expected_participants, {}, std.sort.asc(u64));

const expected_attestation_slot = try expectU64Field(obj, &.{"attestationSlot"}, fixture_path, case_name, step_index, "attestationSlot");
const expected_target_slot = try expectU64Field(obj, &.{"targetSlot"}, fixture_path, case_name, step_index, "targetSlot");

var found = false;
for (ctx.block_attestations.items, 0..) |actual, actual_idx| {
    if (matched[actual_idx]) continue;
    if (actual.attestation_slot != expected_attestation_slot) continue;
    if (actual.target_slot != expected_target_slot) continue;
    if (actual.participants.len != expected_participants.len) continue;

    var equal = true;
    for (actual.participants, 0..) |p, p_idx| {
        if (p != expected_participants[p_idx]) {
            equal = false;
            break;
        }
    }
    if (!equal) continue;
```
The expected participant list is sorted, but the actual participant list is compared as-is. If the captured participants ordering differs (even with identical sets), this check will fail. Consider sorting participants when capturing them, or sorting a temporary copy of actual.participants before comparison.
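The order-insensitive comparison the comment asks for amounts to a multiset check; a tiny Python sketch (illustrative names):

```python
def participants_match(expected: list[int], actual: list[int]) -> bool:
    # Sorting copies of both sides makes the comparison independent of
    # the order in which participants were captured.
    return sorted(expected) == sorted(actual)
```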
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
pkgs/spectest/src/runner/fork_choice_runner.zig:1
`aggregationBitsToValidatorIndices` is computed twice per aggregated attestation in `processBlockStep` (once for `block_attestations`, once for `storeAggregatedPayload`). Since this is test-runner hot-path code, consider computing indices once and reusing them (e.g., store indices/participants alongside the captured block attestation summary, or restructure to do capture + payload storage in a single pass).
```zig
const std = @import("std");
```
pkgs/types/src/utils.zig
Outdated
```zig
pub fn computeSubnetId(validator_id: ValidatorIndex, committee_count: u64) u32 {
    if (committee_count == 0) {
        @panic("attestation_committee_count must be greater than 0");
    }
    if (committee_count > std.math.maxInt(u32)) {
        @panic("attestation_committee_count must fit in u32");
    }
```
computeSubnetId is part of a public util surface but it hard-panics on invalid input. For a config-driven value like attestation_committee_count, prefer returning an error (e.g., error.InvalidCommitteeCount) so callers can surface a structured configuration error instead of crashing the process.
```diff
- pub fn computeSubnetId(validator_id: ValidatorIndex, committee_count: u64) u32 {
-     if (committee_count == 0) {
-         @panic("attestation_committee_count must be greater than 0");
-     }
-     if (committee_count > std.math.maxInt(u32)) {
-         @panic("attestation_committee_count must fit in u32");
-     }
+ pub fn computeSubnetId(validator_id: ValidatorIndex, committee_count: u64) error{InvalidCommitteeCount}!u32 {
+     if (committee_count == 0 or committee_count > std.math.maxInt(u32)) {
+         return error.InvalidCommitteeCount;
+     }
```
pkgs/types/src/utils.zig
Outdated
```zig
var obj = json.ObjectMap.init(allocator);
try obj.put("preset", json.Value{ .string = @tagName(self.preset) });
try obj.put("name", json.Value{ .string = self.name });
try obj.put("attestation_committee_count", json.Value{ .integer = @as(i64, @intCast(self.attestation_committee_count)) });
```
@intCast from u64 to i64 will trap at runtime if attestation_committee_count exceeds maxInt(i64). Since toJson is already error-returning, consider using a checked cast (e.g., std.math.cast(i64, ...) orelse return error.ValueOutOfRange) or validating this field earlier when building the config/spec.
```diff
- try obj.put("attestation_committee_count", json.Value{ .integer = @as(i64, @intCast(self.attestation_committee_count)) });
+ try obj.put("attestation_committee_count", json.Value{ .integer = std.math.cast(i64, self.attestation_committee_count) orelse return error.ValueOutOfRange });
```
pkgs/node/src/forkchoice.zig
Outdated
```zig
if (is_aggregator) {
    const aggregations = try self.aggregateCommitteeSignaturesUnlocked(null);
    defer self.allocator.free(aggregations);
}
```
This aggregator branch always calls aggregateCommitteeSignaturesUnlocked(null), which (per current implementation) returns an empty slice and still performs an allocation/free on every tick when is_aggregator is true. Either remove this branch, or redesign it to pass the required state so the aggregation work actually happens here (otherwise it is pure overhead).
```diff
- if (is_aggregator) {
-     const aggregations = try self.aggregateCommitteeSignaturesUnlocked(null);
-     defer self.allocator.free(aggregations);
- }
+ // Interval reserved for aggregation work. No-op until required state
+ // is available to perform meaningful aggregation here.
+ _ = is_aggregator; // suppress unused parameter warning
```
pkgs/node/src/forkchoice.zig
Outdated
```zig
if (self.latest_new_aggregated_payloads.count() > 0) {
    // Keep payload migration synchronized with other signature/payload map writers.
    self.signatures_mutex.lock();
    defer self.signatures_mutex.unlock();

    var it = self.latest_new_aggregated_payloads.iterator();
    while (it.next()) |entry| {
        const sig_key = entry.key_ptr.*;
        const source_list = entry.value_ptr;

        const gop = try self.latest_known_aggregated_payloads.getOrPut(sig_key);
        if (!gop.found_existing) {
            gop.value_ptr.* = AggregatedPayloadsList.init(self.allocator);
        }

        try self.attestations.put(validator_id, attestation_tracker);
        }
        // Ensure all required capacity up-front so the move is non-failing.
        try gop.value_ptr.ensureUnusedCapacity(source_list.items.len);
        for (source_list.items) |stored| {
            gop.value_ptr.appendAssumeCapacity(stored);
        }

        // Source list buffer no longer needed after ownership transfer.
        source_list.deinit();
        source_list.* = AggregatedPayloadsList.init(self.allocator);
    }
    self.latest_new_aggregated_payloads.clearAndFree();
}
```
This “move items” loop is not failure-safe: if any try fails after some entries have already had items appended into latest_known_aggregated_payloads, the corresponding latest_new_aggregated_payloads entries still retain their items (because they’re only cleared after the append loop completes). That can lead to duplicated ownership of stored.proof and eventual double-free during deinit/pruning. A safer approach is to migrate by removing entries from the source map as you go (e.g., fetchRemove-based transfer), or by building a new destination map/list and swapping only after the full migration succeeds.
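The fetchRemove-style transfer suggested here can be sketched in Python, where `dict.popitem` stands in for `fetchRemove` (names illustrative): each entry leaves the source map before its items land in the destination, so no item is ever reachable from both maps at once.

```python
def migrate_payloads(source: dict, dest: dict) -> None:
    # Remove each entry from the source map first; if an append-equivalent
    # failed afterwards, the items would still have exactly one owner.
    while source:
        key, items = source.popitem()
        dest.setdefault(key, []).extend(items)
```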
pkgs/node/src/validator_client.zig
Outdated
```zig
for (aggregations) |aggregation| {
    try result.addAggregation(aggregation);
```
aggregateCommitteeSignatures() returns SignedAggregatedAttestation values that own heap memory (proof). This loop copies them into result, then frees the slice storage. If result.addAggregation fails part-way (e.g., OOM while appending), the remaining (not-transferred) aggregation items will leak because neither their proof.deinit() nor any cleanup runs before freeing the slice. Consider making transfer semantics explicit and failure-safe (e.g., clone into result and always deinit originals in a defer, or deinit the “unmoved tail” on error).
```diff
- for (aggregations) |aggregation| {
-     try result.addAggregation(aggregation);
+ var i: usize = 0;
+ while (i < aggregations.len) : (i += 1) {
+     result.addAggregation(aggregations[i]) catch |err| {
+         // Clean up the unmoved tail on error to avoid leaking `proof` resources.
+         var j = i;
+         while (j < aggregations.len) : (j += 1) {
+             aggregations[j].proof.deinit();
+         }
+         return err;
+     };
```
pkgs/types/src/utils.zig
Outdated
```zig
const delta_x2_x: f32 = @mod(std.math.pow(f32, delta + 0.25, 0.5), 1);
if (delta_x2_x == 0.5) {

if ((delta % sqrt_delta == 0) and (delta / sqrt_delta == (sqrt_delta + 1))) {
```
@anshalshukla @ch4r10t33r @chetanyb kindly double/triple check
this fixes a Copilot review comment about a possible overflow
I think we should avoid this and use Newton's method or a bitwise integer square root instead.
I'm not particularly comfortable around the u64::MAX boundary for `@sqrt(@as(f64, @floatFromInt(n)))`; I think we can use
```zig
fn isqrt(n: u64) u64 {
    if (n == 0) return 0;
    // Initial guess: 1 << ceil(bit_length(n)/2)
    const bl = 64 - @clz(n);
    var x: u64 = @as(u64, 1) << ((bl + 1) / 2);
    // Newton iteration: x_{k+1} = floor((x + n/x)/2)
    while (true) {
        const y = (x + n / x) >> 1;
        if (y >= x) return x;
        x = y;
    }
}
```
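The same Newton iteration is easy to cross-check against `math.isqrt`; this Python port is for illustration only:

```python
import math

def isqrt_newton(n: int) -> int:
    # Newton iteration with an initial guess >= isqrt(n),
    # mirroring the Zig sketch above.
    if n == 0:
        return 0
    bl = n.bit_length()
    x = 1 << ((bl + 1) // 2)  # 2^ceil(bl/2) >= isqrt(n)
    while True:
        y = (x + n // x) // 2
        if y >= x:
            return x
        x = y
```

Because the guess starts above the true root, the sequence decreases monotonically until it crosses floor(sqrt(n)), so the `y >= x` test terminates with the exact floor value even at the u64 boundary.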
pkgs/types/src/utils.zig
Outdated
```zig
if (committee_count == 0 or committee_count > std.math.maxInt(u32)) {
    return error.InvalidCommitteeCount;
}
return @intCast(validator_id % committee_count);
```
double check this @ch4r10t33r, just for a second pair of eyes
this fixes the Copilot review comment by returning an error instead of panicking
```zig
};

pub const SignedAggregatedAttestation = struct {
    data: AttestationData,
```
the proof already includes that
pkgs/node/src/validator_client.zig
Outdated
```zig
    },
}

const aggregations = self.chain.aggregateCommitteeSignatures() catch |err| {
```
ahh ok, so a "node" is chosen as an aggregator, and this needs to work like peer-das custody to make sure there are enough aggregators, so based on validator ids, one would need to assign the subnet
in that case do we even need this bit here? it could just be handled purely in the node `onInterval`
also question to be asked for spec:
how much redundancy is to be modeled for nodes aggregating on the same subnet cc @anshalshukla
Pull request overview
Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
pkgs/node/src/forkchoice.zig:824
`is_aggregator` is currently unused (discarded with `_ = is_aggregator;`). If fork choice no longer needs this parameter, consider removing it from `tickIntervalUnlocked`/`onInterval` (and their callers). If it's intentionally reserved for future behavior, add a short TODO explaining what interval-2 behavior is expected to become so it's not mistaken for dead code.
```zig
fn tickIntervalUnlocked(self: *Self, hasProposal: bool, is_aggregator: bool) !void {
    self.fcStore.time += 1;
    const currentInterval = self.fcStore.time % constants.INTERVALS_PER_SLOT;
    switch (currentInterval) {
        0 => {
            self.fcStore.timeSlots += 1;
            // Accept new aggregated payloads only if a proposal exists for this slot.
            if (hasProposal) {
                _ = try self.acceptNewAttestationsUnlocked();
            }
        },
        1 => {},
        2 => {
            // Aggregation execution is driven by validator flow with an explicit state.
            _ = is_aggregator;
        },
        3 => {
            _ = try self.updateSafeTargetUnlocked();
        },
        4 => {
            _ = try self.acceptNewAttestationsUnlocked();
        },
        else => @panic("invalid interval"),
    }
```
```zig
for (indices.items) |validator_index| {
    var proof_clone: types.AggregatedSignatureProof = undefined;
    types.sszClone(ctx.allocator, types.AggregatedSignatureProof, proof_template, &proof_clone) catch |err| {
        std.debug.print(
            "fixture {s} case {s}{}: failed to clone proof ({s})\n",
            .{ fixture_path, case_name, formatStep(step_index), @errorName(err) },
        );
        return FixtureError.InvalidFixture;
    };
    errdefer proof_clone.deinit();
```
proof_template is manually deinited, but also has an errdefer proof_template.deinit() that remains active until the surrounding scope ends; if any later error is returned from processBlockStep after line 792, this can double-free. Fix by scoping proof_template to a nested block and using defer proof_template.deinit() (and remove the manual deinit), or otherwise ensure the errdefer cannot run after the manual deinit.
pkgs/node/src/validator_client.zig
Outdated
```zig
var i: usize = 0;
while (i < aggregations.len) : (i += 1) {
    result.addAggregation(aggregations[i]) catch |err| {
        // On transfer failure, deinit the unmoved tail to avoid leaking proofs.
        var j: usize = i;
        while (j < aggregations.len) : (j += 1) {
            aggregations[j].deinit();
        }
        return err;
    };
}
```
SignedAggregatedAttestation contains heap-owned state (proof.deinit() exists), but the code appends it by value, creating shallow copies of the proof buffers. This makes ownership non-obvious and fragile (future changes could accidentally deinit both copies, or keep one copy past the other’s deinit). Consider changing the API to make ownership transfer explicit (e.g., append by pointer/move-like pattern, or provide a dedicated clone/steal helper) so there is always exactly one clear owner responsible for deinit().
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 4 comments.
```zig
var obj = json.ObjectMap.init(allocator);
try obj.put("preset", json.Value{ .string = @tagName(self.preset) });
try obj.put("name", json.Value{ .string = self.name });
try obj.put("attestation_committee_count", json.Value{ .integer = self.attestation_committee_count });
```
std.json.Value.integer is an i64; assigning a u32 (SubnetId) is a signed/unsigned mismatch in Zig and is likely to fail compilation. Cast explicitly to the expected integer type (e.g., via @intCast to i64 / @as(i64, ...)) before assigning.
```diff
- try obj.put("attestation_committee_count", json.Value{ .integer = self.attestation_committee_count });
+ try obj.put("attestation_committee_count", json.Value{ .integer = @as(i64, @intCast(self.attestation_committee_count)) });
```
```zig
const pending_missing_roots = self.chain.processPendingBlocks();
defer self.allocator.free(pending_missing_roots);
if (pending_missing_roots.len > 0) {
    self.fetchBlockByRoots(pending_missing_roots, 0) catch |err| {
        self.logger.warn(
            "failed to fetch {d} missing block(s) from pending blocks: {any}",
            .{ pending_missing_roots.len, err },
        );
    };
}
```
The defer self.allocator.free(pending_missing_roots); is inside a potentially long-running loop, so each iteration stacks a new defer and nothing is freed until tick() returns. This can cause unbounded memory growth. Free pending_missing_roots at the end of each iteration (e.g., using a scoped block defer, or an explicit free after fetchBlockByRoots).
```zig
switch (gossip_msg.*) {
    .aggregation => |*signed_aggregation| signed_aggregation.deinit(),
    else => {},
}
```
Only .aggregation entries are deinitialized here. If networks.GossipMessage.block owns heap memory (common for block bodies), those allocations will leak. Since GossipMessage now has a deinit() helper, calling gossip_msg.deinit() for all variants (or handling .block explicitly) would ensure correct cleanup.
```diff
- switch (gossip_msg.*) {
-     .aggregation => |*signed_aggregation| signed_aggregation.deinit(),
-     else => {},
- }
+ gossip_msg.deinit();
```
pkgs/node/src/forkchoice.zig
Outdated
```zig
const participants_len = stored.proof.participants.len();
for (0..participants_len) |validator_index| {
    if (!(try stored.proof.participants.get(validator_index))) continue;
    const validator_id: types.ValidatorIndex = @intCast(validator_index);
    if (attestations.get(validator_id)) |existing| {
        if (existing.slot >= attestation_data.slot) {
            continue;
        }
    }
    try attestations.put(validator_id, attestation_data);
}
```
This scans the entire bitfield length for every stored proof, resulting in O(N_bits) work even when only a few participants are set. To avoid a potential hot-path blowup at larger committee sizes, prefer iterating only set bits (e.g., via an indices extraction helper similar to types.aggregationBitsToValidatorIndices) and updating attestations from that list.
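Visiting only set bits is a standard lowest-set-bit walk; a Python sketch of the idea (illustrative, not this codebase's bitfield API):

```python
def set_bit_indices(bits: int):
    # Yield the index of each set bit, skipping all zero positions,
    # so the work is O(popcount) rather than O(bit length).
    while bits:
        low = bits & -bits            # isolate the lowest set bit
        yield low.bit_length() - 1    # its index
        bits &= bits - 1              # clear it
```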
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 1 comment.
    .validator_id = validator_id,
};
try included_attestations.append(self.allocator, attestation);
}
std.AutoHashMap.iterator() does not provide a deterministic iteration order, so the returned attestation list order may vary between runs. Since SSZ list ordering affects block roots, this can introduce non-deterministic block contents; sort included_attestations (e.g., by validator_id, then data.slot) before returning, or iterate validators in a stable order.
    }
}

fn lessThan(
    _: void,
    a: types.Attestation,
    b: types.Attestation,
) bool {
    if (a.validator_id == b.validator_id) {
        return a.data.slot < b.data.slot;
    }
    return a.validator_id < b.validator_id;
}

std.sort.pdq(types.Attestation, included_attestations.items, {}, lessThan);
Abstract out the repetitive SSZ deserialization + error handling + debug file creation pattern in handleMsgFromRustBridge into a reusable comptime-generic helper function. This eliminates ~30 lines of duplicated boilerplate across the block, attestation, and aggregation message handlers, making it trivial to add new message types in the future. Ref: #552 (comment) Co-authored-by: openclaw-bot <bot@openclaw.ai>
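The shape of such a comptime-generic helper might look like the sketch below. This is illustrative only: the `ssz.decode` call signature and the debug-dump hook are assumptions about the codebase, not the actual #605 implementation:

```zig
/// Hypothetical sketch: one generic function replaces the per-type
/// decode + error-logging boilerplate in handleMsgFromRustBridge.
fn deserializeGossipMessage(
    comptime T: type,
    allocator: std.mem.Allocator,
    topic: []const u8,
    bytes: []const u8,
) ?T {
    // `ssz.decode(T, bytes, allocator)` is an assumed decoder API.
    return ssz.decode(T, bytes, allocator) catch |err| {
        std.log.err(
            "failed to decode {s} on topic {s}: {}",
            .{ @typeName(T), topic, err },
        );
        // A debug file dump of `bytes` could be written here, matching
        // the pattern the refactor factors out.
        return null;
    };
}
```

Each handler then reduces to a single call such as `const signed = deserializeGossipMessage(types.SignedAggregatedAttestation, self.allocator, topic, data) orelse return;`, and adding a new gossip payload type requires no new boilerplate.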
Move the interval_in_slot == 2 check from chain.maybeAggregateCommitteeSignaturesOnInterval into node.zig, so the chain method no longer needs to know which interval it should run on. Signed-off-by: Chen Kai <281165273grape@gmail.com>
g11tech left a comment:
Merging since it mostly looks good; improvements to be done in follow-ups.
Great work @GrapeBaBa, and thanks @anshalshukla!
* feat: implement committee aggregation spec
* revert mock network standalone gossip scheduler
* fix spectest and attestation aggregation regressions
* fix flaky forkchoice test logger lifetime and map synchronization
* fix aggregation ownership and subnet validation
* fix subnet id error handling and aggregation cleanup
* refactor: restore justifiable slot check style
* Fix segfault on early error: initialize NodeOptions slices to empty
* Revert "Fix segfault on early error: initialize NodeOptions slices to empty" (reverts commit 941607d)
* Align subnet typing and aggregator config handling
* Fix review follow-ups for spectest and aggregation ownership
* Apply zig fmt for spectest runner
* fix: fix review comments
* fix: 0.15.2 changes
* fix: mem leaks
* fix: minor defensive changes
* fix: fix review comments
* fix: fix spec test
* fix: fix review comments
* fix: fix the review comments
* fix: fix comments
* cleanup
* fix: fix review comments
* node: apply block attestations to tracker
* refactor: extract generic deserializeGossipMessage helper (#605)
* node: move aggregation interval guard to caller
* rm: extractAttestationsFromAggregatedPayloads

Signed-off-by: grapebaba <grapebaba@grapebabadeMacBook-Pro.local>
Signed-off-by: Chen Kai <281165273grape@gmail.com>
Co-authored-by: grapebaba <grapebaba@grapebabadeMacBook-Pro.local>
Co-authored-by: ch4r10t33r <psramanuj@gmail.com>
Co-authored-by: anshalshukla <shukla.anshal85@gmail.com>
Co-authored-by: Anshal Shukla <53994948+anshalshukla@users.noreply.github.com>
Co-authored-by: Parthasarathy Ramanujam <1627026+ch4r10t33r@users.noreply.github.com>
Co-authored-by: harkamal <gajinder@zeam.in>
Co-authored-by: zclawz <claw@zeam.in>
Co-authored-by: openclaw-bot <bot@openclaw.ai>