Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@ Provides various classes and interfaces to facilitate thread-based processing.
## ProcessorThreadPool

``` c#
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory)
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions);
```

Each thread pool has a `name` used only for identyfing the pool and for logging. The `threadCount` is specified and will run a `Thread` that calls the `IProcessor.Execute(IThreadState state)` instance provided by the `IProcessorFactory.Create()` method in a loop while the `IThreadState.Active` returns `true`.
Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`.

## ProcessorThreadOptions

| Option | Default | Description |
| --- | --- | --- |
| `JoinTimeout` | `00:00:15` | The duration to allow the processor thread to join the main thread. |
| `IsBackground` | `true` | Indicates whether the thread will be started as a background thread. Background threads are instantly killed when the host process stops. |
| `Priority` | `ThreadPriority.Normal` | Indicates the [thread priority](https://docs.microsoft.com/en-us/dotnet/api/system.threading.thread.priority?view=net-6.0). |
10 changes: 5 additions & 5 deletions Shuttle.Core.Threading/.package/package.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
<package>
<metadata>
<id>Shuttle.Core.Threading</id>
<version>11.1.2</version>
<version>12.0.0</version>
<authors>Eben Roux</authors>
<owners>Eben Roux</owners>
<license type="expression">BSD-3-Clause</license>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<icon>images\logo.png</icon>
<readme>docs\README.md</readme>
<repository type="git" url="https://github.com/shuttle/Shuttle.Core.Threading.git" />
<projectUrl>https://github.com/shuttle/Shuttle.Core.Threading</projectUrl>
<description>Thread-based processing.</description>
<copyright>Copyright (c) 2022, Eben Roux</copyright>
<tags>shuttle threading processor</tags>
<dependencies>
<dependency id="Shuttle.Core.Configuration" version="10.0.4" />
<dependency id="Shuttle.Core.Contract" version="10.0.3" />
<dependency id="Shuttle.Core.Logging" version="10.0.4" />
<dependency id="Shuttle.Core.Reflection" version="11.0.3" />
<dependency id="Shuttle.Core.Contract" version="10.1.0" />
<dependency id="Shuttle.Core.Reflection" version="12.0.0" />
</dependencies>
</metadata>
<files>
<file src="..\..\..\.media\logo.png" target="images" />
<file src="..\..\..\README.md" target="docs\" />
<file src="lib\**\*.*" target="lib" />
</files>
</package>
2 changes: 2 additions & 0 deletions Shuttle.Core.Threading/.package/package.nuspec.template
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<license type="expression">BSD-3-Clause</license>
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<icon>images\logo.png</icon>
<readme>docs\README.md</readme>
<repository type="git" url="https://github.com/shuttle/Shuttle.Core.Threading.git" />
<projectUrl>https://github.com/shuttle/Shuttle.Core.Threading</projectUrl>
<description>Thread-based processing.</description>
Expand All @@ -20,6 +21,7 @@
</metadata>
<files>
<file src="..\..\..\.media\logo.png" target="images" />
<file src="..\..\..\README.md" target="docs\" />
<file src="lib\**\*.*" target="lib" />
</files>
</package>
2 changes: 2 additions & 0 deletions Shuttle.Core.Threading/IProcessorThreadPool.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;

namespace Shuttle.Core.Threading
{
Expand All @@ -7,5 +8,6 @@ public interface IProcessorThreadPool : IDisposable
void Pause();
void Resume();
IProcessorThreadPool Start();
IEnumerable<ProcessorThread> ProcessorThreads { get; }
}
}
93 changes: 44 additions & 49 deletions Shuttle.Core.Threading/ProcessorThread.cs
Original file line number Diff line number Diff line change
@@ -1,39 +1,55 @@
using System;
using System.Threading;
using Shuttle.Core.Configuration;
using Shuttle.Core.Contract;
using Shuttle.Core.Logging;
using Shuttle.Core.Reflection;

namespace Shuttle.Core.Threading
{
public class ProcessorThread
{
private static readonly int ThreadJoinTimeoutInterval =
ConfigurationItem<int>.ReadSetting("ThreadJoinTimeoutInterval", 1000).GetValue();
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

private readonly ILog _log;
private readonly string _name;
private readonly IProcessor _processor;
private readonly System.Threading.CancellationTokenSource _cancellationTokenSource = new System.Threading.CancellationTokenSource();
private readonly ProcessorThreadOptions _processorThreadOptions;
private ProcessorThreadEventArgs _eventArgs;

private bool _started;

private Thread _thread;

public ProcessorThread(string name, IProcessor processor)
public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions processorThreadOptions)
{
Guard.AgainstNull(processor, nameof(processor));
Guard.AgainstNull(processorThreadOptions, nameof(processorThreadOptions));

_name = name;
_processor = processor;
_processorThreadOptions = processorThreadOptions;

CancellationToken = _cancellationTokenSource.Token;

_log = Log.For(this);
}

public CancellationToken CancellationToken {get; }
public CancellationToken CancellationToken { get; }

public event EventHandler<ProcessorThreadEventArgs> ProcessorThreadStarting = delegate
{
};

public event EventHandler<ProcessorThreadEventArgs> ProcessorThreadActive = delegate
{
};

public event EventHandler<ProcessorThreadEventArgs> ProcessorThreadStopping = delegate
{
};

public event EventHandler<ProcessorThreadEventArgs> ProcessorThreadStopped = delegate
{
};

public event EventHandler<ProcessorThreadEventArgs> ProcessorExecuting = delegate
{
};

public void Start()
{
Expand All @@ -42,81 +58,60 @@ public void Start()
return;
}

_thread = new Thread(Work) {Name = _name};
_thread = new Thread(Work) { Name = _name };

try
{
_thread.SetApartmentState(ApartmentState.MTA);
}
catch (Exception ex)
{
#if !NETCOREAPP2_1
_log.Warning(ex.Message);
#else
_log.Information(ex.Message);
#endif
}
_thread.TrySetApartmentState(ApartmentState.MTA);

_thread.IsBackground = true;
_thread.Priority = ThreadPriority.Normal;
_thread.IsBackground = _processorThreadOptions.IsBackground;
_thread.Priority = _processorThreadOptions.Priority;

_thread.Start();

if (Log.IsTraceEnabled)
{
_log.Trace(string.Format(Resources.ProcessorThreadStarting, _thread.ManagedThreadId,
_processor.GetType().FullName));
}
_eventArgs = new ProcessorThreadEventArgs(_name, _thread.ManagedThreadId, _processor.GetType().FullName);

ProcessorThreadStarting.Invoke(this, _eventArgs);

while (!_thread.IsAlive && !CancellationToken.IsCancellationRequested)
{
}

if (!CancellationToken.IsCancellationRequested && Log.IsTraceEnabled)
if (!CancellationToken.IsCancellationRequested)
{
_log.Trace(string.Format(Resources.ProcessorThreadActive, _thread.ManagedThreadId,
_processor.GetType().FullName));
ProcessorThreadActive.Invoke(this, _eventArgs);
}

_started = true;
}

public void Stop()
public void Stop(TimeSpan timeout)
{
if (Log.IsTraceEnabled)
if (!_started)
{
_log.Trace(string.Format(Resources.ProcessorThreadStopping, _thread.ManagedThreadId,
_processor.GetType().FullName));
throw new InvalidOperationException(Resources.ProcessorThreadNotStartedException);
}

ProcessorThreadStopping.Invoke(this, _eventArgs);

_cancellationTokenSource.Cancel();

_processor.AttemptDispose();

if (_thread.IsAlive)
{
_thread.Join(ThreadJoinTimeoutInterval);
_thread.Join(timeout);
}
}

private void Work()
{
while (!CancellationToken.IsCancellationRequested)
{
if (Log.IsVerboseEnabled)
{
_log.Verbose(string.Format(Resources.ProcessorExecuting, _thread.ManagedThreadId,
_processor.GetType().FullName));
}
ProcessorExecuting.Invoke(this, _eventArgs);

_processor.Execute(CancellationToken);
}

if (Log.IsTraceEnabled)
{
_log.Trace(string.Format(Resources.ProcessorThreadStopped, _thread.ManagedThreadId,
_processor.GetType().FullName));
}
ProcessorThreadStopped.Invoke(this, _eventArgs);
}

internal void Deactivate()
Expand Down
22 changes: 22 additions & 0 deletions Shuttle.Core.Threading/ProcessorThreadEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;
using Shuttle.Core.Contract;

namespace Shuttle.Core.Threading
{
public class ProcessorThreadEventArgs : EventArgs
{
public string Name { get; }
public int ManagedThreadId { get; }
public string ProcessorTypeFullName { get; }

public ProcessorThreadEventArgs(string name, int managedThreadId, string processorTypeFullName)
{
Guard.AgainstNullOrEmptyString(name, nameof(name));
Guard.AgainstNullOrEmptyString(processorTypeFullName, nameof(processorTypeFullName));

Name = name;
ManagedThreadId = managedThreadId;
ProcessorTypeFullName = processorTypeFullName;
}
}
}
12 changes: 12 additions & 0 deletions Shuttle.Core.Threading/ProcessorThreadOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Threading;

namespace Shuttle.Core.Threading
{
public class ProcessorThreadOptions
{
public TimeSpan JoinTimeout { get; set; } = TimeSpan.FromSeconds(15);
public bool IsBackground { get; set; } = true;
public ThreadPriority Priority { get; set; } = ThreadPriority.Normal;
}
}
33 changes: 18 additions & 15 deletions Shuttle.Core.Threading/ProcessorThreadPool.cs
Original file line number Diff line number Diff line change
@@ -1,45 +1,50 @@
using System;
using System.Collections.Generic;
using Shuttle.Core.Contract;
using Shuttle.Core.Logging;
using Shuttle.Core.Reflection;

namespace Shuttle.Core.Threading
{
public class ProcessorThreadPool : IProcessorThreadPool
{
private readonly ILog _log;
private readonly string _name;
private readonly IProcessorFactory _processorFactory;
private readonly int _threadCount;
private readonly ProcessorThreadOptions _processorThreadOptions;
private readonly List<ProcessorThread> _threads = new List<ProcessorThread>();
private bool _disposed;
private bool _started;
private readonly TimeSpan _joinTimeout;
private readonly int _threadCount;

public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory)
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions)
{
Guard.AgainstNull(processorFactory, nameof(processorFactory));
Guard.AgainstNull(processorThreadOptions, nameof(processorThreadOptions));

if (threadCount < 1)
{
throw new ThreadCountZeroException();
}

_name = name ?? Guid.NewGuid().ToString();
_threadCount = threadCount;
_processorFactory = processorFactory;
_processorThreadOptions = processorThreadOptions;

_log = Log.For(this);
_joinTimeout = _processorThreadOptions.JoinTimeout;
_threadCount = threadCount;

if (_joinTimeout.TotalSeconds < 1)
{
_joinTimeout = TimeSpan.FromSeconds(1);
}
}

public void Pause()
{
foreach (var thread in _threads)
{
thread.Stop();
thread.Stop(_joinTimeout);
}

_log.Information(string.Format(Resources.ThreadPoolStatusChange, _name, "paused"));
}

public void Resume()
Expand All @@ -48,8 +53,6 @@ public void Resume()
{
thread.Start();
}

_log.Information(string.Format(Resources.ThreadPoolStatusChange, _name, "resumed"));
}

public IProcessorThreadPool Start()
Expand All @@ -63,11 +66,11 @@ public IProcessorThreadPool Start()

_started = true;

_log.Information(string.Format(Resources.ThreadPoolStatusChange, _name, "started"));

return this;
}

public IEnumerable<ProcessorThread> ProcessorThreads => _threads.AsReadOnly();

public void Dispose()
{
Dispose(true);
Expand All @@ -81,7 +84,7 @@ private void StartThreads()

while (i++ < _threadCount)
{
var thread = new ProcessorThread($"{_name} / {i}", _processorFactory.Create());
var thread = new ProcessorThread($"{_name} / {i}", _processorFactory.Create(), _processorThreadOptions);

_threads.Add(thread);

Expand All @@ -105,7 +108,7 @@ protected virtual void Dispose(bool disposing)

foreach (var thread in _threads)
{
thread.Stop();
thread.Stop(_joinTimeout);
}

_processorFactory.AttemptDispose();
Expand Down
Loading