Skip to content

Commit

Permalink
spanner: Add resource-based routing
Browse files Browse the repository at this point in the history
This adds a step to retrieve the instance-specific endpoint before
creating the session client when creating a new spanner client.
It includes the following changes:

* Added an extra step to get the instance-specific endpoint when
creating a new client.
* Added a mocked instance admin server for testing purposes.
* Added an environment variable
"GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
to enable this feature. By default, it is disabled.
* If there is a PermissionDenied error when calling GetInstance(),
fall back to the global endpoint or the user-specified endpoint.
* Included tests to verify the functionality.

Change-Id: Ie447d13b8e414f6299fc56acb678d293531849ae
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/48895
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Reid Hironaga <rhiro@google.com>
Reviewed-by: Shanika Kuruppu <skuruppu@google.com>
  • Loading branch information
hengfengli committed Jan 6, 2020
1 parent b4cdc8d commit 896180c
Show file tree
Hide file tree
Showing 6 changed files with 509 additions and 10 deletions.
78 changes: 77 additions & 1 deletion spanner/client.go
Expand Up @@ -25,12 +25,16 @@ import (
"time"

"cloud.google.com/go/internal/trace"
instance "cloud.google.com/go/spanner/admin/instance/apiv1"
vkit "cloud.google.com/go/spanner/apiv1"
"google.golang.org/api/option"
instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
field_mask "google.golang.org/genproto/protobuf/field_mask"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

const (
Expand All @@ -50,7 +54,8 @@ const (
)

var (
// validDBPattern matches a complete database name of the form
// projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It is
// anchored at both ends, so nothing may precede or follow the name.
validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
validDBPattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+/databases/[^/]+$")
// validInstancePattern matches the leading
// projects/PROJECT_ID/instances/INSTANCE_ID portion of a resource name. It
// is anchored only at the start, so it also matches when further path
// segments (such as /databases/DATABASE_ID) follow.
validInstancePattern = regexp.MustCompile("^projects/[^/]+/instances/[^/]+")
)

func validDatabaseName(db string) error {
Expand All @@ -61,6 +66,15 @@ func validDatabaseName(db string) error {
return nil
}

// getInstanceName extracts the instance resource name
// (projects/PROJECT_ID/instances/INSTANCE_ID) from the beginning of db. It
// returns an error when db does not start with a well-formed instance path.
func getInstanceName(db string) (string, error) {
	// The pattern cannot match an empty string, so an empty result from
	// FindString always means "no match".
	if name := validInstancePattern.FindString(db); name != "" {
		return name, nil
	}
	return "", fmt.Errorf("Failed to retrieve instance name from %q according to pattern %q",
		db, validInstancePattern.String())
}

// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
Expand Down Expand Up @@ -103,6 +117,42 @@ func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Co
return metadata.NewOutgoingContext(ctx, md)
}

// getInstanceEndpoint looks up the instance that hosts database and returns
// the first of its endpoint URIs. When the instance reports no endpoint URIs,
// it returns an empty string and a nil error.
//
// opts are passed through to the instance admin client that is created for
// the lookup; that client is closed before the function returns.
func getInstanceEndpoint(ctx context.Context, database string, opts ...option.ClientOption) (string, error) {
	name, err := getInstanceName(database)
	if err != nil {
		return "", fmt.Errorf("Failed to resolve endpoint: %v", err)
	}

	adminClient, err := instance.NewInstanceAdminClient(ctx, opts...)
	if err != nil {
		return "", err
	}
	defer adminClient.Close()

	// Restrict the response to the endpoint_uris field; nothing else is read.
	mask := &field_mask.FieldMask{Paths: []string{"endpoint_uris"}}
	inst, err := adminClient.GetInstance(ctx, &instancepb.GetInstanceRequest{
		Name:      name,
		FieldMask: mask,
	})
	if err != nil {
		return "", err
	}

	uris := inst.GetEndpointUris()
	if len(uris) == 0 {
		// No instance-specific endpoint exists.
		return "", nil
	}
	return uris[0], nil
}

// NewClient creates a client to a database. A valid database name has the
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
// a default configuration.
Expand Down Expand Up @@ -142,6 +192,32 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
option.WithoutAuthentication(),
}
opts = append(opts, emulatorOpts...)
} else if os.Getenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING") == "true" {
// Fetch the instance-specific endpoint.
reqOpts := []option.ClientOption{option.WithEndpoint(endpoint)}
reqOpts = append(reqOpts, opts...)
instanceEndpoint, err := getInstanceEndpoint(ctx, database, reqOpts...)

if err != nil {
// If there is a PermissionDenied error, fall back to use the global endpoint
// or the user-specified endpoint.
if status.Code(err) == codes.PermissionDenied {
logf(config.logger, `
Warning: The client library attempted to connect to an endpoint closer to your
Cloud Spanner data but was unable to do so. The client library will fall back
and route requests to the global Spanner endpoint (spanner.googleapis.com),
which may result in increased latency. We recommend including the scope
https://www.googleapis.com/auth/spanner.admin so that the client library can
get an instance-specific endpoint and efficiently route requests.
`)
} else {
return nil, err
}
}

