Skip to content
This repository was archived by the owner on Sep 3, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions src/SqlStreamStore.Server/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
Expand All @@ -17,28 +15,29 @@ internal class Program : IDisposable

public static async Task<int> Main(string[] args)
{
using (var program = new Program(args))
var configuration = new SqlStreamStoreServerConfiguration(
Environment.GetEnvironmentVariables(),
args);

using (var program = new Program(configuration))
{
return await program.Run();
}
}

private Program(string[] args)
private Program(SqlStreamStoreServerConfiguration configuration)
{
_configuration = new SqlStreamStoreServerConfiguration(
Environment.GetEnvironmentVariables(),
args);

Console.WriteLine(_configuration.ToString());

Log.Logger = new LoggerConfiguration()
.MinimumLevel.Is(_configuration.LogLevel)
.MinimumLevel.Is(configuration.LogLevel)
.Enrich.FromLogContext()
.WriteTo.Console()
.CreateLogger();

Log.Information(configuration.ToString());

_configuration = configuration;
_cts = new CancellationTokenSource();
_factory = new SqlStreamStoreFactory(_configuration);
_factory = new SqlStreamStoreFactory(configuration);
}

private async Task<int> Run()
Expand All @@ -47,8 +46,10 @@ private async Task<int> Run()
{
using (var streamStore = await _factory.Create(_cts.Token))
using (var host = new WebHostBuilder()
.SuppressStatusMessages(true)
.UseKestrel()
.UseStartup(new SqlStreamStoreServerStartup(streamStore,
.UseStartup(new SqlStreamStoreServerStartup(
streamStore,
new SqlStreamStoreMiddlewareOptions
{
UseCanonicalUrls = _configuration.UseCanonicalUris,
Expand Down
136 changes: 82 additions & 54 deletions src/SqlStreamStore.Server/SqlStreamStoreFactory.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Npgsql;
Expand Down Expand Up @@ -32,10 +33,11 @@ private static readonly IDictionary<string, CreateStreamStore> s_factories

public SqlStreamStoreFactory(SqlStreamStoreServerConfiguration configuration)
{
if(configuration == null)
if (configuration == null)
{
throw new ArgumentNullException(nameof(configuration));
}

_configuration = configuration;
}

Expand All @@ -46,15 +48,12 @@ public Task<IStreamStore> Create(CancellationToken cancellationToken = default)

Log.Information($"Creating stream store for provider '{provider}'");

if(!s_factories.TryGetValue(provider, out var factory))
if (!s_factories.TryGetValue(provider, out var factory))
{
throw new InvalidOperationException($"No provider factory for provider '{provider}' found.");
}

var connectionString = _configuration.ConnectionString;
var schema = _configuration.Schema;

return factory(connectionString, schema, cancellationToken);
return factory(_configuration.ConnectionString, _configuration.Schema, cancellationToken);
}

private static Task<IStreamStore> CreateInMemoryStreamStore(
Expand All @@ -69,37 +68,45 @@ private static async Task<IStreamStore> CreateMssqlStreamStore(
CancellationToken cancellationToken)
{
var connectionStringBuilder = new SqlConnectionStringBuilder(connectionString);
using(var connection = new SqlConnection(new SqlConnectionStringBuilder(connectionString)
var settings = new MsSqlStreamStoreV3Settings(connectionString);

if (schema != null)
{
InitialCatalog = "master"
}.ConnectionString))
settings.Schema = schema;
}

var streamStore = new MsSqlStreamStoreV3(settings);

try
{
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
using (var connection = new SqlConnection(new SqlConnectionStringBuilder(connectionString)
{
InitialCatalog = "master"
}.ConnectionString))
{
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();

using(var command = new SqlCommand(
$@"
using (var command = new SqlCommand(
$@"
IF NOT EXISTS (SELECT name FROM sys.databases WHERE name = N'{connectionStringBuilder.InitialCatalog}')
BEGIN
CREATE DATABASE [{connectionStringBuilder.InitialCatalog}]
END;
",
connection))
{
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
connection))
{
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
}
}
}

var settings = new MsSqlStreamStoreV3Settings(connectionString);

if(schema != null)
await streamStore.CreateSchemaIfNotExists(cancellationToken);
}
catch (SqlException ex)
{
settings.Schema = schema;
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
throw;
}

var streamStore = new MsSqlStreamStoreV3(settings);

await streamStore.CreateSchemaIfNotExists(cancellationToken);

return streamStore;
}

Expand All @@ -109,46 +116,67 @@ private static async Task<IStreamStore> CreatePostgresStreamStore(
CancellationToken cancellationToken)
{
var connectionStringBuilder = new NpgsqlConnectionStringBuilder(connectionString);
var settings = new PostgresStreamStoreSettings(connectionString);

using(var connection = new NpgsqlConnection(new NpgsqlConnectionStringBuilder(connectionString)
{
Database = null
}.ConnectionString))
if (schema != null)
{
bool exists;
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();
settings.Schema = schema;
}

using(var command = new NpgsqlCommand(
$"SELECT 1 FROM pg_database WHERE datname = '{connectionStringBuilder.Database}'",
connection))
{
exists = await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext()
!= null;
}
var streamStore = new PostgresStreamStore(settings);

if(!exists)
try
{
using (var connection = new NpgsqlConnection(new NpgsqlConnectionStringBuilder(connectionString)
{
using(var command = new NpgsqlCommand(
$"CREATE DATABASE {connectionStringBuilder.Database}",
connection))
Database = null
}.ConnectionString))
{
await connection.OpenAsync(cancellationToken).NotOnCapturedContext();

async Task<bool> DatabaseExists()
{
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
using (var command = new NpgsqlCommand(
$"SELECT 1 FROM pg_database WHERE datname = '{connectionStringBuilder.Database}'",
connection))
{
return await command.ExecuteScalarAsync(cancellationToken).NotOnCapturedContext()
!= null;
}
}
}

var settings = new PostgresStreamStoreSettings(connectionString);
if (!await DatabaseExists())
{
using (var command = new NpgsqlCommand(
$"CREATE DATABASE {connectionStringBuilder.Database}",
connection))
{
await command.ExecuteNonQueryAsync(cancellationToken).NotOnCapturedContext();
}
}

if(schema != null)
{
settings.Schema = schema;
await streamStore.CreateSchemaIfNotExists(cancellationToken);
}

var streamStore = new PostgresStreamStore(settings);

await streamStore.CreateSchemaIfNotExists(cancellationToken);

return streamStore;
}
catch (NpgsqlException ex)
{
SchemaCreationFailed(streamStore.GetSchemaCreationScript, ex);
throw;
}

return streamStore;
}

private static void SchemaCreationFailed(Func<string> getSchemaCreationScript, Exception ex)
=> Log.Warning(
new StringBuilder()
.Append($"Could not create schema: {ex.Message}")
.AppendLine()
.Append(
"Does your connection string have enough permissions? If not, run the following sql script as a privileged user:")
.AppendLine()
.Append(getSchemaCreationScript())
.ToString(),
ex);
}
}
}
34 changes: 22 additions & 12 deletions src/SqlStreamStore.Server/SqlStreamStoreServerConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ void Log(string logName, IDictionary<string, string> data)
{
foreach (var (key, value) in data)
{
_values[key] = (logName, value);
_values[key] = (
logName,
s_sensitiveKeys.Contains(key)
? new string('*', 8)
: value);
}
}

Expand Down Expand Up @@ -82,18 +86,16 @@ public override string ToString()
}
}
.Concat(
s_allKeys
.Select(key => new[] {delimiter, key, _values[key].value, _values[key].source}))
s_allKeys.Select(key => new[] {delimiter, key, _values[key].value, _values[key].source}))
.Aggregate(
new StringBuilder().AppendLine("SQL Stream Store Configuration:"),
(builder, values) => builder
.Append((values[1] ?? string.Empty).PadRight(column0Width, ' '))
.Append(values[0])
.Append((s_sensitiveKeys.Contains(values[1])
? new string('*', Math.Min(column1Width, 8))
: values[2] ?? string.Empty).PadRight(column1Width, ' '))
.Append((values[2] ?? string.Empty).PadRight(column1Width, ' '))
.Append(values[0])
.AppendLine(values[3])).ToString();
.AppendLine(values[3]))
.ToString();
}

private static string Computerize(string value) =>
Expand All @@ -115,7 +117,11 @@ private class ConfigurationData

public ConfigurationData(IConfigurationRoot configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
if (configuration == null)
{
throw new ArgumentNullException(nameof(configuration));
}

_configuration = configuration;
}
}
Expand All @@ -135,16 +141,20 @@ public Default(Action<string, IDictionary<string, string>> log)
}

public IConfigurationProvider Build(IConfigurationBuilder builder) =>
new DefaultConfigurtationProvider(_log);
new DefaultConfigurationProvider(_log);
}

private class DefaultConfigurtationProvider : ConfigurationProvider
private class DefaultConfigurationProvider : ConfigurationProvider
{
private readonly Action<string, IDictionary<string, string>> _log;

public DefaultConfigurtationProvider(Action<string, IDictionary<string, string>> log)
public DefaultConfigurationProvider(Action<string, IDictionary<string, string>> log)
{
if (log == null) throw new ArgumentNullException(nameof(log));
if (log == null)
{
throw new ArgumentNullException(nameof(log));
}

_log = log;
}

Expand Down