Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,69 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

---

## [0.0.12-pre-release]

### Added

#### `EntityChangeTracker.Create()` — canonical entry point (`RayTree.Core`)

New public static factory method that returns `IChangeTrackingBuilder`. This is the new
canonical way to start building a tracker:

```csharp
// Without logging (defaults to NullLoggerFactory)
var tracker = EntityChangeTracker.Create()
.ForEntity<Order>(e => e /* ... */)
.Build();

// With logging
var tracker = EntityChangeTracker.Create(loggerFactory)
.ForEntity<Order>(e => e /* ... */)
.Build();
```

The optional `ILoggerFactory?` parameter defaults to `NullLoggerFactory.Instance` when `null`.

### Changed (breaking)

#### `ChangeTrackingBuilder` constructor is now `internal`; class is `sealed` (`RayTree.Core`)

`ChangeTrackingBuilder` can no longer be instantiated directly or subclassed. Use
`EntityChangeTracker.Create()` instead:

```csharp
// Before
var tracker = new ChangeTrackingBuilder()
.ForEntity<Order>(e => e /* ... */)
.Build();

var tracker = new ChangeTrackingBuilder(loggerFactory)
.ForEntity<Order>(e => e /* ... */)
.Build();

// After
var tracker = EntityChangeTracker.Create()
.ForEntity<Order>(e => e /* ... */)
.Build();

var tracker = EntityChangeTracker.Create(loggerFactory)
.ForEntity<Order>(e => e /* ... */)
.Build();
```

#### `EntityChangeTracker` constructor is now `internal` (`RayTree.Core`)

`EntityChangeTracker` can no longer be constructed directly. Use `EntityChangeTracker.Create()`
(or `AddChangeTracking` in the Generic Host) to obtain a fully configured instance.

### Fixed

#### Duplicate `<see cref>` XML doc tag in `ChangeSubscriberBuilder.Build()` (`RayTree.Core`)

Removed a duplicate `<see cref>` tag from the `Build()` parameter remarks XML documentation.

---

## [0.0.11-pre-release]

### Added
Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>nullable</WarningsAsErrors>

<VersionPrefix>0.0.11</VersionPrefix>
<VersionPrefix>0.0.12</VersionPrefix>
<VersionSuffix>pre-release</VersionSuffix>

<Authors>bitc0der</Authors>
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ using RayTree.Plugins.InMemory;
using RayTree.Plugins.RabbitMQ;
using RayTree.Plugins.Serializers.Json;

