Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .github/workflows/on-pr-quality.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,36 @@ jobs:
cargo_build_args: "--verbose"
cargo_test_args: "--verbose"
cargo_incremental: true

postgres:
name: Postgres Integration Tests
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourced_rust_test
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres -d sourced_rust_test"
--health-interval 5s
--health-timeout 5s
--health-retries 10
env:
CARGO_TERM_COLOR: always
DATABASE_URL: postgres://postgres:postgres@localhost:5432/sourced_rust_test
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
- name: Run Postgres integration tests
run: |
cargo test --test postgres_repository --all-features --verbose
cargo test --test postgres_repository_conformance --all-features --verbose
cargo test --test distributed_read_model --all-features --verbose
35 changes: 34 additions & 1 deletion .github/workflows/on-push-main-version-and-tag.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,44 @@ jobs:
cargo_test_args: "--verbose"
cargo_incremental: true

postgres:
name: Postgres Integration Tests
runs-on: ubuntu-latest
services:
postgres:
image: postgres:16-alpine
env:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: sourced_rust_test
ports:
- 5432:5432
options: >-
--health-cmd "pg_isready -U postgres -d sourced_rust_test"
--health-interval 5s
--health-timeout 5s
--health-retries 10
env:
CARGO_TERM_COLOR: always
DATABASE_URL: postgres://postgres:postgres@localhost:5432/sourced_rust_test
steps:
- uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5
with:
persist-credentials: false
- uses: dtolnay/rust-toolchain@29eef336d9b2848a0b548edc03f92a220660cdb8
with:
toolchain: stable
- name: Run Postgres integration tests
run: |
cargo test --test postgres_repository --all-features --verbose
cargo test --test postgres_repository_conformance --all-features --verbose
cargo test --test distributed_read_model --all-features --verbose

# This uses commit logs and tags from git to determine the next version number and create a tag for the release.
# Some commits such as chore: will not trigger a version bump and tag; this is by design.
version-and-tag:
name: Version and Tag
needs: quality
needs: [quality, postgres]
uses: unbounded-tech/workflow-vnext-tag/.github/workflows/workflow.yaml@v1.20.2
secrets:
DEPLOY_KEY: ${{ secrets.DEPLOY_KEY }}
Expand Down
60 changes: 37 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Each handler is a module with `COMMAND`, `guard`, and `handle`:
use serde::Deserialize;
use serde_json::{json, Value};
use sourced_rust::microsvc::{Context, HandlerError, HasRepo};
use sourced_rust::{AggregateBuilder, OutboxCommitExt, OutboxMessage, Repository};
use sourced_rust::{AggregateBuilder, SyncOutboxCommitExt, OutboxMessage, Repository};

pub const COMMAND: &str = "todo.create";

Expand All @@ -116,7 +116,7 @@ where
// Outbox message derives id, snapshot payload, and metadata automatically
let outbox = OutboxMessage::domain_event("TodoInitialized", &todo)
.map_err(|e| HandlerError::Other(Box::new(e)))?;
repo.outbox(outbox).commit(&mut todo)?;
repo.outbox_sync(outbox).commit_sync(&mut todo)?;

Ok(json!({ "id": input.id }))
}
Expand Down Expand Up @@ -187,7 +187,7 @@ Every infrastructure concern in `sourced_rust` follows the same pattern: a **tra
|---|---|---|---|
| Storage | `Repository` (`Get + Commit`) | `HashMapRepository` | Postgres, DynamoDB, etc. |
| Messaging | `Publisher` + `Subscriber` | `InMemoryQueue` | Kafka, Redis Streams, SQS, etc. |
| Read model store | `ReadModelStore` | `InMemoryReadModelStore` | Postgres, MongoDB, etc. |
| Read model rows | `ReadModelWritePlanStore` + `RelationalReadModelQueryStore` | `InMemoryReadModelStore` | Postgres, SQLite, etc. |
| Snapshot store | `SnapshotStore` | `InMemorySnapshotStore` | Postgres, S3, etc. |
| Outbox publishing | `OutboxPublisher` | `LogPublisher` | Any `Publisher` impl |
| Locking | `Lock` + `LockManager` | `InMemoryLockManager` | Redis, Postgres advisory, etc. |
Expand Down Expand Up @@ -486,7 +486,7 @@ let outbox = OutboxMessage::encode_for_entity(
&order.entity, // metadata propagates automatically
)?;

