Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for using Registry's via tuple #239

Merged
merged 4 commits into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ defmodule Broadway do
Returned by `start_link/2`.
"""
@type on_start() :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
@type name :: atom() | {:via, module(), term()}

@doc """
Invoked for preparing messages before handling (if defined).
Expand Down Expand Up @@ -859,7 +860,21 @@ defmodule Broadway do
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end

defoverridable child_spec: 1
@type name :: atom() | {:via, module(), term()}
@callback process_name(broadway_name :: name(), base_name :: String.t()) ::
any()
acco marked this conversation as resolved.
Show resolved Hide resolved
def process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
end

def process_name(broadway_name, _base_name) do
raise ArgumentError, """
Expected Broadway to be started with a `name` of type atom, got: #{inspect(broadway_name)}.
If starting Broadway with a `name` that is not an atom, you must define process_name/2 in the module which uses Broadway.
"""
end

defoverridable child_spec: 1, process_name: 2
end
end

Expand Down Expand Up @@ -926,8 +941,8 @@ defmodule Broadway do
[MyBroadway.Producer_0, MyBroadway.Producer_1, ..., MyBroadway.Producer_7]

"""
@spec producer_names(broadway :: atom()) :: [atom()]
def producer_names(broadway) when is_atom(broadway) do
@spec producer_names(broadway :: name()) :: [atom()]
acco marked this conversation as resolved.
Show resolved Hide resolved
def producer_names(broadway) do
acco marked this conversation as resolved.
Show resolved Hide resolved
Topology.producer_names(broadway)
end

Expand Down Expand Up @@ -967,7 +982,7 @@ defmodule Broadway do
]

"""
@spec topology(broadway :: atom()) :: [
@spec topology(broadway :: name()) :: [
{atom(),
[
%{
Expand All @@ -977,7 +992,7 @@ defmodule Broadway do
}
]}
]
def topology(broadway) when is_atom(broadway) do
def topology(broadway) do
Topology.topology(broadway)
end

Expand All @@ -998,8 +1013,8 @@ defmodule Broadway do
The producer is randomly chosen among all sets of producers/stages.
This is used to send out of band data to a Broadway pipeline.
"""
@spec push_messages(broadway :: atom(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_atom(broadway) and is_list(messages) do
@spec push_messages(broadway :: name(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_list(messages) do
broadway
|> producer_names()
|> Enum.random()
Expand Down Expand Up @@ -1145,13 +1160,13 @@ defmodule Broadway do

"""
@doc since: "0.6.0"
@spec get_rate_limiting(server :: atom()) ::
@spec get_rate_limiting(server :: name()) ::
{:ok, rate_limiting_info} | {:error, :rate_limiting_not_enabled}
when rate_limiting_info: %{
required(:interval) => non_neg_integer(),
required(:allowed_messages) => non_neg_integer()
}
def get_rate_limiting(broadway) when is_atom(broadway) do
def get_rate_limiting(broadway) do
with {:ok, rate_limiter_name} <- Topology.get_rate_limiter(broadway) do
{:ok, Topology.RateLimiter.get_rate_limiting(rate_limiter_name)}
end
Expand Down
13 changes: 11 additions & 2 deletions lib/broadway/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ defmodule Broadway.Options do
[
name: [
required: true,
type: :atom,
type: {:custom, __MODULE__, :validate_name, []},
doc: """
Used for name registration. All processes/stages
Used for name registration. When an atom, all processes/stages
created will be named using this value as prefix.
acco marked this conversation as resolved.
Show resolved Hide resolved
"""
],
Expand Down Expand Up @@ -290,4 +290,13 @@ defmodule Broadway.Options do
]
]
end

def validate_name(name) when is_atom(name), do: {:ok, name}

def validate_name({:via, _, _} = via), do: {:ok, via}
acco marked this conversation as resolved.
Show resolved Hide resolved

def validate_name(name) do
{:error,
"expected :name to be an atom or a {:via, module, term} tuple, got: #{inspect(name)}"}
end
end
82 changes: 47 additions & 35 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,10 @@ defmodule Broadway.Topology do
:persistent_term.put({Broadway, config.name}, %__MODULE__{
context: config.context,
topology: build_topology_details(config),
producer_names: process_names(config.name, "Producer", config.producer_config),
producer_names: process_names(config, "Producer", config.producer_config),
batchers_names:
Enum.map(config.batchers_config, &process_name(config.name, "Batcher", elem(&1, 0))),
rate_limiter_name:
config.producer_config[:rate_limiting] && RateLimiter.rate_limiter_name(opts[:name])
Enum.map(config.batchers_config, &process_name(config, "Batcher", elem(&1, 0))),
rate_limiter_name: config.rate_limiter
})

