Skip to content

Commit

Permalink
new, improved, separate store aware, multi-database aware 'projection…
Browse files Browse the repository at this point in the history
…s' command. Closes GH-2266
  • Loading branch information
Jeremy D. Miller authored and Jeremy D. Miller committed Jul 6, 2022
1 parent 7fd5bdb commit fa29349
Show file tree
Hide file tree
Showing 44 changed files with 1,662 additions and 387 deletions.
7 changes: 6 additions & 1 deletion build/build.cs
Expand Up @@ -75,6 +75,11 @@ private static async Task Main(string[] args)
Target("test-core", DependsOn("compile-core-tests"), () =>
RunTests("CoreTests"));

Target("test-cli", () =>
{
Run("dotnet", $"test --configuration {_configuration} src/Marten.CommandLine.Tests/Marten.CommandLine.Tests.csproj");
});

Target("compile-document-db-tests", DependsOn("clean"), () =>
Run("dotnet", $"build src/DocumentDbTests/DocumentDbTests.csproj --framework {_framework} --configuration {configuration}"));

Expand Down Expand Up @@ -109,7 +114,7 @@ private static async Task Main(string[] args)


// JDM -- I removed test-codegen temporarily during V5 work
Target("test", DependsOn("test-base-lib", "test-document-db", "test-event-sourcing"));
Target("test", DependsOn("test-base-lib", "test-document-db", "test-event-sourcing", "test-cli"));

Target("test-extension-libs", DependsOn("test-noda-time", "test-plv8", "test-aspnetcore"));

Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/hostbuilder.md
Expand Up @@ -246,7 +246,7 @@ public interface IConfigureMarten
void Configure(IServiceProvider services, StoreOptions options);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L618-L629' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L641-L652' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You could alternatively implement a custom `IConfigureMarten` class like so:
Expand Down
6 changes: 4 additions & 2 deletions docs/configuration/prebuilding.md
Expand Up @@ -156,7 +156,9 @@ public class Program
{
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.Connection(ConnectionSource.ConnectionString);

opts.MultiTenantedWithSingleServer(ConnectionSource.ConnectionString)
.WithTenants("tenant1", "tenant2", "tenant3");

// This is important, setting this option tells Marten to
// *try* to use pre-generated code at runtime
Expand Down Expand Up @@ -188,7 +190,7 @@ public class Program
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/Program.cs#L31-L88' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_pre_build_types' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/CommandLineRunner/Program.cs#L31-L90' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_configuring_pre_build_types' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Okay, after all that, there should be a new command line option called `codegen` for your project. Assuming
Expand Down
4 changes: 3 additions & 1 deletion src/CommandLineRunner/Program.cs
Expand Up @@ -53,7 +53,9 @@ public static IHostBuilder CreateHostBuilder(string[] args)
{
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.Connection(ConnectionSource.ConnectionString);
opts.MultiTenantedWithSingleServer(ConnectionSource.ConnectionString)
.WithTenants("tenant1", "tenant2", "tenant3");
// This is important, setting this option tells Marten to
// *try* to use pre-generated code at runtime
Expand Down
3 changes: 2 additions & 1 deletion src/CommandLineRunner/Properties/launchSettings.json
Expand Up @@ -5,7 +5,8 @@
"dotnetRunMessages": "true",
"environmentVariables": {
"DOTNET_ENVIRONMENT": "Development"
}
},
"commandLineArgs": "projections"
}
}
}
20 changes: 20 additions & 0 deletions src/CoreTests/using_multiple_document_stores_in_same_host.cs
Expand Up @@ -136,6 +136,26 @@ public void all_the_defaults()

}

[Fact]
public async Task can_resolve_all_stores()
{
using var host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMarten(ConnectionSource.ConnectionString);
services.AddMartenStore<IFirstStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "first_store";
});
}).StartAsync();

var stores = host.AllDocumentStores();
stores.Count.ShouldBe(2);
stores.OfType<IFirstStore>().Count().ShouldBe(1);
}

