Skip to content

Commit

Permalink
spanner: Use Go 1.13 error-unwrapping
Browse files Browse the repository at this point in the history
Use Go 1.13 error-unwrapping and the equivalent xerrors features
for builds on versions < 1.13. This makes it possible to use
wrapped errors with the Spanner client.

This change also deprecates the gRPC code in the Spanner error struct.
All functions that need the gRPC code will extract it from the wrapped
error instead of reading this field.

Fixes #1223 and #1608.
Updates #1310.

Change-Id: Iea914adb5ca78af5e78cc948d8b1180eb3d647d2
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/48730
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Hengfeng Li <hengfeng@google.com>
  • Loading branch information
olavloite committed Dec 24, 2019
1 parent 4fba433 commit 9f47821
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 73 deletions.
98 changes: 86 additions & 12 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config Client
Key: "x-goog-api-client",
ValuesValidator: func(token ...string) error {
if len(token) != 1 {
return spannerErrorf(codes.Internal, "unexpected number of api client token headers: %v", len(token))
return status.Errorf(codes.Internal, "unexpected number of api client token headers: %v", len(token))
}
if !strings.HasPrefix(token[0], "gl-go/") {
return spannerErrorf(codes.Internal, "unexpected api client token: %v", token[0])
return status.Errorf(codes.Internal, "unexpected api client token: %v", token[0])
}
return nil
},
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(2),
Err: spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
},
)
// When the client is fetching the partial result set with resume token 3,
Expand All @@ -154,7 +154,7 @@ func TestClient_Single_RetryableErrorOnPartialResultSet(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
Err: spannerErrorf(codes.Unavailable, "server is unavailable"),
Err: status.Errorf(codes.Unavailable, "server is unavailable"),
},
)
ctx := context.Background()
Expand All @@ -177,7 +177,7 @@ func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(2),
Err: spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
},
)
// 'Session not found' is not retryable and the error will be returned to
Expand All @@ -186,7 +186,7 @@ func TestClient_Single_NonRetryableErrorOnPartialResultSet(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
Err: spannerErrorf(codes.NotFound, "Session not found"),
Err: status.Errorf(codes.NotFound, "Session not found"),
},
)
ctx := context.Background()
Expand Down Expand Up @@ -221,14 +221,14 @@ func TestClient_Single_DeadlineExceeded_WithErrors(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(2),
Err: spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
},
)
server.TestSpanner.AddPartialResultSetError(
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
Err: spannerErrorf(codes.Unavailable, "server is unavailable"),
Err: status.Errorf(codes.Unavailable, "server is unavailable"),
ExecutionTime: 50 * time.Millisecond,
},
)
Expand Down Expand Up @@ -262,14 +262,14 @@ func TestClient_Single_ContextCanceled_withDeclaredServerErrors(t *testing.T) {
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(2),
Err: spannerErrorf(codes.Internal, "stream terminated by RST_STREAM"),
Err: status.Errorf(codes.Internal, "stream terminated by RST_STREAM"),
},
)
server.TestSpanner.AddPartialResultSetError(
SelectSingerIDAlbumIDAlbumTitleFromAlbums,
PartialResultSetExecutionTime{
ResumeToken: EncodeResumeToken(3),
Err: spannerErrorf(codes.Unavailable, "server is unavailable"),
Err: status.Errorf(codes.Unavailable, "server is unavailable"),
},
)
ctx := context.Background()
Expand Down Expand Up @@ -328,7 +328,7 @@ func executeSingerQueryWithRowFunc(ctx context.Context, tx *ReadOnlyTransaction,
}
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
return spannerErrorf(codes.Internal, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
return status.Errorf(codes.Internal, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
return nil
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func testReadWriteTransaction(t *testing.T, executionTimes map[string]SimulatedE
rowCount++
}
if rowCount != SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount {
return spannerErrorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
return status.Errorf(codes.FailedPrecondition, "Row count mismatch, got %v, expected %v", rowCount, SelectSingerIDAlbumIDAlbumTitleFromAlbumsRowCount)
}
return nil
})
Expand Down Expand Up @@ -569,6 +569,7 @@ func TestClient_ApplyAtLeastOnce(t *testing.T) {
}