repo.outbox(outbox).commit(&mut order)?;
repo.outbox_sync(outbox).commit_sync(&mut order)?;
```

The metadata flows through the full chain:
Expand Down Expand Up @@ -650,7 +650,7 @@ commands, or transport messages only when application code creates an
`OutboxMessage` for that purpose.

```rust
use sourced_rust::{OutboxCommitExt, OutboxMessage};
use sourced_rust::{SyncOutboxCommitExt, OutboxMessage};

let mut todo = Todo::default();
todo.entity.set_correlation_id("req-abc");
Expand All @@ -660,7 +660,7 @@ todo.initialize("todo-1".into(), "user-1".into(), "Buy milk".into())?;
let message = OutboxMessage::domain_event("TodoInitialized", &todo)?;

// Commit both in one repository batch
repo.outbox(message).commit(&mut todo)?;
repo.outbox_sync(message).commit_sync(&mut todo)?;
```

For custom payloads or IDs, use `encode_for_entity` instead:
Expand Down Expand Up @@ -848,7 +848,7 @@ Combine the outbox pattern with the service bus for reliable event publishing:
```rust
use sourced_rust::{
bus::Bus, HashMapRepository, InMemoryQueue, OutboxWorkerThread,
OutboxCommitExt, OutboxMessage, AggregateBuilder, Queueable,
SyncOutboxCommitExt, OutboxMessage, AggregateBuilder, Queueable,
};
use std::time::Duration;

Expand Down Expand Up @@ -877,7 +877,7 @@ let outbox = OutboxMessage::encode_for_entity(
&OrderCreatedPayload { order_id: "order-1".into() },
&order.entity, // metadata propagates automatically
)?;
order_repo.outbox(outbox).commit(&mut order)?;
order_repo.outbox_sync(outbox).commit_sync(&mut order)?;

// Other services receive the event via their subscriptions
let events = bus.subscribe(&["OrderCreated"]);
Expand All @@ -892,7 +892,7 @@ Use `OutboxMessage::encode_to()` to set a destination queue, and `spawn_routed`

```rust
use sourced_rust::{
CommitBuilderExt, HashMapRepository, InMemoryQueue, OutboxWorkerThread,
SyncCommitBuilderExt, HashMapRepository, InMemoryQueue, OutboxWorkerThread,
OutboxMessage,
};
use std::time::Duration;
Expand Down Expand Up @@ -923,9 +923,9 @@ let outbox_inventory = OutboxMessage::encode_to(
)?;

// Commit aggregate + multiple outbox messages in one transactional batch
repo.outbox(outbox_saga)
.outbox(outbox_inventory)
.commit(&mut order)?;
repo.outbox_sync(outbox_saga)
.outbox_sync(outbox_inventory)
.commit_sync(&mut order)?;

// Worker drains outbox and sends each message to its destination queue
let _stats = worker.stop()?;
Expand Down Expand Up @@ -1201,7 +1201,7 @@ microsvc::serve(service.clone(), "0.0.0.0:3000").await?;

## Read Models

Read models are query-optimized projections derived from aggregates, event records, or published messages. Document read models store a whole view in a document payload column; normalized relational read models use table metadata plus `ReadModelWritePlanBuilder` write plans.
Read models are query-optimized projections derived from aggregates, event records, or published messages. They are written as declared relational rows using table metadata plus `ReadModelWritePlanBuilder` write plans. Use JSON/JSONB columns for whole-view or semistructured fields.

### Defining a Read Model

Expand All @@ -1210,12 +1210,14 @@ use serde::{Deserialize, Serialize};
use sourced_rust::ReadModel;

#[derive(Clone, Debug, Serialize, Deserialize, ReadModel)]
#[collection("game_views")]
#[readmodel(table = "game_views")]
pub struct GameView {
#[id]
#[readmodel(id)]
pub id: String,
pub player_name: String,
pub score: i32,
#[readmodel(jsonb)]
pub metadata: serde_json::Value,
}
```

