Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate supported options and update docs #12

Merged
merged 1 commit into from Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
92 changes: 58 additions & 34 deletions lib/broadway_rabbitmq/amqp_client.ex
Expand Up @@ -13,14 +13,24 @@ defmodule BroadwayRabbitmq.AmqpClient do
@behaviour BroadwayRabbitmq.RabbitmqClient

@default_prefetch_count 50
@supported_options [
:queue,
:connection,
:declare,
:qos,
:backoff_min,
:backoff_max,
:backoff_type
]

@impl true
def init(opts) do
with {:ok, queue_name} <- validate(opts, :queue_name),
with {:ok, opts} <- validate_supported_opts(opts, "Broadway", @supported_options),
{:ok, queue} <- validate(opts, :queue),
{:ok, conn_opts} <- validate_conn_opts(opts),
{:ok, declare_opts} <- validate_declare_opts(opts),
{:ok, qos_opts} <- validate_qos_opts(opts) do
{:ok, queue_name,
{:ok, queue,
%{
connection: conn_opts,
declare: declare_opts,
Expand All @@ -30,10 +40,10 @@ defmodule BroadwayRabbitmq.AmqpClient do
end

@impl true
def setup_channel(queue_name, config) do
def setup_channel(queue, config) do
with {:ok, conn} <- Connection.open(config.connection),
{:ok, channel} <- Channel.open(conn),
{:ok, _} <- Queue.declare(channel, queue_name, config.declare),
{:ok, _} <- Queue.declare(channel, queue, config.declare),
:ok <- Basic.qos(channel, config.qos) do
{:ok, channel}
end
Expand All @@ -50,8 +60,8 @@ defmodule BroadwayRabbitmq.AmqpClient do
end

@impl true
def consume(channel, queue_name) do
{:ok, consumer_tag} = Basic.consume(channel, queue_name)
def consume(channel, queue) do
{:ok, consumer_tag} = Basic.consume(channel, queue)
consumer_tag
end

Expand All @@ -73,8 +83,8 @@ defmodule BroadwayRabbitmq.AmqpClient do
validate_option(key, opts[key] || default)
end

defp validate_option(:queue_name, value) when not is_binary(value) or value == "",
do: validation_error(:queue_name, "a non empty string", value)
defp validate_option(:queue, value) when not is_binary(value) or value == "",
do: validation_error(:queue, "a non empty string", value)

defp validate_option(_, value), do: {:ok, value}

Expand All @@ -83,37 +93,51 @@ defmodule BroadwayRabbitmq.AmqpClient do
end

defp validate_conn_opts(opts) do
{:ok, opts[:connection] || []}
# TODO: validate options
# :username,
# :password,
# :virtual_host,
# :host,
# :port,
# :channel_max,
# :frame_max,
# :heartbeat,
# :connection_timeout,
# :ssl_options,
# :client_properties,
# :socket_options
group = :connection
conn_opts = opts[group] || []

supported = [
:username,
:password,
:virtual_host,
:host,
:port,
:channel_max,
:frame_max,
:heartbeat,
:connection_timeout,
:ssl_options,
:client_properties,
:socket_options
]

validate_supported_opts(conn_opts, group, supported)
end

defp validate_declare_opts(opts) do
{:ok, opts[:declare] || []}
# TODO: validate options
# :durable,
# :auto_delete,
# :exclusive,
# :passive
group = :declare
declare_opts = opts[group] || []
supported = [:durable, :auto_delete, :exclusive, :passive]
validate_supported_opts(declare_opts, group, supported)
end

defp validate_qos_opts(opts) do
qos = Keyword.put_new(opts[:qos] || [], :prefetch_count, @default_prefetch_count)
{:ok, qos}
# TODO: validate options
# :prefetch_size,
# :prefetch_count,
# :global (don't use it. It doesn't make any difference with the current implementation)
group = :qos
qos_opts = opts[group] || []
supported = [:prefetch_size, :prefetch_count]

qos_opts
|> Keyword.put_new(:prefetch_count, @default_prefetch_count)
|> validate_supported_opts(group, supported)
end

defp validate_supported_opts(opts, group_name, supported_opts) do
opts
|> Keyword.keys()
|> Enum.reject(fn k -> k in supported_opts end)
|> case do
[] -> {:ok, opts}
keys -> {:error, "Unsupported options #{inspect(keys)} for #{inspect(group_name)}"}
end
end
end
93 changes: 93 additions & 0 deletions lib/broadway_rabbitmq/producer.ex
@@ -1,4 +1,97 @@
defmodule BroadwayRabbitmq.Producer do
@moduledoc """
A RabbitMQ producer for Broadway.

## Features

* Automatically acknowledges/rejects messages.
* Handles connection outages using backoff for retries.

## Options

* `:queue` - Required. The name of the queue.
* `:connection` - Optional. Defines a set of options used by the RabbitMQ
client to open the connection with the RabbitMQ broker. See
`AMQP.Connection.open/1` (documentation)[https://hexdocs.pm/amqp/AMQP.Connection.html#open/1]
for the full list of options.
* `:declare` - Optional. Defines a set of options used by the RabbitMQ client to
declare the queue. See `AMQP.Queue.declare/3`
(documentation)[https://hexdocs.pm/amqp/AMQP.Queue.html#declare/3]
for the full list of options.
* `:qos` - Optional. Defines a set of prefetch options used by the RabbitMQ client.
See `AMQP.Basic.qos/2`
(documentation)[https://hexdocs.pm/amqp/AMQP.Basic.html#qos/2] for the full
list of options. Pay attention that the `:global` option is not supported
by Broadway since each producer holds only one channel per connection.
* `:backoff_min` - The minimum backoff interval (default: `1_000`)
* `:backoff_max` - The maximum backoff interval (default: `30_000`)
* `:backoff_type` - The backoff strategy, `:stop` for no backoff and
to stop, `:exp` for exponential, `:rand` for random and `:rand_exp` for
random exponential (default: `:rand_exp`)

## Example

Broadway.start_link(MyBroadway,
name: MyBroadway,
producers: [
default: [
module:
{BroadwayRabbitmq.Producer,
queue: "my_queue",
connection: [
username: "user",
password: "password",
host: "192.168.0.10"
],
declare: [
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, ""},
{"x-dead-letter-routing-key", :longstr, "my_queue_error"}
]
]
qos: [
prefetch_count: 50
]},
stages: 5
]
],
processors: [
default: []
]
)

## Back-pressure and `:prefetch_count`

Unlike the RabittMQ client that has a default `:prefetch_count` = 0,
which disables back-pressure, BroadwayRabbitMQ overwrite the default
value to `50` enabling the back-pressure mechanism. You can still define
it as `0`, however, if you do this, make sure the machine has enough
resources to handle the number of messages coming from the broker.

This is important because the BroadwayRabbitMQ producer does not work
as a poller like BroadwaySQS. Instead, it maintains an active connection
with a subscribed consumer that receives messages continuously as they
arrive in the queue. This is more efficient than using the `basic.get`
method, however, it removes the ability of the GenStage producer to control
the demand. Therefore we need to use the `:prefetch_count` option to
impose back-pressure at the channel level.

## Connection loss and backoff

In case the connection cannot be opened or if a stablished connection is lost,
the producer will try to reconnect using an exponential random backoff strategy.
The strategy can be configured using the `:backoff_type` option.

## Unsupported options

Currently, Broadway does not accept options for `Basic.consume/4` which
is called internally by the producer with default values. That means options
like `:no_ack` are not supported. If you have a scenario where you need to
customize those options, please open an issue, so we can consider adding this
feature.
"""

use GenStage

require Logger
Expand Down
87 changes: 75 additions & 12 deletions test/broadway_rabbitmq/ampq_client_test.exs
Expand Up @@ -3,24 +3,87 @@ defmodule BroadwayRabbitmq.AmqpClientTest do

alias BroadwayRabbitmq.AmqpClient

test "default options" do
assert AmqpClient.init(queue: "queue") ==
{:ok, "queue", %{connection: [], declare: [], qos: [prefetch_count: 50]}}
end

describe "validate init options" do
test ":queue_name is required" do
assert AmqpClient.init([]) ==
{:error, "expected :queue_name to be a non empty string, got: nil"}
test "supported options" do
connection = [
username: nil,
password: nil,
virtual_host: nil,
host: nil,
port: nil,
channel_max: nil,
frame_max: nil,
heartbeat: nil,
connection_timeout: nil,
ssl_options: nil,
client_properties: nil,
socket_options: nil
]

declare = [
durable: nil,
auto_delete: nil,
exclusive: nil,
passive: nil
]

qos = [
prefetch_size: nil,
prefetch_count: nil
]

options = [
queue: "queue",
connection: connection,
declare: declare,
qos: qos
]

assert AmqpClient.init(options) ==
{:ok, "queue", %{connection: connection, declare: declare, qos: qos}}
end

test "unsupported options for Broadway" do
assert AmqpClient.init(queue: "queue", option_1: 1, option_2: 2) ==
{:error, "Unsupported options [:option_1, :option_2] for \"Broadway\""}
end

test "unsupported options for :connection" do
assert AmqpClient.init(queue: "queue", connection: [option_1: 1, option_2: 2]) ==
{:error, "Unsupported options [:option_1, :option_2] for :connection"}
end

test "unsupported options for :declare" do
assert AmqpClient.init(queue: "queue", declare: [option_1: 1, option_2: 2]) ==
{:error, "Unsupported options [:option_1, :option_2] for :declare"}
end

test "unsupported options for :qos" do
assert AmqpClient.init(queue: "queue", qos: [option_1: 1, option_2: 2]) ==
{:error, "Unsupported options [:option_1, :option_2] for :qos"}
end

test ":queue is required" do
assert AmqpClient.init([]) == {:error, "expected :queue to be a non empty string, got: nil"}

assert AmqpClient.init(queue_name: nil) ==
{:error, "expected :queue_name to be a non empty string, got: nil"}
assert AmqpClient.init(queue: nil) ==
{:error, "expected :queue to be a non empty string, got: nil"}
end

test ":queue_name should be a non empty string" do
assert AmqpClient.init(queue_name: "") ==
{:error, "expected :queue_name to be a non empty string, got: \"\""}
test ":queue should be a non empty string" do
assert AmqpClient.init(queue: "") ==
{:error, "expected :queue to be a non empty string, got: \"\""}

assert AmqpClient.init(queue_name: :an_atom) ==
{:error, "expected :queue_name to be a non empty string, got: :an_atom"}
assert AmqpClient.init(queue: :an_atom) ==
{:error, "expected :queue to be a non empty string, got: :an_atom"}

{:ok, queue_name, _} = AmqpClient.init(queue_name: "my_queue")
assert queue_name == "my_queue"
{:ok, queue, _} = AmqpClient.init(queue: "my_queue")
assert queue == "my_queue"
end
end
end
19 changes: 7 additions & 12 deletions test/broadway_rabbitmq/producer_test.exs
Expand Up @@ -33,11 +33,11 @@ defmodule BroadwayRabbitmq.ProducerTest do

@impl true
def init(opts) do
{:ok, opts[:queue_name], opts}
{:ok, opts[:queue], opts}
end

@impl true
def setup_channel(_queue_name, config) do
def setup_channel(_queue, config) do
test_pid = config[:test_pid]

status =
Expand Down Expand Up @@ -72,7 +72,7 @@ defmodule BroadwayRabbitmq.ProducerTest do
end

@impl true
def consume(_channel, _queue_name) do
def consume(_channel, _queue) do
:fake_consumer_tag
end

Expand Down Expand Up @@ -127,9 +127,9 @@ defmodule BroadwayRabbitmq.ProducerTest do
test "raise an ArgumentError with proper message when client options are invalid" do
assert_raise(
ArgumentError,
"invalid options given to BroadwayRabbitmq.AmqpClient.init/1, expected :queue_name to be a non empty string, got: nil",
"invalid options given to BroadwayRabbitmq.AmqpClient.init/1, expected :queue to be a non empty string, got: nil",
fn ->
BroadwayRabbitmq.Producer.init(queue_name: nil)
BroadwayRabbitmq.Producer.init(queue: nil)
end
)
end
Expand All @@ -139,19 +139,14 @@ defmodule BroadwayRabbitmq.ProducerTest do
ArgumentError,
"unknown type :unknown_type",
fn ->
BroadwayRabbitmq.Producer.init(queue_name: "test", backoff_type: :unknown_type)
BroadwayRabbitmq.Producer.init(queue: "test", backoff_type: :unknown_type)
end
)
end

test "defaut :prefetch_count is 50" do
{:producer, state, _} = BroadwayRabbitmq.Producer.init(queue_name: "test")
assert state[:config][:qos][:prefetch_count] == 50
end

test "producer :buffer_size is :prefetch_count * 5" do
qos = [prefetch_count: 12]
{:producer, _, options} = BroadwayRabbitmq.Producer.init(queue_name: "test", qos: qos)
{:producer, _, options} = BroadwayRabbitmq.Producer.init(queue: "test", qos: qos)

assert options[:buffer_size] == 60
end
Expand Down