Skip to content

Commit

Permalink
feat: Spanner DML returning data via SpannerDataReader (#10013)
Browse files Browse the repository at this point in the history
  • Loading branch information
jskeet committed Mar 9, 2023
1 parent 25cb901 commit 4787593
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 24 deletions.
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,13 @@ public class DmlTests
{
private readonly DmlTableFixture _fixture;

public enum TransactionType
{
EphemeralTransaction,
ExplicitTransaction,
RetryableTransaction
}

public DmlTests(DmlTableFixture fixture) => _fixture = fixture;

[Fact]
Expand Down Expand Up @@ -206,7 +213,6 @@ public void InvalidMethodCalls()
using (var command = connection.CreateDmlCommand(dml))
{
command.Parameters.Add("key", SpannerDbType.String, key);
Assert.Throws<InvalidOperationException>(() => command.ExecuteReader());
Assert.Throws<InvalidOperationException>(() => command.ExecuteScalar());
}
}
Expand Down Expand Up @@ -283,7 +289,7 @@ public void InsertAndUpdateWithQuery()
command.Parameters.Add("key", SpannerDbType.String, key);
Assert.Equal(2, command.ExecuteNonQuery());
}
string dml2 = $"UPDATE {_fixture.TableName} SET Value = Value * 2 WHERE KEY=@Key AND OriginalValue > 10";
using (var command = connection.CreateDmlCommand(dml2))
{
Expand Down Expand Up @@ -531,9 +537,176 @@ public void PartitionedUpdate_Medium()
{
command.Parameters.Add("key", SpannerDbType.String, key);
command.Parameters.Add("cutoff", SpannerDbType.Int64, amountToAdd);
Assert.Equal(0, (long)command.ExecuteScalar());
Assert.Equal(0, (long) command.ExecuteScalar());
}
}
}

// With DML's return clause ExecuteReaderAsync should return a reader containing affected rows data.
[Theory, CombinatorialData]
[Trait(Constants.SupportedOnEmulator, Constants.No)]
public async Task DMLReturn_ExecuteReader_Read(TransactionType transactionType)
{
string key = _fixture.CreateTestRows();
using var connection = _fixture.GetConnection();
await connection.OpenAsync();
string dml = $"UPDATE {_fixture.TableName} SET Value = OriginalValue + 1 WHERE UpdateMe AND Key=@key Then Return Value";
var command = connection.CreateDmlCommand(dml);
command.Parameters.Add("key", SpannerDbType.String, key);

List<int> actualReturnedValues = await Execute(DmlAsyncWork, connection, transactionType);

// Assert that the DML Return command has returned expected values only.
var expectedReturnedValues = new List<int> { 2, 5 };
Assert.Equal(expectedReturnedValues, actualReturnedValues);

// Assert that the DML Return command has actually updated the table.
AssertActualAndUpdatedDbValues(key);

async Task<List<int>> DmlAsyncWork(SpannerTransaction transaction = null)
{
var returnedValues = new List<int>();
command.Transaction = transaction;
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
returnedValues.Add(reader.GetFieldValue<int>(reader.GetOrdinal("Value")));
}
return returnedValues;
}
}

// With DML's return clause ExecuteReaderAsync should update table data even if Read is not invoked.
[Theory, CombinatorialData]
[Trait(Constants.SupportedOnEmulator, Constants.No)]
public async Task DMLReturn_ExecuteReader_NoRead(TransactionType transactionType)
{
string key = _fixture.CreateTestRows();

using var connection = _fixture.GetConnection();
await connection.OpenAsync();
string dml = $"UPDATE {_fixture.TableName} SET Value = OriginalValue + 1 WHERE UpdateMe AND Key=@key Then Return Value";
var command = connection.CreateDmlCommand(dml);
command.Parameters.Add("key", SpannerDbType.String, key);

_ = await Execute(DmlAsyncWork, connection, transactionType);

AssertActualAndUpdatedDbValues(key);

async Task<bool> DmlAsyncWork(SpannerTransaction transaction = null)
{
command.Transaction = transaction;
var reader = await command.ExecuteReaderAsync();
return true;
}
}

