Skip to content

Commit

Permalink
replace all promisepool to new taskpool system
Browse files Browse the repository at this point in the history
  • Loading branch information
neuecc committed May 28, 2020
1 parent d5db96b commit 21bf08a
Show file tree
Hide file tree
Showing 19 changed files with 467 additions and 281 deletions.
20 changes: 14 additions & 6 deletions src/UniTask.NetCore/NetCore/UniTask.Yield.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,22 @@ static void Continuation(object state)

#if NETCOREAPP3_1

sealed class ThreadPoolWorkItem : IThreadPoolWorkItem
sealed class ThreadPoolWorkItem : IThreadPoolWorkItem, ITaskPoolNode<ThreadPoolWorkItem>
{
static readonly ConcurrentQueue<ThreadPoolWorkItem> pool = new ConcurrentQueue<ThreadPoolWorkItem>();
static TaskPool<ThreadPoolWorkItem> pool;
public ThreadPoolWorkItem NextNode { get; set; }

static ThreadPoolWorkItem()
{
TaskPoolMonitor.RegisterSizeGetter(typeof(ThreadPoolWorkItem), () => pool.Size);
}

Action continuation;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static ThreadPoolWorkItem Create(Action continuation)
{
if (!pool.TryDequeue(out var item))
if (!pool.TryPop(out var item))
{
item = new ThreadPoolWorkItem();
}
Expand All @@ -82,9 +88,11 @@ public void Execute()
{
var call = continuation;
continuation = null;
pool.Enqueue(this);

call.Invoke();
if (call != null)
{
pool.TryPush(this);
call.Invoke();
}
}
}

Expand Down
25 changes: 16 additions & 9 deletions src/UniTask.NetCoreSandbox/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,21 +217,23 @@ static async Task Main(string[] args)

ThreadPool.SetMinThreads(100, 100);

List<UniTask<int>> list = new List<UniTask<int>>();
for (int i = 0; i < 321; i++)
//List<UniTask<int>> list = new List<UniTask<int>>();
for (int i = 0; i < short.MaxValue; i++)
{
list.Add(AsyncTest());
//// list.Add(AsyncTest());
await YieldCore();
}
//await UniTask.WhenAll(list);

Console.WriteLine("TOGO");
//Console.WriteLine("TOGO");

var a = await AsyncTest();
var b = AsyncTest();
var c = AsyncTest();
//var a = await AsyncTest();
//var b = AsyncTest();
//var c = AsyncTest();
await YieldCore();

await b;
await c;
//await b;
//await c;


foreach (var item in Cysharp.Threading.Tasks.Internal.TaskPoolMonitor.GetCacheSizeInfo())
Expand All @@ -242,6 +244,11 @@ static async Task Main(string[] args)
Console.ReadLine();
}

static async UniTask YieldCore()
{
await UniTask.Yield();
}

