Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AMQP retry mechanism and example projects #312

Merged
merged 7 commits into from Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 18 additions & 1 deletion src/Alpakka.sln
Expand Up @@ -110,7 +110,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{DF4BD2BE-9
..\docs\web.config = ..\docs\web.config
EndProjectSection
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Streams.Amqp.RabbitMq.Tests", "Amqp\Akka.Streams.Amqp.RabbitMq.Tests\Akka.Streams.Amqp.RabbitMq.Tests.csproj", "{9CD223E4-CB08-4043-8687-16FE139310B4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Streams.Amqp.RabbitMq.Tests", "Amqp\Akka.Streams.Amqp.RabbitMq.Tests\Akka.Streams.Amqp.RabbitMq.Tests.csproj", "{9CD223E4-CB08-4043-8687-16FE139310B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Examples", "Examples", "{D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amqp.V1.Source", "Amqp\Examples\Amqp.V1.Source\Amqp.V1.Source.csproj", "{33CE5A75-2792-48F1-8F19-12473A531926}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amqp.V1.Sink", "Amqp\Examples\Amqp.V1.Sink\Amqp.V1.Sink.csproj", "{80039FDA-5979-4AA2-B380-08AEE3C6EF26}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -226,6 +232,14 @@ Global
{9CD223E4-CB08-4043-8687-16FE139310B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9CD223E4-CB08-4043-8687-16FE139310B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9CD223E4-CB08-4043-8687-16FE139310B4}.Release|Any CPU.Build.0 = Release|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Debug|Any CPU.Build.0 = Debug|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Release|Any CPU.ActiveCfg = Release|Any CPU
{33CE5A75-2792-48F1-8F19-12473A531926}.Release|Any CPU.Build.0 = Release|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80039FDA-5979-4AA2-B380-08AEE3C6EF26}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -266,6 +280,9 @@ Global
{32584532-EF44-45E2-9E43-CFE13609785D} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{DF4BD2BE-9280-481E-A155-271F68A291E0} = {EEA93301-F97B-49F0-BB08-33F7CC5A24BB}
{9CD223E4-CB08-4043-8687-16FE139310B4} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF} = {9A8B3B60-F75D-4906-B368-739CA08C25D4}
{33CE5A75-2792-48F1-8F19-12473A531926} = {D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}
{80039FDA-5979-4AA2-B380-08AEE3C6EF26} = {D2FE5AD5-7AF4-4749-82C6-5E1CAD4D0AAF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {BE1A9436-C6F6-4C73-9EB4-32DEC9C841A5}
Expand Down
@@ -0,0 +1,7 @@
{
"profiles": {
"Akka.Streams.Amqp.V1.Tests": {
"commandName": "Project"
}
}
}
79 changes: 79 additions & 0 deletions src/Amqp/Akka.Streams.Amqp.V1/AddressSinkSettings.cs
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Akka.Serialization;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

namespace Akka.Streams.Amqp.V1
{
public class AddressSinkSettings<T> : IAmqpSinkSettings<T>
{
private readonly string _linkName;
private readonly string _queueName;
private readonly Serializer _serializer;
private readonly Address _address;
private readonly object _lock = new object();

private Connection _connection;
private Session _session;

public bool ManageConnection => true;

public AddressSinkSettings(
Address address,
string linkName,
string queueName,
Serializer serializer)
{
_address = address;
_linkName = linkName;
_queueName = queueName;
_serializer = serializer;
}

public byte[] GetBytes(T obj)
{
return _serializer.ToBinary(obj);
}

public void CloseConnection()
{
_session?.Close();
_connection?.Close();

_session = null;
_connection = null;
}

public async Task CloseConnectionAsync()
{
if(_session != null)
await _session.CloseAsync();

if(_connection != null)
await _connection.CloseAsync();
}

public SenderLink GetSenderLink()
{
if (_connection == null || _connection.IsClosed)
_connection = new Connection(_address);

if (_session == null || _session.IsClosed)
_session = new Session(_connection);

return new SenderLink(
_session,
_linkName,
new Target
{
Address = _queueName,
Capabilities = new[] { new Symbol("queue") }
},
null);
}
}
}
79 changes: 79 additions & 0 deletions src/Amqp/Akka.Streams.Amqp.V1/AddressSourceSettings.cs
@@ -0,0 +1,79 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Akka.Serialization;
using Amqp;
using Amqp.Framing;
using Amqp.Types;

namespace Akka.Streams.Amqp.V1
{
public class AddressSourceSettings<T> : IAmqpSourceSettings<T>
{
private readonly string _linkName;
private readonly string _queueName;
private readonly Serializer _serializer;

public int Credit { get; }
private readonly Address _address;
private Connection _connection;
private Session _session;

public bool ManageConnection => true;

public AddressSourceSettings(
Address address,
string linkName,
string queueName,
int credit,
Serializer serializer)
{
_address = address;
_linkName = linkName;
_queueName = queueName;
Credit = credit;
_serializer = serializer;
}

public T Convert(Message message)
{
var bString = message.GetBody<byte[]>();
return _serializer.FromBinary<T>(bString);
}

public void CloseConnection()
{
_session?.Close();
_connection?.Close();
}

public async Task CloseConnectionAsync()
{
if (_session != null)
await _session.CloseAsync();

if (_connection != null)
await _connection.CloseAsync();
}

public ReceiverLink GetReceiverLink()
{
if(_connection == null || _connection.IsClosed)
_connection = new Connection(_address);

if (_session == null || _session.IsClosed)
_session = new Session(_connection);

return new ReceiverLink(
_session,
_linkName,
new Source
{
Address = _queueName,
Capabilities = new[] { new Symbol("queue") }
},
null);
}
}
}
59 changes: 56 additions & 3 deletions src/Amqp/Akka.Streams.Amqp.V1/AmqpSinkStage.cs
@@ -1,11 +1,25 @@
using Akka.Streams.Stage;
using System;
using System.Collections.Generic;
using Amqp;
using System.Threading.Tasks;
using Amqp.Framing;

namespace Akka.Streams.Amqp.V1
{
public sealed class AmqpSinkStage<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task>
{
private static readonly Dictionary<int, TimeSpan> RetryInterval =
new Dictionary<int, TimeSpan>()
{
{ 6, TimeSpan.FromMilliseconds(100) },
{ 5, TimeSpan.FromMilliseconds(500) },
{ 4, TimeSpan.FromMilliseconds(1000) },
{ 3, TimeSpan.FromMilliseconds(2000) },
{ 2, TimeSpan.FromMilliseconds(4000) },
{ 1, TimeSpan.FromMilliseconds(8000) },
};

public Inlet<T> In { get; }
public override SinkShape<T> Shape { get; }
public IAmqpSinkSettings<T> AmqpSourceSettings { get; }
Expand All @@ -28,13 +42,15 @@ private class AmqpSinkStageLogic : GraphStageLogic
{
private readonly AmqpSinkStage<T> _stage;
private readonly TaskCompletionSource<Done> _promise;
private readonly SenderLink _sender;
private readonly Action<(IAmqpObject, Error)> _disconnectedCallback;

private SenderLink _sender;

public AmqpSinkStageLogic(AmqpSinkStage<T> amqpSinkStage, TaskCompletionSource<Done> promise, SinkShape<T> shape) : base(shape)
{
_stage = amqpSinkStage;
_promise = promise;
_sender = amqpSinkStage.AmqpSourceSettings.GetSenderLink();
_disconnectedCallback = GetAsyncCallback<(IAmqpObject, Error)>(HandleDisconnection);

SetHandler(
inlet: _stage.In,
Expand All @@ -49,15 +65,52 @@ public AmqpSinkStageLogic(AmqpSinkStage<T> amqpSinkStage, TaskCompletionSource<D
);
}

private async Task Connect()
{
var retry = 7;
var exceptions = new List<Exception>();
while (true)
{
try
{
_sender = _stage.AmqpSourceSettings.GetSenderLink();
_sender.AddClosedCallback((sender, error) => _disconnectedCallback((sender, error)));
Log.Info("Connected to AMQP.V1 server.");
return;
}
catch (Exception e)
{
if (!_stage.AmqpSourceSettings.ManageConnection)
{
throw new ConnectionException(
"Failed to connect to AMQP.V1 server. Could not retry connection because SinkSettings does not manage the Connection object.", e);
}

retry--;
if (retry == 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

throw new AggregateException("Failed to connect to AMQP.V1 server.", exceptions);

exceptions.Add(e);
Log.Error($"[{retry}] more retries to connect to AMQP.V1 server.");
await Task.Delay(RetryInterval[retry]);
}
}
}

private void HandleDisconnection((IAmqpObject sender, Error error) args)
=> FailStage(new DisconnectedException(args.sender, args.error));

public override void PreStart()
{
base.PreStart();

Connect().Wait();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as below about blocking on a really long-running call

Pull(_stage.In);
}

public override void PostStop()
{
_sender.Close();
_sender?.Close();
base.PostStop();
}
}
Expand Down