{:ok,
Expand Down Expand Up @@ -134,7 +133,7 @@ defmodule Broadway.Topology do
build_batchers_supervisor_and_terminator_specs(config, producers_names, processors_names)

supervisor_opts = [
name: :"#{name_prefix(config.name)}.Supervisor",
name: process_name(config, "Supervisor"),
max_restarts: config.max_restarts,
max_seconds: config.max_seconds,
strategy: :rest_for_one
Expand All @@ -151,12 +150,27 @@ defmodule Broadway.Topology do
processors_config: init_processors_config(opts[:processors]),
batchers_config: opts[:batchers],
context: opts[:context],
terminator: :"#{name_prefix(opts[:name])}.Terminator",
max_restarts: opts[:max_restarts],
max_seconds: opts[:max_seconds],
shutdown: opts[:shutdown],
resubscribe_interval: opts[:resubscribe_interval]
resubscribe_interval: opts[:resubscribe_interval],
terminator: nil,
rate_limiter: nil
}
|> put_terminator()
|> put_rate_limiter(opts)
acco marked this conversation as resolved.
Show resolved Hide resolved
end

defp put_terminator(config) do
Map.put(config, :terminator, process_name(config, "Terminator"))
acco marked this conversation as resolved.
Show resolved Hide resolved
end

defp put_rate_limiter(config, opts) do
if opts[:producer][:rate_limiting] do
Map.put(config, :rate_limiter, process_name(config, "RateLimiter"))
else
config
end
end

defp init_processors_config(config) do
Expand All @@ -181,10 +195,10 @@ defmodule Broadway.Topology do
end

defp build_rate_limiter_spec(config, producers_names) do
%{name: broadway_name, producer_config: producer_config} = config
%{producer_config: producer_config} = config

opts = [
name: broadway_name,
name: process_name(config, "RateLimiter"),
rate_limiting: producer_config[:rate_limiting],
producers_names: producers_names
]
Expand All @@ -194,10 +208,10 @@ defmodule Broadway.Topology do

defp build_producers_specs(config, opts) do
%{
name: broadway_name,
producer_config: producer_config,
processors_config: processors_config,
shutdown: shutdown
shutdown: shutdown,
rate_limiter: rate_limiter
} = config

n_producers = producer_config[:concurrency]
Expand All @@ -215,11 +229,11 @@ defmodule Broadway.Topology do
{GenStage.PartitionDispatcher, partitions: 0..(n_processors - 1), hash: hash_func}
end

args = [broadway: opts, dispatcher: dispatcher] ++ producer_config
args = [broadway: opts, dispatcher: dispatcher, rate_limiter: rate_limiter] ++ producer_config
josevalim marked this conversation as resolved.
Show resolved Hide resolved

names_and_specs =
for index <- 0..(n_producers - 1) do
name = process_name(broadway_name, "Producer", index)
name = process_name(config, "Producer", index)
start_options = start_options(name, producer_config)

spec = %{
Expand Down Expand Up @@ -253,7 +267,7 @@ defmodule Broadway.Topology do
raise "Only one set of processors is allowed for now"
end

names = process_names(topology_name, "Processor_#{key}", processor_config)
names = process_names(config, "Processor_#{key}", processor_config)

# The partition of the processor depends on the next processor or the batcher,
# so we handle it here.
Expand Down Expand Up @@ -337,7 +351,7 @@ defmodule Broadway.Topology do
defp build_batcher_spec(config, batcher_config, processors) do
%{terminator: terminator, shutdown: shutdown} = config
{key, options} = batcher_config
name = process_name(config.name, "Batcher", key)
name = process_name(config, "Batcher", key)

args =
[
Expand Down Expand Up @@ -374,7 +388,7 @@ defmodule Broadway.Topology do
shutdown: shutdown
} = config

names = process_names(broadway_name, "BatchProcessor_#{key}", batcher_config)
names = process_names(config, "BatchProcessor_#{key}", batcher_config)

args = [
topology_name: broadway_name,
Expand Down Expand Up @@ -426,48 +440,46 @@ defmodule Broadway.Topology do
[
producers: [
%{
name: topology_name(config.name, "Producer"),
name: process_name(config, "Producer"),
concurrency: config.producer_config[:concurrency]
}
],
processors:
Enum.map(config.processors_config, fn {name, processor_config} ->
%{
name: topology_name(config.name, "Processor_#{name}"),
name: process_name(config, "Processor", name),
concurrency: processor_config[:concurrency]
}
end),
batchers:
Enum.map(config.batchers_config, fn {name, batcher_config} ->
%{
batcher_name: topology_name(config.name, "Batcher_#{name}"),
name: topology_name(config.name, "BatchProcessor_#{name}"),
batcher_name: process_name(config, "Batcher", name),
name: process_name(config, "BatchProcessor", name),
concurrency: batcher_config[:concurrency]
}
end)
]
end

defp topology_name(prefix, type) do
:"#{name_prefix(prefix)}.#{type}"
defp process_name(config, base_name, suffix) do
process_name(config, "#{base_name}_#{suffix}")
end

defp name_prefix(prefix) do
"#{prefix}.Broadway"
end
defp process_name(config, base_name) do
%{module: module, name: broadway_name} = config

defp process_name(prefix, type, index) do
:"#{name_prefix(prefix)}.#{type}_#{index}"
module.process_name(broadway_name, base_name)
end

defp process_names(prefix, type, config) do
for index <- 0..(config[:concurrency] - 1) do
process_name(prefix, type, index)
defp process_names(config, type, processor_config) do
for index <- 0..(processor_config[:concurrency] - 1) do
process_name(config, type, index)
end
end

defp build_producer_supervisor_spec(config, children) do
name = :"#{name_prefix(config.name)}.ProducerSupervisor"
name = process_name(config, "ProducerSupervisor")
children_count = length(children)

# TODO: Allow max_restarts and max_seconds as configuration
Expand All @@ -482,7 +494,7 @@ defmodule Broadway.Topology do
defp build_processor_supervisor_spec(config, children) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.ProcessorSupervisor",
process_name(config, "ProcessorSupervisor"),
strategy: :one_for_all,
max_restarts: 0
)
Expand All @@ -493,7 +505,7 @@ defmodule Broadway.Topology do

build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchersSupervisor",
process_name(config, "BatchersSupervisor"),
strategy: :one_for_one,
max_restarts: 2 * children_count,
max_seconds: children_count
Expand All @@ -503,7 +515,7 @@ defmodule Broadway.Topology do
defp build_batcher_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatcherSupervisor_#{key}",
process_name(config, "BatcherSupervisor", key),
strategy: :rest_for_one,
max_restarts: 4,
max_seconds: 2
Expand All @@ -513,7 +525,7 @@ defmodule Broadway.Topology do
defp build_batch_processor_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchProcessorSupervisor_#{key}",
process_name(config, "BatchProcessorSupervisor", key),
strategy: :one_for_all,
max_restarts: 0
)
Expand Down
12 changes: 4 additions & 8 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ defmodule Broadway.Topology.ProducerStage do
{module, arg} = args[:module]
transformer = args[:transformer]
dispatcher = args[:dispatcher]
broadway_name = args[:broadway][:name]
rate_limiting_options = args[:rate_limiting]
rate_limiter = args[:rate_limiter]

# Inject the topology index only if the args are a keyword list.
arg =
Expand All @@ -44,16 +43,13 @@ defmodule Broadway.Topology.ProducerStage do
end

rate_limiting_state =
if rate_limiting_options do
rate_limiter =
broadway_name
|> RateLimiter.rate_limiter_name()
|> RateLimiter.get_rate_limiter_ref()
if rate_limiter do
rate_limiter_ref = RateLimiter.get_rate_limiter_ref(rate_limiter)
josevalim marked this conversation as resolved.
Show resolved Hide resolved

%{
state: :open,
draining?: false,
rate_limiter: rate_limiter,
rate_limiter: rate_limiter_ref,
# A queue of "batches" of messages that we buffered.
message_buffer: :queue.new(),
# A queue of demands (integers) that we buffered.
Expand Down
Loading