Skip to content

Commit

Permalink
Retry command execution on concurrency error
Browse files Browse the repository at this point in the history
Retry executing a command if appending events to the aggregate's stream fails with an `{:error, :wrong_expected_version}` concurrency error. The missing event(s) are fetched from the event store to update the aggregate state and attempt the command again.

Aggregate instances are singletons on a single node, and cluster if using the distributed registry. However, this allows Commanded to still work when run on multiple nodes that are not connected to form a cluster.
  • Loading branch information
slashdotdash committed Jan 30, 2018
1 parent 4964f62 commit 42e899c
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 86 deletions.
3 changes: 2 additions & 1 deletion config/test.exs
@@ -1,6 +1,7 @@
use Mix.Config

alias Commanded.EventStore.Adapters.InMemory
alias Commanded.Serialization.JsonSerializer

config :logger, :console, level: :warn, format: "[$level] $message\n"

Expand All @@ -12,5 +13,5 @@ config :commanded,
dispatch_consistency_timeout: 100,
event_store_adapter: InMemory,
reset_storage: fn ->
{:ok, _event_store} = InMemory.start_link(serializer: Commanded.Serialization.JsonSerializer)
{:ok, _event_store} = InMemory.start_link(serializer: JsonSerializer)
end
133 changes: 86 additions & 47 deletions lib/commanded/aggregates/aggregate.ex
Expand Up @@ -190,36 +190,47 @@ defmodule Commanded.Aggregates.Aggregate do
end

# Load events from the event store, in batches, to rebuild the aggregate state
defp rebuild_from_events(%Aggregate{aggregate_module: aggregate_module, aggregate_uuid: aggregate_uuid, aggregate_version: aggregate_version} = state) do
defp rebuild_from_events(%Aggregate{} = state) do
%Aggregate{
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version
} = state

case EventStore.stream_forward(aggregate_uuid, aggregate_version + 1, @read_event_batch_size) do
{:error, :stream_not_found} ->
# aggregate does not exist so return initial state
# aggregate does not exist, return initial state
state

event_stream ->
# rebuild aggregate state from event stream
event_stream
|> Stream.map(fn event ->
{event.data, event.stream_version}
end)
|> Stream.transform(state, fn ({event, stream_version}, state) ->
case event do
nil -> {:halt, state}
event ->
state = %Aggregate{state |
aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(state.aggregate_state, event),
}

{[state], state}
end
end)
|> Stream.take(-1)
|> Enum.at(0)
|> case do
nil -> state
state -> state
end
rebuild_from_event_stream(event_stream, state)
end
end

# Rebuild aggregate state from a `Stream` of its events
defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do
%Aggregate{aggregate_module: aggregate_module} = state

event_stream
|> Stream.map(fn event ->
{event.data, event.stream_version}
end)
|> Stream.transform(state, fn ({event, stream_version}, state) ->
case event do
nil -> {:halt, state}
event ->
state = %Aggregate{state |
aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(state.aggregate_state, event),
}

{[state], state}
end
end)
|> Stream.take(-1)
|> Enum.at(0)
|> case do
nil -> state
state -> state
end
end

Expand All @@ -243,7 +254,7 @@ defmodule Commanded.Aggregates.Aggregate do
Map.get(metadata, "snapshot_version", 1) == expected_version
end

# do snapshot aggregate state
# take a snapshot now?
defp snapshot_required?(%Aggregate{aggregate_version: aggregate_version, snapshot_every: snapshot_every, snapshot_version: snapshot_version})
when aggregate_version - snapshot_version >= snapshot_every, do: true

Expand All @@ -267,38 +278,66 @@ defmodule Commanded.Aggregates.Aggregate do
metadata: %{"snapshot_version" => aggregate_version}
}

Logger.debug(fn -> "Recording snapshot of aggregate state for: #{inspect aggregate_uuid}@#{aggregate_version} (#{inspect aggregate_module})" end)
Logger.debug(fn -> inspect(state) <> " recording snapshot" end)

:ok = EventStore.record_snapshot(snapshot)

%Aggregate{state | snapshot_version: aggregate_version}
end

defp execute_command(
%ExecutionContext{handler: handler, function: function, command: command} = context,
%Aggregate{aggregate_module: aggregate_module, aggregate_version: expected_version, aggregate_state: aggregate_state} = state)
do
case Kernel.apply(handler, function, [aggregate_state, command]) do
{:error, _reason} = reply ->
{reply, state}
defp execute_command(%ExecutionContext{retry_attempts: retry_attempts}, %Aggregate{} = state)
when retry_attempts < 0, do: {{:error, :too_many_attempts}, state}

none when none in [nil, []] ->
{{:ok, expected_version, []}, state}
defp execute_command(%ExecutionContext{} = context, %Aggregate{} = state) do
%ExecutionContext{
command: command,
handler: handler,
function: function,
retry_attempts: retry_attempts
} = context

%Commanded.Aggregate.Multi{} = multi ->
case Commanded.Aggregate.Multi.run(multi) do
{:error, _reason} = reply ->
{reply, state}
%Aggregate{
aggregate_module: aggregate_module,
aggregate_version: expected_version,
aggregate_state: aggregate_state
} = state

{reply, state} =
case Kernel.apply(handler, function, [aggregate_state, command]) do
{:error, _reason} = reply ->
{reply, state}

none when none in [nil, []] ->
{{:ok, expected_version, []}, state}

%Commanded.Aggregate.Multi{} = multi ->
case Commanded.Aggregate.Multi.run(multi) do
{:error, _reason} = reply ->
{reply, state}

