Skip to content

Commit

Permalink
ExecutionContext & Task Statistics (#1959)
Browse files Browse the repository at this point in the history
  • Loading branch information
PascalSenn authored and michaelstaib committed May 19, 2020
1 parent 76d21ee commit d00a8f8
Show file tree
Hide file tree
Showing 13 changed files with 759 additions and 65 deletions.
19 changes: 19 additions & 0 deletions src/HotChocolate/Core/src/Execution/ExecutionContext.Pooling.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Fetching;
using HotChocolate.Execution.Utilities;
using Microsoft.Extensions.ObjectPool;
using System;

namespace HotChocolate.Execution
{
internal partial class ExecutionContext : IExecutionContext
{
public void Reset()
{
_taskQueue.Clear();
_taskStatistics.Clear();
ResetTaskSource();
}
}
}
88 changes: 88 additions & 0 deletions src/HotChocolate/Core/src/Execution/ExecutionContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Fetching;
using HotChocolate.Execution.Utilities;
using Microsoft.Extensions.ObjectPool;
using System;

namespace HotChocolate.Execution
{
internal partial class ExecutionContext : IExecutionContext
{
private readonly TaskQueue _taskQueue;
private readonly TaskStatistics _taskStatistics;
private readonly object _engineLock = new object();
private TaskCompletionSource<bool>? _waitForEngineTask;

public ExecutionContext(
ObjectPool<ResolverTask> taskPool,
IBatchDispatcher batchDispatcher)
{
_taskStatistics = new TaskStatistics();
_taskQueue = new TaskQueue(_taskStatistics, taskPool);
TaskPool = taskPool;
BatchDispatcher = batchDispatcher;
BatchDispatcher.TaskEnqueued += BatchDispatcherEventHandler;
TaskStats.StateChanged += TaskStatisticsEventHandler;
}

public ITaskQueue Tasks => _taskQueue;

public ITaskStatistics TaskStats => _taskStatistics;

public bool IsCompleted => TaskStats.Enqueued == 0 && TaskStats.Running == 0;

public ObjectPool<ResolverTask> TaskPool { get; }

public IBatchDispatcher BatchDispatcher { get; }

public Task WaitForEngine(CancellationToken cancellationToken)
{
TaskCompletionSource<bool>? waitForEngineTask = _waitForEngineTask;
if (waitForEngineTask == null)
{
return Task.CompletedTask;
}

cancellationToken.Register(() => waitForEngineTask.SetCanceled());
return waitForEngineTask.Task;
}

private void SetEngineState()
{
lock (_engineLock)
{
if (TaskStats.Enqueued > 0 || BatchDispatcher.HasTasks || IsCompleted)
{
// in case there is a task someone might be already waiting,
// in this case we have to complete the task and clear it
if (_waitForEngineTask != null)
{
ResetTaskSource();
}
}
else
{
// in case there is a task someone might be already waiting,
// if there is no task we have to create one
if (_waitForEngineTask == null)
{
_waitForEngineTask = new TaskCompletionSource<bool>();
}
}
}
}

private void BatchDispatcherEventHandler(object? source, EventArgs args) =>
SetEngineState();

private void TaskStatisticsEventHandler(object? source, EventArgs args) =>
SetEngineState();

private void ResetTaskSource()
{
_waitForEngineTask?.SetResult(true);
_waitForEngineTask = null;
}
}
}
1 change: 1 addition & 0 deletions src/HotChocolate/Core/src/Execution/ITaskStatistics.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;

namespace HotChocolate.Execution
{
Expand Down
1 change: 1 addition & 0 deletions src/HotChocolate/Core/src/Execution/MutationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public async Task<IExecutionResult> ExecuteAsync(
if (selection.IsVisible(operationContext.Variables))
{
operationContext.Execution.Tasks.Enqueue(
operationContext,
selection,
responseIndex++,
resultMap,
Expand Down
47 changes: 47 additions & 0 deletions src/HotChocolate/Core/src/Execution/TaskStatistics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using System;
using System.Threading;

namespace HotChocolate.Execution
{
internal class TaskStatistics : ITaskStatistics
{
private int _running;
private int _enqueued;

public event EventHandler<EventArgs>? StateChanged;

public int Enqueued => _enqueued;

public int Running => _running;

public void TaskEnqueued()
{
Interlocked.Increment(ref _enqueued);
StateChanged?.Invoke(this, EventArgs.Empty);
}

public void TaskDequeued()
{
Interlocked.Decrement(ref _enqueued);
StateChanged?.Invoke(this, EventArgs.Empty);
}

public void TaskStarted()
{
Interlocked.Increment(ref _running);
StateChanged?.Invoke(this, EventArgs.Empty);
}

public void TaskCompleted()
{
Interlocked.Decrement(ref _running);
StateChanged?.Invoke(this, EventArgs.Empty);
}

public void Clear()
{
_running = 0;
_enqueued = 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal interface ITaskQueue
/// Initializes a <see cref="ResolverTask"/> and enqueues it.
/// </summary>
void Enqueue(
IOperationContext operationContext,
IPreparedSelection selection,
int responseIndex,
ResultMap resultMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public static ResultMap EnqueueResolverTasks(
if (selection.IsVisible(operationContext.Variables))
{
operationContext.Execution.Tasks.Enqueue(
operationContext,
selection,
responseIndex++,
resultMap,
Expand Down
9 changes: 4 additions & 5 deletions src/HotChocolate/Core/src/Execution/Utilities/TaskQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ namespace HotChocolate.Execution.Utilities
internal class TaskQueue : ITaskQueue
{
private readonly ObjectPool<ResolverTask> _resolverTaskPool;
private readonly IOperationContext _operationContext;
private readonly ITaskStatistics _stats;
private readonly ConcurrentQueue<ResolverTask> _queue =
new ConcurrentQueue<ResolverTask>();

internal TaskQueue(
IOperationContext operationContext,
ITaskStatistics stats,
ObjectPool<ResolverTask> resolverTaskPool)
{
_operationContext = operationContext;
_resolverTaskPool = resolverTaskPool;
_stats = operationContext.Execution.TaskStats;
_stats = stats;
}

/// <inheritdoc/>
Expand All @@ -42,6 +40,7 @@ public bool TryDequeue([NotNullWhen(true)] out ResolverTask? task)

/// <inheritdoc/>
public void Enqueue(
IOperationContext operationContext,
IPreparedSelection selection,
int responseIndex,
ResultMap resultMap,
Expand All @@ -52,7 +51,7 @@ public void Enqueue(
ResolverTask resolverTask = _resolverTaskPool.Get();

resolverTask.Initialize(
_operationContext,
operationContext,
selection,
resultMap,
responseIndex,
Expand Down
2 changes: 1 addition & 1 deletion src/HotChocolate/Core/src/Fetching/BatchScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class BatchScheduler

public void Dispatch()
{
while(_queue.TryDequeue(out Action? dispatch))
while (_queue.TryDequeue(out Action? dispatch))
{
dispatch();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
using System.Collections.Generic;
using Microsoft.Extensions.ObjectPool;
using Xunit;

namespace HotChocolate.Execution.Utilities
{
public class BufferedObjectPoolTests
public partial class BufferedObjectPoolTests
{
[Fact]
public void PoolShouldCreateBuffer()
{
// arrange
var pool = new TestPool(2, 4);
var pool = new TestPool<PoolElement>(2, 4);
var bufferedPool = new BufferedObjectPool<PoolElement>(pool);

// act
Expand All @@ -25,7 +23,7 @@ public void PoolShouldCreateBuffer()
public void PoolShouldCreateBufferWhenUsedUp()
{
// arrange
var pool = new TestPool(2, 4);
var pool = new TestPool<PoolElement>(2, 4);
var bufferedPool = new BufferedObjectPool<PoolElement>(pool);

// act
Expand All @@ -43,7 +41,7 @@ public void PoolShouldCreateBufferWhenUsedUp()
public void PoolShouldReturnBufferWhenNotLongerUsed()
{
// arrange
var pool = new TestPool(2, 4);
var pool = new TestPool<PoolElement>(2, 4);
var bufferedPool = new BufferedObjectPool<PoolElement>(pool);

// act
Expand All @@ -59,58 +57,5 @@ public void PoolShouldReturnBufferWhenNotLongerUsed()
Assert.Single(pool.Rented);
Assert.Single(pool.Returned);
}

private class PoolElement
{

}

private class TestPool : DefaultObjectPool<ObjectBuffer<PoolElement>>
{
public List<ObjectBuffer<PoolElement>> Rented =
new List<ObjectBuffer<PoolElement>>();

public List<ObjectBuffer<PoolElement>> Returned =
new List<ObjectBuffer<PoolElement>>();


public TestPool(int bufferSize, int size)
: base(new Policy(bufferSize), size)
{
}

public override ObjectBuffer<PoolElement> Get()
{
ObjectBuffer<PoolElement> buffer = base.Get();
Rented.Add(buffer);
Returned.Remove(buffer);
return buffer;
}
public override void Return(ObjectBuffer<PoolElement> obj)
{
Returned.Add(obj);
Rented.Remove(obj);
base.Return(obj);
}

private class Policy : IPooledObjectPolicy<ObjectBuffer<PoolElement>>
{
private int _bufferSize;

public Policy(int bufferSize)
{
_bufferSize = bufferSize;
}

public ObjectBuffer<PoolElement> Create() =>
new ObjectBuffer<PoolElement>(_bufferSize, x => { });

public bool Return(ObjectBuffer<PoolElement> obj)
{
obj.Reset();
return true;
}
}
}
}
}
Loading

0 comments on commit d00a8f8

Please sign in to comment.