Skip to content

Commit

Permalink
Add AMQP retry mechanism and example projects (#312)
Browse files Browse the repository at this point in the history
* Add AMQP retry mechanism and example projects

* Fix retry start index

* Make a distinction between internally managed connection reference and not

* Add better exception feedback on connection failure.

* Make settings responsible for closing Session and Connection objects.

* Update examples to show usage of both QueueNamed and Address settings

* Add IsPackable tag in example projects csproj files
  • Loading branch information
Arkatufus committed Dec 18, 2020
1 parent c8d3089 commit 4b62246
Show file tree
Hide file tree
Showing 15 changed files with 629 additions and 13 deletions.
19 changes: 18 additions & 1 deletion src/Alpakka.sln
Expand Up @@ -110,7 +110,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{DF4BD2BE-9
..\docs\web.config = ..\docs\web.config
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.RabbitMq.Tests", "Amqp\Akka.Streams.Amqp.RabbitMq.Tests\Akka.Streams.Amqp.RabbitMq.Tests.csproj", "{9CD223E4-CB08-4043-8687-16FE139310B4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Streams.Amqp.RabbitMq.Tests", "Amqp\Akka.Streams.Amqp.RabbitMq.Tests\Akka.Streams.Amqp.RabbitMq.Tests.csproj", "{9CD223E4-CB08-4043-8687-16FE139310B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amqp.V1.Source", "Amqp\Examples\Amqp.V1.Source\Amqp.V1.Source.csproj", "{33CE5A75-2792-48F1-8F19-12473A531926}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amqp.V1.Sink", "Amqp\Examples\Amqp.V1.Sink\Amqp.V1.Sink.csproj", "{80039FDA-5979-4AA2-B380-08AEE3C6EF26}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -226,6 +232,14 @@ Global
{9CD223E4-CB08-4043-8687-16FE139310B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CD223E4-CB08-4043-8687-16FE139310B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CD223E4-CB08-4043-8687-16FE139310B4}.Release|Any CPU.Build.0 = Release|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Release|Any CPU.Build.0 = Release|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -266,6 +280,9 @@ Global
{32584532-EF44-45E2-9E43-CFE13609785D} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{DF4BD2BE-9280-481E-A155-271F68A291E0} = {EEA93301-F97B-49F0-BB08-33F7CC5A24BB}
{9CD223E4-CB08-4043-8687-16FE139310B4} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{33CE5A75-2792-48F1-8F19-12473A531926} = {D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}
{80039FDA-5979-4AA2-B380-08AEE3C6EF26} = {D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BE1A9436-C6F6-4C73-9EB4-32DEC9C841A5}
Expand Down
@@ -0,0 +1,7 @@
{
"profiles": {
"Akka.Streams.Amqp.V1.Tests": {
"commandName": "Project"
}
}
}
79 changes: 79 additions & 0 deletions src/Amqp/Akka.Streams.Amqp.V1/AddressSinkSettings.cs
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Akka.Serialization;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

namespace Akka.Streams.Amqp.V1
{
public class AddressSinkSettings<T> : IAmqpSinkSettings<T>
{
private readonly string _linkName;
private readonly string _queueName;
private readonly Serializer _serializer;
private readonly Address _address;
private readonly object _lock = new object();

private Connection _connection;
private Session _session;

public bool ManageConnection => true;

public AddressSinkSettings(
Address address,
string linkName,
string queueName,
Serializer serializer)
{
_address = address;
_linkName = linkName;
_queueName = queueName;
_serializer = serializer;
}

public byte[] GetBytes(T obj)
{
return _serializer.ToBinary(obj);
}

public void CloseConnection()
{
_session?.Close();
_connection?.Close();

_session = null;
_connection = null;
}

public async Task CloseConnectionAsync()
{
if(_session != null)
await _session.CloseAsync();

if(_connection != null)
await _connection.CloseAsync();
}

public SenderLink GetSenderLink()
{
if (_connection == null || _connection.IsClosed)
_connection = new Connection(_address);

if (_session == null || _session.IsClosed)
_session = new Session(_connection);

return new SenderLink(
_session,
_linkName,
new Target
{
Address = _queueName,
Capabilities = new[] { new Symbol("queue") }
},
null);
}
}
}
79 changes: 79 additions & 0 deletions src/Amqp/Akka.Streams.Amqp.V1/AddressSourceSettings.cs
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Akka.Serialization;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

namespace Akka.Streams.Amqp.V1
{
public class AddressSourceSettings<T> : IAmqpSourceSettings<T>
{
private readonly string _linkName;
private readonly string _queueName;
private readonly Serializer _serializer;

public int Credit { get; }
private readonly Address _address;
private Connection _connection;
private Session _session;

public bool ManageConnection => true;

public AddressSourceSettings(
Address address,
string linkName,
string queueName,
int credit,
Serializer serializer)
{
_address = address;
_linkName = linkName;
_queueName = queueName;
Credit = credit;
_serializer = serializer;
}

public T Convert(Message message)
{
var bString = message.GetBody<byte[]>();
return _serializer.FromBinary<T>(bString);
}

public void CloseConnection()
{
_session?.Close();
_connection?.Close();
}

public async Task CloseConnectionAsync()
{
if (_session != null)
await _session.CloseAsync();

if (_connection != null)
await _connection.CloseAsync();
}

public ReceiverLink GetReceiverLink()
{
if(_connection == null || _connection.IsClosed)
_connection = new Connection(_address);

if (_session == null || _session.IsClosed)
_session = new Session(_connection);

return new ReceiverLink(
_session,
_linkName,
new Source
{
Address = _queueName,
Capabilities = new[] { new Symbol("queue") }
},
null);
}
}
}
59 changes: 56 additions & 3 deletions src/Amqp/Akka.Streams.Amqp.V1/AmqpSinkStage.cs
@@ -1,11 +1,25 @@
using Akka.Streams.Stage;
using System;
using System.Collections.Generic;
using Amqp;
using System.Threading.Tasks;
using Amqp.Framing;

namespace Akka.Streams.Amqp.V1
{
public sealed class AmqpSinkStage<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task>
{
private static readonly Dictionary<int, TimeSpan> RetryInterval =
new Dictionary<int, TimeSpan>()
{
{ 6, TimeSpan.FromMilliseconds(100) },
{ 5, TimeSpan.FromMilliseconds(500) },
{ 4, TimeSpan.FromMilliseconds(1000) },
{ 3, TimeSpan.FromMilliseconds(2000) },
{ 2, TimeSpan.FromMilliseconds(4000) },
{ 1, TimeSpan.FromMilliseconds(8000) },
};

public Inlet<T> In { get; }
public override SinkShape<T> Shape { get; }
public IAmqpSinkSettings<T> AmqpSourceSettings { get; }
Expand All @@ -28,13 +42,15 @@ private class AmqpSinkStageLogic : GraphStageLogic
{
private readonly AmqpSinkStage<T> _stage;
private readonly TaskCompletionSource<Done> _promise;
private readonly SenderLink _sender;
private readonly Action<(IAmqpObject, Error)> _disconnectedCallback;

private SenderLink _sender;

public AmqpSinkStageLogic(AmqpSinkStage<T> amqpSinkStage, TaskCompletionSource<Done> promise, SinkShape<T> shape) : base(shape)
{
_stage = amqpSinkStage;
_promise = promise;
_sender = amqpSinkStage.AmqpSourceSettings.GetSenderLink();
_disconnectedCallback = GetAsyncCallback<(IAmqpObject, Error)>(HandleDisconnection);

SetHandler(
inlet: _stage.In,
Expand All @@ -49,15 +65,52 @@ public AmqpSinkStageLogic(AmqpSinkStage<T> amqpSinkStage, TaskCompletionSource<D
);
}

private async Task Connect()
{
var retry = 7;
var exceptions = new List<Exception>();
while (true)
{
try
{
_sender = _stage.AmqpSourceSettings.GetSenderLink();
_sender.AddClosedCallback((sender, error) => _disconnectedCallback((sender, error)));
Log.Info("Connected to AMQP.V1 server.");
return;
}
catch (Exception e)
{
if (!_stage.AmqpSourceSettings.ManageConnection)
{
throw new ConnectionException(
"Failed to connect to AMQP.V1 server. Could not retry connection because SinkSettings does not manage the Connection object.", e);
}

retry--;
if (retry == 0)
throw new AggregateException("Failed to connect to AMQP.V1 server.", exceptions);

exceptions.Add(e);
Log.Error($"[{retry}] more retries to connect to AMQP.V1 server.");
await Task.Delay(RetryInterval[retry]);
}
}
}

private void HandleDisconnection((IAmqpObject sender, Error error) args)
=> FailStage(new DisconnectedException(args.sender, args.error));

public override void PreStart()
{
base.PreStart();

Connect().Wait();
Pull(_stage.In);
}

public override void PostStop()
{
_sender.Close();
_sender?.Close();
base.PostStop();
}
}
Expand Down

0 comments on commit 4b62246

Please sign in to comment.