Skip to content

Commit

Permalink
MethodRunnerQueue/ConcurrentMethodRunnerQueue - Remove custom task ex…
Browse files Browse the repository at this point in the history
…ecution queue

- ConcurrentMethodRunnerQueue we now directly queue tasks on TaskScheduler.Default
- MethodRunnerQueue now uses LimitedConcurrencyLevelTaskScheduler (example from Microsoft) to limit execution to one concurrent task
- MethodRunnerQueue tasks are now run on the ThreadPool directly instead of our own custom thread, should have some threading implications
for user
- IMethodRunnerQueue now implements IDisposable

Resolves #2873
  • Loading branch information
amaitland committed Sep 6, 2019
1 parent f88708f commit 25ab209
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 183 deletions.
3 changes: 1 addition & 2 deletions CefSharp.Core/ManagedCefBrowserAdapter.h
Expand Up @@ -72,7 +72,6 @@ namespace CefSharp
}

_methodRunnerQueue->MethodInvocationComplete += gcnew EventHandler<MethodInvocationCompleteArgs^>(this, &ManagedCefBrowserAdapter::MethodInvocationComplete);
_methodRunnerQueue->Start();
}

!ManagedCefBrowserAdapter()
Expand All @@ -88,7 +87,7 @@ namespace CefSharp
if (_methodRunnerQueue != nullptr)
{
_methodRunnerQueue->MethodInvocationComplete -= gcnew EventHandler<MethodInvocationCompleteArgs^>(this, &ManagedCefBrowserAdapter::MethodInvocationComplete);
_methodRunnerQueue->Stop();
delete _methodRunnerQueue;
_methodRunnerQueue = nullptr;
}

Expand Down
6 changes: 2 additions & 4 deletions CefSharp.Test/Framework/ConcurrentMethodRunnerQueueFacts.cs
Expand Up @@ -31,13 +31,12 @@ public async Task StopConcurrentMethodRunnerQueueWhenMethodRunning()
methodInvocation.Parameters.Add("Echo Me!");
var methodRunnerQueue = new ConcurrentMethodRunnerQueue(objectRepository);

methodRunnerQueue.Start();
methodRunnerQueue.Enqueue(methodInvocation);

//Wait a litle for the queue to start processing our Method call
await Task.Delay(500);

var ex = Record.Exception(() => methodRunnerQueue.Stop());
var ex = Record.Exception(() => methodRunnerQueue.Dispose());

Assert.Null(ex);
}
Expand All @@ -59,14 +58,13 @@ public void ValidateAsyncTaskMethodOutput()

methodRunnerQueue.MethodInvocationComplete += (sender, args) =>
{
methodRunnerQueue.Stop();
methodRunnerQueue.Dispose();
actualResult = args.Result.Result.ToString();
manualResetEvent.Set();
};

methodRunnerQueue.Start();
methodRunnerQueue.Enqueue(methodInvocation);

manualResetEvent.WaitOne(3000);
Expand Down
1 change: 1 addition & 0 deletions CefSharp/CefSharp.csproj
Expand Up @@ -103,6 +103,7 @@
<Compile Include="DefaultApp.cs" />
<Compile Include="Enums\SchemeOptions.cs" />
<Compile Include="Internals\ReadOnlyNameValueCollection.cs" />
<Compile Include="Internals\TaskScheduler\LimitedConcurrencyLevelTaskScheduler.cs" />
<Compile Include="ResourceRequestHandlerFactory.cs" />
<Compile Include="ResourceRequestHandlerFactoryItem.cs" />
<Compile Include="Enums\AlphaType.cs" />
Expand Down
168 changes: 59 additions & 109 deletions CefSharp/Internals/ConcurrentMethodRunnerQueue.cs
Expand Up @@ -3,7 +3,6 @@
// Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -18,11 +17,7 @@ namespace CefSharp.Internals
public class ConcurrentMethodRunnerQueue : IMethodRunnerQueue
{
private readonly JavascriptObjectRepository repository;
private readonly AutoResetEvent stopped = new AutoResetEvent(false);
private readonly BlockingCollection<Func<MethodInvocationResult>> queue = new BlockingCollection<Func<MethodInvocationResult>>();
private readonly object lockObject = new object();
private volatile CancellationTokenSource cancellationTokenSource;
private volatile bool running;
private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

public event EventHandler<MethodInvocationCompleteArgs> MethodInvocationComplete;

Expand All @@ -31,129 +26,80 @@ public ConcurrentMethodRunnerQueue(JavascriptObjectRepository repository)
this.repository = repository;
}

