Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Provides various classes and interfaces to facilitate thread-based processing.
public ProcessorThreadPool(string name, int threadCount, IProcessorFactory processorFactory, ProcessorThreadOptions processorThreadOptions);
```

Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`.
Each thread pool has a `name` used only for identyfing the pool. The `threadCount` determines the number of `ProcessorThread` instances in the pool. Each `ProcessorThread` calls the `IProcessor.Execute(CancellationToken)` method, or `IProcessor.ExecuteAsync(CancellationToken)` method if started asynchronously, on the instance provided by the `IProcessorFactory.Create()` method in a loop while the `CancellationToken.IsCancellationRequested` returns `false`.

## ProcessorThreadOptions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Shuttle.Core.Threading.Tests
{
[TestFixture]
public class CallContextFixture
public class AmbientContextFixture
{
[Test]
public void Should_be_able_to_flow_data()
Expand All @@ -26,24 +26,24 @@ public void Should_be_able_to_flow_data()
Task.WaitAll(
Task.Run(() =>
{
CallContext.SetData("d1", d1);
new Thread(() => t10 = CallContext.GetData("d1")).Start();
AmbientContext.SetData("d1", d1);
new Thread(() => t10 = AmbientContext.GetData("d1")).Start();
Task.WaitAll(
Task.Run(() => t1 = CallContext.GetData("d1"))
.ContinueWith(t => Task.Run(() => t11 = CallContext.GetData("d1"))),
Task.Run(() => t12 = CallContext.GetData("d1")),
Task.Run(() => t13 = CallContext.GetData("d1"))
Task.Run(() => t1 = AmbientContext.GetData("d1"))
.ContinueWith(t => Task.Run(() => t11 = AmbientContext.GetData("d1"))),
Task.Run(() => t12 = AmbientContext.GetData("d1")),
Task.Run(() => t13 = AmbientContext.GetData("d1"))
);
}),
Task.Run(() =>
{
CallContext.SetData("d2", d2);
new Thread(() => t20 = CallContext.GetData("d2")).Start();
AmbientContext.SetData("d2", d2);
new Thread(() => t20 = AmbientContext.GetData("d2")).Start();
Task.WaitAll(
Task.Run(() => t2 = CallContext.GetData("d2"))
.ContinueWith(t => Task.Run(() => t21 = CallContext.GetData("d2"))),
Task.Run(() => t22 = CallContext.GetData("d2")),
Task.Run(() => t23 = CallContext.GetData("d2"))
Task.Run(() => t2 = AmbientContext.GetData("d2"))
.ContinueWith(t => Task.Run(() => t21 = AmbientContext.GetData("d2"))),
Task.Run(() => t22 = AmbientContext.GetData("d2")),
Task.Run(() => t23 = AmbientContext.GetData("d2"))
);
})
);
Expand All @@ -60,8 +60,8 @@ public void Should_be_able_to_flow_data()
Assert.That(d2, Is.SameAs(t22));
Assert.That(d2, Is.SameAs(t23));

Assert.Null(CallContext.GetData("d1"));
Assert.Null(CallContext.GetData("d2"));
Assert.Null(AmbientContext.GetData("d1"));
Assert.Null(AmbientContext.GetData("d2"));
}
}
}
28 changes: 28 additions & 0 deletions Shuttle.Core.Threading.Tests/MockProcessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Shuttle.Core.Threading.Tests;

public class MockProcessor : IProcessor
{
private readonly TimeSpan _executionDuration;

public int ExecutionCount { get; private set; }

public MockProcessor(TimeSpan executionDuration)
{
_executionDuration = executionDuration;
}

public void Execute(CancellationToken cancellationToken)
{
ExecuteAsync(cancellationToken).GetAwaiter().GetResult();
}

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
await Task.Delay(_executionDuration, cancellationToken).ConfigureAwait(false);
ExecutionCount++;
}
}
92 changes: 92 additions & 0 deletions Shuttle.Core.Threading.Tests/ProcessorThreadFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;

namespace Shuttle.Core.Threading.Tests;

public class ProcessorThreadFixture
{
[Test]
public void Should_be_able_to_execute_processor_thread()
{
Should_be_able_to_execute_processor_thread_async(true).GetAwaiter().GetResult();
}

[Test]
public async Task Should_be_able_to_execute_processor_thread_async()
{
await Should_be_able_to_execute_processor_thread_async(false);
}

private async Task Should_be_able_to_execute_processor_thread_async(bool sync)
{
const int minimumExecutionCount = 5;

var executionDuration = TimeSpan.FromMilliseconds(200);
var mockProcessor = new MockProcessor(executionDuration);
var processorThread = new ProcessorThread("thread", mockProcessor, new ProcessorThreadOptions());
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

processorThread.ProcessorException += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'");
};

processorThread.ProcessorExecuting += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

processorThread.ProcessorThreadActive += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

processorThread.ProcessorThreadStarting += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

processorThread.ProcessorThreadStopped += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'");
};

