Skip to content

Commit

Permalink
Command Timeout is not being honored in PostgreSQL and MySQL #490
Browse files Browse the repository at this point in the history
ToObjectStream #92
  • Loading branch information
Grauenwolf committed Jul 12, 2022
1 parent 38d5b56 commit ef30cde
Show file tree
Hide file tree
Showing 42 changed files with 2,914 additions and 524 deletions.
42 changes: 42 additions & 0 deletions Tortuga.Chain/Changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,45 @@


[#92 ToObjectStream](https://github.com/TortugaResearch/Tortuga.Chain/issues/92)

Previously, Chain would fully manage database connections by default. Specifically, it would open and close connections automatically unless a transaction was involved. In that case, the developer only needed to manage the transactional data source itself.

However, there are times when a result set is too large to handle at one time. In this case the developer will want an `IEnumerable` or `IAsyncEnumerable` instead of a collection. To support this, the `ToObjectStream` materializer was created.

When used in place of `ToCollection`, the caller gets a `ObjectStream` object. This object implements `IEnumerable<TObject>`, `IDisposable`, `IAsyncDisposable`, abd `IAsyncEnumerable<TObject>`. (That latter two are only available in .NET 6 or later.)

This object stream may be used directly, as shown below, or attached to an RX Observable or TPL Dataflow just like any other enumerable data structure.

```csharp
//sync pattern
using var objectStream = dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().Execute();
foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}

//async pattern
await using var objectStream = await dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().ExecuteAsync();
await foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}

```

It is vital that the object stream is disposed after use. If that doesn't occur, the database can suffer from thread exhaustion or deadlocks.



### Bugs

[#490 Command Timeout is not being honored in PostgreSQL and MySQL](https://github.com/TortugaResearch/Tortuga.Chain/issues/490)

See the ticket for an explaination for why this was broken.


## Version 4.2

### Features
Expand Down
81 changes: 81 additions & 0 deletions Tortuga.Chain/Shared/Tests/Materializers/ObjectStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Tests.Models;

namespace Tests.Materializers;

[TestClass]
public class ToObjectStreamTests : TestBase
{
[DataTestMethod, BasicData(DataSourceGroup.Primary)]
public void ToObjectStream(string dataSourceName, DataSourceType mode)
{
var dataSource = DataSource(dataSourceName, mode);

try
{
var uniqueKey = Guid.NewGuid().ToString();

var empA1 = new Employee() { FirstName = "A", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA1).ToObject<Employee>().Execute();

var empA2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA2).ToObject<Employee>().Execute();

var empB1 = new Employee() { FirstName = "B", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB1).ToObject<Employee>().Execute();

var empB2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB2).ToObject<Employee>().Execute();

var empB3 = new Employee() { FirstName = "B", LastName = "3", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB3).ToObject<Employee>().Execute();

using var objectStream = dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().Execute();
foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}
}
finally
{
Release(dataSource);
}
}

#if NET6_0_OR_GREATER
[DataTestMethod, BasicData(DataSourceGroup.Primary)]
public async Task ToObjectStreamAsync(string dataSourceName, DataSourceType mode)
{
var dataSource = DataSource(dataSourceName, mode);

try
{
var uniqueKey = Guid.NewGuid().ToString();

var empA1 = new Employee() { FirstName = "A", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA1).ToObject<Employee>().Execute();

var empA2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA2).ToObject<Employee>().Execute();

var empB1 = new Employee() { FirstName = "B", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB1).ToObject<Employee>().Execute();

var empB2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB2).ToObject<Employee>().Execute();

var empB3 = new Employee() { FirstName = "B", LastName = "3", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB3).ToObject<Employee>().Execute();

await using var objectStream = await dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().ExecuteAsync();
await foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}
}
finally
{
Release(dataSource);
}
}
#endif
}
134 changes: 132 additions & 2 deletions Tortuga.Chain/Tortuga.Chain.Access/Access/AccessOpenDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,137 @@ internal AccessOpenDataSource(AccessDataSource dataSource, OleDbConnection conne
m_Transaction = transaction;
}

