Skip to content

Commit

Permalink
Tidy up VA event handler. Start the consumer task after we've initial…
Browse files Browse the repository at this point in the history
…ized our TaskQueue object.
  • Loading branch information
Tkael committed Jun 3, 2024
1 parent baba704 commit 0607f6b
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions VoiceAttackResponder/VoiceAttackEventHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace EddiVoiceAttackResponder
{
internal static class VoiceAttackEventHandler
{
private static readonly ConcurrentDictionary<string, TaskQueue<Event>> eventQueues = new ConcurrentDictionary<string, TaskQueue<Event>>();
private static readonly ConcurrentDictionary<string, TaskQueue<Event>> taskQueues = new ConcurrentDictionary<string, TaskQueue<Event>>();
private static readonly CancellationTokenSource consumerCancellationTS = new CancellationTokenSource(); // This must be static so that it is visible to child threads and tasks

// We'll maintain a referenceable list of variables that we've set from events
Expand All @@ -22,29 +22,31 @@ internal static class VoiceAttackEventHandler
public static void Handle ( Event theEvent )
{
// Check for any completed, cancelled, or faulted consumer tasks and clean up consumer tasks which are no longer needed
foreach ( var eventQueue in eventQueues.Where( q => !q.Value.isRunning ) )
foreach ( var taskQueue in taskQueues.Where( q => !q.Value.isRunning ) )
{
if ( eventQueues.TryRemove( eventQueue.Key, out var removed ) )
if ( taskQueues.TryRemove( taskQueue.Key, out var removed ) )
{
removed.Dispose();
}
}

if ( theEvent is null || consumerCancellationTS.IsCancellationRequested ) { return; }

if ( eventQueues.TryGetValue( theEvent.type, out var taskQueue ) )
if ( taskQueues.TryGetValue( theEvent.type, out var runningTaskQueue ) )
{
// Add our event to an existing blocking collection for that event type.
taskQueue.Add( theEvent );
runningTaskQueue.Add( theEvent );
}
else
{
// Add our event to a new blocking collection for that event type and start a consumer task for that collection
eventQueues[ theEvent.type ] = new TaskQueue<Event>( () =>
var newTaskQueue = new TaskQueue<Event>( () =>
{
// ReSharper disable once AccessToModifiedClosure - OK to use vaProxy in this context.
dequeueEvents( eventQueues[ theEvent.type ], ref App.vaProxy );
}, consumerCancellationTS.Token, TaskCreationOptions.PreferFairness ) { theEvent };
dequeueEvents( taskQueues[ theEvent.type ], ref App.vaProxy );
}, consumerCancellationTS.Token ) { theEvent };
taskQueues.TryAdd( theEvent.type, newTaskQueue );
newTaskQueue.Start();
}
}

Expand Down Expand Up @@ -190,29 +192,35 @@ public static void StopEventHandling ()
Task.WhenAny(
Task.Run( () =>
{
while ( eventQueues.Values.Any( q => q.isRunning ) )
while ( taskQueues.Values.Any( q => q.isRunning ) )
{
Thread.Sleep( TimeSpan.FromMilliseconds( 50 ) );
}
} ),
Task.Delay( 2000 )
);
foreach ( var q in eventQueues.Values )
foreach ( var q in taskQueues.Values )
{ q.Dispose(); }
}
}

internal class TaskQueue<T> : BlockingCollection<T>
{
public bool isRunning => consumerTask != null &&
( !consumerTask.IsCanceled || consumerTask.IsCompleted || consumerTask.IsFaulted );
public bool isRunning => consumerTask != null &&
consumerTask.Status != TaskStatus.Canceled &&
consumerTask.Status != TaskStatus.Faulted &&
consumerTask.Status != TaskStatus.RanToCompletion;

private Task consumerTask { get; }

public TaskQueue ( Action action, CancellationToken cancellationToken, TaskCreationOptions creationOptions )
public TaskQueue ( Action action, CancellationToken cancellationToken )
{
consumerTask = new Task( action, cancellationToken, creationOptions );
consumerTask.Start();
consumerTask = new Task( action, cancellationToken, TaskCreationOptions.PreferFairness );
}

public void Start ()
{
consumerTask?.Start();
}
}
}

0 comments on commit 0607f6b

Please sign in to comment.