Skip to content

Commit

Permalink
Merge branch 'RTP16b-fix'
Browse files Browse the repository at this point in the history
  • Loading branch information
withakay committed Apr 9, 2019
2 parents 415c7cf + fb42a41 commit 73401c9
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 10 deletions.
23 changes: 17 additions & 6 deletions src/IO.Ably.Shared/Realtime/Presence.cs
Expand Up @@ -323,18 +323,14 @@ internal void UpdatePresence(PresenceMessage msg, Action<bool, ErrorInfo> callba
switch (_channel.State)
{
case ChannelState.Initialized:
if (_connection.Options.QueueMessages)
if (PendingPresenceEnqueue(new QueuedPresenceMessage(msg, callback)))
{
PendingPresenceQueue.Enqueue(new QueuedPresenceMessage(msg, callback));
_channel.Attach();
}

break;
case ChannelState.Attaching:
if (_connection.Options.QueueMessages)
{
PendingPresenceQueue.Enqueue(new QueuedPresenceMessage(msg, callback));
}
PendingPresenceEnqueue(new QueuedPresenceMessage(msg, callback));

break;
case ChannelState.Attached:
Expand All @@ -347,6 +343,21 @@ internal void UpdatePresence(PresenceMessage msg, Action<bool, ErrorInfo> callba
}
}

private bool PendingPresenceEnqueue(QueuedPresenceMessage msg)
{
if (!_connection.Options.QueueMessages)
{
msg.Callback?.Invoke(
false,
new ErrorInfo("Unable enqueue message because Options.QueueMessages is set to False.", _connection.Connection.ConnectionState.DefaultErrorInfo.Code, HttpStatusCode.ServiceUnavailable));

return false;
}

PendingPresenceQueue.Enqueue(msg);
return true;
}

internal void ResumeSync()
{
if (_channel.State == ChannelState.Initialized ||
Expand Down
5 changes: 4 additions & 1 deletion src/IO.Ably.Shared/Transport/ConnectionManager.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using IO.Ably.MessageEncoders;
using IO.Ably.Realtime;
Expand Down Expand Up @@ -352,7 +353,9 @@ public void Send(ProtocolMessage message, Action<bool, ErrorInfo> callback = nul
}
else
{
throw new AblyException($"Current state is [{State.State}] which supports queuing but Options.QueueMessages is set to False.");
throw new AblyException($"Current state is [{State.State}] which supports queuing but Options.QueueMessages is set to False.",
Connection.ConnectionState.DefaultErrorInfo.Code,
HttpStatusCode.ServiceUnavailable);
}

return;
Expand Down
Expand Up @@ -6,6 +6,8 @@ namespace IO.Ably.Transport.States.Connection
{
internal class ConnectionClosedState : ConnectionStateBase
{
public new ErrorInfo DefaultErrorInfo => ErrorInfo.ReasonClosed;

public ConnectionClosedState(IConnectionContext context, ILogger logger)
: this(context, null, logger)
{
Expand Down
Expand Up @@ -8,6 +8,8 @@ internal class ConnectionDisconnectedState : ConnectionStateBase
{
private readonly ICountdownTimer _timer;

public new ErrorInfo DefaultErrorInfo => ErrorInfo.ReasonDisconnected;

public ConnectionDisconnectedState(IConnectionContext context, ILogger logger)
: this(context, null, new CountdownTimer("Disconnected state timer", logger), logger)
{
Expand Down
Expand Up @@ -6,6 +6,8 @@ namespace IO.Ably.Transport.States.Connection
{
internal class ConnectionFailedState : ConnectionStateBase
{
public new ErrorInfo DefaultErrorInfo => ErrorInfo.ReasonFailed;

public ConnectionFailedState(IConnectionContext context, ErrorInfo error, ILogger logger)
: base(context, logger)
{
Expand Down
Expand Up @@ -33,6 +33,8 @@ protected ConnectionStateBase(IConnectionContext context, ILogger logger)

public virtual bool IsUpdate { get; protected set; }

public ErrorInfo DefaultErrorInfo => ErrorInfo.ReasonUnknown;

public virtual void Connect()
{
}
Expand Down
Expand Up @@ -7,6 +7,8 @@ internal class ConnectionSuspendedState : ConnectionStateBase
{
private readonly ICountdownTimer _timer;

public new ErrorInfo DefaultErrorInfo => ErrorInfo.ReasonSuspended;

public ConnectionSuspendedState(IConnectionContext context, ILogger logger)
: this(context, null, new CountdownTimer("Suspended state timer", logger), logger)
{
Expand Down
2 changes: 1 addition & 1 deletion src/IO.Ably.Tests.Shared/Realtime/ChannelSandboxSpecs.cs
Expand Up @@ -654,7 +654,7 @@ public async Task WhenChannelEntersDetachedFailedSuspendedState_ShouldDeleteQueu

var channel = client.Channels.Get("test".AddRandomSuffix());

var tsc = new TaskCompletionAwaiter(5000);
var tsc = new TaskCompletionAwaiter(20000);
client.Connection.Once(ConnectionEvent.Disconnected, change =>
{
// place a message on the queue
Expand Down
17 changes: 15 additions & 2 deletions src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs
Expand Up @@ -1556,14 +1556,27 @@ public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsIn
await client.ConnectionManager.SetState(new ConnectionDisconnectedState(client.ConnectionManager, client.Logger));
await client.WaitForState(ConnectionState.Disconnected);

List<int> queueCounts = new List<int>();
Presence.QueuedPresenceMessage[] presenceMessages = null;

channel.Presence.Enter(client.Connection.State.ToString(), (b, info) =>{ });
var tsc = new TaskCompletionAwaiter();
ErrorInfo err = null;
bool? success = null;
channel.Presence.Enter(client.Connection.State.ToString(), (b, info) =>
{
success = b;
err = info;
tsc.SetCompleted();
});
presenceMessages = channel.Presence.PendingPresenceQueue.ToArray();

presenceMessages.Should().HaveCount(0);

await tsc.Task;
success.Should().HaveValue();
success.Value.Should().BeFalse();
err.Should().NotBeNull();
err.Message.Should().Be("Unable enqueue message because Options.QueueMessages is set to False.");

// clean up
client.Close();
}
Expand Down

0 comments on commit 73401c9

Please sign in to comment.