Skip to content

Commit

Permalink
feat: Ambient transactions support commit delays
Browse files Browse the repository at this point in the history
  • Loading branch information
amanda-tarafa committed Apr 3, 2024
1 parent af15aaf commit f4b4208
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License"):
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using Xunit;

namespace Google.Cloud.Spanner.Data.Tests;
public class AmbientTransactionOptionsTests
{
[Fact]
public void Default_Singleton()
{
var one = AmbientTransactionOptions.Default;
Assert.NotNull(one);
var two = AmbientTransactionOptions.Default;
Assert.Same(one, two);
}

[Fact]
public void Default_Values() =>
Assert.Null(AmbientTransactionOptions.Default.CommitDelay);

public static TheoryData<TimeSpan?> ValidCommitDelayValues => SpannerTransactionTests.ValidCommitDelayValues;

[Theory, MemberData(nameof(ValidCommitDelayValues))]
public void WithCommitDelay_Valid(TimeSpan? commitDelay)
{
var custom = AmbientTransactionOptions.Default.WithCommitDelay(commitDelay);
Assert.NotSame(AmbientTransactionOptions.Default, custom);
Assert.Equal(commitDelay, custom.CommitDelay);
}

public static TheoryData<TimeSpan?> InvalidCommitDelayValues => SpannerTransactionTests.InvalidCommitDelayValues;

[Theory, MemberData(nameof(InvalidCommitDelayValues))]
public void WithCommitDelay_Invalid(TimeSpan? commitDelay) =>
Assert.Throws<ArgumentOutOfRangeException>(() => AmbientTransactionOptions.Default.WithCommitDelay(commitDelay));

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Xunit;

namespace Google.Cloud.Spanner.Data.Tests
Expand Down Expand Up @@ -349,6 +350,65 @@ public void CommitDelay_SetOnCommand_UnsetOnExplicitTransaction_CommandIgnored()
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_SetOnCommand_SetOnAmbientTransaction_CommandIgnored()
{
var transactionCommitDelay = TimeSpan.FromMilliseconds(100);
var commandCommitDelay = TimeSpan.FromMilliseconds(300);

SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupExecuteBatchDmlAsync()
.SetupCommitAsync();
SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open(AmbientTransactionOptions.Default.WithCommitDelay(transactionCommitDelay));
var command = connection.CreateBatchDmlCommand();
command.Add("UPDATE FOO SET BAR=1 WHERE TRUE");
command.CommitDelay = commandCommitDelay;
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay.Equals(Duration.FromTimeSpan(transactionCommitDelay))),
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_SetOnCommand_UnsetOnAmbientTransaction_CommandIgnored()
{
var commandCommitDelay = TimeSpan.FromMilliseconds(300);

SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupExecuteBatchDmlAsync()
.SetupCommitAsync();
SpannerConnection connection = SpannerCommandTests.BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open();
var command = connection.CreateBatchDmlCommand();
command.Add("UPDATE FOO SET BAR=1 WHERE TRUE");
command.CommitDelay = commandCommitDelay;
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay == null),
Arg.Any<CallSettings>());
}

