Skip to content

Commit

Permalink
fix: Makes transactions marked as DisposedBehaviour.Detach actually d…
Browse files Browse the repository at this point in the history
…etach from the session pool.
  • Loading branch information
amanda-tarafa committed Jun 15, 2023
1 parent 259b0f0 commit ab7526f
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 16 deletions.
Expand Up @@ -86,6 +86,19 @@ private async Task IncrementByOneAsync(SpannerConnection connection, bool orphan
}
}

[Fact]
public async Task DetachOnDisposeTransactionIsDetached()
{
using var connection = new SpannerConnection(_fixture.ConnectionString);
await connection.OpenAsync();

using var transaction = await connection.BeginTransactionAsync();
transaction.DisposeBehavior = DisposeBehavior.Detach;

// We are testing (through the CommonTestsDiagnostics attribute) that there
// are no active sessions or connections after we have disposed of both.
}

[Fact]
public async Task DisposedTransactionDoesntLeak()
{
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ public void TransactionConstructor()
var pool = new FakeSessionPool();
var session = PooledSession.FromSessionName(pool, new SessionName("project", "instance", "database", "session"));

var transaction = new SpannerTransaction(connection, TransactionMode.ReadWrite, session: session, timestampBound: null);
var transaction = new SpannerTransaction(connection, TransactionMode.ReadWrite, session: session, timestampBound: null, isRetriable: false);
var command = new SpannerBatchCommand(transaction);

Assert.Empty(command.Commands);
Expand Down Expand Up @@ -254,6 +254,7 @@ private class FakeSessionPool : SessionPool.ISessionPool
public IClock Clock => SystemClock.Instance;
public SessionPoolOptions Options { get; } = new SessionPoolOptions();
public void Release(PooledSession session, ByteString transactionId, bool deleteSession) => throw new NotImplementedException();
public void Detach(PooledSession session) => throw new NotImplementedException();

public Task<PooledSession> WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken) =>
throw new NotImplementedException();
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -426,6 +426,8 @@ public void Release(PooledSession session, ByteString transactionToRollback, boo
ReleasedSessionDeleted = deleteSession;
}

public void Detach(PooledSession session) => throw new NotImplementedException();

public Task<PooledSession> WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken) =>
throw new NotImplementedException();
}
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2019 Google LLC
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +60,7 @@ async Task<TResult> CommitAttempt()
try
{
session = await (session?.WithFreshTransactionOrNewAsync(SpannerConnection.ReadWriteTransactionOptions, cancellationToken) ?? _connection.AcquireReadWriteSessionAsync(cancellationToken)).ConfigureAwait(false);
transaction = new SpannerTransaction(_connection, TransactionMode.ReadWrite, session, null);
transaction = new SpannerTransaction(_connection, TransactionMode.ReadWrite, session, null, true);
TResult result = await asyncWork(transaction).ConfigureAwait(false);
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -90,12 +90,12 @@ async Task<TResult> CommitAttempt()
{
if (transaction != null)
{
// Let's make sure that the associated session is not released to the pool
// because we'll be attempting to get a fresh transaction for this same session first.
// Since the transaction was marked as retriable, disposing of it won't attempt to dispose of or
// return the underlying session to the pool. That's because we'll be attempting to get a
// fresh transaction for this same session first.
// If that fails will attempt a new session acquisition.
// This session will be disposed of by the pool if it can't be refreshed or by the RunAsync method
// if we are not retrying anymore.
transaction.DisposeBehavior = DisposeBehavior.Detach;
transaction.Dispose();
}
}
Expand Down
Expand Up @@ -280,7 +280,7 @@ public SpannerTransaction BeginReadOnlyTransaction(TransactionId transactionId)
ByteString transactionIdBytes = ByteString.FromBase64(transactionId.Id);
var session = _sessionPool.CreateDetachedSession(sessionName, transactionIdBytes, TransactionOptions.ModeOneofCase.ReadOnly);
// This transaction is coming from another process potentially, so we don't auto close it.
return new SpannerTransaction(this, TransactionMode.ReadOnly, session, transactionId.TimestampBound)
return new SpannerTransaction(this, TransactionMode.ReadOnly, session, transactionId.TimestampBound, false)
{
Shared = true,
DisposeBehavior = DisposeBehavior.Detach
Expand Down Expand Up @@ -863,7 +863,7 @@ internal Task<PooledSession> AcquireSessionAsync(TransactionOptions options, Can
{
await OpenAsync(cancellationToken).ConfigureAwait(false);
var session = await AcquireSessionAsync(transactionOptions, cancellationToken).ConfigureAwait(false);
return new SpannerTransaction(this, transactionMode, session, targetReadTimestamp);
return new SpannerTransaction(this, transactionMode, session, targetReadTimestamp, false);
}, "SpannerConnection.BeginTransaction", Logger);
}

Expand Down
Expand Up @@ -59,6 +59,8 @@ private protected SpannerTransactionBase()
public sealed class SpannerTransaction : SpannerTransactionBase, ISpannerTransaction
{
private readonly List<Mutation> _mutations = new List<Mutation>();
// This value will be true if and only if this transaction was created by RetriableTransaction.
private readonly bool _isRetriable = false;
private DisposeBehavior _disposeBehavior = DisposeBehavior.ReleaseToPool;
private bool _disposed = false;

Expand Down Expand Up @@ -192,14 +194,16 @@ public string Tag
SpannerConnection connection,
TransactionMode mode,
PooledSession session,
TimestampBound timestampBound)
TimestampBound timestampBound,
bool isRetriable)
{
SpannerConnection = GaxPreconditions.CheckNotNull(connection, nameof(connection));
CommitTimeout = SpannerConnection.Builder.Timeout;
LogCommitStats = SpannerConnection.LogCommitStats;
Mode = mode;
_session = GaxPreconditions.CheckNotNull(session, nameof(session));
TimestampBound = timestampBound;
_isRetriable = isRetriable;
}

