Skip to content

Commit

Permalink
Improve cleanup and automate standby
Browse files Browse the repository at this point in the history
  • Loading branch information
episource committed Feb 7, 2019
1 parent 6430fa8 commit 95f6c4a
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 8 deletions.
54 changes: 46 additions & 8 deletions episource.unblocker/Unblocker.cs
Original file line number Diff line number Diff line change
@@ -1,40 +1,57 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq.Expressions;
using System.Security;
using System.Threading;
using System.Threading.Tasks;

using episource.unblocker.hosting;
using episource.unblocker.tasks;

namespace episource.unblocker {
public sealed class Unblocker : IDisposable {
private static readonly TimeSpan CleanupDelay = TimeSpan.FromMilliseconds(250);
private static readonly TimeSpan DefaultCleanupDelay = TimeSpan.FromMilliseconds(150);
private static readonly TimeSpan DefaultStandbyDelay = TimeSpan.FromMilliseconds(10000);

private readonly object stateLock = new object();
private readonly int maxIdleWorkers;
private readonly Queue<WorkerClient> idleClients = new Queue<WorkerClient>();
private readonly LinkedList<WorkerClient> busyClients = new LinkedList<WorkerClient>();
private readonly DebugMode debugMode;
private readonly CountdownTask cleanupTask;
private readonly CountdownTask standbyTask;

private bool disposed;
private volatile bool disposed;

public Unblocker(int maxIdleWorkers = 1, DebugMode debug = DebugMode.None) {
public Unblocker(
int maxIdleWorkers = 1, DebugMode debug = DebugMode.None,
TimeSpan? cleanupDelay = null, TimeSpan? standbyDelay = null
) {
this.maxIdleWorkers = maxIdleWorkers;
this.debugMode = debug;

this.cleanupTask = new CountdownTask(cleanupDelay ?? DefaultCleanupDelay, this.Cleanup);
this.standbyTask = new CountdownTask(standbyDelay ?? DefaultStandbyDelay, this.Standby);
}

public async Task<T> InvokeAsync<T>(
Expression<Func<CancellationToken, T>> invocation, CancellationToken ct = new CancellationToken(),
TimeSpan? cancellationTimeout = null, SecurityZone securityZone = SecurityZone.MyComputer
) {
if (this.disposed) {
throw new ObjectDisposedException("This instance has been disposed.");
}

try {
this.cleanupTask.Cancel();
this.standbyTask.Cancel();

return await this.ActivateWorker()
.InvokeRemotely(invocation, ct, cancellationTimeout, securityZone)
.ConfigureAwait(false);
} finally {
this.Cleanup();
// standbyTask started during cleanup
this.cleanupTask.Reset();
}

}
Expand All @@ -48,19 +65,33 @@ public async Task InvokeAsync(
}

try {
this.cleanupTask.Cancel();
this.standbyTask.Cancel();

await this.ActivateWorker().InvokeRemotely(invocation, ct, cancellationTimeout, securityZone);
} finally {
this.Cleanup();
// standbyTask started during cleanup
this.cleanupTask.Reset();
}
}

// releases all idle workers
public void Standby() {
if (this.debugMode != DebugMode.None) {
Console.WriteLine("Standby started.");
}

lock (this.stateLock) {
this.RecoverWorkers();

foreach (var client in this.idleClients) {
client.Dispose();
}
this.idleClients.Clear();

if (this.busyClients.Count > 0) {
this.cleanupTask.TryStart();
}
}
}

Expand All @@ -81,10 +112,13 @@ private WorkerClient ActivateWorker() {
}
}

private async void Cleanup() {
await Task.Delay(CleanupDelay).ConfigureAwait(false);
private void Cleanup() {
if (this.debugMode != DebugMode.None) {
Console.WriteLine("Cleanup started.");
}

lock (this.stateLock) {
this.standbyTask.Reset();
this.RecoverWorkers();
this.EnsureWorkerLimit();
}
Expand Down Expand Up @@ -135,6 +169,8 @@ private void EnsureWorkerLimit() {
}
}
}

#region IDisposable

public void Dispose() {
this.Dispose(true);
Expand All @@ -157,5 +193,7 @@ public void Dispose() {
}
}
}

#endregion
}
}
1 change: 1 addition & 0 deletions episource.unblocker/episource.unblocker.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
<DependentUpon>WorkerServer.cs</DependentUpon>
</Compile>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="tasks\CountdownTask.cs" />
<Compile Include="Unblocker.cs" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
Expand Down
93 changes: 93 additions & 0 deletions episource.unblocker/tasks/CountdownTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace episource.unblocker.tasks {
public sealed class CountdownTask {
private readonly TimeSpan countdown;
private readonly Action<CancellationToken> action;
private volatile CancellationTokenSource cts;


public CountdownTask(TimeSpan countdown, Action action)
: this(countdown, ct => action()) { }

public CountdownTask(TimeSpan countdown, Action<CancellationToken> action) {
if (countdown == null) {
throw new ArgumentNullException("countdown");
}

if (action == null) {
throw new ArgumentNullException("action");
}

this.countdown = countdown;
this.action = action;
}

public bool IsRunning {
get { return this.cts != null; }
}

public void Cancel() {
var activeCts = this.cts;
if (activeCts != null) {
activeCts.Cancel();
activeCts.Dispose();
}

}

public void Reset() {
var newCts = new CancellationTokenSource();
CancellationTokenSource activeCts;
do {
activeCts = this.cts;
if (activeCts != null) {
activeCts.Cancel();
activeCts.Dispose();
}
} while (Interlocked.CompareExchange(ref this.cts, newCts, activeCts) != activeCts);

this.ScheduleAction(newCts, newCts.Token);
}

public void Start() {
if (!this.TryStart()) {
throw new InvalidOperationException("Countdown already started.");
}
}

public bool TryStart() {
var newCts = new CancellationTokenSource();

if (Interlocked.CompareExchange(ref this.cts, newCts, null) != null) {
newCts.Dispose();
return false;
}

this.ScheduleAction(newCts, newCts.Token);
return true;
}

// CancellationTokenSource passed together with corresponding CancellationToken:
// cts might have been disposed already, which would make cts.Token throw!
// cts still needed for CompareExchange
private async void ScheduleAction(CancellationTokenSource cts, CancellationToken ct) {
try {
await Task.Delay(this.countdown, ct).ConfigureAwait(false);
ct.ThrowIfCancellationRequested();
this.action(ct);
} catch (TaskCanceledException e) {
// prevent expected exception to show up as unhandled AppDomain exception
if (e.CancellationToken != ct) {
throw;
}
}

// new task might already be scheduled => CompareExchange
Interlocked.CompareExchange(ref this.cts, null, cts);
cts.Dispose();
}
}
}

0 comments on commit 95f6c4a

Please sign in to comment.