Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed May 19, 2023
1 parent a4e998a commit a55bf20
Showing 1 changed file with 13 additions and 18 deletions.
31 changes: 13 additions & 18 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,16 @@ func (m *Manager) Start(startCtx context.Context) error {
func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

for i := 0; i < 2; i++ {
select {
case <-m.headerSubDone:
case <-m.disconnectedPeersDone:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-m.headerSubDone:
case <-ctx.Done():
return ctx.Err()
}

select {
case <-m.disconnectedPeersDone:
case <-ctx.Done():
return ctx.Err()
}

return nil
Expand All @@ -206,8 +209,6 @@ func (m *Manager) Peer(
if m.removeIfUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("get peer from shrexsub pool",
"peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}

Expand All @@ -225,8 +226,6 @@ func (m *Manager) Peer(
if m.removeIfUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("got peer from shrexSub pool after wait",
"after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
Expand Down Expand Up @@ -279,9 +278,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS

// subscribeHeader takes datahash from received header and validates corresponding peer pool.
func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
defer func() {
m.headerSubDone <- struct{}{}
}()
defer close(m.headerSubDone)
defer headerSub.Cancel()

for {
Expand All @@ -305,17 +302,15 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
// peers from full nodes pool
func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
defer func() {
m.disconnectedPeersDone <- struct{}{}
}()
defer close(m.disconnectedPeersDone)
defer sub.Close()
for {
select {
case <-ctx.Done():
return
case e, ok := <-sub.Out():
if !ok {
log.Error("Subscription for connectedness events is closed.")
log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic
return
}
// listen to disconnect event to remove peer from full nodes pool
Expand Down

0 comments on commit a55bf20

Please sign in to comment.