From 3f15512abac23ecced8a3716da45599f1b6bf250 Mon Sep 17 00:00:00 2001 From: Ben Smith Date: Thu, 13 Jun 2019 17:30:42 +0100 Subject: [PATCH] Use `handle_continue/2` for process router initialisation --- lib/commanded/aggregates/aggregate.ex | 8 ++++++ .../process_managers/process_router.ex | 28 +++++++++++++------ 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/lib/commanded/aggregates/aggregate.ex b/lib/commanded/aggregates/aggregate.ex index 8df3b39a..b9ea3d40 100644 --- a/lib/commanded/aggregates/aggregate.ex +++ b/lib/commanded/aggregates/aggregate.ex @@ -85,6 +85,7 @@ defmodule Commanded.Aggregates.Aggregate do do: {aggregate_module, aggregate_uuid} @doc false + @impl GenServer def init(%Aggregate{} = state) do # Initial aggregate state is populated by loading its state snapshot and/or # events from the event store. @@ -150,6 +151,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_continue(:subscribe_to_events, %Aggregate{} = state) do %Aggregate{aggregate_uuid: aggregate_uuid} = state @@ -159,6 +161,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_cast(:take_snapshot, %Aggregate{} = state) do %Aggregate{ aggregate_state: aggregate_state, @@ -190,6 +193,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_call({:execute_command, %ExecutionContext{} = context}, _from, %Aggregate{} = state) do %ExecutionContext{lifespan: lifespan, command: command} = context @@ -224,6 +228,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_call(:aggregate_state, _from, %Aggregate{} = state) do %Aggregate{aggregate_state: aggregate_state} = state @@ -231,6 +236,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_call(:aggregate_version, _from, %Aggregate{} = state) do %Aggregate{aggregate_version: aggregate_version} = state @@ -238,6 +244,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_info({:events, events}, %Aggregate{} = state) do %Aggregate{lifespan_timeout: lifespan_timeout} = state @@ -262,6 +269,7 @@ defmodule Commanded.Aggregates.Aggregate do end @doc false + @impl GenServer def handle_info(:timeout, %Aggregate{} = state) do Logger.debug(fn -> describe(state) <> " stopping due to inactivity timeout" end) diff --git a/lib/commanded/process_managers/process_router.ex b/lib/commanded/process_managers/process_router.ex index b7d06ae0..a205e733 100644 --- a/lib/commanded/process_managers/process_router.ex +++ b/lib/commanded/process_managers/process_router.ex @@ -53,11 +53,11 @@ defmodule Commanded.ProcessManagers.ProcessRouter do Registration.start_link(name, __MODULE__, state) end + @impl GenServer def init(%State{} = state) do :ok = register_subscription(state) - :ok = GenServer.cast(self(), :subscribe_to_events) - {:ok, state} + {:ok, state, {:continue, :subscribe_to_events}} end @doc """ @@ -78,6 +78,13 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end @doc false + @impl GenServer + def handle_continue(:subscribe_to_events, %State{} = state) do + {:noreply, subscribe_to_events(state)} + end + + @doc false + @impl GenServer def handle_call(:process_instances, _from, %State{} = state) do %State{process_managers: process_managers} = state @@ -87,6 +94,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end @doc false + @impl GenServer def handle_call({:process_instance, process_uuid}, _from, %State{} = state) do %State{process_managers: process_managers} = state @@ -100,6 +108,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end @doc false + @impl GenServer def handle_cast({:ack_event, event, instance}, %State{} = state) do %State{pending_acks: pending_acks} = state %RecordedEvent{event_number: event_number} = event @@ -124,15 +133,12 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end @doc false - def handle_cast(:subscribe_to_events, %State{} = state) do - {:noreply, subscribe_to_all_streams(state)} - end - - @doc false + @impl GenServer def handle_cast(:process_pending_events, %State{pending_events: []} = state), do: {:noreply, state} @doc false + @impl GenServer def handle_cast(:process_pending_events, %State{} = state) do %State{pending_events: [event | pending_events]} = state @@ -155,6 +161,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @doc false # Subscription to event store has successfully subscribed, init process router + @impl GenServer def handle_info({:subscribed, subscription}, %State{subscription: subscription} = state) do Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end) @@ -164,6 +171,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do end @doc false + @impl GenServer def handle_info({:events, events}, %State{} = state) do Logger.debug(fn -> describe(state) <> " received #{length(events)} event(s)" end) @@ -196,6 +204,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @doc false # Shutdown process manager when processing an event has taken too long. + @impl GenServer def handle_info({:event_timeout, event_number}, %State{} = state) do %State{pending_acks: pending_acks, event_timeout: event_timeout} = state @@ -217,6 +226,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @doc false # Stop process manager when event store subscription process terminates. + @impl GenServer def handle_info( {:DOWN, ref, :process, pid, reason}, %State{subscription_ref: ref, subscription: pid} = state @@ -228,6 +238,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @doc false # Remove a process manager instance that has stopped with a normal exit reason. + @impl GenServer def handle_info({:DOWN, _ref, :process, pid, :normal}, %State{} = state) do %State{process_managers: process_managers} = state @@ -238,6 +249,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do @doc false # Stop process router when a process manager instance terminates abnormally. + @impl GenServer def handle_info({:DOWN, _ref, :process, _pid, reason}, %State{} = state) do Logger.warn(fn -> describe(state) <> " is stopping due to: #{inspect(reason)}" end) @@ -251,7 +263,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do Subscriptions.register(name, consistency) end - defp subscribe_to_all_streams(%State{} = state) do + defp subscribe_to_events(%State{} = state) do %State{process_manager_name: process_manager_name, subscribe_from: subscribe_from} = state {:ok, subscription} =