Skip to content

Commit

Permalink
Document event mapper and additional process manager configuration op…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
slashdotdash committed Aug 14, 2019
1 parent 6baa3b7 commit c5cd751
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
language: elixir

elixir:
- 1.9.0
- 1.9.1

otp_release:
- 22.0
4 changes: 2 additions & 2 deletions CHANGELOG.md
Expand Up @@ -8,7 +8,7 @@

### Bug fixes

- Fix regression in `Commanded.Middleware.Logger.delta/1` ([#295](https://github.com/commanded/commanded/pull/295)).
- Fix regression in `Commanded.Middleware.Logger.delta` ([#295](https://github.com/commanded/commanded/pull/295)).

## v0.19.0

Expand All @@ -19,7 +19,7 @@
- Add `.formatter.exs` to Hex package ([#247](https://github.com/commanded/commanded/pull/247)).
- Event upcasting ([#263](https://github.com/commanded/commanded/pull/263)).
- Support `:ok` tagged tuple events from aggregate ([#268](https://github.com/commanded/commanded/pull/268)).
- Modify `Commanded.Registration.start_child/3` to pass a child_spec ([#273](https://github.com/commanded/commanded/pull/273)).
- Modify `Commanded.Registration.start_child` to pass a child_spec ([#273](https://github.com/commanded/commanded/pull/273)).
- Add `supervisor_child_spec/2` to `Commanded.Registration` behaviour ([#277](https://github.com/commanded/commanded/pull/277)) used by [Commanded Horde Registry](https://github.com/uberbrodt/commanded_horde_registry).
- Ensure Commanded can be compiled when optional Jason dependency is not present ([#286](https://github.com/commanded/commanded/pull/286)).
- Fix Aggregate initialization races ([#287](https://github.com/commanded/commanded/pull/287)).
Expand Down
25 changes: 5 additions & 20 deletions guides/Process Managers.md
Expand Up @@ -209,25 +209,10 @@ You can choose to start the process router's event store subscription from the `

Process manager instance state is persisted to storage after each handled event. This allows the process manager to resume should the host process terminate.

## Event handling timeout
## Configuration options

You can configure a timeout for event handling to ensure that events are processed in a timely manner without getting stuck.
- `consistency` - defined as one of either `:strong` or `:eventual` (default) for event handling.
- `event_timeout` - a timeout for event handling to ensure that events are processed in a timely manner without getting stuck.
- `idle_timeout` - to reduce memory usage you can configure an idle timeout, in milliseconds, after which an inactive process instance will be shutdown.

An `event_timeout` option, defined in milliseconds, may be provided when using the `Commanded.ProcessManagers.ProcessManager` macro at compile time:

```elixir
defmodule TransferMoneyProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "TransferMoneyProcessManager",
router: BankRouter,
event_timeout: :timer.minutes(10)
end
```

Or may be configured when starting a process manager:

```elixir
{:ok, _pid} = TransferMoneyProcessManager.start_link(event_timeout: :timer.hours(1))
```

After the timeout has elapsed, indicating the process manager has not processed an event within the configured period, the process manager is stopped. The process manager will be restarted if supervised and will retry the event, this should help resolve transient problems.
See the process manager module docs for more details.
52 changes: 51 additions & 1 deletion lib/commanded/event/mapper.ex
@@ -1,13 +1,55 @@
defmodule Commanded.Event.Mapper do
@moduledoc false
@moduledoc """
Map events to/from the structs used for persistence.
## Example
Map domain event structs to `Commanded.EventStore.EventData` structs in
preparation for appending to the configured event store:
events = [%ExampleEvent1{}, %ExampleEvent2{}]
event_data = Commanded.Event.Mapper.map_to_event_data(events)
:ok = Commanded.EventStore.append_to_stream("stream-1234", :any_version, event_data)
"""

alias Commanded.EventStore.TypeProvider
alias Commanded.EventStore.{EventData, RecordedEvent}

@type event :: struct

@doc """
Map a domain event (or list of events) to an
`Commanded.EventStore.EventData` struct (or list of structs).
Optionally, include the `causation_id`, `correlation_id`, and `metadata`
associated with the event(s).
## Examples
event_data = Commanded.Event.Mapper.map_to_event_data(%ExampleEvent{})
event_data =
Commanded.Event.Mapper.map_to_event_data(
[
%ExampleEvent1{},
%ExampleEvent2{}
],
causation_id: UUID.uuid4(),
correlation_id: UUID.uuid4(),
metadata: %{"user_id" => user_id}
)
"""
def map_to_event_data(events, fields \\ [])

@spec map_to_event_data(list(event), map) :: list(EventData.t())
def map_to_event_data(events, fields) when is_list(events) do
Enum.map(events, &map_to_event_data(&1, fields))
end

@spec map_to_event_data(struct, map) :: EventData.t()
def map_to_event_data(event, fields) do
%EventData{
causation_id: Keyword.get(fields, :causation_id),
Expand All @@ -18,9 +60,17 @@ defmodule Commanded.Event.Mapper do
}
end

@doc """
Map a list of `Commanded.EventStore.RecordedEvent` structs to their event data.
"""
@spec map_from_recorded_events(list(RecordedEvent.t())) :: event
def map_from_recorded_events(recorded_events) when is_list(recorded_events) do
Enum.map(recorded_events, &map_from_recorded_event/1)
end

@doc """
Map an `Commanded.EventStore.RecordedEvent` struct to its event data.
"""
@spec map_from_recorded_event(RecordedEvent.t()) :: event
def map_from_recorded_event(%RecordedEvent{data: data}), do: data
end
2 changes: 2 additions & 0 deletions lib/commanded/event/upcast.ex
@@ -1,4 +1,6 @@
defmodule Commanded.Event.Upcast do
@moduledoc false

alias Commanded.Event.Upcaster
alias Commanded.EventStore.RecordedEvent

Expand Down
15 changes: 13 additions & 2 deletions lib/commanded/event/upcaster.ex
@@ -1,14 +1,25 @@
defprotocol Commanded.Event.Upcaster do
@doc """
@moduledoc """
Protocol to allow an event to be transformed before being passed to a
consumer.
You can use an upcaster to change the shape of an event
(e.g. add a new field with a default, rename a field).
(e.g. add a new field with a default, rename a field) or rename an event.
Because the upcaster changes any historical event to the latest version,
consumers (aggregates, event handlers, and process managers) only need
to support the latest version.
## Example
defimpl Commanded.Event.Upcaster, for: AnEvent do
def upcast(%AnEvent{} = event, _metadata) do
%AnEvent{name: name} = event
%AnEvent{event | first_name: name}
end
end
"""

@fallback_to_any true
Expand Down
96 changes: 59 additions & 37 deletions lib/commanded/process_managers/process_manager.ex
@@ -1,6 +1,6 @@
defmodule Commanded.ProcessManagers.ProcessManager do
@moduledoc """
Behaviour to define a process manager.
Macro used to define a process manager.
A process manager is responsible for coordinating one or more aggregates.
It handles events and dispatches commands in response. Process managers have
Expand All @@ -14,23 +14,19 @@ defmodule Commanded.ProcessManagers.ProcessManager do
- `c:apply/2`
- `c:error/3`
## Example
### Example
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: ExampleRouter
def interested?(%AnEvent{...}) do
# ...
end
def handle(%ExampleProcessManager{...}, %AnEvent{...}) do
# ...
end
def interested?(%AnEvent{uuid: uuid}), do: {:start, uuid}
def apply(%ExampleProcessManager{...}, %AnEvent{...}) do
# ...
def handle(%ExampleProcessManager{}, %ExampleEvent{}) do
[
%ExampleCommand{}
]
end
def error({:error, failure}, %ExampleEvent{}, _failure_context) do
Expand All @@ -42,11 +38,12 @@ defmodule Commanded.ProcessManagers.ProcessManager do
end
end
Start the process manager (or configure as a worker inside a [Supervisor](supervision.html))
Start the process manager (or configure as a worker inside a
[Supervisor](supervision.html))
{:ok, process_manager} = ExampleProcessManager.start_link()
# Error handling
## Error handling
You can define an `c:error/3` callback function to handle any errors or
exceptions during event handling or returned by commands dispatched from your
Expand All @@ -63,7 +60,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do
event handler function or command dispatch. You should supervise your
process managers to ensure they are restarted on error.
## Example
### Example
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
Expand All @@ -85,53 +82,78 @@ defmodule Commanded.ProcessManagers.ProcessManager do
end
end
# Idle process timeouts
## Idle process timeouts
Each instance of a process manager will run indefinitely once started. To
reduce memory usage you can configure an idle timeout, in milliseconds,
after which the process will be shutdown.
Each instance of a process manager will run indefinitely once started. To
reduce memory usage you can configure an idle timeout, in milliseconds,
after which the process will be shutdown.
The process will be restarted whenever another event is routed to it and its
state will be rehydrated from the instance snapshot.
The process will be restarted whenever another event is routed to it and its
state will be rehydrated from the instance snapshot.
## Example
### Example
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: ExampleRouter,
idle_timeout: :timer.minutes(10)
end
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: ExampleRouter,
idle_timeout: :timer.minutes(10)
end
## Event handling timeout
# Consistency
You can configure a timeout for event handling to ensure that events are
processed in a timely manner without getting stuck.
An `event_timeout` option, defined in milliseconds, may be provided when using
the `Commanded.ProcessManagers.ProcessManager` macro at compile time:
defmodule TransferMoneyProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "TransferMoneyProcessManager",
router: BankRouter,
event_timeout: :timer.minutes(10)
end
Or may be configured when starting a process manager:
{:ok, _pid} = TransferMoneyProcessManager.start_link(
event_timeout: :timer.hours(1)
)
After the timeout has elapsed, indicating the process manager has not
processed an event within the configured period, the process manager is
stopped. The process manager will be restarted if supervised and will retry
the event, this should help resolve transient problems.
## Consistency
For each process manager you can define its consistency, as one of either
`:strong` or `:eventual`.
This setting is used when dispatching commands and specifying the `consistency`
option.
This setting is used when dispatching commands and specifying the
`consistency` option.
When you dispatch a command using `:strong` consistency, after successful
command dispatch the process will block until all process managers configured to
use `:strong` consistency have processed the domain events created by the
command dispatch the process will block until all process managers configured
to use `:strong` consistency have processed the domain events created by the
command.
The default setting is `:eventual` consistency. Command dispatch will return
immediately upon confirmation of event persistence, not waiting for any
process managers.
## Example
### Example
defmodule ExampleProcessManager do
use Commanded.ProcessManagers.ProcessManager,
name: "ExampleProcessManager",
router: BankRouter
consistency: :strong
# ...
end
Please read the [Process managers](process-managers.html) guide for more details.
Please read the [Process managers](process-managers.html) guide for more
detail.
"""

alias Commanded.ProcessManagers.FailureContext
Expand Down Expand Up @@ -197,7 +219,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do
The `c:handle/2` function can be omitted if you do not need to dispatch a
command and are only mutating the process manager's state.
"""
@callback handle(process_manager, domain_event) :: list(command)
@callback handle(process_manager, domain_event) :: command | list(command) | {:error, term}

@doc """
Mutate the process manager's state by applying the domain event.
Expand Down
16 changes: 5 additions & 11 deletions lib/mix/tasks/commanded.reset.ex
Expand Up @@ -2,32 +2,26 @@ defmodule Mix.Tasks.Commanded.Reset do
@moduledoc """
Reset an event handler.
## Examples
## Example
mix commanded.reset MyApp.MyEventHandler
"""

use Mix.Task
# import Mix.EventStore

# alias EventStore.Tasks.Init

@shortdoc "Reset an event handler to its start_from"

@switches [
]

@aliases [
]

@doc false
def run(args) do
module = args |> List.first
module = args |> List.first()

{:ok, _} = Application.ensure_all_started(:commanded)

case Commanded.Registration.whereis_name({Commanded.Event.Handler, module}) do
:undefined ->
IO.puts("No process found for #{module}")

pid ->
IO.puts("Resetting #{module}")
send(pid, :reset)
Expand Down
5 changes: 4 additions & 1 deletion mix.exs
Expand Up @@ -130,7 +130,7 @@ defmodule Commanded.Mixfile do
Events: [
Commanded.Event.FailureContext,
Commanded.Event.Handler,
Commanded.Event.Upcast,
Commanded.Event.Mapper,
Commanded.Event.Upcaster
],
"Process Managers": [
Expand Down Expand Up @@ -159,6 +159,9 @@ defmodule Commanded.Mixfile do
Commanded.Serialization.JsonSerializer,
Commanded.Serialization.ModuleNameTypeProvider
],
Tasks: [
Mix.Tasks.Commanded.Reset
],
Middleware: [
Commanded.Middleware,
Commanded.Middleware.ConsistencyGuarantee,
Expand Down

0 comments on commit c5cd751

Please sign in to comment.