From acf54f5cde17faa96afbdd1e130c0af4f43e6971 Mon Sep 17 00:00:00 2001 From: Eben Date: Tue, 14 Feb 2023 20:45:54 +0200 Subject: [PATCH 01/23] - async waiting --- Shuttle.Core.Threading/IProcessor.cs | 3 ++- Shuttle.Core.Threading/IThreadActivity.cs | 3 ++- Shuttle.Core.Threading/ProcessorThread.cs | 3 ++- Shuttle.Core.Threading/ThreadActivity.cs | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/Shuttle.Core.Threading/IProcessor.cs b/Shuttle.Core.Threading/IProcessor.cs index 37b3089..139072b 100644 --- a/Shuttle.Core.Threading/IProcessor.cs +++ b/Shuttle.Core.Threading/IProcessor.cs @@ -1,9 +1,10 @@ using System.Threading; +using System.Threading.Tasks; namespace Shuttle.Core.Threading { public interface IProcessor { - void Execute(CancellationToken cancellationToken); + Task Execute(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/IThreadActivity.cs b/Shuttle.Core.Threading/IThreadActivity.cs index cd7c426..3c74669 100644 --- a/Shuttle.Core.Threading/IThreadActivity.cs +++ b/Shuttle.Core.Threading/IThreadActivity.cs @@ -1,10 +1,11 @@ using System.Threading; +using System.Threading.Tasks; namespace Shuttle.Core.Threading { public interface IThreadActivity { - void Waiting(CancellationToken cancellationToken); + Task Waiting(CancellationToken cancellationToken); void Working(); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index ed87a0f..780ee8d 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using Shuttle.Core.Contract; using Shuttle.Core.Reflection; @@ -108,7 +109,7 @@ private void Work() { ProcessorExecuting.Invoke(this, _eventArgs); - _processor.Execute(CancellationToken); + _processor.Execute(CancellationToken).GetAwaiter().GetResult(); } ProcessorThreadStopped.Invoke(this, _eventArgs); diff --git a/Shuttle.Core.Threading/ThreadActivity.cs b/Shuttle.Core.Threading/ThreadActivity.cs index 7daa9d7..316504b 100644 --- a/Shuttle.Core.Threading/ThreadActivity.cs +++ b/Shuttle.Core.Threading/ThreadActivity.cs @@ -30,11 +30,11 @@ public ThreadActivity(IThreadActivityConfiguration threadActivityConfiguration) _durationIndex = 0; } - public void Waiting(CancellationToken cancellationToken) + public async Task Waiting(CancellationToken cancellationToken) { try { - Task.Delay(GetSleepTimeSpan(), cancellationToken).Wait(cancellationToken); + await Task.Delay(GetSleepTimeSpan(), cancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { From 882a5606dd39e4031c5e29e96db82cca118bc12f Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 26 Feb 2023 11:38:24 +0200 Subject: [PATCH 02/23] - file rename --- .../CallContextFixture.cs | 28 +++++++++---------- .../.package/package.nuspec | 2 +- .../{CallContext.cs => AmbientContext.cs} | 9 +----- .../Properties/AssemblyInfo.cs | 4 +-- 4 files changed, 18 insertions(+), 25 deletions(-) rename Shuttle.Core.Threading/{CallContext.cs => AmbientContext.cs} (76%) diff --git a/Shuttle.Core.Threading.Tests/CallContextFixture.cs b/Shuttle.Core.Threading.Tests/CallContextFixture.cs index f8f8180..f23840e 100644 --- a/Shuttle.Core.Threading.Tests/CallContextFixture.cs +++ b/Shuttle.Core.Threading.Tests/CallContextFixture.cs @@ -26,24 +26,24 @@ public void Should_be_able_to_flow_data() Task.WaitAll( Task.Run(() => { - CallContext.SetData("d1", d1); - new Thread(() => t10 = CallContext.GetData("d1")).Start(); + AmbientContext.SetData("d1", d1); + new Thread(() => t10 = AmbientContext.GetData("d1")).Start(); Task.WaitAll( - Task.Run(() => t1 = CallContext.GetData("d1")) - .ContinueWith(t => Task.Run(() => t11 = CallContext.GetData("d1"))), - Task.Run(() => t12 = CallContext.GetData("d1")), - Task.Run(() => t13 = CallContext.GetData("d1")) + Task.Run(() => t1 = AmbientContext.GetData("d1")) + .ContinueWith(t => Task.Run(() => t11 = AmbientContext.GetData("d1"))), + Task.Run(() => t12 = AmbientContext.GetData("d1")), + Task.Run(() => t13 = AmbientContext.GetData("d1")) ); }), Task.Run(() => { - CallContext.SetData("d2", d2); - new Thread(() => t20 = CallContext.GetData("d2")).Start(); + AmbientContext.SetData("d2", d2); + new Thread(() => t20 = AmbientContext.GetData("d2")).Start(); Task.WaitAll( - Task.Run(() => t2 = CallContext.GetData("d2")) - .ContinueWith(t => Task.Run(() => t21 = CallContext.GetData("d2"))), - Task.Run(() => t22 = CallContext.GetData("d2")), - Task.Run(() => t23 = CallContext.GetData("d2")) + Task.Run(() => t2 = AmbientContext.GetData("d2")) + .ContinueWith(t => Task.Run(() => t21 = AmbientContext.GetData("d2"))), + Task.Run(() => t22 = AmbientContext.GetData("d2")), + Task.Run(() => t23 = AmbientContext.GetData("d2")) ); }) ); @@ -60,8 +60,8 @@ public void Should_be_able_to_flow_data() Assert.That(d2, Is.SameAs(t22)); Assert.That(d2, Is.SameAs(t23)); - Assert.Null(CallContext.GetData("d1")); - Assert.Null(CallContext.GetData("d2")); + Assert.Null(AmbientContext.GetData("d1")); + Assert.Null(AmbientContext.GetData("d2")); } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/.package/package.nuspec b/Shuttle.Core.Threading/.package/package.nuspec index 58bb58a..a05190d 100644 --- a/Shuttle.Core.Threading/.package/package.nuspec +++ b/Shuttle.Core.Threading/.package/package.nuspec @@ -3,7 +3,7 @@ Shuttle.Core.Threading - 12.0.2 + 13.0.0 Eben Roux Eben Roux BSD-3-Clause diff --git a/Shuttle.Core.Threading/CallContext.cs b/Shuttle.Core.Threading/AmbientContext.cs similarity index 76% rename from Shuttle.Core.Threading/CallContext.cs rename to Shuttle.Core.Threading/AmbientContext.cs index dcbaa11..a526e6b 100644 --- a/Shuttle.Core.Threading/CallContext.cs +++ b/Shuttle.Core.Threading/AmbientContext.cs @@ -5,7 +5,7 @@ namespace Shuttle.Core.Threading { // shout-out to Daniel Cazzulino: http://www.cazzulino.com/callcontext-netstandard-netcore.html - public static class CallContext + public static class AmbientContext { private static readonly ConcurrentDictionary> State = new ConcurrentDictionary>(); @@ -22,12 +22,5 @@ public static object GetData(string name) return State.TryGetValue(name, out var data) ? data.Value : null; } - - public static object RemoveData(string name) - { - Guard.AgainstNullOrEmptyString(name, nameof(name)); - - return State.TryRemove(name, out var data) ? data.Value : null; - } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/Properties/AssemblyInfo.cs b/Shuttle.Core.Threading/Properties/AssemblyInfo.cs index 3fbeb44..a053c92 100644 --- a/Shuttle.Core.Threading/Properties/AssemblyInfo.cs +++ b/Shuttle.Core.Threading/Properties/AssemblyInfo.cs @@ -13,10 +13,10 @@ [assembly: AssemblyTitle(".NET Standard")] #endif -[assembly: AssemblyVersion("12.0.2.0")] +[assembly: AssemblyVersion("13.0.0.0")] [assembly: AssemblyCopyright("Copyright (c) 2023, Eben Roux")] [assembly: AssemblyProduct("Shuttle.Core.Threading")] [assembly: AssemblyCompany("Eben Roux")] [assembly: AssemblyConfiguration("Release")] -[assembly: AssemblyInformationalVersion("12.0.2")] +[assembly: AssemblyInformationalVersion("13.0.0")] [assembly: ComVisible(false)] \ No newline at end of file From ecab995c6446eaec06e0e466c492b832f58a9ab8 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 5 Mar 2023 09:04:04 +0200 Subject: [PATCH 03/23] - ambient context --- .../{CallContextFixture.cs => AmbientContextFixture.cs} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename Shuttle.Core.Threading.Tests/{CallContextFixture.cs => AmbientContextFixture.cs} (98%) diff --git a/Shuttle.Core.Threading.Tests/CallContextFixture.cs b/Shuttle.Core.Threading.Tests/AmbientContextFixture.cs similarity index 98% rename from Shuttle.Core.Threading.Tests/CallContextFixture.cs rename to Shuttle.Core.Threading.Tests/AmbientContextFixture.cs index f23840e..f4e7ac9 100644 --- a/Shuttle.Core.Threading.Tests/CallContextFixture.cs +++ b/Shuttle.Core.Threading.Tests/AmbientContextFixture.cs @@ -5,7 +5,7 @@ namespace Shuttle.Core.Threading.Tests { [TestFixture] - public class CallContextFixture + public class AmbientContextFixture { [Test] public void Should_be_able_to_flow_data() From d783fee62ed73b36552b30e97046b39625d903b5 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 23 Jul 2023 12:49:36 +0200 Subject: [PATCH 04/23] - processor exception --- .../ProcessorExceptionEventArgs.cs | 21 +++++++++++++++++++ Shuttle.Core.Threading/ProcessorThread.cs | 16 +++++++++++--- 2 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs diff --git a/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs b/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs new file mode 100644 index 0000000..fd0acdd --- /dev/null +++ b/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs @@ -0,0 +1,21 @@ +using System; +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class ProcessorExceptionEventArgs : EventArgs + { + public string Name { get; } + public int ManagedThreadId { get; } + public string ProcessorTypeFullName { get; } + public Exception Exception { get; } + + public ProcessorExceptionEventArgs(string name, int managedThreadId, string processorTypeFullName, Exception exception) + { + Name = Guard.AgainstNullOrEmptyString(name, nameof(name)); + ProcessorTypeFullName = Guard.AgainstNullOrEmptyString(processorTypeFullName, nameof(processorTypeFullName)); + Exception = Guard.AgainstNull(exception, nameof(exception)); + ManagedThreadId = managedThreadId; + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 780ee8d..162c047 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -1,6 +1,5 @@ using System; using System.Threading; -using System.Threading.Tasks; using Shuttle.Core.Contract; using Shuttle.Core.Reflection; @@ -52,6 +51,10 @@ public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions { }; + public event EventHandler ProcessorException = delegate + { + }; + public void Start() { if (_started) @@ -103,13 +106,20 @@ public void Stop(TimeSpan timeout) } } - private void Work() + private async void Work() { while (!CancellationToken.IsCancellationRequested) { ProcessorExecuting.Invoke(this, _eventArgs); - _processor.Execute(CancellationToken).GetAwaiter().GetResult(); + try + { + await _processor.Execute(CancellationToken); + } + catch (Exception ex) + { + ProcessorException.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); + } } ProcessorThreadStopped.Invoke(this, _eventArgs); From e6caad5a583cba6a8f1ce9a86bcec80d9135525a Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 19 Aug 2023 09:44:20 +0200 Subject: [PATCH 05/23] - aborting threads --- .../ThreadActivityFixture.cs | 7 +- .../IProcessorThreadPoolFactory.cs | 11 ++++ Shuttle.Core.Threading/ProcessorThread.cs | 64 ++++++++++--------- Shuttle.Core.Threading/ProcessorThreadPool.cs | 10 ++- .../ProcessorThreadPoolCreatedEventArgs.cs | 19 ++++++ .../ProcessorThreadPoolFactory.cs | 22 +++++++ .../ProcessorThreadStoppedEventArgs.cs | 13 ++++ .../Shuttle.Core.Threading.csproj | 1 + 8 files changed, 114 insertions(+), 33 deletions(-) create mode 100644 Shuttle.Core.Threading/IProcessorThreadPoolFactory.cs create mode 100644 Shuttle.Core.Threading/ProcessorThreadPoolCreatedEventArgs.cs create mode 100644 Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs create mode 100644 Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs diff --git a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs index e991f50..ee1c02d 100644 --- a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs +++ b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using Moq; using NUnit.Framework; @@ -9,7 +10,7 @@ namespace Shuttle.Core.Threading.Tests public class ThreadActivityFixture { [Test] - public void Should_be_able_to_have_the_thread_wait() + public async Task Should_be_able_to_have_the_thread_wait() { var activity = new ThreadActivity(new[] { @@ -20,11 +21,11 @@ public void Should_be_able_to_have_the_thread_wait() var start = DateTime.Now; var token = new CancellationToken(false); - activity.Waiting(token); + await activity.Waiting(token); Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250); - activity.Waiting(token); + await activity.Waiting(token); Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750); } diff --git a/Shuttle.Core.Threading/IProcessorThreadPoolFactory.cs b/Shuttle.Core.Threading/IProcessorThreadPoolFactory.cs new file mode 100644 index 0000000..c3d097e --- /dev/null +++ b/Shuttle.Core.Threading/IProcessorThreadPoolFactory.cs @@ -0,0 +1,11 @@ +using System; + +namespace Shuttle.Core.Threading +{ + public interface IProcessorThreadPoolFactory + { + event EventHandler ProcessorThreadPoolCreated; + + IProcessorThreadPool Create(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions); + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 162c047..48e216a 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -8,9 +8,6 @@ namespace Shuttle.Core.Threading public class ProcessorThread { private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); - - private readonly string _name; - private readonly IProcessor _processor; private readonly ProcessorThreadOptions _processorThreadOptions; private ProcessorThreadEventArgs _eventArgs; @@ -19,39 +16,43 @@ public class ProcessorThread public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions processorThreadOptions) { - Guard.AgainstNull(processor, nameof(processor)); - Guard.AgainstNull(processorThreadOptions, nameof(processorThreadOptions)); - - _name = name; - _processor = processor; - _processorThreadOptions = processorThreadOptions; - + Name = Guard.AgainstNull(name, nameof(name)); + Processor = Guard.AgainstNull(processor, nameof(processor)); + _processorThreadOptions = Guard.AgainstNull(processorThreadOptions, nameof(processorThreadOptions)); CancellationToken = _cancellationTokenSource.Token; } public CancellationToken CancellationToken { get; } - public event EventHandler ProcessorThreadStarting = delegate + public string Name { get; } + public IProcessor Processor { get; } + + internal void Deactivate() + { + _cancellationTokenSource.Cancel(); + } + + public event EventHandler ProcessorException = delegate { }; - public event EventHandler ProcessorThreadActive = delegate + public event EventHandler ProcessorExecuting = delegate { }; - public event EventHandler ProcessorThreadStopping = delegate + public event EventHandler ProcessorThreadActive = delegate { }; - public event EventHandler ProcessorThreadStopped = delegate + public event EventHandler ProcessorThreadStarting = delegate { }; - public event EventHandler ProcessorExecuting = delegate + public event EventHandler ProcessorThreadStopped = delegate { }; - public event EventHandler ProcessorException = delegate + public event EventHandler ProcessorThreadStopping = delegate { }; @@ -62,7 +63,7 @@ public void Start() return; } - _thread = new Thread(Work) { Name = _name }; + _thread = new Thread(Work) { Name = Name }; _thread.TrySetApartmentState(ApartmentState.MTA); @@ -71,7 +72,7 @@ public void Start() _thread.Start(); - _eventArgs = new ProcessorThreadEventArgs(_name, _thread.ManagedThreadId, _processor.GetType().FullName); + _eventArgs = new ProcessorThreadEventArgs(Name, _thread.ManagedThreadId, Processor.GetType().FullName); ProcessorThreadStarting.Invoke(this, _eventArgs); @@ -98,12 +99,24 @@ public void Stop(TimeSpan timeout) _cancellationTokenSource.Cancel(); - _processor.TryDispose(); + Processor.TryDispose(); + + var aborted = false; - if (_thread.IsAlive) + if (_thread.IsAlive && !_thread.Join(timeout)) { - _thread.Join(timeout); + try + { + _thread.Abort(); + } + catch (ThreadAbortException) + { + aborted = true; + } } + + ProcessorThreadStopped.Invoke(this, + new ProcessorThreadStoppedEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, aborted)); } private async void Work() @@ -114,20 +127,13 @@ private async void Work() try { - await _processor.Execute(CancellationToken); + await Processor.Execute(CancellationToken); } catch (Exception ex) { ProcessorException.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); } } - - ProcessorThreadStopped.Invoke(this, _eventArgs); - } - - internal void Deactivate() - { - _cancellationTokenSource.Cancel(); } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadPool.cs b/Shuttle.Core.Threading/ProcessorThreadPool.cs index b746849..a80da08 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPool.cs @@ -43,7 +43,15 @@ public void Pause() { foreach (var thread in _threads) { - thread.Stop(_joinTimeout); + StopThread(thread); + } + } + + private void StopThread(ProcessorThread thread) + { + if (!thread.Stop(_joinTimeout)) + { + } } diff --git a/Shuttle.Core.Threading/ProcessorThreadPoolCreatedEventArgs.cs b/Shuttle.Core.Threading/ProcessorThreadPoolCreatedEventArgs.cs new file mode 100644 index 0000000..03cb6ee --- /dev/null +++ b/Shuttle.Core.Threading/ProcessorThreadPoolCreatedEventArgs.cs @@ -0,0 +1,19 @@ +using System; +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class ProcessorThreadPoolCreatedEventArgs : EventArgs + { + public string Name { get; } + public int ThreadCount { get; } + public IProcessorFactory ProcessorFactory { get; } + + public ProcessorThreadPoolCreatedEventArgs(string name, int threadCount, IProcessorFactory processorFactory) + { + Name = Guard.AgainstNullOrEmptyString(name, nameof(name)); + ThreadCount = threadCount; + ProcessorFactory = Guard.AgainstNull(processorFactory, nameof(processorFactory)); + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs b/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs new file mode 100644 index 0000000..4af7f14 --- /dev/null +++ b/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs @@ -0,0 +1,22 @@ +using System; +using Microsoft.Extensions.Options; +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class ProcessorThreadPoolFactory : IProcessorThreadPoolFactory + { + public event EventHandler ProcessorThreadPoolCreated = delegate + { + }; + + public IProcessorThreadPool Create(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions) + { + var result = new ProcessorThreadPool(name, threadCount, processorFactory, processorThreadOptions); + + ProcessorThreadPoolCreated.Invoke(this, new ProcessorThreadPoolCreatedEventArgs(name, threadCount, processorFactory)); + + return result; + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs b/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs new file mode 100644 index 0000000..4c6fd5c --- /dev/null +++ b/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs @@ -0,0 +1,13 @@ +namespace Shuttle.Core.Threading +{ + public class ProcessorThreadStoppedEventArgs : ProcessorThreadEventArgs + { + public bool Aborted { get; } + + public ProcessorThreadStoppedEventArgs(string name, int managedThreadId, string processorTypeFullName, bool aborted) + : base(name, managedThreadId, processorTypeFullName) + { + Aborted = aborted; + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj index 2bd619b..bddf494 100644 --- a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj +++ b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj @@ -14,6 +14,7 @@ + From 6c4a0a397fee7a83bdaa626a8d196f2faace5c8c Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 19 Aug 2023 09:44:47 +0200 Subject: [PATCH 06/23] - reverted initial thread stop mechanism --- Shuttle.Core.Threading/ProcessorThreadPool.cs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/Shuttle.Core.Threading/ProcessorThreadPool.cs b/Shuttle.Core.Threading/ProcessorThreadPool.cs index a80da08..b746849 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPool.cs @@ -43,15 +43,7 @@ public void Pause() { foreach (var thread in _threads) { - StopThread(thread); - } - } - - private void StopThread(ProcessorThread thread) - { - if (!thread.Stop(_joinTimeout)) - { - + thread.Stop(_joinTimeout); } } From ba93a0bd4c95b5d69caafc463798eb0b9286add4 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 3 Sep 2023 09:31:36 +0200 Subject: [PATCH 07/23] - removed empty delegates from events --- .../.package/package.nuspec | 1 + Shuttle.Core.Threading/ProcessorThread.cs | 36 +++++++------------ .../ProcessorThreadPoolFactory.cs | 8 ++--- .../Shuttle.Core.Threading.csproj | 2 +- 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/Shuttle.Core.Threading/.package/package.nuspec b/Shuttle.Core.Threading/.package/package.nuspec index a05190d..a253a97 100644 --- a/Shuttle.Core.Threading/.package/package.nuspec +++ b/Shuttle.Core.Threading/.package/package.nuspec @@ -16,6 +16,7 @@ Copyright (c) 2023, Eben Roux shuttle threading processor + diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 48e216a..86c7760 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -32,29 +32,17 @@ internal void Deactivate() _cancellationTokenSource.Cancel(); } - public event EventHandler ProcessorException = delegate - { - }; + public event EventHandler ProcessorException; - public event EventHandler ProcessorExecuting = delegate - { - }; + public event EventHandler ProcessorExecuting; - public event EventHandler ProcessorThreadActive = delegate - { - }; + public event EventHandler ProcessorThreadActive; - public event EventHandler ProcessorThreadStarting = delegate - { - }; + public event EventHandler ProcessorThreadStarting; - public event EventHandler ProcessorThreadStopped = delegate - { - }; + public event EventHandler ProcessorThreadStopped; - public event EventHandler ProcessorThreadStopping = delegate - { - }; + public event EventHandler ProcessorThreadStopping; public void Start() { @@ -74,7 +62,7 @@ public void Start() _eventArgs = new ProcessorThreadEventArgs(Name, _thread.ManagedThreadId, Processor.GetType().FullName); - ProcessorThreadStarting.Invoke(this, _eventArgs); + ProcessorThreadStarting?.Invoke(this, _eventArgs); while (!_thread.IsAlive && !CancellationToken.IsCancellationRequested) { @@ -82,7 +70,7 @@ public void Start() if (!CancellationToken.IsCancellationRequested) { - ProcessorThreadActive.Invoke(this, _eventArgs); + ProcessorThreadActive?.Invoke(this, _eventArgs); } _started = true; @@ -95,7 +83,7 @@ public void Stop(TimeSpan timeout) throw new InvalidOperationException(Resources.ProcessorThreadNotStartedException); } - ProcessorThreadStopping.Invoke(this, _eventArgs); + ProcessorThreadStopping?.Invoke(this, _eventArgs); _cancellationTokenSource.Cancel(); @@ -115,7 +103,7 @@ public void Stop(TimeSpan timeout) } } - ProcessorThreadStopped.Invoke(this, + ProcessorThreadStopped?.Invoke(this, new ProcessorThreadStoppedEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, aborted)); } @@ -123,7 +111,7 @@ private async void Work() { while (!CancellationToken.IsCancellationRequested) { - ProcessorExecuting.Invoke(this, _eventArgs); + ProcessorExecuting?.Invoke(this, _eventArgs); try { @@ -131,7 +119,7 @@ private async void Work() } catch (Exception ex) { - ProcessorException.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); + ProcessorException?.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); } } } diff --git a/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs b/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs index 4af7f14..488b44f 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPoolFactory.cs @@ -1,20 +1,16 @@ using System; -using Microsoft.Extensions.Options; -using Shuttle.Core.Contract; namespace Shuttle.Core.Threading { public class ProcessorThreadPoolFactory : IProcessorThreadPoolFactory { - public event EventHandler ProcessorThreadPoolCreated = delegate - { - }; + public event EventHandler ProcessorThreadPoolCreated; public IProcessorThreadPool Create(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions) { var result = new ProcessorThreadPool(name, threadCount, processorFactory, processorThreadOptions); - ProcessorThreadPoolCreated.Invoke(this, new ProcessorThreadPoolCreatedEventArgs(name, threadCount, processorFactory)); + ProcessorThreadPoolCreated?.Invoke(this, new ProcessorThreadPoolCreatedEventArgs(name, threadCount, processorFactory)); return result; } diff --git a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj index bddf494..d8d65c9 100644 --- a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj +++ b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj @@ -1,7 +1,7 @@  - netstandard2.0;netstandard2.1 + netstandard2.1 false From d97b75bd6c95b35de41f41cb77aaeb7383d89af5 Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 16 Sep 2023 09:21:41 +0200 Subject: [PATCH 08/23] - fixed async naming conventions --- Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs | 4 ++-- Shuttle.Core.Threading/IProcessor.cs | 2 +- Shuttle.Core.Threading/IThreadActivity.cs | 2 +- Shuttle.Core.Threading/ProcessorThread.cs | 2 +- Shuttle.Core.Threading/ThreadActivity.cs | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs index ee1c02d..7745fba 100644 --- a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs +++ b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs @@ -21,11 +21,11 @@ public async Task Should_be_able_to_have_the_thread_wait() var start = DateTime.Now; var token = new CancellationToken(false); - await activity.Waiting(token); + await activity.WaitingAsync(token); Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250); - await activity.Waiting(token); + await activity.WaitingAsync(token); Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750); } diff --git a/Shuttle.Core.Threading/IProcessor.cs b/Shuttle.Core.Threading/IProcessor.cs index 139072b..8c1506c 100644 --- a/Shuttle.Core.Threading/IProcessor.cs +++ b/Shuttle.Core.Threading/IProcessor.cs @@ -5,6 +5,6 @@ namespace Shuttle.Core.Threading { public interface IProcessor { - Task Execute(CancellationToken cancellationToken); + Task ExecuteAsync(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/IThreadActivity.cs b/Shuttle.Core.Threading/IThreadActivity.cs index 3c74669..c586a01 100644 --- a/Shuttle.Core.Threading/IThreadActivity.cs +++ b/Shuttle.Core.Threading/IThreadActivity.cs @@ -5,7 +5,7 @@ namespace Shuttle.Core.Threading { public interface IThreadActivity { - Task Waiting(CancellationToken cancellationToken); + Task WaitingAsync(CancellationToken cancellationToken); void Working(); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 86c7760..a987128 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -115,7 +115,7 @@ private async void Work() try { - await Processor.Execute(CancellationToken); + await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); } catch (Exception ex) { diff --git a/Shuttle.Core.Threading/ThreadActivity.cs b/Shuttle.Core.Threading/ThreadActivity.cs index 316504b..2036fdf 100644 --- a/Shuttle.Core.Threading/ThreadActivity.cs +++ b/Shuttle.Core.Threading/ThreadActivity.cs @@ -30,7 +30,7 @@ public ThreadActivity(IThreadActivityConfiguration threadActivityConfiguration) _durationIndex = 0; } - public async Task Waiting(CancellationToken cancellationToken) + public async Task WaitingAsync(CancellationToken cancellationToken) { try { From 09d34c89787df0452bd011f5ae4d4b64397b644d Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 16 Sep 2023 11:42:11 +0200 Subject: [PATCH 09/23] - sync added --- .../ThreadActivityFixture.cs | 32 ++++++++++--------- Shuttle.Core.Threading/IProcessor.cs | 1 + .../IProcessorThreadPool.cs | 2 ++ Shuttle.Core.Threading/IThreadActivity.cs | 1 + .../IThreadActivityConfiguration.cs | 9 ------ Shuttle.Core.Threading/ProcessorThread.cs | 22 ++++++++++++- Shuttle.Core.Threading/ProcessorThreadPool.cs | 32 +++++++++++++------ Shuttle.Core.Threading/ThreadActivity.cs | 25 +++++++++------ .../ThreadActivityOptions.cs | 9 ++++++ 9 files changed, 89 insertions(+), 44 deletions(-) delete mode 100644 Shuttle.Core.Threading/IThreadActivityConfiguration.cs create mode 100644 Shuttle.Core.Threading/ThreadActivityOptions.cs diff --git a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs index 7745fba..c0ef735 100644 --- a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs +++ b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs @@ -1,33 +1,35 @@ using System; using System.Threading; using System.Threading.Tasks; -using Moq; +using Microsoft.Extensions.Options; using NUnit.Framework; -namespace Shuttle.Core.Threading.Tests +namespace Shuttle.Core.Threading.Tests; + +[TestFixture] +public class ThreadActivityFixture { - [TestFixture] - public class ThreadActivityFixture + [Test] + public async Task Should_be_able_to_have_the_thread_wait() { - [Test] - public async Task Should_be_able_to_have_the_thread_wait() + var activity = new ThreadActivity(Options.Create(new ThreadActivityOptions { - var activity = new ThreadActivity(new[] + DurationToSleepWhenIdle = new[] { TimeSpan.FromMilliseconds(250), TimeSpan.FromMilliseconds(500) - }); + } + })); - var start = DateTime.Now; - var token = new CancellationToken(false); + var start = DateTime.Now; + var token = new CancellationToken(false); - await activity.WaitingAsync(token); + await activity.WaitingAsync(token); - Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250); + Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250); - await activity.WaitingAsync(token); + await activity.WaitingAsync(token); - Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750); - } + Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/IProcessor.cs b/Shuttle.Core.Threading/IProcessor.cs index 8c1506c..4bf8fbc 100644 --- a/Shuttle.Core.Threading/IProcessor.cs +++ b/Shuttle.Core.Threading/IProcessor.cs @@ -5,6 +5,7 @@ namespace Shuttle.Core.Threading { public interface IProcessor { + void Execute(CancellationToken cancellationToken); Task ExecuteAsync(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/IProcessorThreadPool.cs b/Shuttle.Core.Threading/IProcessorThreadPool.cs index e26af58..40b743d 100644 --- a/Shuttle.Core.Threading/IProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/IProcessorThreadPool.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; namespace Shuttle.Core.Threading { @@ -8,6 +9,7 @@ public interface IProcessorThreadPool : IDisposable void Pause(); void Resume(); IProcessorThreadPool Start(); + IProcessorThreadPool StartAsync(); IEnumerable ProcessorThreads { get; } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/IThreadActivity.cs b/Shuttle.Core.Threading/IThreadActivity.cs index c586a01..c430d4d 100644 --- a/Shuttle.Core.Threading/IThreadActivity.cs +++ b/Shuttle.Core.Threading/IThreadActivity.cs @@ -5,6 +5,7 @@ namespace Shuttle.Core.Threading { public interface IThreadActivity { + void Waiting(CancellationToken cancellationToken); Task WaitingAsync(CancellationToken cancellationToken); void Working(); } diff --git a/Shuttle.Core.Threading/IThreadActivityConfiguration.cs b/Shuttle.Core.Threading/IThreadActivityConfiguration.cs deleted file mode 100644 index 4dc86bb..0000000 --- a/Shuttle.Core.Threading/IThreadActivityConfiguration.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Shuttle.Core.Threading -{ - public interface IThreadActivityConfiguration - { - TimeSpan[] DurationToSleepWhenIdle { get; } - } -} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index a987128..1cd9cae 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -13,6 +13,7 @@ public class ProcessorThread private bool _started; private Thread _thread; + private bool _sync; public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions processorThreadOptions) { @@ -45,12 +46,24 @@ internal void Deactivate() public event EventHandler ProcessorThreadStopping; public void Start() + { + Start(true); + } + + public void StartAsync() + { + Start(false); + } + + public void Start(bool sync) { if (_started) { return; } + _sync = sync; + _thread = new Thread(Work) { Name = Name }; _thread.TrySetApartmentState(ApartmentState.MTA); @@ -115,7 +128,14 @@ private async void Work() try { - await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); + if (_sync) + { + Processor.Execute(CancellationToken); + } + else + { + await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); + } } catch (Exception ex) { diff --git a/Shuttle.Core.Threading/ProcessorThreadPool.cs b/Shuttle.Core.Threading/ProcessorThreadPool.cs index b746849..0b64b4c 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPool.cs @@ -51,20 +51,20 @@ public void Resume() { foreach (var thread in _threads) { - thread.Start(); + thread.StartAsync(); } } public IProcessorThreadPool Start() { - if (_started) - { - return this; - } + Start(true); - StartThreads(); + return this; + } - _started = true; + public IProcessorThreadPool StartAsync() + { + Start(false); return this; } @@ -78,8 +78,13 @@ public void Dispose() GC.SuppressFinalize(this); } - private void StartThreads() + private void Start(bool sync) { + if (_started) + { + return; + } + var i = 0; while (i++ < _threadCount) @@ -88,8 +93,17 @@ private void StartThreads() _threads.Add(thread); - thread.Start(); + if (sync) + { + thread.Start(); + } + else + { + thread.StartAsync(); + } } + + _started = true; } protected virtual void Dispose(bool disposing) diff --git a/Shuttle.Core.Threading/ThreadActivity.cs b/Shuttle.Core.Threading/ThreadActivity.cs index 2036fdf..02d5b4e 100644 --- a/Shuttle.Core.Threading/ThreadActivity.cs +++ b/Shuttle.Core.Threading/ThreadActivity.cs @@ -1,8 +1,7 @@ using System; -using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Options; using Shuttle.Core.Contract; namespace Shuttle.Core.Threading @@ -14,20 +13,26 @@ public class ThreadActivity : IThreadActivity private int _durationIndex; - public ThreadActivity(IEnumerable durationToSleepWhenIdle) + public ThreadActivity(IOptions threadActivityOptions) { - Guard.AgainstNull(durationToSleepWhenIdle, nameof(durationToSleepWhenIdle)); + Guard.AgainstNull(threadActivityOptions, nameof(threadActivityOptions)); + Guard.AgainstNull(threadActivityOptions.Value, nameof(threadActivityOptions.Value)); - _durations = durationToSleepWhenIdle.ToArray(); + _durations = threadActivityOptions.Value.DurationToSleepWhenIdle == null || threadActivityOptions.Value.DurationToSleepWhenIdle.Length == 0 + ? new[] { DefaultDuration } + : threadActivityOptions.Value.DurationToSleepWhenIdle; _durationIndex = 0; } - public ThreadActivity(IThreadActivityConfiguration threadActivityConfiguration) + public void Waiting(CancellationToken cancellationToken) { - Guard.AgainstNull(threadActivityConfiguration, nameof(threadActivityConfiguration)); - - _durations = threadActivityConfiguration.DurationToSleepWhenIdle; - _durationIndex = 0; + try + { + Task.Delay(GetSleepTimeSpan(), cancellationToken).Wait(cancellationToken); + } + catch (OperationCanceledException) + { + } } public async Task WaitingAsync(CancellationToken cancellationToken) diff --git a/Shuttle.Core.Threading/ThreadActivityOptions.cs b/Shuttle.Core.Threading/ThreadActivityOptions.cs new file mode 100644 index 0000000..d0fcf9b --- /dev/null +++ b/Shuttle.Core.Threading/ThreadActivityOptions.cs @@ -0,0 +1,9 @@ +using System; + +namespace Shuttle.Core.Threading +{ + public class ThreadActivityOptions + { + public TimeSpan[] DurationToSleepWhenIdle { get; set; } + } +} \ No newline at end of file From f3702915290d9220350cbc59623ca7d54226ca8e Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 16 Sep 2023 18:45:21 +0200 Subject: [PATCH 10/23] - StartAsync --- Shuttle.Core.Threading/ProcessorThread.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 1cd9cae..a0aae5d 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -1,5 +1,6 @@ using System; using System.Threading; +using System.Threading.Tasks; using Shuttle.Core.Contract; using Shuttle.Core.Reflection; @@ -50,9 +51,11 @@ public void Start() Start(true); } - public void StartAsync() + public async Task StartAsync() { Start(false); + + await Task.CompletedTask; } public void Start(bool sync) From 3d659f3f3eee31bf994e3b87d9b170a61906fee3 Mon Sep 17 00:00:00 2001 From: Eben Date: Mon, 25 Sep 2023 12:53:55 +0200 Subject: [PATCH 11/23] - wait durations --- .../ThreadActivityFixture.cs | 32 +++++++++++++++---- Shuttle.Core.Threading/ThreadActivity.cs | 18 +++-------- 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs index c0ef735..8f4c2a8 100644 --- a/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs +++ b/Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs @@ -10,16 +10,36 @@ namespace Shuttle.Core.Threading.Tests; public class ThreadActivityFixture { [Test] - public async Task Should_be_able_to_have_the_thread_wait() + public void Should_be_able_to_have_the_thread_wait() { - var activity = new ThreadActivity(Options.Create(new ThreadActivityOptions - { - DurationToSleepWhenIdle = new[] + var activity = new ThreadActivity( + new[] { TimeSpan.FromMilliseconds(250), TimeSpan.FromMilliseconds(500) - } - })); + }); + + var start = DateTime.Now; + var token = new CancellationToken(false); + + activity.Waiting(token); + + Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250); + + activity.Waiting(token); + + Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750); + } + + [Test] + public async Task Should_be_able_to_have_the_thread_wait_async() + { + var activity = new ThreadActivity( + new[] + { + TimeSpan.FromMilliseconds(250), + TimeSpan.FromMilliseconds(500) + }); var start = DateTime.Now; var token = new CancellationToken(false); diff --git a/Shuttle.Core.Threading/ThreadActivity.cs b/Shuttle.Core.Threading/ThreadActivity.cs index 02d5b4e..7a5adab 100644 --- a/Shuttle.Core.Threading/ThreadActivity.cs +++ b/Shuttle.Core.Threading/ThreadActivity.cs @@ -1,26 +1,23 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; -using Microsoft.Extensions.Options; using Shuttle.Core.Contract; namespace Shuttle.Core.Threading { public class ThreadActivity : IThreadActivity { - private static readonly TimeSpan DefaultDuration = TimeSpan.FromMilliseconds(250); private readonly TimeSpan[] _durations; private int _durationIndex; - public ThreadActivity(IOptions threadActivityOptions) + public ThreadActivity(IEnumerable waitDurations) { - Guard.AgainstNull(threadActivityOptions, nameof(threadActivityOptions)); - Guard.AgainstNull(threadActivityOptions.Value, nameof(threadActivityOptions.Value)); + Guard.AgainstEmptyEnumerable(waitDurations, nameof(waitDurations)); - _durations = threadActivityOptions.Value.DurationToSleepWhenIdle == null || threadActivityOptions.Value.DurationToSleepWhenIdle.Length == 0 - ? new[] { DefaultDuration } - : threadActivityOptions.Value.DurationToSleepWhenIdle; + _durations = waitDurations.ToArray(); _durationIndex = 0; } @@ -53,11 +50,6 @@ public void Working() private TimeSpan GetSleepTimeSpan() { - if (_durations == null || _durations.Length == 0) - { - return DefaultDuration; - } - if (_durationIndex >= _durations.Length) { _durationIndex = _durations.Length - 1; From 8d5df451ec132a246be08acec474a92ffbdfe02a Mon Sep 17 00:00:00 2001 From: Eben Date: Mon, 25 Sep 2023 13:03:43 +0200 Subject: [PATCH 12/23] - replaced pause/resume with stop --- .../IProcessorThreadPool.cs | 5 ++--- Shuttle.Core.Threading/ProcessorThreadPool.cs | 21 +++++++------------ 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/Shuttle.Core.Threading/IProcessorThreadPool.cs b/Shuttle.Core.Threading/IProcessorThreadPool.cs index 40b743d..78e1389 100644 --- a/Shuttle.Core.Threading/IProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/IProcessorThreadPool.cs @@ -6,10 +6,9 @@ namespace Shuttle.Core.Threading { public interface IProcessorThreadPool : IDisposable { - void Pause(); - void Resume(); + void Stop(); IProcessorThreadPool Start(); - IProcessorThreadPool StartAsync(); + Task StartAsync(); IEnumerable ProcessorThreads { get; } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadPool.cs b/Shuttle.Core.Threading/ProcessorThreadPool.cs index 0b64b4c..5ed6c54 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPool.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using Shuttle.Core.Contract; using Shuttle.Core.Reflection; @@ -39,7 +40,7 @@ public ProcessorThreadPool(string name, int threadCount, IProcessorFactory proce } } - public void Pause() + public void Stop() { foreach (var thread in _threads) { @@ -47,24 +48,16 @@ public void Pause() } } - public void Resume() - { - foreach (var thread in _threads) - { - thread.StartAsync(); - } - } - public IProcessorThreadPool Start() { - Start(true); + Start(true).GetAwaiter().GetResult(); return this; } - public IProcessorThreadPool StartAsync() + public async Task StartAsync() { - Start(false); + await Start(false); return this; } @@ -78,7 +71,7 @@ public void Dispose() GC.SuppressFinalize(this); } - private void Start(bool sync) + private async Task Start(bool sync) { if (_started) { @@ -99,7 +92,7 @@ private void Start(bool sync) } else { - thread.StartAsync(); + await thread.StartAsync(); } } From 10a093cf8786821d1c1de4a38934786a53750006 Mon Sep 17 00:00:00 2001 From: Eben Date: Wed, 29 Nov 2023 17:40:33 +0200 Subject: [PATCH 13/23] - start ordering --- Shuttle.Core.Threading/ProcessorThread.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index a0aae5d..829da65 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -74,12 +74,12 @@ public void Start(bool sync) _thread.IsBackground = _processorThreadOptions.IsBackground; _thread.Priority = _processorThreadOptions.Priority; - _thread.Start(); - _eventArgs = new ProcessorThreadEventArgs(Name, _thread.ManagedThreadId, Processor.GetType().FullName); ProcessorThreadStarting?.Invoke(this, _eventArgs); + _thread.Start(); + while (!_thread.IsAlive && !CancellationToken.IsCancellationRequested) { } From 38281067b472d001820cdd017ea453f77a6c5738 Mon Sep 17 00:00:00 2001 From: Eben Date: Mon, 18 Dec 2023 15:11:14 +0200 Subject: [PATCH 14/23] - ProcessorThreadFixture --- Shuttle.Core.Threading.Tests/MockProcessor.cs | 29 ++++++ .../ProcessorThreadFixture.cs | 93 +++++++++++++++++++ Shuttle.Core.Threading/ProcessorThread.cs | 10 +- 3 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 Shuttle.Core.Threading.Tests/MockProcessor.cs create mode 100644 Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs diff --git a/Shuttle.Core.Threading.Tests/MockProcessor.cs b/Shuttle.Core.Threading.Tests/MockProcessor.cs new file mode 100644 index 0000000..f631df3 --- /dev/null +++ b/Shuttle.Core.Threading.Tests/MockProcessor.cs @@ -0,0 +1,29 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Shuttle.Core.Threading.Tests; + +public class MockProcessor : IProcessor +{ + private readonly TimeSpan _executionDuration; + + public int ExecutionCount { get; private set; } + + public MockProcessor(TimeSpan executionDuration) + { + _executionDuration = executionDuration; + } + + public void Execute(CancellationToken cancellationToken) + { + ExecuteAsync(cancellationToken).GetAwaiter().GetResult(); + } + + public async Task ExecuteAsync(CancellationToken cancellationToken) + { + await Task.Delay(_executionDuration, cancellationToken).ConfigureAwait(false); + + ExecutionCount++; + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs new file mode 100644 index 0000000..b5796fd --- /dev/null +++ b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs @@ -0,0 +1,93 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using NUnit.Framework; + +namespace Shuttle.Core.Threading.Tests; + +public class ProcessorThreadFixture +{ + [Test] + public void Should_be_able_to_execute_processor() + { + Should_be_able_to_execute_processor_async(true).GetAwaiter().GetResult(); + } + + [Test] + public async Task Should_be_able_to_execute_processor_async() + { + await Should_be_able_to_execute_processor_async(false); + } + + private async Task Should_be_able_to_execute_processor_async(bool sync) + { + const int executionCount = 5; + + var executionDuration = TimeSpan.FromMilliseconds(200); + var mockProcessor = new MockProcessor(executionDuration); + var processorThread = new ProcessorThread("test", mockProcessor, new ProcessorThreadOptions()); + var cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; + + processorThread.ProcessorException += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'"); + }; + + processorThread.ProcessorExecuting += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + }; + + processorThread.ProcessorThreadActive += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + }; + + processorThread.ProcessorThreadStarting += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + }; + + processorThread.ProcessorThreadStopped += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'"); + }; + + processorThread.ProcessorThreadStopping += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + }; + + processorThread.ProcessorThreadOperationCanceled += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + }; + + + if (sync) + { + processorThread.Start(); + } + else + { + await processorThread.StartAsync(); + } + + var timeout = DateTime.Now.AddSeconds(5); + var timedOut = false; + + while (mockProcessor.ExecutionCount < executionCount && !timedOut) + { + await Task.Delay(25, cancellationToken).ConfigureAwait(false); + + timedOut = DateTime.Now >= timeout; + } + + cancellationTokenSource.Cancel(); + + processorThread.Stop(executionDuration * 2); + + Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {executionCount} executions before {timeout:O}"); + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 829da65..e810f9c 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -35,16 +35,12 @@ internal void Deactivate() } public event EventHandler ProcessorException; - public event EventHandler ProcessorExecuting; - public event EventHandler ProcessorThreadActive; - public event EventHandler ProcessorThreadStarting; - public event EventHandler ProcessorThreadStopped; - public event EventHandler ProcessorThreadStopping; + public event EventHandler ProcessorThreadOperationCanceled; public void Start() { @@ -140,6 +136,10 @@ private async void Work() await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); } } + catch (OperationCanceledException) + { + ProcessorThreadOperationCanceled?.Invoke(this, _eventArgs); + } catch (Exception ex) { ProcessorException?.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); From 0b18927b46052473eb2cbcc531efcbfa6462e929 Mon Sep 17 00:00:00 2001 From: Eben Date: Thu, 28 Dec 2023 11:33:58 +0200 Subject: [PATCH 15/23] - fixtures --- Shuttle.Core.Threading.Tests/MockProcessor.cs | 1 - .../ProcessorThreadFixture.cs | 36 +++---- .../ProcessorThreadPoolFixture.cs | 100 ++++++++++++++++++ .../IProcessorThreadPool.cs | 2 + .../ProcessorExceptionEventArgs.cs | 8 +- Shuttle.Core.Threading/ProcessorThread.cs | 19 ++-- .../ProcessorThreadCreatedEventArgs.cs | 14 +++ .../ProcessorThreadEventArgs.cs | 9 +- Shuttle.Core.Threading/ProcessorThreadPool.cs | 34 +++--- .../ProcessorThreadStoppedEventArgs.cs | 4 +- 10 files changed, 168 insertions(+), 59 deletions(-) create mode 100644 Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs create mode 100644 Shuttle.Core.Threading/ProcessorThreadCreatedEventArgs.cs diff --git a/Shuttle.Core.Threading.Tests/MockProcessor.cs b/Shuttle.Core.Threading.Tests/MockProcessor.cs index f631df3..93400f9 100644 --- a/Shuttle.Core.Threading.Tests/MockProcessor.cs +++ b/Shuttle.Core.Threading.Tests/MockProcessor.cs @@ -23,7 +23,6 @@ public void Execute(CancellationToken cancellationToken) public async Task ExecuteAsync(CancellationToken cancellationToken) { await Task.Delay(_executionDuration, cancellationToken).ConfigureAwait(false); - ExecutionCount++; } } \ No newline at end of file diff --git a/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs index b5796fd..fba0998 100644 --- a/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs +++ b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs @@ -8,60 +8,60 @@ namespace Shuttle.Core.Threading.Tests; public class ProcessorThreadFixture { [Test] - public void Should_be_able_to_execute_processor() + public void Should_be_able_to_execute_processor_thread() { - Should_be_able_to_execute_processor_async(true).GetAwaiter().GetResult(); + Should_be_able_to_execute_processor_thread_async(true).GetAwaiter().GetResult(); } [Test] - public async Task Should_be_able_to_execute_processor_async() + public async Task Should_be_able_to_execute_processor_thread_async() { - await Should_be_able_to_execute_processor_async(false); + await Should_be_able_to_execute_processor_thread_async(false); } - private async Task Should_be_able_to_execute_processor_async(bool sync) + private async Task Should_be_able_to_execute_processor_thread_async(bool sync) { - const int executionCount = 5; + const int minimumExecutionCount = 5; var executionDuration = TimeSpan.FromMilliseconds(200); var mockProcessor = new MockProcessor(executionDuration); - var processorThread = new ProcessorThread("test", mockProcessor, new ProcessorThreadOptions()); + var processorThread = new ProcessorThread("thread", mockProcessor, new ProcessorThreadOptions()); var cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = cancellationTokenSource.Token; processorThread.ProcessorException += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'"); }; processorThread.ProcessorExecuting += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; processorThread.ProcessorThreadActive += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; processorThread.ProcessorThreadStarting += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; processorThread.ProcessorThreadStopped += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'"); }; processorThread.ProcessorThreadStopping += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; processorThread.ProcessorThreadOperationCanceled += (sender, args) => { - Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / processor = '{args.ProcessorTypeFullName}' / managed thread id = {args.ManagedThreadId}"); + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; @@ -74,10 +74,10 @@ private async Task Should_be_able_to_execute_processor_async(bool sync) await processorThread.StartAsync(); } - var timeout = DateTime.Now.AddSeconds(5); + var timeout = DateTime.Now.AddSeconds(500); var timedOut = false; - while (mockProcessor.ExecutionCount < executionCount && !timedOut) + while (mockProcessor.ExecutionCount <= minimumExecutionCount && !timedOut) { await Task.Delay(25, cancellationToken).ConfigureAwait(false); @@ -86,8 +86,8 @@ private async Task Should_be_able_to_execute_processor_async(bool sync) cancellationTokenSource.Cancel(); - processorThread.Stop(executionDuration * 2); + processorThread.Stop(); - Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {executionCount} executions before {timeout:O}"); + Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}"); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs b/Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs new file mode 100644 index 0000000..28a060f --- /dev/null +++ b/Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs @@ -0,0 +1,100 @@ +using NUnit.Framework; +using System.Threading.Tasks; +using System.Threading; +using System; +using System.Linq; +using Moq; + +namespace Shuttle.Core.Threading.Tests; + +public class ProcessorThreadPoolFixture +{ + [Test] + public void Should_be_able_to_execute_processor_thread_pool() + { + Should_be_able_to_execute_processor_thread_pool_async(true).GetAwaiter().GetResult(); + } + + [Test] + public async Task Should_be_able_to_execute_processor_thread_pool_async() + { + await Should_be_able_to_execute_processor_thread_pool_async(false); + } + + private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sync) + { + const int minimumExecutionCount = 5; + + var executionDuration = TimeSpan.FromMilliseconds(500); + var cancellationTokenSource = new CancellationTokenSource(); + var cancellationToken = cancellationTokenSource.Token; + var processorFactory = new Mock(); + + processorFactory.Setup(m => m.Create()).Returns(() => new MockProcessor(executionDuration)); + + var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, processorFactory.Object, new ProcessorThreadOptions()); + + processorThreadPool.ProcessorThreadCreated += (sender, args) => + { + args.ProcessorThread.ProcessorException += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'"); + }; + + args.ProcessorThread.ProcessorExecuting += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); + }; + + args.ProcessorThread.ProcessorThreadActive += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); + }; + + args.ProcessorThread.ProcessorThreadStarting += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); + }; + + args.ProcessorThread.ProcessorThreadStopped += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'"); + }; + + args.ProcessorThread.ProcessorThreadStopping += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); + }; + + args.ProcessorThread.ProcessorThreadOperationCanceled += (sender, args) => + { + Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); + }; + }; + + if (sync) + { + processorThreadPool.Start(); + } + else + { + await processorThreadPool.StartAsync(); + } + + var timeout = DateTime.Now.AddSeconds(5); + var timedOut = false; + + while (processorThreadPool.ProcessorThreads.Any(item => ((MockProcessor)item.Processor).ExecutionCount <= minimumExecutionCount && !timedOut)) + { + await Task.Delay(25, cancellationToken).ConfigureAwait(false); + + timedOut = DateTime.Now >= timeout; + } + + cancellationTokenSource.Cancel(); + + processorThreadPool.Stop(); + + Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}"); + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/IProcessorThreadPool.cs b/Shuttle.Core.Threading/IProcessorThreadPool.cs index 78e1389..0e7e7e8 100644 --- a/Shuttle.Core.Threading/IProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/IProcessorThreadPool.cs @@ -6,6 +6,8 @@ namespace Shuttle.Core.Threading { public interface IProcessorThreadPool : IDisposable { + event EventHandler ProcessorThreadCreated; + void Stop(); IProcessorThreadPool Start(); Task StartAsync(); diff --git a/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs b/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs index fd0acdd..9a73ac1 100644 --- a/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs +++ b/Shuttle.Core.Threading/ProcessorExceptionEventArgs.cs @@ -3,19 +3,17 @@ namespace Shuttle.Core.Threading { - public class ProcessorExceptionEventArgs : EventArgs + public class ProcessorThreadExceptionEventArgs : EventArgs { public string Name { get; } public int ManagedThreadId { get; } - public string ProcessorTypeFullName { get; } public Exception Exception { get; } - public ProcessorExceptionEventArgs(string name, int managedThreadId, string processorTypeFullName, Exception exception) + public ProcessorThreadExceptionEventArgs(string name, int managedThreadId, Exception exception) { Name = Guard.AgainstNullOrEmptyString(name, nameof(name)); - ProcessorTypeFullName = Guard.AgainstNullOrEmptyString(processorTypeFullName, nameof(processorTypeFullName)); - Exception = Guard.AgainstNull(exception, nameof(exception)); ManagedThreadId = managedThreadId; + Exception = Guard.AgainstNull(exception, nameof(exception)); } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index e810f9c..2c2ef0c 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -34,7 +34,7 @@ internal void Deactivate() _cancellationTokenSource.Cancel(); } - public event EventHandler ProcessorException; + public event EventHandler ProcessorException; public event EventHandler ProcessorExecuting; public event EventHandler ProcessorThreadActive; public event EventHandler ProcessorThreadStarting; @@ -70,7 +70,7 @@ public void Start(bool sync) _thread.IsBackground = _processorThreadOptions.IsBackground; _thread.Priority = _processorThreadOptions.Priority; - _eventArgs = new ProcessorThreadEventArgs(Name, _thread.ManagedThreadId, Processor.GetType().FullName); + _eventArgs = new ProcessorThreadEventArgs(Name, _thread.ManagedThreadId); ProcessorThreadStarting?.Invoke(this, _eventArgs); @@ -88,7 +88,7 @@ public void Start(bool sync) _started = true; } - public void Stop(TimeSpan timeout) + public void Stop() { if (!_started) { @@ -102,8 +102,14 @@ public void Stop(TimeSpan timeout) Processor.TryDispose(); var aborted = false; + var joinTimeout = _processorThreadOptions.JoinTimeout; - if (_thread.IsAlive && !_thread.Join(timeout)) + if (joinTimeout.TotalSeconds < 1) + { + joinTimeout = TimeSpan.FromSeconds(1); + } + + if (_thread.IsAlive && !_thread.Join(joinTimeout)) { try { @@ -115,8 +121,7 @@ public void Stop(TimeSpan timeout) } } - ProcessorThreadStopped?.Invoke(this, - new ProcessorThreadStoppedEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, aborted)); + ProcessorThreadStopped?.Invoke(this, new ProcessorThreadStoppedEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, aborted)); } private async void Work() @@ -142,7 +147,7 @@ private async void Work() } catch (Exception ex) { - ProcessorException?.Invoke(this, new ProcessorExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, _eventArgs.ProcessorTypeFullName, ex)); + ProcessorException?.Invoke(this, new ProcessorThreadExceptionEventArgs(_eventArgs.Name, _eventArgs.ManagedThreadId, ex)); } } } diff --git a/Shuttle.Core.Threading/ProcessorThreadCreatedEventArgs.cs b/Shuttle.Core.Threading/ProcessorThreadCreatedEventArgs.cs new file mode 100644 index 0000000..0ea5366 --- /dev/null +++ b/Shuttle.Core.Threading/ProcessorThreadCreatedEventArgs.cs @@ -0,0 +1,14 @@ +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class ProcessorThreadCreatedEventArgs : System.EventArgs + { + public ProcessorThread ProcessorThread { get; } + + public ProcessorThreadCreatedEventArgs(ProcessorThread processorThread) + { + ProcessorThread = Guard.AgainstNull(processorThread, nameof(processorThread)); + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadEventArgs.cs b/Shuttle.Core.Threading/ProcessorThreadEventArgs.cs index 8b18c8a..cd03071 100644 --- a/Shuttle.Core.Threading/ProcessorThreadEventArgs.cs +++ b/Shuttle.Core.Threading/ProcessorThreadEventArgs.cs @@ -7,16 +7,11 @@ public class ProcessorThreadEventArgs : EventArgs { public string Name { get; } public int ManagedThreadId { get; } - public string ProcessorTypeFullName { get; } - public ProcessorThreadEventArgs(string name, int managedThreadId, string processorTypeFullName) + public ProcessorThreadEventArgs(string name, int managedThreadId) { - Guard.AgainstNullOrEmptyString(name, nameof(name)); - Guard.AgainstNullOrEmptyString(processorTypeFullName, nameof(processorTypeFullName)); - - Name = name; + Name = Guard.AgainstNullOrEmptyString(name, nameof(name)); ManagedThreadId = managedThreadId; - ProcessorTypeFullName = processorTypeFullName; } } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThreadPool.cs b/Shuttle.Core.Threading/ProcessorThreadPool.cs index 5ed6c54..4c7b6af 100644 --- a/Shuttle.Core.Threading/ProcessorThreadPool.cs +++ b/Shuttle.Core.Threading/ProcessorThreadPool.cs @@ -11,10 +11,9 @@ public class ProcessorThreadPool : IProcessorThreadPool private readonly string _name; private readonly IProcessorFactory _processorFactory; private readonly ProcessorThreadOptions _processorThreadOptions; - private readonly List _threads = new List(); + private readonly List _processorThreads = new List(); private bool _disposed; private bool _started; - private readonly TimeSpan _joinTimeout; private readonly int _threadCount; public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions) @@ -30,21 +29,16 @@ public ProcessorThreadPool(string name, int threadCount, IProcessorFactory proce _name = name ?? Guid.NewGuid().ToString(); _processorFactory = processorFactory; _processorThreadOptions = processorThreadOptions; - - _joinTimeout = _processorThreadOptions.JoinTimeout; _threadCount = threadCount; - - if (_joinTimeout.TotalSeconds < 1) - { - _joinTimeout = TimeSpan.FromSeconds(1); - } } + public event EventHandler ProcessorThreadCreated; + public void Stop() { - foreach (var thread in _threads) + foreach (var thread in _processorThreads) { - thread.Stop(_joinTimeout); + thread.Stop(); } } @@ -62,7 +56,7 @@ public async Task StartAsync() return this; } - public IEnumerable ProcessorThreads => _threads.AsReadOnly(); + public IEnumerable ProcessorThreads => _processorThreads.AsReadOnly(); public void Dispose() { @@ -82,17 +76,19 @@ private async Task Start(bool sync) while (i++ < _threadCount) { - var thread = new ProcessorThread($"{_name} / {i}", _processorFactory.Create(), _processorThreadOptions); + var processorThread = new ProcessorThread($"{_name} / {i}", _processorFactory.Create(), _processorThreadOptions); + + ProcessorThreadCreated?.Invoke(this, new ProcessorThreadCreatedEventArgs(processorThread)); - _threads.Add(thread); + _processorThreads.Add(processorThread); if (sync) { - thread.Start(); + processorThread.Start(); } else { - await thread.StartAsync(); + await processorThread.StartAsync(); } } @@ -108,14 +104,14 @@ protected virtual void Dispose(bool disposing) if (disposing) { - foreach (var thread in _threads) + foreach (var thread in _processorThreads) { thread.Deactivate(); } - foreach (var thread in _threads) + foreach (var thread in _processorThreads) { - thread.Stop(_joinTimeout); + thread.Stop(); } _processorFactory.TryDispose(); diff --git a/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs b/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs index 4c6fd5c..6be9d97 100644 --- a/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs +++ b/Shuttle.Core.Threading/ProcessorThreadStoppedEventArgs.cs @@ -4,8 +4,8 @@ public class ProcessorThreadStoppedEventArgs : ProcessorThreadEventArgs { public bool Aborted { get; } - public ProcessorThreadStoppedEventArgs(string name, int managedThreadId, string processorTypeFullName, bool aborted) - : base(name, managedThreadId, processorTypeFullName) + public ProcessorThreadStoppedEventArgs(string name, int managedThreadId, bool aborted) + : base(name, managedThreadId) { Aborted = aborted; } From cfc65eee85ce400888e29e8c04cb01e3a6db1da2 Mon Sep 17 00:00:00 2001 From: Eben Date: Mon, 1 Jan 2024 11:54:49 +0200 Subject: [PATCH 16/23] - removed extra line --- Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs index fba0998..328724b 100644 --- a/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs +++ b/Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs @@ -64,7 +64,6 @@ private async Task Should_be_able_to_execute_processor_thread_async(bool sync) Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}"); }; - if (sync) { processorThread.Start(); From 487088e5730e548a950230e75fe356b85f6b5b48 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 7 Jan 2024 17:10:01 +0200 Subject: [PATCH 17/23] - synchronization service --- .../.package/package.nuspec | 2 +- .../ISynchronizationService.cs | 11 +++++++ Shuttle.Core.Threading/ProcessorThread.cs | 5 ++- .../Properties/AssemblyInfo.cs | 2 +- Shuttle.Core.Threading/Resources.Designer.cs | 9 +++++ Shuttle.Core.Threading/Resources.resx | 4 +++ .../SynchronizationService.cs | 33 +++++++++++++++++++ 7 files changed, 63 insertions(+), 3 deletions(-) create mode 100644 Shuttle.Core.Threading/ISynchronizationService.cs create mode 100644 Shuttle.Core.Threading/SynchronizationService.cs diff --git a/Shuttle.Core.Threading/.package/package.nuspec b/Shuttle.Core.Threading/.package/package.nuspec index a253a97..5024ffb 100644 --- a/Shuttle.Core.Threading/.package/package.nuspec +++ b/Shuttle.Core.Threading/.package/package.nuspec @@ -13,7 +13,7 @@ https://github.com/shuttle/Shuttle.Core.Threading Thread-based processing. - Copyright (c) 2023, Eben Roux + Copyright (c) 2024, Eben Roux shuttle threading processor diff --git a/Shuttle.Core.Threading/ISynchronizationService.cs b/Shuttle.Core.Threading/ISynchronizationService.cs new file mode 100644 index 0000000..9302ea4 --- /dev/null +++ b/Shuttle.Core.Threading/ISynchronizationService.cs @@ -0,0 +1,11 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace Shuttle.Core.Threading +{ + public interface ISynchronizationService + { + Task Wait(string name, CancellationToken cancellationToken = default); + void Release(string name); + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 2c2ef0c..91e1fbb 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -138,7 +138,10 @@ private async void Work() } else { - await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); + using (ExecutionContext.SuppressFlow()) + { + await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); + } } } catch (OperationCanceledException) diff --git a/Shuttle.Core.Threading/Properties/AssemblyInfo.cs b/Shuttle.Core.Threading/Properties/AssemblyInfo.cs index a053c92..5269859 100644 --- a/Shuttle.Core.Threading/Properties/AssemblyInfo.cs +++ b/Shuttle.Core.Threading/Properties/AssemblyInfo.cs @@ -14,7 +14,7 @@ #endif [assembly: AssemblyVersion("13.0.0.0")] -[assembly: AssemblyCopyright("Copyright (c) 2023, Eben Roux")] +[assembly: AssemblyCopyright("Copyright (c) 2024, Eben Roux")] [assembly: AssemblyProduct("Shuttle.Core.Threading")] [assembly: AssemblyCompany("Eben Roux")] [assembly: AssemblyConfiguration("Release")] diff --git a/Shuttle.Core.Threading/Resources.Designer.cs b/Shuttle.Core.Threading/Resources.Designer.cs index d72ae2e..ddc95d8 100644 --- a/Shuttle.Core.Threading/Resources.Designer.cs +++ b/Shuttle.Core.Threading/Resources.Designer.cs @@ -114,6 +114,15 @@ public static string ProcessorThreadStopping { } } + /// + /// Looks up a localized string similar to No syncronization item with name '{0}' has been registered.. + /// + public static string SynchronizationNameException { + get { + return ResourceManager.GetString("SynchronizationNameException", resourceCulture); + } + } + /// /// Looks up a localized string similar to The queue handler configuration requires thread count of at least 1. The input queue can not be processed.. /// diff --git a/Shuttle.Core.Threading/Resources.resx b/Shuttle.Core.Threading/Resources.resx index eccc460..6f2c3d6 100644 --- a/Shuttle.Core.Threading/Resources.resx +++ b/Shuttle.Core.Threading/Resources.resx @@ -140,6 +140,10 @@ [processor thread stopping] : managed thread id = {0} / processor type = {1} {0} = managed thread id, {1} = full type name of processor + + No syncronization item with name '{0}' has been registered. + {0} = name + The queue handler configuration requires thread count of at least 1. The input queue can not be processed. diff --git a/Shuttle.Core.Threading/SynchronizationService.cs b/Shuttle.Core.Threading/SynchronizationService.cs new file mode 100644 index 0000000..1a85ff8 --- /dev/null +++ b/Shuttle.Core.Threading/SynchronizationService.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class SynchronizationService : ISynchronizationService + { + private readonly Dictionary _semaphores = new Dictionary(); + + public async Task Wait(string name, CancellationToken cancellationToken = default) + { + if (!_semaphores.ContainsKey(Guard.AgainstNullOrEmptyString(name, nameof(name)))) + { + _semaphores.Add(name, new SemaphoreSlim(1, 1)); + } + + await _semaphores[name].WaitAsync(cancellationToken); + } + + public void Release(string name) + { + if (!_semaphores.ContainsKey(Guard.AgainstNullOrEmptyString(name, nameof(name)))) + { + throw new ApplicationException(string.Format(Resources.SynchronizationNameException, name)); + } + + _semaphores[name].Release(); + } + } +} \ No newline at end of file From af90af527bc090552f6e5826291b8fbcc1f48a73 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 7 Jan 2024 17:29:44 +0200 Subject: [PATCH 18/23] - synchronization service --- Shuttle.Core.Threading/SynchronizationService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Shuttle.Core.Threading/SynchronizationService.cs b/Shuttle.Core.Threading/SynchronizationService.cs index 1a85ff8..429ffba 100644 --- a/Shuttle.Core.Threading/SynchronizationService.cs +++ b/Shuttle.Core.Threading/SynchronizationService.cs @@ -17,7 +17,7 @@ public async Task Wait(string name, CancellationToken cancellationToken = defaul _semaphores.Add(name, new SemaphoreSlim(1, 1)); } - await _semaphores[name].WaitAsync(cancellationToken); + await _semaphores[name].WaitAsync(cancellationToken).ConfigureAwait(false); } public void Release(string name) From 1b7b9126e0cf4563ec53300f6c8798644f51561d Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 7 Jan 2024 17:41:23 +0200 Subject: [PATCH 19/23] - synchronization service --- Shuttle.Core.Threading/ISynchronizationService.cs | 2 +- Shuttle.Core.Threading/SynchronizationService.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Shuttle.Core.Threading/ISynchronizationService.cs b/Shuttle.Core.Threading/ISynchronizationService.cs index 9302ea4..156176b 100644 --- a/Shuttle.Core.Threading/ISynchronizationService.cs +++ b/Shuttle.Core.Threading/ISynchronizationService.cs @@ -5,7 +5,7 @@ namespace Shuttle.Core.Threading { public interface ISynchronizationService { - Task Wait(string name, CancellationToken cancellationToken = default); + Task WaitAsync(string name, CancellationToken cancellationToken = default); void Release(string name); } } \ No newline at end of file diff --git a/Shuttle.Core.Threading/SynchronizationService.cs b/Shuttle.Core.Threading/SynchronizationService.cs index 429ffba..d037905 100644 --- a/Shuttle.Core.Threading/SynchronizationService.cs +++ b/Shuttle.Core.Threading/SynchronizationService.cs @@ -10,7 +10,7 @@ public class SynchronizationService : ISynchronizationService { private readonly Dictionary _semaphores = new Dictionary(); - public async Task Wait(string name, CancellationToken cancellationToken = default) + public async Task WaitAsync(string name, CancellationToken cancellationToken = default) { if (!_semaphores.ContainsKey(Guard.AgainstNullOrEmptyString(name, nameof(name)))) { From a7ea4bae12bfc005f795f2260a6e6cb518eb87d3 Mon Sep 17 00:00:00 2001 From: Eben Date: Thu, 11 Jan 2024 17:55:02 +0200 Subject: [PATCH 20/23] - blocking semaphore --- .../BlockingSemaphoreSlim.cs | 20 ++++++++++ Shuttle.Core.Threading/IBlockingSemaphore.cs | 7 ++++ .../ISynchronizationService.cs | 11 ------ Shuttle.Core.Threading/ProcessorThread.cs | 39 +++++++++++-------- .../SynchronizationService.cs | 33 ---------------- 5 files changed, 50 insertions(+), 60 deletions(-) create mode 100644 Shuttle.Core.Threading/BlockingSemaphoreSlim.cs create mode 100644 Shuttle.Core.Threading/IBlockingSemaphore.cs delete mode 100644 Shuttle.Core.Threading/ISynchronizationService.cs delete mode 100644 Shuttle.Core.Threading/SynchronizationService.cs diff --git a/Shuttle.Core.Threading/BlockingSemaphoreSlim.cs b/Shuttle.Core.Threading/BlockingSemaphoreSlim.cs new file mode 100644 index 0000000..77a18bf --- /dev/null +++ b/Shuttle.Core.Threading/BlockingSemaphoreSlim.cs @@ -0,0 +1,20 @@ +using System.Threading; +using Shuttle.Core.Contract; + +namespace Shuttle.Core.Threading +{ + public class BlockingSemaphoreSlim : IBlockingSemaphore + { + private readonly SemaphoreSlim _semaphoreSlim; + + public BlockingSemaphoreSlim(SemaphoreSlim semaphoreSlim) + { + _semaphoreSlim = Guard.AgainstNull(semaphoreSlim, nameof(semaphoreSlim)); + } + + public void Release() + { + _semaphoreSlim.Release(); + } + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/IBlockingSemaphore.cs b/Shuttle.Core.Threading/IBlockingSemaphore.cs new file mode 100644 index 0000000..45a6b90 --- /dev/null +++ b/Shuttle.Core.Threading/IBlockingSemaphore.cs @@ -0,0 +1,7 @@ +namespace Shuttle.Core.Threading +{ + public interface IBlockingSemaphore + { + void Release(); + } +} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ISynchronizationService.cs b/Shuttle.Core.Threading/ISynchronizationService.cs deleted file mode 100644 index 156176b..0000000 --- a/Shuttle.Core.Threading/ISynchronizationService.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Threading; -using System.Threading.Tasks; - -namespace Shuttle.Core.Threading -{ - public interface ISynchronizationService - { - Task WaitAsync(string name, CancellationToken cancellationToken = default); - void Release(string name); - } -} \ No newline at end of file diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 91e1fbb..821fc7c 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -13,8 +13,8 @@ public class ProcessorThread private ProcessorThreadEventArgs _eventArgs; private bool _started; - private Thread _thread; private bool _sync; + private Thread _thread; public ProcessorThread(string name, IProcessor processor, ProcessorThreadOptions processorThreadOptions) { @@ -37,23 +37,16 @@ internal void Deactivate() public event EventHandler ProcessorException; public event EventHandler ProcessorExecuting; public event EventHandler ProcessorThreadActive; + public event EventHandler ProcessorThreadOperationCanceled; public event EventHandler ProcessorThreadStarting; public event EventHandler ProcessorThreadStopped; public event EventHandler ProcessorThreadStopping; - public event EventHandler ProcessorThreadOperationCanceled; public void Start() { Start(true); } - public async Task StartAsync() - { - Start(false); - - await Task.CompletedTask; - } - public void Start(bool sync) { if (_started) @@ -88,6 +81,13 @@ public void Start(bool sync) _started = true; } + public async Task StartAsync() + { + Start(false); + + await Task.CompletedTask; + } + public void Stop() { if (!_started) @@ -132,17 +132,24 @@ private async void Work() try { - if (_sync) - { - Processor.Execute(CancellationToken); - } - else + Task task = null; + + using (ExecutionContext.SuppressFlow()) { - using (ExecutionContext.SuppressFlow()) + if (_sync) + { + Processor.Execute(CancellationToken); + } + else { - await Processor.ExecuteAsync(CancellationToken).ConfigureAwait(false); + task = Processor.ExecuteAsync(CancellationToken); } } + + if (task != null) + { + await task; + } } catch (OperationCanceledException) { diff --git a/Shuttle.Core.Threading/SynchronizationService.cs b/Shuttle.Core.Threading/SynchronizationService.cs deleted file mode 100644 index d037905..0000000 --- a/Shuttle.Core.Threading/SynchronizationService.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Shuttle.Core.Contract; - -namespace Shuttle.Core.Threading -{ - public class SynchronizationService : ISynchronizationService - { - private readonly Dictionary _semaphores = new Dictionary(); - - public async Task WaitAsync(string name, CancellationToken cancellationToken = default) - { - if (!_semaphores.ContainsKey(Guard.AgainstNullOrEmptyString(name, nameof(name)))) - { - _semaphores.Add(name, new SemaphoreSlim(1, 1)); - } - - await _semaphores[name].WaitAsync(cancellationToken).ConfigureAwait(false); - } - - public void Release(string name) - { - if (!_semaphores.ContainsKey(Guard.AgainstNullOrEmptyString(name, nameof(name)))) - { - throw new ApplicationException(string.Format(Resources.SynchronizationNameException, name)); - } - - _semaphores[name].Release(); - } - } -} \ No newline at end of file From f0160979c128b4784073fb4275d88b06c16871b3 Mon Sep 17 00:00:00 2001 From: Eben Date: Sat, 13 Jan 2024 14:00:21 +0200 Subject: [PATCH 21/23] - package update --- .../Shuttle.Core.Threading.Tests.csproj | 6 +++--- Shuttle.Core.Threading/.package/package.nuspec | 2 +- Shuttle.Core.Threading/Shuttle.Core.Threading.csproj | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Shuttle.Core.Threading.Tests/Shuttle.Core.Threading.Tests.csproj b/Shuttle.Core.Threading.Tests/Shuttle.Core.Threading.Tests.csproj index 2d6c6b4..aacd873 100644 --- a/Shuttle.Core.Threading.Tests/Shuttle.Core.Threading.Tests.csproj +++ b/Shuttle.Core.Threading.Tests/Shuttle.Core.Threading.Tests.csproj @@ -6,9 +6,9 @@ - - - + + + diff --git a/Shuttle.Core.Threading/.package/package.nuspec b/Shuttle.Core.Threading/.package/package.nuspec index 5024ffb..b2e0b82 100644 --- a/Shuttle.Core.Threading/.package/package.nuspec +++ b/Shuttle.Core.Threading/.package/package.nuspec @@ -17,7 +17,7 @@ shuttle threading processor - + diff --git a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj index d8d65c9..ba6ef36 100644 --- a/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj +++ b/Shuttle.Core.Threading/Shuttle.Core.Threading.csproj @@ -15,7 +15,7 @@ - + From 1f2b8ca00f9b7db9b19279945ef502db6a8d1a41 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 28 Jan 2024 10:41:10 +0200 Subject: [PATCH 22/23] - removed `ExecutionContext.SuppressFlow()` --- Shuttle.Core.Threading/ProcessorThread.cs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/Shuttle.Core.Threading/ProcessorThread.cs b/Shuttle.Core.Threading/ProcessorThread.cs index 821fc7c..865b9c6 100644 --- a/Shuttle.Core.Threading/ProcessorThread.cs +++ b/Shuttle.Core.Threading/ProcessorThread.cs @@ -132,23 +132,13 @@ private async void Work() try { - Task task = null; - - using (ExecutionContext.SuppressFlow()) + if (_sync) { - if (_sync) - { - Processor.Execute(CancellationToken); - } - else - { - task = Processor.ExecuteAsync(CancellationToken); - } + Processor.Execute(CancellationToken); } - - if (task != null) + else { - await task; + await Processor.ExecuteAsync(CancellationToken); } } catch (OperationCanceledException) From d22b6d3f03b91af35b9b00f50862eed01ce539b8 Mon Sep 17 00:00:00 2001 From: Eben Date: Sun, 21 Apr 2024 16:18:46 +0200 Subject: [PATCH 23/23] - docs --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 89d65f8..d3d1f4d 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Provides various classes and interfaces to facilitate thread-based processing. public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions); ``` -Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`. +Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` method, or `IProcessor.ExecuteAsync(CancellationToken)` method if started asynchronously, on the instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`. ## ProcessorThreadOptions