// With DML's return clause ExecuteNonQueryAsync should return number of affected rows.
[Theory, CombinatorialData]
[Trait(Constants.SupportedOnEmulator, Constants.No)]
public async Task DMLReturn_ExecuteNonQueryAsync(TransactionType transactionType)
{
string key = _fixture.CreateTestRows();
using var connection = _fixture.GetConnection();
string dml = $"UPDATE {_fixture.TableName} SET Value = OriginalValue + 1 WHERE UpdateMe AND Key=@key Then Return Value";
using var command = connection.CreateDmlCommand(dml);
command.Parameters.Add("key", SpannerDbType.String, key);

int actualAffectedRows = await Execute(DmlAsyncWork, connection, transactionType);

Assert.Equal(2, actualAffectedRows);
AssertActualAndUpdatedDbValues(key);

async Task<int> DmlAsyncWork(SpannerTransaction transaction = null)
{
command.Transaction = transaction;
return await command.ExecuteNonQueryAsync();
}
}

// Without DML's return clause also ExecuteReaderAsync should update the database table.
// First ReadAsync call should return false in the absence of 'return` clause.
[Theory, CombinatorialData]
public async Task NoDMLReturn_ExecuteReader_Read(TransactionType transactionType)
{
string key = _fixture.CreateTestRows();

using var connection = _fixture.GetConnection();
await connection.OpenAsync();
string dml = $"UPDATE {_fixture.TableName} SET Value = OriginalValue + 1 WHERE UpdateMe AND Key=@key";
var command = connection.CreateDmlCommand(dml);
command.Parameters.Add("key", SpannerDbType.String, key);

bool read = await Execute(DmlAsyncWork, connection, transactionType);

Assert.False(read);
AssertActualAndUpdatedDbValues(key);

async Task<bool> DmlAsyncWork(SpannerTransaction transaction = null)
{
command.Transaction = transaction;
using var reader = await command.ExecuteReaderAsync();
var readOnce = await reader.ReadAsync();
return readOnce;
}
}

// ExecuteReaderAsync still updates the database, even if we don't read from the returned reader and don't include "Then Return".
[Theory, CombinatorialData]
public async Task NoDMLReturn_ExecuteReader_NoRead(TransactionType transactionType)
{
string key = _fixture.CreateTestRows();
using var connection = _fixture.GetConnection();
await connection.OpenAsync();
string dml = $"UPDATE {_fixture.TableName} SET Value = OriginalValue + 1 WHERE UpdateMe AND Key=@key";
var command = connection.CreateDmlCommand(dml);
command.Parameters.Add("key", SpannerDbType.String, key);

await Execute(DmlAsyncWork, connection, transactionType);

AssertActualAndUpdatedDbValues(key);

async Task<List<int>> DmlAsyncWork(SpannerTransaction transaction = null)
{
command.Transaction = transaction;
var reader = await command.ExecuteReaderAsync();
return null;
}
}

private async Task<T> Execute<T>(Func<SpannerTransaction, Task<T>> dmlAsyncWork, SpannerConnection connection, TransactionType transactionType)
{
switch (transactionType)
{
case TransactionType.EphemeralTransaction:
return await dmlAsyncWork(null);
case TransactionType.ExplicitTransaction:
{
using var transaction = connection.BeginTransaction();
var result = await dmlAsyncWork(transaction).ConfigureAwait(false);
transaction.Commit();
return result;
}
case TransactionType.RetryableTransaction:
return await connection.RunWithRetriableTransactionAsync(dmlAsyncWork);
default:
throw new ArgumentException($"Invalid argument '{transactionType}'.");
}
}

