Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/on-pr-quality.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ name: Rust Quality Pipeline for PRs

on:
pull_request:
branches:
- main

jobs:
quality:
Expand Down
59 changes: 33 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,23 @@ Each handler is a module with `COMMAND`, `guard`, and `handle`:
// handlers/todo_create.rs
use serde::Deserialize;
use serde_json::{json, Value};
use sourced_rust::microsvc::{Context, HandlerError};
use sourced_rust::microsvc::{Context, HandlerError, HasRepo};
use sourced_rust::{AggregateBuilder, OutboxCommitExt, OutboxMessage, Repository};

pub const COMMAND: &str = "todo.create";

#[derive(Deserialize)]
struct Input { id: String, user_id: String, task: String }

pub fn guard<R>(ctx: &Context<R>) -> bool {
pub fn guard<D>(ctx: &Context<D>) -> bool {
ctx.has_fields(&["id", "user_id", "task"])
}

pub fn handle<R: Repository + Clone>(ctx: &Context<R>) -> Result<Value, HandlerError> {
pub fn handle<D>(ctx: &Context<D>) -> Result<Value, HandlerError>
where
D: HasRepo,
D::Repo: Repository + Clone,
{
let input = ctx.input::<Input>()?;
let repo = ctx.repo().clone().aggregate::<Todo>();

Expand All @@ -126,7 +130,7 @@ use sourced_rust::{microsvc, HashMapRepository, Queueable};

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = Arc::new(sourced_rust::register_handlers!(
microsvc::Service::new(HashMapRepository::new().queued()),
microsvc::Service::with_repo(HashMapRepository::new().queued()),
handlers::todo_create,
handlers::todo_complete,
));
Expand Down Expand Up @@ -929,19 +933,19 @@ let _stats = worker.stop()?;

## Microservice Framework (`microsvc`)

The `microsvc` module provides a convention-based command handler framework for building microservices. Register command handlers on a `Service<R>`, then expose them over HTTP, bus transports, or direct dispatch.
The `microsvc` module provides a convention-based command handler framework for building microservices. Register command handlers on a `Service<D>`, then expose them over HTTP, bus transports, or direct dispatch.

### Defining a Service

A `Service<R>` is generic over a repository type. Register commands with closures or handler modules:
A `Service<D>` is generic over a dependency type. Use `Service::with_repo` for aggregate command handlers, `Service::with_read_model_store` for projection handlers, and `Service::with_repo_and_read_model_store` when a handler genuinely needs both:

```rust
use std::sync::Arc;
use sourced_rust::{microsvc, HashMapRepository, AggregateBuilder, Queueable};
use serde_json::json;

let service = Arc::new(
microsvc::Service::new(HashMapRepository::new().queued())
microsvc::Service::with_repo(HashMapRepository::new().queued())
.command("counter.create", |ctx| {
let input = ctx.input::<CreateCounter>()?;
let counter_repo = ctx.repo().clone().aggregate::<Counter>();
Expand Down Expand Up @@ -978,7 +982,7 @@ Add input validation with `command_guarded`. The guard runs before the handler
use serde_json::json;
use sourced_rust::{microsvc, HashMapRepository, Queueable};

let service = microsvc::Service::new(HashMapRepository::new().queued())
let service = microsvc::Service::with_repo(HashMapRepository::new().queued())
.command_guarded(
"admin.reset",
|ctx| ctx.role() == Some("admin"),
Expand All @@ -994,6 +998,7 @@ For larger services, organize handlers into separate files following a conventio
// src/handlers/counter_create.rs
use serde::Deserialize;
use serde_json::{json, Value};
use sourced_rust::microsvc::HasRepo;
use sourced_rust::{microsvc, AggregateBuilder, Repository};

pub const COMMAND: &str = "counter.create";
Expand All @@ -1003,13 +1008,15 @@ struct Input {
id: String,
}

pub fn guard<R>(ctx: &microsvc::Context<R>) -> bool {
pub fn guard<D>(ctx: &microsvc::Context<D>) -> bool {
ctx.has_fields(&["id"])
}

pub fn handle<R: Repository + Clone>(
ctx: &microsvc::Context<R>,
) -> Result<Value, microsvc::HandlerError> {
pub fn handle<D>(ctx: &microsvc::Context<D>) -> Result<Value, microsvc::HandlerError>
where
D: HasRepo,
D::Repo: Repository + Clone,
{
let input = ctx.input::<Input>()?;
let counter_repo = ctx.repo().clone().aggregate::<Counter>();
let mut counter = Counter::default();
Expand All @@ -1023,7 +1030,7 @@ Register them with the `register_handlers!` macro:

```rust
let service = sourced_rust::register_handlers!(
microsvc::Service::new(HashMapRepository::new().queued()),
microsvc::Service::with_repo(HashMapRepository::new().queued()),
handlers::counter_create,
handlers::counter_increment,
);
Expand All @@ -1039,7 +1046,7 @@ use serde_json::json;
use sourced_rust::{microsvc, HashMapRepository, Queueable};

let service = Arc::new(
microsvc::Service::new(HashMapRepository::new().queued())
microsvc::Service::with_repo(HashMapRepository::new().queued())
.command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) })
);