func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) {
t.Parallel()
_, client, teardown := setupMockedTestServer(t)
defer teardown()
ctx := context.Background()
Expand Down Expand Up @@ -600,3 +601,76 @@ func TestReadWriteTransaction_ErrUnexpectedEOF(t *testing.T) {
t.Fatalf("unexpected number of attempts: %d, expected %d", attempts, 1)
}
}

func TestReadWriteTransaction_WrapError(t *testing.T) {
t.Parallel()
server, client, teardown := setupMockedTestServer(t)
defer teardown()
// Abort the transaction on both the query as well as commit.
// The first abort error will be wrapped. The client will unwrap the cause
// of the error and retry the transaction. The aborted error on commit
// will not be wrapped, but will also be recognized by the client as an
// abort that should be retried.
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
})
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.Aborted, "Transaction aborted")},
})
msg := "query failed"
numAttempts := 0
ctx := context.Background()
_, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
numAttempts++
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
// Wrap the error in another error that implements the
// (xerrors|errors).Wrapper interface.
return &wrappedTestError{err, msg}
}
}
return nil
})
if err != nil {
t.Fatalf("Unexpected error\nGot: %v\nWant: nil", err)
}
if g, w := numAttempts, 3; g != w {
t.Fatalf("Number of transaction attempts mismatch\nGot: %d\nWant: %d", w, w)
}

