Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
blehnen committed Sep 23, 2016
2 parents 4c36ee2 + 2462cc7 commit 647a1b5
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 30 deletions.
6 changes: 3 additions & 3 deletions Source/DotNetWorkQueue.Tests/DotNetWorkQueue.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@
<HintPath>..\packages\SimpleInjector.3.2.2\lib\net45\SimpleInjector.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="SmartThreadPool, Version=2.2.3.0, Culture=neutral, PublicKeyToken=1126fe8b671e8a79, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\SmartThreadPool.dll.2.2.3\lib\SmartThreadPool.dll</HintPath>
<Reference Include="SmartThreadPool, Version=2.2.4.0, Culture=neutral, PublicKeyToken=1126fe8b671e8a79, processorArchitecture=MSIL">
<HintPath>..\packages\SmartThreadPool.dll.2.2.4\lib\net45\SmartThreadPool.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
Expand Down
2 changes: 1 addition & 1 deletion Source/DotNetWorkQueue.Tests/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net45" />
<package id="NSubstitute" version="1.10.0.0" targetFramework="net45" />
<package id="SimpleInjector" version="3.2.2" targetFramework="net45" />
<package id="SmartThreadPool.dll" version="2.2.3" targetFramework="net45" />
<package id="SmartThreadPool.dll" version="2.2.4" targetFramework="net45" />
<package id="xunit" version="2.1.0" targetFramework="net45" />
<package id="xunit.abstractions" version="2.0.0" targetFramework="net45" />
<package id="xunit.assert" version="2.1.0" targetFramework="net45" />
Expand Down
7 changes: 4 additions & 3 deletions Source/DotNetWorkQueue/DotNetWorkQueue.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@
<HintPath>..\packages\SimpleInjector.3.2.2\lib\net45\SimpleInjector.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="SmartThreadPool, Version=2.2.3.0, Culture=neutral, PublicKeyToken=1126fe8b671e8a79, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<HintPath>..\packages\SmartThreadPool.dll.2.2.3\lib\SmartThreadPool.dll</HintPath>
<Reference Include="SmartThreadPool, Version=2.2.4.0, Culture=neutral, PublicKeyToken=1126fe8b671e8a79, processorArchitecture=MSIL">
<HintPath>..\packages\SmartThreadPool.dll.2.2.4\lib\net45\SmartThreadPool.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Runtime.Caching" />
<Reference Include="System.ServiceModel" />
<Reference Include="System.Transactions" />
<Reference Include="System.Web" />
<Reference Include="System.Xml.Linq" />
Expand Down
26 changes: 19 additions & 7 deletions Source/DotNetWorkQueue/SchedulerContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ITaskFactory CreateTaskFactory()
{
var container = _createContainerInternal().Create(QueueContexts.TaskFactory, _registerService, _transportInit, x => { }, _setOptions);
Containers.Add(container);
return CreateTaskFactoryInternal(container.GetInstance<ATaskScheduler>());
return CreateTaskFactoryInternal(container.GetInstance<ATaskScheduler>(), true);
}

/// <summary>
Expand All @@ -99,20 +99,32 @@ public ITaskFactory CreateTaskFactory()
/// <returns></returns>
public ITaskFactory CreateTaskFactory(ATaskScheduler scheduler)
{
return CreateTaskFactoryInternal(scheduler);
return CreateTaskFactoryInternal(scheduler, false);
}