Expand Down Expand Up @@ -1083,7 +1090,7 @@ use serde_json::json;
use sourced_rust::{microsvc, HashMapRepository, Queueable};

let service = Arc::new(
microsvc::Service::new(HashMapRepository::new().queued())
microsvc::Service::with_repo(HashMapRepository::new().queued())
.command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) })
);

Expand Down Expand Up @@ -1132,7 +1139,7 @@ use sourced_rust::{microsvc, bus::{InMemoryQueue, Sender, Event}, HashMapReposit

let queue = InMemoryQueue::new();
let service = Arc::new(
microsvc::Service::new(HashMapRepository::new().queued())
microsvc::Service::with_repo(HashMapRepository::new().queued())
.command("counter.create", |ctx| { /* ... */ Ok(json!({ "id": "c1" })) })
);

Expand Down Expand Up @@ -1163,7 +1170,7 @@ A single service can handle commands from multiple transports simultaneously —

```rust
let service = Arc::new(
microsvc::Service::new(HashMapRepository::new().queued())
microsvc::Service::with_repo(HashMapRepository::new().queued())
.command("counter.create", |ctx| { /* ... */ Ok(json!({})) })
);

Expand Down Expand Up @@ -1194,7 +1201,7 @@ microsvc::serve(service.clone(), "0.0.0.0:3000").await?;

## Read Models

Read models are query-optimized projections derived from aggregates, event records, or published messages. Document read models store a whole view in a document payload column; normalized relational read models use table metadata plus `ReadModelSession` write plans.
Read models are query-optimized projections derived from aggregates, event records, or published messages. Document read models store a whole view in a document payload column; normalized relational read models use table metadata plus `ReadModelWritePlanBuilder` write plans.

### Defining a Read Model

Expand Down Expand Up @@ -1231,22 +1238,22 @@ repo.readmodel(&view).commit(&mut game)?;
// Return `view` to the client — it reflects the committed state
```

For relational read models, stage structured row mutations in a session:
For relational read models, build a structured read-model write plan:

```rust
use sourced_rust::{ReadModelSession, ReadModelSessionCommitExt};
use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt};

let mut read_models = ReadModelSession::new();
read_models.save(&player_view)?;
read_models.save_related(&player_view, "weapons", &weapon_view)?;
let mut read_models = ReadModelWritePlanBuilder::new();
read_models.upsert(&player_view)?;
read_models.upsert_related(&player_view, "weapons", &weapon_view)?;

repo.read_models(read_models).commit(&mut game)?;
```

Distributed projectors can commit the same session shape directly against a read-model adapter and mark messages processed in the same adapter transaction:
Distributed projectors can commit the same write-plan shape directly against a read-model adapter and mark messages processed in the same adapter transaction:

```rust
let mut read_models = ReadModelSession::new();
let mut read_models = ReadModelWritePlanBuilder::new();
read_models.document(&view)?.mark_processed("game-view-projector", event_id);
let outcome = read_models.commit(&read_store)?;
```
Expand All @@ -1255,7 +1262,7 @@ This is a deliberate consistency tradeoff. The read model is in sync with the ag

Bomberman `BoardView` remains a document-row example backed by a whole-view payload, not a normalized relational ORM example.

See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, document rows, session commits, schema bootstrap, distributed idempotency, and non-goals.
See [`docs/read-models.md`](docs/read-models.md) for the full guide, including relational metadata, document rows, workspace commits, schema bootstrap, distributed idempotency, and non-goals.

## Snapshots

Expand Down
2 changes: 1 addition & 1 deletion docs/async-repositories.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ persistence should override it with an explicit durable name through
- `AsyncGetStream` loads one or more event streams by full identity.
- `AsyncTransactionalCommit` commits `AsyncCommitBatch` values with stream
writes, read-model write plans, and snapshots under one backend transaction.
- `AsyncReadModelStore`, `AsyncReadModelSessionStore`, and
- `AsyncReadModelStore`, `AsyncReadModelWritePlanStore`, and
`AsyncRelationalReadModelQueryStore` mirror the current document and
relational read-model surfaces for async adapters.
- `AsyncSnapshotStore` keys rebuildable snapshot cache records by full stream
Expand Down
43 changes: 22 additions & 21 deletions docs/read-models.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ The current implementation keeps these paths explicit:

| Path | API | Use when |
|---|---|---|
| Document rows | `ReadModelStore`, `.readmodel(&view)`, `ReadModelSession::document` | Whole-view JSON documents backed by a document payload column |
| Relational write mapping | `RelationalReadModel`, `ReadModelSession`, `ReadModelWritePlan` | Normalized tables, composite keys, foreign keys, JSONB columns |
| Explicit relationship includes | `store.session().load(...).include(...).one()`, `save_changes` | Internal primary-key reads with declared one-level relationships |
| Document rows | `ReadModelStore`, `.readmodel(&view)`, `ReadModelWritePlanBuilder::document` | Whole-view JSON documents backed by a document payload column |
| Relational write mapping | `RelationalReadModel`, `ReadModelWritePlanBuilder`, `ReadModelWritePlan` | Normalized tables, composite keys, foreign keys, JSONB columns |
| Explicit relationship includes | `store.workspace().load(...).include(...).one()`, `sync` | Internal primary-key reads with declared one-level relationships |
| Schema lifecycle | `ReadModelSchemaRegistry`, `ReadModelSchemaAdapter` | Migration artifact generation, startup verification, explicit dev/test bootstrap |

## Document Read Models
Expand Down Expand Up @@ -100,7 +100,7 @@ pub struct PlayerWeaponView {

The derive emits `RelationalReadModel` metadata, row conversion, primary-key
metadata, JSONB column metadata, indexes, and an adapter-owned version column.
Composite and delegated keys are represented in the schema and in session row
Composite and delegated keys are represented in the schema and in write-plan row
mutations.

Use `#[index]` or `#[index("index_name")]` for a secondary field index. Use
Expand All @@ -113,17 +113,17 @@ For compound indexes, put `#[index(columns = ["field_a", "field_b"])]` or

Relationship includes are primary-key anchored and opt-in. Register the
relational schemas with an adapter, load one root row, ask for each relationship
explicitly, mutate the hydrated struct, then save the tracked changes:
explicitly, mutate the hydrated struct, then sync the tracked workspace:

```rust
use sourced_rust::{InMemoryReadModelStore, ReadModelUnitOfWorkExt, RowKey, RowValue};
use sourced_rust::{InMemoryReadModelStore, ReadModelWorkspaceExt, RowKey, RowValue};

let store = InMemoryReadModelStore::new();
store.register_schema::<PlayerView>()?;
store.register_schema::<PlayerWeaponView>()?;

let mut read_models = store.session();
let mut player = read_models
let mut workspace = store.workspace();
let mut player = workspace
.load::<PlayerView>(RowKey::new([("player_id", RowValue::String("player-1".into()))]))
.include("weapons")
.one()?
Expand All @@ -137,14 +137,14 @@ player.weapons.push(PlayerWeaponView {
acquired_at: "2026-05-23".into(),
});

read_models.save_changes(player)?;
read_models.commit()?;
workspace.sync(player)?;
workspace.commit()?;
```

`has_many` relationships hydrate `Vec<T>` fields. `belongs_to` relationships
hydrate `Option<T>` fields.

`save_changes` makes storage match the struct: added items are inserted, changed
`sync` makes storage match the struct: added items are inserted, changed
items updated, and **removed items deleted**. For an included `has_many`
collection, dropping a child from the `Vec<T>` deletes that child row (the loaded
collection is the complete owned set, so the struct is the source of truth).
Expand All @@ -163,15 +163,16 @@ for normalized Postgres read models.

## Command-Side Atomic Writes

Use `ReadModelSession` when a command or projector stages multiple document or
normalized row mutations. The current repository APIs are synchronous:
Use `ReadModelWritePlanBuilder` when a command or projector stages multiple
document or normalized row mutations. The current repository APIs are
synchronous:

```rust
use sourced_rust::{ReadModelSession, ReadModelSessionCommitExt};
use sourced_rust::{ReadModelWritePlanBuilder, ReadModelWritePlanCommitExt};

let mut read_models = ReadModelSession::new();
read_models.save(&player)?;
read_models.save_related(&player, "weapons", &weapon)?;
let mut read_models = ReadModelWritePlanBuilder::new();
read_models.upsert(&player)?;
read_models.upsert_related(&player, "weapons", &weapon)?;

repo.read_models(read_models).commit(&mut aggregate)?;
```
Expand Down Expand Up @@ -207,18 +208,18 @@ repo.readmodel(&board_view).commit(&mut game)?;

## Standalone Distributed Projectors

A read-model service can commit a session without owning an aggregate
A read-model service can commit a write plan without owning an aggregate
repository:

```rust
use sourced_rust::{ReadModelError, ReadModelSession, ReadModelSessionStore};
use sourced_rust::{ReadModelError, ReadModelWritePlanBuilder, ReadModelWritePlanStore};

fn project_message(
store: &impl ReadModelSessionStore,
store: &impl ReadModelWritePlanStore,
event_id: &str,
view: &GameView,
) -> Result<(), ReadModelError> {
let mut read_models = ReadModelSession::new();
let mut read_models = ReadModelWritePlanBuilder::new();
read_models
.document(view)?
.mark_processed("game-view-projector", event_id);
Expand Down
Loading
Loading