Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
<PackageVersion Include="Npgsql.DependencyInjection" Version="$(NpgsqlVersion)" />
<PackageVersion Include="RabbitMQ.Client" Version="6.8.1" />
<PackageVersion Include="Microsoft.Data.SqlClient" Version="6.0.2" />
<PackageVersion Include="Microsoft.Data.Sqlite" Version="9.0.6" />
<PackageVersion Include="NEST" Version="7.17.5" />
<PackageVersion Include="Polly" Version="8.5.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
Expand Down
7 changes: 7 additions & 0 deletions Eventuous.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@
<Folder Name="/Relational/SqlServer/test/">
<Project Path="src/SqlServer/test/Eventuous.Tests.SqlServer/Eventuous.Tests.SqlServer.csproj" />
</Folder>
<Folder Name="/Relational/Sqlite/" />
<Folder Name="/Relational/Sqlite/src/">
<Project Path="src/Sqlite/src/Eventuous.Sqlite/Eventuous.Sqlite.csproj" />
</Folder>
<Folder Name="/Relational/Sqlite/test/">
<Project Path="src/Sqlite/test/Eventuous.Tests.Sqlite/Eventuous.Tests.Sqlite.csproj" />
</Folder>
<Folder Name="/Samples/" />
<Folder Name="/Samples/KurrentDB/">
<Project Path="samples/kurrentdb/Bookings.Domain/Bookings.Domain.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<InternalsVisibleTo Include="Eventuous.Testing"/>
<InternalsVisibleTo Include="Eventuous.Tests.Postgres"/>
<InternalsVisibleTo Include="Eventuous.Tests.SqlServer"/>
<InternalsVisibleTo Include="Eventuous.Tests.Sqlite"/>
</ItemGroup>
<ItemGroup>
<None Remove="Eventuous.Persistence.csproj.DotSettings"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace Eventuous.Tests.Persistence.Base.Fixtures;
public interface IStartableFixture : IAsyncInitializer, IAsyncDisposable;