Expand All @@ -1224,7 +1226,7 @@ pub struct GameView {
When the response to a command must include the fully consistent, updated view, you can commit the aggregate and read model together:

```rust
use sourced_rust::CommitBuilderExt;
use sourced_rust::{ReadModelWritePlanBuilder, SyncReadModelWritePlanCommitExt};

// Player submits a move
game.make_move(player_move)?;
Expand All @@ -1233,36 +1235,48 @@ game.make_move(player_move)?;
let view = GameView::from(&game);

// Commit aggregate + view in one transactional batch
repo.readmodel(&view).commit(&mut game)?;
let mut read_models = ReadModelWritePlanBuilder::new();
read_models.upsert(&view)?;
repo.read_models_sync(read_models).commit_sync(&mut game)?;

// Return `view` to the client — it reflects the committed state
```

For relational read models, build a structured read-model write plan:
For related rows, build the same structured write plan:

```rust
use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt};
use sourced_rust::{ReadModelWritePlanBuilder, SyncReadModelWritePlanCommitExt};

let mut read_models = ReadModelWritePlanBuilder::new();
read_models.upsert(&player_view)?;
read_models.upsert_related(&player_view, "weapons", &weapon_view)?;

repo.read_models(read_models).commit(&mut game)?;
repo.read_models_sync(read_models).commit_sync(&mut game)?;
```

Async persistent repositories use the same staging shape at the SQL boundary:

```rust,ignore
use sourced_rust::{AsyncReadModelWritePlanCommitExt, ReadModelWritePlanBuilder};

let mut read_models = ReadModelWritePlanBuilder::new();
read_models.upsert(&view)?;
repo.read_models(read_models).commit(&mut game).await?;
```

Distributed projectors can commit the same write-plan shape directly against a read-model adapter and mark messages processed in the same adapter transaction:

```rust
let mut read_models = ReadModelWritePlanBuilder::new();
read_models.document(&view)?.mark_processed("game-view-projector", event_id);
read_models.upsert(&view)?.mark_processed("game-view-projector", event_id);
let outcome = read_models.commit(&read_store)?;
```

This is a deliberate consistency tradeoff. The read model is in sync with the aggregate only when the repository implements `TransactionalCommit` and can write both in the same transaction boundary. For cross-service or cross-database views, use the eventually consistent outbox/projector pattern instead.

Bomberman `BoardView` remains a document-row example backed by a whole-view payload, not a normalized relational ORM example.
Bomberman `BoardView` is modeled as a read-model table row with JSON/JSONB columns for nested board state.

See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, document rows, workspace commits, schema bootstrap, distributed idempotency, and non-goals.
See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, workspace commits, schema bootstrap, distributed idempotency, and non-goals.

## Snapshots

Expand Down
27 changes: 16 additions & 11 deletions docs/async-repositories.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ persistence should override it with an explicit durable name through
- `AsyncGetStream` loads one or more event streams by full identity.
- `AsyncTransactionalCommit` commits `AsyncCommitBatch` values with stream
writes, read-model write plans, and snapshots under one backend transaction.
- `AsyncReadModelStore`, `AsyncReadModelWritePlanStore`, and
`AsyncRelationalReadModelQueryStore` mirror the current document and
relational read-model surfaces for async adapters.
- `AsyncReadModelWritePlanStore` and `AsyncRelationalReadModelQueryStore`
mirror the relational read-model write and primary-key load surfaces for
async adapters.
- `AsyncSnapshotStore` keys rebuildable snapshot cache records by full stream
identity. The record envelope carries stream identity, covered event version,
snapshot payload type/version, payload codec metadata, cache metadata, and
Expand All @@ -30,8 +30,8 @@ persistence should override it with an explicit durable name through

Async methods use an `_async` suffix where a synchronous method with the same
name already exists. This keeps `HashMapRepository`, `InMemoryReadModelStore`,
and `InMemorySnapshotStore` source-compatible when both sync and async traits
are imported.
and `InMemorySnapshotStore` unambiguous when both sync and async traits are
imported.

## In-Memory Reference

Expand All @@ -58,8 +58,8 @@ let repo = sourced_rust::SqliteRepository::connect_and_migrate("sqlite::memory:"
`migrations/sqlite`. Plain construction from an existing pool does not create
tables implicitly, so applications can control bootstrap order.

The first SQLite pass persists aggregate events, transactional document read
models, processed-message marks, and snapshots in one SQL transaction when they
The SQLite adapter persists aggregate events, relational read-model write
plans, processed-message marks, and snapshots in one SQL transaction when they
are staged through `AsyncCommitBatch`. It intentionally does not claim Postgres
production readiness: Postgres-specific column types, isolation behavior, error
mapping, deployment, and migration validation still belong to the Postgres
Expand All @@ -83,7 +83,12 @@ DATABASE_URL=postgres://sourced:sourced@localhost:5432/sourced_rust \
cargo test --features postgres --test postgres_repository
```

The first Postgres pass persists aggregate event streams and snapshots through
explicit migrations in `migrations/postgres`. It rejects non-empty read-model
write plans instead of creating generic read-model tables implicitly; durable
read-model persistence remains a separate adapter track.
The SQLite and Postgres adapters persist aggregate event streams, read-model
write plans, processed-message marks, snapshots, and outbox rows through
explicit migrations plus registered table schemas. Relational read-model
mutations (`upsert`, sparse `patch`, and `delete`) are lowered into SQL writes
against the tables generated from `#[derive(ReadModel)]` / `RelationalReadModel`
schema metadata, including JSON/JSONB columns and `_sourced_version` optimistic
versions. SQL repositories do not persist generic document rows; whole-view
state that belongs in SQL should be modeled as a declared read-model table with
an `id` column and JSON/JSONB columns for semistructured fields.
32 changes: 32 additions & 0 deletions docs/postgres-event-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,38 @@ SQL adapters retain only the latest cache record per stream. Future adapters may
retain last `N` or time-based cache records, but they must never prune aggregate
events.

## Transactional Read Models

Postgres read-model write plans write relational table rows inside the same
repository transaction as aggregate events. Mutations are written to the
registered read-model tables generated from schema metadata
(`bootstrap_table_schema_for_dev` for tests/local development, migration
artifacts for managed environments). Those writes use the model's declared
columns directly, including `jsonb` columns for collection fields and
`_sourced_version` for optimistic row versions.

There is no generic SQL document table in this repository contract. If a
command-side view needs whole-view state in SQL, define a read-model table with
an `id` column and one or more `jsonb` columns for the semistructured data.
Generic document mutations require a dedicated document adapter rather than the
Postgres event-store repository.

`read_model_processed_messages` stores idempotency marks for distributed
projectors that commit a read-model write plan and mark a message processed in
one transaction:

```sql
PRIMARY KEY (consumer_name, message_id);
CHECK (consumer_name <> '');
CHECK (message_id <> '');
```

`read_model_processed_messages` is shared by relational write-plan commits so
projectors can atomically write rows and record consumed messages.

Relationship include loading is a separate query concern; the transactional
write path persists the row mutations staged by `ReadModelWritePlan`.

## Commit Semantics

`Commit::commit` and `TransactionalCommit::commit_batch` establish the behavior
Expand Down
Loading
Loading