
network: discard unrequested or stale block messages #5431
Merged (12 commits) on Jun 20, 2023
122 changes: 120 additions & 2 deletions network/wsNetwork_test.go
@@ -23,15 +23,13 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"github.com/algorand/go-algorand/internal/rapidgen"
"io"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"pgregory.net/rapid"
"regexp"
"runtime"
"sort"
@@ -41,6 +39,9 @@ import (
"testing"
"time"

"github.com/algorand/go-algorand/internal/rapidgen"
"pgregory.net/rapid"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -3949,6 +3950,123 @@ func TestTryConnectEarlyWrite(t *testing.T) {
assert.Equal(t, uint64(1), netA.peers[0].miMessageCount)
}

// Test the functionality that allows a node to discard a block response that it did not request or that arrived too late.
// Both cases are tested here by having A send unexpected and late responses to nodes B and C, respectively.
func TestDiscardUnrequestedBlockResponse(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netA"})
netA.config.GossipFanout = 1

netB := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netB"})
netB.config.GossipFanout = 1

netC := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netC"})
netC.config.GossipFanout = 1

netA.Start()
defer netA.Stop()
netB.Start()
defer netB.Stop()

addrB, ok := netB.Address()
require.True(t, ok)
gossipB, err := netB.addrToGossipAddr(addrB)
require.NoError(t, err)

netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
time.Sleep(250 * time.Millisecond)
require.Equal(t, 1, len(netA.peers))

// Create a buffer to monitor log output from netB
logBuffer := bytes.NewBuffer(nil)
netB.log.SetOutput(logBuffer)

// send an unrequested block response
msg := make([]sendMessage, 1, 1)
msg[0] = sendMessage{
data: append([]byte(protocol.TopicMsgRespTag), []byte("foo")...),
enqueued: time.Now(),
peerEnqueued: time.Now(),
ctx: context.Background(),
}
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
require.Eventually(t,
func() bool {
return networkConnectionsDroppedTotal.GetUint64ValueForLabels(map[string]string{"reason": "protocol"}) == 1
},
1*time.Second,
50*time.Millisecond,
)

// Stop and confirm that we hit the case of disconnecting a peer for sending an unrequested block response
netB.Stop()
lg := logBuffer.String()
require.Contains(t, lg, "sent TS response without a request")
A resolved review thread on this assertion:

Reviewer: Check the outstandingTopicRequests counter here?

Author (iansuvak): At this point netB has already disconnected from netA, so we can't check that, since the counter would have been on the peer. Trying to monitor it flipping negative while the peer is in the process of disconnecting would cause a data race, I believe.

Reviewer: You can't cause a race on an atomic counter though?

Author (iansuvak): The relevant counter here is on netB's peer struct representing netA. The race wouldn't be on the atomic counter but on the fact that the peer I'm trying to check the counter on is in the process of being destroyed. Either way, I removed the offending log check for this one but kept it for the next case in netC so far.

Reviewer: Checking log content seems brittle if the message changes. The concern here is to confirm that it didn't disconnect for any other reason? If so, it might justify having counters for disconnect reasons, which you could extend with this reason.

Author (iansuvak): Yeah, it is somewhat brittle, but it's also an easy fix if the log message changes. I'm open to adding counters anyhow, but we do use this pattern elsewhere in the code already.

Reviewer: You are already bumping networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "protocol"}), so you could check that?

Reviewer (@cce, Jun 9, 2023): But isn't it good enough to assert that the disconnect happens (with the new reason code, for example)? Asserting actual behavior vs. logging seems better.

Author (iansuvak): Agreed that it's good enough, and I've changed the reason code.


netC.Start()
defer netC.Stop()

addrC, ok := netC.Address()
require.True(t, ok)
gossipC, err := netC.addrToGossipAddr(addrC)
require.NoError(t, err)

netA.wg.Add(1)
netA.tryConnect(addrC, gossipC)
time.Sleep(250 * time.Millisecond)
require.Equal(t, 1, len(netA.peers))

ctx, cancel := context.WithCancel(context.Background())
topics := Topics{
MakeTopic("requestDataType",
[]byte("fake block and cert value")),
MakeTopic(
"blockData",
[]byte("fake round value")),
}
// Send a request for a block and cancel it after the handler has been registered
go func() {
netC.peers[0].Request(ctx, protocol.UniEnsBlockReqTag, topics)
}()
require.Eventually(
t,
func() bool { return netC.peers[0].hasOutstandingRequests() },
1*time.Second,
50*time.Millisecond,
)
cancel()

// confirm that the request was cancelled but that we have registered that we have sent a request
require.Eventually(
t,
func() bool { return !netC.peers[0].hasOutstandingRequests() },
500*time.Millisecond,
20*time.Millisecond,
)
require.Equal(t, atomic.LoadInt64(&netC.peers[0].outstandingTopicRequests), int64(1))

// Create a buffer to monitor log output from netC
logBuffer = bytes.NewBuffer(nil)
netC.log.SetOutput(logBuffer)

// send a late TS response from A -> C
netA.peers[0].sendBufferBulk <- sendMessages{msgs: msg}
require.Eventually(
t,
func() bool { return atomic.LoadInt64(&netC.peers[0].outstandingTopicRequests) == int64(0) },
500*time.Millisecond,
20*time.Millisecond,
)

// Stop and confirm that we hit the case of disconnecting a peer for sending a stale block response
netC.Stop()
A review thread on lines +4063 to +4064 (the explicit netC.Stop()):

Reviewer: netA and netB are stopped via defer; if the test fails, will this netC be left open?

Author (iansuvak): There is a defer for netC as well, on line 4009.

Author (iansuvak): The reason I'm also stopping manually (doing the same for netB as well) is that otherwise reading the log would be a data race.

Reviewer: Maybe we should not read the log then?

Author (iansuvak): I removed one of the log-reading checks from netB, since I agree that the disconnect reason is a good enough check, but this case doesn't warrant a disconnect: we did make the request (or more), we are just no longer interested in the response. Do you want me to introduce a new counter here and check that instead of the log?
lg = logBuffer.String()
require.Contains(t, lg, "wsPeer readLoop: received a TS response for a stale request ")
A review thread on this log assertion:

Reviewer: Rather than assert log behavior, why not actual behavior, like that the message was discarded?

Author (iansuvak): I don't think anything else happens as a side effect here that I could check. We aren't disconnecting or bumping any counters. I wanted to distinguish it from the fall-through case of going through the unmarshalling process in the switch statement below, though.

Reviewer: The behavior is that we're dropping the message, so the handler is not called. There are some similar tests that register handlers for certain tags and count the number of calls.

Reviewer: Oh, this tag doesn't use a handler; it's handled inline. So weird.

Author (iansuvak): Indeed. Should we make a TS handler to make it more consistent as part of this?

}
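The accounting this test exercises can be summarized as: bump a counter when a request is sent, decrement it when a response arrives, and treat a negative count as a protocol violation. The sketch below mirrors that invariant in isolation; the names are illustrative stand-ins, not the actual go-algorand types.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// tracker mirrors, in spirit, the outstandingTopicRequests accounting the PR
// adds to wsPeer: increment on request sent, decrement on response received.
type tracker struct {
	outstanding int64
}

func (t *tracker) requestSent() { atomic.AddInt64(&t.outstanding, 1) }

// responseReceived returns false when the peer has sent more responses than
// we requested, i.e. the counter would go negative (a protocol violation).
func (t *tracker) responseReceived() bool {
	return atomic.AddInt64(&t.outstanding, -1) >= 0
}

func main() {
	var t tracker
	t.requestSent()
	fmt.Println(t.responseReceived()) // matches a request: true
	fmt.Println(t.responseReceived()) // unrequested response: false
}
```

A real implementation also has to handle the timed-out case, where the request was made but the caller stopped waiting; the test above covers that separately via netC.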

func customNetworkIDGen(networkID protocol.NetworkID) *rapid.Generator {
return rapid.Custom(func(t *rapid.T) protocol.NetworkID {
// Unused/satisfying rapid requirement
36 changes: 36 additions & 0 deletions network/wsPeer.go
@@ -168,6 +168,7 @@
const disconnectStaleWrite disconnectReason = "DisconnectStaleWrite"
const disconnectDuplicateConnection disconnectReason = "DuplicateConnection"
const disconnectBadIdentityData disconnectReason = "BadIdentityData"
const disconnectUnexpectedTopicResp disconnectReason = "UnexpectedTopicResp"

// Response is the structure holding the response from the server
type Response struct {
@@ -185,6 +186,10 @@
// we want this to be a 64-bit aligned for atomics support on 32bit platforms.
lastPacketTime int64

// outstandingTopicRequests is an atomic counter for the number of outstanding block requests we've made to this peer.
// If a peer sends us more blocks than we've requested, we'll disconnect from it.
outstandingTopicRequests int64

// intermittentOutgoingMessageEnqueueTime contains the UnixNano of the message's enqueue time that is currently being written to the
// peer, or zero if no message is being written.
intermittentOutgoingMessageEnqueueTime int64
@@ -505,6 +510,28 @@
return
}
msg.Tag = Tag(string(tag[:]))

// Skip the message if it's a response to a request we didn't make or to one that has already timed out
if msg.Tag == protocol.TopicMsgRespTag && !wp.hasOutstandingRequests() {
atomic.AddInt64(&wp.outstandingTopicRequests, -1)

// This peer has sent us more responses than we have requested. This is a protocol violation and we should disconnect.
if atomic.LoadInt64(&wp.outstandingTopicRequests) < 0 {
wp.net.log.Errorf("wsPeer readloop: peer %s sent TS response without a request", wp.conn.RemoteAddr().String())
networkConnectionsDroppedTotal.Inc(map[string]string{"reason": "protocol"})
cleanupCloseError = disconnectUnexpectedTopicResp
return
}
var n int64
// Peer sent us a response to a request we made but we've already timed out -- discard
n, err = io.Copy(io.Discard, reader)
if err != nil {
wp.net.log.Warnf("wsPeer readloop: could not discard timed-out TS message from %s : %s", wp.conn.RemoteAddr().String(), err)
A review thread on this warning:

Reviewer (@cce, Jun 9, 2023): Similarly, maybe this is not important enough to be Warnf level? io.Discard.Write() can't fail, but I guess reader.Read() could return an error... Oh, I see, we seem to have wp.reportReadErr(err) just for that, and it has its own special handling of when and how to log read errors from peers.

Reviewer: Also, shouldn't you disconnect here? That's what happens for other reader Read errors?

Author (iansuvak): Agreed, made the change.

continue

Codecov / codecov/patch annotation: added lines network/wsPeer.go#L529-L530 were not covered by tests.
}
wp.net.log.Warnf("wsPeer readLoop: received a TS response for a stale request from %s. %d bytes discarded", wp.conn.RemoteAddr().String(), n)
A review thread on this log line:

Reviewer (@cce, Jun 9, 2023): Warnf goes to telemetry by default, but this doesn't seem very important. Could we make this Infof?

Author (iansuvak): Sure, but it's nice to have telemetry on how often this happens. The current behavior actually logs this case to telemetry, just not until after it unmarshalls the message on line 581. This doesn't increase the number of telemetry messages we expect to receive, but even so, happy to downgrade if others agree, and to do the same for the other place where we log this.

continue
}
slurper.Reset()
err = slurper.Read(reader)
if err != nil {
@@ -543,6 +570,7 @@
}
continue
case protocol.TopicMsgRespTag: // Handle Topic message
atomic.AddInt64(&wp.outstandingTopicRequests, -1)
topics, err := UnmarshallTopics(msg.Data)
if err != nil {
wp.net.log.Warnf("wsPeer readLoop: could not read the message from: %s %s", wp.conn.RemoteAddr().String(), err)
@@ -942,13 +970,15 @@

// Send serializedMsg
msg := make([]sendMessage, 1, 1)

msg[0] = sendMessage{
data: append([]byte(tag), serializedMsg...),
enqueued: time.Now(),
peerEnqueued: time.Now(),
ctx: context.Background()}
select {
case wp.sendBufferBulk <- sendMessages{msgs: msg}:
atomic.AddInt64(&wp.outstandingTopicRequests, 1)
case <-wp.closing:
e = fmt.Errorf("peer closing %s", wp.conn.RemoteAddr().String())
return
@@ -976,6 +1006,12 @@
return newChan
}

func (wp *wsPeer) hasOutstandingRequests() bool {
wp.responseChannelsMutex.Lock()
defer wp.responseChannelsMutex.Unlock()
return len(wp.responseChannels) > 0
}

A review thread on this helper:

Reviewer: Since you're doing all the work of taking the lock, could you instead return the len directly and let the caller decide to compare it with 0?

Author (iansuvak): I think I have a slight preference for the way it is currently, but happy to change if there's a +1. I just don't think we will be checking the length of this outside of this use case, and to me this parses slightly easier in the conditional.
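The locked length check behind hasOutstandingRequests can be sketched in isolation: the set of in-flight requests lives in a map guarded by a mutex, and "do we have any?" is just a length check under the lock. The types below are simplified stand-ins for the wsPeer fields, not the real ones.

```go
package main

import (
	"fmt"
	"sync"
)

// peer is a toy stand-in: channels plays the role of responseChannels,
// mu the role of responseChannelsMutex.
type peer struct {
	mu       sync.Mutex
	channels map[uint64]chan string
}

// register records an in-flight request under its key.
func (p *peer) register(key uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.channels[key] = make(chan string, 1)
}

// hasOutstandingRequests reports whether any request is still in flight.
func (p *peer) hasOutstandingRequests() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.channels) > 0
}

func main() {
	p := &peer{channels: make(map[uint64]chan string)}
	fmt.Println(p.hasOutstandingRequests()) // false
	p.register(1)
	fmt.Println(p.hasOutstandingRequests()) // true
}
```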

// getAndRemoveResponseChannel returns the channel and deletes the channel from the map
func (wp *wsPeer) getAndRemoveResponseChannel(key uint64) (respChan chan *Response, found bool) {
wp.responseChannelsMutex.Lock()
13 changes: 13 additions & 0 deletions util/metrics/counter.go
@@ -114,6 +114,19 @@ func (counter *Counter) GetUint64Value() (x uint64) {
return atomic.LoadUint64(&counter.intValue)
}

// GetUint64ValueForLabels returns the value of the counter for the given labels or 0 if it's not found.
func (counter *Counter) GetUint64ValueForLabels(labels map[string]string) uint64 {
counter.Lock()
defer counter.Unlock()

labelIndex := counter.findLabelIndex(labels)
counterIdx, has := counter.valuesIndices[labelIndex]
if !has {
return 0
}
return counter.values[counterIdx].counter
}
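The semantics GetUint64ValueForLabels is expected to have can be shown with a toy labeled counter: each distinct label set keeps its own value, and looking up an unseen label set returns 0. This is a simplified stand-in, not the go-algorand metrics implementation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// labeledCounter keeps one value per distinct label set.
type labeledCounter struct {
	mu     sync.Mutex
	values map[string]uint64
}

// key canonicalizes a label map so equal label sets share one entry.
func key(labels map[string]string) string {
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	s := ""
	for _, k := range keys {
		s += k + "=" + labels[k] + ";"
	}
	return s
}

func (c *labeledCounter) Inc(labels map[string]string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[key(labels)]++
}

// GetUint64ValueForLabels returns the value for the labels, or 0 if unseen,
// because a missing map entry yields the zero value.
func (c *labeledCounter) GetUint64ValueForLabels(labels map[string]string) uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[key(labels)]
}

func main() {
	c := &labeledCounter{values: make(map[string]uint64)}
	c.Inc(map[string]string{"reason": "protocol"})
	fmt.Println(c.GetUint64ValueForLabels(map[string]string{"reason": "protocol"})) // 1
	fmt.Println(c.GetUint64ValueForLabels(map[string]string{"reason": "other"}))    // 0
}
```

This is the lookup the test above relies on when it polls networkConnectionsDroppedTotal for the "protocol" reason.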

func (counter *Counter) fastAddUint64(x uint64) {
if atomic.AddUint64(&counter.intValue, x) == x {
// What we just added is the whole value, this
20 changes: 20 additions & 0 deletions util/metrics/counter_test.go
@@ -211,3 +211,23 @@ func TestGetValue(t *testing.T) {
c.Inc(nil)
require.Equal(t, uint64(2), c.GetUint64Value())
}

func TestGetValueForLables(t *testing.T) {
partitiontest.PartitionTest(t)

c := MakeCounter(MetricName{Name: "testname", Description: "testhelp"})
c.Deregister(nil)

labels := map[string]string{"a": "b"}
require.Equal(t, uint64(0), c.GetUint64ValueForLabels(labels))
c.Inc(labels)
require.Equal(t, uint64(1), c.GetUint64ValueForLabels(labels))
c.Inc(labels)
require.Equal(t, uint64(2), c.GetUint64ValueForLabels(labels))
// confirm that the value is not shared between labels
c.Inc(nil)
require.Equal(t, uint64(2), c.GetUint64ValueForLabels(labels))
labels2 := map[string]string{"a": "c"}
c.Inc(labels2)
require.Equal(t, uint64(1), c.GetUint64ValueForLabels(labels2))
}