Skip to content

Commit

Permalink
Merge pull request #132 from commanded/feature/concurrency-error
Browse files Browse the repository at this point in the history
Retry command execution on concurrency error
  • Loading branch information
slashdotdash committed Jan 31, 2018
2 parents d3e8506 + 4b70329 commit 83e61f8
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 106 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,7 @@
- Aggregate state snapshots ([#121](https://github.com/commanded/commanded/pull/121)).
- New `error/3` callback for process manager and deprecated `error/4` ([#124](https://github.com/commanded/commanded/pull/124))
- Router support for identity prefix function.
- Retry command execution on concurrency error ([#132](https://github.com/commanded/commanded/pull/132)).

## v0.15.1

Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
@@ -1,6 +1,7 @@
use Mix.Config

alias Commanded.EventStore.Adapters.InMemory
alias Commanded.Serialization.JsonSerializer

config :logger, :console, level: :warn, format: "[$level] $message\n"

Expand All @@ -12,5 +13,5 @@ config :commanded,
dispatch_consistency_timeout: 100,
event_store_adapter: InMemory,
reset_storage: fn ->
{:ok, _event_store} = InMemory.start_link(serializer: Commanded.Serialization.JsonSerializer)
{:ok, _event_store} = InMemory.start_link(serializer: JsonSerializer)
end
208 changes: 141 additions & 67 deletions lib/commanded/aggregates/aggregate.ex
Expand Up @@ -78,9 +78,9 @@ defmodule Commanded.Aggregates.Aggregate do
do: {aggregate_module, aggregate_uuid}

def init(%Aggregate{} = state) do
# initial aggregate state is populated by loading state snapshot and/or
# events from event store
GenServer.cast(self(), :populate_aggregate_state)
# initial aggregate state is populated by loading its state snapshot and/or
# events from the event store
:ok = GenServer.cast(self(), :populate_aggregate_state)

{:ok, state}
end
Expand Down Expand Up @@ -131,30 +131,37 @@ defmodule Commanded.Aggregates.Aggregate do
{:noreply, populate_aggregate_state(state)}
end

def handle_cast({:snapshot_state, lifespan}, %Aggregate{} = state) do
{:noreply, do_snapshot(state), lifespan}
@doc false
def handle_cast({:snapshot_state, lifespan_timeout}, %Aggregate{} = state) do
{:noreply, do_snapshot(state), lifespan_timeout}
end

@doc false
def handle_call({:execute_command, %ExecutionContext{} = context}, _from, %Aggregate{} = state) do
{reply, state} = execute_command(context, state)

timeout = aggregate_lifespan_timeout(context)
lifespan_timeout = aggregate_lifespan_timeout(context)

if snapshotting_enabled?(state) && snapshot_required?(state) do
GenServer.cast(self(), {:snapshot_state, timeout})
end
:ok = GenServer.cast(self(), {:snapshot_state, lifespan_timeout})

{:reply, reply, state, timeout}
{:reply, reply, state}
else
{:reply, reply, state, lifespan_timeout}
end
end

@doc false
def handle_call(:aggregate_state, _from, %Aggregate{aggregate_state: aggregate_state} = state) do
def handle_call(:aggregate_state, _from, %Aggregate{} = state) do
%Aggregate{aggregate_state: aggregate_state} = state

{:reply, aggregate_state, state}
end

@doc false
def handle_call(:aggregate_version, _from, %Aggregate{aggregate_version: aggregate_version} = state) do
def handle_call(:aggregate_version, _from, %Aggregate{} = state) do
%Aggregate{aggregate_version: aggregate_version} = state

{:reply, aggregate_version, state}
end

Expand All @@ -167,7 +174,12 @@ defmodule Commanded.Aggregates.Aggregate do
# If the snapshot exists, fetch any subsequent events to rebuild its state.
# Otherwise start with the aggregate struct and stream all existing events for
# the aggregate from the event store to rebuild its state from those events.
defp populate_aggregate_state(%Aggregate{aggregate_module: aggregate_module, aggregate_uuid: aggregate_uuid} = state) do
defp populate_aggregate_state(%Aggregate{} = state) do
%Aggregate{
aggregate_module: aggregate_module,
aggregate_uuid: aggregate_uuid
} = state

aggregate =
with true <- snapshotting_enabled?(state),
{:ok, snapshot} <- EventStore.read_snapshot(aggregate_uuid),
Expand All @@ -190,46 +202,69 @@ defmodule Commanded.Aggregates.Aggregate do
end

# Load events from the event store, in batches, to rebuild the aggregate state
defp rebuild_from_events(%Aggregate{aggregate_module: aggregate_module, aggregate_uuid: aggregate_uuid, aggregate_version: aggregate_version} = state) do
defp rebuild_from_events(%Aggregate{} = state) do
%Aggregate{
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version
} = state

case EventStore.stream_forward(aggregate_uuid, aggregate_version + 1, @read_event_batch_size) do
{:error, :stream_not_found} ->
# aggregate does not exist so return initial state
# aggregate does not exist, return initial state
state

event_stream ->
# rebuild aggregate state from event stream
event_stream
|> Stream.map(fn event ->
{event.data, event.stream_version}
end)
|> Stream.transform(state, fn ({event, stream_version}, state) ->
case event do
nil -> {:halt, state}
event ->
state = %Aggregate{state |
aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(state.aggregate_state, event),
}

{[state], state}
end
end)
|> Stream.take(-1)
|> Enum.at(0)
|> case do
nil -> state
state -> state
end
rebuild_from_event_stream(event_stream, state)
end
end

# Rebuild aggregate state from a `Stream` of its events
defp rebuild_from_event_stream(event_stream, %Aggregate{} = state) do
%Aggregate{aggregate_module: aggregate_module} = state

event_stream
|> Stream.map(fn event ->
{event.data, event.stream_version}
end)
|> Stream.transform(state, fn {event, stream_version}, state ->
case event do
nil ->
{:halt, state}

event ->
state = %Aggregate{
state
| aggregate_version: stream_version,
aggregate_state: aggregate_module.apply(state.aggregate_state, event)
}

{[state], state}
end
end)
|> Stream.take(-1)
|> Enum.at(0)
|> case do
nil -> state
state -> state
end
end

defp aggregate_lifespan_timeout(%ExecutionContext{command: command, lifespan: lifespan}) do
case lifespan.after_command(command) do
:infinity -> :infinity
:hibernate -> :hibernate
timeout when is_integer(timeout) and timeout >= 0 -> timeout
:infinity ->
:infinity

:hibernate ->
:hibernate

timeout when is_integer(timeout) and timeout >= 0 ->
timeout

invalid ->
Logger.warn(fn -> "Invalid timeout for aggregate lifespan #{inspect lifespan}, expected a non-negative integer, `:infinity`, or `:hibernate` but got: #{inspect invalid}" end)
Logger.warn(fn ->
"Invalid timeout for aggregate lifespan #{inspect lifespan}, expected a non-negative integer, `:infinity`, or `:hibernate` but got: #{inspect invalid}"
end)

:infinity
end
end
Expand All @@ -243,7 +278,7 @@ defmodule Commanded.Aggregates.Aggregate do
Map.get(metadata, "snapshot_version", 1) == expected_version
end

# do snapshot aggregate state
# take a snapshot now?
defp snapshot_required?(%Aggregate{aggregate_version: aggregate_version, snapshot_every: snapshot_every, snapshot_version: snapshot_version})
when aggregate_version - snapshot_version >= snapshot_every, do: true

Expand All @@ -259,6 +294,8 @@ defmodule Commanded.Aggregates.Aggregate do
aggregate_state: aggregate_state,
} = state

Logger.debug(fn -> inspect(state) <> " recording snapshot" end)

snapshot = %SnapshotData{
source_uuid: aggregate_uuid,
source_version: aggregate_version,
Expand All @@ -267,46 +304,76 @@ defmodule Commanded.Aggregates.Aggregate do
metadata: %{"snapshot_version" => aggregate_version}
}

Logger.debug(fn -> "Recording snapshot of aggregate state for: #{inspect aggregate_uuid}@#{aggregate_version} (#{inspect aggregate_module})" end)

:ok = EventStore.record_snapshot(snapshot)

%Aggregate{state | snapshot_version: aggregate_version}
end

defp execute_command(
%ExecutionContext{handler: handler, function: function, command: command} = context,
%Aggregate{aggregate_module: aggregate_module, aggregate_version: expected_version, aggregate_state: aggregate_state} = state)
do
case Kernel.apply(handler, function, [aggregate_state, command]) do
{:error, _reason} = reply ->
{reply, state}
defp execute_command(%ExecutionContext{retry_attempts: retry_attempts}, %Aggregate{} = state)
when retry_attempts < 0, do: {{:error, :too_many_attempts}, state}

none when none in [nil, []] ->
{{:ok, expected_version, []}, state}
defp execute_command(%ExecutionContext{} = context, %Aggregate{} = state) do
%ExecutionContext{
command: command,
handler: handler,
function: function,
retry_attempts: retry_attempts
} = context

%Commanded.Aggregate.Multi{} = multi ->
case Commanded.Aggregate.Multi.run(multi) do
{:error, _reason} = reply ->
{reply, state}
%Aggregate{
aggregate_module: aggregate_module,
aggregate_version: expected_version,
aggregate_state: aggregate_state
} = state

{aggregate_state, pending_events} ->
persist_events(pending_events, aggregate_state, context, state)
end
Logger.debug(fn -> inspect(state) <> " executing command: #{inspect command}" end)

events ->
pending_events = List.wrap(events)
aggregate_state = apply_events(aggregate_module, aggregate_state, pending_events)
{reply, state} =
case Kernel.apply(handler, function, [aggregate_state, command]) do
{:error, _reason} = reply ->
{reply, state}

persist_events(pending_events, aggregate_state, context, state)
none when none in [nil, []] ->
{{:ok, expected_version, []}, state}

%Commanded.Aggregate.Multi{} = multi ->
case Commanded.Aggregate.Multi.run(multi) do
{:error, _reason} = reply ->
{reply, state}

{aggregate_state, pending_events} ->
persist_events(pending_events, aggregate_state, context, state)
end

events ->
pending_events = List.wrap(events)
aggregate_state = apply_events(aggregate_module, aggregate_state, pending_events)

persist_events(pending_events, aggregate_state, context, state)
end

case reply do
{:error, :wrong_expected_version} ->
Logger.debug(fn -> inspect(state) <> " wrong expected version, retrying command" end)

# fetch missing events from event store
state = rebuild_from_events(state)

# retry command, but decrement retry attempts (to prevent infinite retries)
execute_command(%ExecutionContext{context | retry_attempts: retry_attempts - 1}, state)

reply ->
{reply, state}
end
end

defp apply_events(aggregate_module, aggregate_state, events) do
Enum.reduce(events, aggregate_state, &aggregate_module.apply(&2, &1))
end

defp persist_events(pending_events, aggregate_state, context, %Aggregate{aggregate_uuid: aggregate_uuid, aggregate_version: expected_version} = state) do
defp persist_events(pending_events, aggregate_state, context, %Aggregate{} = state) do
%Aggregate{aggregate_uuid: aggregate_uuid, aggregate_version: expected_version} = state

with {:ok, stream_version} <- append_to_stream(pending_events, aggregate_uuid, expected_version, context) do
state = %Aggregate{state |
aggregate_state: aggregate_state,
Expand All @@ -315,12 +382,19 @@ defmodule Commanded.Aggregates.Aggregate do

{{:ok, stream_version, pending_events}, state}
else
{:error, _reason} = reply -> {reply, state}
{:error, _reason} = reply ->
{reply, state}
end
end

defp append_to_stream([], _stream_uuid, expected_version, _context), do: {:ok, expected_version}
defp append_to_stream(pending_events, stream_uuid, expected_version, %ExecutionContext{causation_id: causation_id, correlation_id: correlation_id, metadata: metadata}) do
defp append_to_stream(pending_events, stream_uuid, expected_version, context) do
%ExecutionContext{
causation_id: causation_id,
correlation_id: correlation_id,
metadata: metadata
} = context

event_data = Mapper.map_to_event_data(pending_events, causation_id, correlation_id, metadata)

EventStore.append_to_stream(stream_uuid, expected_version, event_data)
Expand Down
15 changes: 15 additions & 0 deletions lib/commanded/aggregates/aggregate_inspect.ex
@@ -0,0 +1,15 @@
alias Commanded.Aggregates.Aggregate

defimpl Inspect, for: Aggregate do
import Inspect.Algebra

def inspect(%Aggregate{} = aggregate, opts) do
%Aggregate{
aggregate_module: aggregate_module,
aggregate_uuid: aggregate_uuid,
aggregate_version: aggregate_version
} = aggregate

concat(["#", to_doc(aggregate_module, opts), "<#{aggregate_uuid}@#{aggregate_version}>"])
end
end
4 changes: 4 additions & 0 deletions lib/commanded/aggregates/execution_context.ex
Expand Up @@ -7,6 +7,9 @@ defmodule Commanded.Aggregates.ExecutionContext do
- `command` - the command to execute, typically a struct
(e.g. `%OpenBankAccount{...}`).
- `retry_attempts` - the number of retries permitted if an
`{:error, :wrong_expected_version}` is encountered when appending events.
- `causation_id` - the UUID assigned to the dispatched command.
- `correlation_id` - a UUID used to correlate related commands/events.
Expand All @@ -33,6 +36,7 @@ defmodule Commanded.Aggregates.ExecutionContext do

defstruct [
:command,
:retry_attempts,
:causation_id,
:correlation_id,
:function,
Expand Down

0 comments on commit 83e61f8

Please sign in to comment.