Reversed experimental changes.
genaray committed Feb 15, 2024
1 parent 7ffa0ee commit 92a6b86
Showing 3 changed files with 52 additions and 70 deletions.
73 changes: 26 additions & 47 deletions src/Arch/Core/Jobs/Jobs.cs
@@ -131,27 +131,35 @@ public void Execute(ref Chunk chunk)
/// is an <see cref="IJob"/> that can be scheduled using the <see cref="JobScheduler"/> and the <see cref="World"/> to iterate over chunks on multiple threads.
/// </summary>
/// <typeparam name="T">The generic type that implements the <see cref="IChunkJob"/> interface.</typeparam>
public sealed class ChunkIterationJob<T> : IJobParallelFor where T : IChunkJob
public sealed class ChunkIterationJob<T> : IJob where T : IChunkJob
{

/// <summary>
/// Represents a section of chunk iteration from one archetype.
/// Initializes a new instance of the <see cref="ChunkIterationJob{T}"/> class.
/// </summary>
private struct ChunkIterationPart
public ChunkIterationJob()
{
public int Start;
public int Size;
public Chunk[]? Chunks;
Chunks = Array.Empty<Chunk>();
}

/// <summary>
/// Initializes a new instance of the <see cref="ChunkIterationJob{T}"/> class.
/// </summary>
public ChunkIterationJob()
/// <param name="start">The start at which this job begins to process the <see cref="Chunks"/>.</param>
/// <param name="size">The size or lengths, how man <see cref="Chunks"/> this job will process.</param>
/// <param name="chunks">The <see cref="Chunk"/> array being processed.</param>
public ChunkIterationJob(int start, int size, Chunk[] chunks)
{
Parts = new List<ChunkIterationPart>();
Start = start;
Size = size;
Chunks = chunks;
}

/// <summary>
/// The <see cref="Chunk"/> array that will be processed.
/// </summary>
public Chunk[] Chunks { get; set; }

/// <summary>
/// An instance of the generic type <typeparamref name="T"/>, being invoked upon each chunk.
/// </summary>
@@ -162,51 +170,22 @@ public ChunkIterationJob()
/// </summary>
public int Size { get; set; }


private List<ChunkIterationPart> Parts { get; set; }

public int ThreadCount { get; } = Environment.ProcessorCount;
public int BatchSize { get; } = 16;
/// <summary>
/// The start index.
/// </summary>
public int Start;

/// <summary>
/// Add an array of chunks to be processed by this job.
/// Iterates over <see cref="Size"/> <see cref="Chunks"/> starting at <see cref="Start"/> and calls <see cref="Instance"/> on each.
/// </summary>
/// <param name="chunks">The chunks to add.</param>
/// <param name="start">The first chunk to process in <paramref name="chunks"/></param>
/// <param name="size">The amount of chunks to process in <paramref name="chunks"/></param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void AddChunks(Chunk[] chunks, int start, int size)
public void Execute()
{
Parts.Add(new ChunkIterationPart{
Chunks = chunks,
Start = start,
Size = size
});
}
ref var chunk = ref Chunks.DangerousGetReferenceAt(Start);

public void Execute(int index)
{
var sizeSoFar = 0;
for (var i = 0; i < Parts.Count; i++)
for (var chunkIndex = 0; chunkIndex < Size; chunkIndex++)
{
// If we're about to go over, we're ready to execute
var part = Parts[i];
if (sizeSoFar + part.Size > index)
{
// this had better be not null!
ref var chunk = ref part.Chunks!.DangerousGetReferenceAt(index - sizeSoFar + part.Start);
Instance?.Execute(ref chunk);
return;
}

sizeSoFar += part.Size;
ref var currentChunk = ref Unsafe.Add(ref chunk, chunkIndex);
Instance?.Execute(ref currentChunk);
}

throw new InvalidOperationException("Reached end of chunk, but could not find the correct index!");
}

public void Finish()
{
Parts.Clear();
}
}
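
The reverted ChunkIterationJob<T> drops the Parts list and the IJobParallelFor index lookup: each job now owns one contiguous (Start, Size) slice of a chunk array and walks it by reference. The sketch below is a minimal, self-contained illustration of that iteration pattern, not Arch's actual implementation: FakeChunk and IChunkProcessor are hypothetical stand-ins for Chunk and IChunkJob, and MemoryMarshal.GetArrayDataReference replaces the DangerousGetReferenceAt extension used in the real code.

using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// Hypothetical stand-in for Arch's Chunk.
public struct FakeChunk
{
    public int EntityCount;
}

// Hypothetical stand-in for IChunkJob.
public interface IChunkProcessor
{
    void Execute(ref FakeChunk chunk);
}

public sealed class RangeIterationJob<T> where T : IChunkProcessor
{
    public FakeChunk[] Chunks = Array.Empty<FakeChunk>();
    public int Start;
    public int Size;
    public T? Instance;

    public void Execute()
    {
        // Take a reference to the first chunk of this job's range...
        ref var first = ref MemoryMarshal.GetArrayDataReference(Chunks);
        ref var rangeStart = ref Unsafe.Add(ref first, Start);

        // ...and step through Size chunks by reference, invoking the
        // user-supplied processor on each one without copying the struct.
        for (var i = 0; i < Size; i++)
        {
            ref var current = ref Unsafe.Add(ref rangeStart, i);
            Instance?.Execute(ref current);
        }
    }
}

Handing each job a disjoint (Start, Size) range of the same array is what lets several jobs iterate one archetype's chunks in parallel without any per-index bookkeeping.
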
47 changes: 25 additions & 22 deletions src/Arch/Core/Jobs/World.Jobs.cs
@@ -18,7 +18,7 @@ public partial class World
/// <summary>
/// A cache used for the parallel queries to prevent list allocations.
/// </summary>
internal List<IJobParallelFor> JobsCache { get; set; }
internal List<IJob> JobsCache { get; set; }

/// <summary>
/// Searches all matching <see cref="Entity"/>'s by a <see cref="QueryDescription"/> and calls the passed <see cref="ForEach"/> delegate.
@@ -97,39 +97,42 @@ public void ParallelQuery(in QueryDescription queryDescription, ForEach forEntit
throw new Exception("JobScheduler was not initialized, create one instance of JobScheduler. This creates a singleton used for parallel iterations.");
}

if (!SharedJobScheduler.IsMainThread)
{
throw new Exception("JobScheduler must be called from MainThread.");
}

// Cast pool in an unsafe fast way and run the query.
var query = Query(in queryDescription);

var pool = JobMeta<ChunkIterationJob<T>>.Pool;
var job = pool.Get();
job.Instance = innerJob;

var size = 0;
var query = Query(in queryDescription);
foreach (var archetype in query.GetArchetypeIterator())
{
var archetypeSize = archetype.ChunkCount;
var part = new RangePartitioner(Environment.ProcessorCount, archetypeSize);
foreach (var range in part)
{
job.AddChunks(archetype.Chunks, range.Start, range.Length);
size += range.Length;
var job = pool.Get();
job.Start = range.Start;
job.Size = range.Length;
job.Chunks = archetype.Chunks;
job.Instance = innerJob;

var jobHandle = SharedJobScheduler.Schedule(job);
JobsCache.Add(job);
JobHandles.Add(jobHandle);
}
}

// Schedule, flush, wait, return.
var handle = SharedJobScheduler.Schedule(job, size);
SharedJobScheduler.Flush();
//handle.Complete();
// Schedule, flush, wait, return.
var handle = SharedJobScheduler.CombineDependencies(JobHandles.Span);
SharedJobScheduler.Flush();
handle.Complete();

pool.Return(job);
for (var index = 0; index < JobsCache.Count; index++)
{
var job = Unsafe.As<ChunkIterationJob<T>>(JobsCache[index]);
pool.Return(job);
}

JobHandles.Clear();
JobsCache.Clear();
}
}

/*
/// <summary>
/// Finds all matching <see cref="Chunk"/>'s by a <see cref="QueryDescription"/> and calls an <see cref="IChunkJob"/> on them.
/// </summary>
Expand Down Expand Up @@ -176,5 +179,5 @@ public void ParallelQuery(in QueryDescription queryDescription, ForEach forEntit
JobHandles.Clear();

return handle;
}*/
}
}
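
ParallelQuery now fans out one pooled ChunkIterationJob<T> per partitioned chunk range, collects every JobHandle, combines them, flushes the scheduler, waits on the combined handle, and finally returns the jobs to the pool. The sketch below reproduces only that fan-out/join shape under stated assumptions: Task.Run and Task.WaitAll stand in for SharedJobScheduler.Schedule, CombineDependencies, Flush and Complete, and the RangePartitioner is approximated by an even split across Environment.ProcessorCount.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelFanOutSketch
{
    // Splits chunkCount work items into roughly processor-count ranges and runs
    // each range concurrently; processRange receives (start, size), analogous to
    // ChunkIterationJob.Start and ChunkIterationJob.Size.
    public static void ForEachRange(int chunkCount, Action<int, int> processRange)
    {
        var workers = Environment.ProcessorCount;
        var perWorker = Math.Max(1, chunkCount / workers);
        var tasks = new List<Task>();

        for (var start = 0; start < chunkCount; start += perWorker)
        {
            var rangeStart = start;
            var rangeSize = Math.Min(perWorker, chunkCount - start);

            // Fan out: one unit of work per (start, size) range.
            tasks.Add(Task.Run(() => processRange(rangeStart, rangeSize)));
        }

        // Join: the JobScheduler analogue is CombineDependencies + Flush + handle.Complete().
        Task.WaitAll(tasks.ToArray());
    }
}

A call such as ForEachRange(archetype.ChunkCount, (start, size) => { /* iterate chunks[start .. start + size) */ }) mirrors what the query does per archetype; the real code additionally reuses job objects through JobMeta<ChunkIterationJob<T>>.Pool so that no allocations happen per query.
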
2 changes: 1 addition & 1 deletion src/Arch/Core/World.cs
@@ -192,7 +192,7 @@ private World(int id)

// Multithreading/Jobs.
JobHandles = new PooledList<JobHandle>(Environment.ProcessorCount);
JobsCache = new List<IJobParallelFor>(Environment.ProcessorCount);
JobsCache = new List<IJob>(Environment.ProcessorCount);
}

/// <summary>