Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix deadlock bugs when peers are misbehaving #20

Merged
merged 1 commit into from Apr 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions blockmanager.go
Expand Up @@ -936,6 +936,16 @@ func (b *blockManager) getCheckpointedCFHeaders(checkpoints []*chainhash.Hash,
if !verifyCheckpoint(prevCheckpoint, nextCheckpoint, r) {
log.Warnf("Checkpoints at index %v don't match "+
"response!!!", checkPointIndex)

// If the peer gives us a header that doesn't
// match what we know to be the best
// checkpoint, then we'll ban the peer so we
// can re-allocate the query elsewhere.
log.Warnf("Banning peer=%v for invalid "+
"checkpoints", sp)

b.server.BanPeer(sp)
sp.Disconnect()
return false
}

Expand Down
49 changes: 30 additions & 19 deletions query.go
Expand Up @@ -299,6 +299,11 @@ func queryChainServiceBatch(
// The query is now marked as in-process. We
// begin to process it.
handleQuery = i

// We have a query we're working on.
mtxPeerStates.Lock()
peerStates[sp.Addr()] = queryMsgs[handleQuery]
mtxPeerStates.Unlock()
sp.QueueMessageWithEncoding(queryMsgs[i],
nil, qo.encoding)
break
Expand Down Expand Up @@ -333,31 +338,37 @@ func queryChainServiceBatch(
}
}

// We have a query we're working on.
mtxPeerStates.Lock()
peerStates[sp.Addr()] = queryMsgs[handleQuery]
mtxPeerStates.Unlock()
select {
case <-queryQuit:
return
case <-s.quit:
return
case <-quit:
// We failed, so set the query state back to
// zero and update our lastFailed state.
atomic.StoreUint32(&queryStates[handleQuery],
uint32(queryWaitSubmit))
return
case <-timeout:
// We failed, so set the query state back to
// zero and update our lastFailed state.
atomic.StoreUint32(&queryStates[handleQuery],
uint32(queryWaitSubmit))
if !sp.Connected() {
return
}

log.Tracef("Query for #%v failed, moving "+
"on: %v", handleQuery,
newLogClosure(func() string {
return spew.Sdump(queryMsgs[handleQuery])
}))
// If we timeout here we need to return. If we don't it
// could put the peer in a bad state and cause a deadlock
// if the message comes in the matchSignal chan after the
// timeout is fired.
//
// Returning here closes the peer goroutine but it will
// be reopened again with a fresh state on the next pass
// through the loop below.
return

case <-matchSignal:
// We got a match signal so we can mark this
Expand Down Expand Up @@ -550,9 +561,9 @@ checkResponses:
case <-allQuit:
break checkResponses

// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse gets
// stuck. This is a caveat for callers that should be
Expand Down Expand Up @@ -646,17 +657,17 @@ checkResponses:
}
break checkResponses

// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
// A message has arrived over the subscription channel, so we
// execute the checkResponses callback to see if this ends our
// query session.
case sm := <-msgChan:
// TODO: This will get stuck if checkResponse gets
// stuck. This is a caveat for callers that should be
// fixed before exposing this function for public use.
checkResponse(sm.sp, sm.msg, queryQuit)

// The current peer we're querying has failed to answer the
// query. Time to select a new peer and query it.
// The current peer we're querying has failed to answer the
// query. Time to select a new peer and query it.
case <-peerTimeout.C:
if queryPeer != nil {
queryPeer.unsubscribeRecvMsgs(subscription)
Expand Down Expand Up @@ -1052,10 +1063,10 @@ func (s *ChainService) sendTransaction(tx *wire.MsgTx, options ...QueryOption) e
}
}

// A peer has rejected our transaction for whatever
// reason. Rather than returning to the caller upon the
// first rejection, we'll gather them all to determine
// whether it is critical/fatal.
// A peer has rejected our transaction for whatever
// reason. Rather than returning to the caller upon the
// first rejection, we'll gather them all to determine
// whether it is critical/fatal.
case *wire.MsgReject:
// Ensure this rejection is for the transaction
// we're attempting to broadcast.
Expand Down