From fefc0bd8ec3f838a8d3fc9a99df53ed37eb06420 Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Wed, 12 Feb 2020 18:33:15 -0800 Subject: [PATCH] Secure lock when accessing the map only (#4849) * Secure lock when accessing the map * wrong lock * Remove some deadlocks --- beacon-chain/sync/BUILD.bazel | 1 + .../sync/pending_attestations_queue.go | 21 ++++++++++++++++--- beacon-chain/sync/validate_aggregate_proof.go | 4 ---- ...date_committee_index_beacon_attestation.go | 2 -- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 475aca2f174..4c05e82d365 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -101,6 +101,7 @@ go_test( "validate_voluntary_exit_test.go", ], embed = [":go_default_library"], + shard_count = 4, deps = [ "//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/core/feed:go_default_library", diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go index 450c149e13e..2a24800d7f9 100644 --- a/beacon-chain/sync/pending_attestations_queue.go +++ b/beacon-chain/sync/pending_attestations_queue.go @@ -39,8 +39,6 @@ func (s *Service) processPendingAttsQueue() { // 2. Check if pending attestations can be processed when the block has arrived. // 3. Request block from a random peer if unable to proceed step 2. func (s *Service) processPendingAtts(ctx context.Context) error { - s.pendingAttsLock.Lock() - defer s.pendingAttsLock.Unlock() ctx, span := trace.StartSpan(ctx, "processPendingAtts") defer span.End() @@ -51,7 +49,17 @@ func (s *Service) processPendingAtts(ctx context.Context) error { // be deleted from the queue if invalid (ie. getting staled from falling too many slots behind). s.validatePendingAtts(ctx, s.chain.CurrentSlot()) - for bRoot, attestations := range s.blkRootToPendingAtts { + roots := make([][32]byte, 0, len(s.blkRootToPendingAtts)) + s.pendingAttsLock.RLock() + for br := range s.blkRootToPendingAtts { + roots = append(roots, br) + } + s.pendingAttsLock.RUnlock() + + for _, bRoot := range roots { + s.pendingAttsLock.RLock() + attestations := s.blkRootToPendingAtts[bRoot] + s.pendingAttsLock.RUnlock() // Has the pending attestation's missing block arrived yet? if s.db.HasBlock(ctx, bRoot) { numberOfBlocksRecoveredFromAtt.Inc() @@ -95,7 +103,9 @@ func (s *Service) processPendingAtts(ctx context.Context) error { }).Info("Verified and saved pending attestations to pool") // Delete the missing block root key from pending attestation queue so a node will not request for the block again. + s.pendingAttsLock.Lock() delete(s.blkRootToPendingAtts, bRoot) + s.pendingAttsLock.Unlock() } else { // Pending attestation's missing block has not arrived yet. log.WithFields(logrus.Fields{ @@ -132,6 +142,8 @@ func (s *Service) processPendingAtts(ctx context.Context) error { func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) { root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot) + s.pendingAttsLock.Lock() + defer s.pendingAttsLock.Unlock() _, ok := s.blkRootToPendingAtts[root] if !ok { s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att} @@ -149,6 +161,9 @@ func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) { ctx, span := trace.StartSpan(ctx, "validatePendingAtts") defer span.End() + s.pendingAttsLock.Lock() + defer s.pendingAttsLock.Unlock() + for bRoot, atts := range s.blkRootToPendingAtts { for i := len(atts) - 1; i >= 0; i-- { if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch { diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go index 3b3965a746e..f569b4f39e1 100644 --- a/beacon-chain/sync/validate_aggregate_proof.go +++ b/beacon-chain/sync/validate_aggregate_proof.go @@ -59,13 +59,9 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms if seen { return false } - r.pendingAttsLock.Lock() if !r.validateBlockInAttestation(ctx, m) { - r.pendingAttsLock.Unlock() return false } - // we dont defer here as the rest of the validation is expensive - r.pendingAttsLock.Unlock() if !r.validateAggregatedAtt(ctx, m) { return false diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go index 55d44a6b79b..ecd98426b8e 100644 --- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go +++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go @@ -76,9 +76,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p // Verify the block being voted is in DB. The block should have passed validation if it's in the DB. if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) { // A node doesn't have the block, it'll request from peer while saving the pending attestation to a queue. - s.pendingAttsLock.Lock() s.savePendingAtt(ð.AggregateAttestationAndProof{Aggregate: att}) - s.pendingAttsLock.Unlock() return false }