diff --git a/README.md b/README.md index 06e0a190b..19eaf5ab2 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ This is a .NET client library for Ably which targets the 2.0 client library spec ## Supported platforms * [.NET Standard 2.0+](https://learn.microsoft.com/en-us/dotnet/standard/net-standard?tabs=net-standard-2-0) -* .NET 6.x, 7.x, MAUI, check [MAUI config](#maui-configuration). +* .NET 6.0+, MAUI, check [MAUI config](#maui-configuration). * .NET Framework 4.6.2+ * .NET (Core) 2.0+ * Mono 5.4+ @@ -424,7 +424,6 @@ var options = new ClientOptions { AuthCallback = async tokenParams => { - // Return serialized jwttokenstring returned from server string jwtToken = await getJwtTokenFromServer(tokenParams); int expiresIn = 3600; // assuming jwtToken has 1 hr expiry return new TokenDetails(jwtToken) { diff --git a/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs b/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs index 49d13126b..64c6f97dc 100644 --- a/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs +++ b/src/IO.Ably.Shared/Extensions/PresenceExtensions.cs @@ -2,43 +2,43 @@ { internal static class PresenceExtensions { - public static bool IsSynthesized(this PresenceMessage msg) + public static bool IsServerSynthesized(this PresenceMessage msg) { return msg.Id == null || !msg.Id.StartsWith(msg.ConnectionId); } // RTP2b, RTP2c - public static bool IsNewerThan(this PresenceMessage thisMessage, PresenceMessage thatMessage) + public static bool IsNewerThan(this PresenceMessage existingMsg, PresenceMessage incomingMsg) { // RTP2b1 - if (thisMessage.IsSynthesized() || thatMessage.IsSynthesized()) + if (existingMsg.IsServerSynthesized() || incomingMsg.IsServerSynthesized()) { - return thisMessage.Timestamp > thatMessage.Timestamp; + return existingMsg.Timestamp > incomingMsg.Timestamp; } // RTP2b2 - var thisValues = thisMessage.Id.Split(':'); - var thatValues = thatMessage.Id.Split(':'); + var thisValues = existingMsg.Id.Split(':'); + var thatValues = incomingMsg.Id.Split(':'); // if any part of the message serial fails to parse then throw an exception if (thisValues.Length != 3 || - !(int.TryParse(thisValues[1], out int msgSerialThis) | int.TryParse(thisValues[2], out int indexThis))) + !(int.TryParse(thisValues[1], out int existingMsgSerial) | int.TryParse(thisValues[2], out int existingMsgIndex))) { - throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thisMessage.Id}'."); + throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{existingMsg.Id}'."); } if (thatValues.Length != 3 || - !(int.TryParse(thatValues[1], out int msgSerialThat) | int.TryParse(thatValues[2], out int indexThat))) + !(int.TryParse(thatValues[1], out int incomingMsgSerial) | int.TryParse(thatValues[2], out int incomingMsgIndex))) { - throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thatMessage.Id}'."); + throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{incomingMsg.Id}'."); } - if (msgSerialThis == msgSerialThat) + if (existingMsgSerial == incomingMsgSerial) { - return indexThis > indexThat; + return existingMsgIndex > incomingMsgIndex; } - return msgSerialThis > msgSerialThat; + return existingMsgSerial > incomingMsgSerial; } } } diff --git a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs index 001dac60b..0ab81cf07 100644 --- a/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs +++ b/src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs @@ -70,10 +70,13 @@ public Task MessageReceived(ProtocolMessage protocolMessage, RealtimeState } // RTL12 - if (channel.State == ChannelState.Attached && !protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed)) + if (channel.State == ChannelState.Attached) { - channel.Presence.ChannelAttached(protocolMessage, false); - channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage); + if (!protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed)) + { + channel.Presence.ChannelAttached(protocolMessage); + channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage); + } } else { diff --git a/src/IO.Ably.Shared/Realtime/Presence.cs b/src/IO.Ably.Shared/Realtime/Presence.cs index 90faf4edb..42358fd7c 100644 --- a/src/IO.Ably.Shared/Realtime/Presence.cs +++ b/src/IO.Ably.Shared/Realtime/Presence.cs @@ -39,14 +39,28 @@ internal Presence(IConnectionManager connection, RealtimeChannel channel, string internal ILogger Logger { get; private set; } /// - /// Has the sync completed. + /// Checks if presence sync has ended. /// - public bool IsSyncComplete => MembersMap.SyncCompleted && !IsSyncInProgress; + /// + [Obsolete("This property is deprecated, use SyncComplete instead")] + public bool IsSyncComplete => SyncComplete; // RTP13. + + /// + /// Checks if presence sync has ended. + /// + /// + public bool SyncComplete => MembersMap.SyncCompleted && !SyncInProgress; // RTP13 + + /// + /// Indicates whether there is currently a sync in progress. + /// + [Obsolete("This property is internal, will be removed in the future")] + public bool IsSyncInProgress => SyncInProgress; /// /// Indicates whether there is currently a sync in progress. /// - public bool IsSyncInProgress => MembersMap.SyncInProgress; + internal bool SyncInProgress => MembersMap.SyncInProgress; /// /// Indicates all members present on the channel. @@ -153,7 +167,7 @@ private async Task WaitForSyncAsync() // The InternalSync should be completed and the channels Attached or Attaching void CheckAndSet() { - if (IsSyncComplete + if (SyncComplete && (_channel.State == ChannelState.Attached || _channel.State == ChannelState.Attaching)) { tsc.TrySetResult(true); @@ -532,7 +546,7 @@ internal void OnSyncMessage(ProtocolMessage protocolMessage) /* If a new sequence identifier is sent from Ably, then the client library * must consider that to be the start of a new sync sequence * and any previous in-flight sync should be discarded. (part of RTP18)*/ - if (IsSyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId) + if (SyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId) { EndSync(); } @@ -582,7 +596,7 @@ internal void OnPresence(PresenceMessage[] messages) // RTP2e case PresenceAction.Leave: broadcast &= MembersMap.Remove(message); - if (updateInternalPresence && !message.IsSynthesized()) + if (updateInternalPresence && !message.IsServerSynthesized()) { InternalMembersMap.Remove(message); } @@ -613,7 +627,7 @@ internal void OnPresence(PresenceMessage[] messages) internal void StartSync() { - if (!IsSyncInProgress) + if (!SyncInProgress) { MembersMap.StartSync(); } @@ -621,7 +635,7 @@ internal void StartSync() private void EndSync() { - if (!IsSyncInProgress) + if (!SyncInProgress) { return; } @@ -647,7 +661,7 @@ private void EnterMembersFromInternalPresenceMap() { try { - var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data); + var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data, item.Id); UpdatePresence(itemToSend, (success, info) => { if (!success) @@ -721,24 +735,15 @@ internal void ChannelSuspended(ErrorInfo error) FailQueuedMessages(error); } - internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWithoutMessageLoss = true) + internal void ChannelAttached(ProtocolMessage attachedMessage) { - // RTP19 - StartSync(); - // RTP1 var hasPresence = attachedMessage != null && attachedMessage.HasFlag(ProtocolMessage.Flag.HasPresence); - if (hasPresence) - { - if (Logger.IsDebug) - { - Logger.Debug( - $"Protocol message has presence flag. Starting Presence SYNC. Flag: {attachedMessage.Flags}"); - } - StartSync(); - } - else + // RTP19 + StartSync(); + + if (!hasPresence) { EndSync(); // RTP19 } @@ -747,10 +752,7 @@ internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWith SendQueuedMessages(); // RTP17f - if (isAttachWithoutMessageLoss) - { - EnterMembersFromInternalPresenceMap(); - } + EnterMembersFromInternalPresenceMap(); } private void SendQueuedMessages() diff --git a/src/IO.Ably.Shared/Realtime/RealtimeChannels.cs b/src/IO.Ably.Shared/Realtime/RealtimeChannels.cs index ae08cca24..eb7601a99 100644 --- a/src/IO.Ably.Shared/Realtime/RealtimeChannels.cs +++ b/src/IO.Ably.Shared/Realtime/RealtimeChannels.cs @@ -264,7 +264,10 @@ internal IDictionary GetChannelSerials() var channelSerials = new Dictionary(); foreach (var realtimeChannel in this) { - channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial; + if (realtimeChannel.State == ChannelState.Attached) + { + channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial; + } } return channelSerials; diff --git a/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs b/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs index db1da65ac..d55ec9427 100644 --- a/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs +++ b/src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs @@ -26,9 +26,9 @@ public static RecoveryKeyContext Decode(string recover, ILogger logger = null) { return JsonHelper.Deserialize(recover); } - catch (Exception) + catch (Exception e) { - logger?.Warning($"Error deserializing recover - {recover}, setting it as null"); + logger?.Warning($"Error deserializing recover - {recover}, setting it as null", e); return null; } } diff --git a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs index f940e9221..c1926a260 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs @@ -81,7 +81,7 @@ public async Task WhenAttachingToAChannelWithNoMembers_PresenceShouldBeConsidere await channel.WaitForAttachedState(); channel.State.Should().Be(ChannelState.Attached); - channel.Presence.IsSyncComplete.Should().BeTrue(); + channel.Presence.SyncComplete.Should().BeTrue(); } [Theory] @@ -109,8 +109,8 @@ public async Task WhenAttachingToAChannelWithMembers_PresenceShouldBeInProgress( if (args.Current == ChannelState.Attached) { Logger.Debug("Test: Setting inSync to - " + channel2.Presence.MembersMap.SyncInProgress); - syncInProgress = channel2.Presence.IsSyncInProgress; - syncComplete = channel2.Presence.IsSyncComplete; + syncInProgress = channel2.Presence.SyncInProgress; + syncComplete = channel2.Presence.SyncComplete; awaiter.SetCompleted(); } }; @@ -406,7 +406,6 @@ async Task WaitForNoPresenceOnChannel(IRestChannel rChannel) // let the library know the transport is really dead testTransport.Listener?.OnTransportEvent(testTransport.Id, TransportState.Closed); - await realtimeClient.WaitForState(ConnectionState.Disconnected); await realtimeClient.WaitForState(ConnectionState.Connected); await realtimeChannel.WaitForAttachedState(); @@ -450,45 +449,38 @@ public async Task OnAttach_ShouldEnterMembersFromInternalMap(Protocol protocol) await Task.Delay(250); presence.MembersMap.Members.Should().HaveCount(4); presence.InternalMembersMap.Members.Should().HaveCount(1); + var internalMemberId = presence.InternalMembersMap.Members.Values.First().Id; List leaveMessages = new List(); - PresenceMessage updateMessage = null; - PresenceMessage enterMessage = null; + PresenceMessage enteredMember = null; await WaitForMultiple(2, partialDone => { presence.Subscribe(PresenceAction.Leave, message => { leaveMessages.Add(message); }); - presence.Subscribe(PresenceAction.Update, message => - { - updateMessage = message; - partialDone(); // 1 call - }); - presence.Subscribe(PresenceAction.Enter, message => + client.GetTestTransport().BeforeMessageSend = message => { - enterMessage = message; // not expected to hit - }); + enteredMember = message.Presence.First(); + client.GetTestTransport().BeforeMessageSend = _ => { }; + partialDone(); + }; client.GetTestTransport().AfterDataReceived = message => { if (message.Action == ProtocolMessage.MessageAction.Attached) { bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence); hasPresence.Should().BeFalse(); - bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed); - resumed.Should().BeTrue(); client.GetTestTransport().AfterDataReceived = _ => { }; partialDone(); // 1 call } }; - // inject duplicate attached message with resume flag ( no RTL12 message loss event) + // inject duplicate attached message without resume flag var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached) { Channel = channelName, Flags = 0, }; - protocolMessage.SetFlag(ProtocolMessage.Flag.Resumed); - protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed).Should().BeTrue(); client.GetTestTransport().FakeReceivedMessage(protocolMessage); }); @@ -498,15 +490,13 @@ await WaitForMultiple(2, partialDone => msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local"); } - updateMessage.Should().NotBeNull(); - updateMessage.ClientId.Should().Be("local"); - enterMessage.Should().BeNull(); - + enteredMember.Should().NotBeNull(); + enteredMember.Id.Should().Be(internalMemberId); + enteredMember.Action.Should().Be(PresenceAction.Enter); + enteredMember.ClientId.Should().Be("local"); presence.Unsubscribe(); - var remainingMembers = await presence.GetAsync(); - remainingMembers.Should().HaveCount(1); - remainingMembers.First().ClientId.Should().Be("local"); + client.Close(); } [Theory] @@ -646,7 +636,7 @@ await WaitForMultiple(2, partialDone => msgA = null; msgB = null; var synthesizedMsg = new PresenceMessage(PresenceAction.Leave, clientB.ClientId) { ConnectionId = null }; - synthesizedMsg.IsSynthesized().Should().BeTrue(); + synthesizedMsg.IsServerSynthesized().Should().BeTrue(); channelB.Presence.OnPresence(new[] { synthesizedMsg }); msgB.Should().BeNull(); @@ -750,8 +740,8 @@ public async Task PresenceMap_WhenNotSyncingAndLeaveActionArrivesMemberKeyShould members.Should().HaveCount(20); // sync should not be in progress and initial an sync should have completed - channel.Presence.IsSyncInProgress.Should().BeFalse("sync should have completed"); - channel.Presence.IsSyncComplete.Should().BeTrue(); + channel.Presence.SyncInProgress.Should().BeFalse("sync should have completed"); + channel.Presence.SyncComplete.Should().BeTrue(); // pull a random member key from the presence map var memberNumber = new Random().Next(0, 19); @@ -1514,19 +1504,19 @@ await WaitForMultiple(2, partialDone => presence2.Subscribe(PresenceAction.Enter, msg => { - presence2.MembersMap.Members.Should().HaveCount(presence2.IsSyncComplete ? 2 : 1); + presence2.MembersMap.Members.Should().HaveCount(presence2.SyncComplete ? 2 : 1); presence2.Unsubscribe(); partialDone(); }); presence2.PendingPresenceQueue.Should().HaveCount(1); - presence2.IsSyncComplete.Should().BeFalse(); + presence2.SyncComplete.Should().BeFalse(); presence2.MembersMap.Members.Should().HaveCount(0); taskCountWaiter.Tick(); }); var transport = client2.GetTestTransport(); - await new ConditionalAwaiter(() => presence2.IsSyncComplete); + await new ConditionalAwaiter(() => presence2.SyncComplete); transport.ProtocolMessagesReceived.Any(m => m.Action == ProtocolMessage.MessageAction.Sync). Should().BeTrue("Should receive sync message"); presence2.MembersMap.Members.Should().HaveCount(2); diff --git a/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs b/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs index db9a4c3d5..4ba5374af 100644 --- a/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs +++ b/src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs @@ -47,7 +47,7 @@ internal static async Task WaitForAttachedState(this IRealtimeChannel channel, T internal static async Task WaitSync(this Presence presence, TimeSpan? waitSpan = null) { TaskCompletionSource taskCompletionSource = new TaskCompletionSource(); - var inProgress = presence.IsSyncInProgress; + var inProgress = presence.SyncInProgress; if (inProgress == false) { return true;