Skip to content

Commit

Permalink
fix: retry PDML on EOS on DATA error (#5238)
Browse files Browse the repository at this point in the history
* fix: retry PDML on EOS on DATA error

For long-running PDML queries (>= 30mins), there's a possibility that
the gRPC connection is terminated with an error "Received unexpected EOS
on DATA frame from server".

We now retry the same transaction on this error.

Fixes #5209

* test: added unit test to verify retry on error
  • Loading branch information
skuruppu committed Sep 7, 2020
1 parent b43315c commit 0272d4e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ public void ExecuteReaderHasResourceHeader()
It.Is<CallSettings>(settings => HasResourcePrefixHeader(settings))), Times.Once());
}

/// <summary>
/// Verifies that a partitioned DML command keeps re-issuing ExecuteStreamingSql when the
/// mocked client fails with the EOS error, and that it reports a positive row count once
/// a call finally succeeds.
/// </summary>
[Fact]
public void PdmlRetriedOnEosError()
{
    // Arrange: a strict mock so that any call without an explicit setup fails the test.
    Mock<SpannerClient> clientMock = SpannerClientHelpers.CreateMockClient(
        Logger.DefaultLogger,
        MockBehavior.Strict);
    clientMock
        .SetupBatchCreateSessionsAsync()
        .SetupBeginTransactionAsync()
        .SetupExecuteStreamingSqlForDmlThrowingEosError();
    SpannerConnection spannerConnection = BuildSpannerConnection(clientMock);
    var dmlCommand = spannerConnection.CreateDmlCommand("UPDATE abc SET xyz = 1 WHERE Id > 1");

    // Act
    long updatedRows = dmlCommand.ExecutePartitionedUpdate();

    // Assert: the update reported rows, and the streaming call was made three times in total.
    Assert.True(updatedRows > 0);
    clientMock.Verify(
        client => client.ExecuteStreamingSql(It.IsAny<ExecuteSqlRequest>(), It.IsAny<CallSettings>()),
        Times.Exactly(3));
}

private Mock<SpannerClient> SetupExecuteStreamingSql(string optimizerVersion = "")
{
Mock<SpannerClient> spannerClientMock = SpannerClientHelpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Google.Api.Gax.Grpc;
using Google.Api.Gax.Grpc.Testing;
using Google.Api.Gax.Testing;
using Google.Cloud.Spanner.Data;
using Google.Cloud.Spanner.V1.Internal.Logging;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
Expand Down Expand Up @@ -217,6 +218,35 @@ internal static Mock<SpannerClient> SetupExecuteStreamingSql(this Mock<SpannerCl
return spannerClientMock;
}

/// <summary>
/// Configures <c>ExecuteStreamingSql</c> on the mock as a sequence: the first two calls throw a
/// <see cref="SpannerException"/> with <see cref="ErrorCode.Internal"/> and the EOS message, and
/// the third call returns a stream of three partial result sets.
/// </summary>
/// <param name="spannerClientMock">The mock client to configure.</param>
/// <returns>The same mock, to allow further chained setup calls.</returns>
internal static Mock<SpannerClient> SetupExecuteStreamingSqlForDmlThrowingEosError(this Mock<SpannerClient> spannerClientMock)
{
    const string eosMessage = "Received unexpected EOS on DATA frame from server";

    spannerClientMock
        .SetupSequence(client => client.ExecuteStreamingSql(It.IsAny<ExecuteSqlRequest>(), It.IsAny<CallSettings>()))
        // A distinct exception instance for each of the two failing calls.
        .Throws(new SpannerException(ErrorCode.Internal, eosMessage))
        .Throws(new SpannerException(ErrorCode.Internal, eosMessage))
        .Returns(() =>
        {
            // One partial result set per resume token, each carrying an exact row count of 10.
            var resumeTokens = new[] { "token1", "token2", "token3" };
            var partialResults = resumeTokens
                .Select(token => new PartialResultSet
                {
                    ResumeToken = ByteString.CopyFromUtf8(token),
                    Stats = new ResultSetStats { RowCountExact = 10 }
                })
                .ToList();

            // Wrap the in-memory results in a server-streaming call object.
            var responseStream = new AsyncStreamAdapter<PartialResultSet>(
                partialResults.ToAsyncEnumerable().GetAsyncEnumerator(default));
            var streamingCall = new AsyncServerStreamingCall<PartialResultSet>(
                responseStream,
                Task.FromResult(new Metadata()),
                () => new Status(),
                () => new Metadata(),
                () => { });
            return new ExecuteStreamingSqlStreamImpl(streamingCall);
        });

    return spannerClientMock;
}

private static ISetupSequentialResult<TResult> SetupFailures<TResult>(this ISetupSequentialResult<TResult> setup, int failures, StatusCode statusCode, string exceptionMessage, TimeSpan? exceptionRetryDelay)
{
var exception = BuildRpcException(exceptionRetryDelay, statusCode, exceptionMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

using Google.Api.Gax;
using Google.Cloud.Spanner.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -45,17 +46,32 @@ async Task<long> Impl()
using (var transaction = await _connection.BeginTransactionImplAsync(_transactionOptions, TransactionMode.ReadWrite, cancellationToken).ConfigureAwait(false))
{
transaction.CommitTimeout = timeoutSeconds;
long count = await ((ISpannerTransaction)transaction)
.ExecuteDmlAsync(request, cancellationToken, timeoutSeconds)
.ConfigureAwait(false);

// This is somewhat ugly. PDML commits as it goes, so we don't need to, whereas non-partitioned
// DML needs the commit afterwards to finish up.
if (_transactionOptions.ModeCase != TransactionOptions.ModeOneofCase.PartitionedDml)
while (true)
{
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
try
{
long count = await ((ISpannerTransaction)transaction)
.ExecuteDmlAsync(request, cancellationToken, timeoutSeconds)
.ConfigureAwait(false);

// This is somewhat ugly. PDML commits as it goes, so we don't need to, whereas non-partitioned
// DML needs the commit afterwards to finish up.
if (_transactionOptions.ModeCase != TransactionOptions.ModeOneofCase.PartitionedDml)
{
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
}
return count;
}
catch (SpannerException e) when (
_transactionOptions.ModeCase == TransactionOptions.ModeOneofCase.PartitionedDml &&
e.ErrorCode == ErrorCode.Internal &&
e.Message.Contains("Received unexpected EOS on DATA frame from server"))
{
// Retry with the same transaction. Since this error happens in long-lived
// transactions (>= 30 mins), it's unnecessary to do exponential backoff.
continue;
}
}
return count;
}
}
}
Expand Down

0 comments on commit 0272d4e

Please sign in to comment.