Skip to content

Commit

Permalink
Improvements to Scaling
Browse files Browse the repository at this point in the history
Added a couple new events so you can find out when channels are created
or destroyed for a platform.  Also adjusted the autoscaling to be a
little bit more conservative.

When channels are destroyed they now process the rest of the queue
before being destroyed, this eliminates the case where a teardown may
have happened to a channel with queued notifications, where those
notifications may have been forever lost!
  • Loading branch information
Redth committed Sep 19, 2012
1 parent 66d871d commit 63ce45d
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 33 deletions.
28 changes: 28 additions & 0 deletions PushSharp.Common/ChannelEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,26 @@ namespace PushSharp.Common
{
public class ChannelEvents
{
public delegate void ChannelCreatedDelegate(PlatformType platformType, int newChannelCount);
public event ChannelCreatedDelegate OnChannelCreated;

public void RaiseChannelCreated(PlatformType platformType, int newChannelCount)
{
var evt = this.OnChannelCreated;
if (evt != null)
evt(platformType, newChannelCount);
}

public delegate void ChannelDestroyedDelegate(PlatformType platformType, int newChannelCount);
public event ChannelDestroyedDelegate OnChannelDestroyed;

public void RaiseChannelDestroyed(PlatformType platformType, int newChannelCount)
{
var evt = this.OnChannelDestroyed;
if (evt != null)
evt(platformType, newChannelCount);
}

public delegate void NotificationSendFailureDelegate(Notification notification, Exception notificationFailureException);
public event NotificationSendFailureDelegate OnNotificationSendFailure;

Expand Down Expand Up @@ -61,6 +81,10 @@ public void RaiseDeviceSubscriptionIdChanged(PlatformType platform, string oldDe

public void RegisterProxyHandler(ChannelEvents proxy)
{
this.OnChannelCreated += new ChannelCreatedDelegate((platformType, newCount) => proxy.RaiseChannelCreated(platformType, newCount));

this.OnChannelDestroyed += new ChannelDestroyedDelegate((platformType, newCount) => proxy.RaiseChannelDestroyed(platformType, newCount));

this.OnChannelException += new ChannelExceptionDelegate((exception, platformType, notification) => proxy.RaiseChannelException(exception, platformType, notification));

this.OnNotificationSendFailure += new NotificationSendFailureDelegate((notification, exception) => proxy.RaiseNotificationSendFailure(notification, exception));
Expand All @@ -74,6 +98,10 @@ public void RegisterProxyHandler(ChannelEvents proxy)

public void UnRegisterProxyHandler(ChannelEvents proxy)
{
this.OnChannelCreated -= new ChannelCreatedDelegate((platformType, newCount) => proxy.RaiseChannelCreated(platformType, newCount));

this.OnChannelDestroyed -= new ChannelDestroyedDelegate((platformType, newCount) => proxy.RaiseChannelDestroyed(platformType, newCount));

this.OnChannelException -= new ChannelExceptionDelegate((exception, platformType, notification) => proxy.RaiseChannelException(exception, platformType, notification));

this.OnNotificationSendFailure -= new NotificationSendFailureDelegate((notification, exception) => proxy.RaiseNotificationSendFailure(notification, exception));
Expand Down
15 changes: 12 additions & 3 deletions PushSharp.Common/PushChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public abstract class PushChannelBase : IDisposable

object queuedNotificationsLock = new object();
ConcurrentQueue<Notification> queuedNotifications;

ManualResetEventSlim waitQueuedNotification;

protected bool stopping;
protected Task taskSender;
protected CancellationTokenSource CancelTokenSource;
Expand All @@ -41,6 +42,7 @@ public PushChannelBase(PushChannelSettings channelSettings, PushServiceSettings

this.ChannelSettings = channelSettings;
this.ServiceSettings = serviceSettings ?? new PushServiceSettings();
this.waitQueuedNotification = new ManualResetEventSlim();

//Start our sending task
taskSender = new Task(() => Sender(), TaskCreationOptions.LongRunning);
Expand Down Expand Up @@ -89,11 +91,17 @@ public void QueueNotification(Notification notification, bool countsAsRequeue =
//If the count is -1, it can be queued infinitely, otherwise check that it's less than the max
if (this.ServiceSettings.MaxNotificationRequeues < 0 || notification.QueuedCount <= this.ServiceSettings.MaxNotificationRequeues)
{
//Reset the Enqueued time in case this is a requeue
notification.EnqueuedTimestamp = DateTime.UtcNow;

//Increase the queue counter
if (countsAsRequeue)
notification.QueuedCount++;

queuedNotifications.Enqueue(notification);

//Signal a possibly wait-stated Sender loop that there's work to do
waitQueuedNotification.Set();
}
else
Events.RaiseNotificationSendFailure(notification, new MaxSendAttemptsReachedException());
Expand All @@ -107,8 +115,9 @@ void Sender()

if (!queuedNotifications.TryDequeue(out notification))
{
//No notifications in queue, sleep a bit!
Thread.Sleep(250);
//No notifications in queue, go into wait state
waitQueuedNotification.Reset();
waitQueuedNotification.Wait(5000, this.CancelToken);
continue;
}

Expand Down
83 changes: 54 additions & 29 deletions PushSharp.Common/PushServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public PushServiceBase(TChannelSettings channelSettings, PushServiceSettings ser

public void QueueNotification(Notification notification)
{
notification.EnqueuedTimestamp = DateTime.UtcNow;

queuedNotifications.Enqueue(notification);
}

Expand Down Expand Up @@ -133,40 +135,38 @@ void CheckScale()
{
if (channels.Count <= 0)
{
SpinupChannel();
ScaleChannels(ChannelScaleAction.Create);
return;
}

var avgTime = GetAverageQueueWait();

if (avgTime < 1 && channels.Count > 1)
if (avgTime < 100 && channels.Count > 1)
{
TeardownChannel();
ScaleChannels(ChannelScaleAction.Destroy);
}
else if (avgTime > 5 && channels.Count < this.ServiceSettings.MaxAutoScaleChannels)
else if (channels.Count < this.ServiceSettings.MaxAutoScaleChannels)
{
var numChannelsToSpinUp = 1;

//Depending on the wait time, let's spin up more than 1 channel at a time
if (avgTime > 500)
numChannelsToSpinUp = 19;
else if (avgTime > 250)
numChannelsToSpinUp = 10;
else if (avgTime > 100)
if (avgTime > 5000)
numChannelsToSpinUp = 5;
else if (avgTime > 1000)
numChannelsToSpinUp = 2;
else if (avgTime > 100)
numChannelsToSpinUp = 1;

for (int i = 0; i < numChannelsToSpinUp; i++)
if (channels.Count < this.ServiceSettings.MaxAutoScaleChannels)
SpinupChannel();
ScaleChannels(ChannelScaleAction.Create, numChannelsToSpinUp);
}
}
else
{
while (channels.Count > ServiceSettings.Channels && !this.cancelTokenSource.IsCancellationRequested && !stopping)
TeardownChannel();
ScaleChannels(ChannelScaleAction.Destroy);

while (channels.Count < ServiceSettings.Channels && !this.cancelTokenSource.IsCancellationRequested && !stopping)
SpinupChannel();
ScaleChannels(ChannelScaleAction.Create);
}
}

Expand All @@ -177,6 +177,7 @@ void newChannel_OnQueueTimed(double queueTimeMilliseconds)
{
measurements.Add(queueTimeMilliseconds);

//Remove old measurements
while (measurements.Count > 1000)
measurements.RemoveAt(0);
}
Expand All @@ -196,30 +197,54 @@ double GetAverageQueueWait()
}
}

void SpinupChannel()
void ScaleChannels(ChannelScaleAction action, int count = 1)
{
lock (channels)
for (int i = 0; i < count; i++)
{
var newChannel = this.CreateChannel(this.ChannelSettings);
var newCount = 0;
bool? destroyed= null;

newChannel.Events.RegisterProxyHandler(this.Events);
lock (channels)
{
if (action == ChannelScaleAction.Create)
{
var newChannel = this.CreateChannel(this.ChannelSettings);

newChannel.OnQueueTimed += new Action<double>(newChannel_OnQueueTimed);
newChannel.Events.RegisterProxyHandler(this.Events);

channels.Add(newChannel);
}
}
newChannel.OnQueueTimed += new Action<double>(newChannel_OnQueueTimed);

void TeardownChannel()
{
lock (channels)
{
var channelOn = channels[0];
channels.RemoveAt(0);
channels.Add(newChannel);

channelOn.Events.UnRegisterProxyHandler(this.Events);
newCount = channels.Count;
destroyed = false;
}
else if (action == ChannelScaleAction.Destroy && channels.Count > 1)
{
var channelOn = channels[0];
channels.RemoveAt(0);

//Now stop the channel but let it finish
channelOn.Stop(true);

channelOn.Events.UnRegisterProxyHandler(this.Events);

newCount = channels.Count;
destroyed = true;
}
}

if (destroyed.HasValue && !destroyed.Value)
this.Events.RaiseChannelCreated(this.Platform, newCount);
else if (destroyed.HasValue && destroyed.Value)
this.Events.RaiseChannelDestroyed(this.Platform, newCount);
}
}
}

public enum ChannelScaleAction
{
Create,
Destroy
}
}
2 changes: 1 addition & 1 deletion PushSharp/PushService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class PushService : IDisposable
{
public ChannelEvents Events;

public bool WaitForQueuesToFinish { get; set; }
public bool WaitForQueuesToFinish { get; private set; }

Apple.ApplePushService appleService = null;
Android.C2dmPushService androidService = null;
Expand Down

0 comments on commit 63ce45d

Please sign in to comment.