Skip to content

Commit

Permalink
Move c:process_name/2 to the Broadway behaviour
Browse files Browse the repository at this point in the history
It seems like this callback was accidentally generated inside the module
calling "use Broadway" instead of being part of the Broadway behaviour.

This callback was originally introduced in
#239.
  • Loading branch information
whatyouhide committed Feb 8, 2022
1 parent 008bb07 commit c893a34
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
41 changes: 27 additions & 14 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,32 @@ defmodule Broadway do
@doc since: "0.5.0"
@callback handle_failed(messages :: [Message.t()], context :: term) :: [Message.t()]

@optional_callbacks prepare_messages: 2, handle_batch: 4, handle_failed: 2
@doc """
Invoked to get the process name of this Broadway pipeline.
`broadway_name` is the name given to `start_link/2` in the `:name` option. `base_name`
is a string used by Broadway to identify different components of the pipeline
whose name needs to be registered (such as "batcher" or "processor").
The return value of this callback must be a process name that is valid for registration.
See the name registration rules in the documentation for `GenServer`.
This callback is optional. If not defined, the `broadway_name` given to `start_link/2`
**must be an atom**: the default implementation of this callback will fail otherwise.
## Examples
@impl Broadway
def process_name({:via, module, term}, base_name) do
{:via, module, {term, base_name}}
end
"""
@doc since: "1.1.0"
@callback process_name(broadway_name :: Broadway.name(), base_name :: String.t()) ::
Broadway.name()

@optional_callbacks prepare_messages: 2, handle_batch: 4, handle_failed: 2, process_name: 2

defguardp is_broadway_name(name)
when is_atom(name) or (is_tuple(name) and tuple_size(name) == 3)
Expand All @@ -926,19 +951,7 @@ defmodule Broadway do
Supervisor.child_spec(default, unquote(Macro.escape(opts)))
end

@callback process_name(Broadway.name(), base_name :: String.t()) :: Broadway.name()
def process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
end

def process_name(broadway_name, _base_name) do
raise ArgumentError, """
Expected Broadway to be started with a `name` of type atom, got: #{inspect(broadway_name)}.
If starting Broadway with a `name` that is not an atom, you must define process_name/2 in the module which uses Broadway.
"""
end

defoverridable child_spec: 1, process_name: 2
defoverridable child_spec: 1
end
end

Expand Down
21 changes: 18 additions & 3 deletions lib/broadway/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,25 @@ defmodule Broadway.Topology do
process_name(config, "#{base_name}_#{suffix}")
end

defp process_name(config, base_name) do
%{module: module, name: broadway_name} = config
defp process_name(%{module: module, name: broadway_name} = _config, base_name) do
if function_exported?(module, :process_name, 2) do
module.process_name(broadway_name, base_name)
else
default_process_name(broadway_name, base_name)
end
end

defp default_process_name(broadway_name, base_name) when is_atom(broadway_name) do
:"#{broadway_name}.Broadway.#{base_name}"
end

defp default_process_name(broadway_name, _base_name) do
raise ArgumentError, """
expected Broadway to be started with an atom :name, got: #{inspect(broadway_name)}
module.process_name(broadway_name, base_name)
If starting Broadway with a :name that is not an atom, you must define the \
process_name/2 callback in the module which calls "use Broadway" (see the documentation).
"""
end

defp process_names(config, type, processor_config) do
Expand Down
2 changes: 1 addition & 1 deletion test/broadway_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2700,7 +2700,7 @@ defmodule BroadwayTest do
)

assert Exception.message(error) =~
"you must define process_name/2 in the module which uses Broadway"
"you must define the process_name/2 callback in the module"
end

defp via_tuple(name), do: {:via, Registry, {MyRegistry, name}}
Expand Down

0 comments on commit c893a34

Please sign in to comment.