/// <summary>
/// Creates the task factory.
/// </summary>
/// <param name="scheduler">The scheduler.</param>
/// <param name="weOwnScheduler">if set to <c>true</c> [we own scheduler].</param>
/// <returns></returns>
private ITaskFactory CreateTaskFactoryInternal(ATaskScheduler scheduler)
private ITaskFactory CreateTaskFactoryInternal(ATaskScheduler scheduler, bool weOwnScheduler)
{
var container = _createContainerInternal().Create(QueueContexts.TaskFactory, _registerService, _transportInit,
serviceRegister => serviceRegister.Register(() => scheduler, LifeStyles.Singleton), _setOptions);
Containers.Add(container);
return container.GetInstance<ITaskFactory>();
if (weOwnScheduler)
{
var container = _createContainerInternal().Create(QueueContexts.TaskFactory, _registerService, _transportInit,
serviceRegister => serviceRegister.Register(() => scheduler, LifeStyles.Singleton), _setOptions);
Containers.Add(container);
return container.GetInstance<ITaskFactory>();

}
else
{
var container = _createContainerInternal().Create(QueueContexts.TaskFactory, _registerService, _transportInit,
serviceRegister => serviceRegister.RegisterNonScopedSingleton(scheduler), _setOptions);
Containers.Add(container);
return container.GetInstance<ITaskFactory>();
}
}

#endregion
Expand Down
89 changes: 74 additions & 15 deletions Source/DotNetWorkQueue/TaskScheduling/TaskScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,15 +137,74 @@ public override RoomForNewTaskResult RoomForNewTask
if (_smartThreadPool == null)
throw new DotNetWorkQueueException("Start must be called on the scheduler before adding tasks");

var haveRoom = Interlocked.CompareExchange(ref _currentTaskCount, 0, 0) < MaximumConcurrencyLevel;
if (haveRoom)
if (HaveRoomForTask)
{
return Interlocked.Read(ref _currentTaskCount) > _smartThreadPool.MaxThreads ? RoomForNewTaskResult.RoomInQueue : RoomForNewTaskResult.RoomForTask;
return CurrentTaskCount > _smartThreadPool.MaxThreads ? RoomForNewTaskResult.RoomInQueue : RoomForNewTaskResult.RoomForTask;
}
return RoomForNewTaskResult.No;
}
}

/// <summary>
/// Returns true if there is room to add a task
/// </summary>
/// <value>
/// <c>true</c> if [have room for task]; otherwise, <c>false</c>.
/// </value>
protected virtual bool HaveRoomForTask => Interlocked.CompareExchange(ref _currentTaskCount, 0, 0) < MaximumConcurrencyLevel;

/// <summary>
/// Gets the current task count.
/// </summary>
/// <value>
/// The current task count.
/// </value>
protected virtual long CurrentTaskCount => Interlocked.Read(ref _currentTaskCount);

/// <summary>
/// Returns true if the work group has room for a new task
/// </summary>
/// <param name="group">The group.</param>
/// <returns></returns>
protected virtual bool HaveRoomForWorkGroupTask(IWorkGroup group)
{
return Interlocked.CompareExchange(ref _groups[group].CurrentWorkItems, 0, 0) < _groups[group].MaxWorkItems;
}

/// <summary>
/// Increments the task count for a specific group
/// </summary>
/// <param name="group">The group.</param>
protected virtual void IncrementGroup(IWorkGroup group)
{
Interlocked.Increment(ref _groups[group].CurrentWorkItems);
}

/// <summary>
/// Increments the counter for the running tasks
/// </summary>
protected virtual void IncrementCounter()
{
Interlocked.Increment(ref _currentTaskCount);
}

/// <summary>
/// De-increments the counter for the running tasks
/// </summary>
protected virtual void DeincrementCounter()
{
Interlocked.Decrement(ref _currentTaskCount);
}

/// <summary>
/// De-increments the task counter for a specific group.
/// </summary>
/// <param name="group">The group.</param>
protected virtual void DeincrementGroup(IWorkGroup group)
{
Interlocked.Decrement(ref _groups[group].CurrentWorkItems);
}

/// <summary>
/// If true, the task scheduler has room for the specified work group task
/// </summary>
Expand All @@ -156,10 +215,9 @@ public override RoomForNewTaskResult RoomForNewWorkGroupTask(IWorkGroup group)
if (IsDisposed)
return RoomForNewTaskResult.No;

