Skip to content

Commit

Permalink
Add support for using Registry's via tuple
Browse files Browse the repository at this point in the history
Support overriding process_name/2 in Broadway module so that all processes in topology can be started with a via tuple.
  • Loading branch information
acco committed May 30, 2021
1 parent e95fc29 commit a55a5b1
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 46 deletions.
16 changes: 15 additions & 1 deletion lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,21 @@ defmodule Broadway do
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end

defoverridable child_spec: 1
@type via_tuple :: {:via, Registry, any()}
@callback process_name(broadway_name :: String.t() | via_tuple, base_name :: String.t()) ::
any()
def process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
end

def process_name(broadway_name, _base_name) do
raise ArgumentError, """
Expected Broadway to be started with a `name` of type atom, got: #{inspect(broadway_name)}.
If starting Broadway with a `name` that is not an atom, you must define process_name/2 in the module which uses Broadway.
"""
end

defoverridable child_spec: 1, process_name: 2
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/broadway/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule Broadway.Options do
[
name: [
required: true,
type: :atom,
type: :any,
doc: """
Used for name registration. All processes/stages
created will be named using this value as prefix.
Expand Down
62 changes: 32 additions & 30 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,12 @@ defmodule Broadway.Topology do
:persistent_term.put({Broadway, config.name}, %__MODULE__{
context: config.context,
topology: build_topology_details(config),
producer_names: process_names(config.name, "Producer", config.producer_config),
producer_names: process_names(config, "Producer", config.producer_config),
batchers_names:
Enum.map(config.batchers_config, &process_name(config.name, "Batcher", elem(&1, 0))),
Enum.map(
config.batchers_config,
&process_name(config, "Batcher", elem(&1, 0))
),
rate_limiter_name:
config.producer_config[:rate_limiting] && RateLimiter.rate_limiter_name(opts[:name])
})
Expand Down Expand Up @@ -134,7 +137,7 @@ defmodule Broadway.Topology do
build_batchers_supervisor_and_terminator_specs(config, producers_names, processors_names)

supervisor_opts = [
name: :"#{name_prefix(config.name)}.Supervisor",
name: process_name(config, "Supervisor"),
max_restarts: config.max_restarts,
max_seconds: config.max_seconds,
strategy: :rest_for_one
Expand All @@ -151,12 +154,16 @@ defmodule Broadway.Topology do
processors_config: init_processors_config(opts[:processors]),
batchers_config: opts[:batchers],
context: opts[:context],
terminator: :"#{name_prefix(opts[:name])}.Terminator",
max_restarts: opts[:max_restarts],
max_seconds: opts[:max_seconds],
shutdown: opts[:shutdown],
resubscribe_interval: opts[:resubscribe_interval]
}
|> put_terminator()
end

defp put_terminator(config) do
Map.put(config, :terminator, process_name(config, "Terminator"))
end

defp init_processors_config(config) do
Expand Down Expand Up @@ -194,7 +201,6 @@ defmodule Broadway.Topology do

defp build_producers_specs(config, opts) do
%{
name: broadway_name,
producer_config: producer_config,
processors_config: processors_config,
shutdown: shutdown
Expand All @@ -219,7 +225,7 @@ defmodule Broadway.Topology do

names_and_specs =
for index <- 0..(n_producers - 1) do
name = process_name(broadway_name, "Producer", index)
name = process_name(config, "Producer", index)
start_options = start_options(name, producer_config)

spec = %{
Expand All @@ -237,7 +243,6 @@ defmodule Broadway.Topology do

defp build_processors_specs(config, producers) do
%{
name: broadway_name,
module: module,
processors_config: processors_config,
context: context,
Expand All @@ -253,7 +258,7 @@ defmodule Broadway.Topology do
raise "Only one set of processors is allowed for now"
end

names = process_names(broadway_name, "Processor_#{key}", processor_config)
names = process_names(config, "Processor_#{key}", processor_config)

# The partition of the processor depends on the next processor or the batcher,
# so we handle it here.
Expand Down Expand Up @@ -336,7 +341,7 @@ defmodule Broadway.Topology do
defp build_batcher_spec(config, batcher_config, processors) do
%{terminator: terminator, shutdown: shutdown} = config
{key, options} = batcher_config
name = process_name(config.name, "Batcher", key)
name = process_name(config, "Batcher", key)

args =
[
Expand Down Expand Up @@ -365,14 +370,13 @@ defmodule Broadway.Topology do

defp build_batch_processors_specs(config, {key, batcher_config}, batcher) do
%{
name: broadway_name,
module: module,
context: context,
terminator: terminator,
shutdown: shutdown
} = config

names = process_names(broadway_name, "BatchProcessor_#{key}", batcher_config)
names = process_names(config, "BatchProcessor_#{key}", batcher_config)

args = [
resubscribe: :never,
Expand Down Expand Up @@ -423,48 +427,46 @@ defmodule Broadway.Topology do
[
producers: [
%{
name: topology_name(config.name, "Producer"),
name: process_name(config, "Producer"),
concurrency: config.producer_config[:concurrency]
}
],
processors:
Enum.map(config.processors_config, fn {name, processor_config} ->
%{
name: topology_name(config.name, "Processor_#{name}"),
name: process_name(config, "Processor", name),
concurrency: processor_config[:concurrency]
}
end),
batchers:
Enum.map(config.batchers_config, fn {name, batcher_config} ->
%{
batcher_name: topology_name(config.name, "Batcher_#{name}"),
name: topology_name(config.name, "BatchProcessor_#{name}"),
batcher_name: process_name(config, "Batcher", name),
name: process_name(config, "BatchProcessor", name),
concurrency: batcher_config[:concurrency]
}
end)
]
end

defp topology_name(prefix, type) do
:"#{name_prefix(prefix)}.#{type}"
defp process_name(config, base_name, suffix) do
process_name(config, "#{base_name}_#{suffix}")
end

defp name_prefix(prefix) do
"#{prefix}.Broadway"
end
defp process_name(config, base_name) do
%{module: module, name: broadway_name} = config

defp process_name(prefix, type, index) do
:"#{name_prefix(prefix)}.#{type}_#{index}"
module.process_name(broadway_name, base_name)
end

defp process_names(prefix, type, config) do
for index <- 0..(config[:concurrency] - 1) do
process_name(prefix, type, index)
defp process_names(config, type, processor_config) do
for index <- 0..(processor_config[:concurrency] - 1) do
process_name(config, type, index)
end
end

defp build_producer_supervisor_spec(config, children) do
name = :"#{name_prefix(config.name)}.ProducerSupervisor"
name = process_name(config, "ProducerSupervisor")
children_count = length(children)

# TODO: Allow max_restarts and max_seconds as configuration
Expand All @@ -479,7 +481,7 @@ defmodule Broadway.Topology do
defp build_processor_supervisor_spec(config, children) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.ProcessorSupervisor",
process_name(config, "ProcessorSupervisor"),
strategy: :one_for_all,
max_restarts: 0
)
Expand All @@ -490,7 +492,7 @@ defmodule Broadway.Topology do

build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchersSupervisor",
process_name(config, "BatchersSupervisor"),
strategy: :one_for_one,
max_restarts: 2 * children_count,
max_seconds: children_count
Expand All @@ -500,7 +502,7 @@ defmodule Broadway.Topology do
defp build_batcher_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatcherSupervisor_#{key}",
process_name(config, "BatcherSupervisor", key),
strategy: :rest_for_one,
max_restarts: 4,
max_seconds: 2
Expand All @@ -510,7 +512,7 @@ defmodule Broadway.Topology do
defp build_batch_processor_supervisor_spec(config, children, key) do
build_supervisor_spec(
children,
:"#{name_prefix(config.name)}.BatchProcessorSupervisor_#{key}",
process_name(config, "BatchProcessorSupervisor", key),
strategy: :one_for_all,
max_restarts: 0
)
Expand Down
10 changes: 9 additions & 1 deletion lib/broadway/topology/rate_limiter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ defmodule Broadway.Topology.RateLimiter do
:atomics.put(counter, @atomics_index, allowed)

for name <- producers_names,
pid = Process.whereis(name),
pid = get_pid(name),
is_pid(pid),
do: send(pid, {__MODULE__, :reset_rate_limiting})

Expand All @@ -107,4 +107,12 @@ defmodule Broadway.Topology.RateLimiter do
defp schedule_next_reset(interval) do
_ref = Process.send_after(self(), :reset_limit, interval)
end

defp get_pid(process_name) when is_atom(process_name) do
Process.whereis(process_name)
end

defp get_pid({:via, Registry, {registry, args}}) do
registry.whereis_name(args)
end
end
12 changes: 10 additions & 2 deletions lib/broadway/topology/subscriber.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ defmodule Broadway.Topology.Subscriber do
end

def handle_info(:cancel_consumers, %{terminator: terminator} = state) when terminator != nil do
if pid = Process.whereis(terminator) do
if pid = get_pid(terminator) do
send(pid, {:done, self()})
end

Expand Down Expand Up @@ -123,7 +123,7 @@ defmodule Broadway.Topology.Subscriber do
## Helpers

defp subscribe(process_name, state) do
if pid = Process.whereis(process_name) do
if pid = get_pid(process_name) do
opts = [to: pid, name: process_name] ++ state.subscription_options
GenStage.async_subscribe(self(), opts)
true
Expand All @@ -133,6 +133,14 @@ defmodule Broadway.Topology.Subscriber do
end
end

defp get_pid(process_name) when is_atom(process_name) do
Process.whereis(process_name)
end

defp get_pid({:via, Registry, {registry, args}}) do
registry.whereis_name(args)
end

defp maybe_resubscribe(process_name, %{resubscribe: integer}) when is_integer(integer) do
Process.send_after(self(), {:resubscribe, process_name}, integer)
true
Expand Down
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
%{
"certifi": {:hex, :certifi, "2.6.1", "dbab8e5e155a0763eea978c913ca280a6b544bfa115633fa20249c3d396d9493", [], [], "hexpm", "524c97b4991b3849dd5c17a631223896272c6b0af446778ba4675a1dff53bb7e"},
"certifi": {:hex, :certifi, "2.6.1", "dbab8e5e155a0763eea978c913ca280a6b544bfa115633fa20249c3d396d9493", [:rebar3], [], "hexpm", "524c97b4991b3849dd5c17a631223896272c6b0af446778ba4675a1dff53bb7e"},
"earmark_parser": {:hex, :earmark_parser, "1.4.13", "0c98163e7d04a15feb62000e1a891489feb29f3d10cb57d4f845c405852bbef8", [:mix], [], "hexpm", "d602c26af3a0af43d2f2645613f65841657ad6efc9f0e361c3b6c06b578214ba"},
"ex_doc": {:hex, :ex_doc, "0.24.2", "e4c26603830c1a2286dae45f4412a4d1980e1e89dc779fcd0181ed1d5a05c8d9", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "e134e1d9e821b8d9e4244687fb2ace58d479b67b282de5158333b0d57c6fb7da"},
"excoveralls": {:hex, :excoveralls, "0.13.3", "edc5f69218f84c2bf61b3609a22ddf1cec0fbf7d1ba79e59f4c16d42ea4347ed", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cc26f48d2f68666380b83d8aafda0fffc65dafcc8d8650358e0b61f6a99b1154"},
"gen_stage": {:hex, :gen_stage, "1.0.0", "51c8ae56ff54f9a2a604ca583798c210ad245f415115453b773b621c49776df5", [:mix], [], "hexpm", "1d9fc978db5305ac54e6f5fec7adf80cd893b1000cf78271564c516aa2af7706"},
"hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [], [{:certifi, "~>2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"hackney": {:hex, :hackney, "1.17.4", "99da4674592504d3fb0cfef0db84c3ba02b4508bae2dff8c0108baa0d6e0977c", [:rebar3], [{:certifi, "~>2.6.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "de16ff4996556c8548d512f4dbe22dd58a587bf3332e7fd362430a7ef3986b16"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.2.2", "ba43e3f2709fd1aa1dce90aaabfd039d000469c05c56f0b8e31978e03fa39052", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "18a228f5f0058ee183f29f9eae0805c6e59d61c3b006760668d8d18ff0d12179"},
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
Expand All @@ -14,8 +14,8 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"nimble_options": {:hex, :nimble_options, "0.3.0", "1872911bf50a048f04da26e02704e6aeafc362c2daa7636b6dbfda9492ccfcfa", [:mix], [], "hexpm", "180790a8644fea402452bc15bb54b9bf2c8e5c1fdeb6b39d8072e59c324edf7f"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
"telemetry": {:hex, :telemetry, "0.4.1", "ae2718484892448a24470e6aa341bc847c3277bfb8d4e9289f7474d752c09c7f", [:rebar3], [], "hexpm", "4738382e36a0a9a2b6e25d67c960e40e1a2c95560b9f936d8e29de8cd858480f"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
}
2 changes: 2 additions & 0 deletions test/broadway/dummy_producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ defmodule Broadway.DummyProducerTest do
use ExUnit.Case, async: true

defmodule Handler do
use Broadway

def handle_message(_processor, message, _context) do
message
end
Expand Down
Loading

0 comments on commit a55a5b1

Please sign in to comment.