#pragma warning disable CS1998


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ namespace Cysharp.Threading.Tasks.CompilerServices
[StructLayout(LayoutKind.Auto)]
public struct AsyncUniTaskMethodBuilder
{
// cache items.
internal IMoveNextRunnerPromise runnerPromise;
Exception ex;

Expand Down Expand Up @@ -78,7 +77,7 @@ public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref
{
if (runnerPromise == null)
{
MoveNextRunnerPromise<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTask<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.OnCompleted(runnerPromise.MoveNext);
Expand All @@ -93,7 +92,7 @@ public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter
{
if (runnerPromise == null)
{
MoveNextRunnerPromise<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTask<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.UnsafeOnCompleted(runnerPromise.MoveNext);
Expand Down Expand Up @@ -134,7 +133,6 @@ private object ObjectIdForDebugger
[StructLayout(LayoutKind.Auto)]
public struct AsyncUniTaskMethodBuilder<T>
{
// cache items.
internal IMoveNextRunnerPromise<T> runnerPromise;
Exception ex;
T result;
Expand Down Expand Up @@ -204,7 +202,7 @@ public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref
{
if (runnerPromise == null)
{
MoveNextRunnerPromise<TStateMachine, T>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTask<TStateMachine, T>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.OnCompleted(runnerPromise.MoveNext);
Expand All @@ -219,7 +217,7 @@ public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter
{
if (runnerPromise == null)
{
MoveNextRunnerPromise<TStateMachine, T>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTask<TStateMachine, T>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.UnsafeOnCompleted(runnerPromise.MoveNext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Security;

namespace Cysharp.Threading.Tasks.CompilerServices
{
[StructLayout(LayoutKind.Auto)]
public struct AsyncUniTaskVoidMethodBuilder
{
internal IMoveNextRunner runner;
Expand Down Expand Up @@ -65,7 +67,7 @@ public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref
{
if (runner == null)
{
MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTaskVoid<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.OnCompleted(runner.MoveNext);
Expand All @@ -80,7 +82,7 @@ public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter
{
if (runner == null)
{
MoveNextRunner<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
AsyncUniTaskVoid<TStateMachine>.SetStateMachine(ref this, ref stateMachine);
}

awaiter.UnsafeOnCompleted(runner.MoveNext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ internal interface IMoveNextRunnerPromise<T> : IUniTaskSource<T>
void SetException(Exception exception);
}

internal sealed class MoveNextRunner<TStateMachine> : IMoveNextRunner, ITaskPoolNode<MoveNextRunner<TStateMachine>>
internal sealed class AsyncUniTaskVoid<TStateMachine> : IMoveNextRunner, ITaskPoolNode<AsyncUniTaskVoid<TStateMachine>>, IUniTaskSource
where TStateMachine : IAsyncStateMachine
{
static TaskPool<MoveNextRunner<TStateMachine>> pool;
static TaskPool<AsyncUniTaskVoid<TStateMachine>> pool;

TStateMachine stateMachine;

public Action MoveNext { get; }

public MoveNextRunner()
public AsyncUniTaskVoid()
{
MoveNext = Run;
}
Expand All @@ -47,22 +47,24 @@ public static void SetStateMachine(ref AsyncUniTaskVoidMethodBuilder builder, re
{
if (!pool.TryPop(out var result))
{
result = new MoveNextRunner<TStateMachine>();
result = new AsyncUniTaskVoid<TStateMachine>();
}
TaskTracker.TrackActiveTask(result, 3);

builder.runner = result; // set runner before copied.
result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
}

static MoveNextRunner()
static AsyncUniTaskVoid()
{
TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunner<TStateMachine>), () => pool.Size);
TaskPoolMonitor.RegisterSizeGetter(typeof(AsyncUniTaskVoid<TStateMachine>), () => pool.Size);
}

public MoveNextRunner<TStateMachine> NextNode { get; set; }
public AsyncUniTaskVoid<TStateMachine> NextNode { get; set; }

public void Return()
{
TaskTracker.RemoveTracking(this);
stateMachine = default;
pool.TryPush(this);
}
Expand All @@ -73,20 +75,40 @@ void Run()
{
stateMachine.MoveNext();
}

// dummy interface implementation for TaskTracker.

UniTaskStatus IUniTaskSource.GetStatus(short token)
{
return UniTaskStatus.Pending;
}

UniTaskStatus IUniTaskSource.UnsafeGetStatus()
{
return UniTaskStatus.Pending;
}

void IUniTaskSource.OnCompleted(Action<object> continuation, object state, short token)
{
}

void IUniTaskSource.GetResult(short token)
{
}
}

internal class MoveNextRunnerPromise<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, ITaskPoolNode<MoveNextRunnerPromise<TStateMachine>>
internal sealed class AsyncUniTask<TStateMachine> : IMoveNextRunnerPromise, IUniTaskSource, ITaskPoolNode<AsyncUniTask<TStateMachine>>
where TStateMachine : IAsyncStateMachine
{
static TaskPool<MoveNextRunnerPromise<TStateMachine>> pool;
static TaskPool<AsyncUniTask<TStateMachine>> pool;

TStateMachine stateMachine;

public Action MoveNext { get; }

UniTaskCompletionSourceCore<AsyncUnit> core;

MoveNextRunnerPromise()
AsyncUniTask()
{
MoveNext = Run;
}
Expand All @@ -95,19 +117,19 @@ public static void SetStateMachine(ref AsyncUniTaskMethodBuilder builder, ref TS
{
if (!pool.TryPop(out var result))
{
result = new MoveNextRunnerPromise<TStateMachine>();
result = new AsyncUniTask<TStateMachine>();
}
TaskTracker.TrackActiveTask(result, 3);

builder.runnerPromise = result; // set runner before copied.
result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
}

public MoveNextRunnerPromise<TStateMachine> NextNode { get; set; }
public AsyncUniTask<TStateMachine> NextNode { get; set; }

static MoveNextRunnerPromise()
static AsyncUniTask()
{
TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine>), () => pool.Size);
TaskPoolMonitor.RegisterSizeGetter(typeof(AsyncUniTask<TStateMachine>), () => pool.Size);
}

bool TryReturn()
Expand Down Expand Up @@ -177,7 +199,7 @@ public void OnCompleted(Action<object> continuation, object state, short token)
core.OnCompleted(continuation, state, token);
}

~MoveNextRunnerPromise()
~AsyncUniTask()
{
if (TryReturn())
{
Expand All @@ -186,18 +208,18 @@ public void OnCompleted(Action<object> continuation, object state, short token)
}
}

internal class MoveNextRunnerPromise<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, ITaskPoolNode<MoveNextRunnerPromise<TStateMachine, T>>
internal sealed class AsyncUniTask<TStateMachine, T> : IMoveNextRunnerPromise<T>, IUniTaskSource<T>, ITaskPoolNode<AsyncUniTask<TStateMachine, T>>
where TStateMachine : IAsyncStateMachine
{
static TaskPool<MoveNextRunnerPromise<TStateMachine, T>> pool;
static TaskPool<AsyncUniTask<TStateMachine, T>> pool;

TStateMachine stateMachine;

public Action MoveNext { get; }

UniTaskCompletionSourceCore<T> core;

MoveNextRunnerPromise()
AsyncUniTask()
{
MoveNext = Run;
}
Expand All @@ -206,19 +228,19 @@ public static void SetStateMachine(ref AsyncUniTaskMethodBuilder<T> builder, ref
{
if (!pool.TryPop(out var result))
{
result = new MoveNextRunnerPromise<TStateMachine, T>();
result = new AsyncUniTask<TStateMachine, T>();
}
TaskTracker.TrackActiveTask(result, 3);

builder.runnerPromise = result; // set runner before copied.
result.stateMachine = stateMachine; // copy struct StateMachine(in release build).
}

public MoveNextRunnerPromise<TStateMachine, T> NextNode { get; set; }
public AsyncUniTask<TStateMachine, T> NextNode { get; set; }

static MoveNextRunnerPromise()
static AsyncUniTask()
{
TaskPoolMonitor.RegisterSizeGettter(typeof(MoveNextRunnerPromise<TStateMachine, T>), () => pool.Size);
TaskPoolMonitor.RegisterSizeGetter(typeof(AsyncUniTask<TStateMachine, T>), () => pool.Size);
}

bool TryReturn()
Expand Down Expand Up @@ -294,7 +316,7 @@ public void OnCompleted(Action<object> continuation, object state, short token)
core.OnCompleted(continuation, state, token);
}

~MoveNextRunnerPromise()
~AsyncUniTask()
{
if (TryReturn())
{
Expand Down
Loading

0 comments on commit 21bf08a

Please sign in to comment.