Skip to content

Commit

Permalink
Added timeout mechanism to make scheduler adapt faster
Browse files Browse the repository at this point in the history
 * Extracted all constants controlling scheduler to AdaptivePollingReceiver
 * Added role interface for the back-off strategy
 * Decoupled the sender and receiver from the pipeline ContextItemRemovalDisposable
 * Removed the ITaskTracker abstraction
  • Loading branch information
SzymonPobiega committed Sep 11, 2015
1 parent ce80620 commit 7b37d23
Show file tree
Hide file tree
Showing 19 changed files with 234 additions and 126 deletions.
53 changes: 40 additions & 13 deletions src/NServiceBus.SqlServer/AdaptiveExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Transports.SQLServer
{
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NServiceBus.CircuitBreakers;
Expand All @@ -12,38 +13,60 @@ abstract class AdaptiveExecutor<T> : IExecutor
protected abstract void Finally(T value);
protected abstract void HandleException(Exception ex);
protected abstract IRampUpController CreateRampUpController(Action rampUpCallback);
protected abstract ITaskTracker CreateTaskTracker(int maximumConcurrency);

protected AdaptiveExecutor(RepeatedFailuresOverTimeCircuitBreaker circuitBreaker)
protected abstract IBackOffStrategy CreateBackOffStrategy();
protected AdaptiveExecutor(string name, RepeatedFailuresOverTimeCircuitBreaker circuitBreaker, TransportNotifications transportNotifications, TimeSpan slowTaskThreshold)
{
this.name = name;
this.circuitBreaker = circuitBreaker;
this.transportNotifications = transportNotifications;
this.slowTaskThreshold = slowTaskThreshold;
}

public virtual void Start(int maximumConcurrency, CancellationToken token)
{
if (taskTracker != null)
{
throw new InvalidOperationException("The executor has already been started. Use Stop() to stop it.");
throw new InvalidOperationException("The executor has already been started.");
}
this.token = token;
taskTracker = CreateTaskTracker(maximumConcurrency);
taskTracker = new TaskTracker(maximumConcurrency, transportNotifications, name);
StartTask();
slowTaskTimer = new Timer(_ => MonitorSlowTasks(), null, TimeSpan.Zero, slowTaskThreshold);
}

public virtual void Stop()
{
if (taskTracker == null)
{
throw new InvalidOperationException("The executor has not been started.");
}
using (var waitHandle = new ManualResetEvent(false))
{
slowTaskTimer.Dispose(waitHandle);
waitHandle.WaitOne();
}
taskTracker.ShutdownAll();

taskTracker = null;
}

void MonitorSlowTasks()
{
var allStates = taskTracker.GetTaskStates();
foreach (var state in allStates.Cast<ExecutorTaskState>())
{
state.TriggerAnotherTaskIfLongRunning(StartTask);
}
}

void StartTask()
{
taskTracker.StartAndTrack(() =>
{
var state = new ExecutorTaskState();
var taskId = Guid.NewGuid();
var receiveTask = Task.Factory
.StartNew(ReceiveLoop, null, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.StartNew(ReceiveLoop, state, token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
.ContinueWith(t =>
{
t.Exception.Handle(ex =>
Expand All @@ -52,7 +75,6 @@ void StartTask()
circuitBreaker.Failure(ex);
return true;
});
}, token, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default)
.ContinueWith((_, s) =>
{
Expand All @@ -66,18 +88,20 @@ void StartTask()
StartTask();
}, taskId, token);
return Tuple.Create(taskId, receiveTask);
return Tuple.Create(taskId, receiveTask, state);
});
}

void ReceiveLoop(object obj)
{
var backOff = new BackOff(1000);
var state = (ExecutorTaskState) obj;
var backOff = CreateBackOffStrategy();
var rampUpController = CreateRampUpController(StartTask);

while (!token.IsCancellationRequested && rampUpController.CheckHasEnoughWork())
{
bool success;
state.Reset();
rampUpController.RampUpIfTooMuchWork();
var result = Init();
try
Expand All @@ -98,13 +122,16 @@ void ReceiveLoop(object obj)
}

circuitBreaker.Success();
backOff.Wait(() => !success);
backOff.ConditionalWait(() => !success, Thread.Sleep);
}
}

Timer slowTaskTimer;
readonly TimeSpan slowTaskThreshold;
readonly string name;
readonly RepeatedFailuresOverTimeCircuitBreaker circuitBreaker;
readonly TransportNotifications transportNotifications;
CancellationToken token;
ITaskTracker taskTracker;

TaskTracker taskTracker;
}
}
26 changes: 22 additions & 4 deletions src/NServiceBus.SqlServer/AdaptivePollingReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,31 @@

