Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: discard unrequested or stale block messages #5431

Merged
merged 12 commits into from
Jun 20, 2023
51 changes: 51 additions & 0 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3945,3 +3945,54 @@ func TestTryConnectEarlyWrite(t *testing.T) {
fmt.Printf("MI Message Count: %v\n", netA.peers[0].miMessageCount)
assert.Equal(t, uint64(1), netA.peers[0].miMessageCount)
}

func TestDiscardUnrequestedBlockResponse(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netA"})
netA.config.GossipFanout = 1

netB := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netB"})
netB.config.GossipFanout = 1

netC := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netC"})
netC.config.GossipFanout = 1

netA.Start()
defer netA.Stop()
netB.Start()
defer netB.Stop()

addrB, ok := netB.Address()
require.True(t, ok)
gossipB, err := netB.addrToGossipAddr(addrB)
require.NoError(t, err)

netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
time.Sleep(100 * time.Millisecond)
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
assert.Equal(t, 1, len(netA.peers))

// Create a buffer to monitor log output from netB
logBuffer := bytes.NewBuffer(nil)
netB.log.SetOutput(logBuffer)

// send an unrequested block response
msg := make([]sendMessage, 1, 1)
msg[0] = sendMessage{
data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...),
enqueued: time.Now(),
peerEnqueued: time.Now(),
ctx: context.Background(),
}
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
time.Sleep(100 * time.Millisecond)

// Stop and confirm that we hit the case of disconnecting a peer for sending an unrequested block response
netB.Stop()
lg := logBuffer.String()
assert.Contains(t, lg, "peer sent TS response without a request", gossipB)

// TODO: add a test for the case where we receive a block response for a block we didn't request

}
33 changes: 31 additions & 2 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ const disconnectStaleWrite disconnectReason = "DisconnectStaleWrite"
const disconnectDuplicateConnection disconnectReason = "DuplicateConnection"
const disconnectBadIdentityData disconnectReason = "BadIdentityData"

const lastSentRequestTime string = "lsrt"

algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// Response is the structure holding the response from the server
type Response struct {
Topics Topics
Expand Down Expand Up @@ -505,6 +507,24 @@ func (wp *wsPeer) readLoop() {
return
}
msg.Tag = Tag(string(tag[:]))

// Skip the message if it's a response to a request we didn't make or has timed out
if msg.Tag == protocol.TopicMsgRespTag && !wp.hasOutstandingRequests() {
// We never requested anything from this peer so sending a response is breach protocol -- disconnect
if wp.getPeerData(lastSentRequestTime) == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a weak condition since lastSentRequestTime is never removed from the peer data map.
I also traced the context all way up and do not see if it cancelled by timeout - could you point out? The only cancellation I found in catchup.innerFetch is case <-ledgerWaitCh when the ledger received the block by other means.

It appears the message the existence of a hash in responseChannels is a pretty good indication of "a request was sent" and an empty responseChannels opposite and a good opportunity to drop.

It will be more complex but more error proof if there would be a compliment data structure to responseChannels - like topic requests tags have been sent but this is complicated to manage and since there are lots of block requests on catchup.

Edit: agreed on lastSentRequestTime importance but it needs to be cleared after some period of time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of clearing it out, added a synchronous check and disconnect if the response is more than a minute late.

wp.net.log.Errorf("wsPeer readloop: peer %s sent TS response without a request", wp.conn.RemoteAddr().String())
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "protocol"})
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
return
}
// Peer sent us a response to a request we made but we've already timed out -- discard
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
n, err := io.Copy(io.Discard, reader)
if err != nil {
wp.net.log.Warnf("wsPeer readloop: could not discard timed-out TS message from %s : %s", wp.conn.RemoteAddr().String(), err)
Copy link
Contributor

@cce cce Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similarly maybe this is not important enough to be Warnf level? io.Discard.Write() can't fail but I guess reader.Read() could return err ...

oh I see, we seem to have a wp.reportReadErr(err) just for that, and has its own special handling of when and how to log read errors from peers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, shouldn't you disconnect here? that's what happens for other reader Read errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, made the change

continue
}
wp.net.log.Warnf("wsPeer readLoop: received a TS response for a stale request from %s. %d bytes discarded", wp.conn.RemoteAddr().String(), n)
Copy link
Contributor

@cce cce Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warnf goes to telemetry by default, but this doesn't seem very important. Could we make this Infof

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure but it's nice to have a telemetry of how often this happened. Current behavior actually logs this case to telemetry but not until after it unmarshalls the message on line 581. This doesn't increase the number of telemetry messages we are expecting to receive but even so happy to downgrade if others agree and do the same for the other place where we log this

continue
}
slurper.Reset()
err = slurper.Read(reader)
if err != nil {
Expand Down Expand Up @@ -942,13 +962,16 @@ func (wp *wsPeer) Request(ctx context.Context, tag Tag, topics Topics) (resp *Re

// Send serializedMsg
msg := make([]sendMessage, 1, 1)

iansuvak marked this conversation as resolved.
Show resolved Hide resolved
tStart := time.Now()
msg[0] = sendMessage{
data: append([]byte(tag), serializedMsg...),
enqueued: time.Now(),
peerEnqueued: time.Now(),
enqueued: tStart,
peerEnqueued: tStart,
ctx: context.Background()}
select {
case wp.sendBufferBulk <- sendMessages{msgs: msg}:
wp.setPeerData(lastSentRequestTime, tStart)
case <-wp.closing:
e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddr().String())
return
Expand Down Expand Up @@ -976,6 +999,12 @@ func (wp *wsPeer) makeResponseChannel(key uint64) (responseChannel chan *Respons
return newChan
}

func (wp *wsPeer) hasOutstandingRequests() bool {
wp.responseChannelsMutex.Lock()
defer wp.responseChannelsMutex.Unlock()
return len(wp.responseChannels) > 0
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're doing all the work of taking the lock, could you isntead return the len directly, and let the caller decide to compare it with 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I have a slight preference for the way it is currently but happy to change if there's a +1 .

I just don't think that we will be checking the length of this outside of this use-case and to me this parses slightly easier in the conditional.

// getAndRemoveResponseChannel returns the channel and deletes the channel from the map
func (wp *wsPeer) getAndRemoveResponseChannel(key uint64) (respChan chan *Response, found bool) {
wp.responseChannelsMutex.Lock()
Expand Down