[Fact]
public void CommandIncludesRequestAndTransactionTag()
{
Expand Down Expand Up @@ -385,7 +445,7 @@ private class FakeSessionPool : SessionPool.ISessionPool
public void Release(PooledSession session, ByteString transactionId, bool deleteSession) => throw new NotImplementedException();
public void Detach(PooledSession session) => throw new NotImplementedException();

public Task<PooledSession> RefreshedOrNewAsync(PooledSession session, TransactionOptions transactionOptions, bool singleUseTransaction, CancellationToken cancellationToken) =>
public Task<PooledSession> RefreshedOrNewAsync(PooledSession session, V1.TransactionOptions transactionOptions, bool singleUseTransaction, CancellationToken cancellationToken) =>
throw new NotImplementedException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,6 +957,111 @@ public void CommitDelay_SetOnCommand_UnsetOnExplicitTransaction_CommandIgnored()
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_DefaultsToNull_AmbientTransaction()
{
SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupCommitAsync();
SpannerConnection connection = BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open();
var command = connection.CreateInsertCommand("FOO");
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay == null),
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_Propagates_AmbientTransaction()
{
var commitDelay = TimeSpan.FromMilliseconds(100);

SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupCommitAsync();
SpannerConnection connection = BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open(AmbientTransactionOptions.Default.WithCommitDelay(commitDelay));
var command = connection.CreateInsertCommand("FOO");
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay.Equals(Duration.FromTimeSpan(commitDelay))),
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_SetOnCommand_SetOnAmbientTransaction_CommandIgnored()
{
var transactionCommitDelay = TimeSpan.FromMilliseconds(100);
var commandCommitDelay = TimeSpan.FromMilliseconds(300);

SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupCommitAsync();
SpannerConnection connection = BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open(AmbientTransactionOptions.Default.WithCommitDelay(transactionCommitDelay));
var command = connection.CreateInsertCommand("FOO");
command.CommitDelay = commandCommitDelay;
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay.Equals(Duration.FromTimeSpan(transactionCommitDelay))),
Arg.Any<CallSettings>());
}

[Fact]
public void CommitDelay_SetOnCommand_UnsetOnAmbientTransaction_CommandIgnored()
{
var commandCommitDelay = TimeSpan.FromMilliseconds(300);

SpannerClient spannerClientMock = SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger);
spannerClientMock
.SetupBatchCreateSessionsAsync()
.SetupBeginTransactionAsync()
.SetupCommitAsync();
SpannerConnection connection = BuildSpannerConnection(spannerClientMock);

using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
connection.Open();
var command = connection.CreateInsertCommand("FOO");
command.CommitDelay = commandCommitDelay;
command.ExecuteNonQuery();

scope.Complete();
}

spannerClientMock.Received(1).CommitAsync(
Arg.Is<CommitRequest>(request => request.MaxCommitDelay == null),
Arg.Any<CallSettings>());
}

[Fact]
public void ClientCreatedWithEmulatorDetection()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License"):
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace Google.Cloud.Spanner.Data;

/// <summary>
/// Options for a <see cref="SpannerTransaction"/> that is enlisted in <see cref="System.Transactions.Transaction"/>.
/// </summary>
public sealed class AmbientTransactionOptions
{
/// <summary>
/// Default options for an ambient transaction. Using these options will result in a read-write transaction
/// with no <see cref="CommitDelay"/> value set.
/// </summary>
public static AmbientTransactionOptions Default { get; } = new AmbientTransactionOptions(null);

/// <summary>
/// The maximum amount of time the commit may be delayed server side for batching with other commits.
/// The bigger the delay, the better the throughput (QPS), but at the expense of commit latency.
/// If set to <see cref="TimeSpan.Zero"/>, commit batching is disabled.
/// May be null, in which case commits will continue to be batched as they had been before this configuration
/// option was made available to Spanner API consumers.
/// May be set to any value between <see cref="TimeSpan.Zero"/> and 500ms.
/// </summary>
public TimeSpan? CommitDelay { get; }

private AmbientTransactionOptions(TimeSpan? commitDelay) =>
CommitDelay = SpannerTransaction.CheckCommitDelayRange(commitDelay);

/// <summary>
/// Returns a new set of options equal to these, except for the specified <paramref name="commitDelay"/>.
/// </summary>
public AmbientTransactionOptions WithCommitDelay(TimeSpan? commitDelay) =>
new AmbientTransactionOptions(commitDelay);
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ public void OpenAsReadOnly(TimestampBound timestampBound = null)
{
throw new InvalidOperationException($"{nameof(OpenAsReadOnlyAsync)} should only be called with ${nameof(EnlistInTransaction)} set to true.");
}
Open(() => EnlistTransaction(transaction, timestampBound ?? TimestampBound.Strong, null));
Open(() => EnlistTransaction(transaction, timestampBound ?? TimestampBound.Strong, transactionId: null, commitDelay: null));
}