class AdaptivePollingReceiver : AdaptiveExecutor<ReceiveResult>
{
/// <summary>
/// Controls the ramping up of additional threads in case one of the current threads is processing a slow message.
/// If the processing takes more than a number of seconds, an additional thread is ramped up. Such thing can happen only once per processing a single message.
/// </summary>
const int SlowTaskThresholdInMilliseconds = 5000;
/// <summary>
/// The maximum number of failures of receive for a given thread before it decides to commit suicide.
/// </summary>
const int MaximumConsecutiveFailures = 7;
/// <summary>
/// The minimum number of successful message processing attempts for a given thread before it tries to ramp up another thread.
/// </summary>
const int MinimumConsecutiveSuccesses = 5;
/// <summary>
/// The maximum time for a thread to wait if a previous receive operation failed.
/// </summary>
const int MaximumBackOffTimeMilliseconds = 1000;

public AdaptivePollingReceiver(
IReceiveStrategy receiveStrategy,
TableBasedQueue queue,
Action<TransportMessage, Exception> endProcessMessage,
RepeatedFailuresOverTimeCircuitBreaker circuitBreaker,
TransportNotifications transportNotifications)
: base(circuitBreaker)
: base(queue.ToString(), circuitBreaker, transportNotifications, TimeSpan.FromMilliseconds(SlowTaskThresholdInMilliseconds))
{
this.receiveStrategy = receiveStrategy;
this.queue = queue;
Expand Down Expand Up @@ -54,12 +72,12 @@ protected override void HandleException(Exception ex)

protected override IRampUpController CreateRampUpController(Action rampUpCallback)
{
return new ReceiveRampUpController(rampUpCallback, transportNotifications, queue.ToString());
return new ReceiveRampUpController(rampUpCallback, transportNotifications, queue.ToString(), MaximumConsecutiveFailures, MinimumConsecutiveSuccesses);
}

protected override ITaskTracker CreateTaskTracker(int maximumConcurrency)
protected override IBackOffStrategy CreateBackOffStrategy()
{
return new ReceiveTaskTracker(maximumConcurrency, transportNotifications, queue.ToString());
return new BoundedExponentialBackOff(MaximumBackOffTimeMilliseconds);
}

readonly IReceiveStrategy receiveStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@ namespace NServiceBus.Transports.SQLServer
{
using System;
using System.Transactions;
using NServiceBus.Pipeline;
using NServiceBus.Unicast.Transport;

class AmbientTransactionReceiveStrategy : IReceiveStrategy
{
readonly PipelineExecutor pipelineExecutor;
readonly string connectionString;
readonly TableBasedQueue errorQueue;
readonly Func<TransportMessage, bool> tryProcessMessageCallback;
readonly TransactionOptions transactionOptions;
readonly ConnectionFactory sqlConnectionFactory;
readonly IConnectionStore connectionStore;

public AmbientTransactionReceiveStrategy(string connectionString, TableBasedQueue errorQueue, Func<TransportMessage, bool> tryProcessMessageCallback, ConnectionFactory sqlConnectionFactory, PipelineExecutor pipelineExecutor, TransactionSettings transactionSettings)
public AmbientTransactionReceiveStrategy(string connectionString, TableBasedQueue errorQueue, Func<TransportMessage, bool> tryProcessMessageCallback, ConnectionFactory sqlConnectionFactory, IConnectionStore connectionStore, TransactionSettings transactionSettings)
{
this.pipelineExecutor = pipelineExecutor;
this.tryProcessMessageCallback = tryProcessMessageCallback;
this.errorQueue = errorQueue;
this.connectionString = connectionString;
this.sqlConnectionFactory = sqlConnectionFactory;
this.connectionStore = connectionStore;

transactionOptions = new TransactionOptions
{
Expand All @@ -35,7 +34,7 @@ public ReceiveResult TryReceiveFrom(TableBasedQueue queue)
{
using (var connection = sqlConnectionFactory.OpenNewConnection(connectionString))
{
using (pipelineExecutor.SetConnection(connectionString, connection))
using (connectionStore.SetConnection(connectionString, connection))
{
var readResult = queue.TryReceive(connection);
if (readResult.IsPoison)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
namespace NServiceBus.Transports.SQLServer
{
using System;
using System.Threading;

/// <summary>
/// A utility class that does a sleep on very call up to a limit based on a condition.
/// </summary>
class BackOff
class BoundedExponentialBackOff : IBackOffStrategy
{
int maximum;
readonly int maximum;
int currentDelay = 50;

/// <summary>
/// Initializes a new instance.
/// </summary>
/// <param name="maximum">The maximum number of milliseconds for which the thread is blocked.</param>
public BackOff(int maximum)
public BoundedExponentialBackOff(int maximum)
{
this.maximum = maximum;
}
Expand All @@ -24,25 +23,28 @@ public BackOff(int maximum)
/// It executes the Thread sleep if condition is <c>true</c>, otherwise it resets.
/// </summary>
/// <param name="condition">If the condition is <c>true</c> then the wait is performed.</param>
public void Wait(Func<bool> condition)
/// <param name="waitAction"></param>
public void ConditionalWait(Func<bool> condition, Action<int> waitAction)
{
if (!condition())
if (condition())
{
currentDelay = 50;
return;
waitAction(currentDelay);
currentDelay = RecalculateDelay(currentDelay, maximum);
}

Thread.Sleep(currentDelay);

if (currentDelay < maximum)
else
{
currentDelay *= 2;
currentDelay = 50;
}
}

if (currentDelay > maximum)
static int RecalculateDelay(int currentDelay, int maximumDelay)
{
var newDelay = currentDelay*2;
if (newDelay > maximumDelay)
{
currentDelay = maximum;
newDelay = maximumDelay;
}
return newDelay;
}
}
}

This file was deleted.

8 changes: 6 additions & 2 deletions src/NServiceBus.SqlServer/Config/SqlServerTransportFeature.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,19 @@ protected override void Configure(FeatureConfigurationContext context, string co
config.Configure(context, connectionStringWithSchema);
}

context.Container.ConfigureComponent<SqlServerMessageSender>(DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent(b => new SqlServerMessageSender(
b.Build<IConnectionStringProvider>(),
new ContextualConnectionStore(b.Build<PipelineExecutor>()),
new ContextualCallbackAddressStore(b.Build<PipelineExecutor>().CurrentContext),
b.Build<ConnectionFactory>()), DependencyLifecycle.InstancePerCall);

if (!context.Settings.GetOrDefault<bool>("Endpoint.SendOnly"))
{
context.Container.ConfigureComponent<TransportNotifications>(DependencyLifecycle.SingleInstance);
context.Container.ConfigureComponent<SqlServerQueueCreator>(DependencyLifecycle.InstancePerCall);

var errorQueue = ErrorQueueSettings.GetConfiguredErrorQueue(context.Settings);
context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>(), errorQueue, b.Build<ConnectionFactory>()), DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent(b => new ReceiveStrategyFactory(new ContextualConnectionStore(b.Build<PipelineExecutor>()), b.Build<LocalConnectionParams>(), errorQueue, b.Build<ConnectionFactory>()), DependencyLifecycle.InstancePerCall);

context.Container.ConfigureComponent<SqlServerPollingDequeueStrategy>(DependencyLifecycle.InstancePerCall);
context.Container.ConfigureComponent(b => new SqlServerStorageContext(b.Build<PipelineExecutor>(), b.Build<LocalConnectionParams>()), DependencyLifecycle.InstancePerUnitOfWork);
Expand Down
23 changes: 23 additions & 0 deletions src/NServiceBus.SqlServer/ExecutorTaskState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace NServiceBus.Transports.SQLServer
{
using System;

class ExecutorTaskState
{
int seen;

public void Reset()
{
seen = 0;
}

public void TriggerAnotherTaskIfLongRunning(Action triggerAction)
{
seen += 1;
if (seen == 2)
{
triggerAction();
}
}
}
}
9 changes: 9 additions & 0 deletions src/NServiceBus.SqlServer/IBackOffStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace NServiceBus.Transports.SQLServer
{
using System;

interface IBackOffStrategy
{
void ConditionalWait(Func<bool> condition, Action<int> waitAction);
}
}
33 changes: 33 additions & 0 deletions src/NServiceBus.SqlServer/ICallbackAddressStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace NServiceBus.Transports.SQLServer
{
using NServiceBus.Pipeline;

interface ICallbackAddressStore
{
void SetCallbackAddress(Address callbackAddress);
Address TryGetCallbackAddress();
}

class ContextualCallbackAddressStore : ICallbackAddressStore
{
readonly BehaviorContext behaviorContext;
const string SqlServerCallbackAddressContextKey = "SqlServerCallbackAddress";

public ContextualCallbackAddressStore(BehaviorContext behaviorContext)
{
this.behaviorContext = behaviorContext;
}

public void SetCallbackAddress(Address callbackAddress)
{
behaviorContext.Set(SqlServerCallbackAddressContextKey,callbackAddress);
}

public Address TryGetCallbackAddress()
{
Address callbackAddress;
behaviorContext.TryGet(SqlServerCallbackAddressContextKey, out callbackAddress);
return callbackAddress;
}
}
}

0 comments on commit 7b37d23

Please sign in to comment.