Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 81 additions & 63 deletions app/eth2wrap/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,49 +375,55 @@ func (c *DutiesCache) ProposerDutiesCache(ctx context.Context, epoch eth2p0.Epoc
allActive := c.activeValIdxs.valIdxs
c.activeValIdxs.RUnlock()

requestVidxs := vidxs
// Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below
// and must never alias either the caller's slice or the shared activeValIdxs slice.
requestVidxs := slices.Clone(vidxs)
if len(requestVidxs) == 0 {
requestVidxs = allActive
requestVidxs = slices.Clone(allActive)
}

dutiesForEpoch, ok := c.fetchProposerDuties(epoch)
dutiesResult := make([]*eth2v1.ProposerDuty, 0, len(vidxs))

if ok {
// If the request was for all validators and also all duties are already cached, skip more expensive operations.
// This is the common case for most validator clients and Charon, which usually request duties for all active validators.
if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) {
for _, d := range dutiesForEpoch.duties {
dutiesResult = append(dutiesResult, &d)
}
// previouslyRequested is the set of indices already queried from the beacon for this epoch.
// A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present
// in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits.
previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs))
for _, idx := range dutiesForEpoch.requestedIdxs {
previouslyRequested[idx] = struct{}{}
}

cacheUsed = true
requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs))

return ProposerDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
var missing []eth2p0.ValidatorIndex

for _, idx := range requestVidxs {
requestedSet[idx] = struct{}{}

if _, hit := previouslyRequested[idx]; !hit {
missing = append(missing, idx)
}
}

// Filter out the found duties.
for _, d := range dutiesForEpoch.duties {
if slices.Contains(requestVidxs, d.ValidatorIndex) {
if _, hit := requestedSet[d.ValidatorIndex]; hit {
dutiesResult = append(dutiesResult, &d)
}
}
Comment thread
KaloyanTanev marked this conversation as resolved.

if len(dutiesResult) > 0 {
// Fast path: every requested index has been queried previously, so the cache answer is complete.
if len(missing) == 0 {
cacheUsed = true
}

// Check if all requested duties were found in the cache (= being a subset of it).
if len(dutiesResult) == len(requestVidxs) {
return ProposerDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
}

for _, duty := range dutiesForEpoch.duties {
requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool {
return requestVidx == duty.ValidatorIndex
})
if len(dutiesResult) > 0 {
cacheUsed = true
}

requestVidxs = missing

log.Debug(ctx, "Cached proposer duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs))
}

Expand Down Expand Up @@ -464,49 +470,55 @@ func (c *DutiesCache) AttesterDutiesCache(ctx context.Context, epoch eth2p0.Epoc
allActive := c.activeValIdxs.valIdxs
c.activeValIdxs.RUnlock()

requestVidxs := vidxs
// Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below
// and must never alias either the caller's slice or the shared activeValIdxs slice.
requestVidxs := slices.Clone(vidxs)
if len(requestVidxs) == 0 {
requestVidxs = allActive
requestVidxs = slices.Clone(allActive)
}

dutiesForEpoch, ok := c.fetchAttesterDuties(epoch)
dutiesResult := make([]*eth2v1.AttesterDuty, 0, len(vidxs))

if ok {
// If the request was for all validators and also all duties are already cached, this is done to skip more expensive operations.
// This is the common case for most validator clients and Charon, which usually request duties for all active validators.
if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) {
for _, d := range dutiesForEpoch.duties {
dutiesResult = append(dutiesResult, &d)
}
// previouslyRequested is the set of indices already queried from the beacon for this epoch.
// A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present
// in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits.
previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs))
for _, idx := range dutiesForEpoch.requestedIdxs {
previouslyRequested[idx] = struct{}{}
}

cacheUsed = true
requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs))

return AttesterDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
var missing []eth2p0.ValidatorIndex

for _, idx := range requestVidxs {
requestedSet[idx] = struct{}{}

if _, hit := previouslyRequested[idx]; !hit {
missing = append(missing, idx)
}
}