private void AssertActualAndUpdatedDbValues(string key, SpannerTransaction transaction = null)
{
var actualDbValues = _fixture.FetchValues(key, transaction);
var expectedDbValues = new Dictionary<int, int>
{
{ 0, 0 }, // Not updated
{ 1, 2 }, // Updated
{ 2, 2 }, // Not updated
{ 3, 3 }, // Not updated
{ 4, 5 } // Updated
};

Assert.Equal(expectedDbValues, actualDbValues);
}
}
}
@@ -1,4 +1,4 @@
// Copyright 2017 Google Inc. All Rights Reserved.
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,6 +52,27 @@ async Task<long> Impl(SpannerTransaction transaction)
}
}

/// <summary>
/// Executes a DML statement and returns affected rows values via ReliableStreamReader.
/// The reader does not return any data if the DML statement does not contain a THEN RETURN clause, but the DML statement is executed.
/// </summary>
/// <returns> An instance of <see cref="ReliableStreamReader"/>.</returns>
public Task<ReliableStreamReader> ExecuteDmlReaderAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds)
{
return ExecuteHelper.WithErrorTranslationAndProfiling(
() => _connection.RunWithRetriableTransactionAsync(Impl, cancellationToken), "EphemeralTransaction.ExecuteDmlReaderAsync", _connection.Logger);

async Task<ReliableStreamReader> Impl(SpannerTransaction transaction)
{
transaction.CommitTimeout = timeoutSeconds;
transaction.CommitPriority = _commitPriority;

return await ((ISpannerTransaction) transaction)
.ExecuteDmlReaderAsync(request, cancellationToken, timeoutSeconds)
.ConfigureAwait(false);
}
}

// Note that this method is not in ISpannerTransaction because PartitionedDml can only be executed through ephemeral transactions.
internal Task<long> ExecutePartitionedDmlAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds)
{
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2017 Google Inc. All Rights Reserved.
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,9 @@ internal interface ISpannerTransaction
// do if they need to return an int result type.
Task<long> ExecuteDmlAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds);

// Note: this returns Task<ReliableStreamReader> to allow affected rows to be returned.
Task<ReliableStreamReader> ExecuteDmlReaderAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds);

