Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#374 from ipfs/refactor/unref-want-mgr
Browse files Browse the repository at this point in the history
refactor: remove WantManager

This commit was moved from ipfs/go-bitswap@5643004
  • Loading branch information
Stebalien committed Apr 23, 2020
2 parents b4ce47b + 8d0dcda commit 2a5234f
Show file tree
Hide file tree
Showing 12 changed files with 156 additions and 343 deletions.
25 changes: 9 additions & 16 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
bssim "github.com/ipfs/go-bitswap/internal/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/internal/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/internal/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/internal/wantmanager"
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -123,13 +122,13 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return nil
})

var wm *bswm.WantManager
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
// Simulate a DONT_HAVE message arriving to the WantManager
wm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
// Simulate a message arriving with DONT_HAVEs
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
Expand All @@ -138,25 +137,23 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm = bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sessionFactory := func(sessctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
return bssession.New(ctx, sessctx, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
}
notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

bs := &Bitswap{
Expand All @@ -166,7 +163,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
pqm: pqm,
sm: sm,
Expand Down Expand Up @@ -207,9 +203,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
Expand Down Expand Up @@ -357,7 +350,7 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)
bs.sm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)
Expand Down Expand Up @@ -480,14 +473,14 @@ func (bs *Bitswap) blockstoreHas(blks []blocks.Block) []bool {
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.pm.Connected(p)
bs.engine.PeerConnected(p)
}

// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.pm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}

Expand Down
Binary file modified bitswap/docs/go-bitswap.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 6 additions & 8 deletions bitswap/docs/go-bitswap.puml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@ node "Sending Blocks" {
[Engine] --> [TaskWorker (workers.go)]
}

node "Requesting Blocks" {
[Bitswap] --* [WantManager]
[WantManager] --> [BlockPresenceManager]
[WantManager] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}

