Skip to content

Commit

Permalink
added custom barrier for system runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Doraku committed Sep 23, 2018
1 parent cc4a03a commit fa5936e
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 10 deletions.
4 changes: 2 additions & 2 deletions source/DefaultEcs.Benchmark/Program.cs
Expand Up @@ -11,8 +11,8 @@ private static void Main()
//typeof(DefaultEcs.CreateEntity),
//typeof(DefaultEcs.EntitySetEnumeration),
//typeof(DefaultEcs.EntitySetWithComponentEnumeration),
//typeof(DefaultEcs.System),
typeof(DefaultEcs.Serialization),
typeof(DefaultEcs.System),
//typeof(DefaultEcs.Serialization),
//typeof(Performance.SingleComponentEntityEnumeration),
//typeof(Performance.DoubleComponentEntityEnumeration),
//typeof(Message.Publish),
Expand Down
36 changes: 28 additions & 8 deletions source/DefaultEcs/System/SystemRunner.cs
Expand Up @@ -4,6 +4,7 @@
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using DefaultEcs.Technical.System;

namespace DefaultEcs.System
{
Expand All @@ -18,7 +19,7 @@ public sealed class SystemRunner<T> : IDisposable
internal static readonly SystemRunner<T> Default = new SystemRunner<T>(1);

private readonly CancellationTokenSource _disposeHandle;
private readonly Barrier _barrier;
private readonly WorkerBarrier _barrier;
private readonly Task[] _tasks;

private volatile ASystem<T> _currentSystem;
Expand All @@ -37,8 +38,8 @@ public SystemRunner(int degreeOfParallelism)
IEnumerable<int> indices = degreeOfParallelism >= 1 ? Enumerable.Range(0, degreeOfParallelism - 1) : throw new ArgumentException("Argument cannot be inferior to one", nameof(degreeOfParallelism));

_disposeHandle = new CancellationTokenSource();
_barrier = degreeOfParallelism > 1 ? new Barrier(degreeOfParallelism) : null;
_tasks = indices.Select(index => new Task(Update, index, TaskCreationOptions.LongRunning)).ToArray();
_barrier = degreeOfParallelism > 1 ? new WorkerBarrier(degreeOfParallelism) : null;

foreach (Task task in _tasks)
{
Expand All @@ -54,14 +55,24 @@ private void Update(object state)
{
int index = (int)state;

_barrier.SignalAndWait();
while (!_barrier.StartWorker())
{
_barrier.Wait();
}

while (!_disposeHandle.IsCancellationRequested)
{
_currentSystem.Update(index, _tasks.Length);

_barrier.SignalAndWait();
_barrier.SignalAndWait();
while (!_barrier.AllStarted())
{
_barrier.Wait();
}
_barrier.Signal();
while (!_barrier.StartWorker())
{
_barrier.Wait();
}
}
}

Expand All @@ -70,11 +81,20 @@ internal void Update(ASystem<T> system)
{
_currentSystem = system;

_barrier?.SignalAndWait();
_barrier?.Start();

system.Update(_tasks.Length, _tasks.Length);

_barrier?.SignalAndWait();
while (!(_barrier?.AllStarted() ?? true))
{
_barrier.Wait();
}
_barrier?.Signal();
while (!(_barrier?.IsDone() ?? true))
{
_barrier.Wait();
}
_barrier?.End();
}

#endregion
Expand All @@ -88,7 +108,7 @@ public void Dispose()
{
_disposeHandle.Cancel();

_barrier?.SignalAndWait();
_barrier.Start();

Task.WaitAll(_tasks);

Expand Down
80 changes: 80 additions & 0 deletions source/DefaultEcs/Technical/System/WorkerBarrier.cs
@@ -0,0 +1,80 @@
using System;
using System.Runtime.CompilerServices;
using System.Threading;

namespace DefaultEcs.Technical.System
{
internal sealed class WorkerBarrier : IDisposable
{
#region Fields

private readonly int _count;
private readonly ManualResetEventSlim _handle;

private int _runningCount;
private int _startingCount;

#endregion

public WorkerBarrier(int workerCount)
{
_count = workerCount;
_handle = new ManualResetEventSlim();

_runningCount = 0;
_startingCount = 0;
}

#region Methods

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Start()
{
Interlocked.Exchange(ref _runningCount, _count);
Interlocked.Exchange(ref _startingCount, _count - 1);
_handle.Set();
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool StartWorker()
{
if (Volatile.Read(ref _startingCount) > 0)
{
Interlocked.Decrement(ref _startingCount);

return true;
}

return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool AllStarted() => Volatile.Read(ref _startingCount) == 0;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Signal()
{
if (Interlocked.Decrement(ref _runningCount) == 0)
{
_handle.Set();
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool IsDone() => Volatile.Read(ref _runningCount) == 0;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Wait() => _handle.Wait();

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void End() => _handle.Reset();

#endregion

#region IDisposable

public void Dispose() => _handle.Dispose();

#endregion
}
}

0 comments on commit fa5936e

Please sign in to comment.