Skip to content

Commit

Permalink
Event handler and process manager monitor event store subscription an…
Browse files Browse the repository at this point in the history
…d shutdown on terminate
  • Loading branch information
slashdotdash committed Jan 23, 2019
1 parent df1a6e0 commit 3d8c578
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 16 deletions.
21 changes: 17 additions & 4 deletions lib/commanded/event/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ defmodule Commanded.Event.Handler do
:last_seen_event,
:subscribe_from,
:subscribe_to,
:subscription
:subscription,
:subscription_ref
]

@doc false
Expand Down Expand Up @@ -435,6 +436,17 @@ defmodule Commanded.Event.Handler do
end
end

@doc false
# Stop event handler when event store subscription process terminates.
def handle_info(
{:DOWN, ref, :process, pid, reason},
%Handler{subscription_ref: ref, subscription: pid} = state
) do
Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end)

{:stop, reason, state}
end

defp subscribe_to_events(%Handler{} = state) do
%Handler{
handler_name: handler_name,
Expand All @@ -445,7 +457,9 @@ defmodule Commanded.Event.Handler do
{:ok, subscription} =
EventStore.subscribe_to(subscribe_to, handler_name, self(), subscribe_from)

%Handler{state | subscription: subscription}
subscription_ref = Process.monitor(subscription)

%Handler{state | subscription: subscription, subscription_ref: subscription_ref}
end

defp handle_event(event, handler, context \\ %{})
Expand Down Expand Up @@ -490,8 +504,7 @@ defmodule Commanded.Event.Handler do
try do
handler_module.handle(data, metadata)
rescue
e ->
{:error, e}
e -> {:error, e}
end
end

Expand Down
39 changes: 27 additions & 12 deletions lib/commanded/process_managers/process_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ defmodule Commanded.ProcessManagers.ProcessRouter do

alias Commanded.EventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.ProcessManagers.FailureContext
alias Commanded.ProcessManagers.ProcessManagerInstance
alias Commanded.ProcessManagers.ProcessRouter
alias Commanded.ProcessManagers.Supervisor
alias Commanded.ProcessManagers.FailureContext
alias Commanded.Subscriptions

defmodule State do
Expand All @@ -26,6 +26,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
:subscribe_from,
:supervisor,
:subscription,
:subscription_ref,
:last_seen_event,
:process_event_timer,
process_managers: %{},
Expand Down Expand Up @@ -168,9 +169,11 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

@doc false
def handle_info({:events, events}, %State{pending_events: pending_events} = state) do
def handle_info({:events, events}, %State{} = state) do
Logger.debug(fn -> describe(state) <> " received #{length(events)} event(s)" end)

%State{pending_events: pending_events} = state

unseen_events = Enum.reject(events, &event_already_seen?(&1, state))

state =
Expand Down Expand Up @@ -214,12 +217,25 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end
end

@doc false
# Stop process manager when event store subscription process terminates.
def handle_info(
{:DOWN, ref, :process, pid, reason},
%State{subscription_ref: ref, subscription: pid} = state
) do
Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end)

{:stop, reason, state}
end

@doc false
# Remove a process manager instance that has stopped with a normal exit reason.
def handle_info({:DOWN, _ref, :process, pid, :normal}, %State{} = state) do
%State{process_managers: process_managers} = state

{:noreply, %State{state | process_managers: remove_process_manager(process_managers, pid)}}
state = %State{state | process_managers: remove_process_manager(process_managers, pid)}

{:noreply, state}
end

@doc false
Expand All @@ -236,10 +252,12 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
{:ok, subscription} =
EventStore.subscribe_to(:all, process_manager_name, self(), subscribe_from)

%State{state | subscription: subscription}
subscription_ref = Process.monitor(subscription)

%State{state | subscription: subscription, subscription_ref: subscription_ref}
end

