/
supervisor.ex
64 lines (49 loc) · 1.81 KB
/
supervisor.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
defmodule Commanded.Aggregates.Supervisor do
  @moduledoc """
  Supervises `Commanded.Aggregates.Aggregate` instance processes.
  """

  use DynamicSupervisor

  require Logger

  alias Commanded.Aggregates.Aggregate
  alias Commanded.Registration

  # Options consumed by `DynamicSupervisor.start_link/3` itself. Every other
  # option is handed to `init/1` as the supervisor's init argument.
  @start_opts [:debug, :name, :timeout, :spawn_opt, :hibernate_after]

  @doc """
  Start the aggregate supervisor.

  `opts` is a keyword list. The `:debug`, `:name`, `:timeout`, `:spawn_opt` and
  `:hibernate_after` entries are passed to `DynamicSupervisor.start_link/3` as
  process start options; all remaining entries are passed to `init/1`.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts) do
    {start_opts, supervisor_opts} = Keyword.split(opts, @start_opts)

    DynamicSupervisor.start_link(__MODULE__, supervisor_opts, start_opts)
  end

  @doc """
  Open an aggregate instance process for the given aggregate module and unique
  identity.

  Returns `{:ok, aggregate_uuid}` when a process is successfully started, or is
  already running.

  Returns `{:error, {:unsupported_aggregate_identity_type, aggregate_uuid}}`
  when `application` or `aggregate_module` is not an atom, or when
  `aggregate_uuid` is not a binary.

  Any other reply from `Commanded.Registration.start_child/4` is returned
  unchanged.
  """
  def open_aggregate(application, aggregate_module, aggregate_uuid)
      when is_atom(application) and is_atom(aggregate_module) and is_binary(aggregate_uuid) do
    Logger.debug(fn ->
      "Locating aggregate process for `#{inspect(aggregate_module)}` with UUID " <>
        inspect(aggregate_uuid)
    end)

    # NOTE(review): presumably the name this supervisor is registered under for
    # the given application - confirm against the code that starts it.
    supervisor_name = Module.concat([application, __MODULE__])
    aggregate_name = Aggregate.name(application, aggregate_module, aggregate_uuid)

    args = [
      application: application,
      aggregate_module: aggregate_module,
      aggregate_uuid: aggregate_uuid
    ]

    case Registration.start_child(application, aggregate_name, supervisor_name, {Aggregate, args}) do
      {:ok, _pid} ->
        {:ok, aggregate_uuid}

      {:ok, _pid, _info} ->
        {:ok, aggregate_uuid}

      # An already running aggregate is treated as success.
      {:error, {:already_started, _pid}} ->
        {:ok, aggregate_uuid}

      reply ->
        reply
    end
  end

  def open_aggregate(_application, _aggregate_module, aggregate_uuid),
    do: {:error, {:unsupported_aggregate_identity_type, aggregate_uuid}}

  # `args` holds the options left over by `start_link/1`. They are registered as
  # `:extra_arguments`, which `DynamicSupervisor` prepends to the start
  # arguments of every child it starts.
  @impl true
  def init(args) do
    DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: [args])
  end
end