diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index a85348a071..46cb9123f6 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -178,13 +178,16 @@ func (m *Manager) Start(startCtx context.Context) error { func (m *Manager) Stop(ctx context.Context) error { m.cancel() - for i := 0; i < 2; i++ { - select { - case <-m.headerSubDone: - case <-m.disconnectedPeersDone: - case <-ctx.Done(): - return ctx.Err() - } + select { + case <-m.headerSubDone: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case <-m.disconnectedPeersDone: + case <-ctx.Done(): + return ctx.Err() } return nil @@ -206,8 +209,6 @@ func (m *Manager) Peer( if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - log.Debugw("get peer from shrexsub pool", - "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } @@ -225,8 +226,6 @@ func (m *Manager) Peer( if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - log.Debugw("got peer from shrexSub pool after wait", - "after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) @@ -279,9 +278,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS // subscribeHeader takes datahash from received header and validates corresponding peer pool. func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) { - defer func() { - m.headerSubDone <- struct{}{} - }() + defer close(m.headerSubDone) defer headerSub.Cancel() for { @@ -305,9 +302,7 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri // subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected // peers from full nodes pool func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) { - defer func() { - m.disconnectedPeersDone <- struct{}{} - }() + defer close(m.disconnectedPeersDone) defer sub.Close() for { select { @@ -315,7 +310,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs return case e, ok := <-sub.Out(): if !ok { - log.Error("Subscription for connectedness events is closed.") + log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic return } // listen to disconnect event to remove peer from full nodes pool