Skip to content

Commit

Permalink
Incorporate feedback & improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
acco committed Jun 1, 2021
1 parent 5b63b2a commit 4b1cf2c
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 26 deletions.
14 changes: 7 additions & 7 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ defmodule Broadway do
Returned by `start_link/2`.
"""
@type on_start() :: {:ok, pid()} | :ignore | {:error, {:already_started, pid()} | term()}
@type via_tuple :: {:via, Registry, any()}
@type name :: atom() | {:via, module(), term()}

@doc """
Invoked for preparing messages before handling (if defined).
Expand Down Expand Up @@ -860,8 +860,8 @@ defmodule Broadway do
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end

@type via_tuple :: {:via, Registry, any()}
@callback process_name(broadway_name :: String.t() | via_tuple, base_name :: String.t()) ::
@type name :: atom() | {:via, module(), term()}
@callback process_name(broadway_name :: name(), base_name :: String.t()) ::
any()
def process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
Expand Down Expand Up @@ -941,7 +941,7 @@ defmodule Broadway do
[MyBroadway.Producer_0, MyBroadway.Producer_1, ..., MyBroadway.Producer_7]
"""
@spec producer_names(broadway :: atom() | via_tuple()) :: [atom()]
@spec producer_names(broadway :: name()) :: [atom()]
def producer_names(broadway) do
Topology.producer_names(broadway)
end
Expand Down Expand Up @@ -982,7 +982,7 @@ defmodule Broadway do
]
"""
@spec topology(broadway :: atom() | via_tuple()) :: [
@spec topology(broadway :: name()) :: [
{atom(),
[
%{
Expand Down Expand Up @@ -1013,7 +1013,7 @@ defmodule Broadway do
The producer is randomly chosen among all sets of producers/stages.
This is used to send out of band data to a Broadway pipeline.
"""
@spec push_messages(broadway :: atom() | via_tuple(), messages :: [Message.t()]) :: :ok
@spec push_messages(broadway :: name(), messages :: [Message.t()]) :: :ok
def push_messages(broadway, messages) when is_list(messages) do
broadway
|> producer_names()
Expand Down Expand Up @@ -1160,7 +1160,7 @@ defmodule Broadway do
"""
@doc since: "0.6.0"
@spec get_rate_limiting(server :: atom() | via_tuple()) ::
@spec get_rate_limiting(server :: name()) ::
{:ok, rate_limiting_info} | {:error, :rate_limiting_not_enabled}
when rate_limiting_info: %{
required(:interval) => non_neg_integer(),
Expand Down
5 changes: 3 additions & 2 deletions lib/broadway/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,10 @@ defmodule Broadway.Options do

def validate_name(name) when is_atom(name), do: {:ok, name}

def validate_name({:via, Registry, {_, _}} = via), do: {:ok, via}
def validate_name({:via, _, _} = via), do: {:ok, via}

def validate_name(name) do
{:error, "expected :name to be an atom or a via tuple, got: #{inspect(name)}"}
{:error,
"expected :name to be an atom or a {:via, module, term} tuple, got: #{inspect(name)}"}
end
end
11 changes: 7 additions & 4 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ defmodule Broadway.Topology do
max_restarts: opts[:max_restarts],
max_seconds: opts[:max_seconds],
shutdown: opts[:shutdown],
resubscribe_interval: opts[:resubscribe_interval]
resubscribe_interval: opts[:resubscribe_interval],
terminator: nil,
rate_limiter: nil
}
|> put_terminator()
|> put_rate_limiter(opts)
Expand All @@ -164,9 +166,10 @@ defmodule Broadway.Topology do
end

defp put_rate_limiter(config, opts) do
case get_in(opts, [:producer, :rate_limiting]) do
nil -> Map.put(config, :rate_limiter, nil)
_ -> Map.put(config, :rate_limiter, process_name(config, "RateLimiter"))
if opts[:producer][:rate_limiting] do
Map.put(config, :rate_limiter, process_name(config, "RateLimiter"))
else
config
end
end

Expand Down
25 changes: 12 additions & 13 deletions test/broadway_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ defmodule BroadwayTest do
def handle_message(_, message, _), do: message
def handle_batch(_, messages, _, _), do: messages

def process_name({:via, Registry, {registry, {UsesRegistry, id}}}, base_name) do
{:via, Registry, {registry, {:"UsesRegistry.#{base_name}", id}}}
def process_name({:via, Registry, {registry, id}}, base_name) do
{:via, Registry, {registry, {id, base_name}}}
end
end

Expand Down Expand Up @@ -182,7 +182,7 @@ defmodule BroadwayTest do
describe "broadway configuration" do
test "invalid configuration options" do
assert_raise ArgumentError,
"invalid configuration given to Broadway.start_link/2, expected :name to be an atom or a via tuple, got: 1",
"invalid configuration given to Broadway.start_link/2, expected :name to be an atom or a {:via, module, term} tuple, got: 1",
fn -> Broadway.start_link(Forwarder, name: 1) end
end

Expand Down Expand Up @@ -2496,7 +2496,7 @@ defmodule BroadwayTest do
describe "starting under a registry using via tuple" do
setup do
{:ok, _registry} = start_supervised({Registry, keys: :unique, name: MyRegistry})
name = via_tuple({UsesRegistry, "1"})
name = via_tuple(:broadway)

{:ok, _broadway} =
Broadway.start_link(UsesRegistry,
Expand All @@ -2515,9 +2515,9 @@ defmodule BroadwayTest do

test "names processes in topology using process_name/2", %{name: name} do
assert is_pid(GenServer.whereis(name))
assert is_pid(GenServer.whereis(via_tuple({:"UsesRegistry.Producer_0", "1"})))
assert is_pid(GenServer.whereis(via_tuple({:"UsesRegistry.Processor_default_0", "1"})))
assert is_pid(GenServer.whereis(via_tuple({:"UsesRegistry.Terminator", "1"})))
assert is_pid(GenServer.whereis(via_tuple({:broadway, "Producer_0"})))
assert is_pid(GenServer.whereis(via_tuple({:broadway, "Processor_default_0"})))
assert is_pid(GenServer.whereis(via_tuple({:broadway, "Terminator"})))
end

test "get_rate_limiting/1", %{name: name} do
Expand All @@ -2526,21 +2526,20 @@ defmodule BroadwayTest do
end

test "Broadway.producer_names/1", %{name: name} do
assert Broadway.producer_names(name) == [via_tuple({:"UsesRegistry.Producer_0", "1"})]
assert Broadway.producer_names(name) == [via_tuple({:broadway, "Producer_0"})]
end

test "Broadway.topology/1", %{name: name} do
assert Broadway.topology(name) == [
{:producers,
[%{concurrency: 1, name: via_tuple({:"UsesRegistry.Producer", "1"})}]},
{:producers, [%{concurrency: 1, name: via_tuple({:broadway, "Producer"})}]},
{:processors,
[%{concurrency: 16, name: via_tuple({:"UsesRegistry.Processor_default", "1"})}]},
[%{concurrency: 16, name: via_tuple({:broadway, "Processor_default"})}]},
{:batchers,
[
%{
batcher_name: via_tuple({:"UsesRegistry.Batcher_default", "1"}),
batcher_name: via_tuple({:broadway, "Batcher_default"}),
concurrency: 1,
name: via_tuple({:"UsesRegistry.BatchProcessor_default", "1"})
name: via_tuple({:broadway, "BatchProcessor_default"})
}
]}
]
Expand Down

0 comments on commit 4b1cf2c

Please sign in to comment.