Skip to content

Commit

Permalink
Add support for using Registry's via tuple (#239)
Browse files Browse the repository at this point in the history
  • Loading branch information
acco committed Jun 1, 2021
1 parent 367d315 commit 5174ff0
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 78 deletions.
50 changes: 34 additions & 16 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ defmodule Broadway do
Returned by `start_link/2`.
"""
@type on_start() :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
@type name :: atom() | {:via, module(), term()}

@doc """
Invoked for preparing messages before handling (if defined).
Expand Down Expand Up @@ -843,6 +844,9 @@ defmodule Broadway do

@optional_callbacks prepare_messages: 2, handle_batch: 4, handle_failed: 2

defguardp is_broadway_name(name)
when is_atom(name) or (is_tuple(name) and tuple_size(name) == 3)

@doc false
defmacro __using__(opts) do
quote location: :keep, bind_quoted: [opts: opts, module: __CALLER__.module] do
Expand All @@ -859,7 +863,19 @@ defmodule Broadway do
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end

defoverridable child_spec: 1
@callback process_name(Broadway.name(), base_name :: String.t()) :: Broadway.name()
def process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
end

def process_name(broadway_name, _base_name) do
raise ArgumentError, """
Expected Broadway to be started with a `name` of type atom, got: #{inspect(broadway_name)}.
If starting Broadway with a `name` that is not an atom, you must define process_name/2 in the module which uses Broadway.
"""
end

defoverridable child_spec: 1, process_name: 2
end
end

