Skip to content

Commit

Permalink
fix(dot/network): remove maxReads limitation to read stream (#3287)
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed May 30, 2023
1 parent 86c7577 commit 483b23f
Show file tree
Hide file tree
Showing 6 changed files with 418 additions and 32 deletions.
4 changes: 4 additions & 0 deletions dot/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ var (
errInvalidStartingBlockType = errors.New("invalid StartingBlock in messsage")
errInboundHanshakeExists = errors.New("an inbound handshake already exists for given peer")
errInvalidRole = errors.New("invalid role")
ErrFailedToReadEntireMessage = errors.New("failed to read entire message")
ErrNilStream = errors.New("nil stream")
ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data")
ErrGreaterThanMaxSize = errors.New("greater than maximum size")
)
249 changes: 249 additions & 0 deletions dot/network/mock_stream_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions dot/network/mocks_generate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ package network
//go:generate mockgen -destination=mock_syncer_test.go -package $GOPACKAGE . Syncer
//go:generate mockgen -destination=mock_block_state_test.go -package $GOPACKAGE . BlockState
//go:generate mockgen -destination=mock_transaction_handler_test.go -package $GOPACKAGE . TransactionHandler
//go:generate mockgen -destination=mock_stream_test.go -package $GOPACKAGE github.com/libp2p/go-libp2p/core/network Stream
3 changes: 1 addition & 2 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ const (
)

var (
logger = log.NewFromGlobal(log.AddContext("pkg", "network"))
maxReads = 256
logger = log.NewFromGlobal(log.AddContext("pkg", "network"))

peerCountGauge = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "gossamer_network_node",
Expand Down
42 changes: 16 additions & 26 deletions dot/network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package network
import (
crand "crypto/rand"
"encoding/hex"
"errors"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -150,34 +149,32 @@ func uint64ToLEB128(in uint64) []byte {
return out
}

func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, int, error) {
if len(buf) == 0 {
return 0, 0, errors.New("buffer has length 0")
}

func readLEB128ToUint64(r io.Reader) (uint64, int, error) {
var out uint64
var shift uint

maxSize := 10 // Max bytes in LEB128 encoding of uint64 is 10.
bytesRead := 0

for {
n, err := r.Read(buf[:1])
// read a sinlge byte
singleByte := []byte{0}
n, err := r.Read(singleByte)
if err != nil {
return 0, bytesRead, err
}

bytesRead += n

b := buf[0]
b := singleByte[0]
out |= uint64(0x7F&b) << shift
if b&0x80 == 0 {
break
}

maxSize--
if maxSize == 0 {
return 0, bytesRead, fmt.Errorf("invalid LEB128 encoded data")
return 0, bytesRead, ErrInvalidLEB128EncodedData
}

shift += 7
Expand All @@ -186,17 +183,12 @@ func readLEB128ToUint64(r io.Reader, buf []byte) (uint64, int, error) {
}

// readStream reads from the stream into the given buffer, returning the number of bytes read
func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64) (int, error) {
func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64) (tot int, err error) {
if stream == nil {
return 0, errors.New("stream is nil")
return 0, ErrNilStream
}

var (
tot int
)

buf := *bufPointer
length, bytesRead, err := readLEB128ToUint64(stream, buf[:1])
length, bytesRead, err := readLEB128ToUint64(stream)
if err != nil {
return bytesRead, fmt.Errorf("failed to read length: %w", err)
}
Expand All @@ -205,32 +197,30 @@ func readStream(stream libp2pnetwork.Stream, bufPointer *[]byte, maxSize uint64)
return 0, nil // msg length of 0 is allowed, for example transactions handshake
}

buf := *bufPointer
if length > uint64(len(buf)) {
extraBytes := int(length) - len(buf)
*bufPointer = append(buf, make([]byte, extraBytes)...) // TODO #2288 use bytes.Buffer instead
logger.Warnf("received message with size %d greater than allocated message buffer size %d", length, len(buf))
extraBytes := int(length) - len(buf)
*bufPointer = append(buf, make([]byte, extraBytes)...)
buf = *bufPointer
}

if length > maxSize {
logger.Warnf("received message with size %d greater than max size %d, closing stream", length, maxSize)
return 0, fmt.Errorf("message size greater than maximum: got %d", length)
return 0, fmt.Errorf("%w: max %d, got %d", ErrGreaterThanMaxSize, maxSize, length)
}

tot = 0
for i := 0; i < maxReads; i++ {
for tot < int(length) {
n, err := stream.Read(buf[tot:])
if err != nil {
return n + tot, err
}

tot += n
if tot == int(length) {
break
}
}

if tot != int(length) {
return tot, fmt.Errorf("failed to read entire message: expected %d bytes, received %d bytes", length, tot)
return tot, fmt.Errorf("%w: expected %d bytes, received %d bytes", ErrFailedToReadEntireMessage, length, tot)
}

return tot, nil
Expand Down

0 comments on commit 483b23f

Please sign in to comment.