Skip to content

Commit

Permalink
Merge pull request #613 from ably/feature/integration-2.0
Browse files Browse the repository at this point in the history
[SDK-3746] Feature/integration 2.0
  • Loading branch information
sacOO7 committed Jan 16, 2024
2 parents e66ad6a + 3cddaaf commit b4246f8
Show file tree
Hide file tree
Showing 28 changed files with 2,257 additions and 982 deletions.
1 change: 1 addition & 0 deletions ably/error_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
ErrTimeoutError ErrorCode = 50003
ErrConnectionFailed ErrorCode = 80000
ErrConnectionSuspended ErrorCode = 80002
ErrConnectionClosed ErrorCode = 80017
ErrDisconnected ErrorCode = 80003
ErrProtocolError ErrorCode = 80013
ErrChannelOperationFailed ErrorCode = 90000
Expand Down
12 changes: 12 additions & 0 deletions ably/errors.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 66 additions & 5 deletions ably/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ func (c *REST) GetCachedFallbackHost() string {
return c.successFallbackHost.get()
}

func (c *RealtimeChannel) GetChannelSerial() string {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.properties.ChannelSerial
}

func (c *RealtimeChannel) GetAttachResume() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
Expand All @@ -105,6 +111,12 @@ func (c *RealtimeChannel) SetAttachResume(value bool) {
c.attachResume = value
}

func (c *RealtimeChannel) SetState(chanState ChannelState) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.state = chanState
}

func (opts *clientOptions) GetFallbackRetryTimeout() time.Duration {
return opts.fallbackRetryTimeout()
}
Expand Down Expand Up @@ -193,6 +205,55 @@ func (c *Connection) PendingItems() int {
return len(c.pending.queue)
}

// AckAll empties queue and acks all pending callbacks
func (c *Connection) AckAll() {
c.mtx.Lock()
cx := c.pending.Dismiss()
c.mtx.Unlock()
c.log().Infof("Ack all %d messages waiting for ACK/NACK", len(cx))
for _, v := range cx {
v.onAck(nil)
}
}

func (c *Connection) SetKey(key string) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.key = key
}

func (c *RealtimePresence) Members() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
presenceMembers := make(map[string]*PresenceMessage, len(c.members))
for k, pm := range c.members {
presenceMembers[k] = pm
}
return presenceMembers
}

func (c *RealtimePresence) InternalMembers() map[string]*PresenceMessage {
c.mtx.Lock()
defer c.mtx.Unlock()
internalMembers := make(map[string]*PresenceMessage, len(c.internalMembers))
for k, pm := range c.internalMembers {
internalMembers[k] = pm
}
return internalMembers
}

func (c *RealtimePresence) SyncInitial() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInitial
}

func (c *RealtimePresence) SyncInProgress() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.syncState == syncInProgress
}

