Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow event handlers to subscribe to individual streams #203

Merged
merged 3 commits into from
Sep 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: elixir

elixir:
- 1.7.2
- 1.7.3

otp_release:
- 21.0
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Allow aggregate identity to be of any type that implements the `String.Chars` protocol ([#166](https://github.com/commanded/commanded/pull/166)).
- Process manager and event handler error & exception handling ([#192](https://github.com/commanded/commanded/pull/192)).
- Process manager event handling timeout ([#193](https://github.com/commanded/commanded/pull/193)).
- Allow event handlers to subscribe to individual streams ([#203](https://github.com/commanded/commanded/pull/203)).

### Bug fixes

Expand Down
7 changes: 2 additions & 5 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
use Mix.Config

alias Commanded.EventStore.Adapters.InMemory
alias Commanded.Serialization.JsonSerializer

config :logger, :console, level: :warn, format: "[$level] $message\n"

config :ex_unit,
capture_log: true,
assert_receive_timeout: 200
assert_receive_timeout: 1_000

config :commanded,
dispatch_consistency_timeout: 100,
event_store_adapter: InMemory
event_store_adapter: Commanded.EventStore.Adapters.InMemory
14 changes: 14 additions & 0 deletions guides/Events.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ Use the `:current` position when you don't want newly created event handlers to

You should start your event handlers using a [supervisor](#supervision) to ensure they are restarted on error.

### Subscribing to an individual stream

By default event handlers will subscribe to all events appended to any stream. Provide a `subscribe_to` option to subscribe to a single stream.

```elixir
defmodule ExampleHandler do
use Commanded.Event.Handler,
name: __MODULE__,
subscribe_to: "stream1234"
end
```

This will ensure the handler only receives events appended to that stream.

### `init/0` callback

You can define an `init/0` function in your handler to be called once it has started and successfully subscribed to the event store.
Expand Down
18 changes: 12 additions & 6 deletions lib/commanded/aggregates/aggregate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -449,22 +449,23 @@ defmodule Commanded.Aggregates.Aggregate do
defp persist_events(pending_events, aggregate_state, context, %Aggregate{} = state) do
%Aggregate{aggregate_uuid: aggregate_uuid, aggregate_version: expected_version} = state

with {:ok, stream_version} <-
append_to_stream(pending_events, aggregate_uuid, expected_version, context) do
with :ok <- append_to_stream(pending_events, aggregate_uuid, expected_version, context) do
aggregate_version = expected_version + length(pending_events)

state = %Aggregate{
state
| aggregate_state: aggregate_state,
aggregate_version: stream_version
aggregate_version: aggregate_version
}

{{:ok, stream_version, pending_events}, state}
{{:ok, aggregate_version, pending_events}, state}
else
{:error, _error} = reply ->
{reply, state}
end
end

defp append_to_stream([], _stream_uuid, expected_version, _context), do: {:ok, expected_version}
defp append_to_stream([], _stream_uuid, _expected_version, _context), do: :ok

defp append_to_stream(pending_events, stream_uuid, expected_version, context) do
%ExecutionContext{
Expand All @@ -473,7 +474,12 @@ defmodule Commanded.Aggregates.Aggregate do
metadata: metadata
} = context

event_data = Mapper.map_to_event_data(pending_events, causation_id, correlation_id, metadata)
event_data =
Mapper.map_to_event_data(pending_events,
causation_id: causation_id,
correlation_id: correlation_id,
metadata: metadata
)

EventStore.append_to_stream(stream_uuid, expected_version, event_data)
end
Expand Down
7 changes: 4 additions & 3 deletions lib/commanded/assertions/event_assertions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ defmodule Commanded.Assertions.EventAssertions do
end

defp create_subscription(subscription_name),
do: EventStore.subscribe_to_all_streams(subscription_name, self(), :origin)
do: EventStore.subscribe_to(:all, subscription_name, self(), :origin)

defp remove_subscription(subscription_name),
do: EventStore.unsubscribe_from_all_streams(subscription_name)
do: EventStore.unsubscribe(subscription_name)

defp ack_events(subscription, events), do: EventStore.ack_event(subscription, List.last(events))
defp ack_events(subscription, events),
do: EventStore.ack_event(subscription, List.last(events))
end
47 changes: 39 additions & 8 deletions lib/commanded/event/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ defmodule Commanded.Event.Handler do

{:ok, _handler} = ExampleHandler.start_link(start_from: :current)


### Subscribing to an individual stream

By default event handlers will subscribe to all events appended to any stream.
Provide a `subscribe_to` option to subscribe to a single stream.

defmodule ExampleHandler do
use Commanded.Event.Handler,
name: __MODULE__,
subscribe_to: "stream1234"
end

This will ensure the handler only receives events appended to that stream.

## `c:init/0` callback

You can define an `c:init/0` function in your handler to be called once it has
Expand Down Expand Up @@ -302,7 +316,7 @@ defmodule Commanded.Event.Handler do
{valid, invalid} =
module_opts
|> Keyword.merge(local_opts)
|> Keyword.split([:consistency, :start_from] ++ additional_allowed_opts)
|> Keyword.split([:consistency, :start_from, :subscribe_to] ++ additional_allowed_opts)

if Enum.any?(invalid) do
raise "#{inspect(module)} specifies invalid options: #{inspect(Keyword.keys(invalid))}"
Expand Down Expand Up @@ -331,6 +345,7 @@ defmodule Commanded.Event.Handler do
:handler_module,
:last_seen_event,
:subscribe_from,
:subscribe_to,
:subscription
]

Expand All @@ -342,7 +357,8 @@ defmodule Commanded.Event.Handler do
handler_name: handler_name,
handler_module: handler_module,
consistency: consistency(opts),
subscribe_from: start_from(opts)
subscribe_from: start_from(opts),
subscribe_to: subscribe_to(opts)
}

Registration.start_link(name, __MODULE__, handler)
Expand All @@ -367,14 +383,17 @@ defmodule Commanded.Event.Handler do

@doc false
def handle_call(:config, _from, %Handler{} = state) do
%Handler{consistency: consistency, subscribe_from: subscribe_from} = state
%Handler{consistency: consistency, subscribe_from: subscribe_from, subscribe_to: subscribe_to} =
state

{:reply, [consistency: consistency, start_from: subscribe_from], state}
config = [consistency: consistency, start_from: subscribe_from, subscribe_to: subscribe_to]

{:reply, config, state}
end

@doc false
def handle_cast(:subscribe_to_events, %Handler{} = state) do
{:noreply, subscribe_to_all_streams(state)}
{:noreply, subscribe_to_events(state)}
end

@doc false
Expand Down Expand Up @@ -417,11 +436,15 @@ defmodule Commanded.Event.Handler do
end
end

defp subscribe_to_all_streams(%Handler{} = state) do
%Handler{handler_name: handler_name, subscribe_from: subscribe_from} = state
defp subscribe_to_events(%Handler{} = state) do
%Handler{
handler_name: handler_name,
subscribe_from: subscribe_from,
subscribe_to: subscribe_to
} = state

{:ok, subscription} =
EventStore.subscribe_to_all_streams(handler_name, self(), subscribe_from)
EventStore.subscribe_to(subscribe_to, handler_name, self(), subscribe_from)

%Handler{state | subscription: subscription}
end
Expand Down Expand Up @@ -579,6 +602,14 @@ defmodule Commanded.Event.Handler do
end
end

defp subscribe_to(opts) do
case opts[:subscribe_to] || :all do
:all -> :all
stream when is_binary(stream) -> stream
invalid -> "Invalid `subscribe_to` option: #{inspect(invalid)}"
end
end

defp describe(%Handler{handler_module: handler_module}),
do: inspect(handler_module)
end
23 changes: 9 additions & 14 deletions lib/commanded/event/mapper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,25 @@ defmodule Commanded.Event.Mapper do
@moduledoc false

alias Commanded.EventStore.TypeProvider
alias Commanded.EventStore.{EventData,RecordedEvent}
alias Commanded.EventStore.{EventData, RecordedEvent}

def map_to_event_data(events, causation_id, correlation_id, metadata)
when is_list(events)
do
Enum.map(events, &map_to_event_data(&1, causation_id, correlation_id, metadata))
def map_to_event_data(events, fields) when is_list(events) do
Enum.map(events, &map_to_event_data(&1, fields))
end

def map_to_event_data(event, causation_id, correlation_id, metadata) do
def map_to_event_data(event, fields) do
%EventData{
causation_id: causation_id,
correlation_id: correlation_id,
causation_id: Keyword.get(fields, :causation_id),
correlation_id: Keyword.get(fields, :correlation_id),
event_type: TypeProvider.to_string(event),
data: event,
metadata: metadata,
metadata: Keyword.get(fields, :metadata, %{})
}
end

def map_from_recorded_events(recorded_events)
when is_list(recorded_events)
do
def map_from_recorded_events(recorded_events) when is_list(recorded_events) do
Enum.map(recorded_events, &map_from_recorded_event/1)
end

def map_from_recorded_event(%RecordedEvent{data: data}),
do: data
def map_from_recorded_event(%RecordedEvent{data: data}), do: data
end
Loading