Skip to content

Commit

Permalink
Merge pull request ipfs/go-bitswap#351 from ipfs/refactor/conn-mgmt
Browse files Browse the repository at this point in the history
Move connection management into networking layer

This commit was moved from ipfs/go-bitswap@9d9719e
  • Loading branch information
Stebalien committed Apr 18, 2020
2 parents 5df9dc3 + 123abbb commit 521c70c
Show file tree
Hide file tree
Showing 12 changed files with 843 additions and 362 deletions.
21 changes: 4 additions & 17 deletions bitswap/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,32 +745,19 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
func (e *Engine) PeerConnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
l, ok := e.ledgerMap[p]

_, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
e.ledgerMap[p] = newLedger(p)
}

l.lk.Lock()
defer l.lk.Unlock()
l.ref++
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
l, ok := e.ledgerMap[p]
if !ok {
return
}

l.lk.Lock()
defer l.lk.Unlock()
l.ref--
if l.ref <= 0 {
delete(e.ledgerMap, p)
}
delete(e.ledgerMap, p)
}

// If the want is a want-have, and it's below a certain size, send the full
Expand Down
4 changes: 0 additions & 4 deletions bitswap/internal/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ type ledger struct {
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist

// ref is the reference count for this ledger; it's used to ensure we
// don't drop the reference to this ledger in multi-connection scenarios
ref int

lk sync.RWMutex
}

Expand Down
166 changes: 51 additions & 115 deletions bitswap/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
defaultRebroadcastInterval = 30 * time.Second
// maxRetries is the number of times to attempt to send a message before
// giving up
maxRetries = 10
maxRetries = 3
sendTimeout = 30 * time.Second
// maxMessageSize is the maximum message size in bytes
maxMessageSize = 1024 * 1024 * 2
// sendErrorBackoff is the time to wait before retrying to connect after
Expand All @@ -46,7 +47,7 @@ const (
// sender.
type MessageNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
Latency(peer.ID) time.Duration
Ping(context.Context, peer.ID) ping.Result
Self() peer.ID
Expand All @@ -55,14 +56,14 @@ type MessageNetwork interface {
// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
maxMessageSize int
sendErrorBackoff time.Duration

outgoingWork chan time.Time
done chan struct{}

// Take lock whenever any of these variables are modified
wllock sync.Mutex
Expand Down Expand Up @@ -169,8 +170,10 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {

ctx, cancel := context.WithCancel(ctx)
mq := &MessageQueue{
ctx: ctx,
shutdown: cancel,
p: p,
network: network,
dhTimeoutMgr: dhTimeoutMgr,
Expand All @@ -179,7 +182,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
Expand Down Expand Up @@ -300,12 +302,17 @@ func (mq *MessageQueue) Startup() {

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
mq.shutdown()
}

func (mq *MessageQueue) onShutdown() {
// Shut down the DONT_HAVE timeout manager
mq.dhTimeoutMgr.Shutdown()

// Reset the streamMessageSender
if mq.sender != nil {
_ = mq.sender.Reset()
}
}

func (mq *MessageQueue) runQueue() {
Expand Down Expand Up @@ -351,15 +358,7 @@ func (mq *MessageQueue) runQueue() {
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-mq.ctx.Done():
if mq.sender != nil {
_ = mq.sender.Reset()
}
return
}
}
Expand Down Expand Up @@ -409,12 +408,12 @@ func (mq *MessageQueue) sendIfReady() {
}

func (mq *MessageQueue) sendMessage() {
err := mq.initializeSender()
sender, err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
// TODO: should we stop using this connection and clear the want list
// to avoid using up memory?
// If we fail to initialize the sender, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not open message sender to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}

Expand All @@ -423,7 +422,7 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()

// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())

// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)
Expand All @@ -435,23 +434,22 @@ func (mq *MessageQueue) sendMessage() {
wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)

// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
if err := sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}

mq.simulateDontHaveWithTimeout(wantlist)
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)

// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
}

return
}
// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
}
}

Expand Down Expand Up @@ -540,7 +538,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
}

// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
mq.wllock.Lock()
defer mq.wllock.Unlock()

Expand All @@ -567,7 +565,6 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}

// Add each regular want-have / want-block to the message
peerSent := peerEntries[:0]
for _, e := range peerEntries {
if msgSize >= mq.maxMessageSize {
break
Expand All @@ -579,12 +576,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
peerSent = append(peerSent, e)

// Move the key from pending to sent
mq.peerWants.MarkSent(e)
}
}

// Add each broadcast want-have to the message
bcstSent := bcstEntries[:0]
for _, e := range bcstEntries {
if msgSize >= mq.maxMessageSize {
break
Expand All @@ -600,89 +598,27 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}

msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
bcstSent = append(bcstSent, e)
}

// Called when the message has been successfully sent.
onMessageSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()

// Move the keys from pending to sent
for _, e := range bcstSent {
mq.bcstWants.MarkSent(e)
}
for _, e := range peerSent {
mq.peerWants.MarkSent(e)
}
// Move the key from pending to sent
mq.bcstWants.MarkSent(e)
}

return mq.msg, onMessageSent
}

func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
return mq.msg
}

func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}

log.Infof("bitswap send error: %s", err)
_ = mq.sender.Reset()
mq.sender = nil

select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(mq.sendErrorBackoff):
// wait 100ms in case disconnect notifications are still propagating
log.Warn("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
return true
}

// TODO: Is this the same instance for the remote peer?
// If it's not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
if mq.sender == nil {
opts := &bsnet.MessageSenderOpts{
MaxRetries: maxRetries,
SendTimeout: sendTimeout,
SendErrorBackoff: sendErrorBackoff,
}
nsender, err := mq.network.NewMessageSender(mq.ctx, mq.p, opts)
if err != nil {
return nil, err
}
*/
return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections; this includes looking them up in the
// DHT, dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := network.ConnectTo(conctx, p)
if err != nil {
return nil, err
}

nsender, err := network.NewMessageSender(ctx, p)
if err != nil {
return nil, err
mq.sender = nsender
}

return nsender, nil
return mq.sender, nil
}

0 comments on commit 521c70c

Please sign in to comment.