Skip to content
Ben Smith edited this page Mar 18, 2019 · 22 revisions

Frequently asked questions

What event store can I use with Commanded?

You can choose between:

There is also an in-memory event store for test use only.

Can I use Apache Kafka with Commanded?

No, and it's probably not a good idea to use Kafka as an event store.

Kafka is ideal for event streaming, but not persisting domain events. It might be a good complement to your event store as a way of transporting events to downstream query services, read models, or other services.

What versions of the supporting libraries can I use with Commanded?

Please refer to the Compatibility matrix for supported versions of Commanded and its related libraries.

How do I build a read model?

Use an event handler to project domain events into a read model store. You can use almost any storage adapter: relational database, NoSQL, full-text search index, filesystem.

You can use Commanded Ecto projections to build a read model using one of the databases supported by Ecto (PostgreSQL, MySQL, et al).

How do I deal with eventually consistent read models?

Use Commanded's :strong consistency guarantee when dispatching commands to ensure any read models are up-to-date once the command has been successfully dispatch.

How do I handle read model projections that crash?

Make your projections liberal in what they accept, conservative in what they do. Design them in a way that they cannot crash. You shouldn’t be using database constraints in the read model; invariants are protected by the aggregates. Once a domain event has happened you must be able to handle it. As an example, use an "upsert" instead of an insert to prevent duplicates in a projection. Ecto has support for doing upserts using the :on_conflict option.

Event handlers, such as read model projectors, can implement an error/3 callback function to handle any exceptions or errors. It allows you to decide whether to stop the process, skip the problematic event, or retry to attempt to resolve transient problems.

You can also configure the restart strategy of each individual projector, so that instead of restarting it is stopped on error (e.g. :temporary restart strategy). You must have a way of being notified of exceptions and errors so that you can manually step in and fix the issue, then restart the projector. This ensures your app continues working, but without the errored projector.

How do I reduce aggregate memory usage?

By default aggregate processes will run indefinitely once started. You can provide a timeout value to shutdown inactive processes using the aggregate lifespan feature. This will reduce memory consumed by unused aggregates, however the next commanded sent to the aggregate will require its state to be rehydrated by reading its domain events from the event store.

How do I model my aggregates, entities, and aggregate roots?

This is perhaps the hardest part of effectively applying CQRS/ES to your application. I would recommend reading the material referenced in Domain modelling.

How do I handle concurrent commands?

Aggregate instances are standard GenServer processes so concurrent commands sent to the same instance will be handled serially. Elixir’s process model simplifies command handling by serialising requests via the process mailbox. Commands sent to different aggregates or instances are processed concurrently.

When two commands are sent to the same aggregate instance at the same time the first command received will be immediately processed. The second command will be enqueued into the process mailbox and handled once the first has completed. The aggregate’s state held by the GenServer process is updated after each successfully persisted domain event. This ensures that subsequent commands always execute against the latest aggregate state.

Handling concurrent commands is no different from handling commands sent one after another serially. Your aggregate determines whether a command can be handled or not, how you do so will depend upon your particular business case and what invariants need to be enforced.

Running Commanded on multiple nodes that aren't connected to form a cluster affects the above processing as you may have multiple aggregate instance processes running on different nodes. This is where the event store’s stream version checking (optimistic concurrency) will ensure that concurrent event writes don't conflict. One write will succeed, the other will fail. On the failure node, the missing events are read from the event store, the aggregate state is updated, and the failed command is retried. This is all handled for you automatically by Commanded.

How do I enrich commands with external data?

You can enrich commands with external data, such as from an external API or read model projection in one of the following ways:

  1. During command handling, in the aggregate.
  2. Before command dispatch.
  3. Using a command dispatch middleware.

Commands are executed serially by each aggregate instance, so if you make an external API request during command execution it will block any other command sent to that aggregate instance. This might be acceptable for your scenario if you don’t expect much concurrency or you’re actually creating a new aggregate.

The middleware approach allows you to keep your aggregate command handling functions pure, and easily testable, by ensuring the command is fully populated before being handled by an aggregate. You can use the following Commanded middleware for command enrichment gist as an example.

Command dispatch also uses a 5 second timeout by default (same as a GenServer call), which can be increased in the router or during command dispatch.

How do I schedule commands for later dispatch?

Use Commanded scheduler to schedule one-off commands to be dispatched at a date in the future.

It was built to work with Commanded and exposes an API to schedule commands directly:

Commanded.Scheduler.schedule_once("schedule-" <> ticket_uuid, %TimeoutReservation{..}, ~N[2020-01-01 12:00:00])

It also supports scheduling a command from a process manager:

def handle(%TicketProcessManager{}, %TicketReserved{} = event) do
  %TicketReserved{ticket_uuid: ticket_uuid, expires_at: expires_at} = event

    schedule_uuid: "schedule-" <> ticket_uuid,
    command: %TimeoutReservation{ticket_uuid: ticket_uuid},
    due_at: expires_at
You can’t perform that action at this time.