{aggregate_state, pending_events} ->
persist_events(pending_events, aggregate_state, context, state)
end

events ->
pending_events = List.wrap(events)
aggregate_state = apply_events(aggregate_module, aggregate_state, pending_events)

persist_events(pending_events, aggregate_state, context, state)
end

case reply do
{:error, :wrong_expected_version} ->
Logger.debug(fn -> inspect(state) <> " wrong expected version, retrying command" end)

{aggregate_state, pending_events} ->
persist_events(pending_events, aggregate_state, context, state)
end
# fetch missing events from event store
state = rebuild_from_events(state)

events ->
pending_events = List.wrap(events)
aggregate_state = apply_events(aggregate_module, aggregate_state, pending_events)
# retry command, but decrement retry attempts (to prevent infinite attempts)
execute_command(%ExecutionContext{context | retry_attempts: retry_attempts - 1}, state)

persist_events(pending_events, aggregate_state, context, state)
reply ->
{reply, state}
end
end

Expand Down
15 changes: 15 additions & 0 deletions lib/commanded/aggregates/aggregate_inspect.ex
@@ -0,0 +1,15 @@
alias Commanded.Aggregates.Aggregate

defimpl Inspect, for: Aggregate do
import Inspect.Algebra

def inspect(%Aggregate{} = aggregate, opts) do
%Aggregate{
aggregate_module: aggregate_module,
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version
} = aggregate

concat(["#", to_doc(aggregate_module, opts), "<#{aggregate_uuid}@#{aggregate_version}>"])
end
end
4 changes: 4 additions & 0 deletions lib/commanded/aggregates/execution_context.ex
Expand Up @@ -7,6 +7,9 @@ defmodule Commanded.Aggregates.ExecutionContext do
- `command` - the command to execute, typically a struct
(e.g. `%OpenBankAccount{...}`).
- `retry_attempts` - the number of retries permitted if an
`{:error, :wrong_expected_version}` is encountered when appending events.
- `causation_id` - the UUID assigned to the dispatched command.
- `correlation_id` - a UUID used to correlate related commands/events.
Expand All @@ -33,6 +36,7 @@ defmodule Commanded.Aggregates.ExecutionContext do

defstruct [
:command,
:retry_attempts,
:causation_id,
:correlation_id,
:function,
Expand Down
21 changes: 17 additions & 4 deletions lib/commanded/commands/dispatcher.ex
Expand Up @@ -27,6 +27,7 @@ defmodule Commanded.Commands.Dispatcher do
lifespan: nil,
metadata: nil,
middleware: [],
retry_attempts: nil
]
end

Expand Down Expand Up @@ -96,10 +97,21 @@ defmodule Commanded.Commands.Dispatcher do
end
end

defp to_execution_context(
%Pipeline{command: command, command_uuid: command_uuid, metadata: metadata},
%Payload{correlation_id: correlation_id, handler_module: handler_module, handler_function: handler_function, lifespan: lifespan})
do
defp to_execution_context(%Pipeline{} = pipeline, %Payload{} = payload) do
%Pipeline{
command: command,
command_uuid: command_uuid,
metadata: metadata
} = pipeline

%Payload{
correlation_id: correlation_id,
handler_module: handler_module,
handler_function: handler_function,
lifespan: lifespan,
retry_attempts: retry_attempts
} = payload

%ExecutionContext{
command: command,
causation_id: command_uuid,
Expand All @@ -108,6 +120,7 @@ defmodule Commanded.Commands.Dispatcher do
handler: handler_module,
function: handler_function,
lifespan: lifespan,
retry_attempts: retry_attempts
}
end

Expand Down
13 changes: 10 additions & 3 deletions lib/commanded/commands/router.ex
Expand Up @@ -109,7 +109,6 @@ defmodule Commanded.Commands.Router do
:ok = BankRouter.dispatch(command, metadata: %{"ip_address" => "127.0.0.1"})
"""

defmacro __using__(_) do
quote do
require Logger
Expand All @@ -130,6 +129,7 @@ defmodule Commanded.Commands.Router do
dispatch_timeout: 5_000,
lifespan: Commanded.Aggregates.DefaultLifespan,
metadata: %{},
retry_attempts: 10
]

@include_aggregate_version false
Expand Down Expand Up @@ -358,6 +358,7 @@ defmodule Commanded.Commands.Router do
include_aggregate_version = Keyword.get(opts, :include_aggregate_version) || @include_aggregate_version
include_execution_result = Keyword.get(opts, :include_execution_result) || @include_execution_result
lifespan = Keyword.get(opts, :lifespan) || unquote(lifespan) || @default[:lifespan]
retry_attempts = Keyword.get(opts, :retry_attempts) || @default[:retry_attempts]

{identity, identity_prefix} =
case Map.get(@registered_identities, unquote(aggregate)) do
Expand All @@ -371,7 +372,10 @@ defmodule Commanded.Commands.Router do
{identity, prefix}
end

Commanded.Commands.Dispatcher.dispatch(%Commanded.Commands.Dispatcher.Payload{
alias Commanded.Commands.Dispatcher
alias Commanded.Commands.Dispatcher.Payload

payload = %Payload{
command: command,
command_uuid: UUID.uuid4(),
causation_id: causation_id,
Expand All @@ -388,7 +392,10 @@ defmodule Commanded.Commands.Router do
lifespan: lifespan,
metadata: metadata,
middleware: @registered_middleware ++ @default[:middleware],
})
retry_attempts: retry_attempts
}

Dispatcher.dispatch(payload)
end
end
end
Expand Down

0 comments on commit 42e899c

Please sign in to comment.