diff --git a/Directory.Packages.props b/Directory.Packages.props index 5a50c79b4..c321698cf 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -49,6 +49,7 @@ + diff --git a/Eventuous.slnx b/Eventuous.slnx index 0f296de49..83e61f82a 100644 --- a/Eventuous.slnx +++ b/Eventuous.slnx @@ -144,6 +144,13 @@ + + + + + + + diff --git a/src/Core/src/Eventuous.Persistence/Eventuous.Persistence.csproj b/src/Core/src/Eventuous.Persistence/Eventuous.Persistence.csproj index 2ba72e4f0..8334a33bc 100644 --- a/src/Core/src/Eventuous.Persistence/Eventuous.Persistence.csproj +++ b/src/Core/src/Eventuous.Persistence/Eventuous.Persistence.csproj @@ -24,6 +24,7 @@ + diff --git a/src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/StoreFixtureBase.cs b/src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/StoreFixtureBase.cs index 9f705be6c..9c5287091 100644 --- a/src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/StoreFixtureBase.cs +++ b/src/Core/test/Eventuous.Tests.Persistence.Base/Fixtures/StoreFixtureBase.cs @@ -13,7 +13,7 @@ namespace Eventuous.Tests.Persistence.Base.Fixtures; public interface IStartableFixture : IAsyncInitializer, IAsyncDisposable; public abstract class StoreFixtureBase { - public IEventStore EventStore { get; protected private set; } = null!; + public IEventStore EventStore { get; protected set; } = null!; protected static Faker Faker { get; } = new(); protected ServiceProvider Provider { get; set; } = null!; protected bool AutoStart { get; init; } = true; diff --git a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs index d4e407712..518820306 100644 --- a/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs +++ b/src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs @@ -18,8 +18,8 @@ namespace Eventuous.Sql.Base; /// Database transaction type public abstract class SqlEventStoreBase(IEventSerializer? serializer, IMetadataSerializer? metaSerializer) : IEventStore where TConnection : DbConnection where TTransaction : DbTransaction { - readonly IEventSerializer _serializer = serializer ?? DefaultEventSerializer.Instance; - readonly IMetadataSerializer _metaSerializer = metaSerializer ?? DefaultMetadataSerializer.Instance; + protected IEventSerializer Serializer { get; } = serializer ?? DefaultEventSerializer.Instance; + protected IMetadataSerializer MetaSerializer { get; } = metaSerializer ?? DefaultMetadataSerializer.Instance; const string ContentType = "application/json"; @@ -134,9 +134,9 @@ async Task ReadInternal(DbCommand cmd, StreamName stream, bool fa [RequiresDynamicCode("Calls Eventuous.IEventSerializer.DeserializeEvent(ReadOnlySpan, String, String)")] [RequiresUnreferencedCode("Calls Eventuous.IEventSerializer.DeserializeEvent(ReadOnlySpan, String, String)")] StreamEvent ToStreamEvent(PersistedEvent evt) { - var deserialized = _serializer.DeserializeEvent(Encoding.UTF8.GetBytes(evt.JsonData), evt.MessageType, ContentType); + var deserialized = Serializer.DeserializeEvent(Encoding.UTF8.GetBytes(evt.JsonData), evt.MessageType, ContentType); - var meta = evt.JsonMetadata == null ? new() : _metaSerializer.Deserialize(Encoding.UTF8.GetBytes(evt.JsonMetadata!)); + var meta = evt.JsonMetadata == null ? new() : MetaSerializer.Deserialize(Encoding.UTF8.GetBytes(evt.JsonMetadata!)); return deserialized switch { SuccessfullyDeserialized success => AsStreamEvent(success.Payload), @@ -149,7 +149,7 @@ StreamEvent ToStreamEvent(PersistedEvent evt) { /// [RequiresDynamicCode(Constants.DynamicSerializationMessage)] [RequiresUnreferencedCode(Constants.DynamicSerializationMessage)] - public async Task AppendEvents( + public virtual async Task AppendEvents( StreamName stream, ExpectedStreamVersion expectedVersion, IReadOnlyCollection events, @@ -182,8 +182,8 @@ CancellationToken cancellationToken [RequiresUnreferencedCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")] [RequiresDynamicCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")] NewPersistedEvent Convert(NewStreamEvent evt) { - var data = _serializer.SerializeEvent(evt.Payload!); - var meta = _metaSerializer.Serialize(evt.Metadata); + var data = Serializer.SerializeEvent(evt.Payload!); + var meta = MetaSerializer.Serialize(evt.Metadata); return new(evt.Id, data.EventType, AsString(data.Payload), AsString(meta)); } @@ -198,7 +198,7 @@ public async Task StreamExists(StreamName stream, CancellationToken cancel var result = await cmd.ExecuteScalarAsync(cancellationToken).NoContext(); - return (bool)result!; + return Convert.ToBoolean(result); } /// diff --git a/src/Sqlite/src/Eventuous.Sqlite/ConnectionFactory.cs b/src/Sqlite/src/Eventuous.Sqlite/ConnectionFactory.cs new file mode 100644 index 000000000..674f2ba7d --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/ConnectionFactory.cs @@ -0,0 +1,19 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.Sqlite; + +delegate Task GetSqliteConnection(CancellationToken cancellationToken); + +public static class ConnectionFactory { + public static async Task GetConnection(string connectionString, CancellationToken cancellationToken) { + var connection = new SqliteConnection(connectionString); + await connection.OpenAsync(cancellationToken).NoContext(); + + await using var walCmd = connection.CreateCommand(); + walCmd.CommandText = "PRAGMA journal_mode=WAL"; + await walCmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + + return connection; + } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Eventuous.Sqlite.csproj b/src/Sqlite/src/Eventuous.Sqlite/Eventuous.Sqlite.csproj new file mode 100644 index 000000000..35715748b --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Eventuous.Sqlite.csproj @@ -0,0 +1,28 @@ + + + + + + + + + + + + + + + + + + + Tools\TaskExtensions.cs + + + Tools\Ensure.cs + + + + + + diff --git a/src/Sqlite/src/Eventuous.Sqlite/Extensions/RegistrationExtensions.cs b/src/Sqlite/src/Eventuous.Sqlite/Extensions/RegistrationExtensions.cs new file mode 100644 index 000000000..df8856581 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Extensions/RegistrationExtensions.cs @@ -0,0 +1,88 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Sqlite; +using Eventuous.Sqlite.Projections; +using Eventuous.Sqlite.Subscriptions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +// ReSharper disable UnusedMethodReturnValue.Global +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection; + +public static class ServiceCollectionExtensions { + /// Service collection + extension(IServiceCollection services) { + /// + /// Adds SQLite event store and the necessary schema to the DI container. + /// + /// Connection string + /// Schema name + /// Set to true if you want the schema to be created on startup + /// + public IServiceCollection AddEventuousSqlite( + string connectionString, + string schema = Schema.DefaultSchema, + bool initializeDatabase = false + ) { + var options = new SqliteStoreOptions { + Schema = Ensure.NotEmptyString(schema), + ConnectionString = Ensure.NotEmptyString(connectionString), + InitializeDatabase = initializeDatabase + }; + services.AddSingleton(options); + services.AddSingleton(); + services.AddHostedService(); + services.TryAddSingleton(new SqliteConnectionOptions(connectionString, schema)); + + return services; + } + + /// + /// Adds SQLite event store and the necessary schema to the DI container using the configuration. + /// + /// Configuration section for SQLite options + /// + public IServiceCollection AddEventuousSqlite(IConfiguration config) { + services.Configure(config); + services.AddSingleton(sp => sp.GetRequiredService>().Value); + services.AddSingleton(); + services.AddHostedService(); + + services.TryAddSingleton( + sp => { + var storeOptions = sp.GetRequiredService>().Value; + + return new SqliteConnectionOptions(Ensure.NotEmptyString(storeOptions.ConnectionString), storeOptions.Schema); + } + ); + + return services; + } + + /// + /// Registers the SQLite-based checkpoint store using the details provided when registering + /// SQLite connection factory. + /// + /// + public IServiceCollection AddSqliteCheckpointStore() + => services.AddCheckpointStore( + sp => { + var loggerFactory = sp.GetService(); + var connectionOptions = sp.GetService(); + var checkpointStoreOptions = sp.GetService(); + + var schema = connectionOptions?.Schema is not null and not Schema.DefaultSchema + && checkpointStoreOptions?.Schema is null or Schema.DefaultSchema + ? connectionOptions.Schema + : checkpointStoreOptions?.Schema ?? Schema.DefaultSchema; + var connectionString = checkpointStoreOptions?.ConnectionString ?? connectionOptions?.ConnectionString; + + return new(Ensure.NotNull(connectionString), schema, loggerFactory); + } + ); + } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Extensions/SqliteExtensions.cs b/src/Sqlite/src/Eventuous.Sqlite/Extensions/SqliteExtensions.cs new file mode 100644 index 000000000..d6302c791 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Extensions/SqliteExtensions.cs @@ -0,0 +1,25 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.Sqlite.Extensions; + +static class SqliteExtensions { + extension(SqliteCommand command) { + internal SqliteCommand Add(string parameterName, object? value) { + command.Parameters.AddWithValue(parameterName, value ?? DBNull.Value); + + return command; + } + } + + extension(SqliteConnection connection) { + internal SqliteCommand GetTextCommand(string sql, SqliteTransaction? transaction = null) { + var cmd = connection.CreateCommand(); + cmd.CommandType = System.Data.CommandType.Text; + cmd.CommandText = sql; + if (transaction != null) cmd.Transaction = transaction; + + return cmd; + } + } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteConnectionOptions.cs b/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteConnectionOptions.cs new file mode 100644 index 000000000..c6df46e23 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteConnectionOptions.cs @@ -0,0 +1,6 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.Sqlite.Projections; + +public record SqliteConnectionOptions(string ConnectionString, string Schema); diff --git a/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteProjector.cs b/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteProjector.cs new file mode 100644 index 000000000..aefc35430 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteProjector.cs @@ -0,0 +1,55 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Subscriptions.Context; +using EventHandler = Eventuous.Subscriptions.EventHandler; + +namespace Eventuous.Sqlite.Projections; + +/// +/// Base class for projectors that store read models in SQLite. +/// +public abstract class SqliteProjector(SqliteConnectionOptions options, ITypeMapper? mapper = null) : EventHandler(mapper) { + readonly string _connectionString = Ensure.NotEmptyString(options.ConnectionString); + + /// + /// Define how an event is converted to a SQLite command to update the read model using event data. + /// + /// Function to synchronously create a SQLite command from the event context. + /// + protected void On(ProjectToSqlite handler) where T : class { + base.On(async ctx => await Handle(ctx, GetCommand).NoContext()); + + return; + + ValueTask GetCommand(SqliteConnection connection, MessageConsumeContext context) => new(handler(connection, context)); + } + + /// + /// Define how an event is converted to a SQLite command to update the read model using event data. + /// + /// Function to asynchronously create a SQLite command from the event context. + /// + protected void On(ProjectToSqliteAsync handler) where T : class + => base.On(async ctx => await Handle(ctx, handler).NoContext()); + + async Task Handle(MessageConsumeContext context, ProjectToSqliteAsync handler) where T : class { + await using var connection = await ConnectionFactory.GetConnection(_connectionString, context.CancellationToken); + + var cmd = await handler(connection, context).ConfigureAwait(false); + await cmd.ExecuteNonQueryAsync(context.CancellationToken).ConfigureAwait(false); + } + + protected static SqliteCommand Project(SqliteConnection connection, string commandText, params SqliteParameter[] parameters) { + var cmd = connection.CreateCommand(); + cmd.CommandText = commandText; + cmd.Parameters.AddRange(parameters); + cmd.CommandType = System.Data.CommandType.Text; + + return cmd; + } +} + +public delegate SqliteCommand ProjectToSqlite(SqliteConnection connection, MessageConsumeContext consumeContext) where T : class; + +public delegate ValueTask ProjectToSqliteAsync(SqliteConnection connection, MessageConsumeContext consumeContext) where T : class; diff --git a/src/Sqlite/src/Eventuous.Sqlite/Schema.cs b/src/Sqlite/src/Eventuous.Sqlite/Schema.cs new file mode 100644 index 000000000..a27eed207 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Schema.cs @@ -0,0 +1,58 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Reflection; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite; + +public class Schema(string schema = Schema.DefaultSchema) { + public const string DefaultSchema = "eventuous"; + + public readonly string StreamsTable = $"{schema}_streams"; + public readonly string MessagesTable = $"{schema}_messages"; + public readonly string CheckpointsTable = $"{schema}_checkpoints"; + + public readonly string StreamExists = $"SELECT EXISTS(SELECT 1 FROM {schema}_streams WHERE stream_name = @name)"; + public readonly string GetCheckpointSql = $"SELECT position FROM {schema}_checkpoints WHERE id = @checkpointId"; + public readonly string AddCheckpointSql = $"INSERT INTO {schema}_checkpoints (id) VALUES (@checkpointId)"; + public readonly string UpdateCheckpointSql = $"UPDATE {schema}_checkpoints SET position = @position WHERE id = @checkpointId"; + + static readonly Assembly Assembly = typeof(Schema).Assembly; + + public string SchemaName => schema; + + [PublicAPI] + public async Task CreateSchema(string connectionString, ILogger? log, CancellationToken cancellationToken) { + log?.LogInformation("Creating schema {Schema}", schema); + + var names = Assembly.GetManifestResourceNames() + .Where(x => x.EndsWith(".sql")) + .OrderBy(x => x); + + await using var connection = await ConnectionFactory.GetConnection(connectionString, cancellationToken).NoContext(); + await using var transaction = (SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).NoContext(); + + try { + foreach (var name in names) { + log?.LogInformation("Executing {Script}", name); + await using var stream = Assembly.GetManifestResourceStream(name); + using var reader = new StreamReader(stream!); + var script = await reader.ReadToEndAsync(cancellationToken).NoContext(); + var cmdScript = script.Replace("__schema__", schema); + + await using var cmd = new SqliteCommand(cmdScript, connection, transaction); + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } + + await transaction.CommitAsync(cancellationToken).NoContext(); + } catch (Exception e) { + log?.LogCritical(e, "Unable to initialize the database schema"); + await transaction.RollbackAsync(cancellationToken); + + throw; + } + + log?.LogInformation("Database schema initialized"); + } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/SchemaInitializer.cs b/src/Sqlite/src/Eventuous.Sqlite/SchemaInitializer.cs new file mode 100644 index 000000000..042b0148a --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/SchemaInitializer.cs @@ -0,0 +1,37 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite; + +public class SchemaInitializer(SqliteStoreOptions options, ILoggerFactory? loggerFactory = null) : IHostedService { + readonly ILogger? _log = loggerFactory?.CreateLogger(); + + public async Task StartAsync(CancellationToken cancellationToken) { + if (!options.InitializeDatabase) return; + + var schema = new Schema(options.Schema); + var connectionString = Ensure.NotEmptyString(options.ConnectionString); + + Exception? ex = null; + + for (var i = 0; i < 10; i++) { + try { + await schema.CreateSchema(connectionString, _log, cancellationToken); + + return; + } catch (SqliteException e) { + _log?.LogError("Unable to initialize the database schema: {Message}", e.Message); + ex = e; + } + + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } + + throw ex!; + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Scripts/1_Schema.sql b/src/Sqlite/src/Eventuous.Sqlite/Scripts/1_Schema.sql new file mode 100644 index 000000000..8cbb426ca --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Scripts/1_Schema.sql @@ -0,0 +1,26 @@ +CREATE TABLE IF NOT EXISTS __schema___streams ( + stream_id INTEGER PRIMARY KEY AUTOINCREMENT, + stream_name TEXT NOT NULL UNIQUE, + version INTEGER NOT NULL DEFAULT(-1) +); + +CREATE TABLE IF NOT EXISTS __schema___messages ( + global_position INTEGER PRIMARY KEY AUTOINCREMENT, + message_id TEXT NOT NULL, + message_type TEXT NOT NULL, + stream_id INTEGER NOT NULL REFERENCES __schema___streams(stream_id), + stream_position INTEGER NOT NULL, + json_data TEXT NOT NULL, + json_metadata TEXT, + created TEXT NOT NULL, + UNIQUE(stream_id, stream_position), + UNIQUE(stream_id, message_id), + CHECK(stream_position >= 0) +); + +CREATE INDEX IF NOT EXISTS idx___schema___messages_stream_id ON __schema___messages(stream_id); + +CREATE TABLE IF NOT EXISTS __schema___checkpoints ( + id TEXT PRIMARY KEY, + position INTEGER NULL +); diff --git a/src/Sqlite/src/Eventuous.Sqlite/SqliteStore.cs b/src/Sqlite/src/Eventuous.Sqlite/SqliteStore.cs new file mode 100644 index 000000000..9048e294f --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/SqliteStore.cs @@ -0,0 +1,215 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Data.Common; +using System.Text; +using Eventuous.Diagnostics; +using Eventuous.Sql.Base; +using Eventuous.Sqlite.Extensions; + +namespace Eventuous.Sqlite; + +public record SqliteStoreOptions { + public string ConnectionString { get; init; } = "Data Source=eventuous.db"; + public string Schema { get; init; } = Sqlite.Schema.DefaultSchema; + public bool InitializeDatabase { get; init; } +} + +public class SqliteStore : SqlEventStoreBase { + readonly GetSqliteConnection _getConnection; + + public Schema Schema { get; } + + public SqliteStore(SqliteStoreOptions options, IEventSerializer? serializer = null, IMetadataSerializer? metaSerializer = null) + : base(serializer, metaSerializer) { + var connectionString = Ensure.NotEmptyString(options.ConnectionString); + _getConnection = ct => ConnectionFactory.GetConnection(connectionString, ct); + Schema = new(options.Schema); + } + + protected override async ValueTask OpenConnection(CancellationToken cancellationToken) + => await _getConnection(cancellationToken).NoContext(); + + protected override DbCommand GetReadCommand(SqliteConnection connection, StreamName stream, StreamReadPosition start, int count) + => connection + .GetTextCommand( + $""" + SELECT m.message_id, m.message_type, m.stream_position, m.global_position, + m.json_data, m.json_metadata, m.created + FROM {Schema.MessagesTable} m + INNER JOIN {Schema.StreamsTable} s ON m.stream_id = s.stream_id + WHERE s.stream_name = @stream_name AND m.stream_position >= @from_position + ORDER BY m.stream_position + LIMIT @count + """ + ) + .Add("@stream_name", stream.ToString()) + .Add("@from_position", start.Value) + .Add("@count", count); + + protected override DbCommand GetReadBackwardsCommand(SqliteConnection connection, StreamName stream, StreamReadPosition start, int count) + => connection + .GetTextCommand( + $""" + SELECT m.message_id, m.message_type, m.stream_position, m.global_position, + m.json_data, m.json_metadata, m.created + FROM {Schema.MessagesTable} m + INNER JOIN {Schema.StreamsTable} s ON m.stream_id = s.stream_id + WHERE s.stream_name = @stream_name + AND m.stream_position <= MIN(@from_position, s.version) + ORDER BY m.stream_position DESC + LIMIT @count + """ + ) + .Add("@stream_name", stream.ToString()) + .Add("@from_position", start.Value) + .Add("@count", count); + + protected override DbCommand GetAppendCommand( + SqliteConnection connection, + SqliteTransaction transaction, + StreamName stream, + ExpectedStreamVersion expectedVersion, + NewPersistedEvent[] events + ) + => throw new NotSupportedException("SQLite does not use GetAppendCommand. AppendEvents is overridden directly."); + + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + public override async Task AppendEvents( + StreamName stream, + ExpectedStreamVersion expectedVersion, + IReadOnlyCollection events, + CancellationToken cancellationToken + ) { + var persistedEvents = events.Where(x => x.Payload != null).Select(Convert).ToArray(); + + if (persistedEvents.Length == 0) return AppendEventsResult.NoOp; + + await using var connection = await OpenConnection(cancellationToken).NoContext(); + await using var transaction = (SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).NoContext(); + + try { + // Ensure stream exists (idempotent insert) + await using (var insertStreamCmd = connection.GetTextCommand( + $"INSERT OR IGNORE INTO {Schema.StreamsTable} (stream_name, version) VALUES (@name, -1)", transaction + )) { + insertStreamCmd.Parameters.AddWithValue("@name", stream.ToString()); + await insertStreamCmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } + + // Get current stream state + int streamId; + int currentVersion; + + await using (var selectCmd = connection.GetTextCommand( + $"SELECT stream_id, version FROM {Schema.StreamsTable} WHERE stream_name = @name", transaction + )) { + selectCmd.Parameters.AddWithValue("@name", stream.ToString()); + await using var reader = await selectCmd.ExecuteReaderAsync(cancellationToken).NoContext(); + await reader.ReadAsync(cancellationToken).NoContext(); + streamId = reader.GetInt32(0); + currentVersion = reader.GetInt32(1); + } + + // Validate expected version + if (expectedVersion != ExpectedStreamVersion.Any && currentVersion != expectedVersion.Value) { + throw new AppendToStreamException( + stream, + new InvalidOperationException( + $"WrongExpectedVersion {expectedVersion.Value}, current version {currentVersion}" + ) + ); + } + + // Insert events + var now = DateTime.UtcNow.ToString("o"); + long lastGlobalPosition = 0; + + for (var i = 0; i < persistedEvents.Length; i++) { + var evt = persistedEvents[i]; + var streamPosition = currentVersion + i + 1; + + await using var insertCmd = connection.GetTextCommand( + $""" + INSERT INTO {Schema.MessagesTable} + (message_id, message_type, stream_id, stream_position, json_data, json_metadata, created) + VALUES (@message_id, @message_type, @stream_id, @stream_position, @json_data, @json_metadata, @created) + RETURNING global_position + """, + transaction + ); + + insertCmd.Parameters.AddWithValue("@message_id", evt.MessageId.ToString()); + insertCmd.Parameters.AddWithValue("@message_type", evt.MessageType); + insertCmd.Parameters.AddWithValue("@stream_id", streamId); + insertCmd.Parameters.AddWithValue("@stream_position", streamPosition); + insertCmd.Parameters.AddWithValue("@json_data", evt.JsonData); + insertCmd.Parameters.AddWithValue("@json_metadata", (object?)evt.JsonMetadata ?? DBNull.Value); + insertCmd.Parameters.AddWithValue("@created", now); + + var result = await insertCmd.ExecuteScalarAsync(cancellationToken).NoContext(); + lastGlobalPosition = System.Convert.ToInt64(result); + } + + // Update stream version + var newVersion = currentVersion + persistedEvents.Length; + + await using (var updateCmd = connection.GetTextCommand( + $"UPDATE {Schema.StreamsTable} SET version = @version WHERE stream_id = @id", transaction + )) { + updateCmd.Parameters.AddWithValue("@version", newVersion); + updateCmd.Parameters.AddWithValue("@id", streamId); + await updateCmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + } + + await transaction.CommitAsync(cancellationToken).NoContext(); + + return new AppendEventsResult((ulong)lastGlobalPosition, newVersion); + } catch (AppendToStreamException) { + await transaction.RollbackAsync(cancellationToken).NoContext(); + + throw; + } catch (Exception e) { + await transaction.RollbackAsync(cancellationToken).NoContext(); + PersistenceEventSource.Log.UnableToAppendEvents(stream, e); + + throw IsConflict(e) ? new AppendToStreamException(stream, e) : e; + } + + [RequiresUnreferencedCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")] + [RequiresDynamicCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")] + NewPersistedEvent Convert(NewStreamEvent evt) { + var data = Serializer.SerializeEvent(evt.Payload!); + var meta = MetaSerializer.Serialize(evt.Metadata); + + return new(evt.Id, data.EventType, AsString(data.Payload), AsString(meta)); + } + + string AsString(ReadOnlySpan bytes) => Encoding.UTF8.GetString(bytes); + } + + protected override bool IsStreamNotFound(Exception exception) => false; + + protected override bool IsConflict(Exception exception) => exception is SqliteException { SqliteErrorCode: 19 }; + + protected override DbCommand GetStreamExistsCommand(SqliteConnection connection, StreamName stream) + => connection.GetTextCommand(Schema.StreamExists).Add("@name", stream.ToString()); + + protected override DbCommand GetTruncateCommand( + SqliteConnection connection, + StreamName stream, + ExpectedStreamVersion expectedVersion, + StreamTruncatePosition position + ) + => connection + .GetTextCommand( + $""" + DELETE FROM {Schema.MessagesTable} + WHERE stream_id = (SELECT stream_id FROM {Schema.StreamsTable} WHERE stream_name = @stream_name) + AND stream_position < @position + """ + ) + .Add("@stream_name", stream.ToString()) + .Add("@position", position.Value); +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteAllStreamSubscription.cs b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteAllStreamSubscription.cs new file mode 100644 index 000000000..5078511fd --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteAllStreamSubscription.cs @@ -0,0 +1,52 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Sqlite.Projections; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Checkpoints; +using Eventuous.Subscriptions.Filters; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite.Subscriptions; + +using Extensions; + +/// +/// Subscription for all events in the system using SQLite event store. +/// +public class SqliteAllStreamSubscription( + SqliteAllStreamSubscriptionOptions options, + ICheckpointStore checkpointStore, + ConsumePipe consumePipe, + ILoggerFactory? loggerFactory = null, + IEventSerializer? eventSerializer = null, + IMetadataSerializer? metaSerializer = null, + SqliteConnectionOptions? connectionOptions = null + ) + : SqliteSubscriptionBase( + options, + checkpointStore, + consumePipe, + SubscriptionKind.All, + loggerFactory, + eventSerializer, + metaSerializer, + connectionOptions + ) { + protected override SqliteCommand PrepareCommand(SqliteConnection connection, long start) + => connection.GetTextCommand( + $""" + SELECT m.message_id, m.message_type, m.stream_position, m.global_position, + m.json_data, m.json_metadata, m.created, s.stream_name + FROM {Schema.MessagesTable} m + INNER JOIN {Schema.StreamsTable} s ON m.stream_id = s.stream_id + WHERE m.global_position >= @from_position + ORDER BY m.global_position + LIMIT @count + """ + ) + .Add("@from_position", start + 1) + .Add("@count", Options.MaxPageSize); +} + +public record SqliteAllStreamSubscriptionOptions : SqliteSubscriptionBaseOptions; diff --git a/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteCheckpointStore.cs b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteCheckpointStore.cs new file mode 100644 index 000000000..85a80a14d --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteCheckpointStore.cs @@ -0,0 +1,95 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Subscriptions.Checkpoints; +using Eventuous.Subscriptions.Logging; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite.Subscriptions; + +using Extensions; + +/// +/// Checkpoint store for SQLite, which stores checkpoints in a table. +/// Use it when you create read models in SQLite too. +/// +public class SqliteCheckpointStore : ICheckpointStore { + readonly ILoggerFactory? _loggerFactory; + readonly string _getCheckpointSql; + readonly string _addCheckpointSql; + readonly string _storeCheckpointSql; + readonly string _connectionString; + + public SqliteCheckpointStore(string connectionString, string schemaName, ILoggerFactory? loggerFactory = null) { + _loggerFactory = loggerFactory; + _connectionString = Ensure.NotEmptyString(connectionString); + var schema = new Schema(schemaName); + _getCheckpointSql = schema.GetCheckpointSql; + _addCheckpointSql = schema.AddCheckpointSql; + _storeCheckpointSql = schema.UpdateCheckpointSql; + } + + public SqliteCheckpointStore(SqliteCheckpointStoreOptions options, ILoggerFactory? loggerFactory) + : this(options.ConnectionString!, options is { Schema: not null } ? options.Schema : Schema.DefaultSchema, loggerFactory) { } + + /// + public async ValueTask GetLastCheckpoint(string checkpointId, CancellationToken cancellationToken) { + Logger.ConfigureIfNull(checkpointId, _loggerFactory); + await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + + Checkpoint checkpoint; + + await using (var cmd = GetCheckpointCommand(connection, _getCheckpointSql, checkpointId)) { + await using var reader = await cmd.ExecuteReaderAsync(cancellationToken).NoContext(); + + if (await reader.ReadAsync(cancellationToken).NoContext()) { + var hasPosition = !reader.IsDBNull(0); + checkpoint = hasPosition ? new(checkpointId, (ulong?)reader.GetInt64(0)) : Checkpoint.Empty(checkpointId); + Logger.Current.CheckpointLoaded(this, checkpoint); + + return checkpoint; + } + } + + await using var add = GetCheckpointCommand(connection, _addCheckpointSql, checkpointId); + await add.ExecuteNonQueryAsync(cancellationToken).NoContext(); + checkpoint = Checkpoint.Empty(checkpointId); + Logger.Current.CheckpointLoaded(this, checkpoint); + + return checkpoint; + } + + /// + public async ValueTask StoreCheckpoint(Checkpoint checkpoint, bool force, CancellationToken cancellationToken) { + if (checkpoint.Position == null) return checkpoint; + + await using var connection = await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + + await using var cmd = GetCheckpointCommand(connection, _storeCheckpointSql, checkpoint.Id) + .Add("@position", (long)checkpoint.Position); + + await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + Logger.Current.CheckpointStored(this, checkpoint, force); + + return checkpoint; + } + + static SqliteCommand GetCheckpointCommand(SqliteConnection connection, string sql, string checkpointId) + => connection.GetTextCommand(sql).Add("@checkpointId", checkpointId); +} + +/// +/// SQLite checkpoint store options. +/// +[PublicAPI] +public record SqliteCheckpointStoreOptions { + /// + /// Name of schema to use + /// + public string? Schema { get; init; } + + /// + /// SQLite connection string + /// + public string? ConnectionString { get; init; } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteStreamSubscription.cs b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteStreamSubscription.cs new file mode 100644 index 000000000..6644a9fd5 --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteStreamSubscription.cs @@ -0,0 +1,76 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Sqlite.Projections; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Checkpoints; +using Eventuous.Subscriptions.Filters; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite.Subscriptions; + +using Extensions; + +/// +/// Subscription for events in a single stream in the SQLite event store. +/// +public class SqliteStreamSubscription( + SqliteStreamSubscriptionOptions options, + ICheckpointStore checkpointStore, + ConsumePipe consumePipe, + ILoggerFactory? loggerFactory = null, + IEventSerializer? eventSerializer = null, + IMetadataSerializer? metaSerializer = null, + SqliteConnectionOptions? connectionOptions = null + ) + : SqliteSubscriptionBase( + options, + checkpointStore, + consumePipe, + SubscriptionKind.Stream, + loggerFactory, + eventSerializer, + metaSerializer, + connectionOptions + ) { + protected override SqliteCommand PrepareCommand(SqliteConnection connection, long start) + => connection.GetTextCommand( + $""" + SELECT m.message_id, m.message_type, m.stream_position, m.global_position, + m.json_data, m.json_metadata, m.created, @stream_name AS stream_name + FROM {Schema.MessagesTable} m + WHERE m.stream_id = @stream_id AND m.stream_position >= @from_position + ORDER BY m.global_position + LIMIT @count + """ + ) + .Add("@stream_id", _streamId) + .Add("@stream_name", _streamName) + .Add("@from_position", (int)start + 1) + .Add("@count", Options.MaxPageSize); + + protected override async Task BeforeSubscribe(CancellationToken cancellationToken) { + await using var connection = await OpenConnection(cancellationToken).NoContext(); + + await using var ensureCmd = connection.GetTextCommand( + $"INSERT OR IGNORE INTO {Schema.StreamsTable} (stream_name, version) VALUES (@stream_name, -1)" + ).Add("@stream_name", Options.Stream.ToString()); + + await ensureCmd.ExecuteNonQueryAsync(cancellationToken).NoContext(); + + await using var selectCmd = connection.GetTextCommand( + $"SELECT stream_id FROM {Schema.StreamsTable} WHERE stream_name = @stream_name" + ).Add("@stream_name", Options.Stream.ToString()); + + var result = await selectCmd.ExecuteScalarAsync(cancellationToken).NoContext(); + _streamId = Convert.ToInt32(result); + } + + int _streamId; + + readonly string _streamName = options.Stream.ToString(); +} + +public record SqliteStreamSubscriptionOptions : SqliteSubscriptionBaseOptions { + public StreamName Stream { get; set; } +} diff --git a/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteSubscriptionBase.cs b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteSubscriptionBase.cs new file mode 100644 index 000000000..31b4b7c5b --- /dev/null +++ b/src/Sqlite/src/Eventuous.Sqlite/Subscriptions/SqliteSubscriptionBase.cs @@ -0,0 +1,53 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Sql.Base.Subscriptions; +using Eventuous.Sqlite.Projections; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Checkpoints; +using Eventuous.Subscriptions.Filters; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Sqlite.Subscriptions; + +public abstract class SqliteSubscriptionBase : SqlSubscriptionBase where T : SqliteSubscriptionBaseOptions { + protected Schema Schema { get; } + readonly string _connectionString; + + protected SqliteSubscriptionBase( + T options, + ICheckpointStore checkpointStore, + ConsumePipe consumePipe, + SubscriptionKind kind, + ILoggerFactory? loggerFactory, + IEventSerializer? eventSerializer, + IMetadataSerializer? metaSerializer, + SqliteConnectionOptions? connectionOptions + ) : base(options, checkpointStore, consumePipe, options.ConcurrencyLimit, kind, loggerFactory, eventSerializer, metaSerializer) { + Schema = new( + connectionOptions?.Schema is not null and not Schema.DefaultSchema + ? connectionOptions.Schema + : options.Schema + ); + var connectionString = connectionOptions?.ConnectionString ?? options.ConnectionString; + _connectionString = Ensure.NotEmptyString(connectionString); + GetEndOfStream = $"SELECT MAX(stream_position) FROM {Schema.MessagesTable}"; + GetEndOfAll = $"SELECT MAX(global_position) FROM {Schema.MessagesTable}"; + } + + protected override async ValueTask OpenConnection(CancellationToken cancellationToken) + => await ConnectionFactory.GetConnection(_connectionString, cancellationToken).NoContext(); + + protected override bool IsTransient(Exception exception) => false; + + protected override bool IsStopping(Exception exception) + => exception is OperationCanceledException; + + protected override string GetEndOfStream { get; } + protected override string GetEndOfAll { get; } +} + +public abstract record SqliteSubscriptionBaseOptions : SqlSubscriptionOptionsBase { + protected SqliteSubscriptionBaseOptions() => Schema = Sqlite.Schema.DefaultSchema; + public string? ConnectionString { get; set; } +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Eventuous.Tests.Sqlite.csproj b/src/Sqlite/test/Eventuous.Tests.Sqlite/Eventuous.Tests.Sqlite.csproj new file mode 100644 index 000000000..7b1670dd0 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Eventuous.Tests.Sqlite.csproj @@ -0,0 +1,24 @@ + + + true + true + Exe + CA1822 + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Fixtures/SqliteStoreFixtureBase.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Fixtures/SqliteStoreFixtureBase.cs new file mode 100644 index 000000000..e1ecc9746 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Fixtures/SqliteStoreFixtureBase.cs @@ -0,0 +1,83 @@ +using System.Text.RegularExpressions; +using Bogus; +using Eventuous.TestHelpers; +using Eventuous.TestHelpers.TUnit.Logging; +using Eventuous.Tests.Persistence.Base.Fixtures; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Eventuous.Tests.Sqlite.Fixtures; + +public abstract partial class SqliteStoreFixtureBase(LogLevel logLevel = LogLevel.Information) + : StoreFixtureBase, IStartableFixture { + string _dbPath = null!; + + public string ConnectionString { get; private set; } = null!; + + public virtual async Task InitializeAsync() { + _dbPath = Path.Combine(Path.GetTempPath(), $"eventuous_test_{Guid.NewGuid():N}.db"); + ConnectionString = $"Data Source={_dbPath}"; + + var services = new ServiceCollection(); + + Serializer = new DefaultEventSerializer(TestPrimitives.DefaultOptions, TypeMapper); + services.AddSingleton(Serializer); + services.AddSingleton(TypeMapper); + services.AddLogging(b => b.ForTests(logLevel).SetMinimumLevel(logLevel)); + SetupServices(services); + + Provider = services.BuildServiceProvider(); + EventStore = Provider.GetRequiredService(); + GetDependencies(Provider); + + if (AutoStart) { + await Start(); + } + } + + protected async Task Start() { + var inits = Provider.GetServices(); + + foreach (var hostedService in inits) { + await hostedService.StartAsync(CancellationToken.None); + } + } + + public virtual async ValueTask DisposeAsync() { + if (_disposed) return; + + _disposed = true; + var inits = Provider.GetServices(); + + foreach (var hostedService in inits) { + await hostedService.StopAsync(CancellationToken.None); + } + + await Provider.DisposeAsync(); + + // Clean up temp database files + TryDeleteFile(_dbPath); + TryDeleteFile(_dbPath + "-wal"); + TryDeleteFile(_dbPath + "-shm"); + + GC.SuppressFinalize(this); + } + + static void TryDeleteFile(string path) { + try { if (File.Exists(path)) File.Delete(path); } catch { /* best effort */ } + } + + protected abstract void SetupServices(IServiceCollection services); + + protected virtual void GetDependencies(IServiceProvider provider) { } + + public IEventSerializer Serializer { get; private set; } = null!; + + bool _disposed; + + protected static string GetSchemaName() => NormaliseRegex().Replace(new Faker().Internet.UserName(), "").ToLower(); + + [GeneratedRegex(@"[\.\-\s]")] + private static partial Regex NormaliseRegex(); +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Limiter.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Limiter.cs new file mode 100644 index 000000000..f62e7b6f0 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Limiter.cs @@ -0,0 +1,10 @@ +using Eventuous.Tests.Sqlite; +using TUnit.Core.Interfaces; + +[assembly: ParallelLimiter] + +namespace Eventuous.Tests.Sqlite; + +public class Limiter : IParallelLimit { + public int Limit => 2; +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Registrations/RegistrationTests.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Registrations/RegistrationTests.cs new file mode 100644 index 000000000..29e43bb37 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Registrations/RegistrationTests.cs @@ -0,0 +1,57 @@ +using Eventuous.Diagnostics.Tracing; +using Eventuous.Sqlite; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; + +namespace Eventuous.Tests.Sqlite.Registrations; + +public class RegistrationTests { + const string ConnectionString = "Data Source=:memory:"; + + [Test] + public void Should_resolve_store_with_manual_registration() { + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureServices(services => { + services.AddEventStore(); + services.AddSingleton(new SqliteStoreOptions { ConnectionString = ConnectionString }); + } + ) + ) + .Build(); + var store = host.Services.GetRequiredService(); + store.ShouldBeOfType(); + var innerStore = ((TracedEventStore)store).Inner; + innerStore.ShouldBeOfType(); + } + + [Test] + public void Should_resolve_store_with_extensions() { + var config = new Dictionary { + ["sqlite:schema"] = "test", + ["sqlite:connectionString"] = ConnectionString + }; + + using var host = new HostBuilder() + .ConfigureWebHost(webHostBuilder => webHostBuilder + .UseTestServer() + .ConfigureAppConfiguration(cfg => cfg.AddInMemoryCollection(config)) + .ConfigureServices((ctx, services) => { + services.AddEventStore(); + services.AddEventuousSqlite(ctx.Configuration.GetSection("sqlite")); + } + ) + ) + .Build(); + var store = host.Services.GetService(); + store.ShouldNotBeNull(); + var inner = ((store as TracedEventStore)!).Inner as SqliteStore; + inner.ShouldNotBeNull(); + inner.Schema.SchemaName.ShouldBe("test"); + } +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreFixture.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreFixture.cs new file mode 100644 index 000000000..74330f98b --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreFixture.cs @@ -0,0 +1,14 @@ +using Eventuous.Sqlite; +using Eventuous.Tests.Sqlite.Fixtures; +using Microsoft.Extensions.DependencyInjection; + +namespace Eventuous.Tests.Sqlite.Store; + +public sealed class StoreFixture() : SqliteStoreFixtureBase(LogLevel.Information) { + readonly string _schemaName = GetSchemaName(); + + protected override void SetupServices(IServiceCollection services) { + services.AddEventuousSqlite(ConnectionString, _schemaName, true); + services.AddEventStore(); + } +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreTests.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreTests.cs new file mode 100644 index 000000000..5d1a8d83c --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Store/StoreTests.cs @@ -0,0 +1,17 @@ +using Eventuous.Tests.Persistence.Base.Store; + +// ReSharper disable UnusedType.Global + +namespace Eventuous.Tests.Sqlite.Store; + +[InheritsTests] +[ClassDataSource] +public class Append(StoreFixture fixture) : StoreAppendTests(fixture); + +[InheritsTests] +[ClassDataSource] +public class Read(StoreFixture fixture) : StoreReadTests(fixture); + +[InheritsTests] +[ClassDataSource] +public class OtherMethods(StoreFixture fixture) : StoreOtherOpsTests(fixture); diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscribeTests.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscribeTests.cs new file mode 100644 index 000000000..ccc9b10c2 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscribeTests.cs @@ -0,0 +1,114 @@ +using Eventuous.Sqlite.Subscriptions; +using Eventuous.Sut.App; +using Eventuous.Sut.Domain; +using Eventuous.Tests.Persistence.Base.Fixtures; +using Eventuous.Tests.Subscriptions.Base; +using static Eventuous.Sut.App.Commands; +using static Eventuous.Sut.Domain.BookingEvents; + +// ReSharper disable UnusedType.Global + +namespace Eventuous.Tests.Sqlite.Subscriptions; + +[NotInParallel] +public class SubscribeToAll() : SubscriptionTestBase(Fixture) { + static readonly SubscriptionFixture Fixture + = new(_ => { }, false); + + [Test] + public async Task Sqlite_ShouldConsumeProducedEvents(CancellationToken cancellationToken) { + const int count = 10; + + var commands = await GenerateAndHandleCommands(count); + var testEvents = commands.Select(ToEvent).ToList(); + + await Fixture.StartSubscription(); + await Fixture.Handler.AssertCollection(TimeSpan.FromSeconds(5), [..testEvents]).Validate(cancellationToken); + await Fixture.StopSubscription(); + await Assert.That(Fixture.Handler.Count).IsEqualTo(10); + } + + [Test] + public async Task Sqlite_ShouldUseExistingCheckpoint(CancellationToken cancellationToken) { + const int count = 10; + + await GenerateAndHandleCommands(count); + + await Fixture.CheckpointStore.GetLastCheckpoint(Fixture.SubscriptionId, cancellationToken); + var last = await Fixture.GetLastPosition(); + await Fixture.CheckpointStore.StoreCheckpoint(new(Fixture.SubscriptionId, last), true, cancellationToken); + + await Fixture.StartSubscription(); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + await Fixture.StopSubscription(); + await Assert.That(Fixture.Handler.Count).IsEqualTo(0); + } + + static BookingImported ToEvent(ImportBooking cmd) => new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut); + + async Task> GenerateAndHandleCommands(int count) { + var commands = Enumerable + .Range(0, count) + .Select(_ => DomainFixture.CreateImportBooking()) + .ToList(); + + var service = new BookingService(Fixture.EventStore); + + foreach (var cmd in commands) { + var result = await service.Handle(cmd, default); + result.ThrowIfError(); + } + + return commands; + } +} + +[NotInParallel] +public class SubscribeToStream() : SubscriptionTestBase(Fixture) { + static readonly StreamName StreamName = new(Guid.NewGuid().ToString()); + + static readonly SubscriptionFixture Fixture + = new(opt => opt.Stream = StreamName, false); + + [Test] + public async Task Sqlite_ShouldConsumeProducedEvents(CancellationToken cancellationToken) { + const int count = 10; + + var testEvents = await GenerateAndProduceEvents(count); + + await Fixture.StartSubscription(); + await Fixture.Handler.AssertCollection(TimeSpan.FromSeconds(5), [..testEvents]).Validate(cancellationToken); + await Fixture.StopSubscription(); + await Assert.That(Fixture.Handler.Count).IsEqualTo(10); + } + + [Test] + public async Task Sqlite_ShouldUseExistingCheckpoint(CancellationToken cancellationToken) { + const int count = 10; + + await GenerateAndProduceEvents(count); + + await Fixture.CheckpointStore.GetLastCheckpoint(Fixture.SubscriptionId, cancellationToken); + await Fixture.CheckpointStore.StoreCheckpoint(new(Fixture.SubscriptionId, 9), true, cancellationToken); + + await Fixture.StartSubscription(); + await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); + await Fixture.StopSubscription(); + await Assert.That(Fixture.Handler.Count).IsEqualTo(0); + } + + static BookingImported ToEvent(ImportBooking cmd) => new(cmd.RoomId, cmd.Price, cmd.CheckIn, cmd.CheckOut); + + async Task> GenerateAndProduceEvents(int count) { + var commands = Enumerable + .Range(0, count) + .Select(_ => DomainFixture.CreateImportBooking()) + .ToList(); + + var events = commands.Select(ToEvent).ToList(); + var streamEvents = events.Select(x => new NewStreamEvent(Guid.NewGuid(), x, new())); + await Fixture.EventStore.AppendEvents(StreamName, ExpectedStreamVersion.Any, streamEvents.ToArray(), default); + + return events; + } +} diff --git a/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscriptionFixture.cs b/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscriptionFixture.cs new file mode 100644 index 000000000..4f6f0e038 --- /dev/null +++ b/src/Sqlite/test/Eventuous.Tests.Sqlite/Subscriptions/SubscriptionFixture.cs @@ -0,0 +1,148 @@ +using System.Text.RegularExpressions; +using Bogus; +using Eventuous.Sqlite; +using Eventuous.Sqlite.Subscriptions; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Checkpoints; +using Eventuous.Sut.Domain; +using Eventuous.TestHelpers; +using Eventuous.TestHelpers.TUnit.Logging; +using Eventuous.Tests.Persistence.Base.Fixtures; +using Eventuous.Tests.Subscriptions.Base; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using ILogger = Microsoft.Extensions.Logging.ILogger; + +namespace Eventuous.Tests.Sqlite.Subscriptions; + +public partial class SubscriptionFixture( + Action configureOptions, + bool autoStart = true, + Action? configureServices = null, + LogLevel logLevel = LogLevel.Information + ) + : IStartableFixture + where TSubscription : SqliteSubscriptionBase + where TSubscriptionOptions : SqliteSubscriptionBaseOptions + where TEventHandler : class, IEventHandler { + string _dbPath = null!; + + protected internal readonly string SchemaName = GetSchemaName(); + + public string ConnectionString { get; private set; } = null!; + public IEventStore EventStore { get; private set; } = null!; + public TypeMapper TypeMapper { get; } = new(); + public string SubscriptionId { get; } = $"test-{Guid.NewGuid():N}"; + + internal TEventHandler Handler { get; private set; } = null!; + internal ICheckpointStore CheckpointStore { get; private set; } = null!; + internal ILoggerFactory LoggerFactory { get; set; } = null!; + ILogger Log { get; set; } = null!; + IMessageSubscription Subscription { get; set; } = null!; + ServiceProvider Provider { get; set; } = null!; + + public async Task InitializeAsync() { + _dbPath = Path.Combine(Path.GetTempPath(), $"eventuous_test_{Guid.NewGuid():N}.db"); + ConnectionString = $"Data Source={_dbPath}"; + + TypeMapper.RegisterKnownEventTypes(typeof(BookingEvents.BookingImported).Assembly); + + var services = new ServiceCollection(); + var serializer = new DefaultEventSerializer(TestPrimitives.DefaultOptions, TypeMapper); + services.AddSingleton(serializer); + services.AddSingleton(TypeMapper); + services.AddLogging(b => b.ForTests(logLevel).SetMinimumLevel(logLevel)); + + services.AddEventuousSqlite(ConnectionString, SchemaName, true); + services.AddEventStore(); + services.AddSqliteCheckpointStore(); + services.AddSingleton(new TestEventHandlerOptions()); + + services.AddSubscription( + SubscriptionId, + b => { + b.AddEventHandler(); + b.Configure(opt => { + opt.Schema = SchemaName; + opt.ConnectionString = ConnectionString; + configureOptions(opt); + }); + } + ); + + // Remove the SubscriptionHostedService registration since we manage the subscription lifecycle manually + var host = services.First(x => !x.IsKeyedService && x.ImplementationFactory?.GetType() == typeof(Func)); + services.Remove(host); + + configureServices?.Invoke(services); + + Provider = services.BuildServiceProvider(); + + // Start hosted services (schema init) + var inits = Provider.GetServices(); + + foreach (var hostedService in inits) { + await hostedService.StartAsync(CancellationToken.None); + } + + Provider.AddEventuousLogs(); + EventStore = Provider.GetRequiredService(); + CheckpointStore = Provider.GetRequiredService(); + Subscription = Provider.GetRequiredService(); + Handler = Provider.GetRequiredKeyedService(SubscriptionId); + LoggerFactory = Provider.GetRequiredService(); + Log = LoggerFactory.CreateLogger(GetType()); + + if (autoStart) await StartSubscription(); + } + + internal ValueTask StartSubscription() => Subscription.SubscribeWithLog(Log); + + internal ValueTask StopSubscription() => Subscription.UnsubscribeWithLog(Log); + + public async Task GetLastPosition() { + await using var connection = await ConnectionFactory.GetConnection(ConnectionString, default); + await using var cmd = connection.CreateCommand(); + cmd.CommandText = $"SELECT MAX(global_position) FROM {SchemaName}_messages"; + var result = await cmd.ExecuteScalarAsync(); + + return (ulong)(result is DBNull or null ? 0 : (long)result); + } + + public async ValueTask DisposeAsync() { + if (_disposed) return; + + _disposed = true; + + try { await StopSubscription(); } catch { /* best effort */ } + + var inits = Provider.GetServices(); + + foreach (var hostedService in inits) { + await hostedService.StopAsync(CancellationToken.None); + } + + await Provider.DisposeAsync(); + + // Clean up temp database files + TryDeleteFile(_dbPath); + TryDeleteFile(_dbPath + "-wal"); + TryDeleteFile(_dbPath + "-shm"); + + GC.SuppressFinalize(this); + } + + static void TryDeleteFile(string path) { + try { if (File.Exists(path)) File.Delete(path); } catch { /* best effort */ } + } + + static string GetSchemaName() => NormaliseRegex().Replace(new Faker().Internet.UserName(), "").ToLower(); + + [GeneratedRegex(@"[\.\-\s]")] + private static partial Regex NormaliseRegex(); + + bool _disposed; +} + +public record SchemaInfo(string Schema);