Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ jobs:
run: |
MIX_ENV=test mix event_store.setup
MIX_ENV=jsonb mix event_store.setup
MIX_ENV=text_ids mix event_store.setup

- name: Compile
run: mix compile --warnings-as-errors
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ MIT License
- [Using with PgBouncer](guides/Getting%20Started.md#using-with-pg-bouncer)
- [Using the EventStore](guides/Usage.md)
- [Writing to a stream](guides/Usage.md#writing-to-a-stream)
- [Using correlation and causation identifiers](guides/Usage.md#using-correlation-and-causation-identifiers)
- [Appending events to an existing stream](guides/Usage.md#appending-events-to-an-existing-stream)
- [Reading from a stream](guides/Usage.md#reading-from-a-stream)
- [Reading from all streams](guides/Usage.md#reading-from-all-streams)
Expand Down
30 changes: 30 additions & 0 deletions config/text_ids.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import Config

config :logger, backends: []

config :ex_unit,
capture_log: true,
assert_receive_timeout: 2_000,
refute_receive_timeout: 100

default_config = [
correlation_id_type: "text",
causation_id_type: "text",
username: "postgres",
password: "postgres",
database: "eventstore_text_ids_test",
hostname: "localhost",
pool_size: 1,
serializer: EventStore.JsonSerializer,
subscription_retry_interval: 1_000
]

config :eventstore, TestEventStore, default_config

config :eventstore,
SecondEventStore,
Keyword.put(default_config, :database, "eventstore_text_ids_test_2")

config :eventstore, SchemaEventStore, default_config

config :eventstore, event_stores: [TestEventStore, SecondEventStore, SchemaEventStore]
2 changes: 2 additions & 0 deletions guides/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ EventStore is [available in Hex](https://hex.pm/packages/eventstore) and can be
- `:schema` - define the Postgres schema to use (default: `public` schema).
- `:timeout` - set the default database query timeout in milliseconds (default: 15,000ms).
- `:shared_connection_pool` - allows a database connection pool to be shared amongst multiple event store instances (default: `nil`).
- `:correlation_id_type` - database column type used for `correlation_id` (`"uuid"` by default, or `"text"`).
- `:causation_id_type` - database column type used for `causation_id` (`"uuid"` by default, or `"text"`).

Subscription options:

Expand Down
58 changes: 58 additions & 0 deletions guides/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,64 @@ Append the events to the stream:
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events)
```

### Using correlation and causation identifiers

`EventStore.EventData` supports `correlation_id` and `causation_id` fields.

By default EventStore stores both fields using PostgreSQL's `uuid` type. If your
application already uses UUID values for correlation and causation, no
additional configuration is needed.

When the natural workflow identifier is already a string, you can configure
either field independently to use `text`:

```elixir
config :my_app, MyApp.EventStore,
correlation_id_type: "text",
causation_id_type: "text"
```

For example, an order checkout workflow may write events to different streams
while keeping the same `correlation_id`:

```elixir
alias EventStore.EventData

order_id = "ORD-2026-000123"

:ok =
MyApp.EventStore.append_to_stream("order-#{order_id}", 0, [
%EventData{
event_type: "OrderPlaced",
correlation_id: "order:#{order_id}",
causation_id: "checkout:web:req-42",
data: %{order_id: order_id},
metadata: %{}
}
])
```

Later, a payment event written to another stream can keep the same
`correlation_id` even though the stream is different:

```elixir
:ok =
MyApp.EventStore.append_to_stream("payment-pay_01jxyz...", 0, [
%EventData{
event_type: "PaymentCaptured",
correlation_id: "order:#{order_id}",
causation_id: "payment:pay_01jxyz...",
data: %{payment_id: "pay_01jxyz..."},
metadata: %{}
}
])
```

In this example the stream names are different, but the workflow identifier is
the same. This allows application code to carry a meaningful correlation value
across order and payment events without introducing a separate metadata field
just because the identifier is not a UUID.

### Appending events to an existing stream

The expected version should equal the number of events already persisted to the stream when appending to an existing stream.
Expand Down
4 changes: 4 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ defmodule EventStore do
conn = Keyword.fetch!(config, :conn)
schema = Keyword.fetch!(config, :schema)
serializer = Keyword.fetch!(config, :serializer)
correlation_id_type = Keyword.get(config, :correlation_id_type, "uuid")
causation_id_type = Keyword.get(config, :causation_id_type, "uuid")

query_timeout = timeout(opts, config)

Expand All @@ -471,6 +473,8 @@ defmodule EventStore do
query_timeout: query_timeout,
schema: schema,
serializer: serializer,
correlation_id_type: correlation_id_type,
causation_id_type: causation_id_type,
stream_uuid: stream_uuid,
subscription_name: subscription_name,
start_from: start_from
Expand Down
42 changes: 42 additions & 0 deletions lib/event_store/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,48 @@ defmodule EventStore.Config do
end
end

@doc """
Get the column type used to store the correlation identifier.

Supported types are:

- "uuid" - Enforces UUID format. The identifier must be a valid UUID.
- "text" - Allows any string value as an identifier.
"""
def correlation_id_type(event_store, config) do
case Keyword.get(config, :correlation_id_type, "uuid") do
valid when valid in ["uuid", "text"] ->
valid

invalid ->
raise ArgumentError,
inspect(event_store) <>
" `:correlation_id_type` expects either \"uuid\" or \"text\" but got: " <>
inspect(invalid)
end
end

@doc """
Get the column type used to store the causation identifier.

Supported types are:

- "uuid" - Enforces UUID format. The identifier must be a valid UUID.
- "text" - Allows any string value as an identifier.
"""
def causation_id_type(event_store, config) do
case Keyword.get(config, :causation_id_type, "uuid") do
valid when valid in ["uuid", "text"] ->
valid

invalid ->
raise ArgumentError,
inspect(event_store) <>
" `:causation_id_type` expects either \"uuid\" or \"text\" but got: " <>
inspect(invalid)
end
end

@postgrex_connection_opts [
:after_connect,
:after_connect_timeout,
Expand Down
2 changes: 2 additions & 0 deletions lib/event_store/config/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule EventStore.Config.Parser do

@config_defaults [
column_data_type: "bytea",
correlation_id_type: "uuid",
causation_id_type: "uuid",
enable_hard_deletes: false,
schema: "public",
timeout: 15_000
Expand Down
4 changes: 2 additions & 2 deletions lib/event_store/event_data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule EventStore.EventData do

@type t :: %EventData{
event_id: uuid() | nil,
correlation_id: uuid() | nil,
causation_id: uuid() | nil,
correlation_id: String.t() | nil,
causation_id: String.t() | nil,
event_type: String.t(),
data: term,
metadata: term | nil
Expand Down
29 changes: 24 additions & 5 deletions lib/event_store/notifications/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@ defmodule EventStore.Notifications.Publisher do
alias EventStore.Notifications.Notification

defmodule State do
defstruct [:conn, :event_store, :query_timeout, :schema, :serializer, :subscribe_to]
defstruct [
:conn,
:event_store,
:query_timeout,
:schema,
:serializer,
:subscribe_to,
correlation_id_type: "uuid",
causation_id_type: "uuid"
]

def new(opts) do
%State{
Expand All @@ -23,7 +32,9 @@ defmodule EventStore.Notifications.Publisher do
query_timeout: Keyword.fetch!(opts, :query_timeout),
schema: Keyword.fetch!(opts, :schema),
serializer: Keyword.fetch!(opts, :serializer),
subscribe_to: Keyword.fetch!(opts, :subscribe_to)
subscribe_to: Keyword.fetch!(opts, :subscribe_to),
correlation_id_type: Keyword.get(opts, :correlation_id_type, "uuid"),
causation_id_type: Keyword.get(opts, :causation_id_type, "uuid")
}
end
end
Expand Down Expand Up @@ -65,15 +76,23 @@ defmodule EventStore.Notifications.Publisher do
to_stream_version: to_stream_version
} = notification

%State{conn: conn, query_timeout: query_timeout, schema: schema, serializer: serializer} =
state
%State{
conn: conn,
query_timeout: query_timeout,
schema: schema,
serializer: serializer,
correlation_id_type: correlation_id_type,
causation_id_type: causation_id_type
} = state

count = to_stream_version - from_stream_version + 1

try do
case Storage.read_stream_forward(conn, stream_id, from_stream_version, count,
schema: schema,
timeout: query_timeout
timeout: query_timeout,
correlation_id_type: correlation_id_type,
causation_id_type: causation_id_type
) do
{:ok, events} ->
deserialized_events = deserialize_recorded_events(events, serializer)
Expand Down
4 changes: 4 additions & 0 deletions lib/event_store/notifications/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ defmodule EventStore.Notifications.Supervisor do
schema = Keyword.fetch!(config, :schema)
serializer = Keyword.fetch!(config, :serializer)
query_timeout = Keyword.fetch!(config, :timeout)
correlation_id_type = Keyword.get(config, :correlation_id_type, "uuid")
causation_id_type = Keyword.get(config, :causation_id_type, "uuid")

listener_name = Module.concat([event_store, Listener])
publisher_name = Module.concat([event_store, Publisher])
Expand Down Expand Up @@ -54,6 +56,8 @@ defmodule EventStore.Notifications.Supervisor do
event_store: event_store,
schema: schema,
serializer: serializer,
correlation_id_type: correlation_id_type,
causation_id_type: causation_id_type,
subscribe_to: listener_name,
name: publisher_name,
hibernate_after: hibernate_after}
Expand Down
8 changes: 4 additions & 4 deletions lib/event_store/recorded_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ defmodule EventStore.RecordedEvent do
- `event_id` - a globally unique UUID to identify the event.
- `stream_uuid` - the original stream identity for the event.
- `stream_version` - the original version of the stream for the event.
- `correlation_id` - an optional UUID identifier used to correlate related
- `correlation_id` - an optional string identifier used to correlate related
messages.
- `causation_id` - an optional UUID identifier used to identify which
- `causation_id` - an optional string identifier used to identify which
message you are responding to.
- `data` - the deserialized event data.
- `metadata` - a deserialized map of event metadata.
Expand All @@ -34,8 +34,8 @@ defmodule EventStore.RecordedEvent do
event_id: uuid(),
stream_uuid: String.t(),
stream_version: non_neg_integer(),
correlation_id: uuid() | nil,
causation_id: uuid() | nil,
correlation_id: String.t() | nil,
causation_id: String.t() | nil,
event_type: String.t(),
data: any(),
metadata: map() | nil,
Expand Down
10 changes: 6 additions & 4 deletions lib/event_store/sql/init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ defmodule EventStore.Sql.Init do

def statements(config) do
column_data_type = Keyword.fetch!(config, :column_data_type)
correlation_id_type = Keyword.fetch!(config, :correlation_id_type)
causation_id_type = Keyword.fetch!(config, :causation_id_type)
schema = Keyword.fetch!(config, :schema)

[
~s(SET LOCAL search_path TO "#{schema}";),
create_streams_table(),
create_stream_uuid_index(),
create_events_table(column_data_type),
create_events_table(column_data_type, correlation_id_type, causation_id_type),
create_stream_events_table(),
create_stream_events_index(),
create_event_store_exception_function(),
Expand Down Expand Up @@ -58,14 +60,14 @@ defmodule EventStore.Sql.Init do
"""
end

defp create_events_table(column_data_type) do
defp create_events_table(column_data_type, correlation_id_type, causation_id_type) do
"""
CREATE TABLE events
(
event_id uuid PRIMARY KEY NOT NULL,
event_type text NOT NULL,
causation_id uuid NULL,
correlation_id uuid NULL,
causation_id #{causation_id_type} NULL,
correlation_id #{correlation_id_type} NULL,
data #{column_data_type} NOT NULL,
metadata #{column_data_type} NULL,
created_at timestamp with time zone DEFAULT NOW() NOT NULL
Expand Down
14 changes: 10 additions & 4 deletions lib/event_store/storage/appender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ defmodule EventStore.Storage.Appender do
"""
def append(conn, stream_id, events, opts) do
[%RecordedEvent{stream_uuid: stream_uuid} | _] = events
{correlation_id_type, opts} = Keyword.pop(opts, :correlation_id_type, "uuid")
{causation_id_type, opts} = Keyword.pop(opts, :causation_id_type, "uuid")

try do
events
|> Stream.map(&encode_uuids/1)
|> Stream.map(&encode_uuids(&1, correlation_id_type, causation_id_type))
|> Stream.chunk_every(1_000)
|> Enum.reduce(stream_id, fn batch, stream_id ->
event_count = length(batch)
Expand Down Expand Up @@ -83,18 +85,22 @@ defmodule EventStore.Storage.Appender do
end
end

defp encode_uuids(%RecordedEvent{} = event) do
defp encode_uuids(%RecordedEvent{} = event, correlation_id_type, causation_id_type) do
%RecordedEvent{event_id: event_id, causation_id: causation_id, correlation_id: correlation_id} =
event

%RecordedEvent{
event
| event_id: encode_uuid(event_id),
causation_id: encode_uuid(causation_id),
correlation_id: encode_uuid(correlation_id)
causation_id: encode_id(causation_id, causation_id_type),
correlation_id: encode_id(correlation_id, correlation_id_type)
}
end

defp encode_id(nil, _type), do: nil
defp encode_id(value, "uuid"), do: UUID.string_to_binary!(value)
defp encode_id(value, "text"), do: value

defp encode_uuid(nil), do: nil
defp encode_uuid(value), do: UUID.string_to_binary!(value)

Expand Down
Loading
Loading