diff --git a/internal/kokoro/vet.sh b/internal/kokoro/vet.sh index 329be04e1b75..e46670c07e49 100755 --- a/internal/kokoro/vet.sh +++ b/internal/kokoro/vet.sh @@ -62,6 +62,7 @@ golint ./... 2>&1 | ( \ grep -v "exported func Value returns unexported type pretty.val, which can be annoying to use" | \ grep -v "exported func Increment returns unexported type firestore.increment, which can be annoying to use" | \ grep -v "ExecuteStreamingSql" | \ + grep -v "MethodExecuteSql should be MethodExecuteSQL" | \ grep -vE "pubsub\/pstest\/fake\.go.+should have comment or be unexported" | \ grep -v "ClusterId" | \ grep -v "InstanceId" | \ diff --git a/spanner/client_test.go b/spanner/client_test.go index ec69e89a98fc..73693f14449e 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -383,26 +383,6 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) { } } -// PartitionedUpdate should not retry on aborted. -func TestClient_PartitionedUpdate(t *testing.T) { - t.Parallel() - server, client, teardown := setupMockedTestServer(t) - defer teardown() - // PartitionedDML transactions are not committed. - server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, - SimulatedExecutionTime{ - Errors: []error{gstatus.Error(codes.Aborted, "Transaction aborted")}, - }) - _, err := client.PartitionedUpdate(context.Background(), NewStatement(UpdateBarSetFoo)) - if err == nil { - t.Fatalf("Missing expected Aborted exception") - } else { - if gstatus.Code(err) != codes.Aborted { - t.Fatalf("Got unexpected error %v, expected Aborted", err) - } - } -} - func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) { _, client, teardown := setupMockedTestServer(t) defer teardown() diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index c23e480f5315..5c4f54643307 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -57,6 +57,7 @@ const ( MethodCreateSession string = "CREATE_SESSION" MethodDeleteSession string = "DELETE_SESSION" MethodGetSession string = "GET_SESSION" + MethodExecuteSql string = "EXECUTE_SQL" MethodExecuteStreamingSql string = "EXECUTE_STREAMING_SQL" ) @@ -563,13 +564,9 @@ func (s *inMemSpannerServer) DeleteSession(ctx context.Context, req *spannerpb.D } func (s *inMemSpannerServer) ExecuteSql(ctx context.Context, req *spannerpb.ExecuteSqlRequest) (*spannerpb.ResultSet, error) { - s.mu.Lock() - if s.stopped { - s.mu.Unlock() - return nil, gstatus.Error(codes.Unavailable, "server has been stopped") + if err := s.simulateExecutionTime(MethodExecuteSql, req); err != nil { + return nil, err } - s.receivedRequests <- req - s.mu.Unlock() if req.Session == "" { return nil, gstatus.Error(codes.InvalidArgument, "Missing session name") } diff --git a/spanner/pdml.go b/spanner/pdml.go index c03c13a60e0d..242a48edcfe9 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -16,10 +16,9 @@ package spanner import ( "context" - "time" "cloud.google.com/go/internal/trace" - "google.golang.org/api/iterator" + "github.com/googleapis/gax-go/v2" sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc/codes" ) @@ -38,66 +37,83 @@ func (c *Client) PartitionedUpdate(ctx context.Context, statement Statement) (co if err := checkNestedTxn(ctx); err != nil { return 0, err } - var ( - tx transactionID s *session sh *sessionHandle ) - // Create session. + // Create a session that will be used only for this request. sc := c.rrNext() s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md) if err != nil { return 0, toSpannerError(err) } + // Delete the session at the end of the request. If the PDML statement + // timed out or was cancelled, the DeleteSession request might not succeed, + // but the session will eventually be garbage collected by the server. defer s.delete(ctx) sh = &sessionHandle{session: s} - // Begin transaction. - res, err := sc.BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ - Session: sh.getID(), - Options: &sppb.TransactionOptions{ - Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, - }, - }) - if err != nil { - return 0, toSpannerError(err) - } + // Create the parameters and the SQL request, but without a transaction. + // The transaction reference will be added by the executePdml method. params, paramTypes, err := statement.convertParams() if err != nil { return 0, toSpannerError(err) } - tx = res.Id - req := &sppb.ExecuteSqlRequest{ - Session: sh.getID(), - Transaction: &sppb.TransactionSelector{ - Selector: &sppb.TransactionSelector_Id{Id: tx}, - }, + Session: sh.getID(), Sql: statement.SQL, Params: params, ParamTypes: paramTypes, } - rpc := func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { - req.ResumeToken = resumeToken - return sc.ExecuteStreamingSql(ctx, req) - } - iter := stream(contextWithOutgoingMetadata(ctx, sh.getMetadata()), - rpc, func(time.Time) {}, func(error) {}) - // TODO(jba): factor out the following code from here and ReadWriteTransaction.Update. - defer iter.Stop() - for { - _, err := iter.Next() - if err == iterator.Done { - break - } - if err != nil { - return 0, toSpannerError(err) + + // Make a retryer for Aborted errors. + // TODO: use generic Aborted retryer when merged with master + retryer := gax.OnCodes([]codes.Code{codes.Aborted}, DefaultRetryBackoff) + // Execute the PDML and retry if the transaction is aborted. + executePdmlWithRetry := func(ctx context.Context) (int64, error) { + for { + count, err := executePdml(ctx, sh, req) + if err == nil { + return count, nil + } + delay, shouldRetry := retryer.Retry(err) + if !shouldRetry { + return 0, err + } + if err := gax.Sleep(ctx, delay); err != nil { + return 0, err + } } - time.Sleep(time.Second) } + return executePdmlWithRetry(ctx) +} - if !iter.sawStats { - return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", statement.SQL) +// executePdml executes the following steps: +// 1. Begin a PDML transaction +// 2. Add the ID of the PDML transaction to the SQL request. +// 3. Execute the update statement on the PDML transaction +// +// Note that PDML transactions cannot be committed or rolled back. +func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { + // Begin transaction. + res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ + Session: sh.getID(), + Options: &sppb.TransactionOptions{ + Mode: &sppb.TransactionOptions_PartitionedDml_{PartitionedDml: &sppb.TransactionOptions_PartitionedDml{}}, + }, + }) + if err != nil { + return 0, toSpannerError(err) + } + // Add a reference to the PDML transaction on the ExecuteSql request. + req.Transaction = &sppb.TransactionSelector{ + Selector: &sppb.TransactionSelector_Id{Id: res.Id}, + } + resultSet, err := sh.getClient().ExecuteSql(ctx, req) + if err != nil { + return 0, err + } + if resultSet.Stats == nil { + return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql) } - return iter.RowCount, nil + return extractRowCount(resultSet.Stats) } diff --git a/spanner/pdml_test.go b/spanner/pdml_test.go index 9f479bb6189e..7e5a523f31a7 100644 --- a/spanner/pdml_test.go +++ b/spanner/pdml_test.go @@ -15,11 +15,15 @@ package spanner import ( + "bytes" "context" "testing" + "time" . "cloud.google.com/go/spanner/internal/testutil" + sppb "google.golang.org/genproto/googleapis/spanner/v1" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) func TestMockPartitionedUpdate(t *testing.T) { @@ -52,3 +56,71 @@ func TestMockPartitionedUpdateWithQuery(t *testing.T) { t.Errorf("got error %v, want code %s", err, wantCode) } } + +// PDML should be retried if the transaction is aborted. +func TestPartitionedUpdate_Aborted(t *testing.T) { + t.Parallel() + ctx := context.Background() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + Errors: []error{status.Error(codes.Aborted, "Transaction aborted")}, + }) + stmt := NewStatement(UpdateBarSetFoo) + rowCount, err := client.PartitionedUpdate(ctx, stmt) + if err != nil { + t.Fatal(err) + } + want := int64(UpdateBarSetFooRowCount) + if rowCount != want { + t.Errorf("Row count mismatch\ngot: %d\nwant: %d", rowCount, want) + } + + gotReqs, err := shouldHaveReceived(server.TestSpanner, []interface{}{ + &sppb.CreateSessionRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.BeginTransactionRequest{}, + &sppb.ExecuteSqlRequest{}, + &sppb.DeleteSessionRequest{}, + }) + if err != nil { + t.Fatal(err) + } + id1 := gotReqs[2].(*sppb.ExecuteSqlRequest).Transaction.GetId() + id2 := gotReqs[4].(*sppb.ExecuteSqlRequest).Transaction.GetId() + if bytes.Equal(id1, id2) { + t.Errorf("same transaction used twice, expected two different transactions\ngot tx1: %q\ngot tx2: %q", id1, id2) + } +} + +// Test that a deadline is respected by PDML, and that the session that was +// created is also deleted, even though the update timed out. +func TestPartitionedUpdate_WithDeadline(t *testing.T) { + t.Parallel() + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + ctx := context.Background() + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(50*time.Millisecond)) + defer cancel() + server.TestSpanner.PutExecutionTime(MethodExecuteSql, + SimulatedExecutionTime{ + MinimumExecutionTime: 100 * time.Millisecond, + }) + stmt := NewStatement(UpdateBarSetFoo) + // The following update will cause a 'Failed to delete session' warning to + // be logged. This is expected. Once each client has its own logger, we + // should temporarily turn off logging to prevent this warning to be + // logged. + _, err := client.PartitionedUpdate(ctx, stmt) + if err == nil { + t.Fatalf("missing expected error") + } + wantCode := codes.DeadlineExceeded + if status.Code(err) != wantCode { + t.Fatalf("got error %v, want code %s", err, wantCode) + } +} diff --git a/spanner/retry.go b/spanner/retry.go index c45e80b4d63a..cc6400f621d5 100644 --- a/spanner/retry.go +++ b/spanner/retry.go @@ -23,6 +23,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/googleapis/gax-go/v2" edpb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -32,6 +33,14 @@ const ( retryInfoKey = "google.rpc.retryinfo-bin" ) +// DefaultRetryBackoff is used for retryers as a fallback value when the server +// did not return any retry information. +var DefaultRetryBackoff = gax.Backoff{ + Initial: 20 * time.Millisecond, + Max: 32 * time.Second, + Multiplier: 1.3, +} + // isErrorClosing reports whether the error is generated by gRPC layer talking // to a closed server. func isErrorClosing(err error) bool {