diff --git a/src/EventSourcingTests/Bugs/Bug_2865_configuration_assertion_with_flat_table_projections.cs b/src/EventSourcingTests/Bugs/Bug_2865_configuration_assertion_with_flat_table_projections.cs new file mode 100644 index 0000000000..e767c88700 --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_2865_configuration_assertion_with_flat_table_projections.cs @@ -0,0 +1,114 @@ +using System; +using System.Threading.Tasks; +using Marten; +using Marten.Events.Daemon.Resiliency; +using Marten.Events.Projections; +using Marten.Events.Projections.Flattened; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Xunit; + +namespace EventSourcingTests.Bugs; + +public class Bug_2865_configuration_assertion_with_flat_table_projections +{ + [Fact] + public async Task should_be_able_to_assert_on_existence_of_flat_table_functions() + { + var appBuilder = Host.CreateApplicationBuilder(); + + appBuilder.Logging + .SetMinimumLevel(LogLevel.Information) + .AddFilter("Marten", LogLevel.Debug); + + appBuilder.Services.AddMarten(options => + { + options.Connection(ConnectionSource.ConnectionString); + options.DatabaseSchemaName = "flat_projections"; + + options.Projections.Add(ProjectionLifecycle.Async); + }) + // Add this + .ApplyAllDatabaseChangesOnStartup() + .UseLightweightSessions() + .OptimizeArtifactWorkflow() + .AddAsyncDaemon(DaemonMode.Solo); + + var app = appBuilder.Build(); + await app.StartAsync(); + + var store = app.Services.GetRequiredService(); + +// ########## Uncomment the next line to get the error ########## + await store.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + + await using (var session = store.LightweightSession()) + { + session.Events.StartStream(Guid.NewGuid(), + new ImportStarted(DateTimeOffset.Now.AddMinutes(-1), "foo", "cust-1", 3), + new ImportProgress("step-1", 3, 1), + new ImportFinished(DateTimeOffset.Now)); + + await session.SaveChangesAsync(); + } + + await app.StopAsync(); + } +} + +public record ImportStarted(DateTimeOffset Started, string ActivityType, string CustomerId, int PlannedSteps); +public record ImportProgress(string StepName, int Records, int Invalids); +public record ImportFinished(DateTimeOffset Finished); +public record ImportFailed; + +public class FlatImportProjection : FlatTableProjection +{ + // I'm telling Marten to use the same database schema as the events from + // the Marten configuration in this application + public FlatImportProjection() : base("import_history", SchemaNameSource.EventSchema) + { + // We need to explicitly add a primary key + Table.AddColumn("id").AsPrimaryKey(); + + TeardownDataOnRebuild = true; + + Project(map => + { + // Set values in the table from the event + map.Map(x => x.ActivityType, "activity_type").NotNull(); + map.Map(x => x.CustomerId, "customer_id"); + map.Map(x => x.PlannedSteps, "total_steps") + .DefaultValue(0); + + map.Map(x => x.Started); + + // Initial values + map.SetValue("status", "started"); + map.SetValue("step_number", 0); + map.SetValue("records", 0); + }); + + Project(map => + { + // Add 1 to this column when this event is encountered + map.Increment("step_number"); + + // Update a running sum of records progressed + // by the number of records on this event + map.Increment(x => x.Records); + + map.SetValue("status", "working"); + }); + + Project(map => + { + map.Map(x => x.Finished); + map.SetValue("status", "completed"); + }); + + // Just gonna delete the record of any failures + Delete(); + } +} diff --git a/src/Marten/Events/Projections/Flattened/CallUpsertFunctionFrame.cs b/src/Marten/Events/Projections/Flattened/CallUpsertFunctionFrame.cs index 44b9811927..16cceb6578 100644 --- a/src/Marten/Events/Projections/Flattened/CallUpsertFunctionFrame.cs +++ b/src/Marten/Events/Projections/Flattened/CallUpsertFunctionFrame.cs @@ -19,7 +19,7 @@ internal class CallUpsertFunctionFrame: MethodCall, IEventHandlingFrame public CallUpsertFunctionFrame(Type eventType, DbObjectName functionIdentifier, List columnMaps, MemberInfo[] members): base(typeof(IDocumentOperations), nameof(IDocumentOperations.QueueSqlCommand)) { - _functionIdentifier = functionIdentifier; + _functionIdentifier = functionIdentifier ?? throw new ArgumentNullException(nameof(functionIdentifier)); _columnMaps = columnMaps; _members = members; EventType = eventType; diff --git a/src/Marten/Events/Projections/Flattened/FlatTableProjection.CodeGeneration.cs b/src/Marten/Events/Projections/Flattened/FlatTableProjection.CodeGeneration.cs index 633d77c9dc..3ca28de9de 100644 --- a/src/Marten/Events/Projections/Flattened/FlatTableProjection.CodeGeneration.cs +++ b/src/Marten/Events/Projections/Flattened/FlatTableProjection.CodeGeneration.cs @@ -3,6 +3,7 @@ using System.Reflection; using JasperFx.CodeGeneration; using JasperFx.Core; +using JasperFx.Core.Reflection; using Marten.Events.CodeGeneration; namespace Marten.Events.Projections.Flattened; diff --git a/src/Marten/Events/Projections/Flattened/StatementMap.cs b/src/Marten/Events/Projections/Flattened/StatementMap.cs index 62e268d1a3..9e6c6687d0 100644 --- a/src/Marten/Events/Projections/Flattened/StatementMap.cs +++ b/src/Marten/Events/Projections/Flattened/StatementMap.cs @@ -30,6 +30,11 @@ public StatementMap(FlatTableProjection parent, MemberInfo[] pkMembers) IEventHandlingFrame IEventHandler.BuildFrame(EventGraph events, Table table) { + if (_functionIdentifier == null) + { + createFunctionName(table); + } + return new CallUpsertFunctionFrame(typeof(T), _functionIdentifier, _columnMaps, determinePkMembers(events).ToArray()); } @@ -42,12 +47,17 @@ bool IEventHandler.AssertValid(EventGraph events, out string? message) IEnumerable IEventHandler.BuildObjects(EventGraph events, Table table) { - var functionName = $"mt_upsert_{table.Identifier.Name}_{typeof(T).NameInCode().Sanitize()}"; - _functionIdentifier = new PostgresqlObjectName(table.Identifier.Schema, functionName); + createFunctionName(table); yield return new FlatTableUpsertFunction(_functionIdentifier, table, _columnMaps); } + private void createFunctionName(Table table) + { + var functionName = $"mt_upsert_{table.Identifier.Name.ToLower()}_{typeof(T).NameInCode().ToLower().Sanitize()}"; + _functionIdentifier = new PostgresqlObjectName(table.Identifier.Schema, functionName); + } + private IEnumerable determinePkMembers(EventGraph events) { if (_pkMembers.Any())