// Filter out the found duties.
for _, d := range dutiesForEpoch.duties {
if slices.Contains(requestVidxs, d.ValidatorIndex) {
if _, hit := requestedSet[d.ValidatorIndex]; hit {
dutiesResult = append(dutiesResult, &d)
}
}
Comment thread
KaloyanTanev marked this conversation as resolved.

if len(dutiesResult) > 0 {
// Fast path: every requested index has been queried previously, so the cache answer is complete.
if len(missing) == 0 {
cacheUsed = true
}

// Check if all requested duties were found in the cache (= being a subset of it).
if len(dutiesResult) == len(requestVidxs) {
return AttesterDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
}

for _, duty := range dutiesForEpoch.duties {
requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool {
return requestVidx == duty.ValidatorIndex
})
if len(dutiesResult) > 0 {
cacheUsed = true
}

requestVidxs = missing

log.Debug(ctx, "Cached attester duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs))
}

Expand Down Expand Up @@ -553,49 +565,55 @@ func (c *DutiesCache) SyncCommDutiesCache(ctx context.Context, epoch eth2p0.Epoc
allActive := c.activeValIdxs.valIdxs
c.activeValIdxs.RUnlock()

requestVidxs := vidxs
// Clone so requestVidxs is an independent working copy; it is narrowed to missing indices below
// and must never alias either the caller's slice or the shared activeValIdxs slice.
requestVidxs := slices.Clone(vidxs)
if len(requestVidxs) == 0 {
requestVidxs = allActive
requestVidxs = slices.Clone(allActive)
}

dutiesForEpoch, ok := c.fetchSyncDuties(epoch)
dutiesResult := make([]*eth2v1.SyncCommitteeDuty, 0, len(vidxs))

if ok {
// If the request was for all validators and also all duties are already cached, skip more expensive operations.
// This is the common case for most validator clients and Charon, which usually request duties for all active validators.
if len(allActive) == len(requestVidxs) && len(allActive) == len(dutiesForEpoch.requestedIdxs) {
for _, d := range dutiesForEpoch.duties {
dutiesResult = append(dutiesResult, &d)
}
// previouslyRequested is the set of indices already queried from the beacon for this epoch.
// A validator with no duty for the epoch is absent from dutiesForEpoch.duties but present
// in dutiesForEpoch.requestedIdxs, so this set (not the duties list) determines cache hits.
previouslyRequested := make(map[eth2p0.ValidatorIndex]struct{}, len(dutiesForEpoch.requestedIdxs))
for _, idx := range dutiesForEpoch.requestedIdxs {
previouslyRequested[idx] = struct{}{}
}

cacheUsed = true
requestedSet := make(map[eth2p0.ValidatorIndex]struct{}, len(requestVidxs))

return SyncDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
var missing []eth2p0.ValidatorIndex

for _, idx := range requestVidxs {
requestedSet[idx] = struct{}{}

if _, hit := previouslyRequested[idx]; !hit {
missing = append(missing, idx)
}
}

// Filter out the found duties.
for _, d := range dutiesForEpoch.duties {
if slices.Contains(requestVidxs, d.ValidatorIndex) {
if _, hit := requestedSet[d.ValidatorIndex]; hit {
dutiesResult = append(dutiesResult, &d)
}
}
Comment thread
KaloyanTanev marked this conversation as resolved.

if len(dutiesResult) > 0 {
// Fast path: every requested index has been queried previously, so the cache answer is complete.
if len(missing) == 0 {
cacheUsed = true
}

// Check if all requested duties were found in the cache (= being a subset of it).
if len(dutiesResult) == len(requestVidxs) {
return SyncDutyWithMeta{Duties: dutiesResult, Metadata: dutiesForEpoch.metadata}, nil
}

for _, duty := range dutiesForEpoch.duties {
requestVidxs = slices.DeleteFunc(requestVidxs, func(requestVidx eth2p0.ValidatorIndex) bool {
return requestVidx == duty.ValidatorIndex
})
if len(dutiesResult) > 0 {
cacheUsed = true
}

requestVidxs = missing

log.Debug(ctx, "Cached sync duties do not contain all requested validator indices, fetching from beacon node...", z.Any("missing_validator_indices", requestVidxs), z.Any("requested_validator_indices", vidxs))
}

Expand Down
Loading
Loading