var haveRoom = Interlocked.CompareExchange(ref _groups[group].CurrentWorkItems, 0, 0) < _groups[group].MaxWorkItems;
if (haveRoom)
if (HaveRoomForWorkGroupTask(group))
{
return Interlocked.Read(ref _currentTaskCount) > _groups[group].GroupInfo.ConcurrencyLevel ? RoomForNewTaskResult.RoomInQueue : RoomForNewTaskResult.RoomForTask;
return CurrentTaskCount > _groups[group].GroupInfo.ConcurrencyLevel ? RoomForNewTaskResult.RoomInQueue : RoomForNewTaskResult.RoomForTask;
}
return RoomForNewTaskResult.No;
}
Expand Down Expand Up @@ -213,23 +271,24 @@ protected sealed override void QueueTask(Task task)
var state = information;
if (state.Group != null)
{
Interlocked.Increment(ref _groups[state.Group].CurrentWorkItems);
IncrementCounter();
IncrementGroup(state.Group);
_groups[state.Group].MetricCounter.Increment(1);
_taskCounter.Increment(1);
SetWaitHandle(state.Group);
_groups[state.Group].Group.QueueWorkItem(() => TryExecuteTaskWrapped(task, state));
}
else
{
Interlocked.Increment(ref _currentTaskCount);
IncrementCounter();
_taskCounter.Increment(1);
SetWaitHandle(null);
_smartThreadPool.QueueWorkItem(() => TryExecuteTask(task));
}
}
else
{
Interlocked.Increment(ref _currentTaskCount);
IncrementCounter();
_taskCounter.Increment(1);
SetWaitHandle(null);
_smartThreadPool.QueueWorkItem(() => TryExecuteTask(task));
Expand Down Expand Up @@ -340,14 +399,15 @@ private void PostExecuteWorkItemCallback(IWorkItemResult wir)
if (information != null) //if not null, this is a work group
{
var state = information;
Interlocked.Decrement(ref _groups[state.Group].CurrentWorkItems);
DeincrementCounter();
DeincrementGroup(state.Group);
_groups[state.Group].MetricCounter.Decrement(1);
_taskCounter.Decrement(_groups[state.Group].Group.Name, 1);
SetWaitHandle(state.Group);
}
else //is null, so this is not a work group item
{
Interlocked.Decrement(ref _currentTaskCount);
DeincrementCounter();
_taskCounter.Decrement(1);
SetWaitHandle(null);
}
Expand All @@ -357,11 +417,11 @@ private void PostExecuteWorkItemCallback(IWorkItemResult wir)
/// Sets the wait handle.
/// </summary>
/// <param name="group">The group.</param>
private void SetWaitHandle(IWorkGroup group)
protected void SetWaitHandle(IWorkGroup group)
{
if (group == null) //not a work group
{
if (Interlocked.CompareExchange(ref _currentTaskCount, 0, 0) < MaximumConcurrencyLevel)
if (HaveRoomForTask)
{
_waitForFreeThread.Set(null);
}
Expand All @@ -372,8 +432,7 @@ private void SetWaitHandle(IWorkGroup group)
}
else //work group
{
var currentCount = Interlocked.CompareExchange(ref _groups[group].CurrentWorkItems, 0, 0);
if (currentCount < _groups[group].MaxWorkItems)
if (HaveRoomForTask && HaveRoomForWorkGroupTask(group))
{
_waitForFreeThread.Set(group);
}
Expand Down
2 changes: 1 addition & 1 deletion Source/DotNetWorkQueue/packages.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
<package id="netfx-Guard" version="1.3.3.2" targetFramework="net45" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net45" />
<package id="SimpleInjector" version="3.2.2" targetFramework="net45" />
<package id="SmartThreadPool.dll" version="2.2.3" targetFramework="net45" />
<package id="SmartThreadPool.dll" version="2.2.4" targetFramework="net45" />
</packages>

0 comments on commit 647a1b5

Please sign in to comment.