Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: keep client default client read limit when realtime doesn't specify #631

Merged
merged 8 commits into from
Feb 2, 2024
4 changes: 4 additions & 0 deletions ably/ably_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ type interceptConn struct {
active *activeIntercept
}

func (c interceptConn) Unwrap() ably.Conn {
return c.Conn
}

func (c interceptConn) Receive(deadline time.Time) (*ably.ProtocolMessage, error) {
msg, err := c.Conn.Receive(deadline)
if err != nil {
Expand Down
25 changes: 25 additions & 0 deletions ably/realtime_channel_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,31 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) {
assert.Equal(t, int64(2048), client.Connection.ReadLimit())
}

func TestRealtimeChannel_SetDefaultReadLimitIfServerHasNoLimit(t *testing.T) {

dial := func(proto string, url *url.URL, timeout time.Duration) (ably.Conn, error) {
return ably.DialWebsocket(proto, url, timeout)
}
wrappedDialWebsocket, interceptMsg := DialIntercept(dial)

ctx, cancel := context.WithCancel(context.Background())
msgCh := interceptMsg(ctx, ably.ActionConnected)

app, client := ablytest.NewRealtime(ably.WithDial(wrappedDialWebsocket))
defer safeclose(t, ablytest.FullRealtimeCloser(client), app)
connectedWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected)

connectedMsg := <-msgCh
connectedMsg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size
cancel() // unblocks updated message to be processed

err := ablytest.Wait(connectedWaiter, nil)
assert.Nil(t, err)

// If server set limit is 0, value is set to default readlimit
assert.Equal(t, int64(65536), client.Connection.ReadLimit())
}

func TestRealtimeChannel_ShouldReturnErrorIfReadLimitExceeded(t *testing.T) {
app, client1 := ablytest.NewRealtime(ably.WithEchoMessages(false))
defer safeclose(t, ablytest.FullRealtimeCloser(client1), app)
Expand Down
6 changes: 5 additions & 1 deletion ably/realtime_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func (c *Connection) eventloop() {
c.connStateTTL = connDetails.ConnectionStateTTL
// Spec RSA7b3, RSA7b4, RSA12a
c.auth.updateClientID(connDetails.ClientID)
if !c.isReadLimitSetExternally {
if !c.isReadLimitSetExternally && connDetails.MaxMessageSize > 0 {
c.readLimit = connDetails.MaxMessageSize // set MaxMessageSize limit as per TO3l8
}
}
Expand Down Expand Up @@ -988,6 +988,10 @@ func (vc verboseConn) Close() error {
return vc.conn.Close()
}

func (vc verboseConn) Unwrap() conn {
return vc.conn
}

func (c *Connection) setState(state ConnectionState, err error, retryIn time.Duration) error {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand Down
15 changes: 11 additions & 4 deletions ably/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,19 @@ func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[
return c, nil
}

func setConnectionReadLimit(c conn, readLimit int64) error {
verboseConn, ok := c.(verboseConn)
func unwrapConn(c conn) conn {
u, ok := c.(interface {
Unwrap() conn
})
if !ok {
return errors.New("cannot set readlimit for connection, connection does not use verboseConn")
return c
}
websocketConn, ok := verboseConn.conn.(*websocketConn)
return unwrapConn(u.Unwrap())
}

func setConnectionReadLimit(c conn, readLimit int64) error {
unwrappedConn := unwrapConn(c)
websocketConn, ok := unwrappedConn.(*websocketConn)
if !ok {
return errors.New("cannot set readlimit for connection, connection does not use nhooyr.io/websocket")
}
Expand Down