# ignore already seen event
# Ignore already seen event
defp event_already_seen?(
%RecordedEvent{event_number: event_number},
%State{last_seen_event: last_seen_event}
Expand Down Expand Up @@ -380,14 +398,14 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end
end

# continue processing any pending events and confirm receipt of the given event id
# Continue processing any pending events and confirm receipt of the given event id
defp ack_and_continue(%RecordedEvent{} = event, %State{} = state) do
GenServer.cast(self(), :process_pending_events)

confirm_receipt(event, state)
end

# confirm receipt of given event
# Confirm receipt of given event
defp confirm_receipt(%RecordedEvent{event_number: event_number} = event, %State{} = state) do
Logger.debug(fn ->
describe(state) <> " confirming receipt of event: #{inspect(event_number)}"
Expand Down Expand Up @@ -461,11 +479,8 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
end

defp do_ack_event(event, %State{} = state) do
%State{
consistency: consistency,
process_manager_name: name,
subscription: subscription
} = state
%State{consistency: consistency, process_manager_name: name, subscription: subscription} =
state

:ok = EventStore.ack_event(subscription, event)
:ok = Subscriptions.ack_event(name, consistency, event)
Expand Down
67 changes: 67 additions & 0 deletions test/event/event_handler_subscription_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule Commanded.Event.EventHandlerSubscriptionTest do
use Commanded.MockEventStoreCase

defmodule ExampleHandler do
use Commanded.Event.Handler, name: "ExampleHandler"
end

setup do
{:ok, subscription} = start_subscription()
{:ok, handler} = start_handler(subscription)

[
handler: handler,
subscription: subscription
]
end

describe "event handler subscription" do
test "should monitor subscription and terminate handler on shutdown", %{
handler: handler,
subscription: subscription
} do
Process.unlink(handler)
ref = Process.monitor(handler)

shutdown_subscription(subscription)

assert_receive {:DOWN, ^ref, :process, ^handler, :normal}
end
end

defp start_subscription do
pid =
spawn(fn ->
receive do
:shutdown -> :ok
end
end)

{:ok, pid}
end

defp start_handler(subscription) do
reply_to = self()

expect(MockEventStore, :subscribe_to, fn :all, "ExampleHandler", handler, :origin ->
send(handler, {:subscribed, subscription})
send(reply_to, {:subscribed, subscription})

{:ok, subscription}
end)

{:ok, pid} = ExampleHandler.start_link()

assert_receive {:subscribed, ^subscription}

{:ok, pid}
end

defp shutdown_subscription(subscription) do
ref = Process.monitor(subscription)

send(subscription, :shutdown)

assert_receive {:DOWN, ^ref, :process, _, _}
end
end
75 changes: 75 additions & 0 deletions test/process_managers/process_manager_subscription_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
defmodule Commanded.ProcessManagers.ProcessManagerSubscriptionTest do
use Commanded.MockEventStoreCase

defmodule ExampleRouter do
end

defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: ExampleRouter

@derive Jason.Encoder
defstruct [:data]
end

setup do
{:ok, subscription} = start_subscription()
{:ok, pm} = start_process_manager(subscription)

[
pm: pm,
subscription: subscription
]
end

describe "process manager subscription" do
test "should monitor subscription and terminate process manager on shutdown", %{
pm: pm,
subscription: subscription
} do
Process.unlink(pm)
ref = Process.monitor(pm)

shutdown_subscription(subscription)

assert_receive {:DOWN, ^ref, :process, ^pm, :normal}
end
end

defp start_subscription do
pid =
spawn(fn ->
receive do
:shutdown -> :ok
end
end)

{:ok, pid}
end

defp start_process_manager(subscription) do
reply_to = self()

expect(MockEventStore, :subscribe_to, fn :all, "ExampleProcessManager", pm, :origin ->
send(pm, {:subscribed, subscription})
send(reply_to, {:subscribed, subscription})

{:ok, subscription}
end)

{:ok, pid} = ExampleProcessManager.start_link()

assert_receive {:subscribed, ^subscription}

{:ok, pid}
end

defp shutdown_subscription(subscription) do
ref = Process.monitor(subscription)

send(subscription, :shutdown)

assert_receive {:DOWN, ^ref, :process, _, _}
end
end

0 comments on commit 3d8c578

Please sign in to comment.