Skip to content

Commit

Permalink
Secure lock when accessing the map only (prysmaticlabs#4849)
Browse files Browse the repository at this point in the history
* Secure lock when accessing the map

* wrong lock

* Remove some deadlocks
  • Loading branch information
prestonvanloon authored and cryptomental committed Feb 24, 2020
1 parent caf9921 commit 0f536e3
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 9 deletions.
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
Expand Up @@ -101,6 +101,7 @@ go_test(
"validate_voluntary_exit_test.go",
],
embed = [":go_default_library"],
shard_count = 4,
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/feed:go_default_library",
Expand Down
21 changes: 18 additions & 3 deletions beacon-chain/sync/pending_attestations_queue.go
Expand Up @@ -39,8 +39,6 @@ func (s *Service) processPendingAttsQueue() {
// 2. Check if pending attestations can be processed when the block has arrived.
// 3. Request block from a random peer if unable to proceed step 2.
func (s *Service) processPendingAtts(ctx context.Context) error {
s.pendingAttsLock.Lock()
defer s.pendingAttsLock.Unlock()
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
defer span.End()

Expand All @@ -51,7 +49,17 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
s.validatePendingAtts(ctx, s.chain.CurrentSlot())

for bRoot, attestations := range s.blkRootToPendingAtts {
roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
s.pendingAttsLock.RLock()
for br := range s.blkRootToPendingAtts {
roots = append(roots, br)
}
s.pendingAttsLock.RUnlock()

for _, bRoot := range roots {
s.pendingAttsLock.RLock()
attestations := s.blkRootToPendingAtts[bRoot]
s.pendingAttsLock.RUnlock()
// Has the pending attestation's missing block arrived yet?
if s.db.HasBlock(ctx, bRoot) {
numberOfBlocksRecoveredFromAtt.Inc()
Expand Down Expand Up @@ -95,7 +103,9 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
}).Info("Verified and saved pending attestations to pool")

// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
s.pendingAttsLock.Lock()
delete(s.blkRootToPendingAtts, bRoot)
s.pendingAttsLock.Unlock()
} else {
// Pending attestation's missing block has not arrived yet.
log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -132,6 +142,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)

s.pendingAttsLock.Lock()
defer s.pendingAttsLock.Unlock()
_, ok := s.blkRootToPendingAtts[root]
if !ok {
s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att}
Expand All @@ -149,6 +161,9 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
ctx, span := trace.StartSpan(ctx, "validatePendingAtts")
defer span.End()

s.pendingAttsLock.Lock()
defer s.pendingAttsLock.Unlock()

for bRoot, atts := range s.blkRootToPendingAtts {
for i := len(atts) - 1; i >= 0; i-- {
if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
Expand Down
4 changes: 0 additions & 4 deletions beacon-chain/sync/validate_aggregate_proof.go
Expand Up @@ -59,13 +59,9 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if seen {
return false
}
r.pendingAttsLock.Lock()
if !r.validateBlockInAttestation(ctx, m) {
r.pendingAttsLock.Unlock()
return false
}
// we dont defer here as the rest of the validation is expensive
r.pendingAttsLock.Unlock()

if !r.validateAggregatedAtt(ctx, m) {
return false
Expand Down
Expand Up @@ -76,9 +76,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
// Verify the block being voted is in DB. The block should have passed validation if it's in the DB.
if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) {
// A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue.
s.pendingAttsLock.Lock()
s.savePendingAtt(&eth.AggregateAttestationAndProof{Aggregate: att})
s.pendingAttsLock.Unlock()
return false
}

Expand Down

0 comments on commit 0f536e3

Please sign in to comment.