node "Providing" {
[Bitswap] --* [Provide Collector (workers.go)]
[Provide Collector (workers.go)] --* [Provide Worker (workers.go)]
Expand All @@ -31,14 +24,19 @@ node "Sessions (smart requests)" {
[Bitswap] --* [SessionManager]
[SessionManager] --> [SessionInterestManager]
[SessionManager] --o [Session]
[SessionManager] --> [BlockPresenceManager]
[Session] --* [sessionWantSender]
[Session] --* [SessionPeerManager]
[Session] --> [WantManager]
[Session] --> [ProvideQueryManager]
[Session] --* [sessionWants]
[Session] --> [SessionInterestManager]
[sessionWantSender] --> [BlockPresenceManager]
}

node "Requesting Blocks" {
[SessionManager] --> [PeerManager]
[sessionWantSender] --> [PeerManager]
[PeerManager] --* [MessageQueue]
}

node "Network" {
Expand Down
13 changes: 7 additions & 6 deletions bitswap/docs/how-bitswap-works.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ When a message is received, Bitswap
So that the Engine can send responses to the wants
- Informs the Engine of any received blocks
So that the Engine can send the received blocks to any peers that want them
- Informs the WantManager of received blocks, HAVEs and DONT_HAVEs
So that the WantManager can inform interested sessions
- Informs the SessionManager of received blocks, HAVEs and DONT_HAVEs
So that the SessionManager can inform interested sessions

When the client makes an API call, Bitswap creates a new Session and calls the corresponding method (eg `GetBlocks()`).

Expand All @@ -101,9 +101,10 @@ The PeerTaskQueue prioritizes tasks such that the peers with the least amount of

### Requesting Blocks

When the WantManager is informed of a new message, it
- informs the SessionManager
The SessionManager informs the Sessions that are interested in the received blocks and wants
When the SessionManager is informed of a new message, it
- informs the BlockPresenceManager
The BlockPresenceManager keeps track of which peers have sent HAVES and DONT_HAVEs for each block
- informs the Sessions that are interested in the received blocks and wants
- informs the PeerManager of received blocks
The PeerManager checks if any wants were send to a peer for the received blocks. If so it sends a `CANCEL` message to those peers.

Expand All @@ -114,7 +115,7 @@ The Session starts in "discovery" mode. This means it doesn't have any peers yet
When the client initially requests blocks from a Session, the Session
- informs the SessionInterestManager that it is interested in the want
- informs the sessionWantManager of the want
- tells the WantManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- tells the PeerManager to broadcast a `want-have` to all connected peers so as to discover which peers have the block
- queries the ProviderQueryManager to discover which peers have the block

When the session receives a message with `HAVE` or a `block`, it informs the SessionPeerManager. The SessionPeerManager keeps track of all peers in the session.
Expand Down
68 changes: 42 additions & 26 deletions bitswap/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,6 @@ const (
broadcastLiveWantsLimit = 64
)

// WantManager is an interface that can be used to request blocks
// from given peers.
type WantManager interface {
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, uint64, []cid.Cid)
// RemoveSession removes the session from the WantManager (when the
// session shuts down)
RemoveSession(context.Context, uint64)
}

// PeerManager keeps track of which sessions are interested in which peers
// and takes care of sending wants for the sessions
type PeerManager interface {
Expand All @@ -47,6 +36,11 @@ type PeerManager interface {
UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
// SendCancels tells the PeerManager to send cancels to all peers
SendCancels(context.Context, []cid.Cid)
}

// SessionPeerManager keeps track of peers in the session
Expand Down Expand Up @@ -97,8 +91,10 @@ type op struct {
// info to, and who to request blocks from.
type Session struct {
// dependencies
ctx context.Context
wm WantManager
bsctx context.Context // context for bitswap
ctx context.Context // context for session
pm PeerManager
bpm *bsbpm.BlockPresenceManager
sprm SessionPeerManager
providerFinder ProviderFinder
sim *bssim.SessionInterestManager
Expand Down Expand Up @@ -129,9 +125,10 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context,
func New(
bsctx context.Context, // context for bitswap
ctx context.Context, // context for this session
id uint64,
wm WantManager,
sprm SessionPeerManager,
providerFinder ProviderFinder,
sim *bssim.SessionInterestManager,
Expand All @@ -144,8 +141,10 @@ func New(ctx context.Context,
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
tickDelayReqs: make(chan time.Duration),
bsctx: bsctx,
ctx: ctx,
wm: wm,
pm: pm,
bpm: bpm,
sprm: sprm,
providerFinder: providerFinder,
sim: sim,
Expand Down Expand Up @@ -301,13 +300,13 @@ func (s *Session) run(ctx context.Context) {
s.sw.WantsSent(oper.keys)
case opBroadcast:
// Broadcast want-haves to all peers
s.broadcastWantHaves(ctx, oper.keys)
s.broadcast(ctx, oper.keys)
default:
panic("unhandled operation")
}
case <-s.idleTick.C:
// The session hasn't received blocks for a while, broadcast
s.broadcastWantHaves(ctx, nil)
s.broadcast(ctx, nil)
case <-s.periodicSearchTimer.C:
// Periodically search for a random live want
s.handlePeriodicSearch(ctx)
Expand All @@ -325,23 +324,23 @@ func (s *Session) run(ctx context.Context) {
// Called when the session hasn't received any blocks for some time, or when
// all peers in the session have sent DONT_HAVE for a particular set of CIDs.
// Send want-haves to all connected peers, and search for new peers with the CID.
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
func (s *Session) broadcast(ctx context.Context, wants []cid.Cid) {
// If this broadcast is because of an idle timeout (we haven't received
// any blocks for a while) then broadcast all pending wants
if wants == nil {
wants = s.sw.PrepareBroadcast()
}

// Broadcast a want-have for the live wants to everyone we're connected to
s.wm.BroadcastWantHaves(ctx, s.id, wants)
s.broadcastWantHaves(ctx, wants)

// do not find providers on consecutive ticks
// -- just rely on periodic search widening
if len(wants) > 0 && (s.consecutiveTicks == 0) {
// Search for providers who have the first want in the list.
// Typically if the provider has the first block they will have
// the rest of the blocks also.
log.Debugf("Ses%d: FindMorePeers with want %s (1st of %d wants)", s.id, wants[0], len(wants))
log.Debugw("FindMorePeers", "session", s.id, "cid", wants[0], "pending", len(wants))
s.findMorePeers(ctx, wants[0])
}
s.resetIdleTick()
Expand All @@ -364,7 +363,7 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// for new providers for blocks.
s.findMorePeers(ctx, randomWant)

s.wm.BroadcastWantHaves(ctx, s.id, []cid.Cid{randomWant})
s.broadcastWantHaves(ctx, []cid.Cid{randomWant})

s.periodicSearchTimer.Reset(s.periodicSearchDelay.NextWaitTime())
}
Expand All @@ -390,8 +389,19 @@ func (s *Session) handleShutdown() {
// Shut down the sessionWantSender (blocks until sessionWantSender stops
// sending)
s.sws.Shutdown()
// Remove the session from the want manager
s.wm.RemoveSession(s.ctx, s.id)

// Remove session's interest in the given blocks.
cancelKs := s.sim.RemoveSessionInterest(s.id)

// Free up block presence tracking for keys that no session is interested
// in anymore
s.bpm.RemoveKeys(cancelKs)

// Send CANCEL to all peers for blocks that no session is interested in
// anymore.
// Note: use bitswap context because session context has already been
// cancelled.
s.pm.SendCancels(s.bsctx, cancelKs)
}

// handleReceive is called when the session receives blocks from a peer
Expand Down Expand Up @@ -439,11 +449,17 @@ func (s *Session) wantBlocks(ctx context.Context, newks []cid.Cid) {
// No peers discovered yet, broadcast some want-haves
ks := s.sw.GetNextWants()
if len(ks) > 0 {
log.Infof("Ses%d: No peers - broadcasting %d want HAVE requests\n", s.id, len(ks))
s.wm.BroadcastWantHaves(ctx, s.id, ks)
log.Infow("No peers - broadcasting", "session", s.id, "want-count", len(ks))
s.broadcastWantHaves(ctx, ks)
}
}

// Send want-haves to all connected peers
func (s *Session) broadcastWantHaves(ctx context.Context, wants []cid.Cid) {
log.Debugw("broadcastWantHaves", "session", s.id, "cids", wants)
s.pm.BroadcastWantHaves(ctx, wants)
}

// The session will broadcast if it has outstanding wants and doesn't receive
// any blocks for some time.
// The length of time is calculated
Expand Down

0 comments on commit 2a5234f

Please sign in to comment.