diff --git a/pkg/kv/transport.go b/pkg/kv/transport.go index e28453d334ce..505e0530b1b4 100644 --- a/pkg/kv/transport.go +++ b/pkg/kv/transport.go @@ -248,6 +248,9 @@ func (gt *grpcTransport) send( if err != nil { return nil, err } + if err := grpcutil.ConnectionReady(conn); err != nil { + return nil, err + } reply, err := roachpb.NewInternalClient(conn).Batch(ctx, &client.args) if reply != nil { for i := range reply.Responses { diff --git a/pkg/util/grpcutil/grpc_util.go b/pkg/util/grpcutil/grpc_util.go index 9fd696c9a3f5..14a8dbbfc4bc 100644 --- a/pkg/util/grpcutil/grpc_util.go +++ b/pkg/util/grpcutil/grpc_util.go @@ -16,6 +16,7 @@ package grpcutil import ( "context" + "fmt" "io" "strings" @@ -24,6 +25,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/status" "google.golang.org/grpc/transport" ) @@ -73,6 +75,9 @@ func IsClosedConnection(err error) bool { // TODO(bdarnell): Replace this with a cleaner mechanism when/if // https://github.com/grpc/grpc-go/issues/1443 is resolved. func RequestDidNotStart(err error) bool { + if _, ok := err.(connectionNotReadyError); ok { + return true + } s, ok := status.FromError(err) if !ok { // This is a non-gRPC error; assume nothing. @@ -87,3 +92,29 @@ func RequestDidNotStart(err error) bool { } return false } + +// ConnectionReady returns nil if the given connection is ready to +// send a request, or an error (which will pass RequestDidNotStart) if +// not. +// +// This is a workaround for the fact that gRPC 1.7 fails to +// distinguish between ambiguous and unambiguous errors. +// +// This is designed for use with connections prepared by +// pkg/rpc.Connection.Connect (which performs an initial heartbeat and +// thereby ensures that we will never see a connection in the +// first-time Connecting state). +func ConnectionReady(conn *grpc.ClientConn) error { + if s := conn.GetState(); s == connectivity.TransientFailure { + return connectionNotReadyError{s} + } + return nil +} + +type connectionNotReadyError struct { + state connectivity.State +} + +func (e connectionNotReadyError) Error() string { + return fmt.Sprintf("connection not ready: %s", e.state) +}