From 02b919921131f371f7195e1417f51c1e04e23dda Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 16 Aug 2019 17:09:45 +0200 Subject: [PATCH] spanner: retry aborted PDMLs Partitioned DML transactions can be aborted by Cloud Spanner. These transactions were not automatically retried by the client library. This feature was however added to the Java library, and the Spanner team requested this change to be added to the Go library as well. Fixes #1536 Change-Id: Ibcb7a61cb5827880d3adff55f65b3d4ae8c6f573 Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/44171 Reviewed-by: kokoro Reviewed-by: Emmanuel Odeke --- internal/kokoro/vet.sh | 1 + spanner/client_test.go | 20 ---- .../internal/testutil/inmem_spanner_server.go | 9 +- spanner/pdml.go | 96 +++++++++++-------- spanner/pdml_test.go | 72 ++++++++++++++ spanner/retry.go | 9 ++ 6 files changed, 141 insertions(+), 66 deletions(-) diff --git a/internal/kokoro/vet.sh b/internal/kokoro/vet.sh index 329be04e1b75..e46670c07e49 100755 --- a/internal/kokoro/vet.sh +++ b/internal/kokoro/vet.sh @@ -62,6 +62,7 @@ golint ./... 2>&1 | ( \ grep -v "exported func Value returns unexported type pretty.val, which can be annoying to use" | \ grep -v "exported func Increment returns unexported type firestore.increment, which can be annoying to use" | \ grep -v "ExecuteStreamingSql" | \ + grep -v "MethodExecuteSql should be MethodExecuteSQL" | \ grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" | \ grep -v "ClusterId" | \ grep -v "InstanceId" | \ diff --git a/spanner/client_test.go b/spanner/client_test.go index ec69e89a98fc..73693f14449e 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -383,26 +383,6 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) { } } -// PartitionedUpdate should not retry on aborted. -func TestClient_PartitionedUpdate(t *testing.T) { - t.Parallel() - server, client, teardown := setupMockedTestServer(t) - defer teardown() - // PartitionedDML transactions are not committed. - server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, - SimulatedExecutionTime{ - Errors: []error{gstatus.Error(codes.Aborted, "Transaction aborted")}, - }) - _, err := client.PartitionedUpdate(context.Background(), NewStatement(UpdateBarSetFoo)) - if err == nil { - t.Fatalf("Missing expected Aborted exception") - } else { - if gstatus.Code(err) != codes.Aborted { - t.Fatalf("Got unexpected error %v, expected Aborted", err) - } - } -} - func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) { _, client, teardown := setupMockedTestServer(t) defer teardown() diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index c23e480f5315..5c4f54643307 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -57,6 +57,7 @@ const ( MethodCreateSession string = "CREATE_SESSION" MethodDeleteSession string = "DELETE_SESSION" MethodGetSession string = "GET_SESSION" + MethodExecuteSql string = "EXECUTE_SQL" MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL" ) @@ -563,13 +564,9 @@ func (s *inMemSpannerServer) DeleteSession(ctx context.Context, req *spannerpb.D } func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) { - s.mu.Lock() - if s.stopped { - s.mu.Unlock() - return nil, gstatus.Error(codes.Unavailable, "server has been stopped") + if err := s.simulateExecutionTime(MethodExecuteSql, req); err != nil { + return nil, err } - s.receivedRequests <- req - s.mu.Unlock() if req.Session == "" { return nil, gstatus.Error(codes.InvalidArgument, "Missing session name") } diff --git a/spanner/pdml.go b/spanner/pdml.go index c03c13a60e0d..242a48edcfe9 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -16,10 +16,9 @@ package spanner import ( "context" - "time" "cloud.google.com/go/internal/trace" - "google.golang.org/api/iterator" + "github.com/googleapis/gax-go/v2" sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc/codes" ) @@ -38,66 +37,83 @@ func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (co if err := checkNestedTxn(ctx); err != nil { return 0, err } - var ( - tx transactionID s *session sh *sessionHandle ) - // Create session. + // Create a session that will be used only for this request. sc := c.rrNext() s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md) if err != nil { return 0, toSpannerError(err) } + // Delete the session at the end of the request. If the PDML statement + // timed out or was cancelled, the DeleteSession request might not succeed, + // but the session will eventually be garbage collected by the server. defer s.delete(ctx) sh = &sessionHandle{session: s} - // Begin transaction. - res, err := sc.BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ - Session: sh.getID(), - Options: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, - }, - }) - if err != nil { - return 0, toSpannerError(err) - } + // Create the parameters and the SQL request, but without a transaction. + // The transaction reference will be added by the executePdml method. params, paramTypes, err := statement.convertParams() if err != nil { return 0, toSpannerError(err) } - tx = res.Id - req := &sppb.ExecuteSqlRequest{ - Session: sh.getID(), - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_Id{Id: tx}, - }, + Session: sh.getID(), Sql: statement.SQL, Params: params, ParamTypes: paramTypes, } - rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { - req.ResumeToken = resumeToken - return sc.ExecuteStreamingSql(ctx, req) - } - iter := stream(contextWithOutgoingMetadata(ctx, sh.getMetadata()), - rpc, func(time.Time) {}, func(error) {}) - // TODO(jba): factor out the following code from here and ReadWriteTransaction.Update. - defer iter.Stop() - for { - _, err := iter.Next() - if err == iterator.Done { - break - } - if err != nil { - return 0, toSpannerError(err) + + // Make a retryer for Aborted errors. + // TODO: use generic Aborted retryer when merged with master + retryer := gax.OnCodes([]codes.Code{codes.Aborted}, DefaultRetryBackoff) + // Execute the PDML and retry if the transaction is aborted. + executePdmlWithRetry := func(ctx context.Context) (int64, error) { + for { + count, err := executePdml(ctx, sh, req) + if err == nil { + return count, nil + } + delay, shouldRetry := retryer.Retry(err) + if !shouldRetry { + return 0, err + } + if err := gax.Sleep(ctx, delay); err != nil { + return 0, err + } } - time.Sleep(time.Second) } + return executePdmlWithRetry(ctx) +} - if !iter.sawStats { - return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", statement.SQL) +// executePdml executes the following steps: +// 1. Begin a PDML transaction +// 2. Add the ID of the PDML transaction to the SQL request. +// 3. Execute the update statement on the PDML transaction +// +// Note that PDML transactions cannot be committed or rolled back. +func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { + // Begin transaction. + res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ + Session: sh.getID(), + Options: &sppb.TransactionOptions{ + Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, + }, + }) + if err != nil { + return 0, toSpannerError(err) + } + // Add a reference to the PDML transaction on the ExecuteSql request. + req.Transaction = &sppb.TransactionSelector{ + Selector: &sppb.TransactionSelector_Id{Id: res.Id}, + } + resultSet, err := sh.getClient().ExecuteSql(ctx, req) + if err != nil { + return 0, err + } + if resultSet.Stats == nil { + return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql) } - return iter.RowCount, nil + return extractRowCount(resultSet.Stats) } diff --git a/spanner/pdml_test.go b/spanner/pdml_test.go index 9f479bb6189e..7e5a523f31a7 100644 --- a/spanner/pdml_test.go +++ b/spanner/pdml_test.go @@ -15,11 +15,15 @@ package spanner import ( + "bytes" "context" "testing" + "time" . "cloud.google.com/go/spanner/internal/testutil" + sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestMockPartitionedUpdate(t *testing.T) { @@ -52,3 +56,71 @@ func TestMockPartitionedUpdateWithQuery(t *testing.T) { t.Errorf("got error %v, want code %s", err, wantCode) } } + +// PDML should be retried if the transaction is aborted. +func TestPartitionedUpdate_Aborted(t *testing.T) { + t.Parallel() + ctx := context.Background() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}, + }) + stmt := NewStatement(UpdateBarSetFoo) + rowCount, err := client.PartitionedUpdate(ctx, stmt) + if err != nil { + t.Fatal(err) + } + want := int64(UpdateBarSetFooRowCount) + if rowCount != want { + t.Errorf("Row count mismatch\ngot: %d\nwant: %d", rowCount, want) + } + + gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{ + &sppb.CreateSessionRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.DeleteSessionRequest{}, + }) + if err != nil { + t.Fatal(err) + } + id1 := gotReqs[2].(*sppb.ExecuteSqlRequest).Transaction.GetId() + id2 := gotReqs[4].(*sppb.ExecuteSqlRequest).Transaction.GetId() + if bytes.Equal(id1, id2) { + t.Errorf("same transaction used twice, expected two different transactions\ngot tx1: %q\ngot tx2: %q", id1, id2) + } +} + +// Test that a deadline is respected by PDML, and that the session that was +// created is also deleted, even though the update timed out. +func TestPartitionedUpdate_WithDeadline(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + ctx := context.Background() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond)) + defer cancel() + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + MinimumExecutionTime: 100 * time.Millisecond, + }) + stmt := NewStatement(UpdateBarSetFoo) + // The following update will cause a 'Failed to delete session' warning to + // be logged. This is expected. Once each client has its own logger, we + // should temporarily turn off logging to prevent this warning to be + // logged. + _, err := client.PartitionedUpdate(ctx, stmt) + if err == nil { + t.Fatalf("missing expected error") + } + wantCode := codes.DeadlineExceeded + if status.Code(err) != wantCode { + t.Fatalf("got error %v, want code %s", err, wantCode) + } +} diff --git a/spanner/retry.go b/spanner/retry.go index c45e80b4d63a..cc6400f621d5 100644 --- a/spanner/retry.go +++ b/spanner/retry.go @@ -23,6 +23,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/googleapis/gax-go/v2" edpb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -32,6 +33,14 @@ const ( retryInfoKey = "google.rpc.retryinfo-bin" ) +// DefaultRetryBackoff is used for retryers as a fallback value when the server +// did not return any retry information. +var DefaultRetryBackoff = gax.Backoff{ + Initial: 20 * time.Millisecond, + Max: 32 * time.Second, + Multiplier: 1.3, +} + // isErrorClosing reports whether the error is generated by gRPC layer talking // to a closed server. func isErrorClosing(err error) bool {