// Note: this returns Task<IEnumerable<long>> to reflect the results from Spanner faithfully. The caller can then decide what to
// do if they need to return an int result type.
Task<IEnumerable<long>> ExecuteBatchDmlAsync(ExecuteBatchDmlRequest request, CancellationToken cancellationToken, int timeoutSeconds);
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -104,11 +104,22 @@ internal async Task<T> ExecuteScalarAsync<T>(CancellationToken cancellationToken
internal async Task<SpannerDataReader> ExecuteReaderAsync(CommandBehavior behavior, TimestampBound singleUseReadSettings, CancellationToken cancellationToken)
{
ValidateConnectionAndCommandTextBuilder();

if (CommandTextBuilder.SpannerCommandType == SpannerCommandType.Dml)
{
if (singleUseReadSettings != null)
{
throw new NotSupportedException("singleUseReadSettings cannot be used with DML command.");
}

return await ExecuteDmlReaderAsync(behavior, cancellationToken).ConfigureAwait(false);
}

ValidateCommandBehavior(behavior);

if (CommandTextBuilder.SpannerCommandType != SpannerCommandType.Select && CommandTextBuilder.SpannerCommandType != SpannerCommandType.Read)
{
throw new InvalidOperationException("ExecuteReader functionality is only available for queries and reads.");
throw new InvalidOperationException("ExecuteReader functionality is only available for queries, reads and DML commands.");
}

await Connection.EnsureIsOpenAsync(cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -220,7 +231,22 @@ private async Task<int> ExecuteDmlAsync(CancellationToken cancellationToken)
ExecuteSqlRequest request = GetExecuteSqlRequest();
long count = await transaction.ExecuteDmlAsync(request, cancellationToken, CommandTimeout).ConfigureAwait(false);
// This cannot currently exceed int.MaxValue due to Spanner commit limitations anyway.
return checked((int)count);
return checked((int) count);
}

private async Task<SpannerDataReader> ExecuteDmlReaderAsync(CommandBehavior behavior, CancellationToken cancellationToken)
{
ValidateCommandBehavior(behavior);
await Connection.EnsureIsOpenAsync(cancellationToken).ConfigureAwait(false);
var transaction = Transaction ?? Connection.AmbientTransaction ?? new EphemeralTransaction(Connection, Priority);
ExecuteSqlRequest request = GetExecuteSqlRequest();
var resultSet = await transaction.ExecuteDmlReaderAsync(request, cancellationToken, CommandTimeout).ConfigureAwait(false);

var enableGetSchemaTable = Connection.Builder.EnableGetSchemaTable;
// When the data reader is closed, we may need to dispose of the connection.
IDisposable resourceToClose = (behavior & CommandBehavior.CloseConnection) == CommandBehavior.CloseConnection ? Connection : null;

return new SpannerDataReader(Connection.Logger, resultSet, Transaction?.ReadTimestamp, resourceToClose, ConversionOptions, enableGetSchemaTable, CommandTimeout);
}

private async Task<int> ExecuteDdlAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -389,7 +415,7 @@ private V1.ExecuteSqlRequest.Types.QueryOptions GetEffectiveQueryOptions()
}

private RequestOptions BuildRequestOptions() =>
new RequestOptions { Priority = PriorityConverter.ToProto(Priority) , RequestTag = Tag ?? "", TransactionTag = Transaction?.Tag ?? "" };
new RequestOptions { Priority = PriorityConverter.ToProto(Priority), RequestTag = Tag ?? "", TransactionTag = Transaction?.Tag ?? "" };

private ExecuteSqlRequest GetExecuteSqlRequest()
{
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2017 Google Inc. All Rights Reserved.
// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -315,13 +315,9 @@ Task<long> ISpannerTransaction.ExecuteDmlAsync(ExecuteSqlRequest request, Cancel
var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteStreamingSqlSettings, timeoutSeconds, cancellationToken);
using (var reader = _session.ExecuteSqlStreamReader(request, callSettings))
{
Value value = await reader.NextAsync(cancellationToken).ConfigureAwait(false);
if (value != null)
{
throw new SpannerException(ErrorCode.Internal, "DML returned results unexpectedly.");
}
await reader.NextAsync(cancellationToken).ConfigureAwait(false);
var stats = reader.Stats;
if (stats == null)
{
throw new SpannerException(ErrorCode.Internal, "DML completed without statistics.");
Expand All @@ -339,6 +335,21 @@ Task<long> ISpannerTransaction.ExecuteDmlAsync(ExecuteSqlRequest request, Cancel
}, "SpannerTransaction.ExecuteDml", SpannerConnection.Logger);
}

Task<ReliableStreamReader> ISpannerTransaction.ExecuteDmlReaderAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds)
{
CheckCompatibleMode(TransactionMode.ReadWrite);
GaxPreconditions.CheckNotNull(request, nameof(request));
_hasExecutedStatements = true;
request.Seqno = Interlocked.Increment(ref _lastDmlSequenceNumber);
return ExecuteHelper.WithErrorTranslationAndProfiling(async () =>
{
var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteStreamingSqlSettings, timeoutSeconds, cancellationToken);
using var reader = _session.ExecuteSqlStreamReader(request, callSettings);
await reader.EnsureInitializedAsync(cancellationToken).ConfigureAwait(false);
return reader;
}, "SpannerTransaction.ExecuteDmlReader", SpannerConnection.Logger);
}

Task<IEnumerable<long>> ISpannerTransaction.ExecuteBatchDmlAsync(ExecuteBatchDmlRequest request, CancellationToken cancellationToken, int timeoutSeconds)
{
CheckCompatibleMode(TransactionMode.ReadWrite);
Expand Down Expand Up @@ -464,7 +475,9 @@ protected override void Dispose(bool disposing)
}
_session.ReleaseToPool(forceDelete: false);
break;
// Default for detach or unknown DisposeBehavior is to do nothing.
default:
// Default for detached or unknown DisposeBehavior is to do nothing.
break;
}
}

Expand Down

0 comments on commit 4787593

Please sign in to comment.