Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPIKE: Add support for using a provided connection factory #278

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions source/Nevermore.Tests/RelationalStore/ReadTransactionFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ public class ReadTransactionFixture
RelationalTransactionRegistry registry;
readonly List<FakeSqlConnection> createdConnections = new();

DbConnection ConnectionFactory(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactory(string s)
{
var c = new FakeSqlConnection { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

[SetUp]
Expand Down Expand Up @@ -95,11 +95,11 @@ public override void Open()
[Test]
public void OpenWillRetryATransientFailure()
{
DbConnection ConnectionFactoryTransientFailure(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactoryTransientFailure(string s)
{
var c = new FakeSqlConnectionWhichThrowsOnFirstOpen { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

var c = new ReadTransaction(null!, registry, RetriableOperation.Select, new RelationalStoreConfiguration(FakeConnectionString), ConnectionFactoryTransientFailure);
Expand All @@ -115,11 +115,11 @@ DbConnection ConnectionFactoryTransientFailure(string s)
[Test]
public async Task OpenAsyncWillRetryATransientFailure()
{
DbConnection ConnectionFactoryTransientFailure(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactoryTransientFailure(string s)
{
var c = new FakeSqlConnectionWhichThrowsOnFirstOpen { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

var c = new ReadTransaction(null!, registry, RetriableOperation.Select, new RelationalStoreConfiguration(FakeConnectionString), ConnectionFactoryTransientFailure);
Expand All @@ -136,11 +136,11 @@ DbConnection ConnectionFactoryTransientFailure(string s)
[Test]
public void OpenWithIsolationWillRetryATransientFailure()
{
DbConnection ConnectionFactoryTransientFailure(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactoryTransientFailure(string s)
{
var c = new FakeSqlConnectionWhichThrowsOnFirstOpen { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

var c = new ReadTransaction(null!, registry, RetriableOperation.Select, new RelationalStoreConfiguration(FakeConnectionString), ConnectionFactoryTransientFailure);
Expand Down Expand Up @@ -183,11 +183,11 @@ public override DbTransaction BeginTransaction(IsolationLevel iso, string transa
[Test]
public void OpenWithIsolationWillRetryATransientFailureFromTransaction()
{
DbConnection ConnectionFactoryTransientFailure(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactoryTransientFailure(string s)
{
var c = new FakeSqlConnectionWhichThrowsOnFirstTransaction { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

var c = new ReadTransaction(null!, registry, RetriableOperation.Select, new RelationalStoreConfiguration(FakeConnectionString), ConnectionFactoryTransientFailure);
Expand All @@ -203,11 +203,11 @@ DbConnection ConnectionFactoryTransientFailure(string s)
[Test]
public async Task OpenAsyncWithIsolationWillRetryATransientFailureFromTransaction()
{
DbConnection ConnectionFactoryTransientFailure(string s)
(DbConnection connection, bool ownsConnection) ConnectionFactoryTransientFailure(string s)
{
var c = new FakeSqlConnectionWhichThrowsOnFirstTransaction { ConnectionString = s };
createdConnections.Add(c);
return c;
return (c, true);
}

var c = new ReadTransaction(null!, registry, RetriableOperation.Select, new RelationalStoreConfiguration(FakeConnectionString), ConnectionFactoryTransientFailure);
Expand Down
26 changes: 18 additions & 8 deletions source/Nevermore/Advanced/ReadTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ namespace Nevermore.Advanced
public class ReadTransaction : IReadTransaction, ITransactionDiagnostic
{
static readonly ILog Log = LogProvider.For<ReadTransaction>();
protected static DbConnection DefaultConnectionFactory(string connectionString) => new SqlConnection(connectionString);
protected static (DbConnection connection, bool ownsConnection) DefaultConnectionFactory(string connectionString) => (new SqlConnection(connectionString), true);

readonly IRelationalTransactionRegistry registry;
readonly RetriableOperation operationsToRetry;
readonly IRelationalStoreConfiguration configuration;
readonly ITableAliasGenerator tableAliasGenerator = new TableAliasGenerator();

readonly Func<string, DbConnection> connectionFactory;
readonly Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory;
readonly Action<string>? customCommandTrace;
readonly string name;

Expand Down Expand Up @@ -86,12 +86,12 @@ public class ReadTransaction : IReadTransaction, ITransactionDiagnostic
OwnsSqlTransaction = false;
}

internal ReadTransaction(
public ReadTransaction(
IRelationalStore store,
IRelationalTransactionRegistry registry,
RetriableOperation operationsToRetry,
IRelationalStoreConfiguration configuration,
Func<string, DbConnection> connectionFactory,
Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory,
Action<string>? customCommandTrace = null,
string? name = null)
{
Expand Down Expand Up @@ -126,8 +126,13 @@ public void Open()
if (!OwnsSqlTransaction)
throw new InvalidOperationException("An existing connection and transaction were provided, they should have been opened externally");

connection = connectionFactory(configuration.ConnectionString);
connection.OpenWithRetry();
var (connectionFactoryConnection, ownsConnection) = connectionFactory(configuration.ConnectionString);
connection = connectionFactoryConnection;

if (ownsConnection)
{
connection.OpenWithRetry();
}

TransactionTimer = new TimedSection(ms => configuration.TransactionLogger.Write(ms, name));
}
Expand All @@ -137,8 +142,13 @@ public async Task OpenAsync(CancellationToken cancellationToken = default)
if (!OwnsSqlTransaction)
throw new InvalidOperationException("An existing connection and transaction were provided, they should have been opened externally");

connection = connectionFactory(configuration.ConnectionString);
await connection.OpenWithRetryAsync(cancellationToken).ConfigureAwait(false);
var (connectionFactoryConnection, ownsConnection) = connectionFactory(configuration.ConnectionString);
connection = connectionFactoryConnection;

if (ownsConnection)
{
await connection.OpenWithRetryAsync(cancellationToken).ConfigureAwait(false);
}

TransactionTimer = new TimedSection(ms => configuration.TransactionLogger.Write(ms, name));
}
Expand Down
16 changes: 16 additions & 0 deletions source/Nevermore/Advanced/WriteTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,22 @@ public class WriteTransaction : ReadTransaction, IRelationalTransaction
this.keyAllocator = keyAllocator;
builder = new DataModificationQueryBuilder(configuration, AllocateId);
}

public WriteTransaction(
IRelationalStore store,
IRelationalTransactionRegistry registry,
RetriableOperation operationsToRetry,
IRelationalStoreConfiguration configuration,
IKeyAllocator keyAllocator,
Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory,
Action<string>? customCommandTrace = null,
string? name = null
) : base(store, registry, operationsToRetry, configuration, connectionFactory, customCommandTrace, name)
{
this.configuration = configuration;
this.keyAllocator = keyAllocator;
builder = new DataModificationQueryBuilder(configuration, AllocateId);
}
#nullable disable

public void Insert<TDocument>(TDocument document, InsertOptions options = null) where TDocument : class
Expand Down
46 changes: 46 additions & 0 deletions source/Nevermore/RelationalStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,37 @@ public async Task<IWriteTransaction> BeginWriteTransactionAsync(IsolationLevel i
throw;
}
}

public IWriteTransaction BeginWriteTransactionFromExistingConnectionFactory(Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory, IsolationLevel isolationLevel = NevermoreDefaults.IsolationLevel, RetriableOperation retriableOperation = NevermoreDefaults.RetriableOperations, string? name = null, CancellationToken cancellationToken = default)
{
var txn = CreateWriteTransactionFromExistingConnectionFactory(connectionFactory, retriableOperation, name);
try
{
txn.Open(isolationLevel);
return txn;
}
catch
{
txn.Dispose();
throw;
}
}

public async Task<IWriteTransaction> BeginWriteTransactionFromExistingConnectionFactoryAsync(Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory, IsolationLevel isolationLevel = NevermoreDefaults.IsolationLevel, RetriableOperation retriableOperation = NevermoreDefaults.RetriableOperations, string? name = null, CancellationToken cancellationToken = default)
{
var txn = CreateWriteTransactionFromExistingConnectionFactory(connectionFactory, retriableOperation, name);
try
{
await txn.OpenAsync(isolationLevel, cancellationToken).ConfigureAwait(false);
return txn;
}
catch
{
txn.Dispose();
throw;
}
}


public IRelationalTransaction BeginTransaction(IsolationLevel isolationLevel = NevermoreDefaults.IsolationLevel, RetriableOperation retriableOperation = NevermoreDefaults.RetriableOperations, string? name = null)
{
Expand Down Expand Up @@ -140,6 +171,21 @@ public IRelationalTransaction BeginTransaction(IsolationLevel isolationLevel = N
customCommandTrace,
name);
}

public WriteTransaction CreateWriteTransactionFromExistingConnectionFactory(
Func<string, (DbConnection connection, bool ownsConnection)> connectionFactory,
RetriableOperation retriableOperation,
string? name = null)
{
return new WriteTransaction(
this,
registry.Value,
retriableOperation,
Configuration,
keyAllocator.Value,
connectionFactory,
name: name);
}

ReadTransaction CreateReadTransaction(RetriableOperation retriableOperation, string? name = null)
{
Expand Down