Skip to content

Commit

Permalink
Merge pull request #154 from nhooyr/fix-atomic
Browse files Browse the repository at this point in the history
Fix unaligned 64 bit atomic loads on 32 bit platforms
  • Loading branch information
nhooyr authored Sep 27, 2019
2 parents 0e00760 + 4b51f4a commit 31df3a5
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 15 deletions.
2 changes: 2 additions & 0 deletions ci/wasm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ GOOS=js GOARCH=wasm go test -exec=wasmbrowsertest ./... -args "$WS_ECHO_SERVER_U

if ! wait "$wsjstestPID"; then
echo "wsjstest exited unsuccessfully"
echo "output:"
cat "$wsjstestOut"
exit 1
fi
15 changes: 8 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
)

// Conn represents a WebSocket connection.
// All methods may be called concurrently except for Reader, Read
// and SetReadLimit.
// All methods may be called concurrently except for Reader and Read.
//
// You must always read from the connection. Otherwise control
// frames will not be handled. See the docs on Reader and CloseRead.
Expand Down Expand Up @@ -56,7 +55,7 @@ type Conn struct {
writeHeaderBuf []byte
writeHeader *header
// read limit for a message in bytes.
msgReadLimit int64
msgReadLimit *atomicInt64

// Used to ensure a previous writer is not used after being closed.
activeWriter atomic.Value
Expand All @@ -70,7 +69,7 @@ type Conn struct {
activeReader *messageReader
// readFrameLock is acquired to read from bw.
readFrameLock chan struct{}
readClosed int64
readClosed *atomicInt64
readHeaderBuf []byte
controlPayloadBuf []byte

Expand All @@ -90,7 +89,8 @@ type Conn struct {
func (c *Conn) init() {
c.closed = make(chan struct{})

c.msgReadLimit = 32768
c.msgReadLimit = &atomicInt64{}
c.msgReadLimit.Store(32768)

c.writeMsgLock = make(chan struct{}, 1)
c.writeFrameLock = make(chan struct{}, 1)
Expand All @@ -105,6 +105,7 @@ func (c *Conn) init() {
c.writeHeaderBuf = makeWriteHeaderBuf()
c.writeHeader = &header{}
c.readHeaderBuf = makeReadHeaderBuf()
c.readClosed = &atomicInt64{}
c.controlPayloadBuf = make([]byte, maxControlFramePayload)

runtime.SetFinalizer(c, func(c *Conn) {
Expand Down Expand Up @@ -341,7 +342,7 @@ func (c *Conn) handleControl(ctx context.Context, h header) error {
// See https://github.com/nhooyr/websocket/issues/87#issue-451703332
// Most users should not need this.
func (c *Conn) Reader(ctx context.Context) (MessageType, io.Reader, error) {
if atomic.LoadInt64(&c.readClosed) == 1 {
if c.readClosed.Load() == 1 {
return 0, nil, fmt.Errorf("websocket connection read closed")
}

Expand Down Expand Up @@ -391,7 +392,7 @@ func (c *Conn) reader(ctx context.Context) (MessageType, io.Reader, error) {
c.readerMsgHeader = h
c.readerFrameEOF = false
c.readerMaskPos = 0
c.readMsgLeft = c.msgReadLimit
c.readMsgLeft = c.msgReadLimit.Load()

r := &messageReader{
c: c,
Expand Down
25 changes: 23 additions & 2 deletions conn_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *netConn) SetReadDeadline(t time.Time) error {
// Use this when you do not want to read data messages from the connection anymore but will
// want to write messages to it.
func (c *Conn) CloseRead(ctx context.Context) context.Context {
atomic.StoreInt64(&c.readClosed, 1)
c.readClosed.Store(1)

ctx, cancel := context.WithCancel(ctx)
go func() {
Expand All @@ -200,11 +200,32 @@ func (c *Conn) CloseRead(ctx context.Context) context.Context {
//
// When the limit is hit, the connection will be closed with StatusMessageTooBig.
func (c *Conn) SetReadLimit(n int64) {
c.msgReadLimit = n
c.msgReadLimit.Store(n)
}

func (c *Conn) setCloseErr(err error) {
c.closeErrOnce.Do(func() {
c.closeErr = fmt.Errorf("websocket closed: %w", err)
})
}

// See https://github.com/nhooyr/websocket/issues/153
type atomicInt64 struct {
v atomic.Value
}

func (v *atomicInt64) Load() int64 {
i, ok := v.v.Load().(int64)
if !ok {
return 0
}
return i
}

func (v *atomicInt64) Store(i int64) {
v.v.Store(i)
}

func (v *atomicInt64) String() string {
return fmt.Sprint(v.v.Load())
}
16 changes: 10 additions & 6 deletions websocket_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"reflect"
"runtime"
"sync"
"sync/atomic"
"syscall/js"

"nhooyr.io/websocket/internal/bpool"
Expand All @@ -21,9 +20,10 @@ import (
type Conn struct {
ws wsjs.WebSocket

msgReadLimit int64
// read limit for a message in bytes.
msgReadLimit *atomicInt64

readClosed int64
readClosed *atomicInt64
closeOnce sync.Once
closed chan struct{}
closeErrOnce sync.Once
Expand All @@ -49,7 +49,11 @@ func (c *Conn) close(err error) {
func (c *Conn) init() {
c.closed = make(chan struct{})
c.readSignal = make(chan struct{}, 1)
c.msgReadLimit = 32768

c.msgReadLimit = &atomicInt64{}
c.msgReadLimit.Store(32768)

c.readClosed = &atomicInt64{}

c.releaseOnClose = c.ws.OnClose(func(e wsjs.CloseEvent) {
cerr := CloseError{
Expand Down Expand Up @@ -89,15 +93,15 @@ func (c *Conn) closeWithInternal() {
// Read attempts to read a message from the connection.
// The maximum time spent waiting is bounded by the context.
func (c *Conn) Read(ctx context.Context) (MessageType, []byte, error) {
if atomic.LoadInt64(&c.readClosed) == 1 {
if c.readClosed.Load() == 1 {
return 0, nil, fmt.Errorf("websocket connection read closed")
}

typ, p, err := c.read(ctx)
if err != nil {
return 0, nil, fmt.Errorf("failed to read: %w", err)
}
if int64(len(p)) > c.msgReadLimit {
if int64(len(p)) > c.msgReadLimit.Load() {
c.Close(StatusMessageTooBig, fmt.Sprintf("read limited at %v bytes", c.msgReadLimit))
return 0, nil, c.closeErr
}
Expand Down

0 comments on commit 31df3a5

Please sign in to comment.