Permalink
Browse files

Extend `Commanded.EventStore` behaviour to include a `child_spec/0` f…

…unction

Allows an event store adapter to define a child specification for processes it requires to be started.

For testing it is no longer necessary to manually start the `InMemory` event store adapter. It will be started automatically as part of the Commanded application.
  • Loading branch information...
slashdotdash committed Oct 11, 2018
1 parent e52a5b3 commit 0fd3758e5945f2bd8736619c48f212cc2bf552ad
@@ -1,6 +1,7 @@
language: elixir
elixir:
- 1.6.6
- 1.7.3
otp_release:
@@ -10,6 +10,9 @@ config :commanded,
dispatch_consistency_timeout: 100,
event_store_adapter: Commanded.EventStore.Adapters.InMemory
config :commanded, Commanded.EventStore.Adapters.InMemory,
serializer: Commanded.Serialization.JsonSerializer
config :commanded, Commanded.Aggregates.LifespanAggregate,
snapshot_every: 2,
snapshot_version: 1
@@ -113,7 +113,7 @@ defmodule Commanded.Assertions.EventAssertions do
try do
apply(callback_fn, [subscription])
after
remove_subscription(subscription_name)
remove_subscription(subscription)
end
end
@@ -176,8 +176,8 @@ defmodule Commanded.Assertions.EventAssertions do
defp create_subscription(subscription_name),
do: EventStore.subscribe_to(:all, subscription_name, self(), :origin)
defp remove_subscription(subscription_name),
do: EventStore.unsubscribe(subscription_name)
defp remove_subscription(subscription),
do: EventStore.unsubscribe(subscription)
defp ack_events(subscription, events),
do: EventStore.ack_event(subscription, List.last(events))
@@ -21,12 +21,6 @@ defmodule Commanded.EventStore.Adapters.InMemory do
]
end
defmodule Subscription do
@moduledoc false
defstruct [:stream_uuid, :name, :subscriber, :start_from, last_seen_event_number: 0]
end
alias Commanded.EventStore.Adapters.InMemory.{State, Subscription}
alias Commanded.EventStore.{EventData, RecordedEvent, SnapshotData}
@@ -41,6 +35,16 @@ defmodule Commanded.EventStore.Adapters.InMemory do
{:ok, state}
end
@impl Commanded.EventStore
def child_spec do
opts = Application.get_env(:commanded, __MODULE__)
[
child_spec(opts),
{DynamicSupervisor, strategy: :one_for_one, name: __MODULE__.SubscriptionsSupervisor}
]
end
@impl Commanded.EventStore
def append_to_stream(stream_uuid, expected_version, events) do
GenServer.call(__MODULE__, {:append, stream_uuid, expected_version, events})
@@ -74,8 +78,8 @@ defmodule Commanded.EventStore.Adapters.InMemory do
end
@impl Commanded.EventStore
def unsubscribe(subscription_name) do
GenServer.call(__MODULE__, {:unsubscribe, subscription_name})
def unsubscribe(subscription) do
GenServer.call(__MODULE__, {:unsubscribe, subscription})
end
@impl Commanded.EventStore
@@ -207,15 +211,10 @@ defmodule Commanded.EventStore.Adapters.InMemory do
{reply, state} =
case Map.get(subscriptions, subscription_name) do
nil ->
state = persistent_subscription(subscription, state)
{{:ok, subscriber}, state}
persistent_subscription(subscription, state)
%Subscription{subscriber: nil} = subscription ->
state =
persistent_subscription(%Subscription{subscription | subscriber: subscriber}, state)
{{:ok, subscriber}, state}
persistent_subscription(%Subscription{subscription | subscriber: subscriber}, state)
_subscription ->
{{:error, :subscription_already_exists}, state}
@@ -225,10 +224,15 @@ defmodule Commanded.EventStore.Adapters.InMemory do
end
@impl GenServer
def handle_call({:unsubscribe, subscription_name}, _from, %State{} = state) do
def handle_call({:unsubscribe, subscription}, _from, %State{} = state) do
%State{persistent_subscriptions: subscriptions} = state
state = %State{state | persistent_subscriptions: Map.delete(subscriptions, subscription_name)}
:ok = stop_subscription(subscription)
state = %State{
state
| persistent_subscriptions: remove_subscriber_by_pid(subscriptions, subscription)
}
{:reply, :ok, state}
end
@@ -266,7 +270,11 @@ defmodule Commanded.EventStore.Adapters.InMemory do
end
def handle_call(:reset!, _from, %State{} = state) do
%State{serializer: serializer} = state
%State{serializer: serializer, persistent_subscriptions: subscriptions} = state
for {_name, %Subscription{subscriber: subscriber}} <- subscriptions, is_pid(subscriber) do
:ok = stop_subscription(subscriber)
end
{:reply, :ok, %State{serializer: serializer}}
end
@@ -359,19 +367,30 @@ defmodule Commanded.EventStore.Adapters.InMemory do
end
defp persistent_subscription(%Subscription{} = subscription, %State{} = state) do
%Subscription{name: subscription_name, subscriber: subscriber} = subscription
%Subscription{name: subscription_name} = subscription
%State{persistent_subscriptions: subscriptions, persisted_events: persisted_events} = state
Process.monitor(subscriber)
subscription_spec = subscription |> Subscription.child_spec() |> Map.put(:restart, :temporary)
{:ok, pid} =
DynamicSupervisor.start_child(__MODULE__.SubscriptionsSupervisor, subscription_spec)
send(subscriber, {:subscribed, subscriber})
Process.monitor(pid)
catch_up(subscription, persisted_events, state)
%State{
subscription = %Subscription{subscription | subscriber: pid}
state = %State{
state
| persistent_subscriptions: Map.put(subscriptions, subscription_name, subscription)
}
{{:ok, pid}, state}
end
defp stop_subscription(subscription) do
DynamicSupervisor.terminate_child(__MODULE__.SubscriptionsSupervisor, subscription)
end
defp remove_subscriber_by_pid(subscriptions, pid) do
@@ -435,19 +454,8 @@ defmodule Commanded.EventStore.Adapters.InMemory do
defp publish_to_persistent_subscriptions(stream_uuid, events, %State{} = state) do
%State{persistent_subscriptions: subscriptions} = state
subscribers =
subscriptions
|> Map.values()
|> Enum.filter(fn
%Subscription{stream_uuid: ^stream_uuid} -> true
%Subscription{stream_uuid: :all} -> true
%Subscription{} -> false
end)
|> Enum.map(& &1.subscriber)
|> Enum.filter(&is_pid/1)
for subscriber <- subscribers do
send(subscriber, {:events, events})
for {_name, %Subscription{subscriber: subscriber}} <- subscriptions, is_pid(subscriber) do
send(subscriber, {:events, stream_uuid, events})
end
end
@@ -0,0 +1,40 @@
defmodule Commanded.EventStore.Adapters.InMemory.Subscription do
@moduledoc false
use GenServer
alias Commanded.EventStore.Adapters.InMemory.Subscription
defstruct [:stream_uuid, :name, :subscriber, :ref, :start_from, last_seen_event_number: 0]
def start_link(%Subscription{} = state) do
GenServer.start_link(__MODULE__, state)
end
@impl GenServer
def init(%Subscription{} = state) do
%Subscription{subscriber: subscriber} = state
send(subscriber, {:subscribed, self()})
state = %Subscription{state | ref: Process.monitor(subscriber)}
{:ok, state}
end
@impl GenServer
def handle_info({:events, stream_uuid, events}, %Subscription{} = state) do
%Subscription{subscriber: subscriber, stream_uuid: subscription_stream_uuid} = state
if subscription_stream_uuid in [:all, stream_uuid] do
send(subscriber, {:events, events})
end
{:noreply, state}
end
@impl GenServer
def handle_info({:DOWN, _ref, :process, pid, reason}, %Subscription{} = state) do
{:stop, reason, state}
end
end
@@ -7,12 +7,19 @@ defmodule Commanded.EventStore do
@type stream_uuid :: String.t()
@type start_from :: :origin | :current | integer
@type expected_version :: :any_version | :no_stream | :stream_exists | non_neg_integer()
@type expected_version :: :any_version | :no_stream | :stream_exists | non_neg_integer
@type subscription_name :: String.t()
@type subscription :: any
@type subscriber :: pid
@type source_uuid :: String.t()
@type snapshot :: SnapshotData.t()
@type error :: term
@doc """
Return a child spec defining all processes required by the event store.
"""
@callback child_spec() :: [:supervisor.child_spec()]
@doc """
Append one or more events to a stream atomically.
"""
@@ -80,8 +87,8 @@ defmodule Commanded.EventStore do
{:ok, subscription} = Commanded.EventStore.subscribe_to("stream1234", "Example", self())
"""
@callback subscribe_to(stream_uuid | :all, subscription_name, subscriber :: pid, start_from) ::
{:ok, subscription :: pid}
@callback subscribe_to(stream_uuid | :all, subscription_name, subscriber, start_from) ::
{:ok, subscription}
| {:error, :subscription_already_exists}
| {:error, error}
@@ -94,7 +101,7 @@ defmodule Commanded.EventStore do
@doc """
Unsubscribe an existing subscriber from event notifications.
"""
@callback unsubscribe(subscription_name) :: :ok
@callback unsubscribe(subscription) :: :ok
@doc """
Read a snapshot, if available, for a given source.
@@ -111,6 +118,11 @@ defmodule Commanded.EventStore do
"""
@callback delete_snapshot(source_uuid) :: :ok | {:error, error}
@spec child_spec() :: [:supervisor.child_spec()]
def child_spec do
event_store_adapter().child_spec()
end
@doc """
Append one or more events to a stream atomically.
"""
@@ -154,8 +166,8 @@ defmodule Commanded.EventStore do
@doc """
Create a persistent subscription to an event stream.
"""
@spec subscribe_to(stream_uuid | :all, subscription_name, subscriber :: pid, start_from) ::
{:ok, subscription :: pid}
@spec subscribe_to(stream_uuid | :all, subscription_name, subscriber, start_from) ::
{:ok, subscription}
| {:error, :subscription_already_exists}
| {:error, error}
def subscribe_to(stream_uuid, subscription_name, subscriber, start_from)
@@ -175,9 +187,9 @@ defmodule Commanded.EventStore do
@doc """
Unsubscribe an existing subscriber from all event notifications.
"""
@spec unsubscribe(subscription_name) :: :ok
def unsubscribe(subscription_name) when is_binary(subscription_name) do
event_store_adapter().unsubscribe(subscription_name)
@spec unsubscribe(subscription) :: :ok
def unsubscribe(subscription) do
event_store_adapter().unsubscribe(subscription)
end
@doc """
@@ -2,20 +2,22 @@ defmodule Commanded.Supervisor do
@moduledoc false
use Supervisor
alias Commanded.{PubSub, Registration}
alias Commanded.{EventStore, PubSub, Registration}
def start_link do
Supervisor.start_link(__MODULE__, [])
end
def init(_) do
children =
Registration.child_spec() ++ PubSub.child_spec() ++
[
{Task.Supervisor, name: Commanded.Commands.TaskDispatcher},
{Commanded.Aggregates.Supervisor, []},
{Commanded.Subscriptions, []}
]
EventStore.child_spec() ++
Registration.child_spec() ++
PubSub.child_spec() ++
[
{Task.Supervisor, name: Commanded.Commands.TaskDispatcher},
{Commanded.Aggregates.Supervisor, []},
{Commanded.Subscriptions, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
@@ -7,7 +7,7 @@ defmodule Commanded.Mixfile do
[
app: :commanded,
version: @version,
elixir: "~> 1.5",
elixir: "~> 1.6",
elixirc_paths: elixirc_paths(Mix.env()),
deps: deps(),
description: description(),
@@ -197,7 +197,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do
assert_receive_events(subscription, 1, from: 1)
:ok = EventStore.unsubscribe("subscriber")
:ok = EventStore.unsubscribe(subscription)
:ok = EventStore.append_to_stream("stream2", 0, build_events(2))
:ok = EventStore.append_to_stream("stream3", 0, build_events(3))
@@ -219,13 +219,8 @@ defmodule Commanded.EventStore.SubscriptionTestCase do
assert length(received_events) == 3
end)
# wait for last `ack`
wait_for_event_store()
ProcessHelper.shutdown(subscriber)
wait_for_event_store()
{:ok, subscriber} = Subscriber.start_link()
wait_until(fn ->
@@ -5,14 +5,10 @@ defmodule Commanded.StorageCase do
require Logger
alias Commanded.EventStore.Adapters.InMemory
setup do
on_exit(fn ->
:ok = Application.stop(:commanded)
InMemory.reset!()
{:ok, _apps} = Application.ensure_all_started(:commanded)
end)
end
@@ -29,6 +29,8 @@ defmodule Commanded.MockEventStoreCase do
end
defp use_event_store_adapter(adapter) do
expect(MockEventStore, :child_spec, fn -> [] end)
:ok = Application.stop(:commanded)
Application.put_env(:commanded, :event_store_adapter, adapter)
@@ -1,6 +1 @@
alias Commanded.EventStore.Adapters.InMemory
alias Commanded.Serialization.JsonSerializer
{:ok, _pid} = InMemory.start_link(serializer: JsonSerializer)
ExUnit.start()

0 comments on commit 0fd3758

Please sign in to comment.