Skip to content

Commit

Permalink
Forcing function names created for flat table projections to be all l…
Browse files Browse the repository at this point in the history
…ower case so the weasel comparisons can work correctly. Closes GH-2865
  • Loading branch information
jeremydmiller committed Dec 20, 2023
1 parent e0ed144 commit 7212262
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Threading.Tasks;
using Marten;
using Marten.Events.Daemon.Resiliency;
using Marten.Events.Projections;
using Marten.Events.Projections.Flattened;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Xunit;

namespace EventSourcingTests.Bugs;

public class Bug_2865_configuration_assertion_with_flat_table_projections
{
[Fact]
public async Task should_be_able_to_assert_on_existence_of_flat_table_functions()
{
var appBuilder = Host.CreateApplicationBuilder();

appBuilder.Logging
.SetMinimumLevel(LogLevel.Information)
.AddFilter("Marten", LogLevel.Debug);

appBuilder.Services.AddMarten(options =>
{
options.Connection(ConnectionSource.ConnectionString);
options.DatabaseSchemaName = "flat_projections";
options.Projections.Add<FlatImportProjection>(ProjectionLifecycle.Async);
})
// Add this
.ApplyAllDatabaseChangesOnStartup()
.UseLightweightSessions()
.OptimizeArtifactWorkflow()
.AddAsyncDaemon(DaemonMode.Solo);

var app = appBuilder.Build();
await app.StartAsync();

var store = app.Services.GetRequiredService<IDocumentStore>();

// ########## Uncomment the next line to get the error ##########
await store.Storage.Database.AssertDatabaseMatchesConfigurationAsync();

await using (var session = store.LightweightSession())
{
session.Events.StartStream(Guid.NewGuid(),
new ImportStarted(DateTimeOffset.Now.AddMinutes(-1), "foo", "cust-1", 3),
new ImportProgress("step-1", 3, 1),
new ImportFinished(DateTimeOffset.Now));

await session.SaveChangesAsync();
}

await app.StopAsync();
}
}

public record ImportStarted(DateTimeOffset Started, string ActivityType, string CustomerId, int PlannedSteps);
public record ImportProgress(string StepName, int Records, int Invalids);
public record ImportFinished(DateTimeOffset Finished);
public record ImportFailed;

public class FlatImportProjection : FlatTableProjection
{
// I'm telling Marten to use the same database schema as the events from
// the Marten configuration in this application
public FlatImportProjection() : base("import_history", SchemaNameSource.EventSchema)
{
// We need to explicitly add a primary key
Table.AddColumn<Guid>("id").AsPrimaryKey();

TeardownDataOnRebuild = true;

Project<ImportStarted>(map =>
{
// Set values in the table from the event
map.Map(x => x.ActivityType, "activity_type").NotNull();
map.Map(x => x.CustomerId, "customer_id");
map.Map(x => x.PlannedSteps, "total_steps")
.DefaultValue(0);
map.Map(x => x.Started);
// Initial values
map.SetValue("status", "started");
map.SetValue("step_number", 0);
map.SetValue("records", 0);
});

Project<ImportProgress>(map =>
{
// Add 1 to this column when this event is encountered
map.Increment("step_number");
// Update a running sum of records progressed
// by the number of records on this event
map.Increment(x => x.Records);
map.SetValue("status", "working");
});

Project<ImportFinished>(map =>
{
map.Map(x => x.Finished);
map.SetValue("status", "completed");
});

// Just gonna delete the record of any failures
Delete<ImportFailed>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class CallUpsertFunctionFrame: MethodCall, IEventHandlingFrame
public CallUpsertFunctionFrame(Type eventType, DbObjectName functionIdentifier, List<IColumnMap> columnMaps,
MemberInfo[] members): base(typeof(IDocumentOperations), nameof(IDocumentOperations.QueueSqlCommand))
{
_functionIdentifier = functionIdentifier;
_functionIdentifier = functionIdentifier ?? throw new ArgumentNullException(nameof(functionIdentifier));
_columnMaps = columnMaps;
_members = members;
EventType = eventType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Reflection;
using JasperFx.CodeGeneration;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten.Events.CodeGeneration;

namespace Marten.Events.Projections.Flattened;
Expand Down
14 changes: 12 additions & 2 deletions src/Marten/Events/Projections/Flattened/StatementMap.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public StatementMap(FlatTableProjection parent, MemberInfo[] pkMembers)

IEventHandlingFrame IEventHandler.BuildFrame(EventGraph events, Table table)
{
if (_functionIdentifier == null)
{
createFunctionName(table);
}

return new CallUpsertFunctionFrame(typeof(T), _functionIdentifier, _columnMaps,
determinePkMembers(events).ToArray());
}
Expand All @@ -42,12 +47,17 @@ bool IEventHandler.AssertValid(EventGraph events, out string? message)

IEnumerable<ISchemaObject> IEventHandler.BuildObjects(EventGraph events, Table table)
{
var functionName = $"mt_upsert_{table.Identifier.Name}_{typeof(T).NameInCode().Sanitize()}";
_functionIdentifier = new PostgresqlObjectName(table.Identifier.Schema, functionName);
createFunctionName(table);

yield return new FlatTableUpsertFunction(_functionIdentifier, table, _columnMaps);
}

private void createFunctionName(Table table)
{
var functionName = $"mt_upsert_{table.Identifier.Name.ToLower()}_{typeof(T).NameInCode().ToLower().Sanitize()}";
_functionIdentifier = new PostgresqlObjectName(table.Identifier.Schema, functionName);
}

private IEnumerable<MemberInfo> determinePkMembers(EventGraph events)
{
if (_pkMembers.Any())
Expand Down

0 comments on commit 7212262

Please sign in to comment.