processorThread.ProcessorThreadStopping += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

processorThread.ProcessorThreadOperationCanceled += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

if (sync)
{
processorThread.Start();
}
else
{
await processorThread.StartAsync();
}

var timeout = DateTime.Now.AddSeconds(500);
var timedOut = false;

while (mockProcessor.ExecutionCount <= minimumExecutionCount && !timedOut)
{
await Task.Delay(25, cancellationToken).ConfigureAwait(false);

timedOut = DateTime.Now >= timeout;
}

cancellationTokenSource.Cancel();

processorThread.Stop();

Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}");
}
}
100 changes: 100 additions & 0 deletions Shuttle.Core.Threading.Tests/ProcessorThreadPoolFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using NUnit.Framework;
using System.Threading.Tasks;
using System.Threading;
using System;
using System.Linq;
using Moq;

namespace Shuttle.Core.Threading.Tests;

public class ProcessorThreadPoolFixture
{
[Test]
public void Should_be_able_to_execute_processor_thread_pool()
{
Should_be_able_to_execute_processor_thread_pool_async(true).GetAwaiter().GetResult();
}

[Test]
public async Task Should_be_able_to_execute_processor_thread_pool_async()
{
await Should_be_able_to_execute_processor_thread_pool_async(false);
}

private async Task Should_be_able_to_execute_processor_thread_pool_async(bool sync)
{
const int minimumExecutionCount = 5;

var executionDuration = TimeSpan.FromMilliseconds(500);
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
var processorFactory = new Mock<IProcessorFactory>();

processorFactory.Setup(m => m.Create()).Returns(() => new MockProcessor(executionDuration));

var processorThreadPool = new ProcessorThreadPool("thread-pool", 5, processorFactory.Object, new ProcessorThreadOptions());

processorThreadPool.ProcessorThreadCreated += (sender, args) =>
{
args.ProcessorThread.ProcessorException += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorException] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / exception = '{args.Exception}'");
};

args.ProcessorThread.ProcessorExecuting += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorExecuting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

args.ProcessorThread.ProcessorThreadActive += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadActive] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

args.ProcessorThread.ProcessorThreadStarting += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStarting] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

args.ProcessorThread.ProcessorThreadStopped += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopped] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId} / aborted = '{args.Aborted}'");
};

args.ProcessorThread.ProcessorThreadStopping += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadStopping] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};

args.ProcessorThread.ProcessorThreadOperationCanceled += (sender, args) =>
{
Console.WriteLine($@"{DateTime.Now:O} - [ProcessorThreadOperationCanceled] : name = '{args.Name}' / execution count = {((MockProcessor)((ProcessorThread)sender).Processor).ExecutionCount} / managed thread id = {args.ManagedThreadId}");
};
};

if (sync)
{
processorThreadPool.Start();
}
else
{
await processorThreadPool.StartAsync();
}

var timeout = DateTime.Now.AddSeconds(5);
var timedOut = false;

while (processorThreadPool.ProcessorThreads.Any(item => ((MockProcessor)item.Processor).ExecutionCount <= minimumExecutionCount && !timedOut))
{
await Task.Delay(25, cancellationToken).ConfigureAwait(false);

timedOut = DateTime.Now >= timeout;
}

cancellationTokenSource.Cancel();

processorThreadPool.Stop();

Assert.That(timedOut, Is.False, $"[TIMEOUT] : Did not complete {minimumExecutionCount} executions before {timeout:O}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.3.1" />
<PackageReference Include="Moq" Version="4.20.70" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
Expand Down
53 changes: 38 additions & 15 deletions Shuttle.Core.Threading.Tests/ThreadActivityFixture.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,55 @@
using System;
using System.Threading;
using Moq;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using NUnit.Framework;

namespace Shuttle.Core.Threading.Tests
namespace Shuttle.Core.Threading.Tests;

[TestFixture]
public class ThreadActivityFixture
{
[TestFixture]
public class ThreadActivityFixture
[Test]
public void Should_be_able_to_have_the_thread_wait()
{
var activity = new ThreadActivity(
new[]
{
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500)
});

var start = DateTime.Now;
var token = new CancellationToken(false);

activity.Waiting(token);

Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250);

activity.Waiting(token);

Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750);
}

[Test]
public async Task Should_be_able_to_have_the_thread_wait_async()
{
[Test]
public void Should_be_able_to_have_the_thread_wait()
{
var activity = new ThreadActivity(new[]
var activity = new ThreadActivity(
new[]
{
TimeSpan.FromMilliseconds(250),
TimeSpan.FromMilliseconds(500)
});

var start = DateTime.Now;
var token = new CancellationToken(false);
var start = DateTime.Now;
var token = new CancellationToken(false);

activity.Waiting(token);
await activity.WaitingAsync(token);

Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250);
Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 250);

activity.Waiting(token);
await activity.WaitingAsync(token);

Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750);
}
Assert.IsTrue((DateTime.Now - start).TotalMilliseconds >= 750);
}
}
Loading