Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: enforce maximum header size on outgoing ws conns #5268

Merged
merged 8 commits into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/algorand/graphtrace v0.1.0
github.com/algorand/msgp v1.1.53
github.com/algorand/oapi-codegen v1.12.0-algorand.0
github.com/algorand/websocket v1.4.5
github.com/algorand/websocket v1.4.6
github.com/aws/aws-sdk-go v1.33.0
github.com/consensys/gnark-crypto v0.7.0
github.com/davidlazar/go-crypto v0.0.0-20170701192655-dcfb0a7ac018
Expand All @@ -36,6 +36,7 @@ require (
golang.org/x/sys v0.1.0
golang.org/x/text v0.4.0
gopkg.in/sohlich/elogrus.v3 v3.0.0-20180410122755-1fa29e2f2009
gopkg.in/yaml.v3 v3.0.1
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
)

require (
Expand Down Expand Up @@ -71,5 +72,4 @@ require (
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/algorand/msgp v1.1.53 h1:D6HKLyvLE6ltfsf8Apsrc+kqYb/CcOZEAfh1DpkPrNg=
github.com/algorand/msgp v1.1.53/go.mod h1:5K3d58/poT5fPmtiwuQft6GjgSrVEM46KoXdLrID8ZU=
github.com/algorand/oapi-codegen v1.12.0-algorand.0 h1:W9PvED+wAJc+9EeXPONnA+0zE9UhynEqoDs4OgAxKhk=
github.com/algorand/oapi-codegen v1.12.0-algorand.0/go.mod h1:tIWJ9K/qrLDVDt5A1p82UmxZIEGxv2X+uoujdhEAL48=
github.com/algorand/websocket v1.4.5 h1:Cs6UTaCReAl02evYxmN8k57cNHmBILRcspfSxYg4AJE=
github.com/algorand/websocket v1.4.5/go.mod h1:79n6FSZY08yQagHzE/YWZqTPBYfY5wc3IS+UTZe1W5c=
github.com/algorand/websocket v1.4.6 h1:I0kV4EYwatuUrKtNiwzYYgojgwh6pksDmlqntKG2Woc=
github.com/algorand/websocket v1.4.6/go.mod h1:HJmdGzFtnlUQ4nTzZP6WrT29oGYf1t6Ybi64vROcT+M=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/aws/aws-sdk-go v1.33.0 h1:Bq5Y6VTLbfnJp1IV8EL/qUU5qO1DYHda/zis/sqevkY=
Expand Down
10 changes: 10 additions & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ const unprintableCharacterGlyph = "▯"
// PublicAddress (which will match HTTP Listener's Address) in tests only.
const testingPublicAddress = "testing"

// Maximum number of bytes to read from a header when trying to establish a websocket connection.
const wsMaxHeaderBytes = 4096

var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)

Expand Down Expand Up @@ -396,6 +399,9 @@ type WebsocketNetwork struct {
// outgoingMessagesBufferSize is the size used for outgoing messages.
outgoingMessagesBufferSize int

// wsMaxHeaderBytes is the maximum accepted size of the header prior to upgrading to websocket connection.
wsMaxHeaderBytes int64

// slowWritingPeerMonitorInterval defines the interval between two consecutive tests for slow peer writing
slowWritingPeerMonitorInterval time.Duration

Expand Down Expand Up @@ -758,6 +764,8 @@ func (wn *WebsocketNetwork) setup() {
config.Consensus[protocol.ConsensusCurrentVersion].DownCommitteeSize),
)

wn.wsMaxHeaderBytes = wsMaxHeaderBytes

wn.identityTracker = NewIdentityTracker()

wn.broadcastQueueHighPrio = make(chan broadcastRequest, wn.outgoingMessagesBufferSize)
Expand Down Expand Up @@ -2193,9 +2201,11 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) {
EnableCompression: false,
NetDialContext: wn.dialer.DialContext,
NetDial: wn.dialer.Dial,
MaxHeaderSize: wn.wsMaxHeaderBytes,
}

conn, response, err := websocketDialer.DialContext(wn.ctx, gossipAddr, requestHeader)