// Execute a transaction that returns a non-retryable error that is
// wrapped in a custom error. The transaction should return the custom
// error.
server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql,
SimulatedExecutionTime{
Errors: []error{status.Error(codes.NotFound, "Table not found")},
})
_, err = client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *ReadWriteTransaction) error {
numAttempts++
iter := tx.Query(ctx, NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
defer iter.Stop()
for {
_, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
// Wrap the error in another error that implements the
// (xerrors|errors).Wrapper interface.
return &wrappedTestError{err, msg}
}
}
return nil
})
if err == nil || err.Error() != msg {
t.Fatalf("Unexpected error\nGot: %v\nWant: %v", err, msg)
}
}
68 changes: 47 additions & 21 deletions spanner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -30,7 +29,14 @@ import (
type Error struct {
// Code is the canonical error code for describing the nature of a
// particular error.
//
// Deprecated: The error code should be extracted from the wrapped error by
// calling ErrCode(err error). This field will be removed in a future
// release.
Code codes.Code
// err is the wrapped error that caused this Spanner error. The wrapped
// error can be read with the Unwrap method.
err error
// Desc explains more details of the error.
Desc string
// trailers are the trailers returned in the response, if any.
Expand All @@ -42,27 +48,48 @@ func (e *Error) Error() string {
if e == nil {
return fmt.Sprintf("spanner: OK")
}
return fmt.Sprintf("spanner: code = %q, desc = %q", e.Code, e.Desc)
code := ErrCode(e)
return fmt.Sprintf("spanner: code = %q, desc = %q", code, e.Desc)
}

// Unwrap returns the wrapped error (if any).
func (e *Error) Unwrap() error {
return e.err
}

// GRPCStatus returns the corresponding gRPC Status of this Spanner error.
// This allows the error to be converted to a gRPC status using
// `status.Convert(error)`.
func (e *Error) GRPCStatus() *status.Status {
return status.New(e.Code, e.Desc)
err := unwrap(e)
for {
// No gRPC Status found in the chain of errors. Return 'Unknown' with
// the message of the original error.
if err == nil {
return status.New(codes.Unknown, e.Desc)
}
code := status.Code(err)
if code != codes.Unknown {
return status.New(code, e.Desc)
}
err = unwrap(err)
}
}

// decorate decorates an existing spanner.Error with more information.
func (e *Error) decorate(info string) {
e.Desc = fmt.Sprintf("%v, %v", info, e.Desc)
}

// spannerErrorf generates a *spanner.Error with the given error code and
// description.
func spannerErrorf(ec codes.Code, format string, args ...interface{}) error {
// spannerErrorf generates a *spanner.Error with the given description and a
// status error with the given error code as its wrapped error.
func spannerErrorf(code codes.Code, format string, args ...interface{}) error {
msg := fmt.Sprintf(format, args...)
wrapped := status.Error(code, msg)
return &Error{
Code: ec,
Desc: fmt.Sprintf(format, args...),
Code: code,
err: wrapped,
Desc: msg,
}
}

Expand All @@ -79,46 +106,45 @@ func toSpannerErrorWithMetadata(err error, trailers metadata.MD) error {
if err == nil {
return nil
}
if se, ok := err.(*Error); ok {
var se *Error
if errorAs(err, &se) {
if trailers != nil {
se.trailers = metadata.Join(se.trailers, trailers)
}
return se
}
switch {
case err == context.DeadlineExceeded:
return &Error{codes.DeadlineExceeded, err.Error(), trailers}
case err == context.Canceled:
return &Error{codes.Canceled, err.Error(), trailers}
case err == context.DeadlineExceeded || err == context.Canceled:
return &Error{status.FromContextError(err).Code(), status.FromContextError(err).Err(), err.Error(), trailers}
case status.Code(err) == codes.Unknown:
return &Error{codes.Unknown, err.Error(), trailers}
return &Error{codes.Unknown, err, err.Error(), trailers}
default:
return &Error{status.Code(err), grpc.ErrorDesc(err), trailers}
return &Error{status.Convert(err).Code(), err, status.Convert(err).Message(), trailers}
}
}

// ErrCode extracts the canonical error code from a Go error.
func ErrCode(err error) codes.Code {
se, ok := toSpannerError(err).(*Error)
s, ok := status.FromError(err)
if !ok {
return codes.Unknown
}
return se.Code
return s.Code()
}

// ErrDesc extracts the Cloud Spanner error description from a Go error.
func ErrDesc(err error) string {
se, ok := toSpannerError(err).(*Error)
if !ok {
var se *Error
if !errorAs(err, &se) {
return err.Error()
}
return se.Desc
}

// errTrailers extracts the grpc trailers if present from a Go error.
func errTrailers(err error) metadata.MD {
se, ok := err.(*Error)
if !ok {
var se *Error
if !errorAs(err, &se) {
return nil
}
return se.trailers
Expand Down
33 changes: 33 additions & 0 deletions spanner/errors112.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: Remove entire file when support for Go1.12 and lower has been dropped.
// +build !go1.13

package spanner

import "golang.org/x/xerrors"

// unwrap is a generic implementation of (errors|xerrors).Unwrap(error). This
// implementation uses xerrors and is included in Go 1.12 and earlier builds.
func unwrap(err error) error {
return xerrors.Unwrap(err)
}

// errorAs is a generic implementation of
// (errors|xerrors).As(error, interface{}). This implementation uses xerrors
// and is included in Go 1.12 and earlier builds.
func errorAs(err error, target interface{}) bool {
return xerrors.As(err, target)
}
33 changes: 33 additions & 0 deletions spanner/errors113.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// TODO: Remove entire file when support for Go1.12 and lower has been dropped.
// +build go1.13

package spanner

import "errors"

// unwrap is a generic implementation of (errors|xerrors).Unwrap(error). This
// implementation uses errors and is included in Go 1.13 and later builds.
func unwrap(err error) error {
return errors.Unwrap(err)
}

// errorAs is a generic implementation of
// (errors|xerrors).As(error, interface{}). This implementation uses errors and
// is included in Go 1.13 and later builds.
func errorAs(err error, target interface{}) bool {
return errors.As(err, target)
}
Loading

0 comments on commit 9f47821

Please sign in to comment.