if instanceEndpoint != "" {
opts = append(opts, option.WithEndpoint(instanceEndpoint))
}
}

// gRPC options.
Expand Down
187 changes: 186 additions & 1 deletion spanner/client_test.go
Expand Up @@ -20,14 +20,20 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"strings"
"testing"
"time"

itestutil "cloud.google.com/go/internal/testutil"
. "cloud.google.com/go/spanner/internal/testutil"
"github.com/golang/protobuf/proto"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -94,6 +100,34 @@ func TestValidDatabaseName(t *testing.T) {
}
}

// TestGetInstanceName verifies that getInstanceName returns the instance path
// for a well-formed database URI and reports an error for malformed ones.
func TestGetInstanceName(t *testing.T) {
	validDBURI := "projects/spanner-cloud-test/instances/foo/databases/foodb"
	invalidDBURIs := []string{
		// Completely wrong DB URI.
		"foobarDB",
		// Project ID contains "/".
		"projects/spanner-cloud/test/instances/foo/databases/foodb",
		// No instance ID.
		"projects/spanner-cloud-test/instances//databases/foodb",
	}

	want := "projects/spanner-cloud-test/instances/foo"
	got, err := getInstanceName(validDBURI)
	if err != nil {
		t.Errorf("getInstanceName(%q) has an error: %q, want nil", validDBURI, err)
	}
	if got != want {
		t.Errorf("getInstanceName(%q) = %q, want %q", validDBURI, got, want)
	}

	wantErr := "Failed to retrieve instance name"
	for _, d := range invalidDBURIs {
		_, err := getInstanceName(d)
		// Guard against a nil error: calling err.Error() on it would panic
		// and hide which input was wrongly accepted.
		if err == nil {
			t.Errorf("getInstanceName(%q) returned a nil error, want error pattern %q", d, wantErr)
			continue
		}
		if !strings.Contains(err.Error(), wantErr) {
			// Report the URI under test, not the valid one.
			t.Errorf("getInstanceName(%q) has an error: %q, want error pattern %q", d, err, wantErr)
		}
	}
}

func TestReadOnlyTransactionClose(t *testing.T) {
// Closing a ReadOnlyTransaction shouldn't panic.
c := &Client{}
Expand Down Expand Up @@ -121,7 +155,7 @@ func TestClient_Single_InvalidArgument(t *testing.T) {
t.Parallel()
err := testSingleQuery(t, status.Error(codes.InvalidArgument, "Invalid argument"))
if status.Code(err) != codes.InvalidArgument {
t.Fatalf("got unexpected exception %v, expected InvalidArgument", err)
t.Fatalf("got: %v, want: %v", err, codes.InvalidArgument)
}
}

Expand Down Expand Up @@ -289,6 +323,157 @@ func TestClient_Single_ContextCanceled_withDeclaredServerErrors(t *testing.T) {
}
}

// TestClient_ResourceBasedRouting_WithEndpointsReturned checks that, with
// resource-based routing enabled, Spanner requests go to the endpoint returned
// by GetInstance rather than to the server the client was configured with.
func TestClient_ResourceBasedRouting_WithEndpointsReturned(t *testing.T) {
	const routingEnv = "GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
	os.Setenv(routingEnv, "true")
	defer os.Setenv(routingEnv, "")

	// Two servers are started. The base server answers the GetInstance call
	// with the target server's endpoint; after that lookup the client is
	// expected to talk to the target server only.
	baseServer, baseOpts, teardownBase := NewMockedSpannerInMemTestServerWithAddr(t, "localhost:8081")
	defer teardownBase()
	targetServer, targetOpts, teardownTarget := NewMockedSpannerInMemTestServerWithAddr(t, "localhost:8082")
	defer teardownTarget()

	// The formatted first option of the target server is used as the
	// instance endpoint returned by the base server.
	targetEndpoint := fmt.Sprintf("%s", targetOpts[0])
	baseServer.TestInstanceAdmin.SetResps([]proto.Message{
		&instancepb.Instance{EndpointUris: []string{targetEndpoint}},
	})

	ctx := context.Background()
	const dbName = "projects/some-project/instances/some-instance/databases/some-database"
	client, err := NewClientWithConfig(ctx, dbName, ClientConfig{}, baseOpts...)
	if err != nil {
		t.Fatal(err)
	}
	if err := executeSingerQuery(ctx, client.Single()); err != nil {
		t.Fatal(err)
	}

	// The base server should not receive any requests.
	if _, err := shouldHaveReceived(baseServer.TestSpanner, []interface{}{}); err != nil {
		t.Fatal(err)
	}

	// The target server should receive requests.
	wantReqs := []interface{}{
		&sppb.CreateSessionRequest{},
		&sppb.ExecuteSqlRequest{},
	}
	if _, err := shouldHaveReceived(targetServer.TestSpanner, wantReqs); err != nil {
		t.Fatal(err)
	}
}

