Permalink
Browse files

Potential perf optimizations by reducing allocations

  • Loading branch information...
1 parent aa45e12 commit aae6db82cc8e9fd4267faaffe74515a633a13602 @chenriksson chenriksson committed with davidfowl Oct 22, 2013
@@ -475,9 +475,9 @@ Global
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|ARM.ActiveCfg = Release|ARM
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|ARM.Build.0 = Release|ARM
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|ARM.Deploy.0 = Release|ARM
- {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.ActiveCfg = Release|x86
- {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.Build.0 = Release|x86
- {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.Deploy.0 = Release|x86
+ {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|Mixed Platforms.Deploy.0 = Release|Any CPU
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|x64.ActiveCfg = Release|x64
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|x64.Build.0 = Release|x64
{27D349B4-FBA2-4830-978A-9B30FFB926F0}.Release|x64.Deploy.0 = Release|x64
@@ -505,9 +505,9 @@ Global
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|ARM.ActiveCfg = Release|ARM
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|ARM.Build.0 = Release|ARM
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|ARM.Deploy.0 = Release|ARM
- {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.ActiveCfg = Release|x86
- {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.Build.0 = Release|x86
- {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.Deploy.0 = Release|x86
+ {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|Mixed Platforms.Deploy.0 = Release|Any CPU
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|x64.ActiveCfg = Release|x64
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|x64.Build.0 = Release|x64
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2}.Release|x64.Deploy.0 = Release|x64
@@ -533,9 +533,9 @@ Global
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|ARM.ActiveCfg = Release|ARM
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|ARM.Build.0 = Release|ARM
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|ARM.Deploy.0 = Release|ARM
- {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.ActiveCfg = Release|x86
- {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.Build.0 = Release|x86
- {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.Deploy.0 = Release|x86
+ {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU
+ {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.Build.0 = Release|Any CPU
+ {C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|Mixed Platforms.Deploy.0 = Release|Any CPU
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|x64.ActiveCfg = Release|Any CPU
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|x86.ActiveCfg = Release|x86
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6}.Release|x86.Build.0 = Release|x86
@@ -618,4 +618,7 @@ Global
{C10CAED0-D8FA-4559-B3E7-40F104BC79A2} = {7E3D992A-8F37-4C5D-AD42-E052522816C5}
{C53C2CC0-1429-40C8-B79C-EBDDBE292FB6} = {7E3D992A-8F37-4C5D-AD42-E052522816C5}
EndGlobalSection
+ GlobalSection(Performance) = preSolution
+ HasPerformanceSessions = true
+ EndGlobalSection
EndGlobal
@@ -4,6 +4,7 @@
using System.Collections.Generic;
using Microsoft.AspNet.SignalR.Owin;
using Microsoft.Owin;
+using Microsoft.AspNet.SignalR.Transports;
namespace Microsoft.AspNet.SignalR.Hosting
{
@@ -17,6 +18,8 @@ public class HostContext
// Owin environment dictionary
public IDictionary<string, object> Environment { get; private set; }
+ internal ITransport Transport { get; set; }
+
public HostContext(IRequest request, IResponse response)
{
Request = request;
@@ -34,6 +34,9 @@ public class Connection : IConnection, ITransportConnection, ISubscriber
private readonly TraceSource _traceSource;
private readonly IAckHandler _ackHandler;
private readonly IProtectedData _protectedData;
+ private readonly Func<Message, bool> _excludeMessage;
+
+ private static readonly Action<Connection, Message> _processResultsAction = (connection, message) => ProcessResultsHelper(connection, message);
public Connection(IMessageBus newMessageBus,
JsonSerializer jsonSerializer,
@@ -61,6 +64,7 @@ public class Connection : IConnection, ITransportConnection, ISubscriber
_ackHandler = ackHandler;
_counters = performanceCounterManager;
_protectedData = protectedData;
+ _excludeMessage = m => ExcludeMessage(m);
}
public string DefaultSignal
@@ -261,7 +265,7 @@ private PersistentResponse GetResponse(MessageResult result)
Debug.Assert(WriteCursor != null, "Unable to resolve the cursor since the method is null");
- var response = new PersistentResponse(ExcludeMessage, WriteCursor);
+ var response = new PersistentResponse(_excludeMessage, WriteCursor);
response.Terminal = result.Terminal;
if (!result.Terminal)
@@ -299,30 +303,31 @@ private bool ExcludeMessage(Message message)
private void ProcessResults(MessageResult result)
{
- result.Messages.Enumerate<object>(message => message.IsAck || message.IsCommand,
- (state, message) =>
- {
- if (message.IsAck)
- {
- _ackHandler.TriggerAck(message.CommandId);
- }
- else if (message.IsCommand)
- {
- var command = _serializer.Parse<Command>(message.Value, message.Encoding);
- ProcessCommand(command);
-
- // Only send the ack if this command is waiting for it
- if (message.WaitForAck)
- {
- // If we're on the same box and there's a pending ack for this command then
- // just trip it
- if (!_ackHandler.TriggerAck(message.CommandId))
- {
- _bus.Ack(_connectionId, message.CommandId).Catch();
- }
- }
- }
- }, null);
+ result.Messages.Enumerate<Connection>(message => message.IsAck || message.IsCommand, _processResultsAction, this);
+ }
+
+ private static void ProcessResultsHelper(Connection connection, Message message)
+ {
+ if (message.IsAck)
+ {
+ connection._ackHandler.TriggerAck(message.CommandId);
+ }
+ else if (message.IsCommand)
+ {
+ var command = connection._serializer.Parse<Command>(message.Value, message.Encoding);
+ connection.ProcessCommand(command);
+
+ // Only send the ack if this command is waiting for it
+ if (message.WaitForAck)
+ {
+ // If we're on the same box and there's a pending ack for this command then
+ // just trip it
+ if (!connection._ackHandler.TriggerAck(message.CommandId))
+ {
+ connection._bus.Ack(connection._connectionId, message.CommandId).Catch();
+ }
+ }
+ }
}
private void ProcessCommand(Command command)
@@ -17,6 +17,8 @@ internal sealed class TaskQueue
private volatile bool _drained;
private readonly int? _maxSize;
private long _size;
+ private Action<object> _dequeueAction;
+ private Action<Func<object, Task>, object> _invokeNextAction;
public TaskQueue()
: this(TaskAsyncHelper.Empty)
@@ -26,13 +28,17 @@ public TaskQueue()
public TaskQueue(Task initialTask)
{
_lastQueuedTask = initialTask;
+ _dequeueAction = queue => Dequeue(queue);
+ _invokeNextAction = (next, nextState) => InvokeNext(next, nextState);
}
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode", Justification = "This is shared code")]
public TaskQueue(Task initialTask, int maxSize)
{
_lastQueuedTask = initialTask;
_maxSize = maxSize;
+ _dequeueAction = queue => Dequeue(queue);
+ _invokeNextAction = (next, nextState) => InvokeNext(next, nextState);
}
#if !CLIENT_NET45
@@ -83,31 +89,36 @@ public Task Enqueue(Func<object, Task> taskFunc, object state)
#endif
}
- Task newTask = _lastQueuedTask.Then((next, nextState) =>
- {
- return next(nextState).Finally(s =>
- {
- var queue = (TaskQueue)s;
- if (queue._maxSize != null)
- {
- // Decrement the number of items left in the queue
- Interlocked.Decrement(ref queue._size);
+ Task newTask = _lastQueuedTask.Then(_invokeNextAction, taskFunc, state);
+ _lastQueuedTask = newTask;
+ return newTask;
+ }
+ }
+
+ private Task InvokeNext(Func<object, Task> next, object nextState)
+ {
+ return next(nextState).Finally(_dequeueAction, this);
+ }
#if !CLIENT_NET45
- var counter = QueueSizeCounter;
- if (counter != null)
- {
- counter.Decrement();
- }
+ private void Dequeue(object state)
+#else
+ private static void Dequeue(object state)
#endif
- }
- },
- this);
- },
- taskFunc, state);
+ {
+ var queue = (TaskQueue)state;
+ if (queue._maxSize != null)
+ {
+ // Decrement the number of items left in the queue
+ Interlocked.Decrement(ref queue._size);
- _lastQueuedTask = newTask;
- return newTask;
+#if !CLIENT_NET45
+ var counter = QueueSizeCounter;
+ if (counter != null)
+ {
+ counter.Decrement();
+ }
+#endif
}
}
@@ -17,6 +17,7 @@ internal class DefaultSubscription : Subscription
private List<Cursor> _cursors;
private List<Topic> _cursorTopics;
+ private ulong[] _cursorsState;
private readonly IStringMinifier _stringMinifier;
@@ -82,7 +83,7 @@ public override bool AddEvent(string eventKey, Topic topic)
lock (_cursors)
{
// O(n), but small n and it's not common
- var index = _cursors.FindIndex(c => c.Key == eventKey);
+ var index = FindCursorIndex(eventKey);
if (index == -1)
{
_cursors.Add(new Cursor(eventKey, GetMessageId(topic), _stringMinifier.Minify(eventKey)));
@@ -102,7 +103,7 @@ public override void RemoveEvent(string eventKey)
lock (_cursors)
{
- var index = _cursors.FindIndex(c => c.Key == eventKey);
+ var index = FindCursorIndex(eventKey);
if (index != -1)
{
_cursors.RemoveAt(index);
@@ -118,7 +119,7 @@ public override void SetEventTopic(string eventKey, Topic topic)
lock (_cursors)
{
// O(n), but small n and it's not common
- var index = _cursors.FindIndex(c => c.Key == eventKey);
+ var index = FindCursorIndex(eventKey);
if (index != -1)
{
_cursorTopics[index] = topic;
@@ -141,11 +142,15 @@ protected override void PerformWork(IList<ArraySegment<Message>> items, out int
lock (_cursors)
{
- var cursors = new ulong[_cursors.Count];
+ // perf sensitive: re-use cursors array to minimize allocations
+ if ((_cursorsState == null) || (_cursorsState.Length != _cursors.Count))
+ {
+ _cursorsState = new ulong[_cursors.Count];
+ }
for (int i = 0; i < _cursors.Count; i++)
{
MessageStoreResult<Message> storeResult = _cursorTopics[i].Store.GetMessages(_cursors[i].Id, MaxMessages);
- cursors[i] = storeResult.FirstMessageId + (ulong)storeResult.Messages.Count;
+ _cursorsState[i] = storeResult.FirstMessageId + (ulong)storeResult.Messages.Count;
if (storeResult.Messages.Count > 0)
{
@@ -155,7 +160,7 @@ protected override void PerformWork(IList<ArraySegment<Message>> items, out int
}
// Return the state as a list of cursors
- state = cursors;
+ state = _cursorsState;
}
}
@@ -177,7 +182,7 @@ private bool UpdateCursor(string key, ulong id)
lock (_cursors)
{
// O(n), but small n and it's not common
- var index = _cursors.FindIndex(c => c.Key == key);
+ var index = FindCursorIndex(key);
if (index != -1)
{
_cursors[index].Id = id;
@@ -188,6 +193,19 @@ private bool UpdateCursor(string key, ulong id)
}
}
+ // perf: avoid List<T>.FindIndex which uses stateless predicate which requires closure
+ private int FindCursorIndex(string eventKey)
+ {
+ for (int i = 0; i < _cursors.Count; i++)
+ {
+ if (_cursors[i].Key == eventKey)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
private List<Cursor> GetCursorsFromEventKeys(IList<string> eventKeys, TopicLookup topics)
{
var list = new List<Cursor>(eventKeys.Count);
@@ -128,7 +128,7 @@ public MessageBus(IDependencyResolver resolver)
_createTopic = CreateTopic;
_addEvent = AddEvent;
_removeEvent = RemoveEvent;
- _disposeSubscription = DisposeSubscription;
+ _disposeSubscription = o => DisposeSubscription(o);
Topics = new TopicLookup();
}
@@ -12,6 +12,9 @@ namespace Microsoft.AspNet.SignalR.Messaging
{
public abstract class Subscription : ISubscription, IDisposable
{
+ private static readonly Action<Subscription, object> _beforeInvokeAction = (s, o) => s.BeforeInvoke(o);
+ private static readonly Action<Subscription, object> _emptyInvokeAction = (s, o) => { };
+
private readonly Func<MessageResult, object, Task<bool>> _callback;
private readonly object _callbackState;
private readonly IPerformanceCounterManager _counters;
@@ -76,10 +79,10 @@ protected Subscription(string identity, IList<string> eventKeys, Func<MessageRes
public virtual Task<bool> Invoke(MessageResult result)
{
- return Invoke(result, state => { }, state: null);
+ return Invoke(result, _emptyInvokeAction, state: null);
}
- private async Task<bool> Invoke(MessageResult result, Action<object> beforeInvoke, object state)
+ private async Task<bool> Invoke(MessageResult result, Action<Subscription, object> beforeInvoke, object state)
{
// Change the state from idle to invoking callback
var prevState = Interlocked.CompareExchange(ref _subscriptionState,
@@ -95,7 +98,7 @@ public virtual Task<bool> Invoke(MessageResult result)
}
}
- beforeInvoke(state);
+ beforeInvoke(this, state);
_counters.MessageBusMessagesReceivedTotal.IncrementBy(result.TotalCount);
_counters.MessageBusMessagesReceivedPerSec.IncrementBy(result.TotalCount);
@@ -119,9 +122,10 @@ public virtual Task<bool> Invoke(MessageResult result)
// Set the state to working
Interlocked.Exchange(ref _state, State.Working);
+ // perf sensitive: re-use messages list to minimize allocations
+ var items = new List<ArraySegment<Message>>();
while (Alive)
{
- var items = new List<ArraySegment<Message>>();
int totalCount;
object state;
@@ -131,7 +135,7 @@ public virtual Task<bool> Invoke(MessageResult result)
{
var messageResult = new MessageResult(items, totalCount);
- bool result = await Invoke(messageResult, s => BeforeInvoke(s), state);
+ bool result = await Invoke(messageResult, _beforeInvokeAction, state);
if (!result)
{
@@ -145,6 +149,7 @@ public virtual Task<bool> Invoke(MessageResult result)
{
break;
}
+ items.Clear();
}
}
Oops, something went wrong.

0 comments on commit aae6db8

Please sign in to comment.