/// <summary>
Expand Down Expand Up @@ -460,6 +464,19 @@ protected override void Dispose(bool disposing)
return;
}
_disposed = true;

if (_isRetriable)
{
// If this transaction is being used by RetriableTransaction, we want to dispose of this instance
// but we don't want to do anything with the session, as the RetriableTransaction will attempt to
// reuse it with a fresh transaction.
// If acquiring a fresh transaction with the existing session fails, the session will be disposed
// and a new one with a fresh transaction will be obtained.
// If acquiring a fresh transaction succeeds, then the session will be disposed after the RetriableTransaction
// succeeds or we have stopped retrying.
return;
}

switch (DisposeBehavior)
{
case DisposeBehavior.CloseResources:
Expand All @@ -475,8 +492,16 @@ protected override void Dispose(bool disposing)
}
_session.ReleaseToPool(forceDelete: false);
break;
case DisposeBehavior.Detach:
// A detached transaction is expected to be explicitly shared across processes.
// So we don't release it back to the pool, but we need to mark it as disposed and stop counting it as active
// if it was created in the targeted pool. The first time a detached transaction is created, it will be created
// in the targeted pool, subsequent times (as the transaction ID is used) it will be created in the detached pool
// which doesn't keep track of active transactions/sessions.
_session.DetachFromPool();
break;
default:
// Default for detached or unknown DisposeBehavior is to do nothing.
// Default for unknown DisposeBehavior is to do nothing.
break;
}
}
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -231,6 +231,34 @@ public void ReleaseToPool(bool forceDelete)
}
}

/// <summary>
/// Detaches this session from the session pool from which it was acquired, so that the session pool
/// stops tracking this session and counting it as active.
/// This method should only be called once per instance; subsequent calls are ignored.
/// No other methods can be called after this.
/// </summary>
/// <remarks>
/// This method should be called only for sessions that are meant to be explicitly shared across processes.
/// Note that we don't attempt to rollback a transaction that is being detached, or attempt to delete the session,
/// under the assumption that it will be reused across processes.
/// If there's a process capable of knowing when all other processes are done using the session, then that process could call
/// <see cref="SessionPool.CreateDetachedSession(SessionName, ByteString, ModeOneofCase)"/> (or an overload) to create an instance
/// of <see cref="PooledSession"/> representing the shared transaction and then call <see cref="ReleaseToPool(bool)"/> passing true
/// to force session deletion and clean up resources.
/// Else, the application can rely on Spaner service garbage collection to clean up this session once it becomes stale.
/// </remarks>
public void DetachFromPool()
{
if (MarkAsDisposed())
{
_pool.Detach(AfterReset());
}
else
{
// Log?
}
}

/// <summary>
/// Executes a Commit RPC asynchronously.
/// </summary>
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,6 +43,11 @@ public override void Release(PooledSession session, ByteString transactionToRoll
}
}

public override void Detach(PooledSession session)
{
// No-op: We are already in the detached session pool which doesn't keep track of sessions.
}

public override Task<PooledSession> WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken) =>
throw new InvalidOperationException(
$"{nameof(session)} is a detached session. Its transaction can't be refreshed and it can't be substituted by a new session.");
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,14 @@ internal interface ISessionPool
IClock Clock { get; }
Task<PooledSession> WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken);
void Release(PooledSession session, ByteString transactionToRollback, bool deleteSession);
/// <summary>
/// Detaches the given session from the pool, meaning the pool stops tracking this session as active,
/// but does nothing else, in particular it doesn't return the session to the pool or attempts to delete it.
/// </summary>
/// <param name="session">The pooled session to be detached. Currently unusued by all <see cref="ISessionPool"/>
/// implementations, but that's just an implementation detail: the <see cref="TargetedSessionPool"/> does not
/// keep track of active <see cref="PooledSession"/> instances themselves, but only through counters.</param>
void Detach(PooledSession session);
SessionPoolOptions Options { get; }
}
}
Expand Down
@@ -1,4 +1,4 @@
// Copyright 2018 Google LLC
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,6 +33,8 @@ internal abstract class SessionPoolBase : ISessionPool
protected SessionPool Parent { get; }
public abstract Task<PooledSession> WithFreshTransactionOrNewAsync(PooledSession session, TransactionOptions transactionOptions, CancellationToken cancellationToken);
public abstract void Release(PooledSession session, ByteString transactionToRollback, bool deleteSession);
public abstract void Detach(PooledSession session);

protected SessionPoolBase(SessionPool parent) => Parent = parent;
}
}
Expand Down
Expand Up @@ -521,6 +521,12 @@ private async Task ReleaseAsync(PooledSession session, ByteString transactionId,
}
}

public override void Detach(PooledSession session)
{
Interlocked.Decrement(ref _activeSessionCount);
Interlocked.Decrement(ref _liveOrRequestedSessionCount);
}

internal void MaintainPool()
{
if (Shutdown)
Expand Down

0 comments on commit ab7526f

Please sign in to comment.