Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve NRE with AmpqConnection #1891

Merged
merged 2 commits into from
Mar 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/Amqp/Akka.Streams.Amqp.RabbitMq/AmqpConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@ public static IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings s
factory.HostName = details.HostAndPortList[0].host;
factory.Port = details.HostAndPortList[0].port;
}

if (details.ClientProvidedName != null)
{
factory.ClientProvidedName = details.ClientProvidedName;
}

if (details.Credentials.HasValue)
{
factory.UserName = details.Credentials.Value.Username;
factory.Password = details.Credentials.Value.Password;
}

if (!string.IsNullOrEmpty(details.VirtualHost))
factory.VirtualHost = details.VirtualHost;
if (details.Ssl != null)
Expand All @@ -56,6 +59,7 @@ public static IConnectionFactory ConnectionFactoryFrom(IAmqpConnectionSettings s
//leave it be as is
break;
}

return factory;
}

Expand All @@ -66,10 +70,14 @@ public static IConnection NewConnection(IConnectionFactory factory, IAmqpConnect
case AmqpConnectionDetails details:
{
if (details.HostAndPortList.Count == 0)
throw new ArgumentException("You need to supply at least one host/port pair.", nameof(settings));
throw new ArgumentException("You need to supply at least one host/port pair.",
nameof(settings));

return factory.CreateConnection(details.HostAndPortList
.Select(pair => new AmqpTcpEndpoint(pair.host, pair.port, details.Ssl)).ToList());
.Select(pair =>
details.Ssl == null
? new AmqpTcpEndpoint(pair.host, pair.port)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used a ternary operator to determine if we have SSL settings or not - use the appropriate CTOR depending.

: new AmqpTcpEndpoint(pair.host, pair.port, details.Ssl)).ToList());
}
default:
return factory.CreateConnection();
Expand All @@ -86,10 +94,9 @@ internal abstract class AmqpConnectorLogic : GraphStageLogic
protected IModel Channel;
protected Action<ShutdownEventArgs> ShutdownCallback;

protected AmqpConnectorLogic(Shape shape)
protected AmqpConnectorLogic(Shape shape)
: base(shape)
{

}

public abstract IAmqpConnectorSettings Settings { get; }
Expand Down Expand Up @@ -124,18 +131,19 @@ public override void PreStart()
case QueueDeclaration queueDeclaration:
Channel.QueueDeclare(queueDeclaration.Name, queueDeclaration.Durable,
queueDeclaration.Exclusive,
queueDeclaration.AutoDelete, queueDeclaration.Arguments.ToDictionary(key=> key.Key,val=> val.Value));
queueDeclaration.AutoDelete,
queueDeclaration.Arguments.ToDictionary(key => key.Key, val => val.Value));
break;
case BindingDeclaration bindingDeclaration:
Channel.QueueBind(bindingDeclaration.Queue, bindingDeclaration.Exchange,
bindingDeclaration.RoutingKey ?? "", bindingDeclaration.Arguments.ToDictionary(key => key.Key, val => val.Value));
bindingDeclaration.RoutingKey ?? "",
bindingDeclaration.Arguments.ToDictionary(key => key.Key, val => val.Value));
break;
case ExchangeDeclaration exchangeDeclaration:
Channel.ExchangeDeclare(exchangeDeclaration.Name, exchangeDeclaration.ExchangeType,
exchangeDeclaration.Durable, exchangeDeclaration.AutoDelete,
exchangeDeclaration.Arguments.ToDictionary(key => key.Key, val => val.Value));
break;

}
}

Expand Down Expand Up @@ -165,18 +173,19 @@ public override void PostStop()
{
if (Channel != null)
{
if(Channel.IsOpen)
if (Channel.IsOpen)
Channel.Close();
Channel.ModelShutdown -= OnChannelShutdown;
Channel = null;
}

if (Connection != null)
{
if(Connection.IsOpen)
if (Connection.IsOpen)
Connection.Close();
Connection.ConnectionShutdown -= OnConnectionShutdown;
Connection = null;
}
}
}
}
}