Merge pull request #134 from commanded/feature/concurrency
Subscription concurrency
slashdotdash committed Sep 18, 2018
2 parents 10dc4ae + 47accab commit 02e1583
Showing 39 changed files with 2,746 additions and 1,298 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
1 change: 1 addition & 0 deletions .travis.yml
@@ -6,6 +6,7 @@ cache:

elixir:
- 1.6.6
- 1.7.3

otp_release:
- 21.0
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

- Add `:socket` and `:socket_dir` config options ([#132](https://github.com/commanded/eventstore/pull/132)).
- Rename `uuid` dependency to `elixir_uuid` ([#135](https://github.com/commanded/eventstore/pull/135)).
- Subscription concurrency ([#134](https://github.com/commanded/eventstore/pull/134)).

## 0.15.1

2 changes: 1 addition & 1 deletion config/distributed.exs
@@ -1,6 +1,6 @@
use Mix.Config

config :logger, :console, level: :debug
config :logger, backends: []

config :ex_unit,
capture_log: true,
2 changes: 1 addition & 1 deletion config/jsonb.exs
@@ -1,6 +1,6 @@
use Mix.Config

config :logger, :console, level: :warn
config :logger, backends: []

config :ex_unit,
capture_log: true,
2 changes: 1 addition & 1 deletion config/local.exs
@@ -1,6 +1,6 @@
use Mix.Config

config :logger, :console, level: :warn
config :logger, backends: []

config :ex_unit,
capture_log: true,
2 changes: 1 addition & 1 deletion config/test.exs
@@ -1,6 +1,6 @@
use Mix.Config

config :logger, :console, level: :warn
config :logger, backends: []

config :ex_unit,
capture_log: true,
47 changes: 42 additions & 5 deletions guides/Subscriptions.md
@@ -66,13 +66,11 @@ end

## Persistent subscriptions

Persistent subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. The last received and acknowledged event is stored by the EventStore to support resuming at a later time later or whenever the subscriber process restarts.
Persistent subscriptions to a stream will guarantee *at least once* delivery of every persisted event. Each subscription may be independently paused, then later resumed from where it stopped. The last received and acknowledged event is stored by the EventStore to support resuming at a later time or whenever the subscriber process restarts.

A subscription can be created to receive events appended to a single or all streams.

Events are received in batches after being persisted to storage. A batch contains the events appended to a single stream using `EventStore.append_to_stream/4`.

Subscriptions must be uniquely named and support a single subscriber. Attempting to connect two subscribers to the same subscription will return an `{:error, :subscription_already_exists}` error.
Subscriptions must be uniquely named and by default only support a single subscriber. Attempting to connect two subscribers to the same subscription will return `{:error, :subscription_already_exists}`. You can optionally create a [competing consumer subscription with multiple subscribers](#subscription-concurrency).

### `:subscribed` message

@@ -167,7 +165,7 @@ You can choose to receive events from a given starting position.

The supported options are:

- `:origin` - Start receiving events from the beginning of the stream or all streams.
- `:origin` - Start receiving events from the beginning of the stream or all streams (default).
- `:current` - Subscribe to newly appended events only, skipping already persisted events.
- `event_number` (integer) - Specify an exact event number to subscribe from. This will be the same as the stream version for single stream subscriptions.

@@ -233,6 +231,45 @@ receive do
end
```

### Subscription concurrency

A single persistent subscription can support multiple subscribers. Events will be distributed to subscribers evenly using a round-robin algorithm. The [competing consumers pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/competing-consumers) enables multiple subscribers to process events concurrently to optimise throughput, to improve scalability and availability, and to balance the workload.

By default a subscription will only allow a single subscriber, but you can opt in to concurrent subscriptions by providing a non-negative `concurrency_limit` as a subscription option.
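
For illustration, here is a minimal sketch of connecting two subscribers to the same named subscription (the `stream_uuid`, subscription name, and subscriber pids are assumed for the example). With a `concurrency_limit` of two, both subscribers receive events round-robin and a third subscriber is refused:

```elixir
# Both subscribers connect to the same "example" subscription and share its events.
{:ok, _subscription1} =
  EventStore.subscribe_to_stream(stream_uuid, "example", subscriber1,
    concurrency_limit: 2
  )

{:ok, _subscription2} =
  EventStore.subscribe_to_stream(stream_uuid, "example", subscriber2,
    concurrency_limit: 2
  )

# A third subscriber exceeds the concurrency limit and is refused.
{:error, :too_many_subscribers} =
  EventStore.subscribe_to_stream(stream_uuid, "example", subscriber3,
    concurrency_limit: 2
  )
```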

#### Subscription concurrency configuration options

- `concurrency_limit` defines the maximum number of concurrent subscribers allowed to connect to the subscription. By default only one subscriber may connect. If too many subscribers attempt to connect to the subscription, an `{:error, :too_many_subscribers}` error is returned.

- `buffer_size` limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops their message queue from getting filled with events. Defaults to one in-flight event.

- `partition_by` is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription as described in [Ordering guarantee](#ordering-guarantee) below. The function is passed a single argument (an `EventStore.RecordedEvent` struct) and must return the partition key. As an example, to guarantee that events for a single stream are processed serially, but different streams are processed concurrently, you could use the `stream_uuid` as the partition key.

### Ordering guarantee

With multiple subscriber processes connected to a single subscription, the ordering of event processing is no longer guaranteed, since events may take differing amounts of time to process. This can cause problems if your event handling code expects events to be processed in the order they were originally appended to a stream.

You can use a `partition_by` function to guarantee ordering of events within a particular group (e.g. per stream) but still allow events for different groups to be processed concurrently.

Partitioning gives you the benefits of competing consumers but still allows event ordering by partition where required.

#### Partition by example

```elixir
alias EventStore.RecordedEvent

by_stream = fn %RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end

{:ok, _subscription} =
EventStore.subscribe_to_stream(stream_uuid, "example", self(),
concurrency_limit: 10,
partition_by: by_stream
)
```

The above subscription would ensure that events for each stream are processed serially (by a single subscriber) in the order they were appended to the stream, but events for any other stream can be processed concurrently by another subscriber.
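
As a rough subscriber-side sketch (module and function names are illustrative), each connected subscriber handles and then acknowledges its own events using the subscription pid received in the `{:subscribed, subscription}` message. With the default `buffer_size` of one, a subscriber is only sent its next event after acknowledging the previous one:

```elixir
defmodule ExamplePartitionedSubscriber do
  # Illustrative receive loop for a single subscriber process: handle the
  # in-flight events, then acknowledge them so the subscription can send
  # this subscriber its next event.
  def receive_events(subscription) do
    receive do
      {:events, events} ->
        # ... process `events` for this partition here ...
        :ok = EventStore.ack(subscription, events)
        receive_events(subscription)
    end
  end
end
```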

### Example persistent subscriber

110 changes: 94 additions & 16 deletions lib/event_store.ex
@@ -346,16 +346,49 @@ defmodule EventStore do
notification messages.
- `opts` is an optional map providing additional subscription configuration:
- `start_from` is a pointer to the first event to receive. It must be one of:
- `start_from` is a pointer to the first event to receive.
It must be one of:
- `:origin` for all events from the start of the stream (default).
- `:current` for any new events appended to the stream after the
subscription has been created.
- any positive integer for a stream version to receive events after.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
only those elements for which fun returns a truthy value.
- `mapper` to define a function to map each recorded event before sending
to the subscriber.
- `concurrency_limit` defines the maximum number of concurrent subscribers
allowed to connect to the subscription. By default only one subscriber
may connect. If too many subscribers attempt to connect to the
subscription, an `{:error, :too_many_subscribers}` error is returned.
- `buffer_size` limits how many in-flight events will be sent to the
subscriber process before acknowledgement of successful processing. This
limits the number of messages sent to the subscriber and stops their
message queue from getting filled with events. Defaults to one in-flight
event.
- `partition_by` is an optional function used to partition events to
subscribers. It can be used to guarantee processing order when multiple
subscribers have subscribed to a single subscription. The function is
passed a single argument (an `EventStore.RecordedEvent` struct) and must
return the partition key. As an example, to guarantee events for a single
stream are processed serially, but different streams are processed
concurrently, you could use the `stream_uuid` as the partition key.
alias EventStore.RecordedEvent
by_stream = fn %RecordedEvent{stream_uuid: stream_uuid} -> stream_uuid end
{:ok, _subscription} =
EventStore.subscribe_to_stream(stream_uuid, "example", self(),
concurrency_limit: 10,
partition_by: by_stream
)
The subscription will resume from the last acknowledged event if it already
exists. It will ignore the `start_from` argument in this case.
@@ -392,7 +425,9 @@ defmodule EventStore do
"""
@spec subscribe_to_stream(String.t(), String.t(), pid, keyword) ::
{:ok, subscription :: pid}
| {:error, :already_subscribed}
| {:error, :subscription_already_exists}
| {:error, :too_many_subscribers}
| {:error, reason :: term}

def subscribe_to_stream(stream_uuid, subscription_name, subscriber, opts \\ [])
@@ -419,17 +454,26 @@
notification messages.
- `opts` is an optional map providing additional subscription configuration:
- `start_from` is a pointer to the first event to receive. It must be one of:
- `start_from` is a pointer to the first event to receive.
It must be one of:
- `:origin` for all events from the start of the stream (default).
- `:current` for any new events appended to the stream after the
subscription has been created.
- any positive integer for an event id to receive events after that
exact event.
- `selector` to define a function to filter each event, i.e. returns
only those elements for which fun returns a truthy value
- `mapper` to define a function to map each recorded event before sending
to the subscriber.
- `concurrency_limit` defines the maximum number of concurrent subscribers
allowed to connect to the subscription. By default only one subscriber
may connect. If too many subscribers attempt to connect to the
subscription, an `{:error, :too_many_subscribers}` error is returned.
The subscription will resume from the last acknowledged event if it already
exists. It will ignore the `start_from` argument in this case.
@@ -456,7 +500,9 @@
"""
@spec subscribe_to_all_streams(String.t(), pid, keyword) ::
{:ok, subscription :: pid}
| {:error, :already_subscribed}
| {:error, :subscription_already_exists}
| {:error, :too_many_subscribers}
| {:error, reason :: term}

def subscribe_to_all_streams(subscription_name, subscriber, opts \\ [])
@@ -472,8 +518,10 @@
Accepts a `RecordedEvent`, a list of `RecordedEvent`s, or the event number of
the recorded event to acknowledge.
"""
@spec ack(pid, EventStore.RecordedEvent.t() | list(EventStore.RecordedEvent.t()) | non_neg_integer()) ::
:ok | {:error, reason :: term}
@spec ack(
pid,
EventStore.RecordedEvent.t() | list(EventStore.RecordedEvent.t()) | non_neg_integer()
) :: :ok | {:error, reason :: term}
def ack(subscription, ack) do
Subscription.ack(subscription, ack)
end
@@ -483,8 +531,8 @@
- `stream_uuid` is the stream to unsubscribe from.
- `subscription_name` is used to identify the existing subscription to
remove.
- `subscription_name` is used to identify the existing subscription process
to stop.
Returns `:ok` on success.
"""
@@ -496,8 +544,8 @@
@doc """
Unsubscribe an existing subscriber from all event notifications.
- `subscription_name` is used to identify the existing subscription to
remove.
- `subscription_name` is used to identify the existing subscription process
to stop.
Returns `:ok` on success.
"""
@@ -506,6 +554,36 @@
Subscriptions.unsubscribe_from_stream(@all_stream, subscription_name)
end

@doc """
Delete an existing persistent subscription.
- `stream_uuid` is the stream the subscription is subscribed to.
- `subscription_name` is used to identify the existing subscription to
remove.
Returns `:ok` on success.
"""
@spec delete_subscription(String.t(), String.t()) :: :ok
def delete_subscription(stream_uuid, subscription_name) do
Subscriptions.delete_subscription(@conn, stream_uuid, subscription_name, opts())
end

@doc """
Delete an existing persistent subscription to all streams.
- `subscription_name` is used to identify the existing subscription to
remove.
Returns `:ok` on success.
"""
@spec delete_all_streams_subscription(String.t()) :: :ok
def delete_all_streams_subscription(subscription_name) do
EventStore.delete_subscription(@all_stream, subscription_name)
end

@doc """
Read a snapshot, if available, for a given source.
@@ -544,13 +622,13 @@

@default_opts [pool: DBConnection.Poolboy]

defp opts(timeout \\ nil) do
case timeout do
timeout when is_integer(timeout) ->
Keyword.put(@default_opts, :timeout, timeout)
defp opts, do: @default_opts

_ ->
@default_opts
end
defp opts(timeout) when is_integer(timeout) do
Keyword.put(@default_opts, :timeout, timeout)
end

defp opts(:infinity) do
Keyword.put(@default_opts, :timeout, :infinity)
end
end
9 changes: 5 additions & 4 deletions lib/event_store/storage.ex
@@ -3,13 +3,14 @@ defmodule EventStore.Storage do

alias EventStore.Snapshots.SnapshotData
alias EventStore.Storage

alias EventStore.Storage.{
Appender,
CreateStream,
QueryStreamInfo,
Reader,
Snapshot,
Subscription,
Subscription
}

@doc """
@@ -80,10 +81,10 @@
end

@doc """
Unsubscribe from an existing named subscription to a stream.
Delete an existing named subscription to a stream.
"""
def unsubscribe_from_stream(conn, stream_uuid, subscription_name, opts \\ []) do
Subscription.unsubscribe_from_stream(conn, stream_uuid, subscription_name, opts)
def delete_subscription(conn, stream_uuid, subscription_name, opts \\ []) do
Subscription.delete_subscription(conn, stream_uuid, subscription_name, opts)
end

@doc """