Expand Down Expand Up @@ -926,8 +942,8 @@ defmodule Broadway do
[MyBroadway.Producer_0, MyBroadway.Producer_1, ..., MyBroadway.Producer_7]
"""
@spec producer_names(broadway :: atom()) :: [atom()]
def producer_names(broadway) when is_atom(broadway) do
@spec producer_names(name()) :: [name()]
def producer_names(broadway) when is_broadway_name(broadway) do
Topology.producer_names(broadway)
end

Expand Down Expand Up @@ -967,7 +983,7 @@ defmodule Broadway do
]
"""
@spec topology(broadway :: atom()) :: [
@spec topology(broadway :: name()) :: [
{atom(),
[
%{
Expand All @@ -977,7 +993,7 @@ defmodule Broadway do
}
]}
]
def topology(broadway) when is_atom(broadway) do
def topology(broadway) when is_broadway_name(broadway) do
Topology.topology(broadway)
end

Expand All @@ -998,8 +1014,8 @@ defmodule Broadway do
The producer is randomly chosen among all sets of producers/stages.
This is used to send out of band data to a Broadway pipeline.
"""
@spec push_messages(broadway :: atom(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_atom(broadway) and is_list(messages) do
@spec push_messages(broadway :: name(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_broadway_name(broadway) and is_list(messages) do
broadway
|> producer_names()
|> Enum.random()
Expand Down Expand Up @@ -1050,8 +1066,9 @@ defmodule Broadway do
Note that messages sent using this function will ignore demand and :transform
option specified in :producer option in `Broadway.start_link/2`.
"""
@spec test_message(broadway :: atom(), term, opts :: Keyword.t()) :: reference
def test_message(broadway, data, opts \\ []) when is_list(opts) do
@spec test_message(broadway :: name(), term, opts :: Keyword.t()) :: reference
def test_message(broadway, data, opts \\ [])
when is_broadway_name(broadway) and is_list(opts) do
test_messages(broadway, [data], :flush, opts)
end

Expand Down Expand Up @@ -1100,12 +1117,13 @@ defmodule Broadway do
Note that messages sent using this function will ignore demand and :transform
option specified in :producer option in `Broadway.start_link/2`.
"""
@spec test_batch(broadway :: atom(), data :: [term], opts :: Keyword.t()) :: reference
def test_batch(broadway, batch_data, opts \\ []) when is_list(batch_data) and is_list(opts) do
@spec test_batch(broadway :: name(), data :: [term], opts :: Keyword.t()) :: reference
def test_batch(broadway, batch_data, opts \\ [])
when is_broadway_name(broadway) and is_list(batch_data) and is_list(opts) do
test_messages(broadway, batch_data, Keyword.get(opts, :batch_mode, :bulk), opts)
end

defp test_messages(broadway, data, batch_mode, opts) do
defp test_messages(broadway, data, batch_mode, opts) when is_broadway_name(broadway) do
metadata = Map.new(Keyword.get(opts, :metadata, []))

acknowledger =
Expand Down Expand Up @@ -1145,13 +1163,13 @@ defmodule Broadway do
"""
@doc since: "0.6.0"
@spec get_rate_limiting(server :: atom()) ::
@spec get_rate_limiting(server :: name()) ::
{:ok, rate_limiting_info} | {:error, :rate_limiting_not_enabled}
when rate_limiting_info: %{
required(:interval) => non_neg_integer(),
required(:allowed_messages) => non_neg_integer()
}
def get_rate_limiting(broadway) when is_atom(broadway) do
def get_rate_limiting(broadway) when is_broadway_name(broadway) do
with {:ok, rate_limiter_name} <- Topology.get_rate_limiter(broadway) do
{:ok, Topology.RateLimiter.get_rate_limiting(rate_limiter_name)}
end
Expand All @@ -1175,9 +1193,9 @@ defmodule Broadway do
"""
@doc since: "0.6.0"
@spec update_rate_limiting(server :: atom(), opts :: Keyword.t()) ::
@spec update_rate_limiting(server :: name(), opts :: Keyword.t()) ::
:ok | {:error, :rate_limiting_not_enabled}
def update_rate_limiting(broadway, opts) when is_atom(broadway) and is_list(opts) do
def update_rate_limiting(broadway, opts) when is_broadway_name(broadway) and is_list(opts) do
definition = [
allowed_messages: [type: :pos_integer],
interval: [type: :pos_integer]
Expand Down
13 changes: 11 additions & 2 deletions lib/broadway/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ defmodule Broadway.Options do
[
name: [
required: true,
type: :atom,
type: {:custom, __MODULE__, :validate_name, []},
doc: """
Used for name registration. All processes/stages
Used for name registration. When an atom, all processes/stages
created will be named using this value as prefix.
"""
],
Expand Down Expand Up @@ -290,4 +290,13 @@ defmodule Broadway.Options do
]
]
end

def validate_name(name) when is_atom(name), do: {:ok, name}

def validate_name({:via, module, _term} = via) when is_atom(module), do: {:ok, via}

def validate_name(name) do
{:error,
"expected :name to be an atom or a {:via, module, term} tuple, got: #{inspect(name)}"}
end
end
82 changes: 47 additions & 35 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ defmodule Broadway.Topology do
:persistent_term.put({Broadway, config.name}, %__MODULE__{
context: config.context,
topology: build_topology_details(config),
producer_names: process_names(config.name, "Producer", config.producer_config),
producer_names: process_names(config, "Producer", config.producer_config),
batchers_names:
Enum.map(config.batchers_config, &process_name(config.name, "Batcher", elem(&1, 0))),
rate_limiter_name:
config.producer_config[:rate_limiting] && RateLimiter.rate_limiter_name(opts[:name])
Enum.map(config.batchers_config, &process_name(config, "Batcher", elem(&1, 0))),
rate_limiter_name: config.rate_limiter
})

{:ok,
Expand Down Expand Up @@ -134,7 +133,7 @@ defmodule Broadway.Topology do
build_batchers_supervisor_and_terminator_specs(config, producers_names, processors_names)

supervisor_opts = [
name: :"#{name_prefix(config.name)}.Supervisor",
name: process_name(config, "Supervisor"),
max_restarts: config.max_restarts,
max_seconds: config.max_seconds,
strategy: :rest_for_one
Expand All @@ -151,12 +150,27 @@ defmodule Broadway.Topology do
processors_config: init_processors_config(opts[:processors]),
batchers_config: opts[:batchers],
context: opts[:context],
terminator: :"#{name_prefix(opts[:name])}.Terminator",
max_restarts: opts[:max_restarts],
max_seconds: opts[:max_seconds],
shutdown: opts[:shutdown],
resubscribe_interval: opts[:resubscribe_interval]
resubscribe_interval: opts[:resubscribe_interval],
terminator: nil,
rate_limiter: nil
}
|> put_terminator()
|> put_rate_limiter(opts)
end

defp put_terminator(config) do
Map.put(config, :terminator, process_name(config, "Terminator"))
end

defp put_rate_limiter(config, opts) do
if opts[:producer][:rate_limiting] do
Map.put(config, :rate_limiter, process_name(config, "RateLimiter"))
else
config
end
end

defp init_processors_config(config) do
Expand All @@ -181,10 +195,10 @@ defmodule Broadway.Topology do
end

defp build_rate_limiter_spec(config, producers_names) do
%{name: broadway_name, producer_config: producer_config} = config
%{producer_config: producer_config} = config

opts = [
name: broadway_name,
name: process_name(config, "RateLimiter"),
rate_limiting: producer_config[:rate_limiting],
producers_names: producers_names
]
Expand All @@ -194,10 +208,10 @@ defmodule Broadway.Topology do

defp build_producers_specs(config, opts) do
%{
name: broadway_name,
producer_config: producer_config,
processors_config: processors_config,
shutdown: shutdown
shutdown: shutdown,
rate_limiter: rate_limiter
} = config

n_producers = producer_config[:concurrency]
Expand All @@ -215,11 +229,11 @@ defmodule Broadway.Topology do
{GenStage.PartitionDispatcher, partitions: 0..(n_processors - 1), hash: hash_func}
end

args = [broadway: opts, dispatcher: dispatcher] ++ producer_config
args = [broadway: opts, dispatcher: dispatcher, rate_limiter: rate_limiter] ++ producer_config

names_and_specs =
for index <- 0..(n_producers - 1) do
name = process_name(broadway_name, "Producer", index)
name = process_name(config, "Producer", index)
start_options = start_options(name, producer_config)

spec = %{
Expand Down Expand Up @@ -253,7 +267,7 @@ defmodule Broadway.Topology do
raise "Only one set of processors is allowed for now"
end

names = process_names(topology_name, "Processor_#{key}", processor_config)
names = process_names(config, "Processor_#{key}", processor_config)

# The partition of the processor depends on the next processor or the batcher,
# so we handle it here.
Expand Down Expand Up @@ -337,7 +351,7 @@ defmodule Broadway.Topology do
defp build_batcher_spec(config, batcher_config, processors) do
%{terminator: terminator, shutdown: shutdown} = config
{key, options} = batcher_config
name = process_name(config.name, "Batcher", key)
name = process_name(config, "Batcher", key)

args =
[
Expand Down Expand Up @@ -374,7 +388,7 @@ defmodule Broadway.Topology do
shutdown: shutdown
} = config

names = process_names(broadway_name, "BatchProcessor_#{key}", batcher_config)
names = process_names(config, "BatchProcessor_#{key}", batcher_config)

args = [
topology_name: broadway_name,
Expand Down Expand Up @@ -426,48 +440,46 @@ defmodule Broadway.Topology do
[
producers: [
%{
name: topology_name(config.name, "Producer"),
name: process_name(config, "Producer"),
concurrency: config.producer_config[:concurrency]
}
],
processors:
Enum.map(config.processors_config, fn {name, processor_config} ->
%{
name: topology_name(config.name, "Processor_#{name}"),
name: process_name(config, "Processor", name),
concurrency: processor_config[:concurrency]
}
end),
batchers:
Enum.map(config.batchers_config, fn {name, batcher_config} ->
%{
batcher_name: topology_name(config.name, "Batcher_#{name}"),
name: topology_name(config.name, "BatchProcessor_#{name}"),
batcher_name: process_name(config, "Batcher", name),
name: process_name(config, "BatchProcessor", name),
concurrency: batcher_config[:concurrency]
}
end)
]
end

defp topology_name(prefix, type) do
:"#{name_prefix(prefix)}.#{type}"
defp process_name(config, base_name, suffix) do
process_name(config, "#{base_name}_#{suffix}")
end

defp name_prefix(prefix) do
"#{prefix}.Broadway"
end
defp process_name(config, base_name) do
%{module: module, name: broadway_name} = config

defp process_name(prefix, type, index) do
:"#{name_prefix(prefix)}.#{type}_#{index}"
module.process_name(broadway_name, base_name)
end

defp process_names(prefix, type, config) do
for index <- 0..(config[:concurrency] - 1) do
process_name(prefix, type, index)
defp process_names(config, type, processor_config) do
for index <- 0..(processor_config[:concurrency] - 1) do
process_name(config, type, index)
end
end

defp build_producer_supervisor_spec(config, children) do
name = :"#{name_prefix(config.name)}.ProducerSupervisor"
name = process_name(config, "ProducerSupervisor")
children_count = length(children)

# TODO: Allow max_restarts and max_seconds as configuration
Expand All @@ -482,7 +494,7 @@ defmodule Broadway.Topology do
defp build_processor_supervisor_spec(config, children) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.ProcessorSupervisor",
process_name(config, "ProcessorSupervisor"),
strategy: :one_for_all,
max_restarts: 0
)
Expand All @@ -493,7 +505,7 @@ defmodule Broadway.Topology do

build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchersSupervisor",
process_name(config, "BatchersSupervisor"),
strategy: :one_for_one,
max_restarts: 2 * children_count,
max_seconds: children_count
Expand All @@ -503,7 +515,7 @@ defmodule Broadway.Topology do
defp build_batcher_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatcherSupervisor_#{key}",
process_name(config, "BatcherSupervisor", key),
strategy: :rest_for_one,
max_restarts: 4,
max_seconds: 2
Expand All @@ -513,7 +525,7 @@ defmodule Broadway.Topology do
defp build_batch_processor_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchProcessorSupervisor_#{key}",
process_name(config, "BatchProcessorSupervisor", key),
strategy: :one_for_all,
max_restarts: 0
)
Expand Down
Loading

0 comments on commit 5174ff0

Please sign in to comment.