[Test] Subscribe to all stream and single stream
Create PostgreSQL connection for notify test.
Allow short delay for distributed subscription tests.
Increase `assert_receive` and `refute_receive` timeouts in the `distributed` env.
Use the latest Elixir and Erlang versions in the Travis CI config.
slashdotdash committed Jan 9, 2018
1 parent 8cb4645 commit 99b7057
Showing 11 changed files with 121 additions and 74 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
@@ -1,10 +1,10 @@
language: elixir

elixir:
- 1.5.2
- 1.5.3

otp_release:
- 20.1
- 20.2

services:
- postgresql
4 changes: 2 additions & 2 deletions config/distributed.exs
@@ -5,8 +5,8 @@ config :logger, :console, level: :warn, format: "[$level] $message\n"

config :ex_unit,
capture_log: true,
assert_receive_timeout: 2_000,
refute_receive_timeout: 2_000
assert_receive_timeout: 10_000,
refute_receive_timeout: 5_000

config :eventstore, EventStore.Storage,
serializer: EventStore.JsonSerializer,
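The raised values above are only ExUnit defaults: any `assert_receive`/`refute_receive` call that does not pass an explicit timeout falls back to them, which is what gives slower cross-node delivery in the `distributed` environment more headroom. A minimal sketch, assuming a hypothetical test module that is not part of this commit:

```elixir
defmodule TimeoutDefaultsTest do
  use ExUnit.Case

  test "assert_receive falls back to the configured assert_receive_timeout" do
    parent = self()

    # Simulate a slow reply, e.g. an event arriving from another node.
    spawn(fn ->
      Process.sleep(3_000)
      send(parent, {:events, [:some_event]})
    end)

    # No explicit timeout given, so ExUnit waits up to :assert_receive_timeout (10s here).
    assert_receive {:events, _events}

    # Likewise, refute_receive waits the full :refute_receive_timeout (5s here).
    refute_receive {:events, _unexpected}
  end
end
```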
7 changes: 1 addition & 6 deletions lib/event_store/notifications/listener.ex
@@ -6,11 +6,6 @@ defmodule EventStore.Notifications.Listener do
# executed by a trigger. The notification payload contains the first and last
# event number of the appended events. These events are then read from storage
# and published to interested subscribers.
#
# Erlang's global module is used to ensure only a single instance of the
# listener process is kept running on a cluster of nodes. This minimises
# connections to the event store database. There will be at most one `LISTEN`
# connection per cluster.

use GenStage

@@ -72,7 +67,7 @@
{{:value, event}, queue} ->
state = %Listener{state | demand: demand - 1, queue: queue}
dispatch_events([event | events], state)
{:empty, queue} ->
{:empty, _queue} ->
{:noreply, Enum.reverse(events), state}
end
end
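The `{:empty, _queue}` change above silences an unused-variable warning; the surrounding function is the usual GenStage buffer-and-demand loop, in which notifications are queued as they arrive and dispatched one at a time while consumer demand lasts. A condensed, self-contained sketch of that pattern (module and field names are illustrative, not the Listener's actual internals):

```elixir
defmodule BufferedProducer do
  use GenStage

  defstruct demand: 0, queue: :queue.new()

  def start_link(_args), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok), do: {:producer, %__MODULE__{}}

  # Notifications arrive asynchronously and are queued until consumers ask for them.
  def handle_cast({:notify, event}, %__MODULE__{queue: queue} = state) do
    dispatch_events([], %__MODULE__{state | queue: :queue.in(event, queue)})
  end

  # Consumers raise demand; satisfy as much of it as the queue allows.
  def handle_demand(incoming, %__MODULE__{demand: demand} = state) do
    dispatch_events([], %__MODULE__{state | demand: demand + incoming})
  end

  defp dispatch_events(events, %__MODULE__{demand: 0} = state),
    do: {:noreply, Enum.reverse(events), state}

  defp dispatch_events(events, %__MODULE__{demand: demand, queue: queue} = state) do
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        dispatch_events([event | events], %__MODULE__{state | demand: demand - 1, queue: queue})

      {:empty, _queue} ->
        {:noreply, Enum.reverse(events), state}
    end
  end
end
```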
4 changes: 2 additions & 2 deletions lib/event_store/notifications/reader.ex
@@ -16,13 +16,13 @@ defmodule EventStore.Notifications.Reader do
def init(_args) do
opts = [
dispatcher: GenStage.BroadcastDispatcher,
subscribe_to: [{Listener, max_demand: 1}],
subscribe_to: [{Listener, max_demand: 1}],
]

{:producer_consumer, :ok, opts}
end

# Fecth events from storage and pass onwards to subscibers
# Fetch events from storage and pass onwards to subscribers
def handle_events(events, _from, state) do
events = Enum.map(events, fn {first_event_number, last_event_number} ->
read_events(first_event_number, last_event_number)
40 changes: 27 additions & 13 deletions lib/event_store/notifications/supervisor.ex
@@ -1,9 +1,18 @@
defmodule EventStore.Notifications.Supervisor do
@moduledoc false

# Supervises the individual `GenStage` stages used to listen to and broadcast
# all events appended to storage.
#
# Erlang's global module is used to ensure only a single instance of this
# supervisor process, and its children including the PostgreSQL listener
# process, runs on a cluster of nodes. This minimises connections to the event
# store database. There will be at most one `LISTEN` connection per cluster.

use Supervisor

alias EventStore.Config

alias EventStore.Notifications.{
AllStreamBroadcaster,
Listener,
@@ -21,9 +30,11 @@ defmodule EventStore.Notifications.Supervisor do
case Supervisor.start_link(__MODULE__, args, name: {:global, __MODULE__}) do
{:ok, pid} ->
{:ok, pid}

{:error, {:already_started, pid}} ->
Process.link(pid)
{:ok, pid}

:ignore ->
:ignore
end
@@ -32,18 +43,21 @@ defmodule EventStore.Notifications.Supervisor do
def init(config) do
notification_opts = Config.notification_postgrex_opts(config)

Supervisor.init([
%{
id: EventStore.Notifications,
start: {Postgrex.Notifications, :start_link, [notification_opts]},
restart: :permanent,
shutdown: 5000,
type: :worker
},
{Listener, []},
{Reader, []},
{AllStreamBroadcaster, []},
{StreamBroadcaster, []},
], strategy: :one_for_all)
Supervisor.init(
[
%{
id: EventStore.Notifications,
start: {Postgrex.Notifications, :start_link, [notification_opts]},
restart: :permanent,
shutdown: 5000,
type: :worker
},
{Listener, []},
{Reader, []},
{AllStreamBroadcaster, []},
{StreamBroadcaster, []}
],
strategy: :one_for_all
)
end
end
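The moduledoc added above describes the pattern implemented in `start_link/1`: registering under a `{:global, __MODULE__}` name means only the first node in the cluster actually starts the supervisor, while every other node links to that single instance so a crash of the winning node propagates back and lets another node claim the name. A stripped-down sketch of the same start-or-link idiom, using a hypothetical module with no children:

```elixir
defmodule GlobalSingletonSupervisor do
  use Supervisor

  def start_link(args) do
    case Supervisor.start_link(__MODULE__, args, name: {:global, __MODULE__}) do
      {:ok, pid} ->
        # This node won the global name registration and runs the children.
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        # Another node already runs the singleton: link to it so the local
        # supervision tree notices (and can retry) if that instance dies.
        Process.link(pid)
        {:ok, pid}

      :ignore ->
        :ignore
    end
  end

  def init(_args) do
    # Children would go here; this sketch starts none.
    Supervisor.init([], strategy: :one_for_one)
  end
end
```

Returning `{:ok, pid}` for the remote process keeps the local parent supervisor satisfied that its child started; the explicit `Process.link/1` is what makes it notice when the remote singleton goes down.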
19 changes: 12 additions & 7 deletions lib/event_store/supervisor.ex
@@ -1,8 +1,9 @@
defmodule EventStore.Supervisor do
@moduledoc false

use Supervisor

alias EventStore.{Config,Registration}
alias EventStore.{Config, Registration}

def start_link(args) do
Supervisor.start_link(__MODULE__, args)
@@ -11,12 +12,16 @@ defmodule EventStore.Supervisor do
def init(config) do
postgrex_opts = Config.postgrex_opts(config)

children = [
{Postgrex, postgrex_opts},
Supervisor.child_spec({Registry, keys: :unique, name: EventStore.Subscriptions.Subscription}, id: EventStore.Subscriptions.Subscription),
{EventStore.Subscriptions.Supervisor, config},
{EventStore.Notifications.Supervisor, config},
] ++ Registration.child_spec()
children =
[
{Postgrex, postgrex_opts},
Supervisor.child_spec(
{Registry, keys: :unique, name: EventStore.Subscriptions.Subscription},
id: EventStore.Subscriptions.Subscription
),
{EventStore.Subscriptions.Supervisor, config},
{EventStore.Notifications.Supervisor, config}
] ++ Registration.child_spec()

Supervisor.init(children, strategy: :one_for_one)
end
15 changes: 13 additions & 2 deletions test/notifications/notify_events_test.exs
@@ -1,12 +1,23 @@
defmodule EventStore.Notifications.NotifyEventsTest do
use EventStore.StorageCase

alias EventStore.EventFactory
alias EventStore.{Config,EventFactory,ProcessHelper}

@channel "events"

setup do
{:ok, ref} = Postgrex.Notifications.listen(EventStore.Notifications, @channel)
config = Config.parsed()
notification_opts =
config
|> Config.notification_postgrex_opts()
|> Keyword.put(:name, __MODULE__)

{:ok, conn} = Postgrex.Notifications.start_link(notification_opts)
{:ok, ref} = Postgrex.Notifications.listen(conn, @channel)

on_exit fn ->
ProcessHelper.shutdown(conn)
end

[ref: ref]
end
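The diff above shows only the new setup; the tests that use it are collapsed. For orientation, a hedged sketch of how such a test might exercise the dedicated connection: the process that called `Postgrex.Notifications.listen/2` receives each `NOTIFY` as a `{:notification, connection_pid, ref, channel, payload}` message. The module below is hypothetical and asserts only the message shape, since the payload format is internal to the event store's trigger:

```elixir
defmodule NotifyEventsSketchTest do
  use EventStore.StorageCase

  alias EventStore.{Config, EventFactory, ProcessHelper}

  @channel "events"

  setup do
    notification_opts =
      Config.parsed()
      |> Config.notification_postgrex_opts()
      |> Keyword.put(:name, __MODULE__)

    # A dedicated LISTEN connection, separate from the event store's own pool.
    {:ok, conn} = Postgrex.Notifications.start_link(notification_opts)
    {:ok, ref} = Postgrex.Notifications.listen(conn, @channel)

    on_exit(fn -> ProcessHelper.shutdown(conn) end)

    [ref: ref]
  end

  test "appending events notifies the \"events\" channel", %{ref: ref} do
    stream_uuid = UUID.uuid4()
    events = EventFactory.create_events(1)

    :ok = EventStore.append_to_stream(stream_uuid, 0, events)

    assert_receive {:notification, _connection_pid, ^ref, @channel, _payload}
  end
end
```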
4 changes: 2 additions & 2 deletions test/storage/append_events_test.exs
@@ -87,13 +87,13 @@ defmodule EventStore.Storage.AppendEventsTest do
end)
|> Enum.map(&Task.await/1)

assert [
assert results -- [
ok: Enum.to_list(1..10),
error: :wrong_expected_version,
error: :wrong_expected_version,
error: :wrong_expected_version,
error: :wrong_expected_version
] == results
] == []
end

defp create_stream(conn) do
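The rewritten assertion relies on list difference because the five concurrent appends can complete in any order: `--` removes one occurrence of each matching element from the left-hand list, so an empty difference means every result was one of the expected outcomes regardless of ordering. A tiny illustration with made-up values:

```elixir
results = [error: :wrong_expected_version, ok: [1, 2, 3], error: :wrong_expected_version]
expected = [ok: [1, 2, 3], error: :wrong_expected_version, error: :wrong_expected_version]

# Same elements, different order: the difference is empty.
[] = results -- expected
```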
53 changes: 42 additions & 11 deletions test/subscriptions/subscribe_to_stream_test.exs
@@ -15,7 +15,6 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do
describe "single stream subscription" do
setup [:append_events_to_another_stream]

@tag :wip
test "subscribe to single stream from origin should receive all its events", %{subscription_name: subscription_name} do
stream_uuid = UUID.uuid4
events = EventFactory.create_events(3)
@@ -162,15 +161,6 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do

assert_receive {:DOWN, ^ref, _, _, _}
end

# append events to another stream so that for single stream subscription tests the
# event id does not match the stream version
def append_events_to_another_stream(_context) do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(3)

:ok = EventStore.append_to_stream(stream_uuid, 0, events)
end
end

describe "all stream subscription" do
@@ -326,7 +316,7 @@
stream1_events = EventFactory.create_events(1)
stream2_events = EventFactory.create_events(1)

{:ok, subscription} = Subscriptions.subscribe_to_all_streams(subscription_name, self())
{:ok, subscription} = subscribe_to_all_streams(subscription_name, self())

:ok = Stream.append_to_stream(stream1_uuid, 0, stream1_events)
:ok = Stream.append_to_stream(stream2_uuid, 0, stream2_events)
@@ -454,6 +444,38 @@ defmodule EventStore.Subscriptions.SubscribeToStreamTest do
end
end

describe "both single and all stream subscriptions" do
setup [:append_events_to_another_stream]

test "should receive events from both subscriptions", %{subscription_name: subscription_name} do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(3)

{:ok, _subscription} = subscribe_to_stream(stream_uuid, subscription_name <> "-single", self())
{:ok, _subscription} = subscribe_to_all_streams(subscription_name <> "-all", self(), start_from_event_number: 3)

:ok = Stream.append_to_stream(stream_uuid, 0, events)

# should receive events twice
assert_receive_events(stream_uuid, events)
assert_receive_events(stream_uuid, events)
refute_receive {:events, _received_events}
end

defp assert_receive_events(stream_uuid, expected_events) do
assert_receive {:events, received_events}
assert pluck(received_events, :event_number) == [4, 5, 6]
assert pluck(received_events, :stream_uuid) == [stream_uuid, stream_uuid, stream_uuid]
assert pluck(received_events, :stream_version) == [1, 2, 3]
assert pluck(received_events, :correlation_id) == pluck(expected_events, :correlation_id)
assert pluck(received_events, :causation_id) == pluck(expected_events, :causation_id)
assert pluck(received_events, :event_type) == pluck(expected_events, :event_type)
assert pluck(received_events, :data) == pluck(expected_events, :data)
assert pluck(received_events, :metadata) == pluck(expected_events, :metadata)
refute pluck(received_events, :created_at) |> Enum.any?(&is_nil/1)
end
end

describe "duplicate subscriptions" do
setup [:create_two_duplicate_subscriptions]

@@ -559,6 +581,15 @@
end
end

# append events to another stream so that for single stream subscription tests the
# event id does not match the stream version
def append_events_to_another_stream(_context) do
stream_uuid = UUID.uuid4()
events = EventFactory.create_events(3)

:ok = EventStore.append_to_stream(stream_uuid, 0, events)
end

# subscribe to a single stream and wait for the subscription to be subscribed
defp subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ []) do
with {:ok, subscription} <- Subscriptions.subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts) do
35 changes: 11 additions & 24 deletions test/support/storage_case.ex
@@ -1,11 +1,11 @@
defmodule EventStore.StorageCase do
use ExUnit.CaseTemplate

alias EventStore.{Config,ProcessHelper,Registration}
alias EventStore.{Config, ProcessHelper}

setup do
config = Config.parsed()
registry = Registration.registry_provider()
registry = Application.get_env(:eventstore, :registry, :local)

before_reset(registry)

@@ -15,39 +15,26 @@ defmodule EventStore.StorageCase do

after_reset(registry)

on_exit fn ->
on_exit(fn ->
ProcessHelper.shutdown(conn)
end
end)

{:ok, %{conn: conn}}
end

defp before_reset(EventStore.Registration.Distributed) do
Application.stop(:swarm)

nodes()
|> Enum.map(&Task.async(fn ->
:ok = EventStore.Cluster.rpc(&1, Application, :stop, [:eventstore])
end))
|> Enum.map(&Task.await(&1, 5_000))
end

defp before_reset(_registry) do
defp before_reset(:local) do
Application.stop(:eventstore)
end

defp after_reset(EventStore.Registration.Distributed) do
nodes()
|> Enum.map(&Task.async(fn ->
{:ok, _} = EventStore.Cluster.rpc(&1, Application, :ensure_all_started, [:swarm])
{:ok, _} = EventStore.Cluster.rpc(&1, Application, :ensure_all_started, [:eventstore])
end))
|> Enum.map(&Task.await(&1, 5_000))
defp before_reset(:distributed) do
_ = :rpc.multicall(Application, :stop, [:eventstore])
end

defp after_reset(_registry) do
defp after_reset(:local) do
{:ok, _} = Application.ensure_all_started(:eventstore)
end

defp nodes, do: [Node.self() | Node.list(:connected)]
defp after_reset(:distributed) do
_ = :rpc.multicall(Application, :ensure_all_started, [:eventstore])
end
end
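The distributed branches above lean on `:rpc.multicall/3`, which evaluates the given module, function, and arguments on the local node and every connected node, returning `{results, bad_nodes}`; matching the result with `_ =` keeps the reset tolerant of nodes where `:eventstore` was not running. For example:

```elixir
# Stop :eventstore everywhere, then start it again on all reachable nodes.
{_results, _bad_nodes} = :rpc.multicall(Application, :stop, [:eventstore])
{_results, _bad_nodes} = :rpc.multicall(Application, :ensure_all_started, [:eventstore])
```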
10 changes: 7 additions & 3 deletions test/test_helper.exs
@@ -1,6 +1,10 @@
ExUnit.start(exclude: [:distributed, :manual, :ignore])
exclude = [:manual, :ignore]

case Application.get_env(:eventstore, :registry) do
:distributed -> EventStore.Cluster.spawn()
_ -> :ok
:local ->
ExUnit.start(exclude: exclude)

:distributed ->
EventStore.Cluster.spawn()
ExUnit.start(exclude: exclude)
end
