Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix spec RTL12 #1275

Merged
merged 16 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This is a .NET client library for Ably which targets the 2.0 client library spec
## Supported platforms

* [.NET Standard 2.0+](https://learn.microsoft.com/en-us/dotnet/standard/net-standard?tabs=net-standard-2-0)
* .NET 6.x, 7.x, MAUI, check [MAUI config](#maui-configuration).
* .NET 6.0+, MAUI, check [MAUI config](#maui-configuration).
* .NET Framework 4.6.2+
* .NET (Core) 2.0+
* Mono 5.4+
Expand Down Expand Up @@ -424,7 +424,6 @@ var options = new ClientOptions
{
AuthCallback = async tokenParams =>
{
// Return serialized jwttokenstring returned from server
string jwtToken = await getJwtTokenFromServer(tokenParams);
int expiresIn = 3600; // assuming jwtToken has 1 hr expiry
return new TokenDetails(jwtToken) {
Expand Down
26 changes: 13 additions & 13 deletions src/IO.Ably.Shared/Extensions/PresenceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@
{
internal static class PresenceExtensions
{
public static bool IsSynthesized(this PresenceMessage msg)
public static bool IsServerSynthesized(this PresenceMessage msg)
{
return msg.Id == null || !msg.Id.StartsWith(msg.ConnectionId);
}

// RTP2b, RTP2c
public static bool IsNewerThan(this PresenceMessage thisMessage, PresenceMessage thatMessage)
public static bool IsNewerThan(this PresenceMessage existingMsg, PresenceMessage incomingMsg)
{
// RTP2b1
if (thisMessage.IsSynthesized() || thatMessage.IsSynthesized())
if (existingMsg.IsServerSynthesized() || incomingMsg.IsServerSynthesized())
{
return thisMessage.Timestamp > thatMessage.Timestamp;
return existingMsg.Timestamp > incomingMsg.Timestamp;
}

// RTP2b2
var thisValues = thisMessage.Id.Split(':');
var thatValues = thatMessage.Id.Split(':');
var thisValues = existingMsg.Id.Split(':');
var thatValues = incomingMsg.Id.Split(':');

// if any part of the message serial fails to parse then throw an exception
if (thisValues.Length != 3 ||
!(int.TryParse(thisValues[1], out int msgSerialThis) | int.TryParse(thisValues[2], out int indexThis)))
!(int.TryParse(thisValues[1], out int existingMsgSerial) | int.TryParse(thisValues[2], out int existingMsgIndex)))
{
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thisMessage.Id}'.");
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{existingMsg.Id}'.");
}

if (thatValues.Length != 3 ||
!(int.TryParse(thatValues[1], out int msgSerialThat) | int.TryParse(thatValues[2], out int indexThat)))
!(int.TryParse(thatValues[1], out int incomingMsgSerial) | int.TryParse(thatValues[2], out int incomingMsgIndex)))
{
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thatMessage.Id}'.");
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{incomingMsg.Id}'.");
}

if (msgSerialThis == msgSerialThat)
if (existingMsgSerial == incomingMsgSerial)
{
return indexThis > indexThat;
return existingMsgIndex > incomingMsgIndex;
}

return msgSerialThis > msgSerialThat;
return existingMsgSerial > incomingMsgSerial;
}
}
}
9 changes: 6 additions & 3 deletions src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ public Task<bool> MessageReceived(ProtocolMessage protocolMessage, RealtimeState
}

// RTL12
if (channel.State == ChannelState.Attached && !protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed))
if (channel.State == ChannelState.Attached)
{
channel.Presence.ChannelAttached(protocolMessage, false);
channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage);
if (!protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed))
{
channel.Presence.ChannelAttached(protocolMessage);
channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage);
}
}
else
{
Expand Down
56 changes: 29 additions & 27 deletions src/IO.Ably.Shared/Realtime/Presence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,28 @@ internal Presence(IConnectionManager connection, RealtimeChannel channel, string
internal ILogger Logger { get; private set; }

/// <summary>
/// Has the sync completed.
/// Checks if presence sync has ended.
/// </summary>
public bool IsSyncComplete => MembersMap.SyncCompleted && !IsSyncInProgress;
///
[Obsolete("This property is deprecated, use SyncComplete instead")]
public bool IsSyncComplete => SyncComplete; // RTP13.

/// <summary>
/// Checks if presence sync has ended.
/// </summary>
///
public bool SyncComplete => MembersMap.SyncCompleted && !SyncInProgress; // RTP13

/// <summary>
/// Indicates whether there is currently a sync in progress.
/// </summary>
[Obsolete("This property is internal, will be removed in the future")]
public bool IsSyncInProgress => SyncInProgress;

/// <summary>
/// Indicates whether there is currently a sync in progress.
/// </summary>
public bool IsSyncInProgress => MembersMap.SyncInProgress;
internal bool SyncInProgress => MembersMap.SyncInProgress;

/// <summary>
/// Indicates all members present on the channel.
Expand Down Expand Up @@ -153,7 +167,7 @@ private async Task<bool> WaitForSyncAsync()
// The InternalSync should be completed and the channels Attached or Attaching
void CheckAndSet()
{
if (IsSyncComplete
if (SyncComplete
&& (_channel.State == ChannelState.Attached || _channel.State == ChannelState.Attaching))
{
tsc.TrySetResult(true);
Expand Down Expand Up @@ -532,7 +546,7 @@ internal void OnSyncMessage(ProtocolMessage protocolMessage)
/* If a new sequence identifier is sent from Ably, then the client library
* must consider that to be the start of a new sync sequence
* and any previous in-flight sync should be discarded. (part of RTP18)*/
if (IsSyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId)
if (SyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId)
{
EndSync();
}
Expand Down Expand Up @@ -582,7 +596,7 @@ internal void OnPresence(PresenceMessage[] messages)
// RTP2e
case PresenceAction.Leave:
broadcast &= MembersMap.Remove(message);
if (updateInternalPresence && !message.IsSynthesized())
if (updateInternalPresence && !message.IsServerSynthesized())
{
InternalMembersMap.Remove(message);
}
Expand Down Expand Up @@ -613,15 +627,15 @@ internal void OnPresence(PresenceMessage[] messages)

internal void StartSync()
{
if (!IsSyncInProgress)
if (!SyncInProgress)
{
MembersMap.StartSync();
}
}

private void EndSync()
{
if (!IsSyncInProgress)
if (!SyncInProgress)
{
return;
}
Expand All @@ -647,7 +661,7 @@ private void EnterMembersFromInternalPresenceMap()
{
try
{
var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data);
var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data, item.Id);
UpdatePresence(itemToSend, (success, info) =>
{
if (!success)
Expand Down Expand Up @@ -721,24 +735,15 @@ internal void ChannelSuspended(ErrorInfo error)
FailQueuedMessages(error);
}

internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWithoutMessageLoss = true)
internal void ChannelAttached(ProtocolMessage attachedMessage)
{
// RTP19
StartSync();

// RTP1
var hasPresence = attachedMessage != null && attachedMessage.HasFlag(ProtocolMessage.Flag.HasPresence);
if (hasPresence)
{
if (Logger.IsDebug)
{
Logger.Debug(
$"Protocol message has presence flag. Starting Presence SYNC. Flag: {attachedMessage.Flags}");
}

StartSync();
}
else
// RTP19
StartSync();

if (!hasPresence)
{
EndSync(); // RTP19
}
Expand All @@ -747,10 +752,7 @@ internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWith
SendQueuedMessages();

// RTP17f
if (isAttachWithoutMessageLoss)
{
EnterMembersFromInternalPresenceMap();
}
EnterMembersFromInternalPresenceMap();
}

private void SendQueuedMessages()
Expand Down
5 changes: 4 additions & 1 deletion src/IO.Ably.Shared/Realtime/RealtimeChannels.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ internal IDictionary<string, string> GetChannelSerials()
var channelSerials = new Dictionary<string, string>();
foreach (var realtimeChannel in this)
{
channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial;
if (realtimeChannel.State == ChannelState.Attached)
{
channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial;
}
}

return channelSerials;
Expand Down
4 changes: 2 additions & 2 deletions src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public static RecoveryKeyContext Decode(string recover, ILogger logger = null)
{
return JsonHelper.Deserialize<RecoveryKeyContext>(recover);
}
catch (Exception)
catch (Exception e)
{
logger?.Warning($"Error deserializing recover - {recover}, setting it as null");
logger?.Warning($"Error deserializing recover - {recover}, setting it as null", e);
return null;
}
}
Expand Down
54 changes: 22 additions & 32 deletions src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task WhenAttachingToAChannelWithNoMembers_PresenceShouldBeConsidere
await channel.WaitForAttachedState();
channel.State.Should().Be(ChannelState.Attached);

channel.Presence.IsSyncComplete.Should().BeTrue();
channel.Presence.SyncComplete.Should().BeTrue();
}

[Theory]
Expand Down Expand Up @@ -109,8 +109,8 @@ public async Task WhenAttachingToAChannelWithMembers_PresenceShouldBeInProgress(
if (args.Current == ChannelState.Attached)
{
Logger.Debug("Test: Setting inSync to - " + channel2.Presence.MembersMap.SyncInProgress);
syncInProgress = channel2.Presence.IsSyncInProgress;
syncComplete = channel2.Presence.IsSyncComplete;
syncInProgress = channel2.Presence.SyncInProgress;
syncComplete = channel2.Presence.SyncComplete;
awaiter.SetCompleted();
}
};
Expand Down Expand Up @@ -406,7 +406,6 @@ async Task<bool> WaitForNoPresenceOnChannel(IRestChannel rChannel)

// let the library know the transport is really dead
testTransport.Listener?.OnTransportEvent(testTransport.Id, TransportState.Closed);

await realtimeClient.WaitForState(ConnectionState.Disconnected);
await realtimeClient.WaitForState(ConnectionState.Connected);
await realtimeChannel.WaitForAttachedState();
Expand Down Expand Up @@ -450,45 +449,38 @@ public async Task OnAttach_ShouldEnterMembersFromInternalMap(Protocol protocol)
await Task.Delay(250);
presence.MembersMap.Members.Should().HaveCount(4);
presence.InternalMembersMap.Members.Should().HaveCount(1);
var internalMemberId = presence.InternalMembersMap.Members.Values.First().Id;

List<PresenceMessage> leaveMessages = new List<PresenceMessage>();
PresenceMessage updateMessage = null;
PresenceMessage enterMessage = null;
PresenceMessage enteredMember = null;
await WaitForMultiple(2, partialDone =>
{
presence.Subscribe(PresenceAction.Leave, message =>
{
leaveMessages.Add(message);
});
presence.Subscribe(PresenceAction.Update, message =>
{
updateMessage = message;
partialDone(); // 1 call
});
presence.Subscribe(PresenceAction.Enter, message =>
client.GetTestTransport().BeforeMessageSend = message =>
{
enterMessage = message; // not expected to hit
});
enteredMember = message.Presence.First();
client.GetTestTransport().BeforeMessageSend = _ => { };
partialDone();
};
client.GetTestTransport().AfterDataReceived = message =>
{
if (message.Action == ProtocolMessage.MessageAction.Attached)
{
bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence);
hasPresence.Should().BeFalse();
bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed);
resumed.Should().BeTrue();
client.GetTestTransport().AfterDataReceived = _ => { };
partialDone(); // 1 call
}
};
// inject duplicate attached message with resume flag ( no RTL12 message loss event)
// inject duplicate attached message without resume flag
var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached)
{
Channel = channelName,
Flags = 0,
};
protocolMessage.SetFlag(ProtocolMessage.Flag.Resumed);
protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed).Should().BeTrue();
client.GetTestTransport().FakeReceivedMessage(protocolMessage);
});

