Skip to content

Commit

Permalink
Do not start new PM instance on :continue
Browse files Browse the repository at this point in the history
  • Loading branch information
ayarulin committed Jul 24, 2018
1 parent 840b932 commit 456dec5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
12 changes: 12 additions & 0 deletions lib/commanded/process_managers/process_manager_instance.ex
Expand Up @@ -39,6 +39,13 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
{:ok, state}
end

@doc """
Checks whether or not the process manager has already processed events
"""
def new?(process_manager) do
GenServer.call(process_manager, :new?)
end

@doc """
Handle the given event by delegating to the process manager module
"""
Expand Down Expand Up @@ -75,6 +82,11 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do
{:reply, process_state, state}
end

@doc false
def handle_call(:new?, _from, %ProcessManagerInstance{last_seen_event: last_seen_event} = state) do
{:reply, is_nil(last_seen_event), state}
end

@doc """
Attempt to fetch intial process state from snapshot storage
"""
Expand Down
9 changes: 8 additions & 1 deletion lib/commanded/process_managers/process_router.ex
Expand Up @@ -235,7 +235,14 @@ defmodule Commanded.ProcessManagers.ProcessRouter do
|> Enum.reduce(state, fn (process_uuid, state) ->
{process_instance, state} = continue_process_manager(process_uuid, state)

delegate_event(process_instance, event, state)
case ProcessManagerInstance.new?(process_instance) do
false -> delegate_event(process_instance, event, state)
true ->
Logger.debug(fn -> describe(state) <> " is not interested in event: #{inspect event_number} (#{inspect stream_id}@#{inspect stream_version})" end)
stop_process_manager(process_uuid, state)

ack_and_continue(event, state)
end
end)

{:stop, process_uuid} ->
Expand Down
23 changes: 23 additions & 0 deletions test/process_managers/process_manager_routing_test.exs
Expand Up @@ -8,6 +8,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerRoutingTest do
alias Commanded.ExampleDomain.BankAccount.Commands.OpenAccount
alias Commanded.ExampleDomain.BankAccount.Events.{MoneyDeposited,MoneyWithdrawn}
alias Commanded.ExampleDomain.MoneyTransfer.Commands.TransferMoney
alias Commanded.ExampleDomain.BankAccount.Commands.WithdrawMoney
alias Commanded.ExampleDomain.MoneyTransfer.Events.MoneyTransferRequested
alias Commanded.Helpers.CommandAuditMiddleware
alias Commanded.ProcessManagers.ProcessRouter
Expand Down Expand Up @@ -60,4 +61,26 @@ defmodule Commanded.ProcessManagers.ProcessManagerRoutingTest do

assert [{^transfer_uuid, _}] = ProcessRouter.process_instances(process_router)
end

test "should not create entirely new instance when process manager expected to continue" do
account_number1 = UUID.uuid4
transfer_uuid = UUID.uuid4

{:ok, process_router} = TransferMoneyProcessManager.start_link()

assert ProcessRouter.process_instances(process_router) == []

:timer.sleep 500

:ok = BankRouter.dispatch(%OpenAccount{account_number: account_number1, initial_balance: 1_000})
:ok = BankRouter.dispatch(%WithdrawMoney{
transfer_uuid: transfer_uuid,
account_number: account_number1,
amount: 100,
})

:timer.sleep 300

assert [] = ProcessRouter.process_instances(process_router)
end
end

0 comments on commit 456dec5

Please sign in to comment.