/// <summary>
/// Executes the stream.
/// </summary>
/// <param name="executionToken">The execution token.</param>
/// <param name="implementation">The implementation.</param>
/// <param name="state">The state.</param>
/// <returns>StreamingCommandCompletionToken.</returns>
public override StreamingCommandCompletionToken ExecuteStream(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, StreamingCommandImplementation<OleDbCommand> implementation, object? state)
{
if (executionToken == null)
throw new ArgumentNullException(nameof(executionToken), $"{nameof(executionToken)} is null.");
if (implementation == null)
throw new ArgumentNullException(nameof(implementation), $"{nameof(implementation)} is null.");
var currentToken = executionToken as AccessCommandExecutionToken;
if (currentToken == null)
throw new ArgumentNullException(nameof(executionToken), "only AccessCommandExecutionToken is supported.");

var startTime = DateTimeOffset.Now;
OnExecutionStarted(executionToken, startTime, state);

try
{
OleDbCommand? cmdToReturn = null;

while (currentToken != null)
{
OnExecutionStarted(currentToken, startTime, state);
using (var cmd = new OleDbCommand())
{
cmd.Connection = m_Connection;
if (m_Transaction != null)
cmd.Transaction = m_Transaction;

currentToken.PopulateCommand(cmd, DefaultCommandTimeout);

if (currentToken.ExecutionMode == AccessCommandExecutionMode.Materializer)
{
implementation(cmd);
cmdToReturn = cmd;
}
else if (currentToken.ExecutionMode == AccessCommandExecutionMode.ExecuteScalarAndForward)
{
if (currentToken.ForwardResult == null)
throw new InvalidOperationException("currentToken.ExecutionMode is ExecuteScalarAndForward, but currentToken.ForwardResult is null.");

currentToken.ForwardResult(cmd.ExecuteScalar());
}
else
cmd.ExecuteNonQuery();
}
currentToken = currentToken.NextCommand;
}
return new StreamingCommandCompletionToken(this, executionToken, startTime, state, cmdToReturn, null);
}
catch (Exception ex)
{
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex, state);
throw;
}
}

/// <summary>
/// Executes the stream asynchronous.
/// </summary>
/// <param name="executionToken">The execution token.</param>
/// <param name="implementation">The implementation.</param>
/// <param name="cancellationToken">The cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <param name="state">The state.</param>
/// <returns>Task&lt;StreamingCommandCompletionToken&gt;.</returns>
public override async Task<StreamingCommandCompletionToken> ExecuteStreamAsync(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, StreamingCommandImplementationAsync<OleDbCommand> implementation, CancellationToken cancellationToken, object? state)
{
if (executionToken == null)
throw new ArgumentNullException(nameof(executionToken), $"{nameof(executionToken)} is null.");
if (implementation == null)
throw new ArgumentNullException(nameof(implementation), $"{nameof(implementation)} is null.");
var currentToken = executionToken as AccessCommandExecutionToken;
if (currentToken == null)
throw new ArgumentNullException(nameof(executionToken), "only AccessCommandExecutionToken is supported.");

var startTime = DateTimeOffset.Now;
OnExecutionStarted(executionToken, startTime, state);

try
{
OleDbCommand? cmdToReturn = null;

while (currentToken != null)
{
OnExecutionStarted(currentToken, startTime, state);
using (var cmd = new OleDbCommand())
{
cmd.Connection = m_Connection;
if (m_Transaction != null)
cmd.Transaction = m_Transaction;
currentToken.PopulateCommand(cmd, DefaultCommandTimeout);

if (currentToken.ExecutionMode == AccessCommandExecutionMode.Materializer)
{
await implementation(cmd).ConfigureAwait(false);
cmdToReturn = cmd;
}
else if (currentToken.ExecutionMode == AccessCommandExecutionMode.ExecuteScalarAndForward)
{
if (currentToken.ForwardResult == null)
throw new InvalidOperationException("currentToken.ExecutionMode is ExecuteScalarAndForward, but currentToken.ForwardResult is null.");

currentToken.ForwardResult(await cmd.ExecuteScalarAsync().ConfigureAwait(false));
}
else
await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
}
currentToken = currentToken.NextCommand;
}
return new StreamingCommandCompletionToken(this, executionToken, startTime, state, cmdToReturn, null);
}
catch (Exception ex)
{
if (cancellationToken.IsCancellationRequested) //convert AccessException into a OperationCanceledException
{
var ex2 = new OperationCanceledException("Operation was canceled.", ex, cancellationToken);
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex2, state);
throw ex2;
}
else
{
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex, state);
throw;
}
}
}

/// <summary>
/// Executes the specified operation.
/// </summary>
Expand Down Expand Up @@ -123,7 +254,6 @@ internal AccessOpenDataSource(AccessDataSource dataSource, OleDbConnection conne
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="state">User supplied state.</param>
/// <returns>Task.</returns>
/// <exception cref="NotImplementedException"></exception>
protected override async Task<int?> ExecuteAsync(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, CommandImplementationAsync<OleDbCommand> implementation, CancellationToken cancellationToken, object? state)
{
if (executionToken == null)
Expand Down Expand Up @@ -233,4 +363,4 @@ internal AccessOpenDataSource(AccessDataSource dataSource, OleDbConnection conne

return this;
}
}
}
Loading

0 comments on commit ef30cde

Please sign in to comment.