Skip to content

Commit

Permalink
fix: Use buffered reader in peer to fix ShortBuffer (#303)
Browse files Browse the repository at this point in the history
This prevents a io.ErrShortBuffer from occurring when the byte
slice being read is smaller than the chunks sent from the opposite
pipe.

This makes sense for unordered connections, where transmission is
not guaranteed, but does not make sense for TCP-like connections.

We use a bufio.Reader when ordered to ensure data isn't lost.
  • Loading branch information
kylecarbs committed Feb 17, 2022
1 parent deb7170 commit d436993
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"tfexec",
"tfstate",
"unconvert",
"webrtc",
"xerrors",
"yamux"
]
Expand Down
21 changes: 19 additions & 2 deletions peer/channel.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package peer

import (
"bufio"
"context"
"io"
"net"
Expand Down Expand Up @@ -78,7 +79,8 @@ type Channel struct {
dc *webrtc.DataChannel
// This field can be nil. It becomes set after the DataChannel
// has been opened and is detached.
rwc datachannel.ReadWriteCloser
rwc datachannel.ReadWriteCloser
reader io.Reader

closed chan struct{}
closeMutex sync.Mutex
Expand Down Expand Up @@ -130,6 +132,21 @@ func (c *Channel) init() {
_ = c.closeWithError(xerrors.Errorf("detach: %w", err))
return
}
// pion/webrtc will return an io.ErrShortBuffer when a read
// is triggerred with a buffer size less than the chunks written.
//
// This makes sense when considering UDP connections, because
// bufferring of data that has no transmit guarantees is likely
// to cause unexpected behavior.
//
// When ordered, this adds a bufio.Reader. This ensures additional
// data on TCP-like connections can be read in parts, while still
// being bufferred.
if c.opts.Unordered {
c.reader = c.rwc
} else {
c.reader = bufio.NewReader(c.rwc)
}
close(c.opened)
})

Expand Down Expand Up @@ -181,7 +198,7 @@ func (c *Channel) Read(bytes []byte) (int, error) {
}
}

bytesRead, err := c.rwc.Read(bytes)
bytesRead, err := c.reader.Read(bytes)
if err != nil {
if c.isClosed() {
return 0, c.closeError
Expand Down
21 changes: 21 additions & 0 deletions peer/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,27 @@ func TestConn(t *testing.T) {
_, err := client.Ping()
require.NoError(t, err)
})

t.Run("ShortBuffer", func(t *testing.T) {
t.Parallel()
client, server, _ := createPair(t)
exchange(client, server)
go func() {
channel, err := client.Dial(context.Background(), "test", nil)
require.NoError(t, err)
_, err = channel.Write([]byte{1, 2})
require.NoError(t, err)
}()
channel, err := server.Accept(context.Background())
require.NoError(t, err)
data := make([]byte, 1)
_, err = channel.Read(data)
require.NoError(t, err)
require.Equal(t, uint8(0x1), data[0])
_, err = channel.Read(data)
require.NoError(t, err)
require.Equal(t, uint8(0x2), data[0])
})
}

func createPair(t *testing.T) (client *peer.Conn, server *peer.Conn, wan *vnet.Router) {
Expand Down

0 comments on commit d436993

Please sign in to comment.