[Fact]
public void using_optimized_mode_in_development()
{
Expand Down
4 changes: 4 additions & 0 deletions src/EventPublisher/EventPublisher.csproj
Expand Up @@ -11,4 +11,8 @@
<ProjectReference Include="..\Marten\Marten.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Lamar" Version="8.0.1" />
<PackageReference Include="Spectre.Console" Version="0.44.0" />
</ItemGroup>
</Project>
142 changes: 83 additions & 59 deletions src/EventPublisher/Program.cs
Expand Up @@ -3,98 +3,122 @@
using System.Linq;
using System.Threading.Tasks;
using Baseline.Dates;
using Lamar;
using LamarCodeGeneration;
using Marten;
using Marten.AsyncDaemon.Testing.TestingSupport;
using Marten.Services;
using Marten.Storage;
using Marten.Testing.Harness;
using Weasel.Core;
using Weasel.Postgresql;

namespace EventPublisher
{
class Program
internal static class Program
{
static Task Main(string[] args)
static async Task Main(string[] args)
{
using var store = DocumentStore.For(opts =>
{
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.Connection(ConnectionSource.ConnectionString);
});
await using var container = BuildContainer();

var source = new TaskCompletionSource();

store.Advanced.Clean.CompletelyRemoveAll();
var stores = container.AllDocumentStores();
var board = new StatusBoard(source.Task);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
foreach (var store in stores)
{
var task = new Publisher(store).Start();
tasks.Add(task);
await store.Advanced.Clean.CompletelyRemoveAllAsync();

var databases = await store.Storage.AllDatabases();
foreach (var database in databases)
{
for (var i = 0; i < 10; i++)
{
var publisher = new Publisher(store, database, board);
tasks.Add(publisher.Start());
}
}
}

return Task.WhenAll(tasks.ToArray());
await Task.WhenAll(tasks.ToArray());
}

public static IContainer BuildContainer()
{
return new Container(services =>
{
services.AddMarten(opts =>
{
opts.AutoCreateSchemaObjects = AutoCreate.All;
opts.DatabaseSchemaName = "cli";
opts.MultiTenantedWithSingleServer(ConnectionSource.ConnectionString)
.WithTenants("tenant1", "tenant2", "tenant3");
});
});
}
}

internal class Publisher
{
private readonly IDocumentStore _store;
private readonly IMartenDatabase _database;
private readonly StatusBoard _board;
private readonly string _name;

public class Publisher
public Publisher(IDocumentStore store, IMartenDatabase database, StatusBoard board)
{
private readonly IDocumentStore _store;
_store = store;
_database = database;
_board = board;

public Publisher(IDocumentStore store)
{
_store = store;
}
var storeName = store.GetType() == typeof(DocumentStore) ? "Marten" : store.GetType().NameInCode();
_name = $"{storeName}:{_database.Identifier}";
}

public Task Start()
public Task Start()
{
var random = new Random();
return Task.Run(async () =>
{
var random = new Random();
return Task.Run(async () =>
while (true)
{
while (true)
{
var delay = random.Next(0, 250);
var delay = random.Next(0, 250);
await Task.Delay(delay.Milliseconds());
await PublishEvents();
using (var session = _store.LightweightSession())
{
var count = await session.Events.QueryAllRawEvents().CountAsync();
Console.WriteLine($"Published {count} total events");
}
}
});
}
await Task.Delay(delay.Milliseconds());
await PublishEvents();
}
});
}

public async Task PublishEvents()
public async Task PublishEvents()
{
var streams = TripStream.RandomStreams(5);
while (streams.Any())
{
var streams = TripStream.RandomStreams(5);
while (streams.Any())
var count = 0;
var options = SessionOptions.ForDatabase(_database);

await using var session = _store.OpenSession(options);
foreach (var stream in streams.ToArray())
{
var count = 0;
using (var session = _store.LightweightSession())
if (stream.TryCheckOutEvents(out var events))
{
foreach (var stream in streams.ToArray())
{
if (stream.TryCheckOutEvents(out var events))
{
count += events.Length;
session.Events.Append(stream.StreamId, events);
}

if (stream.IsFinishedPublishing())
{
streams.Remove(stream);
}
}

await session.SaveChangesAsync();
Console.WriteLine($"Wrote {count} events at {DateTime.Now.ToShortTimeString()}");
count += events.Length;
session.Events.Append(stream.StreamId, events);
}


if (stream.IsFinishedPublishing())
{
streams.Remove(stream);
}
}

await session.SaveChangesAsync();
_board.Update(_name, count);
}

}
}
}
73 changes: 73 additions & 0 deletions src/EventPublisher/StatusBoard.cs
@@ -0,0 +1,73 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Baseline;
using Spectre.Console;

namespace EventPublisher
{
public class StatusBoard
{
private readonly LightweightCache<string, ProjectionStatus> _counts;
private StatusContext _context;
private readonly ActionBlock<UpdateMessage> _updater;

public StatusBoard(Task completion)
{
_counts = new(name => new ProjectionStatus(name, completion));
_updater = new ActionBlock<UpdateMessage>(Update,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true });

}

public record UpdateMessage(string Name, int Count);

public void Update(UpdateMessage message)
{
_counts[message.Name].Update(message.Count);
}

public void Update(string name, int count)
{
_updater.Post(new UpdateMessage(name, count));
}
}


public class ProjectionStatus
{
private readonly string _name;
private StatusContext _context;
private int _count;

public ProjectionStatus(string name, Task completion)
{
_name = name;

AnsiConsole
.Status()
.AutoRefresh(true)
.StartAsync("Waiting...", context =>
{
context.Spinner(Spinner.Known.Clock);
context.SpinnerStyle(Style.Parse("grey italic"));
context.Refresh();
_context = context;
return completion;
});
}

public void Update(int count)
{
_count += count;

if (_context == null) return;
_context.Spinner(Spinner.Known.Default);
_context.SpinnerStyle(Style.Plain);
_context.Status = $"{_name}: {_count}";

_context.Refresh();
}

}
}
33 changes: 33 additions & 0 deletions src/Marten.CommandLine.Tests/Marten.CommandLine.Tests.csproj
@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>

<IsPackable>false</IsPackable>

<LangVersion>10</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
<PackageReference Include="NSubstitute" Version="4.3.0" />
<PackageReference Include="Shouldly" Version="4.0.3" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.1.2">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Marten.CommandLine\Marten.CommandLine.csproj" />
<ProjectReference Include="..\Marten.Testing\Marten.Testing.csproj" />
</ItemGroup>

</Project>

0 comments on commit fa29349

Please sign in to comment.