if err != nil {
if err == websocket.ErrBadHandshake {
// reading here from ioutil is safe only because it came from DialContext above, which already finished reading all the data from the network
Expand Down
198 changes: 193 additions & 5 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"math/rand"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
"runtime"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/algorand/go-deadlock"
"github.com/algorand/websocket"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand All @@ -54,6 +56,8 @@ import (

const sendBufferLength = 1000

const genesisID = "go-test-network-genesis"

func init() {
// this allows test code to use out-of-protocol message tags and have them go through
allowCustomTags = true
Expand Down Expand Up @@ -127,7 +131,7 @@ func makeTestWebsocketNodeWithConfig(t testing.TB, conf config.Local, opts ...te
log: log,
config: conf,
phonebook: MakePhonebook(1, 1*time.Millisecond),
GenesisID: "go-test-network-genesis",
GenesisID: genesisID,
NetworkID: config.Devtestnet,
}
// apply options to newly-created WebsocketNetwork, if provided
Expand Down Expand Up @@ -990,7 +994,7 @@ func makeTestFilterWebsocketNode(t *testing.T, nodename string) *WebsocketNetwor
log: logging.TestingLog(t).With("node", nodename),
config: dc,
phonebook: MakePhonebook(1, 1*time.Millisecond),
GenesisID: "go-test-network-genesis",
GenesisID: genesisID,
NetworkID: config.Devtestnet,
}
require.True(t, wn.config.EnableIncomingMessageFilter)
Expand Down Expand Up @@ -2462,7 +2466,7 @@ func TestSlowPeerDisconnection(t *testing.T) {
log: log,
config: defaultConfig,
phonebook: MakePhonebook(1, 1*time.Millisecond),
GenesisID: "go-test-network-genesis",
GenesisID: genesisID,
NetworkID: config.Devtestnet,
slowWritingPeerMonitorInterval: time.Millisecond * 50,
}
Expand Down Expand Up @@ -2537,7 +2541,7 @@ func TestForceMessageRelaying(t *testing.T) {
log: log,
config: defaultConfig,
phonebook: MakePhonebook(1, 1*time.Millisecond),
GenesisID: "go-test-network-genesis",
GenesisID: genesisID,
NetworkID: config.Devtestnet,
}
wn.setup()
Expand Down Expand Up @@ -2631,7 +2635,7 @@ func TestCheckProtocolVersionMatch(t *testing.T) {
log: log,
config: defaultConfig,
phonebook: MakePhonebook(1, 1*time.Millisecond),
GenesisID: "go-test-network-genesis",
GenesisID: genesisID,
NetworkID: config.Devtestnet,
}
wn.setup()
Expand Down Expand Up @@ -3757,3 +3761,187 @@ func TestWebsocketNetworkTelemetryTCP(t *testing.T) {
t.Log("closed detailsA", string(pcdA))
t.Log("closed detailsB", string(pcdB))
}

type mockServer struct {
iansuvak marked this conversation as resolved.
Show resolved Hide resolved
*httptest.Server
URL string
t *testing.T

waitForClientClose bool
}

type mockHandler struct {
*testing.T
s *mockServer
}

var mockUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
EnableCompression: true,
Error: func(w http.ResponseWriter, r *http.Request, status int, reason error) {
http.Error(w, reason.Error(), status)
},
}

func buildWsResponseHeader() http.Header {
h := http.Header{}
h.Add(ProtocolVersionHeader, ProtocolVersion)
h.Add(GenesisHeader, genesisID)
h.Add(NodeRandomHeader, "randomHeader")
return h
}

func (t mockHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Set the required headers to successfully establish a connection
ws, err := mockUpgrader.Upgrade(w, r, buildWsResponseHeader())
if err != nil {
t.Logf("Upgrade: %v", err)
return
}
defer ws.Close()
// Send a message of interest immediately after the connection is established
wr, err := ws.NextWriter(websocket.BinaryMessage)
if err != nil {
t.Logf("NextWriter: %v", err)
return
}

bytes := MarshallMessageOfInterest([]protocol.Tag{protocol.AgreementVoteTag})
msgBytes := append([]byte(protocol.MsgOfInterestTag), bytes...)
_, err = wr.Write(msgBytes)
if err != nil {
t.Logf("Error writing MessageOfInterest: %v", err)
return
}
wr.Close()

for true {
// echo a message back to the client
_, _, err := ws.NextReader()
if err != nil {
if _, ok := err.(*websocket.CloseError); ok && t.s.waitForClientClose {
t.Log("got client close")
return
}
return
}
}
}

func makeWsProto(s string) string {
return "ws" + strings.TrimPrefix(s, "http")
}

func newServer(t *testing.T) *mockServer {
var s mockServer
s.Server = httptest.NewServer(mockHandler{t, &s})
s.Server.URL += ""
s.URL = makeWsProto(s.Server.URL)
return &s
}

func TestMaxHeaderSize(t *testing.T) {
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netA"})
netA.config.GossipFanout = 1

netB := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netB"})
netB.config.GossipFanout = 1

netA.Start()
defer netA.Stop()
netB.Start()
defer netB.Stop()

addrB, ok := netB.Address()
require.True(t, ok)
gossipB, err := netB.addrToGossipAddr(addrB)
require.NoError(t, err)

// First make sure that the regular connection with default max header size works
netA.wsMaxHeaderBytes = wsMaxHeaderBytes
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
time.Sleep(250 * time.Millisecond)
assert.Equal(t, 1, len(netA.peers))

netA.removePeer(netA.peers[0], disconnectReasonNone)
assert.Zero(t, len(netA.peers))

// Now try to connect with a max header size that is too small
logBuffer := bytes.NewBuffer(nil)
netA.log.SetOutput(logBuffer)

netA.wsMaxHeaderBytes = 128
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
algonautshant marked this conversation as resolved.
Show resolved Hide resolved
lg := logBuffer.String()
logBuffer.Reset()
time.Sleep(250 * time.Millisecond)
assert.Contains(t, lg, fmt.Sprintf("ws connect(%s) fail:", gossipB))
assert.Zero(t, len(netA.peers))

// Test that setting 0 disables the max header size check
netA.wsMaxHeaderBytes = 0
netA.wg.Add(1)
netA.tryConnect(addrB, gossipB)
time.Sleep(250 * time.Millisecond)
assert.Equal(t, 1, len(netA.peers))
}

func TestTryConnectEarlyWrite(t *testing.T) {
partitiontest.PartitionTest(t)

netA := makeTestWebsocketNode(t, testWebsocketLogNameOption{"netA"})
netA.config.GossipFanout = 1

s := newServer(t)
s.waitForClientClose = true
defer s.Close()

netA.Start()
defer netA.Stop()

dialer := websocket.Dialer{}
mconn, resp, _ := dialer.Dial(s.URL, nil)
expectedHeader := buildWsResponseHeader()
for k, v := range expectedHeader {
assert.Equal(t, v[0], resp.Header.Get(k))
}

// Fixed overhead of the full status line "HTTP/1.1 101 Switching Protocols" (32) + 4 bytes for two instance of CRLF
// one after the status line and one to separate headers from the body
minValidHeaderSize := 36
for k, v := range resp.Header {
minValidHeaderSize += len(k) + len(v[0]) + 4 // + 4 is for the ": " and CRLF
}
mconn.Close()

// Setting the max header size to 1 byte less than the minimum header size should fail
netA.wsMaxHeaderBytes = int64(minValidHeaderSize) - 1
netA.wg.Add(1)
netA.tryConnect(s.URL, s.URL)
time.Sleep(250 * time.Millisecond)
assert.Len(t, netA.peers, 0)

// Now set the max header size to the minimum header size and it should succeed
netA.wsMaxHeaderBytes = int64(minValidHeaderSize)
netA.wg.Add(1)
netA.tryConnect(s.URL, s.URL)
p := netA.peers[0]
var messageCount uint64
for x := 0; x < 1000; x++ {
messageCount = atomic.LoadUint64(&p.miMessageCount)
if messageCount == 1 {
break
}
time.Sleep(2 * time.Millisecond)
}

// Confirm that we successfuly received a message of interest
assert.Len(t, netA.peers, 1)
fmt.Printf("MI Message Count: %v\n", netA.peers[0].miMessageCount)
assert.Equal(t, uint64(1), netA.peers[0].miMessageCount)
}