var tracker = await new ChangeTrackingBuilder()
var tracker = await EntityChangeTracker.Create()
.UseSerializer<JsonSerializerPlugin>(_ => new JsonSerializerPlugin())
.UseCompressor<NoOpCompressorPlugin>(_ => new NoOpCompressorPlugin())
.ForEntity<Order>(e => e
Expand Down Expand Up @@ -114,7 +114,7 @@ Give each named handler its own broker subscription, retry budget, and deduplica
// For testing/local dev — InMemoryBroadcastQueue fans out to every subscriber
var broadcast = new InMemoryBroadcastQueue();

var tracker = new ChangeTrackingBuilder()
var tracker = EntityChangeTracker.Create()
.ForEntity<Order>(e => e
.UseOutbox(new InMemoryOutbox())
.UsePublisher(broadcast)
Expand Down Expand Up @@ -224,7 +224,7 @@ using RayTree.Plugins.Deduplication.Redis;

var multiplexer = await ConnectionMultiplexer.ConnectAsync("localhost:6379");

var tracker = new ChangeTrackingBuilder()
var tracker = EntityChangeTracker.Create()
.UseRedisDeduplication(multiplexer) // default options
// or with custom options:
.UseRedisDeduplication(multiplexer, opt =>
Expand All @@ -237,7 +237,7 @@ var tracker = new ChangeTrackingBuilder()
.Build();

// Custom store
var tracker = new ChangeTrackingBuilder()
var tracker = EntityChangeTracker.Create()
.UseDeduplicationStore(new MyCustomStore())
.ForEntity<Order>(e => e /* ... */)
.Build();
Expand Down
10 changes: 5 additions & 5 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ A modular .NET 8.0 entity change tracking system with outbox pattern support, qu
- **Modular Plugins** - Each serializer and compressor in its own package
- **In-Memory Testing** - Full in-memory implementation for development and testing
- **Auto-Initialization** - Automatic database schema initialization on `Build()` / `BuildAsync()`
- **Structured Logging** - `Microsoft.Extensions.Logging` throughout; pass `ILoggerFactory` to `ChangeTrackingBuilder` or let `AddChangeTracking` wire it from DI automatically
- **Structured Logging** - `Microsoft.Extensions.Logging` throughout; pass `ILoggerFactory` to `EntityChangeTracker.Create()` or let `AddChangeTracking` wire it from DI automatically
- **OpenTelemetry Metrics** - `System.Diagnostics.Metrics` instruments on a `"RayTree"` meter for outbox writes, publish/subscribe latency, payload size, queue depth, and retry shape. Zero OTel SDK dependency unless the optional `RayTree.OpenTelemetry` package is referenced. See [OpenTelemetry Metrics Guide](opentelemetry-metrics.md).

## Quick Start

```csharp
// Optional: pass ILoggerFactory for structured log output (defaults to NullLoggerFactory)
var builder = new ChangeTrackingBuilder(loggerFactory);
var builder = EntityChangeTracker.Create(loggerFactory);

builder.ForEntity<Product>(e => e
.UsePostgreSqlOutbox(new PostgreSqlOutboxOptions
Expand All @@ -44,7 +44,7 @@ await tracker.TrackDeleteAsync(new Product { Id = 1, Name = "Widget Pro" });
Set a serializer or compressor for all entity types at once using builder extension methods. Per-entity calls inside `ForEntity` override the global default:

```csharp
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();
builder.UseJsonSerializer();
builder.UseGzipCompressor();

Expand Down Expand Up @@ -252,7 +252,7 @@ Generated outbox columns (alongside the fixed metadata columns):
## In-Memory Mode (Testing)

```csharp
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();

builder.ForEntity<Product>(e => e
.UseOutbox(new InMemoryOutbox())
Expand Down Expand Up @@ -339,7 +339,7 @@ Cleanup errors are isolated: a transient database failure logs an error but does
Publisher options are global (not per entity) and are set on the top-level builder via `UsePublisherOptions`. When using the Generic Host they can alternatively be bound from `appsettings.json` via `AddChangeTracking` (see below).

```csharp
var builder = new ChangeTrackingBuilder(loggerFactory);
var builder = EntityChangeTracker.Create(loggerFactory);

// Rotation options are set at the builder level, not inside ForEntity.
builder.UsePublisherOptions(o =>
Expand Down
22 changes: 11 additions & 11 deletions docs/configuration.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Configuration Guide

The primary configuration API is `ChangeTrackingBuilder`. It registers per-entity plugins, sets global defaults, and produces an `EntityChangeTracker` via `Build()` / `BuildAsync()`.
The primary configuration API is accessed via `EntityChangeTracker.Create()`, which returns `IChangeTrackingBuilder`. It registers per-entity plugins, sets global defaults, and produces an `EntityChangeTracker` via `Build()` / `BuildAsync()`.

## ChangeTrackingBuilder
## EntityChangeTracker.Create()

### Per-Entity Configuration

`ForEntity<T>` accepts a callback that scopes all per-entity configuration. The parent builder is always returned, so multiple entity registrations chain cleanly:

```csharp
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();

builder
.ForEntity<Product>(e => e
Expand Down Expand Up @@ -39,7 +39,7 @@ var tracker = builder.Build();
Extension methods on `IChangeTrackingBuilder` set a default factory applied to every entity type that does not have an explicit override:

```csharp
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();
builder.UseJsonSerializer(); // RayTree.Plugins.Serializers.Json
builder.UseGzipCompressor(); // RayTree.Plugins.Compressors.Gzip
// builder.UseProtobufSerializer()
Expand Down Expand Up @@ -130,7 +130,7 @@ See [trigger-setup.md](trigger-setup.md) for full details and hosting in ASP.NET

## ChangeTrackingConfiguration

`ChangeTrackingConfiguration` is a thin wrapper around `ChangeTrackingBuilder` that adds `WithPollingInterval()` and `WithBatchSize()` convenience methods. It does **not** expose per-entity fluent configuration — use `ChangeTrackingBuilder` directly for most scenarios.
`ChangeTrackingConfiguration` is a thin wrapper around `IChangeTrackingBuilder` that adds `WithPollingInterval()` and `WithBatchSize()` convenience methods. It does **not** expose per-entity fluent configuration — use `EntityChangeTracker.Create()` directly for most scenarios.

```csharp
var config = new ChangeTrackingConfiguration()
Expand Down Expand Up @@ -329,18 +329,18 @@ RayTree uses `Microsoft.Extensions.Logging` throughout. All runtime service clas

### Standalone (no DI)

Pass an `ILoggerFactory` to `ChangeTrackingBuilder`:
Pass an `ILoggerFactory` to `EntityChangeTracker.Create()`:

```csharp
// No logging (tests, scripts)
var tracker = new ChangeTrackingBuilder().Build();
var tracker = EntityChangeTracker.Create().Build();

// With logging
using var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
var tracker = new ChangeTrackingBuilder(loggerFactory).Build();
var tracker = EntityChangeTracker.Create(loggerFactory).Build();
```

`ChangeTrackingBuilder` normalises `null` to `NullLoggerFactory.Instance`, so calling `new ChangeTrackingBuilder()` without an argument produces a working tracker with no log output.
`EntityChangeTracker.Create()` normalises `null` to `NullLoggerFactory.Instance`, so calling `EntityChangeTracker.Create()` without an argument produces a working tracker with no log output.

### ASP.NET Core (DI)

Expand Down Expand Up @@ -392,7 +392,7 @@ RayTree emits `System.Diagnostics.Metrics` instruments on a `Meter` named `"RayT

### Default (built-in meter)

`ChangeTrackingBuilder` creates a `RayTreeMeter` automatically and `EntityChangeTracker` disposes it. To collect the metrics, attach a `MeterListener` to the meter named `"RayTree"`, or use the OTel SDK via the `RayTree.OpenTelemetry` package:
`EntityChangeTracker.Create()` creates a `RayTreeMeter` automatically and `EntityChangeTracker` disposes it. To collect the metrics, attach a `MeterListener` to the meter named `"RayTree"`, or use the OTel SDK via the `RayTree.OpenTelemetry` package:

```csharp
services.AddOpenTelemetry()
Expand All @@ -407,7 +407,7 @@ Pass a `RayTreeMeter` instance to share it across trackers or to control its lif

```csharp
var meter = new RayTreeMeter();
var tracker = new ChangeTrackingBuilder(loggerFactory)
var tracker = EntityChangeTracker.Create(loggerFactory)
.UseMeter(meter)
.ForEntity<Order>(/* ... */)
.Build();
Expand Down
19 changes: 9 additions & 10 deletions docs/plugin-development.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface IRepository<TEntity> : IRepository where TEntity : class
Task InsertAsync(TEntity entity, CancellationToken cancellationToken = default);
Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default);
Task DeleteAsync(TEntity entity, CancellationToken cancellationToken = default);
Task<TEntity?> GetByIdAsync(object id, CancellationToken cancellationToken = default);
Task<TEntity?> GetByIdAsync(object[] keyValues, CancellationToken cancellationToken = default);
}

public interface IRepository
Expand Down Expand Up @@ -186,17 +186,16 @@ public static class MyOutboxExtensions

## Testing Plugins

Register plugins directly on `ChangePublisher` and construct `EntityChangeTracker` from it:
Use the builder to wire plugins and construct `EntityChangeTracker`:

```csharp
var publisher = new ChangePublisher(NullLoggerFactory.Instance);
publisher.RegisterOutbox(typeof(MyEntity), new InMemoryOutbox());
publisher.RegisterPublisher(typeof(MyEntity), new InMemoryQueue());
publisher.RegisterSerializer(typeof(MyEntity), new MyCustomSerializer());
publisher.RegisterCompressor(typeof(MyEntity), new MyCustomCompressor());

var tracker = new EntityChangeTracker(publisher);
await tracker.InitializeAsync();
using var tracker = EntityChangeTracker.Create()
.ForEntity<MyEntity>(e => e
.UseOutbox(new InMemoryOutbox())
.UsePublisher(new InMemoryQueue())
.UseSerializer(new MyCustomSerializer())
.UseCompressor(new MyCustomCompressor()))
.Build();
```

Verify serializer round-trips:
Expand Down
23 changes: 1 addition & 22 deletions src/RayTree.Core/Handling/ChangeSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,27 +219,6 @@ await Parallel.ForEachAsync(
async (envelope, token) => await DispatchAndAcknowledgeAsync(consumer, envelope, token));
}

public async Task ConsumeFromQueueAsync<TQueue>(
TQueue queue,
Func<TQueue, CancellationToken, IAsyncEnumerable<MessageEnvelope>> reader,
CancellationToken cancellationToken = default)
{
// Note: this overload does not have an IQueueConsumer to acknowledge against.
// Callers using a custom reader are responsible for any broker acknowledgement
// outside of ChangeSubscriber. This stays at-most-once by design — the typed
// overload ConsumeFromConsumerAsync(IQueueConsumer) is the path that participates
// in the optional Ack/Nack lifecycle.
var parallelOptions = new ParallelOptions
{
MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism,
CancellationToken = cancellationToken
};
await Parallel.ForEachAsync(
reader(queue, cancellationToken),
parallelOptions,
async (envelope, token) => await ProcessMessageAsync(envelope, token));
}

/// <summary>
/// Shared-mode dispatch wrapper: invokes <see cref="ProcessMessageAsync"/> and then
/// calls <see cref="IQueueConsumer.AcknowledgeAsync"/> on success, or
Expand Down Expand Up @@ -498,7 +477,7 @@ private async Task MaybeDedupCleanupAsync(CancellationToken cancellationToken)
if (DateTime.UtcNow - _lastDedupCleanup < _options.DeduplicationCleanupInterval) return;

// Only one concurrent caller runs cleanup; others skip rather than queue.
if (!_cleanupGate.Wait(0)) return;
if (!await _cleanupGate.WaitAsync(millisecondsTimeout: 0, cancellationToken)) return;
try
{
// Double-check after acquiring so a concurrent caller that already ran doesn't repeat it.
Expand Down
2 changes: 1 addition & 1 deletion src/RayTree.Core/Handling/ChangeSubscriberBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public IChangeSubscriberBuilder ForEntity<TEntity>(Action<IEntitySubscriberBuild
/// </summary>
/// <param name="dedupStoreOverride">
/// When provided (e.g., resolved from DI), takes precedence over any store set on this
/// builder via <see cref="UseDeduplicationStore"/> or <see cref="UseRedisDeduplication"/>.
/// builder via <see cref="UseDeduplicationStore"/>.
/// </param>
/// <param name="optionsOverride">
/// When provided (e.g., bound from <c>appsettings.json</c> via <c>IOptions</c>), takes
Expand Down
4 changes: 2 additions & 2 deletions src/RayTree.Core/Tracking/ChangeTrackingBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@

namespace RayTree.Core.Tracking;

public class ChangeTrackingBuilder : IChangeTrackingBuilder
public sealed class ChangeTrackingBuilder : IChangeTrackingBuilder
{
private readonly ChangePublisherBuilder _publisherBuilder = new();
private readonly ChangeSubscriberBuilder _subscriberBuilder = new();
private readonly ILoggerFactory _loggerFactory;
private RayTreeMeter? _meter;

public ChangeTrackingBuilder(ILoggerFactory? loggerFactory = null)
internal ChangeTrackingBuilder(ILoggerFactory? loggerFactory = null)
{
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
}
Expand Down
6 changes: 5 additions & 1 deletion src/RayTree.Core/Tracking/EntityChangeTracker.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Microsoft.Extensions.Logging;
using RayTree.Core.Distribution;
using RayTree.Core.Handling;
using RayTree.Core.Models;
Expand All @@ -20,13 +21,16 @@ public sealed class EntityChangeTracker : IEntityChangeTracker
/// <summary>The meter used by this tracker's publisher and subscriber.</summary>
public RayTreeMeter Meter => _meter;

public static IChangeTrackingBuilder Create(ILoggerFactory? loggerFactory = null)
=> new ChangeTrackingBuilder(loggerFactory);

/// <summary>
/// Constructs a tracker. When <paramref name="ownsMeter"/> is <c>true</c>,
/// <see cref="Dispose"/> also disposes <paramref name="meter"/>. Builders that create
/// the meter on the caller's behalf should pass <c>ownsMeter: true</c>; callers that
/// inject their own meter via <c>UseMeter</c> should pass <c>false</c>.
/// </summary>
public EntityChangeTracker(
internal EntityChangeTracker(
ChangePublisher publisher,
ChangeSubscriber? subscriber = null,
RayTreeMeter? meter = null,
Expand Down
4 changes: 2 additions & 2 deletions src/RayTree.Hosting/EntityChangeTrackerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ public class EntityChangeTrackerFactory
{
public EntityChangeTracker Create(Action<IChangeTrackingBuilder>? configure = null)
{
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();
configure?.Invoke(builder);
return builder.Build();
}

public async Task<EntityChangeTracker> CreateAsync(Action<IChangeTrackingBuilder>? configure = null,
CancellationToken cancellationToken = default)
{
var builder = new ChangeTrackingBuilder();
var builder = EntityChangeTracker.Create();
configure?.Invoke(builder);
return await builder.BuildAsync(cancellationToken);
}
Expand Down
2 changes: 1 addition & 1 deletion src/RayTree.Hosting/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public static IServiceCollection AddChangeTracking(

services.AddSingleton<EntityChangeTracker>(sp =>
{
var builder = new ChangeTrackingBuilder(sp.GetService<ILoggerFactory>());
var builder = EntityChangeTracker.Create(sp.GetService<ILoggerFactory>());
builder.UseMeter(sp.GetRequiredService<RayTreeMeter>());
configure?.Invoke(builder);
return builder.Build();
Expand Down
Loading
Loading