Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix CancellationTokenRegistration memory leak #284

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public async Task<IAnonymousProducer> CreateAsync(AnonymousProducerConfiguration
if (configuration == null) throw new ArgumentNullException(nameof(configuration));

cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var target = new Target
{
Expand Down
2 changes: 1 addition & 1 deletion src/ActiveMQ.Artemis.Client/Builders/ConnectionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public ConnectionBuilder(ILoggerFactory loggerFactory, Func<IMessageIdPolicy> me
public async Task<IConnection> CreateAsync(Endpoint endpoint, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var connectionFactory = new Amqp.ConnectionFactory();
try
Expand Down
2 changes: 1 addition & 1 deletion src/ActiveMQ.Artemis.Client/Builders/ConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task<IConsumer> CreateAsync(ConsumerConfiguration configuration, Ca
CheckConfiguration(configuration);

cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var source = new Source
{
Expand Down
2 changes: 1 addition & 1 deletion src/ActiveMQ.Artemis.Client/Builders/ProducerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task<IProducer> CreateAsync(ProducerConfiguration configuration, Ca
if (string.IsNullOrWhiteSpace(configuration.Address)) throw new ArgumentNullException(nameof(configuration.Address), "The address cannot be empty.");

cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var target = new Target
{
Expand Down
9 changes: 5 additions & 4 deletions src/ActiveMQ.Artemis.Client/Builders/RpcClientBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public async Task<RpcClient> CreateAsync(string address, CancellationToken cance

private async Task<SenderLink> CreateSenderLink(string address, CancellationToken cancellationToken)
{
var tcs = TaskUtil.CreateTaskCompletionSource<bool>(cancellationToken);
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<bool>(ref cancellationToken);
using var _ = ctr;
var senderLink = new SenderLink(_session, Guid.NewGuid().ToString(), new Target
{
Address = address
Expand All @@ -36,7 +37,6 @@ private async Task<SenderLink> CreateSenderLink(string address, CancellationToke
senderLink.Closed -= OnClosed;
return senderLink;


void OnAttached(ILink link, Attach attach)
{
if (attach != null)
Expand All @@ -56,7 +56,8 @@ void OnClosed(IAmqpObject sender, Error error)

private async Task<(ReceiverLink receiverLink, string address)> CreateReceiverLink(CancellationToken cancellationToken)
{
var tcs = TaskUtil.CreateTaskCompletionSource<string>(cancellationToken);
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<string>(ref cancellationToken);
using var _ = ctr;
var receiverLink = new ReceiverLink(_session, Guid.NewGuid().ToString(), new Source
{
Dynamic = true
Expand All @@ -68,7 +69,7 @@ void OnClosed(IAmqpObject sender, Error error)

void OnAttached(ILink link, Attach attach)
{
if (attach != null && attach.Source is Source source)
if (attach is { Source: Source source })
{
tcs.TrySetResult(source.Address);
}
Expand Down
2 changes: 1 addition & 1 deletion src/ActiveMQ.Artemis.Client/Builders/SessionBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public SessionBuilder(Amqp.Connection connection)
public async Task<Session> CreateAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var begin = new Begin
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public TransactionCoordinatorBuilder(Session session)
public async Task<TransactionCoordinator> CreateAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.Register(() => _tcs.TrySetCanceled());
using var _ = cancellationToken.Register(() => _tcs.TrySetCanceled());

var attach = new Attach
{
Expand Down
22 changes: 15 additions & 7 deletions src/ActiveMQ.Artemis.Client/InternalUtilities/TaskUtil.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace ActiveMQ.Artemis.Client.InternalUtilities
{
internal static class TaskUtil
{
public static TaskCompletionSource<T> CreateTaskCompletionSource<T>(CancellationToken cancellationToken)
public static (TaskCompletionSource<T> tcs, CancellationTokenRegistration ctr) CreateTaskCompletionSource<T>(ref CancellationToken cancellationToken)
{
return CreateTaskCompletionSource<T>(ref cancellationToken, null);
}

public static (TaskCompletionSource<T> tcs, CancellationTokenRegistration ctr) CreateTaskCompletionSource<T>(ref CancellationToken cancellationToken, Action cleanup)
{
var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
if (cancellationToken != default)
var ctr = cancellationToken != default ? cancellationToken.Register(() =>
{
cancellationToken.Register(() => tcs.TrySetCanceled());
}

return tcs;
if (tcs.TrySetCanceled())
{
cleanup?.Invoke();
}
}) : default;
return (tcs, ctr);
}
}
}
4 changes: 3 additions & 1 deletion src/ActiveMQ.Artemis.Client/Management/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public async Task<Message> SendAsync(Message message, CancellationToken cancella
message.SetCorrelationId(correlationId);
message.Properties.ReplyTo = _replyToAddress;

var tcs = TaskUtil.CreateTaskCompletionSource<Message>(cancellationToken);
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<Message>(ref cancellationToken);
using var cancellationTokenRegistration = ctr;
cancellationToken.Register(() => tcs.TrySetCanceled());
try
{
_pendingRequests.TryAdd(correlationId, tcs);
Expand Down
9 changes: 3 additions & 6 deletions src/ActiveMQ.Artemis.Client/ProducerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,11 @@ protected async Task SendInternalAsync(string address, RoutingType? routingType,

var txnId = await _transactionsManager.GetTxnIdAsync(transaction, cancellationToken).ConfigureAwait(false);
var transactionalState = txnId != null ? new TransactionalState { TxnId = txnId } : null;
var tcs = TaskUtil.CreateTaskCompletionSource<bool>(cancellationToken);
cancellationToken.Register(() =>
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<bool>(ref cancellationToken, () =>
{
if (tcs.TrySetCanceled())
{
_senderLink.Cancel(message.InnerMessage);
}
_senderLink.Cancel(message.InnerMessage);
});
using var _ = ctr;
message.DurabilityMode ??= _configuration.MessageDurabilityMode ?? DurabilityMode.Durable;
Send(address, routingType, message, transactionalState, _onOutcome, tcs);
await tcs.Task.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@ public TransactionCoordinator(SenderLink senderLink)
_senderLink = senderLink;
}

public Task<byte[]> DeclareAsync(CancellationToken cancellationToken)
public async Task<byte[]> DeclareAsync(CancellationToken cancellationToken)
{
var message = new Amqp.Message(new Declare());
var tcs = TaskUtil.CreateTaskCompletionSource<byte[]>(cancellationToken);
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<byte[]>(ref cancellationToken);
using var _ = ctr;
_senderLink.Send(message, null, _onDeclareOutcome, tcs);
return tcs.Task;
return await tcs.Task;
}

public Task DischargeAsync(byte[] txnId, bool fail, CancellationToken cancellationToken)
public async Task DischargeAsync(byte[] txnId, bool fail, CancellationToken cancellationToken)
{
var message = new Amqp.Message(new Discharge { TxnId = txnId, Fail = fail });
var tcs = TaskUtil.CreateTaskCompletionSource<bool>(cancellationToken);
var (tcs, ctr) = TaskUtil.CreateTaskCompletionSource<bool>(ref cancellationToken);
using var _ = ctr;
_senderLink.Send(message, null, _onDischargeOutcome, tcs);
return tcs.Task;
await tcs.Task;
}

private static void OnDeclareOutcome(ILink link, Amqp.Message message, Outcome outcome, object state)
Expand Down
4 changes: 2 additions & 2 deletions test/ActiveMQ.Artemis.Client.UnitTests/ActiveMQNetSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected static async Task DisposeHostAndWaitUntilConnectionNotified(TestContai
{
var tcs = new TaskCompletionSource<bool>();
using var cts = new CancellationTokenSource(Timeout);
cts.Token.Register(() => tcs.TrySetCanceled());
await using var _ = cts.Token.Register(() => tcs.TrySetCanceled());
connection.ConnectionClosed += (_, _) =>
{
tcs.TrySetResult(true);
Expand All @@ -99,7 +99,7 @@ protected static async Task WaitUntilConnectionRecovered(IConnection connection)
{
var tcs = new TaskCompletionSource<bool>();
using var cts = new CancellationTokenSource(Timeout);
cts.Token.Register(() => tcs.TrySetCanceled());
await using var _ = cts.Token.Register(() => tcs.TrySetCanceled());
connection.ConnectionRecovered += (_, _) =>
{
tcs.TrySetResult(true);
Expand Down