Skip to content

Commit

Permalink
Use handle_continue/2 for process router initialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
slashdotdash committed Jun 13, 2019
1 parent 8eba924 commit 3f15512
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
8 changes: 8 additions & 0 deletions lib/commanded/aggregates/aggregate.ex
Expand Up @@ -85,6 +85,7 @@ defmodule Commanded.Aggregates.Aggregate do
do: {aggregate_module, aggregate_uuid}

@doc false
@impl GenServer
def init(%Aggregate{} = state) do
# Initial aggregate state is populated by loading its state snapshot and/or
# events from the event store.
Expand Down Expand Up @@ -150,6 +151,7 @@ defmodule Commanded.Aggregates.Aggregate do
end

@doc false
@impl GenServer
def handle_continue(:subscribe_to_events, %Aggregate{} = state) do
%Aggregate{aggregate_uuid: aggregate_uuid} = state

Expand All @@ -159,6 +161,7 @@ defmodule Commanded.Aggregates.Aggregate do
end

@doc false
@impl GenServer
def handle_cast(:take_snapshot, %Aggregate{} = state) do
%Aggregate{
aggregate_state: aggregate_state,
Expand Down Expand Up @@ -190,6 +193,7 @@ defmodule Commanded.Aggregates.Aggregate do
end

@doc false
@impl GenServer
def handle_call({:execute_command, %ExecutionContext{} = context}, _from, %Aggregate{} = state) do
%ExecutionContext{lifespan: lifespan, command: command} = context

Expand Down Expand Up @@ -224,20 +228,23 @@ defmodule Commanded.Aggregates.Aggregate do
end

@doc false
@impl GenServer
def handle_call(:aggregate_state, _from, %Aggregate{} = state) do
%Aggregate{aggregate_state: aggregate_state} = state

{:reply, aggregate_state, state}
end

@doc false
@impl GenServer
def handle_call(:aggregate_version, _from, %Aggregate{} = state) do
%Aggregate{aggregate_version: aggregate_version} = state

{:reply, aggregate_version, state}
end

@doc false
@impl GenServer
def handle_info({:events, events}, %Aggregate{} = state) do
%Aggregate{lifespan_timeout: lifespan_timeout} = state

Expand All @@ -262,6 +269,7 @@ defmodule Commanded.Aggregates.Aggregate do
end

@doc false
@impl GenServer
def handle_info(:timeout, %Aggregate{} = state) do
Logger.debug(fn -> describe(state) <> " stopping due to inactivity timeout" end)

Expand Down
28 changes: 20 additions & 8 deletions lib/commanded/process_managers/process_router.ex
Expand Up @@ -53,11 +53,11 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
Registration.start_link(name, __MODULE__, state)
end

@impl GenServer
def init(%State{} = state) do
:ok = register_subscription(state)
:ok = GenServer.cast(self(), :subscribe_to_events)

{:ok, state}
{:ok, state, {:continue, :subscribe_to_events}}
end

@doc """
Expand All @@ -78,6 +78,13 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
@impl GenServer
def handle_continue(:subscribe_to_events, %State{} = state) do
{:noreply, subscribe_to_events(state)}
end

@doc false
@impl GenServer
def handle_call(:process_instances, _from, %State{} = state) do
%State{process_managers: process_managers} = state

Expand All @@ -87,6 +94,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
@impl GenServer
def handle_call({:process_instance, process_uuid}, _from, %State{} = state) do
%State{process_managers: process_managers} = state

Expand All @@ -100,6 +108,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
@impl GenServer
def handle_cast({:ack_event, event, instance}, %State{} = state) do
%State{pending_acks: pending_acks} = state
%RecordedEvent{event_number: event_number} = event
Expand All @@ -124,15 +133,12 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
def handle_cast(:subscribe_to_events, %State{} = state) do
{:noreply, subscribe_to_all_streams(state)}
end

@doc false
@impl GenServer
def handle_cast(:process_pending_events, %State{pending_events: []} = state),
do: {:noreply, state}

@doc false
@impl GenServer
def handle_cast(:process_pending_events, %State{} = state) do
%State{pending_events: [event | pending_events]} = state

Expand All @@ -155,6 +161,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

@doc false
# Subscription to event store has successfully subscribed, init process router
@impl GenServer
def handle_info({:subscribed, subscription}, %State{subscription: subscription} = state) do
Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end)

Expand All @@ -164,6 +171,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
@impl GenServer
def handle_info({:events, events}, %State{} = state) do
Logger.debug(fn -> describe(state) <> " received #{length(events)} event(s)" end)

Expand Down Expand Up @@ -196,6 +204,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

@doc false
# Shutdown process manager when processing an event has taken too long.
@impl GenServer
def handle_info({:event_timeout, event_number}, %State{} = state) do
%State{pending_acks: pending_acks, event_timeout: event_timeout} = state

Expand All @@ -217,6 +226,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

@doc false
# Stop process manager when event store subscription process terminates.
@impl GenServer
def handle_info(
{:DOWN, ref, :process, pid, reason},
%State{subscription_ref: ref, subscription: pid} = state
Expand All @@ -228,6 +238,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

@doc false
# Remove a process manager instance that has stopped with a normal exit reason.
@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, :normal}, %State{} = state) do
%State{process_managers: process_managers} = state

Expand All @@ -238,6 +249,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

@doc false
# Stop process router when a process manager instance terminates abnormally.
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %State{} = state) do
Logger.warn(fn -> describe(state) <> " is stopping due to: #{inspect(reason)}" end)

Expand All @@ -251,7 +263,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
Subscriptions.register(name, consistency)
end

defp subscribe_to_all_streams(%State{} = state) do
defp subscribe_to_events(%State{} = state) do
%State{process_manager_name: process_manager_name, subscribe_from: subscribe_from} = state

{:ok, subscription} =
Expand Down

0 comments on commit 3f15512

Please sign in to comment.