Skip to content
This repository has been archived by the owner on Sep 17, 2023. It is now read-only.

Extracts scheduling of consumer #61

Merged
merged 1 commit into from
Sep 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/MyNatsClient/IConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MyNatsClient
{
/// <summary>
/// Responsible for returning a Task that continiously runs the consumer.
/// </summary>
public interface IConsumerFactory
{
Task Run(Action consumer, CancellationToken cancellationToken);
}
}
16 changes: 16 additions & 0 deletions src/MyNatsClient/Internals/DefaultTaskSchedulerConsumerFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace MyNatsClient.Internals
{
internal class DefaultTaskSchedulerConsumerFactory : IConsumerFactory
{
public Task Run(Action consumer, CancellationToken cancellationToken)
=> Task.Factory.StartNew(
consumer,
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
23 changes: 12 additions & 11 deletions src/MyNatsClient/NatsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public sealed class NatsClient : INatsClient, IDisposable
private readonly ConnectionInfo _connectionInfo;
private readonly ConcurrentDictionary<string, Subscription> _subscriptions;
private readonly INatsConnectionManager _connectionManager;
private readonly IConsumerFactory _consumerFactory;

private SemaphoreSlim _sync;
private CancellationTokenSource _cancellation;
Expand All @@ -46,7 +47,10 @@ public sealed class NatsClient : INatsClient, IDisposable
public INatsObservable<MsgOp> MsgOpStream => _opMediator.MsgOpsStream;
public bool IsConnected => _connection != null && _connection.IsConnected && _connection.CanRead;

public NatsClient(ConnectionInfo connectionInfo, ISocketFactory socketFactory = null)
public NatsClient(
ConnectionInfo connectionInfo,
ISocketFactory socketFactory = null,
IConsumerFactory consumerFactory = null)
{
if (connectionInfo == null)
throw new ArgumentNullException(nameof(connectionInfo));
Expand All @@ -60,6 +64,7 @@ public NatsClient(ConnectionInfo connectionInfo, ISocketFactory socketFactory =
_eventMediator = new NatsObservableOf<IClientEvent>();
_opMediator = new NatsOpMediator();
_connectionManager = new NatsConnectionManager(socketFactory ?? new SocketFactory());
_consumerFactory = consumerFactory ?? new DefaultTaskSchedulerConsumerFactory();

Events.SubscribeSafe(async ev =>
{
Expand Down Expand Up @@ -153,8 +158,6 @@ public async Task ConnectAsync()

await _sync.WaitAsync().ConfigureAwait(false);

IList<IOp> opsReceivedWhileConnecting;

try
{
if (IsConnected)
Expand All @@ -164,15 +167,13 @@ public async Task ConnectAsync()

_cancellation = new CancellationTokenSource();

IList<IOp> opsReceivedWhileConnecting;

(_connection, opsReceivedWhileConnecting) =
await _connectionManager.OpenConnectionAsync(_connectionInfo, _cancellation.Token).ConfigureAwait(false);

_consumer = Task.Factory
.StartNew(
Worker,
_cancellation.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default)
_consumer = _consumerFactory
.Run(ConsumerWork, _cancellation.Token)
.ContinueWith(async t =>
{
if (_isDisposed)
Expand Down Expand Up @@ -223,7 +224,7 @@ public async Task ConnectAsync()
}
}

private void Worker()
private void ConsumerWork()
{
bool ShouldDoWork() => !_isDisposed && IsConnected && _cancellation?.IsCancellationRequested == false;

Expand Down Expand Up @@ -304,7 +305,7 @@ public void Disconnect()
{
_sync?.Release();
}

_eventMediator.Emit(new ClientDisconnected(this, DisconnectReason.ByUser));
}

Expand Down