public abstract class StoreFixtureBase {
public IEventStore EventStore { get; protected private set; } = null!;
public IEventStore EventStore { get; protected set; } = null!;
protected static Faker Faker { get; } = new();
protected ServiceProvider Provider { get; set; } = null!;
protected bool AutoStart { get; init; } = true;
Expand Down
16 changes: 8 additions & 8 deletions src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
/// <typeparam name="TTransaction">Database transaction type</typeparam>
public abstract class SqlEventStoreBase<TConnection, TTransaction>(IEventSerializer? serializer, IMetadataSerializer? metaSerializer) : IEventStore
where TConnection : DbConnection where TTransaction : DbTransaction {
readonly IEventSerializer _serializer = serializer ?? DefaultEventSerializer.Instance;
readonly IMetadataSerializer _metaSerializer = metaSerializer ?? DefaultMetadataSerializer.Instance;
protected IEventSerializer Serializer { get; } = serializer ?? DefaultEventSerializer.Instance;

Check warning on line 21 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.Serializer'

Check warning on line 21 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.Serializer'

Check warning on line 21 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.Serializer'
protected IMetadataSerializer MetaSerializer { get; } = metaSerializer ?? DefaultMetadataSerializer.Instance;

Check warning on line 22 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (9.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.MetaSerializer'

Check warning on line 22 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (8.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.MetaSerializer'

Check warning on line 22 in src/Relational/src/Eventuous.Sql.Base/SqlEventStoreBase.cs

View workflow job for this annotation

GitHub Actions / Build and test (10.0)

Missing XML comment for publicly visible type or member 'SqlEventStoreBase<TConnection, TTransaction>.MetaSerializer'

const string ContentType = "application/json";

Expand Down Expand Up @@ -134,9 +134,9 @@
[RequiresDynamicCode("Calls Eventuous.IEventSerializer.DeserializeEvent(ReadOnlySpan<Byte>, String, String)")]
[RequiresUnreferencedCode("Calls Eventuous.IEventSerializer.DeserializeEvent(ReadOnlySpan<Byte>, String, String)")]
StreamEvent ToStreamEvent(PersistedEvent evt) {
var deserialized = _serializer.DeserializeEvent(Encoding.UTF8.GetBytes(evt.JsonData), evt.MessageType, ContentType);
var deserialized = Serializer.DeserializeEvent(Encoding.UTF8.GetBytes(evt.JsonData), evt.MessageType, ContentType);

var meta = evt.JsonMetadata == null ? new() : _metaSerializer.Deserialize(Encoding.UTF8.GetBytes(evt.JsonMetadata!));
var meta = evt.JsonMetadata == null ? new() : MetaSerializer.Deserialize(Encoding.UTF8.GetBytes(evt.JsonMetadata!));

return deserialized switch {
SuccessfullyDeserialized success => AsStreamEvent(success.Payload),
Expand All @@ -149,7 +149,7 @@
/// <inheritdoc />
[RequiresDynamicCode(Constants.DynamicSerializationMessage)]
[RequiresUnreferencedCode(Constants.DynamicSerializationMessage)]
public async Task<AppendEventsResult> AppendEvents(
public virtual async Task<AppendEventsResult> AppendEvents(
StreamName stream,
ExpectedStreamVersion expectedVersion,
IReadOnlyCollection<NewStreamEvent> events,
Expand Down Expand Up @@ -182,8 +182,8 @@
[RequiresUnreferencedCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")]
[RequiresDynamicCode("Calls Eventuous.IEventSerializer.SerializeEvent(Object)")]
NewPersistedEvent Convert(NewStreamEvent evt) {
var data = _serializer.SerializeEvent(evt.Payload!);
var meta = _metaSerializer.Serialize(evt.Metadata);
var data = Serializer.SerializeEvent(evt.Payload!);
var meta = MetaSerializer.Serialize(evt.Metadata);

return new(evt.Id, data.EventType, AsString(data.Payload), AsString(meta));
}
Expand All @@ -198,7 +198,7 @@

var result = await cmd.ExecuteScalarAsync(cancellationToken).NoContext();

return (bool)result!;
return Convert.ToBoolean(result);
}

/// <inheritdoc />
Expand Down
19 changes: 19 additions & 0 deletions src/Sqlite/src/Eventuous.Sqlite/ConnectionFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Sqlite;

delegate Task<SqliteConnection> GetSqliteConnection(CancellationToken cancellationToken);

public static class ConnectionFactory {
public static async Task<SqliteConnection> GetConnection(string connectionString, CancellationToken cancellationToken) {
var connection = new SqliteConnection(connectionString);
await connection.OpenAsync(cancellationToken).NoContext();

await using var walCmd = connection.CreateCommand();
walCmd.CommandText = "PRAGMA journal_mode=WAL";
await walCmd.ExecuteNonQueryAsync(cancellationToken).NoContext();

return connection;
}
}
28 changes: 28 additions & 0 deletions src/Sqlite/src/Eventuous.Sqlite/Eventuous.Sqlite.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="$(CoreRoot)\Eventuous.Subscriptions\Eventuous.Subscriptions.csproj"/>
<ProjectReference Include="$(CoreRoot)\Eventuous.Persistence\Eventuous.Persistence.csproj"/>
<ProjectReference Include="$(SrcRoot)\Relational\src\Eventuous.Sql.Base\Eventuous.Sql.Base.csproj"/>
<ProjectReference Include="$(CoreRoot)\Eventuous.Producers\Eventuous.Producers.csproj"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Data.Sqlite"/>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions"/>
<Using Include="Microsoft.Data.Sqlite"/>
<Using Include="Eventuous.Tools"/>
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Scripts\1_Schema.sql"/>
</ItemGroup>
<ItemGroup>
<Compile Include="$(CoreRoot)\Eventuous.Shared\Tools\TaskExtensions.cs">
<Link>Tools\TaskExtensions.cs</Link>
</Compile>
<Compile Include="$(CoreRoot)\Eventuous.Shared\Tools\Ensure.cs">
<Link>Tools\Ensure.cs</Link>
</Compile>
</ItemGroup>
<ItemGroup>
<InternalsVisibleTo Include="Eventuous.Tests.Sqlite"/>
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Sqlite;
using Eventuous.Sqlite.Projections;
using Eventuous.Sqlite.Subscriptions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

// ReSharper disable UnusedMethodReturnValue.Global
// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection;

public static class ServiceCollectionExtensions {
/// <param name="services">Service collection</param>
extension(IServiceCollection services) {
/// <summary>
/// Adds SQLite event store and the necessary schema to the DI container.
/// </summary>
/// <param name="connectionString">Connection string</param>
/// <param name="schema">Schema name</param>
/// <param name="initializeDatabase">Set to true if you want the schema to be created on startup</param>
/// <returns></returns>
public IServiceCollection AddEventuousSqlite(
string connectionString,
string schema = Schema.DefaultSchema,
bool initializeDatabase = false
) {
var options = new SqliteStoreOptions {
Schema = Ensure.NotEmptyString(schema),
ConnectionString = Ensure.NotEmptyString(connectionString),
InitializeDatabase = initializeDatabase
};
services.AddSingleton(options);
services.AddSingleton<SqliteStore>();
services.AddHostedService<SchemaInitializer>();
services.TryAddSingleton(new SqliteConnectionOptions(connectionString, schema));

return services;
}

/// <summary>
/// Adds SQLite event store and the necessary schema to the DI container using the configuration.
/// </summary>
/// <param name="config">Configuration section for SQLite options</param>
/// <returns></returns>
public IServiceCollection AddEventuousSqlite(IConfiguration config) {
services.Configure<SqliteStoreOptions>(config);
services.AddSingleton<SqliteStoreOptions>(sp => sp.GetRequiredService<IOptions<SqliteStoreOptions>>().Value);
services.AddSingleton<SqliteStore>();
services.AddHostedService<SchemaInitializer>();

services.TryAddSingleton(
sp => {
var storeOptions = sp.GetRequiredService<IOptions<SqliteStoreOptions>>().Value;

return new SqliteConnectionOptions(Ensure.NotEmptyString(storeOptions.ConnectionString), storeOptions.Schema);
}
);

return services;
}

/// <summary>
/// Registers the SQLite-based checkpoint store using the details provided when registering
/// SQLite connection factory.
/// </summary>
/// <returns></returns>
public IServiceCollection AddSqliteCheckpointStore()
=> services.AddCheckpointStore<SqliteCheckpointStore>(
sp => {
var loggerFactory = sp.GetService<ILoggerFactory>();
var connectionOptions = sp.GetService<SqliteConnectionOptions>();
var checkpointStoreOptions = sp.GetService<SqliteCheckpointStoreOptions>();

var schema = connectionOptions?.Schema is not null and not Schema.DefaultSchema
&& checkpointStoreOptions?.Schema is null or Schema.DefaultSchema
? connectionOptions.Schema
: checkpointStoreOptions?.Schema ?? Schema.DefaultSchema;
var connectionString = checkpointStoreOptions?.ConnectionString ?? connectionOptions?.ConnectionString;

return new(Ensure.NotNull(connectionString), schema, loggerFactory);
}
);
}
}
25 changes: 25 additions & 0 deletions src/Sqlite/src/Eventuous.Sqlite/Extensions/SqliteExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Sqlite.Extensions;

static class SqliteExtensions {
extension(SqliteCommand command) {
internal SqliteCommand Add(string parameterName, object? value) {
command.Parameters.AddWithValue(parameterName, value ?? DBNull.Value);

return command;
}
}

extension(SqliteConnection connection) {
internal SqliteCommand GetTextCommand(string sql, SqliteTransaction? transaction = null) {
var cmd = connection.CreateCommand();
cmd.CommandType = System.Data.CommandType.Text;
cmd.CommandText = sql;
if (transaction != null) cmd.Transaction = transaction;

return cmd;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

namespace Eventuous.Sqlite.Projections;

public record SqliteConnectionOptions(string ConnectionString, string Schema);
55 changes: 55 additions & 0 deletions src/Sqlite/src/Eventuous.Sqlite/Projections/SqliteProjector.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Subscriptions.Context;
using EventHandler = Eventuous.Subscriptions.EventHandler;

namespace Eventuous.Sqlite.Projections;

/// <summary>
/// Base class for projectors that store read models in SQLite.
/// </summary>
public abstract class SqliteProjector(SqliteConnectionOptions options, ITypeMapper? mapper = null) : EventHandler(mapper) {
readonly string _connectionString = Ensure.NotEmptyString(options.ConnectionString);

/// <summary>
/// Define how an event is converted to a SQLite command to update the read model using event data.
/// </summary>
/// <param name="handler">Function to synchronously create a SQLite command from the event context.</param>
/// <typeparam name="T"></typeparam>
protected void On<T>(ProjectToSqlite<T> handler) where T : class {
base.On<T>(async ctx => await Handle(ctx, GetCommand).NoContext());

return;

ValueTask<SqliteCommand> GetCommand(SqliteConnection connection, MessageConsumeContext<T> context) => new(handler(connection, context));
}

/// <summary>
/// Define how an event is converted to a SQLite command to update the read model using event data.
/// </summary>
/// <param name="handler">Function to asynchronously create a SQLite command from the event context.</param>
/// <typeparam name="T"></typeparam>
protected void On<T>(ProjectToSqliteAsync<T> handler) where T : class
=> base.On<T>(async ctx => await Handle(ctx, handler).NoContext());

async Task Handle<T>(MessageConsumeContext<T> context, ProjectToSqliteAsync<T> handler) where T : class {
await using var connection = await ConnectionFactory.GetConnection(_connectionString, context.CancellationToken);

var cmd = await handler(connection, context).ConfigureAwait(false);
await cmd.ExecuteNonQueryAsync(context.CancellationToken).ConfigureAwait(false);
}

protected static SqliteCommand Project(SqliteConnection connection, string commandText, params SqliteParameter[] parameters) {
var cmd = connection.CreateCommand();
cmd.CommandText = commandText;
cmd.Parameters.AddRange(parameters);
cmd.CommandType = System.Data.CommandType.Text;

return cmd;
}
}

public delegate SqliteCommand ProjectToSqlite<T>(SqliteConnection connection, MessageConsumeContext<T> consumeContext) where T : class;

public delegate ValueTask<SqliteCommand> ProjectToSqliteAsync<T>(SqliteConnection connection, MessageConsumeContext<T> consumeContext) where T : class;
58 changes: 58 additions & 0 deletions src/Sqlite/src/Eventuous.Sqlite/Schema.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (C) Eventuous HQ OÜ. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Reflection;
using Microsoft.Extensions.Logging;

namespace Eventuous.Sqlite;

public class Schema(string schema = Schema.DefaultSchema) {
public const string DefaultSchema = "eventuous";

public readonly string StreamsTable = $"{schema}_streams";
public readonly string MessagesTable = $"{schema}_messages";
public readonly string CheckpointsTable = $"{schema}_checkpoints";

public readonly string StreamExists = $"SELECT EXISTS(SELECT 1 FROM {schema}_streams WHERE stream_name = @name)";
public readonly string GetCheckpointSql = $"SELECT position FROM {schema}_checkpoints WHERE id = @checkpointId";
public readonly string AddCheckpointSql = $"INSERT INTO {schema}_checkpoints (id) VALUES (@checkpointId)";
public readonly string UpdateCheckpointSql = $"UPDATE {schema}_checkpoints SET position = @position WHERE id = @checkpointId";

static readonly Assembly Assembly = typeof(Schema).Assembly;

public string SchemaName => schema;

[PublicAPI]
public async Task CreateSchema(string connectionString, ILogger<Schema>? log, CancellationToken cancellationToken) {
log?.LogInformation("Creating schema {Schema}", schema);

var names = Assembly.GetManifestResourceNames()
.Where(x => x.EndsWith(".sql"))
.OrderBy(x => x);

await using var connection = await ConnectionFactory.GetConnection(connectionString, cancellationToken).NoContext();
await using var transaction = (SqliteTransaction)await connection.BeginTransactionAsync(cancellationToken).NoContext();

try {
foreach (var name in names) {
log?.LogInformation("Executing {Script}", name);
await using var stream = Assembly.GetManifestResourceStream(name);
using var reader = new StreamReader(stream!);
var script = await reader.ReadToEndAsync(cancellationToken).NoContext();
var cmdScript = script.Replace("__schema__", schema);

await using var cmd = new SqliteCommand(cmdScript, connection, transaction);
await cmd.ExecuteNonQueryAsync(cancellationToken).NoContext();
}

await transaction.CommitAsync(cancellationToken).NoContext();
} catch (Exception e) {
log?.LogCritical(e, "Unable to initialize the database schema");
await transaction.RollbackAsync(cancellationToken);

throw;
}

log?.LogInformation("Database schema initialized");
}
}
Loading
Loading