public void Start()
public void Dispose()
{
lock (lockObject)
{
if (!running)
{
cancellationTokenSource = new CancellationTokenSource();
Task.Factory.StartNew(ConsumeTasks, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
running = true;
}
}
cancellationTokenSource.Cancel();
}

public void Stop()
public void Enqueue(MethodInvocation methodInvocation)
{
lock (lockObject)
var task = new Task(() =>
{
if (running)
var result = ExecuteMethodInvocation(methodInvocation);
//If the call failed or returned null then we'll fire the event immediately
if (!result.Success || result.Result == null)
{
cancellationTokenSource.Cancel();
stopped.WaitOne();
//clear the queue
while (queue.Count > 0)
{
queue.Take();
}
cancellationTokenSource = null;
running = false;
OnMethodInvocationComplete(result, cancellationTokenSource.Token);
}
}
}

public void Enqueue(MethodInvocation methodInvocation)
{
queue.Add(() => ExecuteMethodInvocation(methodInvocation));
}

private void ConsumeTasks()
{
try
{
//Executes tasks on the ThreadPool
while (!cancellationTokenSource.IsCancellationRequested)
else
{
var func = queue.Take(cancellationTokenSource.Token);
var resultType = result.Result.GetType();
var task = new Task(() =>
//If the returned type is Task then we'll ContinueWith and perform the processing then.
if (typeof(Task).IsAssignableFrom(resultType))
{
var result = func();
var resultTask = (Task)result.Result;
//If the call failed or returned null then we'll fire the event immediately
if (!result.Success || result.Result == null)
if (resultType.IsGenericType)
{
OnMethodInvocationComplete(result);
}
else
{
var resultType = result.Result.GetType();
//If the returned type is Task then we'll ContinueWith and perform the processing then.
if (typeof(Task).IsAssignableFrom(resultType))
resultTask.ContinueWith((t) =>
{
var resultTask = (Task)result.Result;
if (resultType.IsGenericType)
if (t.Status == TaskStatus.RanToCompletion)
{
resultTask.ContinueWith((t) =>
{
if (t.Status == TaskStatus.RanToCompletion)
{
//We use some reflection to get the Result
//If someone has a better way of doing this then please submit a PR
result.Result = resultType.GetProperty("Result").GetValue(resultTask);
}
else
{
result.Success = false;
result.Result = null;
var aggregateException = t.Exception;
//TODO: Add support for passing a more complex message
// to better represent the Exception
if (aggregateException.InnerExceptions.Count == 1)
{
result.Message = aggregateException.InnerExceptions[0].ToString();
}
else
{
result.Message = t.Exception.ToString();
}
}
OnMethodInvocationComplete(result);
},
cancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler.Default);
//We use some reflection to get the Result
//If someone has a better way of doing this then please submit a PR
result.Result = resultType.GetProperty("Result").GetValue(resultTask);
}
else
{
//If it's not a generic Task then it doesn't have a return object
//So we'll just set the result to null and continue on
result.Success = false;
result.Result = null;
OnMethodInvocationComplete(result);
var aggregateException = t.Exception;
//TODO: Add support for passing a more complex message
// to better represent the Exception
if (aggregateException.InnerExceptions.Count == 1)
{
result.Message = aggregateException.InnerExceptions[0].ToString();
}
else
{
result.Message = t.Exception.ToString();
}
}
}
else
{
OnMethodInvocationComplete(result);
}
}
}, cancellationTokenSource.Token);
OnMethodInvocationComplete(result, cancellationTokenSource.Token);
},
cancellationTokenSource.Token, TaskContinuationOptions.None, TaskScheduler.Default);
}
else
{
//If it's not a generic Task then it doesn't have a return object
//So we'll just set the result to null and continue on
result.Result = null;
task.Start(TaskScheduler.Default);
OnMethodInvocationComplete(result, cancellationTokenSource.Token);
}
}
else
{
OnMethodInvocationComplete(result, cancellationTokenSource.Token);
}
}
}
catch (OperationCanceledException)
{
// Note: Task has been cancelled
}
finally
{
stopped.Set();
}
}, cancellationTokenSource.Token);

task.Start(TaskScheduler.Default);
}

private MethodInvocationResult ExecuteMethodInvocation(MethodInvocation methodInvocation)
Expand Down Expand Up @@ -183,9 +129,13 @@ private MethodInvocationResult ExecuteMethodInvocation(MethodInvocation methodIn
};
}

private void OnMethodInvocationComplete(MethodInvocationResult e)
private void OnMethodInvocationComplete(MethodInvocationResult e, CancellationToken token)
{
MethodInvocationComplete?.Invoke(this, new MethodInvocationCompleteArgs(e));
//If cancellation has been requested we don't need to continue.
if (!token.IsCancellationRequested)
{
MethodInvocationComplete?.Invoke(this, new MethodInvocationCompleteArgs(e));
}
}
}
}
4 changes: 1 addition & 3 deletions CefSharp/Internals/IMethodRunnerQueue.cs
Expand Up @@ -6,12 +6,10 @@

namespace CefSharp.Internals
{
public interface IMethodRunnerQueue
public interface IMethodRunnerQueue : IDisposable
{
event EventHandler<MethodInvocationCompleteArgs> MethodInvocationComplete;

void Enqueue(MethodInvocation methodInvocation);
void Start();
void Stop();
}
}

0 comments on commit 25ab209

Please sign in to comment.