Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ See `CLAUDE.md` for deep architecture documentation, plugin contracts, and key d
`NullLoggerFactory` produces zero allocations and zero output.
- See [docs/configuration.md#what-gets-logged](docs/configuration.md#what-gets-logged) for the full per-class log
inventory (level, trigger, structured properties).
- **Connection-recovery logs** (added by the `add-connection-recovery` change) live in the runtime service classes that
own the connection (`NotificationBasedPublisher`, `OutboxPublisherService`, `KafkaPublisher`, `KafkaConsumer`,
`RabbitMqPublisher`) and use those classes' existing non-nullable `ILogger<T>`. **Exception:** `RabbitMqConsumer` has
no logger field (pre-existing exception to the logging-placement rule — message-receive errors silently NACK + requeue
with no useful context to log). Its connection disconnect/recovery events are therefore observable **only** via the
`raytree.connection.*` metric instruments; no log output. Plugins that emit connection-recovery metrics do so through
the public `RayTreeMeter.RecordConnectionDisconnect` / `RecordConnectionRecovery` / `RegisterConnectionStateGauge`
facade methods — no `InternalsVisibleTo` is granted to plugin assemblies for this purpose.

### Testing Conventions

Expand Down
89 changes: 89 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,95 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## [0.0.18-pre-release]

### Added

#### Connection-loss recovery and observability across every connection-bearing plugin

Two real bugs fixed, three plugins instrumented:

- **`NotificationBasedPublisher`** previously detected LISTEN connection loss (`_listenerHealthy = false`) but never reopened — the fast path silently degraded to fallback polling until process restart. `ListenLoopAsync` now reconnects with exponential backoff bounded by the new `NotificationBasedPublisherOptions.ConnectionRecovery`. The fallback polling loop continues throughout, providing best-effort delivery during reconnect. Single-method design: one `try`, two `catch` arms, connection lifetime bounded by an `await using var conn` local — no class field needed.
- **`KafkaPublisher`** fatal errors (`Error.IsFatal == true`) previously left the native producer dead until process restart. The error callback now flips an atomic `Interlocked.CompareExchange` fault timestamp (no locking, no disposal on the librdkafka callback thread); the next `PublishAsync` rebuilds the producer through the existing lazy `GetProducerAsync` path under `_buildLock`, re-running the `WaitForTopic` probe when enabled. New `KafkaPublisherOptions.ConnectionRecovery`. Constructor adds optional `RayTreeMeter? meter = null` parameter.
- **`KafkaConsumer`** fatal exceptions on the dedicated poll thread previously poisoned the buffer channel. The poll thread now drains pending deferred-ack actions, disposes the dying consumer, and runs an inline exponential-backoff `RebuildConsumer` on the same thread (re-runs `WaitForTopic` probe + `Subscribe`). New `KafkaConsumerOptions.ConnectionRecovery`. Constructor adds optional `RayTreeMeter? meter = null` parameter.
- **`RabbitMqPublisher` / `RabbitMqConsumer`** delegate recovery entirely to `RabbitMQ.Client.AutomaticRecoveryEnabled` (library default — unchanged). The plugin subscribes to the SDK's `ConnectionShutdownAsync` / `RecoverySucceededAsync` / `ConnectionRecoveryErrorAsync` events to emit metrics + logs only. Both publishers/consumers add optional `RayTreeMeter? meter = null` constructor parameter; builder extensions accept it through.
- **`PostgreSqlOutbox`** and **`OutboxPublisherService`** gain a passive observability path: when a batch fails with an exception classified by `IOutbox.IsConnectionFault(ex)`, the polling loop emits `raytree.connection.disconnects{component="postgres.outbox"}` once per transition and demotes the per-batch `Error` log to `Warning`. First subsequent success emits `recoveries{outcome="succeeded"}`. **No retry code is added** — the existing polling cadence is the retry.

Four new metric instruments on the `"RayTree"` meter:

- `raytree.connection.disconnects` — counter, tagged `component` + `endpoint`
- `raytree.connection.recoveries` — counter, tagged `component` + `endpoint` + `outcome ∈ {"succeeded", "exhausted"}`
- `raytree.connection.recovery.duration` — histogram, unit `s`, same tags
- `raytree.connection.state` — observable gauge: `1` (connected) or `0` (in fault cycle)

`component` is one of `"rabbitmq.publisher"`, `"rabbitmq.consumer"`, `"kafka.publisher"`, `"kafka.consumer"`, `"postgres.notification"`, `"postgres.outbox"`. Suggested histogram bucket boundaries for `raytree.connection.recovery.duration`: `[0.1, 0.5, 1, 2, 5, 10, 30, 60, 120]` seconds. See [docs/opentelemetry-metrics.md](docs/opentelemetry-metrics.md) for the full inventory + alert queries.

#### `IOutbox` interface — three default-implemented members for connection-fault observability

`IOutbox` now exposes (with default implementations returning the no-op):

- `bool IsConnectionFault(Exception ex)` — defaults to `false`
- `string? ConnectionComponent { get; }` — defaults to `null`
- `string? ConnectionEndpoint { get; }` — defaults to `null`

`PostgreSqlOutbox<TEntity>` overrides all three — `IsConnectionFault` delegates to the shared internal `PostgresFault.IsConnectionFault` helper which uses `Npgsql.PostgresErrorCodes` constants (admin/crash shutdown, `08xxx` connection_exception family, transient socket/IO inner). `ConnectionComponent` returns `"postgres.outbox"`. `ConnectionEndpoint` is parsed once from the connection string. `InMemoryOutbox` and any third-party `IOutbox` inherit the no-op defaults — no breaking change.

#### `ConnectionRecoveryOptions` (`RayTree.Core.Resilience`)

New public class tuning the per-plugin exponential-backoff schedule:

- `Enabled` (bool, default `true`) — master switch; when `false` plugins surface the disconnect to callers instead of reconnecting
- `InitialDelay` (TimeSpan, default 1 s), `MaxDelay` (TimeSpan, default 30 s), `Factor` (double, default 2.0), `JitterFraction` (double, default 0.2), `MaxAttempts` (int?, default `null` — unlimited)

Per-field invariants throw `ArgumentOutOfRangeException` from object initializers. Cross-field invariants (`MaxDelay >= InitialDelay`) are enforced by `Validate()`, which plugins call on first use. `RabbitMqPublisherOptions` / `RabbitMqConsumerOptions` do **not** expose this property — RabbitMQ.Client owns recovery timing via its own `NetworkRecoveryInterval`.

#### Hosting binds connection-recovery defaults from configuration

`AddChangeTracking` now binds `ChangeTracking:Publisher:ConnectionRecovery` and `ChangeTracking:Subscriber:ConnectionRecovery` to NAMED `ConnectionRecoveryOptions`. Keys exposed as public constants `ChangeTrackingRecoveryKeys.Publisher` / `Subscriber`. Plugins that want to honor the bound defaults resolve via `IOptionsMonitor<ConnectionRecoveryOptions>.Get(key)` and assign explicitly.

#### Public facade methods on `RayTreeMeter`

Plugins consume Core via its public API only — **no new `InternalsVisibleTo` entries are added**. The four new instruments are emitted through three public facade methods (matching the existing `RecordPublishSuccess` pattern):

- `RecordConnectionDisconnect(string component, string endpoint)`
- `RecordConnectionRecovery(string component, string endpoint, string outcome, double durationSeconds)`
- `RegisterConnectionStateGauge(string component, string endpoint, Func<int> getState) → IDisposable`

### Changed

#### `OutboxPublisherService` — generic implementation, zero-reflection hot path

`OutboxPublisherService` public surface is unchanged (`new OutboxPublisherService(publisher, entityType, options, loggerFactory, meter)` still works — no test code needs updating). Internally the class is now a thin wrapper over a `TypedImpl<TEntity>` instantiated once at construction via `Activator.CreateInstance(typeof(TypedImpl<>).MakeGenericType(entityType), ...)`. The per-batch publish path is now zero-reflection: `IOutbox.GetUnpublishedAsync<TEntity>` and `IChangeSerializer.SerializeAsync(EntityChange<TEntity>, ...)` are direct generic calls instead of `MethodInfo.MakeGenericMethod(...).Invoke(...)`.

Other cleanups along the way:

- `_outboxUnhealthy` (bool) + `_firstUnhealthyAt` (DateTime) collapsed into single `DateTime? _unhealthySince` — same idiom used in `NotificationBasedPublisher.ListenLoopAsync`.
- `MaybeRunCleanupAsync` two near-identical try/catch blocks consolidated through a `TryCleanupAsync` helper.
- New `EffectivePollingInterval` property captures the `UseNotificationChannel ? FallbackPollingInterval : PollingInterval` choice as a named expression.

Net: 383 → 320 lines, no behaviour changes, all 178 Core unit tests pass without modification.

### Changed — BINARY-BREAKING

#### `KafkaPublisher` / `KafkaConsumer` / `RabbitMqPublisher` / `RabbitMqConsumer` constructors add optional `RayTreeMeter? meter` parameter

- `new KafkaPublisher(options, loggerFactory)` → `new KafkaPublisher(options, loggerFactory, meter)`
- `new KafkaConsumer(options, loggerFactory)` → `new KafkaConsumer(options, loggerFactory, meter)`
- `new RabbitMqPublisher(options, loggerFactory)` → `new RabbitMqPublisher(options, loggerFactory, meter)`
- `new RabbitMqConsumer(options)` → `new RabbitMqConsumer(options, meter)`

All new parameters are optional with `null` default — **source-compatible** but **binary-breaking** (adding an optional parameter changes the constructor signature). Downstream applications consuming these plugin packages as published NuGets must **recompile** against this version. Builder extensions (`UseKafka`, `UseRabbitMq` on both publisher and subscriber sides) also gain an optional `RayTreeMeter? meter = null` parameter that forwards to the plugin constructor.

### Architectural decisions captured

- **No shared `ConnectionRetry` helper in Core.** Each plugin that owns retry code (Postgres LISTEN, Kafka consumer) implements its own ~20-line inline exponential-backoff loop. Extracting a shared helper would require `InternalsVisibleTo` exposure to plugin assemblies (architectural smell) or a public-API commitment to a specific retry shape. Two short copies is cheaper. Both copies carry comments cross-referencing the other.
- **No new `InternalsVisibleTo` entries.** Plugin assemblies consume Core via its public API only. `RayTreeMeter` exposes public facade methods for connection metric emission (matching the existing pattern). `IOutbox` exposes default-implemented members for outbox-side classification.
- **`postgres.outbox` is observation-only.** The polling cadence is the natural retry — no rebuild loop, no `ConnectionRecoveryOptions` consumed on the outbox path. Metrics + log demotion only.
- **RabbitMQ recovery is owned by the SDK.** RayTree subscribes to events for metric emission; `AutomaticRecoveryEnabled` is left at the library default. RabbitMQ options do not expose `ConnectionRecovery` — `ConnectionFactory.NetworkRecoveryInterval` controls timing.
- **Write paths throw to callers.** `PostgreSqlOutbox.WriteAsync` and `PostgreSqlRepository` writes continue to surface exceptions on connection fault. Outbox writes typically run inside the caller's EF Core transaction; library auto-retry would break atomicity. The caller's unit-of-work owns write-side retry.

---

## [0.0.17-pre-release]

### Added
Expand Down
20 changes: 20 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ All durations are emitted in seconds (`s`) per OTel semantic conventions; bytes

`EntityChangeInterceptor` hooks into `SaveChangesAsync` to automatically call `TrackInsertAsync`/`TrackUpdateAsync`/`TrackDeleteAsync` based on EF change tracker state.

### Connection recovery (`src/RayTree.Core/Resilience` + per-plugin)

Every connection-bearing plugin emits four shared metric instruments on the `"RayTree"` meter so broker / database connection state is uniformly observable: `raytree.connection.disconnects` (counter), `raytree.connection.recoveries` (counter with `outcome=succeeded|exhausted`), `raytree.connection.recovery.duration` (histogram, seconds), `raytree.connection.state` (observable gauge: `1` connected / `0` in fault cycle). All four are tagged with `component` and `endpoint`. Helper methods `RayTreeMeter.RecordConnectionDisconnect` / `RecordConnectionRecovery` / `RegisterConnectionStateGauge` are public — **no `InternalsVisibleTo` is granted to plugin assemblies for this purpose**; plugins consume Core via the public API only.

Recovery shape per plugin:

- **`postgres.notification`** (`NotificationBasedPublisher.ListenLoopAsync`) — single long-lived `NpgsqlConnection` as a local `await using var` (no class field). On a fault classified by the shared `PostgresFault.IsConnectionFault` static helper (uses `Npgsql.PostgresErrorCodes` constants — admin/crash shutdown, `08xxx` connection_exception family, transient `NpgsqlException`, `SocketException`/`IOException` inner, `ObjectDisposedException`), runs an inline exponential-backoff loop bounded by `NotificationBasedPublisherOptions.ConnectionRecovery` that disposes the connection, opens a fresh one, reissues `LISTEN`. Single method, one `try`, two `catch` arms. Fallback polling continues throughout.
- **`postgres.outbox`** — **observation only, no rebuild.** Two seams emit the metric:
- *OutboxPublisherService.ProcessBatchAsync*: `HandleBatchError` consults `IOutbox.IsConnectionFault` + `ConnectionComponent` in the existing batch-error catch. On a classified fault: emit `disconnects`, demote per-batch `Error` log to `Warning`, set `_outboxUnhealthy = true` + `_firstUnhealthyAt`. On the first subsequent successful batch: emit `recoveries{outcome="succeeded"}` + duration via `EmitOutboxRecovered`.
- *NotificationBasedPublisher.FallbackPollingLoopAsync*: `ProcessSingleOutboxAsync` does the same classification per outbox, keyed on entity type via a `ConcurrentDictionary<Type, FallbackOutboxState>` so each outbox tracks its own transitions independently.

`IOutbox` exposes three default-implemented members (`IsConnectionFault`, `ConnectionComponent`, `ConnectionEndpoint`) that `PostgreSqlOutbox<TEntity>` overrides via `PostgresFault`. The polling cadence is the natural retry — no rebuild loop. Non-fault exceptions preserve the original `Error` path. `InMemoryOutbox` and third-party `IOutbox` implementations inherit no-op defaults — no breaking change.
- **`kafka.publisher`** — `SetErrorHandler` callback flips an atomic `Interlocked.CompareExchange` flag (`_faultTicks` UTC-ticks) and emits the disconnect metric. The callback does **no locking** and **no disposal** (avoids the documented Confluent.Kafka footgun of disposing inside the native callback). The next `PublishAsync` enters `GetProducerAsync` under a single `_buildLock`, disposes the dead producer on a normal call thread, re-runs the `WaitForTopic` probe (when enabled), builds a fresh producer, and emits the recovery metric. No inner backoff loop — the outbox-publisher retry loop is the outer cadence. State: 2 fields (`_producer`, `_faultTicks`) + 1 lock.
- **`kafka.consumer`** — fatal `KafkaException` on the dedicated poll thread → drops pending deferred-ack actions (broker redelivers via at-least-once contract on the new consumer's join) → disposes the dying consumer → runs an inline exponential-backoff `RebuildConsumer` on the same poll thread (re-runs `WaitForTopic` probe + `Subscribe`). Backoff math is a small private static helper (`Task.Delay(...).GetAwaiter().GetResult()` is safe here — the poll thread is dedicated with no SyncContext). On exhaustion completes the buffer channel with the fatal exception.
- **`rabbitmq.publisher`** / **`rabbitmq.consumer`** — **observe, don't own**. `RabbitMQ.Client.AutomaticRecoveryEnabled` (library default — RayTree does **not** disable it) performs the actual rebuild. Publisher subscribes to `ConnectionShutdownAsync` (Warning + disconnect metric when `Initiator != Application`), `RecoverySucceededAsync` (Information + recovery metric + duration), `ConnectionRecoveryErrorAsync` (Information only — per-internal-attempt, no metric). Consumer subscribes to shutdown + recovery events for metrics only (no logger field — pre-existing exception to the logging-placement rule stands). `RabbitMqPublisherOptions` / `RabbitMqConsumerOptions` do **not** expose `ConnectionRecovery` — the library's `NetworkRecoveryInterval` controls timing, exposed via `ConnectionFactory` not through RayTree.

The retry math (~20 LoC: exponential backoff with `±JitterFraction` jitter, capped at `MaxDelay`, optionally bounded by `MaxAttempts`) is duplicated across the two plugins that need it (Postgres LISTEN, Kafka consumer). This is intentional — extracting a shared helper would require either `InternalsVisibleTo` exposure to plugins (architectural smell — plugins must consume Core via public API only) or a public-API commitment to a specific retry shape. Two short copies is cheaper than either of those.

`ConnectionRecoveryOptions` is bound from `appsettings.json` via `AddChangeTracking` into named options (`ChangeTrackingRecoveryKeys.Publisher` / `.Subscriber` constants). Plugins do **not** auto-swap the bound default into their own `ConnectionRecovery` property (would require `IServiceProvider` plumbing through plugin builder extension signatures — out of scope); callers who want to honor the bound defaults resolve them via `IOptionsMonitor<ConnectionRecoveryOptions>.Get(key)` and assign explicitly. In practice most callers tune per plugin anyway.

## Key Design Decisions

- **Unified builder**: `IChangeTrackingBuilder.ForEntity<TEntity>()` takes `Action<IEntityBuilder<TEntity>>` where `IEntityBuilder<TEntity>` covers both publisher and subscriber configuration. The `where TEntity : class` constraint is required by the subscriber handler registration. Value types cannot be entity types. `UseSerializer`/`UseCompressor` at the global level forward the factory's output to the subscriber's global instance by calling `factory(typeof(object))` — this works correctly when the factory ignores the type parameter (the common case).
Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>nullable</WarningsAsErrors>

<VersionPrefix>0.0.17</VersionPrefix>
<VersionPrefix>0.0.18</VersionPrefix>
<VersionSuffix>pre-release</VersionSuffix>

<Authors>bitc0der</Authors>
Expand Down
Loading
Loading