// TestClient_ResourceBasedRouting_WithoutEndpointsReturned checks that the
// client keeps using the configured server when GetInstance returns an
// instance with no endpoint URIs.
func TestClient_ResourceBasedRouting_WithoutEndpointsReturned(t *testing.T) {
	const routingEnv = "GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
	os.Setenv(routingEnv, "true")
	defer os.Setenv(routingEnv, "")

	server, opts, teardown := NewMockedSpannerInMemTestServer(t)
	defer teardown()

	// GetInstance answers with an instance whose endpoint list is empty.
	server.TestInstanceAdmin.SetResps([]proto.Message{
		&instancepb.Instance{EndpointUris: []string{}},
	})

	ctx := context.Background()
	const dbName = "projects/some-project/instances/some-instance/databases/some-database"
	client, err := NewClientWithConfig(ctx, dbName, ClientConfig{}, opts...)
	if err != nil {
		t.Fatal(err)
	}
	if err := executeSingerQuery(ctx, client.Single()); err != nil {
		t.Fatal(err)
	}

	// The requests must have gone to the default endpoint.
	wantReqs := []interface{}{
		&sppb.CreateSessionRequest{},
		&sppb.ExecuteSqlRequest{},
	}
	if _, err := shouldHaveReceived(server.TestSpanner, wantReqs); err != nil {
		t.Fatal(err)
	}
}

// TestClient_ResourceBasedRouting_WithPermissionDeniedError checks that a
// PermissionDenied error from GetInstance does not fail client creation and
// that requests are then sent to the configured server.
func TestClient_ResourceBasedRouting_WithPermissionDeniedError(t *testing.T) {
	const routingEnv = "GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
	os.Setenv(routingEnv, "true")
	defer os.Setenv(routingEnv, "")

	server, opts, teardown := NewMockedSpannerInMemTestServer(t)
	defer teardown()

	permissionDenied := status.Error(codes.PermissionDenied, "Permission Denied")
	server.TestInstanceAdmin.SetErr(permissionDenied)

	ctx := context.Background()
	const dbName = "projects/some-project/instances/some-instance/databases/some-database"
	// The PermissionDenied error is expected to produce a warning log entry;
	// the logger writes to ioutil.Discard so the test output stays clean.
	quietLogger := log.New(ioutil.Discard, "", log.LstdFlags)
	cfg := ClientConfig{logger: quietLogger}
	client, err := NewClientWithConfig(ctx, dbName, cfg, opts...)
	if err != nil {
		t.Fatal(err)
	}
	if err := executeSingerQuery(ctx, client.Single()); err != nil {
		t.Fatal(err)
	}

	// After the PermissionDenied error from GetInstance() the client must
	// have fallen back to the default endpoint.
	wantReqs := []interface{}{
		&sppb.CreateSessionRequest{},
		&sppb.ExecuteSqlRequest{},
	}
	if _, err := shouldHaveReceived(server.TestSpanner, wantReqs); err != nil {
		t.Fatal(err)
	}
}

// TestClient_ResourceBasedRouting_WithUnavailableError checks that client
// creation succeeds when the GetInstance call first fails with Unavailable.
func TestClient_ResourceBasedRouting_WithUnavailableError(t *testing.T) {
	const routingEnv = "GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING"
	os.Setenv(routingEnv, "true")
	defer os.Setenv(routingEnv, "")

	server, opts, teardown := NewMockedSpannerInMemTestServer(t)
	defer teardown()

	// Queue a response with no endpoints, then arm an Unavailable error.
	server.TestInstanceAdmin.SetResps([]proto.Message{
		&instancepb.Instance{EndpointUris: []string{}},
	})
	server.TestInstanceAdmin.SetErr(status.Error(codes.Unavailable, "Temporary unavailable"))

	ctx := context.Background()
	const dbName = "projects/some-project/instances/some-instance/databases/some-database"
	// The first request gets the error, after which the server resets the
	// error to nil, so the next request succeeds. Because the call is
	// retried, no error is expected here.
	if _, err := NewClientWithConfig(ctx, dbName, ClientConfig{}, opts...); err != nil {
		t.Fatal(err)
	}
}

// TestClient_ResourceBasedRouting_WithInvalidArgumentError checks that an
// InvalidArgument error from GetInstance is returned by NewClientWithConfig
// instead of triggering a fallback to the configured endpoint.
func TestClient_ResourceBasedRouting_WithInvalidArgumentError(t *testing.T) {
	os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "true")
	defer os.Setenv("GOOGLE_CLOUD_SPANNER_ENABLE_RESOURCE_BASED_ROUTING", "")

	server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
	defer serverTeardown()

	server.TestInstanceAdmin.SetErr(status.Error(codes.InvalidArgument, "Invalid argument"))

	ctx := context.Background()
	formattedDatabase := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "some-project", "some-instance", "some-database")
	_, err := NewClientWithConfig(ctx, formattedDatabase, ClientConfig{}, opts...)

	// Use the same got/want failure format as TestClient_Single_InvalidArgument.
	if status.Code(err) != codes.InvalidArgument {
		t.Fatalf("got: %v, want: %v", err, codes.InvalidArgument)
	}
}

func testSingleQuery(t *testing.T, serverError error) error {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
Expand Down

0 comments on commit 896180c

Please sign in to comment.