Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#317 from ipfs/fix/sesswantmgr-shut…
Browse files Browse the repository at this point in the history
…down

wait for sessionWantSender to shutdown before completing session shutdown

This commit was moved from ipfs/go-bitswap@7888679
  • Loading branch information
Stebalien committed Mar 24, 2020
2 parents 876dc3c + aa5d143 commit 3da9ff1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
5 changes: 4 additions & 1 deletion bitswap/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func New(ctx context.Context,
periodicSearchDelay: periodicSearchDelay,
self: self,
}
s.sws = newSessionWantSender(ctx, id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)
s.sws = newSessionWantSender(id, pm, sprm, bpm, s.onWantsSent, s.onPeersExhausted)

go s.run(ctx)

Expand Down Expand Up @@ -387,6 +387,9 @@ func (s *Session) handleShutdown() {
s.idleTick.Stop()
// Shut down the session peer manager
s.sprm.Shutdown()
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)
}
Expand Down
32 changes: 24 additions & 8 deletions bitswap/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ type onPeersExhaustedFn func([]cid.Cid)
// consults the peer response tracker (records which peers sent us blocks).
//
type sessionWantSender struct {
// When the context is cancelled, sessionWantSender shuts down
// The context is used when sending wants
ctx context.Context
// Called to shutdown the sessionWantSender
shutdown func()
// The sessionWantSender uses the closed channel to signal when it's
// finished shutting down
closed chan struct{}
// The session ID
sessionID uint64
// A channel that collects incoming changes (events)
Expand All @@ -97,11 +102,14 @@ type sessionWantSender struct {
onPeersExhausted onPeersExhaustedFn
}

func newSessionWantSender(ctx context.Context, sid uint64, pm PeerManager, spm SessionPeerManager,
func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager,
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

ctx, cancel := context.WithCancel(context.Background())
sws := sessionWantSender{
ctx: ctx,
shutdown: cancel,
closed: make(chan struct{}),
sessionID: sid,
changes: make(chan change, changesBufferSize),
wants: make(map[cid.Cid]*wantInfo),
Expand Down Expand Up @@ -158,12 +166,25 @@ func (sws *sessionWantSender) Run() {
case ch := <-sws.changes:
sws.onChange([]change{ch})
case <-sws.ctx.Done():
sws.shutdown()
// Unregister the session with the PeerManager
sws.pm.UnregisterSession(sws.sessionID)

// Close the 'closed' channel to signal to Shutdown() that the run
// loop has exited
close(sws.closed)
return
}
}
}

// Shutdown the sessionWantSender
func (sws *sessionWantSender) Shutdown() {
// Signal to the run loop to stop processing
sws.shutdown()
// Wait for run loop to complete
<-sws.closed
}

// addChange adds a new change to the queue
func (sws *sessionWantSender) addChange(c change) {
select {
Expand All @@ -172,11 +193,6 @@ func (sws *sessionWantSender) addChange(c change) {
}
}

// shutdown unregisters the session with the PeerManager
func (sws *sessionWantSender) shutdown() {
sws.pm.UnregisterSession(sws.sessionID)
}

// collectChanges collects all the changes that have occurred since the last
// invocation of onChange
func (sws *sessionWantSender) collectChanges(changes []change) []change {
Expand Down
22 changes: 11 additions & 11 deletions bitswap/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestSendWants(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestSendsWantBlockToOnePeerOnly(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -234,7 +234,7 @@ func TestReceiveBlock(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -294,7 +294,7 @@ func TestPeerUnavailable(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -360,7 +360,7 @@ func TestPeersExhausted(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -436,7 +436,7 @@ func TestPeersExhaustedLastWaitingPeerUnavailable(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -484,7 +484,7 @@ func TestPeersExhaustedAllPeersUnavailable(t *testing.T) {
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}

ep := exhaustedPeers{}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, ep.onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -522,7 +522,7 @@ func TestConsecutiveDontHaveLimit(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -578,7 +578,7 @@ func TestConsecutiveDontHaveLimitInterrupted(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -633,7 +633,7 @@ func TestConsecutiveDontHaveReinstateAfterRemoval(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down Expand Up @@ -717,7 +717,7 @@ func TestConsecutiveDontHaveDontRemoveIfHasWantedBlock(t *testing.T) {
bpm := bsbpm.New()
onSend := func(peer.ID, []cid.Cid, []cid.Cid) {}
onPeersExhausted := func([]cid.Cid) {}
spm := newSessionWantSender(context.Background(), sid, pm, fpm, bpm, onSend, onPeersExhausted)
spm := newSessionWantSender(sid, pm, fpm, bpm, onSend, onPeersExhausted)

go spm.Run()

Expand Down

0 comments on commit 3da9ff1

Please sign in to comment.