Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Commit

Permalink
Extracts scheduling of consumer (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielwertheim committed Sep 19, 2020
1 parent cf27133 commit d3e3f06
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 11 deletions.
14 changes: 14 additions & 0 deletions src/MyNatsClient/IConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MyNatsClient
{
/// <summary>
/// Responsible for returning a Task that continiously runs the consumer.
/// </summary>
public interface IConsumerFactory
{
Task Run(Action consumer, CancellationToken cancellationToken);
}
}
16 changes: 16 additions & 0 deletions src/MyNatsClient/Internals/DefaultTaskSchedulerConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MyNatsClient.Internals
{
internal class DefaultTaskSchedulerConsumerFactory : IConsumerFactory
{
public Task Run(Action consumer, CancellationToken cancellationToken)
=> Task.Factory.StartNew(
consumer,
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
23 changes: 12 additions & 11 deletions src/MyNatsClient/NatsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly ConnectionInfo _connectionInfo;
private readonly ConcurrentDictionary<string, Subscription> _subscriptions;
private readonly INatsConnectionManager _connectionManager;
private readonly IConsumerFactory _consumerFactory;

private SemaphoreSlim _sync;
private CancellationTokenSource _cancellation;
Expand All @@ -46,7 +47,10 @@ public sealed class NatsClient : INatsClient, IDisposable
public INatsObservable<MsgOp> MsgOpStream => _opMediator.MsgOpsStream;
public bool IsConnected => _connection != null && _connection.IsConnected && _connection.CanRead;

public NatsClient(ConnectionInfo connectionInfo, ISocketFactory socketFactory = null)
public NatsClient(
ConnectionInfo connectionInfo,
ISocketFactory socketFactory = null,
IConsumerFactory consumerFactory = null)
{
if (connectionInfo == null)
throw new ArgumentNullException(nameof(connectionInfo));
Expand All @@ -60,6 +64,7 @@ public NatsClient(ConnectionInfo connectionInfo, ISocketFactory socketFactory =
_eventMediator = new NatsObservableOf<IClientEvent>();
_opMediator = new NatsOpMediator();
_connectionManager = new NatsConnectionManager(socketFactory ?? new SocketFactory());
_consumerFactory = consumerFactory ?? new DefaultTaskSchedulerConsumerFactory();

Events.SubscribeSafe(async ev =>
{
Expand Down Expand Up @@ -153,8 +158,6 @@ public async Task ConnectAsync()

await _sync.WaitAsync().ConfigureAwait(false);

IList<IOp> opsReceivedWhileConnecting;

try
{
if (IsConnected)
Expand All @@ -164,15 +167,13 @@ public async Task ConnectAsync()

_cancellation = new CancellationTokenSource();

IList<IOp> opsReceivedWhileConnecting;

(_connection, opsReceivedWhileConnecting) =
await _connectionManager.OpenConnectionAsync(_connectionInfo, _cancellation.Token).ConfigureAwait(false);

_consumer = Task.Factory
.StartNew(
Worker,
_cancellation.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default)
_consumer = _consumerFactory
.Run(ConsumerWork, _cancellation.Token)
.ContinueWith(async t =>
{
if (_isDisposed)
Expand Down Expand Up @@ -223,7 +224,7 @@ public async Task ConnectAsync()
}
}

private void Worker()
private void ConsumerWork()
{
bool ShouldDoWork() => !_isDisposed && IsConnected && _cancellation?.IsCancellationRequested == false;

Expand Down Expand Up @@ -304,7 +305,7 @@ public void Disconnect()
{
_sync?.Release();
}

_eventMediator.Emit(new ClientDisconnected(this, DisconnectReason.ByUser));
}

Expand Down

0 comments on commit d3e3f06

Please sign in to comment.