/// <summary>
Expand All @@ -922,7 +922,7 @@ public void OpenAsReadOnly(TransactionId transactionId)
{
throw new InvalidOperationException($"{nameof(OpenAsReadOnlyAsync)} should only be called with ${nameof(EnlistInTransaction)} set to true.");
}
Open(() => EnlistTransaction(transaction, null, transactionId));
Open(() => EnlistTransaction(transaction, timestampBound: null, transactionId, commitDelay: null));
}

/// <summary>
Expand All @@ -943,19 +943,59 @@ public Task OpenAsReadOnlyAsync(TimestampBound timestampBound = null, Cancellati
{
throw new InvalidOperationException($"{nameof(OpenAsReadOnlyAsync)} should only be called with ${nameof(EnlistInTransaction)} set to true.");
}
Action transactionEnlister = () => EnlistTransaction(transaction, timestampBound ?? TimestampBound.Strong, null);
Action transactionEnlister = () => EnlistTransaction(transaction, timestampBound ?? TimestampBound.Strong, transactionId: null, commitDelay: null);
return OpenAsyncImpl(transactionEnlister, cancellationToken);
}

/// <summary>
/// Opens the connection within a <see cref="System.Transactions.TransactionScope"/> with specific
/// <see cref="AmbientTransactionOptions"/>.
/// </summary>
public void Open(AmbientTransactionOptions options)
{
GaxPreconditions.CheckNotNull(options, nameof(options));
var transaction = Transaction.Current;
if (transaction == null)
{
throw new InvalidOperationException($"{nameof(Open)} should only be called within a TransactionScope.");
}
if (!EnlistInTransaction)
{
throw new InvalidOperationException($"{nameof(Open)} should only be called with ${nameof(EnlistInTransaction)} set to true.");
}

Open(() => EnlistTransaction(transaction, timestampBound: null, transactionId: null, options.CommitDelay));
}

/// <summary>
/// Opens the connection within a <see cref="System.Transactions.TransactionScope"/> with specific
/// <see cref="AmbientTransactionOptions"/>.
/// </summary>
public Task OpenAsync(AmbientTransactionOptions options, CancellationToken cancellationToken)
{
GaxPreconditions.CheckNotNull(options, nameof(options));
var transaction = Transaction.Current;
if (transaction == null)
{
throw new InvalidOperationException($"{nameof(Open)} should only be called within a TransactionScope.");
}
if (!EnlistInTransaction)
{
throw new InvalidOperationException($"{nameof(Open)} should only be called with ${nameof(EnlistInTransaction)} set to true.");
}

return OpenAsyncImpl(() => EnlistTransaction(transaction, timestampBound: null, transactionId: null, options.CommitDelay), cancellationToken);
}

/// <summary>
/// Gets or Sets whether to participate in the active <see cref="System.Transactions.TransactionScope" />
/// </summary>
public bool EnlistInTransaction { get; set; } = true;

/// <inheritdoc />
public override void EnlistTransaction(Transaction transaction) => EnlistTransaction(transaction, null, null);
public override void EnlistTransaction(Transaction transaction) => EnlistTransaction(transaction, null, null, null);

private void EnlistTransaction(Transaction transaction, TimestampBound timestampBound, TransactionId transactionId)
private void EnlistTransaction(Transaction transaction, TimestampBound timestampBound, TransactionId transactionId, TimeSpan? commitDelay)
{
if (!EnlistInTransaction)
{
Expand All @@ -965,7 +1005,7 @@ private void EnlistTransaction(Transaction transaction, TimestampBound timestamp
{
throw new InvalidOperationException("This connection is already enlisted to a transaction.");
}
_volatileResourceManager = new VolatileResourceManager(this, timestampBound, transactionId);
_volatileResourceManager = new VolatileResourceManager(this, timestampBound, transactionId, commitDelay);
transaction.EnlistVolatile(_volatileResourceManager, System.Transactions.EnlistmentOptions.None);
}

Expand Down
Loading

0 comments on commit f4b4208

Please sign in to comment.