Expand All @@ -498,15 +490,13 @@ await WaitForMultiple(2, partialDone =>
msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local");
}

updateMessage.Should().NotBeNull();
updateMessage.ClientId.Should().Be("local");
enterMessage.Should().BeNull();

enteredMember.Should().NotBeNull();
enteredMember.Id.Should().Be(internalMemberId);
enteredMember.Action.Should().Be(PresenceAction.Enter);
enteredMember.ClientId.Should().Be("local");
presence.Unsubscribe();
var remainingMembers = await presence.GetAsync();

remainingMembers.Should().HaveCount(1);
remainingMembers.First().ClientId.Should().Be("local");
client.Close();
}

[Theory]
Expand Down Expand Up @@ -646,7 +636,7 @@ await WaitForMultiple(2, partialDone =>
msgA = null;
msgB = null;
var synthesizedMsg = new PresenceMessage(PresenceAction.Leave, clientB.ClientId) { ConnectionId = null };
synthesizedMsg.IsSynthesized().Should().BeTrue();
synthesizedMsg.IsServerSynthesized().Should().BeTrue();
channelB.Presence.OnPresence(new[] { synthesizedMsg });

msgB.Should().BeNull();
Expand Down Expand Up @@ -750,8 +740,8 @@ public async Task PresenceMap_WhenNotSyncingAndLeaveActionArrivesMemberKeyShould
members.Should().HaveCount(20);

// sync should not be in progress and initial an sync should have completed
channel.Presence.IsSyncInProgress.Should().BeFalse("sync should have completed");
channel.Presence.IsSyncComplete.Should().BeTrue();
channel.Presence.SyncInProgress.Should().BeFalse("sync should have completed");
channel.Presence.SyncComplete.Should().BeTrue();

// pull a random member key from the presence map
var memberNumber = new Random().Next(0, 19);
Expand Down Expand Up @@ -1514,19 +1504,19 @@ await WaitForMultiple(2, partialDone =>

presence2.Subscribe(PresenceAction.Enter, msg =>
{
presence2.MembersMap.Members.Should().HaveCount(presence2.IsSyncComplete ? 2 : 1);
presence2.MembersMap.Members.Should().HaveCount(presence2.SyncComplete ? 2 : 1);
presence2.Unsubscribe();
partialDone();
});

presence2.PendingPresenceQueue.Should().HaveCount(1);
presence2.IsSyncComplete.Should().BeFalse();
presence2.SyncComplete.Should().BeFalse();
presence2.MembersMap.Members.Should().HaveCount(0);
taskCountWaiter.Tick();
});

var transport = client2.GetTestTransport();
await new ConditionalAwaiter(() => presence2.IsSyncComplete);
await new ConditionalAwaiter(() => presence2.SyncComplete);
transport.ProtocolMessagesReceived.Any(m => m.Action == ProtocolMessage.MessageAction.Sync).
Should().BeTrue("Should receive sync message");
presence2.MembersMap.Members.Should().HaveCount(2);
Expand Down
2 changes: 1 addition & 1 deletion src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal static async Task WaitForAttachedState(this IRealtimeChannel channel, T
internal static async Task<bool> WaitSync(this Presence presence, TimeSpan? waitSpan = null)
{
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
var inProgress = presence.IsSyncInProgress;
var inProgress = presence.SyncInProgress;
if (inProgress == false)
{
return true;
Expand Down
Loading