func (c *Connection) ConnectionStateTTL() time.Duration {
return c.connectionStateTTL()
}
Expand All @@ -217,11 +278,11 @@ const (
DefaultCipherAlgorithm = defaultCipherAlgorithm
DefaultCipherMode = defaultCipherMode

AblyVersionHeader = ablyVersionHeader
AblyVersion = ablyVersion
LibraryVersion = libraryVersion
AblyAgentHeader = ablyAgentHeader
AblySDKIdentifier = ablySDKIdentifier
AblyProtocolVersionHeader = ablyProtocolVersionHeader
AblyProtocolVersion = ablyProtocolVersion
ClientLibraryVersion = clientLibraryVersion
AblyAgentHeader = ablyAgentHeader
AblySDKIdentifier = ablySDKIdentifier

EncUTF8 = encUTF8
EncJSON = encJSON
Expand Down
32 changes: 0 additions & 32 deletions ably/mock_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions ably/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var defaultOptions = clientOptions{
HTTPOpenTimeout: 4 * time.Second, //TO3l3
ChannelRetryTimeout: 15 * time.Second, // TO3l7
FallbackRetryTimeout: 10 * time.Minute,
IdempotentRESTPublishing: false,
IdempotentRESTPublishing: true, // TO3n
Port: Port,
TLSPort: TLSPort,
Now: time.Now,
Expand Down Expand Up @@ -235,7 +235,6 @@ func (opts *authOptions) KeySecret() string {
// clientOptions passes additional client-specific properties to the [ably.NewREST] or to the [ably.NewRealtime].
// Properties set using [ably.clientOptions] are used instead of the [ably.defaultOptions] values.
type clientOptions struct {

// authOptions Embedded an [ably.authOptions] object (TO3j).
authOptions

Expand Down Expand Up @@ -282,7 +281,7 @@ type clientOptions struct {
// A recovery key string can be explicitly provided, or alternatively if a callback function is provided,
// the client library will automatically persist the recovery key between page reloads and call the callback
// when the connection is recoverable. The callback is then responsible for confirming whether the connection
// should be recovered or not. See connection state recovery for further information (RTC1c, TO3i).
// should be recovered or not. See connection state recovery for further information (RTC1c, TO3i, RTN16i).
Recover string

// TransportParams is a set of key-value pairs that can be used to pass in arbitrary connection parameters,
Expand Down
10 changes: 10 additions & 0 deletions ably/proto_channel_propeties.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ably

// CP2
type ChannelProperties struct {
// AttachSerial contains the channelSerial from latest ATTACHED ProtocolMessage received on the channel, see CP2a, RTL15a
AttachSerial string

// ChannelSerial contains the channelSerial from latest ProtocolMessage of action type Message/PresenceMessage received on the channel, see CP2b, RTL15b.
ChannelSerial string
}
22 changes: 11 additions & 11 deletions ably/proto_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ import (

// constants for rsc7
const (
ablyVersionHeader = "X-Ably-Version"
ablyErrorCodeHeader = "X-Ably-Errorcode"
ablyErrorMessageHeader = "X-Ably-Errormessage"
libraryVersion = "1.2.14"
libraryName = "go"
ablyVersion = "1.2"
ablyClientIDHeader = "X-Ably-ClientId"
hostHeader = "Host"
ablyAgentHeader = "Ably-Agent" // RSC7d
ablySDKIdentifier = "ably-go/" + libraryVersion // RSC7d1
ablyProtocolVersionHeader = "X-Ably-Version"
ablyErrorCodeHeader = "X-Ably-Errorcode"
ablyErrorMessageHeader = "X-Ably-Errormessage"
clientLibraryVersion = "1.2.14"
clientRuntimeName = "go"
ablyProtocolVersion = "2" // CSV2
ablyClientIDHeader = "X-Ably-ClientId"
hostHeader = "Host"
ablyAgentHeader = "Ably-Agent" // RSC7d
ablySDKIdentifier = "ably-go/" + clientLibraryVersion // RSC7d1
)

var goRuntimeIdentifier = func() string {
return fmt.Sprintf("%s/%s", libraryName, runtime.Version()[2:])
return fmt.Sprintf("%s/%s", clientRuntimeName, runtime.Version()[2:])
}()

func ablyAgentIdentifier(agents map[string]string) string {
Expand Down
44 changes: 44 additions & 0 deletions ably/proto_presence_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package ably

import (
"fmt"
"strconv"
"strings"
)

// PresenceAction describes the possible actions members in the presence set can emit (TP2).
Expand Down Expand Up @@ -58,3 +60,45 @@ func (m PresenceMessage) String() string {
"update",
}[m.Action], m.ClientID, m.Data)
}

func (msg *PresenceMessage) isServerSynthesized() bool {
return !strings.HasPrefix(msg.ID, msg.ConnectionID)
}

func (msg *PresenceMessage) getMsgSerialAndIndex() (int64, int64, error) {
msgIds := strings.Split(msg.ID, ":")
if len(msgIds) != 3 {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid id %v", msg.ID)
}
msgSerial, err := strconv.ParseInt(msgIds[1], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgSerial, for msgId %v", msg.ID)
}
msgIndex, err := strconv.ParseInt(msgIds[2], 10, 64)
if err != nil {
return 0, 0, fmt.Errorf("parsing error, the presence message has invalid msgIndex, for msgId %v", msg.ID)
}
return msgSerial, msgIndex, nil
}

// RTP2b, RTP2c
func (msg1 *PresenceMessage) IsNewerThan(msg2 *PresenceMessage) (bool, error) {
// RTP2b1
if msg1.isServerSynthesized() || msg2.isServerSynthesized() {
return msg1.Timestamp >= msg2.Timestamp, nil
}

// RTP2b2
msg1Serial, msg1Index, err := msg1.getMsgSerialAndIndex()
if err != nil {
return false, err
}
msg2Serial, msg2Index, err := msg2.getMsgSerialAndIndex()
if err != nil {
return true, err
}
if msg1Serial == msg2Serial {
return msg1Index >= msg2Index, nil
}
return msg1Serial > msg2Serial, nil
}

0 comments on commit b4246f8

Please sign in to comment.