Skip to content
Browse files

Made acks more generic so that it applies to any Message.

- Added waitForReply to Connection.Send overload.
- Added WaitForReply to ConnectionMessage.
  • Loading branch information...
1 parent 6648c16 commit 5f9d4303964668d9112da19cb5e6f1100152ef60 @davidfowl davidfowl committed Dec 15, 2012
View
1 src/Microsoft.AspNet.SignalR.Core/Command.cs
@@ -12,7 +12,6 @@ public Command()
Id = Guid.NewGuid().ToString();
}
- public bool WaitForAck { get; set; }
public string Id { get; private set; }
public CommandType CommandType { get; set; }
public string Value { get; set; }
View
3 src/Microsoft.AspNet.SignalR.Core/CommandType.cs
@@ -7,6 +7,7 @@ public enum CommandType
AddToGroup,
RemoveFromGroup,
Disconnect,
- Abort
+ Abort,
+ Ping
}
}
View
46 src/Microsoft.AspNet.SignalR.Core/Connection.cs
@@ -95,7 +95,7 @@ private TraceSource Trace
public Task Send(ConnectionMessage message)
{
- Message busMessage = CreateMessage(message.Signal, message.Value);
+ Message busMessage = CreateMessage(message);
if (message.ExcludedSignals != null)
{
@@ -104,25 +104,18 @@ public Task Send(ConnectionMessage message)
if (busMessage.WaitForAck)
{
- Task ackTask = _ackHandler.CreateAck(busMessage.CommandId);
+ Task ackTask = _ackHandler.CreateAck(busMessage.Id);
return _bus.Publish(busMessage).Then(task => task, ackTask);
}
return _bus.Publish(busMessage);
}
- private Message CreateMessage(string key, object value)
+ private Message CreateMessage(ConnectionMessage connectionMessage)
{
- var command = value as Command;
- var message = new Message(_connectionId, key, _serializer.Stringify(value));
-
- if (command != null)
- {
- // Set the command id
- message.CommandId = command.Id;
- message.WaitForAck = command.WaitForAck;
- }
-
+ var message = new Message(_connectionId, connectionMessage.Signal, _serializer.Stringify(connectionMessage.Value));
+ message.WaitForAck = connectionMessage.WaitForReply;
+ message.IsCommand = connectionMessage.Value is Command;
return message;
}
@@ -170,6 +163,11 @@ private PersistentResponse GetResponse(MessageResult result)
private bool ExcludeMessage(Message message)
{
+ if (message.IsCommand)
+ {
+ return true;
+ }
+
if (String.IsNullOrEmpty(message.Filter))
{
return false;
@@ -184,29 +182,29 @@ private bool ExcludeMessage(Message message)
private void ProcessResults(MessageResult result)
{
- result.Messages.Enumerate(message => message.IsAck || message.IsCommand,
- message =>
+ result.Messages.Enumerate(message =>
{
if (message.IsAck)
{
- _ackHandler.TriggerAck(message.CommandId);
+ _ackHandler.TriggerAck(message.AckId);
}
else if (message.IsCommand)
{
var command = _serializer.Parse<Command>(message.Value);
ProcessCommand(command);
+ }
- // Only send the ack if this command is waiting for it
- if (message.WaitForAck)
+ // Only send the ack if this command is waiting for it
+ if (message.WaitForAck)
+ {
+ // If we're on the same box and there's a pending ack for this command then
+ // just trip it
+ if (!_ackHandler.TriggerAck(message.Id))
{
- // If we're on the same box and there's a pending ack for this command then
- // just trip it
- if (!_ackHandler.TriggerAck(message.CommandId))
- {
- _bus.Ack(_connectionId, message.Key, message.CommandId).Catch();
- }
+ _bus.Ack(_connectionId, message.Key, message.Id).Catch();
}
}
+
});
}
View
5 src/Microsoft.AspNet.SignalR.Core/ConnectionMessage.cs
@@ -29,6 +29,11 @@ public struct ConnectionMessage
public IEnumerable<string> ExcludedSignals { get; set; }
/// <summary>
+ /// Determines if ths message should wait until the receiver received it.
+ /// </summary>
+ public bool WaitForReply { get; set; }
+
+ /// <summary>
/// Initializes a new instance of the <see cref="ConnectionMessage"/> class.
/// </summary>
/// <param name="signal">The signal</param>
View
10 src/Microsoft.AspNet.SignalR.Core/GroupManager.cs
@@ -75,11 +75,10 @@ public Task Add(string connectionId, string groupName)
var command = new Command
{
CommandType = CommandType.AddToGroup,
- Value = CreateQualifiedName(groupName),
- WaitForAck = true
+ Value = CreateQualifiedName(groupName)
};
- return _connection.Send(connectionId, command);
+ return _connection.Send(connectionId, command, waitForReply: true);
}
/// <summary>
@@ -103,11 +102,10 @@ public Task Remove(string connectionId, string groupName)
var command = new Command
{
CommandType = CommandType.RemoveFromGroup,
- Value = CreateQualifiedName(groupName),
- WaitForAck = true
+ Value = CreateQualifiedName(groupName)
};
- return _connection.Send(connectionId, command);
+ return _connection.Send(connectionId, command, waitForReply: true);
}
private string CreateQualifiedName(string groupName)
View
50 src/Microsoft.AspNet.SignalR.Core/Infrastructure/ConnectionExtensions.cs
@@ -43,15 +43,63 @@ public static Task Send(this IConnection connection, string signal, object value
throw new ArgumentNullException("connection");
}
+ if (signal == null)
+ {
+ throw new ArgumentNullException("signal");
+ }
+
var message = new ConnectionMessage(signal, value)
{
- ExcludedSignals = exclude
+ ExcludedSignals = exclude,
+ };
+
+ return connection.Send(message);
+ }
+
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="connection"></param>
+ /// <param name="connectionId"></param>
+ /// <param name="value"></param>
+ /// <param name="waitForReply"></param>
+ /// <returns></returns>
+ public static Task Send(this IConnection connection, string connectionId, object value, bool waitForReply)
+ {
+ if (connection == null)
+ {
+ throw new ArgumentNullException("connection");
+ }
+
+ if (connectionId == null)
+ {
+ throw new ArgumentNullException("connectionId");
+ }
+
+ var message = new ConnectionMessage(connectionId, value)
+ {
+ WaitForReply = waitForReply
};
return connection.Send(message);
}
/// <summary>
+ ///
+ /// </summary>
+ /// <param name="connection"></param>
+ /// <param name="connectionId"></param>
+ /// <returns></returns>
+ public static Task Ping(this IConnection connection, string connectionId)
+ {
+ var command = new Command
+ {
+ CommandType = CommandType.Ping
+ };
+ return Send(connection, connectionId, command, waitForReply: true);
+ }
+
+ /// <summary>
/// Broadcasts a value to all connections, excluding the connection ids specified.
/// </summary>
/// <param name="connection">The connection</param>
View
16 src/Microsoft.AspNet.SignalR.Core/MessageBus/Message.cs
@@ -12,26 +12,32 @@ public class Message
public string Source { get; set; }
public string Key { get; set; }
public string Value { get; set; }
+ public string Id { get; set; }
- public string CommandId { get; set; }
+ // Replies
+ public string AckId { get; set; }
public bool WaitForAck { get; set; }
- public bool IsAck { get; set; }
- public string Filter { get; set; }
- public bool IsCommand
+ public bool IsAck
{
get
{
- return CommandId != null;
+ return AckId != null;
}
}
+ // Filtering
+ public string Filter { get; set; }
+ public bool IsCommand { get; set; }
+
+
public Message()
{
}
public Message(string source, string key, string value)
{
+ Id = Guid.NewGuid().ToString();
Source = source;
Key = key;
Value = value;
View
5 src/Microsoft.AspNet.SignalR.Core/MessageBus/MessageBusExtensions.cs
@@ -31,12 +31,11 @@ public static Task Publish(this IMessageBus bus, string source, string key, stri
return bus.Publish(new Message(source, key, value));
}
- internal static Task Ack(this IMessageBus bus, string source, string eventKey, string commandId)
+ internal static Task Ack(this IMessageBus bus, string source, string eventKey, string id)
{
// Prepare the ack
var message = new Message(source, AckPrefix(eventKey), null);
- message.CommandId = commandId;
- message.IsAck = true;
+ message.AckId = id;
return bus.Publish(message);
}
View
2 src/Microsoft.AspNet.SignalR.Core/PersistentResponse.cs
@@ -140,7 +140,7 @@ void IJsonWritable.WriteJson(TextWriter writer)
jsonWriter.WritePropertyName("M");
jsonWriter.WriteStartArray();
- Messages.Enumerate(m => !m.IsCommand && !ExcludeFilter(m),
+ Messages.Enumerate(m => !ExcludeFilter(m),
m => jsonWriter.WriteRawValue(m.Value));
jsonWriter.WriteEndArray();
View
17 src/Microsoft.AspNet.SignalR.ServiceBus/FastMessageSerializer.cs
@@ -56,20 +56,25 @@ public Message[] ReadToEnd()
if (marker == MessageMarker)
{
+ string id = this.GetDataItem();
string source = this.GetDataItem();
string key = this.GetDataItem();
string value = this.GetDataItem();
- string commandId = this.GetDataItem();
+ string isCommandValue = this.GetDataItem();
string waitForAcValue = this.GetDataItem();
- string isAckValue = this.GetDataItem();
+ string ackValue = this.GetDataItem();
string filter = this.GetDataItem();
messages.Add(
- new Message(source, key, value)
+ new Message()
{
- CommandId = commandId,
+ Id = id,
+ Source = source,
+ Key = key,
+ Value = value,
+ IsCommand = Boolean.Parse(isCommandValue),
WaitForAck = Boolean.Parse(waitForAcValue),
- IsAck = Boolean.Parse(isAckValue),
+ AckId = ackValue,
Filter = filter
});
}
@@ -222,7 +227,7 @@ static IEnumerator<byte[]> CreateIterator(IEnumerable<Message> messages)
{
yield return BitConverter.GetBytes(MessageMarker);
- string[] itemList = new string[] { message.Source, message.Key, message.Value, message.CommandId, message.WaitForAck.ToString(), message.IsAck.ToString(), message.Filter };
+ string[] itemList = new string[] { message.Id, message.Source, message.Key, message.Value, message.IsCommand.ToString(), message.WaitForAck.ToString(), message.AckId, message.Filter };
foreach (string item in itemList)
{
View
2 tests/Microsoft.AspNet.SignalR.Tests/GroupManagerFacts.cs
@@ -54,7 +54,6 @@ public void CreatesAddToGroupCommandOnAdd()
Assert.NotNull(command.Id);
Assert.Equal(CommandType.AddToGroup, command.CommandType);
Assert.Equal("Prefix.MyGroup", command.Value);
- Assert.True(command.WaitForAck);
});
var groupManager = new GroupManager(connection.Object, "Prefix");
@@ -105,7 +104,6 @@ public void CreatesRemoveFromGroupCommandOnAdd()
Assert.NotNull(command.Id);
Assert.Equal(CommandType.RemoveFromGroup, command.CommandType);
Assert.Equal("Prefix.MyGroup", command.Value);
- Assert.True(command.WaitForAck);
});
var groupManager = new GroupManager(connection.Object, "Prefix");
View
22 tests/Microsoft.AspNet.SignalR.Tests/PersistentResponseFacts.cs
@@ -30,28 +30,6 @@ public void WriteJsonWritesMessagesThatAreNotExcluded()
}
[Fact]
- public void WriteJsonWritesSkipsCommands()
- {
- // Arrange
- var writer = new StringWriter();
- var response = new PersistentResponse(m => false);
- response.Messages = new List<ArraySegment<Message>>();
- response.AddedGroups = new List<string>
- {
- "g1"
- };
- response.MessageId = "Baz";
- response.Messages.Add(new ArraySegment<Message>(new[] { new Message("1", "key", "value1") { CommandId = "something"},
- new Message("1", "key2", "value2") }, 0, 2));
-
- // Act
- ((IJsonWritable)response).WriteJson(writer);
-
- // Assert
- Assert.Equal(@"{""C"":""Baz"",""G"":[""g1""],""M"":[value2]}", writer.ToString());
- }
-
- [Fact]
public void WriteJsonSkipsNullGroups()
{
// Arrange

0 comments on commit 5f9d430

Please sign in to comment.
Something went wrong with that request. Please try again.