diff --git a/CHANGELOG.md b/CHANGELOG.md index efd4c9f..4822dc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,69 @@ The format follows [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). --- +## [0.0.12-pre-release] + +### Added + +#### `EntityChangeTracker.Create()` — canonical entry point (`RayTree.Core`) + +New public static factory method that returns `IChangeTrackingBuilder`. This is the new +canonical way to start building a tracker: + +```csharp +// Without logging (defaults to NullLoggerFactory) +var tracker = EntityChangeTracker.Create() + .ForEntity(e => e /* ... */) + .Build(); + +// With logging +var tracker = EntityChangeTracker.Create(loggerFactory) + .ForEntity(e => e /* ... */) + .Build(); +``` + +The optional `ILoggerFactory?` parameter defaults to `NullLoggerFactory.Instance` when `null`. + +### Changed (breaking) + +#### `ChangeTrackingBuilder` constructor is now `internal`; class is `sealed` (`RayTree.Core`) + +`ChangeTrackingBuilder` can no longer be instantiated directly or subclassed. Use +`EntityChangeTracker.Create()` instead: + +```csharp +// Before +var tracker = new ChangeTrackingBuilder() + .ForEntity(e => e /* ... */) + .Build(); + +var tracker = new ChangeTrackingBuilder(loggerFactory) + .ForEntity(e => e /* ... */) + .Build(); + +// After +var tracker = EntityChangeTracker.Create() + .ForEntity(e => e /* ... */) + .Build(); + +var tracker = EntityChangeTracker.Create(loggerFactory) + .ForEntity(e => e /* ... */) + .Build(); +``` + +#### `EntityChangeTracker` constructor is now `internal` (`RayTree.Core`) + +`EntityChangeTracker` can no longer be constructed directly. Use `EntityChangeTracker.Create()` +(or `AddChangeTracking` in the Generic Host) to obtain a fully configured instance. + +### Fixed + +#### Duplicate `` XML doc tag in `ChangeSubscriberBuilder.Build()` (`RayTree.Core`) + +Removed a duplicate `` tag from the `Build()` parameter remarks XML documentation. + +--- + ## [0.0.11-pre-release] ### Added diff --git a/Directory.Build.props b/Directory.Build.props index f7fe202..6ee0e45 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -7,7 +7,7 @@ true nullable - 0.0.11 + 0.0.12 pre-release bitc0der diff --git a/README.md b/README.md index a32375f..5d18d6c 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ using RayTree.Plugins.InMemory; using RayTree.Plugins.RabbitMQ; using RayTree.Plugins.Serializers.Json; -var tracker = await new ChangeTrackingBuilder() +var tracker = await EntityChangeTracker.Create() .UseSerializer(_ => new JsonSerializerPlugin()) .UseCompressor(_ => new NoOpCompressorPlugin()) .ForEntity(e => e @@ -114,7 +114,7 @@ Give each named handler its own broker subscription, retry budget, and deduplica // For testing/local dev — InMemoryBroadcastQueue fans out to every subscriber var broadcast = new InMemoryBroadcastQueue(); -var tracker = new ChangeTrackingBuilder() +var tracker = EntityChangeTracker.Create() .ForEntity(e => e .UseOutbox(new InMemoryOutbox()) .UsePublisher(broadcast) @@ -224,7 +224,7 @@ using RayTree.Plugins.Deduplication.Redis; var multiplexer = await ConnectionMultiplexer.ConnectAsync("localhost:6379"); -var tracker = new ChangeTrackingBuilder() +var tracker = EntityChangeTracker.Create() .UseRedisDeduplication(multiplexer) // default options // or with custom options: .UseRedisDeduplication(multiplexer, opt => @@ -237,7 +237,7 @@ var tracker = new ChangeTrackingBuilder() .Build(); // Custom store -var tracker = new ChangeTrackingBuilder() +var tracker = EntityChangeTracker.Create() .UseDeduplicationStore(new MyCustomStore()) .ForEntity(e => e /* ... */) .Build(); diff --git a/docs/README.md b/docs/README.md index 00a8b35..31d1a19 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,14 +11,14 @@ A modular .NET 8.0 entity change tracking system with outbox pattern support, qu - **Modular Plugins** - Each serializer and compressor in its own package - **In-Memory Testing** - Full in-memory implementation for development and testing - **Auto-Initialization** - Automatic database schema initialization on `Build()` / `BuildAsync()` -- **Structured Logging** - `Microsoft.Extensions.Logging` throughout; pass `ILoggerFactory` to `ChangeTrackingBuilder` or let `AddChangeTracking` wire it from DI automatically +- **Structured Logging** - `Microsoft.Extensions.Logging` throughout; pass `ILoggerFactory` to `EntityChangeTracker.Create()` or let `AddChangeTracking` wire it from DI automatically - **OpenTelemetry Metrics** - `System.Diagnostics.Metrics` instruments on a `"RayTree"` meter for outbox writes, publish/subscribe latency, payload size, queue depth, and retry shape. Zero OTel SDK dependency unless the optional `RayTree.OpenTelemetry` package is referenced. See [OpenTelemetry Metrics Guide](opentelemetry-metrics.md). ## Quick Start ```csharp // Optional: pass ILoggerFactory for structured log output (defaults to NullLoggerFactory) -var builder = new ChangeTrackingBuilder(loggerFactory); +var builder = EntityChangeTracker.Create(loggerFactory); builder.ForEntity(e => e .UsePostgreSqlOutbox(new PostgreSqlOutboxOptions @@ -44,7 +44,7 @@ await tracker.TrackDeleteAsync(new Product { Id = 1, Name = "Widget Pro" }); Set a serializer or compressor for all entity types at once using builder extension methods. Per-entity calls inside `ForEntity` override the global default: ```csharp -var builder = new ChangeTrackingBuilder(); +var builder = EntityChangeTracker.Create(); builder.UseJsonSerializer(); builder.UseGzipCompressor(); @@ -252,7 +252,7 @@ Generated outbox columns (alongside the fixed metadata columns): ## In-Memory Mode (Testing) ```csharp -var builder = new ChangeTrackingBuilder(); +var builder = EntityChangeTracker.Create(); builder.ForEntity(e => e .UseOutbox(new InMemoryOutbox()) @@ -339,7 +339,7 @@ Cleanup errors are isolated: a transient database failure logs an error but does Publisher options are global (not per entity) and are set on the top-level builder via `UsePublisherOptions`. When using the Generic Host they can alternatively be bound from `appsettings.json` via `AddChangeTracking` (see below). ```csharp -var builder = new ChangeTrackingBuilder(loggerFactory); +var builder = EntityChangeTracker.Create(loggerFactory); // Rotation options are set at the builder level, not inside ForEntity. builder.UsePublisherOptions(o => diff --git a/docs/configuration.md b/docs/configuration.md index f6d2928..c5cd96a 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,15 +1,15 @@ # Configuration Guide -The primary configuration API is `ChangeTrackingBuilder`. It registers per-entity plugins, sets global defaults, and produces an `EntityChangeTracker` via `Build()` / `BuildAsync()`. +The primary configuration API is accessed via `EntityChangeTracker.Create()`, which returns `IChangeTrackingBuilder`. It registers per-entity plugins, sets global defaults, and produces an `EntityChangeTracker` via `Build()` / `BuildAsync()`. -## ChangeTrackingBuilder +## EntityChangeTracker.Create() ### Per-Entity Configuration `ForEntity` accepts a callback that scopes all per-entity configuration. The parent builder is always returned, so multiple entity registrations chain cleanly: ```csharp -var builder = new ChangeTrackingBuilder(); +var builder = EntityChangeTracker.Create(); builder .ForEntity(e => e @@ -39,7 +39,7 @@ var tracker = builder.Build(); Extension methods on `IChangeTrackingBuilder` set a default factory applied to every entity type that does not have an explicit override: ```csharp -var builder = new ChangeTrackingBuilder(); +var builder = EntityChangeTracker.Create(); builder.UseJsonSerializer(); // RayTree.Plugins.Serializers.Json builder.UseGzipCompressor(); // RayTree.Plugins.Compressors.Gzip // builder.UseProtobufSerializer() @@ -130,7 +130,7 @@ See [trigger-setup.md](trigger-setup.md) for full details and hosting in ASP.NET ## ChangeTrackingConfiguration -`ChangeTrackingConfiguration` is a thin wrapper around `ChangeTrackingBuilder` that adds `WithPollingInterval()` and `WithBatchSize()` convenience methods. It does **not** expose per-entity fluent configuration — use `ChangeTrackingBuilder` directly for most scenarios. +`ChangeTrackingConfiguration` is a thin wrapper around `IChangeTrackingBuilder` that adds `WithPollingInterval()` and `WithBatchSize()` convenience methods. It does **not** expose per-entity fluent configuration — use `EntityChangeTracker.Create()` directly for most scenarios. ```csharp var config = new ChangeTrackingConfiguration() @@ -329,18 +329,18 @@ RayTree uses `Microsoft.Extensions.Logging` throughout. All runtime service clas ### Standalone (no DI) -Pass an `ILoggerFactory` to `ChangeTrackingBuilder`: +Pass an `ILoggerFactory` to `EntityChangeTracker.Create()`: ```csharp // No logging (tests, scripts) -var tracker = new ChangeTrackingBuilder().Build(); +var tracker = EntityChangeTracker.Create().Build(); // With logging using var loggerFactory = LoggerFactory.Create(b => b.AddConsole()); -var tracker = new ChangeTrackingBuilder(loggerFactory).Build(); +var tracker = EntityChangeTracker.Create(loggerFactory).Build(); ``` -`ChangeTrackingBuilder` normalises `null` to `NullLoggerFactory.Instance`, so calling `new ChangeTrackingBuilder()` without an argument produces a working tracker with no log output. +`EntityChangeTracker.Create()` normalises `null` to `NullLoggerFactory.Instance`, so calling `EntityChangeTracker.Create()` without an argument produces a working tracker with no log output. ### ASP.NET Core (DI) @@ -392,7 +392,7 @@ RayTree emits `System.Diagnostics.Metrics` instruments on a `Meter` named `"RayT ### Default (built-in meter) -`ChangeTrackingBuilder` creates a `RayTreeMeter` automatically and `EntityChangeTracker` disposes it. To collect the metrics, attach a `MeterListener` to the meter named `"RayTree"`, or use the OTel SDK via the `RayTree.OpenTelemetry` package: +`EntityChangeTracker.Create()` creates a `RayTreeMeter` automatically and `EntityChangeTracker` disposes it. To collect the metrics, attach a `MeterListener` to the meter named `"RayTree"`, or use the OTel SDK via the `RayTree.OpenTelemetry` package: ```csharp services.AddOpenTelemetry() @@ -407,7 +407,7 @@ Pass a `RayTreeMeter` instance to share it across trackers or to control its lif ```csharp var meter = new RayTreeMeter(); -var tracker = new ChangeTrackingBuilder(loggerFactory) +var tracker = EntityChangeTracker.Create(loggerFactory) .UseMeter(meter) .ForEntity(/* ... */) .Build(); diff --git a/docs/plugin-development.md b/docs/plugin-development.md index d17878e..9ff188d 100644 --- a/docs/plugin-development.md +++ b/docs/plugin-development.md @@ -53,7 +53,7 @@ public interface IRepository : IRepository where TEntity : class Task InsertAsync(TEntity entity, CancellationToken cancellationToken = default); Task UpdateAsync(TEntity entity, CancellationToken cancellationToken = default); Task DeleteAsync(TEntity entity, CancellationToken cancellationToken = default); - Task GetByIdAsync(object id, CancellationToken cancellationToken = default); + Task GetByIdAsync(object[] keyValues, CancellationToken cancellationToken = default); } public interface IRepository @@ -186,17 +186,16 @@ public static class MyOutboxExtensions ## Testing Plugins -Register plugins directly on `ChangePublisher` and construct `EntityChangeTracker` from it: +Use the builder to wire plugins and construct `EntityChangeTracker`: ```csharp -var publisher = new ChangePublisher(NullLoggerFactory.Instance); -publisher.RegisterOutbox(typeof(MyEntity), new InMemoryOutbox()); -publisher.RegisterPublisher(typeof(MyEntity), new InMemoryQueue()); -publisher.RegisterSerializer(typeof(MyEntity), new MyCustomSerializer()); -publisher.RegisterCompressor(typeof(MyEntity), new MyCustomCompressor()); - -var tracker = new EntityChangeTracker(publisher); -await tracker.InitializeAsync(); +using var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(new InMemoryOutbox()) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new MyCustomSerializer()) + .UseCompressor(new MyCustomCompressor())) + .Build(); ``` Verify serializer round-trips: diff --git a/src/RayTree.Core/Handling/ChangeSubscriber.cs b/src/RayTree.Core/Handling/ChangeSubscriber.cs index dff7876..03b0b34 100644 --- a/src/RayTree.Core/Handling/ChangeSubscriber.cs +++ b/src/RayTree.Core/Handling/ChangeSubscriber.cs @@ -219,27 +219,6 @@ await Parallel.ForEachAsync( async (envelope, token) => await DispatchAndAcknowledgeAsync(consumer, envelope, token)); } - public async Task ConsumeFromQueueAsync( - TQueue queue, - Func> reader, - CancellationToken cancellationToken = default) - { - // Note: this overload does not have an IQueueConsumer to acknowledge against. - // Callers using a custom reader are responsible for any broker acknowledgement - // outside of ChangeSubscriber. This stays at-most-once by design — the typed - // overload ConsumeFromConsumerAsync(IQueueConsumer) is the path that participates - // in the optional Ack/Nack lifecycle. - var parallelOptions = new ParallelOptions - { - MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism, - CancellationToken = cancellationToken - }; - await Parallel.ForEachAsync( - reader(queue, cancellationToken), - parallelOptions, - async (envelope, token) => await ProcessMessageAsync(envelope, token)); - } - /// /// Shared-mode dispatch wrapper: invokes and then /// calls on success, or @@ -498,7 +477,7 @@ private async Task MaybeDedupCleanupAsync(CancellationToken cancellationToken) if (DateTime.UtcNow - _lastDedupCleanup < _options.DeduplicationCleanupInterval) return; // Only one concurrent caller runs cleanup; others skip rather than queue. - if (!_cleanupGate.Wait(0)) return; + if (!await _cleanupGate.WaitAsync(millisecondsTimeout: 0, cancellationToken)) return; try { // Double-check after acquiring so a concurrent caller that already ran doesn't repeat it. diff --git a/src/RayTree.Core/Handling/ChangeSubscriberBuilder.cs b/src/RayTree.Core/Handling/ChangeSubscriberBuilder.cs index 9ca3f2a..773668f 100644 --- a/src/RayTree.Core/Handling/ChangeSubscriberBuilder.cs +++ b/src/RayTree.Core/Handling/ChangeSubscriberBuilder.cs @@ -85,7 +85,7 @@ public IChangeSubscriberBuilder ForEntity(Action /// /// When provided (e.g., resolved from DI), takes precedence over any store set on this - /// builder via or . + /// builder via . /// /// /// When provided (e.g., bound from appsettings.json via IOptions), takes diff --git a/src/RayTree.Core/Tracking/ChangeTrackingBuilder.cs b/src/RayTree.Core/Tracking/ChangeTrackingBuilder.cs index f2c8d0f..3bbb6f4 100644 --- a/src/RayTree.Core/Tracking/ChangeTrackingBuilder.cs +++ b/src/RayTree.Core/Tracking/ChangeTrackingBuilder.cs @@ -12,14 +12,14 @@ namespace RayTree.Core.Tracking; -public class ChangeTrackingBuilder : IChangeTrackingBuilder +public sealed class ChangeTrackingBuilder : IChangeTrackingBuilder { private readonly ChangePublisherBuilder _publisherBuilder = new(); private readonly ChangeSubscriberBuilder _subscriberBuilder = new(); private readonly ILoggerFactory _loggerFactory; private RayTreeMeter? _meter; - public ChangeTrackingBuilder(ILoggerFactory? loggerFactory = null) + internal ChangeTrackingBuilder(ILoggerFactory? loggerFactory = null) { _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; } diff --git a/src/RayTree.Core/Tracking/EntityChangeTracker.cs b/src/RayTree.Core/Tracking/EntityChangeTracker.cs index 2c63ea1..c12993f 100644 --- a/src/RayTree.Core/Tracking/EntityChangeTracker.cs +++ b/src/RayTree.Core/Tracking/EntityChangeTracker.cs @@ -1,3 +1,4 @@ +using Microsoft.Extensions.Logging; using RayTree.Core.Distribution; using RayTree.Core.Handling; using RayTree.Core.Models; @@ -20,13 +21,16 @@ public sealed class EntityChangeTracker : IEntityChangeTracker /// The meter used by this tracker's publisher and subscriber. public RayTreeMeter Meter => _meter; + public static IChangeTrackingBuilder Create(ILoggerFactory? loggerFactory = null) + => new ChangeTrackingBuilder(loggerFactory); + /// /// Constructs a tracker. When is true, /// also disposes . Builders that create /// the meter on the caller's behalf should pass ownsMeter: true; callers that /// inject their own meter via UseMeter should pass false. /// - public EntityChangeTracker( + internal EntityChangeTracker( ChangePublisher publisher, ChangeSubscriber? subscriber = null, RayTreeMeter? meter = null, diff --git a/src/RayTree.Hosting/EntityChangeTrackerFactory.cs b/src/RayTree.Hosting/EntityChangeTrackerFactory.cs index c7f24a9..7469772 100644 --- a/src/RayTree.Hosting/EntityChangeTrackerFactory.cs +++ b/src/RayTree.Hosting/EntityChangeTrackerFactory.cs @@ -6,7 +6,7 @@ public class EntityChangeTrackerFactory { public EntityChangeTracker Create(Action? configure = null) { - var builder = new ChangeTrackingBuilder(); + var builder = EntityChangeTracker.Create(); configure?.Invoke(builder); return builder.Build(); } @@ -14,7 +14,7 @@ public EntityChangeTracker Create(Action? configure = nu public async Task CreateAsync(Action? configure = null, CancellationToken cancellationToken = default) { - var builder = new ChangeTrackingBuilder(); + var builder = EntityChangeTracker.Create(); configure?.Invoke(builder); return await builder.BuildAsync(cancellationToken); } diff --git a/src/RayTree.Hosting/ServiceCollectionExtensions.cs b/src/RayTree.Hosting/ServiceCollectionExtensions.cs index 7f64d5c..bf7a8d1 100644 --- a/src/RayTree.Hosting/ServiceCollectionExtensions.cs +++ b/src/RayTree.Hosting/ServiceCollectionExtensions.cs @@ -24,7 +24,7 @@ public static IServiceCollection AddChangeTracking( services.AddSingleton(sp => { - var builder = new ChangeTrackingBuilder(sp.GetService()); + var builder = EntityChangeTracker.Create(sp.GetService()); builder.UseMeter(sp.GetRequiredService()); configure?.Invoke(builder); return builder.Build(); diff --git a/tests/RayTree.EntityFrameworkCore.Tests/EndToEndInMemoryTests.cs b/tests/RayTree.EntityFrameworkCore.Tests/EndToEndInMemoryTests.cs index fdde411..de7f02f 100644 --- a/tests/RayTree.EntityFrameworkCore.Tests/EndToEndInMemoryTests.cs +++ b/tests/RayTree.EntityFrameworkCore.Tests/EndToEndInMemoryTests.cs @@ -1,12 +1,8 @@ using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging.Abstractions; -using RayTree.Core.Distribution; -using RayTree.Core.Telemetry; using RayTree.Core.Plugins; using RayTree.Core.Plugins.Compression; using RayTree.Core.Tracking; using RayTree.EntityFrameworkCore.Interceptors; -using RayTree.Plugins; using RayTree.Plugins.Compressors.Gzip; using RayTree.Plugins.InMemory; using RayTree.Plugins.Serializers.Json; @@ -15,23 +11,21 @@ namespace RayTree.EntityFrameworkCore.Tests; public class EndToEndInMemoryTests { - private static (ChangePublisher publisher, EntityChangeTracker tracker) BuildTracker( - InMemoryOutbox outbox, bool withQueue = false, bool withGzip = false) - { - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(Product), outbox); - publisher.RegisterSerializer(typeof(Product), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(Product), withGzip ? new GzipCompressorPlugin() : (IChangeCompressor)new NoOpCompressorPlugin()); - if (withQueue) - publisher.RegisterPublisher(typeof(Product), new InMemoryQueue()); - return (publisher, new EntityChangeTracker(publisher)); - } + private static EntityChangeTracker BuildTracker(InMemoryOutbox outbox, bool withGzip = false) + => EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(withGzip ? new GzipCompressorPlugin() : (IChangeCompressor)new NoOpCompressorPlugin())) + .Build(); [Test] public async Task EfCore_SaveChanges_WritesToInMemoryOutbox() { + // Arrange var outbox = new InMemoryOutbox(); - var (_, tracker) = BuildTracker(outbox); + var tracker = BuildTracker(outbox); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -40,10 +34,12 @@ public async Task EfCore_SaveChanges_WritesToInMemoryOutbox() .Options; await using var context = new TestDbContext(options); - context.Products.Add(new Product { Name = "Widget", Price = 9.99m }); + + // Act await context.SaveChangesAsync(); + // Assert var outboxEntries = outbox.GetAll(); Assert.That(outboxEntries, Has.Count.EqualTo(1)); Assert.That(outboxEntries[0].EntityType, Does.Contain("Product")); @@ -53,8 +49,9 @@ public async Task EfCore_SaveChanges_WritesToInMemoryOutbox() [Test] public async Task EfCore_MultipleChanges_WritesAllToOutbox() { + // Arrange var outbox = new InMemoryOutbox(); - var (_, tracker) = BuildTracker(outbox); + var tracker = BuildTracker(outbox); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -63,19 +60,22 @@ public async Task EfCore_MultipleChanges_WritesAllToOutbox() .Options; await using var context = new TestDbContext(options); - context.Products.Add(new Product { Name = "Gadget", Price = 19.99m }); context.Products.Add(new Product { Name = "Doohickey", Price = 4.99m }); + + // Act await context.SaveChangesAsync(); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(2)); } [Test] public async Task EfCore_UpdateChange_DetectedAndStored() { + // Arrange var outbox = new InMemoryOutbox(); - var (_, tracker) = BuildTracker(outbox); + var tracker = BuildTracker(outbox); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -91,6 +91,7 @@ public async Task EfCore_UpdateChange_DetectedAndStored() productId = context.Products.First().Id; } + // Act await using (var context = new TestDbContext(options)) { var product = context.Products.Find(productId); @@ -101,6 +102,7 @@ public async Task EfCore_UpdateChange_DetectedAndStored() } } + // Assert var updates = outbox.GetAll().Where(c => c.ChangeType == ChangeType.Update).ToList(); Assert.That(updates, Is.Not.Empty); } @@ -108,8 +110,9 @@ public async Task EfCore_UpdateChange_DetectedAndStored() [Test] public async Task EfCore_DeleteChange_DetectedAndStored() { + // Arrange var outbox = new InMemoryOutbox(); - var (_, tracker) = BuildTracker(outbox); + var tracker = BuildTracker(outbox); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -125,6 +128,7 @@ public async Task EfCore_DeleteChange_DetectedAndStored() productId = context.Products.First().Id; } + // Act await using (var context = new TestDbContext(options)) { var product = context.Products.Find(productId); @@ -135,6 +139,7 @@ public async Task EfCore_DeleteChange_DetectedAndStored() } } + // Assert var deletes = outbox.GetAll().Where(c => c.ChangeType == ChangeType.Delete).ToList(); Assert.That(deletes, Is.Not.Empty); } @@ -142,14 +147,16 @@ public async Task EfCore_DeleteChange_DetectedAndStored() [Test] public async Task EfCore_WithQueue_OutboxAndQueueRegistered() { + // Arrange var outbox = new InMemoryOutbox(); var queue = new InMemoryQueue(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(Product), outbox); - publisher.RegisterPublisher(typeof(Product), queue); - publisher.RegisterSerializer(typeof(Product), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(Product), new NoOpCompressorPlugin()); - var tracker = new EntityChangeTracker(publisher); + var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(queue) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -158,10 +165,12 @@ public async Task EfCore_WithQueue_OutboxAndQueueRegistered() .Options; await using var context = new TestDbContext(options); - context.Products.Add(new Product { Name = "Queued", Price = 42.0m }); + + // Act await context.SaveChangesAsync(); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(1)); Assert.That(tracker.Publisher.GetPublisher(typeof(Product)), Is.Not.Null); Assert.That(tracker.Publisher.GetOutbox(typeof(Product)), Is.Not.Null); @@ -170,33 +179,33 @@ public async Task EfCore_WithQueue_OutboxAndQueueRegistered() [Test] public async Task EfCore_OutboxPublisher_DeliversToQueue() { - var outbox = new InMemoryOutbox(); + // Arrange var queue = new InMemoryQueue(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(Product), outbox); - publisher.RegisterPublisher(typeof(Product), queue); - publisher.RegisterSerializer(typeof(Product), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(Product), new NoOpCompressorPlugin()); - publisher.Options.PollingInterval = TimeSpan.FromMilliseconds(50); - - var tracker = new EntityChangeTracker(publisher); - await tracker.InitializeAsync(); - + using var tracker = EntityChangeTracker.Create() + .UsePublisherOptions(o => o.PollingInterval = TimeSpan.FromMilliseconds(50)) + .ForEntity(e => e + .UseOutbox(new InMemoryOutbox()) + .UsePublisher(queue) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); + + // Act await tracker.TrackInsertAsync(new Product { Id = 1, Name = "Widget" }); + // Assert using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var message = await queue.Reader.ReadAsync(cts.Token); Assert.That(message.EntityId, Is.EqualTo("1")); Assert.That(message.ChangeType, Is.EqualTo(ChangeType.Insert)); - - tracker.Dispose(); } [Test] public async Task EfCore_WithCompression_RoundTripPreservesData() { + // Arrange var outbox = new InMemoryOutbox(); - var (_, tracker) = BuildTracker(outbox, withGzip: true); + var tracker = BuildTracker(outbox, withGzip: true); var interceptor = new EntityChangeInterceptor(tracker, new[] { typeof(Product) }); var options = new DbContextOptionsBuilder() @@ -205,10 +214,12 @@ public async Task EfCore_WithCompression_RoundTripPreservesData() .Options; await using var context = new TestDbContext(options); - context.Products.Add(new Product { Name = "Compressed", Price = 15.0m }); + + // Act await context.SaveChangesAsync(); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(1)); } diff --git a/tests/RayTree.EntityFrameworkCore.Tests/EntityChangeInterceptorTests.cs b/tests/RayTree.EntityFrameworkCore.Tests/EntityChangeInterceptorTests.cs index 14b1371..1e5e29a 100644 --- a/tests/RayTree.EntityFrameworkCore.Tests/EntityChangeInterceptorTests.cs +++ b/tests/RayTree.EntityFrameworkCore.Tests/EntityChangeInterceptorTests.cs @@ -1,13 +1,13 @@ using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.Logging.Abstractions; using Moq; -using RayTree.Core.Distribution; -using RayTree.Core.Telemetry; using RayTree.Core.Models; +using RayTree.Core.Plugins.Compression; using RayTree.Core.Plugins.Outbox; using RayTree.Core.Tracking; using RayTree.EntityFrameworkCore.Interceptors; using RayTree.EntityFrameworkCore.Extensions; +using RayTree.Plugins.InMemory; +using RayTree.Plugins.Serializers.Json; namespace RayTree.EntityFrameworkCore.Tests; @@ -38,15 +38,18 @@ private class TestEntity } private static EntityChangeTracker BuildTracker(IOutbox outbox) - { - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(TestEntity), outbox); - return new EntityChangeTracker(publisher); - } + => EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); [Test] public async Task SavingChangesAsync_DetectsAddedEntities() { + // Arrange var outbox = new Mock(); outbox.Setup(o => o.WriteAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); @@ -60,17 +63,19 @@ public async Task SavingChangesAsync_DetectsAddedEntities() .Options; await using var context = new TestDbContext(options); - context.TestEntities.Add(new TestEntity { Name = "New", CreatedAt = DateTime.UtcNow }); + // Act await context.SaveChangesAsync(); + // Assert outbox.Verify(o => o.WriteAsync(It.IsAny>(), It.IsAny()), Times.AtLeastOnce); } [Test] public async Task SavingChangesAsync_DetectsModifiedEntities() { + // Arrange var outbox = new Mock(); outbox.Setup(o => o.WriteAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); @@ -84,21 +89,23 @@ public async Task SavingChangesAsync_DetectsModifiedEntities() .Options; await using var context = new TestDbContext(options); - context.TestEntities.Add(new TestEntity { Name = "Original", CreatedAt = DateTime.UtcNow }); await context.SaveChangesAsync(); var entity = context.TestEntities.First(); entity.Name = "Modified"; + // Act await context.SaveChangesAsync(); + // Assert outbox.Verify(o => o.WriteAsync(It.Is>(c => c.ChangeType == ChangeType.Update), It.IsAny()), Times.AtLeastOnce); } [Test] public async Task SavingChangesAsync_DetectsDeletedEntities() { + // Arrange var outbox = new Mock(); outbox.Setup(o => o.WriteAsync(It.IsAny>(), It.IsAny())) .Returns(Task.CompletedTask); @@ -112,25 +119,29 @@ public async Task SavingChangesAsync_DetectsDeletedEntities() .Options; await using var context = new TestDbContext(options); - context.TestEntities.Add(new TestEntity { Name = "To Delete", CreatedAt = DateTime.UtcNow }); await context.SaveChangesAsync(); var entity = context.TestEntities.First(); context.TestEntities.Remove(entity); + // Act await context.SaveChangesAsync(); + // Assert outbox.Verify(o => o.WriteAsync(It.Is>(c => c.ChangeType == ChangeType.Delete), It.IsAny()), Times.AtLeastOnce); } [Test] public void ServiceCollectionExtensions_AddChangeTracking_RegistersServices() { + // Arrange var services = new Microsoft.Extensions.DependencyInjection.ServiceCollection(); + // Act var result = services.AddChangeTracking(); + // Assert Assert.That(result, Is.Not.Null); } } diff --git a/tests/RayTree.Plugins.InMemory.Tests/EndToEndTests.cs b/tests/RayTree.Plugins.InMemory.Tests/EndToEndTests.cs index 618cce5..d0145f7 100644 --- a/tests/RayTree.Plugins.InMemory.Tests/EndToEndTests.cs +++ b/tests/RayTree.Plugins.InMemory.Tests/EndToEndTests.cs @@ -1,7 +1,4 @@ -using Microsoft.Extensions.Logging.Abstractions; -using RayTree.Core.Distribution; -using RayTree.Core.Telemetry; -using RayTree.Core.Plugins; +using RayTree.Core.Models; using RayTree.Core.Plugins.Compression; using RayTree.Core.Tracking; using RayTree.Plugins.Serializers.Json; @@ -14,48 +11,67 @@ public class EndToEndTests [Test] public async Task FullPipeline_TracksChange_WritesToOutbox_AndPublishesToQueue() { + // Arrange var outbox = new InMemoryOutbox(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(User), outbox); - publisher.RegisterPublisher(typeof(User), new InMemoryQueue()); - publisher.RegisterSerializer(typeof(User), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(User), new NoOpCompressorPlugin()); - var tracker = new EntityChangeTracker(publisher); - + using var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); + + // Act await tracker.TrackInsertAsync(new User { Id = 1, Name = "Alice" }); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(1)); } [Test] public async Task Pipeline_WithCompression_RoundTripSucceeds() { + // Arrange var outbox = new InMemoryOutbox(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(Order), outbox); - publisher.RegisterPublisher(typeof(Order), new InMemoryQueue()); - publisher.RegisterSerializer(typeof(Order), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(Order), new GzipCompressorPlugin()); - var tracker = new EntityChangeTracker(publisher); - + using var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new GzipCompressorPlugin())) + .Build(); + + // Act await tracker.TrackUpdateAsync(new Order { Id = 100, Total = 99.99m }); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(1)); } [Test] public async Task MultipleEntities_TrackedIndependently() { + // Arrange var userOutbox = new InMemoryOutbox(); var orderOutbox = new InMemoryOutbox(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(User), userOutbox); - publisher.RegisterOutbox(typeof(Order), orderOutbox); - var tracker = new EntityChangeTracker(publisher); - + using var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(userOutbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .ForEntity(e => e + .UseOutbox(orderOutbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); + + // Act await tracker.TrackInsertAsync(new User { Id = 1, Name = "Bob" }); await tracker.TrackInsertAsync(new Order { Id = 100, Total = 50m }); + // Assert Assert.That(userOutbox.GetAll(), Has.Count.EqualTo(1)); Assert.That(orderOutbox.GetAll(), Has.Count.EqualTo(1)); } @@ -63,41 +79,46 @@ public async Task MultipleEntities_TrackedIndependently() [Test] public async Task Pipeline_WritesOutbox_InSameBatch() { + // Arrange var outbox = new InMemoryOutbox(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(User), outbox); - publisher.RegisterSerializer(typeof(User), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(User), new NoOpCompressorPlugin()); - var tracker = new EntityChangeTracker(publisher); - + using var tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(outbox) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); + + // Act await tracker.TrackInsertAsync(new User { Id = 1, Name = "A" }); await tracker.TrackUpdateAsync(new User { Id = 2, Name = "B" }); await tracker.TrackDeleteAsync(new User { Id = 3, Name = "C" }); + // Assert Assert.That(outbox.GetAll(), Has.Count.EqualTo(3)); } [Test] public async Task Pipeline_WithQueue_ConsumerReceivesMessage() { + // Arrange var queue = new InMemoryQueue(); - var publisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - publisher.RegisterOutbox(typeof(User), new InMemoryOutbox()); - publisher.RegisterPublisher(typeof(User), queue); - publisher.RegisterSerializer(typeof(User), new JsonSerializerPlugin()); - publisher.RegisterCompressor(typeof(User), new NoOpCompressorPlugin()); - publisher.Options.PollingInterval = TimeSpan.FromMilliseconds(50); - - var tracker = new EntityChangeTracker(publisher); - await tracker.InitializeAsync(); - + using var tracker = EntityChangeTracker.Create() + .UsePublisherOptions(o => o.PollingInterval = TimeSpan.FromMilliseconds(50)) + .ForEntity(e => e + .UseOutbox(new InMemoryOutbox()) + .UsePublisher(queue) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); + + // Act await tracker.TrackInsertAsync(new User { Id = 1, Name = "Charlie" }); + // Assert using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var message = await queue.Reader.ReadAsync(cts.Token); Assert.That(message.ChangeType, Is.EqualTo(ChangeType.Insert)); - - tracker.Dispose(); } private class User diff --git a/tests/RayTree.Plugins.Kafka.Tests/KafkaEndToEndTests.cs b/tests/RayTree.Plugins.Kafka.Tests/KafkaEndToEndTests.cs index 40b8f55..89d543d 100644 --- a/tests/RayTree.Plugins.Kafka.Tests/KafkaEndToEndTests.cs +++ b/tests/RayTree.Plugins.Kafka.Tests/KafkaEndToEndTests.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.Logging.Abstractions; -using RayTree.Core.Distribution; -using RayTree.Core.Telemetry; using RayTree.Core.Handling; +using RayTree.Core.Telemetry; using RayTree.Core.Models; using RayTree.Core.Plugins.Compression; using RayTree.Core.Tracking; @@ -30,22 +29,19 @@ public class KafkaEndToEndTests : IAsyncDisposable // ------------------------------------------------------------------------- private EntityChangeTracker BuildTracker(string topic) - { - var publisher = new KafkaPublisher(new KafkaPublisherOptions - { - BootstrapServers = _kafka.GetBootstrapAddress(), - Topic = topic, - Acks = "all" - }); - - var changePublisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - changePublisher.RegisterOutbox(typeof(Order), new InMemoryOutbox()); - changePublisher.RegisterPublisher(typeof(Order), publisher); - changePublisher.RegisterSerializer(typeof(Order), new JsonSerializerPlugin()); - changePublisher.RegisterCompressor(typeof(Order), new NoOpCompressorPlugin()); - changePublisher.Options.PollingInterval = TimeSpan.FromMilliseconds(100); - return new EntityChangeTracker(changePublisher); - } + => EntityChangeTracker.Create() + .UsePublisherOptions(o => o.PollingInterval = TimeSpan.FromMilliseconds(100)) + .ForEntity(e => e + .UseOutbox(new InMemoryOutbox()) + .UsePublisher(new KafkaPublisher(new KafkaPublisherOptions + { + BootstrapServers = _kafka.GetBootstrapAddress(), + Topic = topic, + Acks = "all" + })) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); private KafkaConsumer BuildConsumer(string topic, string groupId) => new(new KafkaConsumerOptions { @@ -79,8 +75,9 @@ private static async Task WaitForAssignmentAsync(KafkaConsumer consumer, [Test] public async Task TrackInsert_HandlerReceivesCorrectChange() { + // Arrange var topic = $"test-insert-{Guid.NewGuid():N}"; - var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); + using var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); await consumer.InitializeAsync(); var tcs = new TaskCompletionSource(); @@ -96,28 +93,27 @@ public async Task TrackInsert_HandlerReceivesCorrectChange() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); - // Wait until the broker has acknowledged the subscription before publishing. await WaitForAssignmentAsync(consumer); + using var tracker = BuildTracker(topic); - var tracker = BuildTracker(topic); - await tracker.InitializeAsync(); + // Act await tracker.TrackInsertAsync(new Order { Id = 1, Total = 49.99m }); + // Assert var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(20)); Assert.That(received.EntityId, Is.EqualTo("1")); Assert.That(received.ChangeType, Is.EqualTo(ChangeType.Insert)); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); } [Test] public async Task TrackUpdate_HandlerReceivesCorrectChange() { + // Arrange var topic = $"test-update-{Guid.NewGuid():N}"; - var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); + using var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); await consumer.InitializeAsync(); var tcs = new TaskCompletionSource(); @@ -133,34 +129,32 @@ public async Task TrackUpdate_HandlerReceivesCorrectChange() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); - + // Wait until the broker has acknowledged the subscription before publishing. await WaitForAssignmentAsync(consumer); + using var tracker = BuildTracker(topic); - var tracker = BuildTracker(topic); - await tracker.InitializeAsync(); + // Act await tracker.TrackUpdateAsync(new Order { Id = 77, Total = 300m }); + // Assert var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(20)); Assert.That(received.EntityId, Is.EqualTo("77")); Assert.That(received.ChangeType, Is.EqualTo(ChangeType.Update)); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); } [Test] public async Task TrackMultiple_AllChangesDeliveredInOrder() { + // Arrange var topic = $"test-batch-{Guid.NewGuid():N}"; - var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); + using var consumer = BuildConsumer(topic, $"group-{Guid.NewGuid():N}"); await consumer.InitializeAsync(); var received = new List(); var allReceived = new TaskCompletionSource(); - // The previous wildcard OnChange(null, ...) form was removed; register the same - // delegate for each ChangeType to receive all three events. var subscriber = new ChangeSubscriber(NullLogger.Instance, new RayTreeMeter()); ChangeHandlerAsync recordChange = (change, _) => { @@ -177,25 +171,23 @@ public async Task TrackMultiple_AllChangesDeliveredInOrder() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); - + // Wait until the broker has acknowledged the subscription before publishing. await WaitForAssignmentAsync(consumer); + using var tracker = BuildTracker(topic); - var tracker = BuildTracker(topic); - await tracker.InitializeAsync(); + // Act await tracker.TrackInsertAsync(new Order { Id = 1, Total = 10m }); await tracker.TrackUpdateAsync(new Order { Id = 2, Total = 20m }); await tracker.TrackDeleteAsync(new Order { Id = 3, Total = 30m }); + // Assert await allReceived.Task.WaitAsync(TimeSpan.FromSeconds(20)); Assert.That(received, Has.Count.EqualTo(3)); - // Kafka preserves per-partition order; all three share the same entity type key Assert.That(received.Select(c => c.EntityId), Is.EqualTo(new[] { "1", "2", "3" })); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); } public ValueTask DisposeAsync() => _kafka.DisposeAsync(); diff --git a/tests/RayTree.Plugins.PostgreSQL.Tests/NotificationBasedPublisherTests.cs b/tests/RayTree.Plugins.PostgreSQL.Tests/NotificationBasedPublisherTests.cs index 7a918cb..eb1d7f5 100644 --- a/tests/RayTree.Plugins.PostgreSQL.Tests/NotificationBasedPublisherTests.cs +++ b/tests/RayTree.Plugins.PostgreSQL.Tests/NotificationBasedPublisherTests.cs @@ -46,14 +46,13 @@ public async Task SetUp() // Initialize outbox directly: creates table + trigger without starting background publisher services await _outbox.InitializeAsync(); - var builder = new ChangeTrackingBuilder(); - builder.ForEntity(e => e - .UseOutbox(_outbox) - .UsePublisher(_queue) - .UseSerializer(new JsonSerializerPlugin()) - .UseCompressor(new GzipCompressorPlugin())); - - _tracker = builder.Build(); + _tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseOutbox(_outbox) + .UsePublisher(_queue) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new GzipCompressorPlugin())) + .Build(); _publisher = new NotificationBasedPublisher(_tracker.Publisher, new NotificationBasedPublisherOptions @@ -90,7 +89,12 @@ public async Task TearDown() [Test] public async Task StartAsync_SetsIsRunning_StopAsync_ClearsIt() { + // Arrange — setup in [SetUp] + + // Act await _publisher.StartAsync(); + + // Assert Assert.That(_publisher.IsRunning, Is.True); await _publisher.StopAsync(); @@ -100,14 +104,16 @@ public async Task StartAsync_SetsIsRunning_StopAsync_ClearsIt() [Test] public async Task FallbackPolling_DeliversUnpublishedChange_ToQueue() { + // Arrange var change = CreateChange(1); await _outbox.WriteAsync(change); - await _publisher.StartAsync(); + // Act using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); var received = await _queue.Reader.ReadAsync(cts.Token); + // Assert Assert.That(received.EntityId, Is.EqualTo("1")); Assert.That(received.ChangeType, Is.EqualTo(ChangeType.Insert)); } @@ -115,27 +121,32 @@ public async Task FallbackPolling_DeliversUnpublishedChange_ToQueue() [Test] public async Task FallbackPolling_DoesNotRedeliver_AlreadyPublishedChange() { + // Arrange var change = CreateChange(2); await _outbox.WriteAsync(change); await _outbox.MarkPublishedAsync(change.Id); - await _publisher.StartAsync(); + + // Act await Task.Delay(700); // > 2× fallback interval + // Assert Assert.That(_queue.Reader.TryRead(out _), Is.False); } [Test] public async Task FallbackPolling_MarksChangePublished_AfterDelivery() { + // Arrange var change = CreateChange(3); await _outbox.WriteAsync(change); - await _publisher.StartAsync(); + // Act using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); await _queue.Reader.ReadAsync(cts.Token); + // Assert // Poll until MarkPublishedAsync completes the DB write (runs after PublishAsync in the same loop iteration) var deadline = DateTime.UtcNow.AddSeconds(5); EntityChange? stored = null; @@ -152,23 +163,27 @@ public async Task FallbackPolling_MarksChangePublished_AfterDelivery() [Test] public async Task StopAsync_PreventsDelivery_OfSubsequentWrites() { + // Arrange await _publisher.StartAsync(); await _publisher.StopAsync(); + // Act await _outbox.WriteAsync(CreateChange(4)); await Task.Delay(700); // > 2× fallback interval + // Assert Assert.That(_queue.Reader.TryRead(out _), Is.False); } [Test] public async Task FallbackPolling_DeliversBatch_OfMultipleChanges() { + // Arrange for (var i = 1; i <= 5; i++) await _outbox.WriteAsync(CreateChange(i)); - await _publisher.StartAsync(); + // Act using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); var received = new List(); for (var i = 0; i < 5; i++) @@ -177,6 +192,7 @@ public async Task FallbackPolling_DeliversBatch_OfMultipleChanges() received.Add(envelope); } + // Assert Assert.That(received, Has.Count.EqualTo(5)); Assert.That(received.Select(c => c.EntityId).Order(), Is.EqualTo(new[] { "1", "2", "3", "4", "5" })); diff --git a/tests/RayTree.Plugins.PostgreSQL.Tests/PostgreSqlRepositoryIntegrationTests.cs b/tests/RayTree.Plugins.PostgreSQL.Tests/PostgreSqlRepositoryIntegrationTests.cs index cfa938d..b37354b 100644 --- a/tests/RayTree.Plugins.PostgreSQL.Tests/PostgreSqlRepositoryIntegrationTests.cs +++ b/tests/RayTree.Plugins.PostgreSQL.Tests/PostgreSqlRepositoryIntegrationTests.cs @@ -24,21 +24,20 @@ public async Task OneTimeSetUp() [SetUp] public async Task SetUp() { - var builder = new ChangeTrackingBuilder(); - builder.ForEntity(e => e - .UseRepository(new PostgreSqlRepository(new() - { - ConnectionString = _postgres.GetConnectionString(), TableName = "test_users" - }, NullLoggerFactory.Instance)) - .UseOutbox(new PostgreSqlOutbox(new() - { - ConnectionString = _postgres.GetConnectionString(), OutboxTableName = "test_users_outbox" - }, NullLoggerFactory.Instance)) - .UsePublisher(new InMemoryQueue()) - .UseSerializer(new RayTree.Plugins.Serializers.Json.JsonSerializerPlugin()) - .UseCompressor(new RayTree.Plugins.Compressors.Gzip.GzipCompressorPlugin())); - - _tracker = builder.Build(); + _tracker = EntityChangeTracker.Create() + .ForEntity(e => e + .UseRepository(new PostgreSqlRepository(new() + { + ConnectionString = _postgres.GetConnectionString(), TableName = "test_users" + }, NullLoggerFactory.Instance)) + .UseOutbox(new PostgreSqlOutbox(new() + { + ConnectionString = _postgres.GetConnectionString(), OutboxTableName = "test_users_outbox" + }, NullLoggerFactory.Instance)) + .UsePublisher(new InMemoryQueue()) + .UseSerializer(new RayTree.Plugins.Serializers.Json.JsonSerializerPlugin()) + .UseCompressor(new RayTree.Plugins.Compressors.Gzip.GzipCompressorPlugin())) + .Build(); } public ValueTask DisposeAsync() => _postgres.DisposeAsync(); @@ -46,11 +45,14 @@ public async Task SetUp() [Test] public async Task InsertAsync_StoresEntity() { + // Arrange var repo = _tracker.Publisher.GetRepository(typeof(TestUser)) as PostgreSqlRepository; var user = new TestUser { Id = 1 }; + // Act await repo!.InsertAsync(user); + // Assert var stored = await repo.GetByIdAsync([1]); Assert.That(stored, Is.Not.Null); Assert.That(stored!.Id, Is.EqualTo(1)); @@ -59,12 +61,15 @@ public async Task InsertAsync_StoresEntity() [Test] public async Task UpdateAsync_UpdatesTimestamp() { + // Arrange var repo = _tracker.Publisher.GetRepository(typeof(TestUser)) as PostgreSqlRepository; var user = new TestUser { Id = 1 }; await repo!.InsertAsync(user); + // Act await repo.UpdateAsync(user); + // Assert var stored = await repo.GetByIdAsync([1]); Assert.That(stored, Is.Not.Null); } @@ -72,12 +77,15 @@ public async Task UpdateAsync_UpdatesTimestamp() [Test] public async Task DeleteAsync_RemovesEntity() { + // Arrange var repo = _tracker.Publisher.GetRepository(typeof(TestUser)) as PostgreSqlRepository; var user = new TestUser { Id = 1 }; await repo!.InsertAsync(user); + // Act await repo.DeleteAsync(user); + // Assert var stored = await repo.GetByIdAsync([1]); Assert.That(stored, Is.Null); } @@ -85,8 +93,13 @@ public async Task DeleteAsync_RemovesEntity() [Test] public async Task GetByIdAsync_WithNonExistentId_ReturnsNull() { + // Arrange var repo = _tracker.Publisher.GetRepository(typeof(TestUser)) as PostgreSqlRepository; + + // Act var result = await repo!.GetByIdAsync([999]); + + // Assert Assert.That(result, Is.Null); } diff --git a/tests/RayTree.Plugins.RabbitMQ.Tests/RabbitMqEndToEndTests.cs b/tests/RayTree.Plugins.RabbitMQ.Tests/RabbitMqEndToEndTests.cs index f29047d..2f70d02 100644 --- a/tests/RayTree.Plugins.RabbitMQ.Tests/RabbitMqEndToEndTests.cs +++ b/tests/RayTree.Plugins.RabbitMQ.Tests/RabbitMqEndToEndTests.cs @@ -1,7 +1,6 @@ using Microsoft.Extensions.Logging.Abstractions; -using RayTree.Core.Distribution; -using RayTree.Core.Telemetry; using RayTree.Core.Handling; +using RayTree.Core.Telemetry; using RayTree.Core.Models; using RayTree.Core.Plugins.Compression; using RayTree.Core.Tracking; @@ -28,15 +27,14 @@ public class RabbitMqEndToEndTests : IAsyncDisposable // ------------------------------------------------------------------------- private EntityChangeTracker BuildTracker(RabbitMqPublisher publisher) - { - var changePublisher = new ChangePublisher(NullLoggerFactory.Instance, new RayTreeMeter()); - changePublisher.RegisterOutbox(typeof(Order), new InMemoryOutbox()); - changePublisher.RegisterPublisher(typeof(Order), publisher); - changePublisher.RegisterSerializer(typeof(Order), new JsonSerializerPlugin()); - changePublisher.RegisterCompressor(typeof(Order), new NoOpCompressorPlugin()); - changePublisher.Options.PollingInterval = TimeSpan.FromMilliseconds(100); - return new EntityChangeTracker(changePublisher); - } + => EntityChangeTracker.Create() + .UsePublisherOptions(o => o.PollingInterval = TimeSpan.FromMilliseconds(100)) + .ForEntity(e => e + .UseOutbox(new InMemoryOutbox()) + .UsePublisher(publisher) + .UseSerializer(new JsonSerializerPlugin()) + .UseCompressor(new NoOpCompressorPlugin())) + .Build(); private RabbitMqPublisher BuildPublisher() => new(new RabbitMqPublisherOptions { @@ -69,11 +67,12 @@ private EntityChangeTracker BuildTracker(RabbitMqPublisher publisher) [Test] public async Task TrackInsert_HandlerReceivesCorrectChange() { + // Arrange var queueName = $"test-{Guid.NewGuid():N}"; - var publisher = BuildPublisher(); + using var publisher = BuildPublisher(); await publisher.InitializeAsync(); - var consumer = BuildConsumer(queueName); + using var consumer = BuildConsumer(queueName); await consumer.InitializeAsync(); var tcs = new TaskCompletionSource(); @@ -89,29 +88,28 @@ public async Task TrackInsert_HandlerReceivesCorrectChange() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); + using var tracker = BuildTracker(publisher); - var tracker = BuildTracker(publisher); - await tracker.InitializeAsync(); + // Act await tracker.TrackInsertAsync(new Order { Id = 1, Total = 49.99m }); + // Assert var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(15)); Assert.That(received.EntityId, Is.EqualTo("1")); Assert.That(received.ChangeType, Is.EqualTo(ChangeType.Insert)); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); - publisher.Dispose(); } [Test] public async Task TrackUpdate_HandlerReceivesCorrectChange() { + // Arrange var queueName = $"test-{Guid.NewGuid():N}"; - var publisher = BuildPublisher(); + using var publisher = BuildPublisher(); await publisher.InitializeAsync(); - var consumer = BuildConsumer(queueName); + using var consumer = BuildConsumer(queueName); await consumer.InitializeAsync(); var tcs = new TaskCompletionSource(); @@ -127,36 +125,33 @@ public async Task TrackUpdate_HandlerReceivesCorrectChange() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); + using var tracker = BuildTracker(publisher); - var tracker = BuildTracker(publisher); - await tracker.InitializeAsync(); + // Act await tracker.TrackUpdateAsync(new Order { Id = 55, Total = 200m }); + // Assert var received = await tcs.Task.WaitAsync(TimeSpan.FromSeconds(15)); Assert.That(received.EntityId, Is.EqualTo("55")); Assert.That(received.ChangeType, Is.EqualTo(ChangeType.Update)); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); - publisher.Dispose(); } [Test] public async Task TrackMultiple_AllChangesDelivered() { + // Arrange var queueName = $"test-{Guid.NewGuid():N}"; - var publisher = BuildPublisher(); + using var publisher = BuildPublisher(); await publisher.InitializeAsync(); - var consumer = BuildConsumer(queueName); + using var consumer = BuildConsumer(queueName); await consumer.InitializeAsync(); var received = new List(); var allReceived = new TaskCompletionSource(); - // The previous wildcard OnChange(null, ...) form was removed; register the same - // delegate for each ChangeType to receive all three events. var subscriber = new ChangeSubscriber(NullLogger.Instance, new RayTreeMeter()); ChangeHandlerAsync recordChange = (change, _) => { @@ -173,22 +168,20 @@ public async Task TrackMultiple_AllChangesDelivered() using var cts = new CancellationTokenSource(); var consumeTask = Task.Run(() => subscriber.ConsumeFromConsumerAsync(consumer, cts.Token)); + using var tracker = BuildTracker(publisher); - var tracker = BuildTracker(publisher); - await tracker.InitializeAsync(); + // Act await tracker.TrackInsertAsync(new Order { Id = 1, Total = 10m }); await tracker.TrackUpdateAsync(new Order { Id = 2, Total = 20m }); await tracker.TrackDeleteAsync(new Order { Id = 3, Total = 30m }); + // Assert await allReceived.Task.WaitAsync(TimeSpan.FromSeconds(15)); Assert.That(received, Has.Count.EqualTo(3)); Assert.That(received.Select(c => c.EntityId).Order(), Is.EqualTo(new[] { "1", "2", "3" })); cts.Cancel(); - tracker.Dispose(); - consumer.Dispose(); - publisher.Dispose(); } public ValueTask DisposeAsync() => _rabbitMq.DisposeAsync();