Skip to content

Commit

Permalink
feat(spanner): support MultiEndpoint (#9565)
Browse files Browse the repository at this point in the history
* feat(spanner): MultiEndpoint support

* update spanner go.mod

---------

Co-authored-by: rahul2393 <irahul@google.com>
  • Loading branch information
nimf and rahul2393 committed Apr 30, 2024
1 parent 84e3236 commit 0ac0d26
Show file tree
Hide file tree
Showing 14 changed files with 2,180 additions and 61 deletions.
33 changes: 33 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
cloud.google.com/go/auth v0.2.0/go.mod h1:+yb+oy3/P0geX6DLKlqiGHARGR6EX2GRtYCzWOCQSbU=
cloud.google.com/go/auth/oauth2adapt v0.2.0/go.mod h1:AfqujpDAlTfLfeCIl/HJZZlIxD8+nJoZ5e0x1IxGq5k=
cloud.google.com/go/dataproc v1.12.0 h1:W47qHL3W4BPkAIbk4SWmIERwsWBaNnWm0P2sdx3YgGU=
cloud.google.com/go/gaming v1.9.0 h1:7vEhFnZmd931Mo7sZ6pJy7uQPDxF7m7v8xtBheG08tc=
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0 h1:oVLqHXhnYtUwM89y9T1fXGaK9wTkXHgNp8/ZNMQzUxE=
github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.0/go.mod h1:dppbR7CwXD4pgtV9t3wD1812RaLDcBjtblcDF5f1vI0=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.18.0 h1:ugYJK/neZQtQeh2jc5xNoDFiMQojlAkoqJMRb7vTu1U=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.18.0/go.mod h1:Xx0VKh7GJ4si3rmElbh19Mejxz68ibWg/J30ZOMrqzU=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.46.0/go.mod h1:V28hx+cUCZC9e3qcqszMb+Sbt8cQZtHTiXOmyDzoDOg=
github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
Expand Down Expand Up @@ -31,6 +36,7 @@ github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXq
github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
Expand All @@ -51,6 +57,33 @@ github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
go.opentelemetry.io/otel v1.23.1/go.mod h1:Td0134eafDLcTS4y+zQ26GE8u3dEuRBiBCTUIRHaikA=
go.opentelemetry.io/otel/bridge/opencensus v0.40.0 h1:pqDiayRhBgoqy1vwnscik+TizcImJ58l053NScJyZso=
go.opentelemetry.io/otel/bridge/opencensus v0.40.0/go.mod h1:1NvVHb6tLTe5A9qCYz+eErW0t8iPn4ZfR6tDKcqlGTM=
go.opentelemetry.io/otel/metric v1.23.1/go.mod h1:mpG2QPlAfnK8yNhNJAxDZruU9Y1/HubbC+KyH8FaCWI=
go.opentelemetry.io/otel/trace v1.23.1/go.mod h1:4IpnpJFwr1mo/6HL8XIPJaE9y0+u1KcVmuW7dwFSVrI=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/telemetry v0.0.0-20240208230135-b75ee8823808/go.mod h1:KG1lNk5ZFNssSZLrpVb4sMXKMpGwGXOxSG3rnu2gZQQ=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
google.golang.org/genproto v0.0.0-20230725213213-b022f6e96895/go.mod h1:0ggbjUrZYpy1q+ANUS30SEoGZ53cdfwtbuG7Ptgy108=
google.golang.org/genproto/googleapis/api v0.0.0-20230725213213-b022f6e96895/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20231120223509-83a465c0220f/go.mod h1:iIgEblxoG4klcXsG0d9cpoxJ4xndv6+1FkDROCHhPRI=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240102182953-50ed04b92917/go.mod h1:O9TvT7A9NLgdqqF0JJXJ+axpaoYiEb8txGmkvy+AvLc=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240304161311-37d4d3c04a78/go.mod h1:vh/N7795ftP0AkN1w8XKqN4w1OdUKXW5Eummda+ofv8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230725213213-b022f6e96895/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240228201840-1f18d85a4ec2/go.mod h1:UCOku4NytXMJuLQE5VuqA5lX3PcHCBo8pxNyvkf4xBs=
google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down
201 changes: 192 additions & 9 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

"cloud.google.com/go/internal/trace"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp"
grpcgcppb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
"github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -37,6 +39,7 @@ import (
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -121,6 +124,131 @@ func (c *Client) ClientID() string {
return c.sc.id
}

func createGCPMultiEndpoint(cfg *grpcgcp.GCPMultiEndpointOptions, config ClientConfig, opts ...option.ClientOption) (*grpcgcp.GCPMultiEndpoint, error) {
if cfg.GRPCgcpConfig == nil {
cfg.GRPCgcpConfig = &grpcgcppb.ApiConfig{}
}
if cfg.GRPCgcpConfig.Method == nil || len(cfg.GRPCgcpConfig.Method) == 0 {
cfg.GRPCgcpConfig.Method = []*grpcgcppb.MethodConfig{
{
Name: []string{"/google.spanner.v1.Spanner/CreateSession"},
Affinity: &grpcgcppb.AffinityConfig{
Command: grpcgcppb.AffinityConfig_BIND,
AffinityKey: "name",
},
},
{
Name: []string{"/google.spanner.v1.Spanner/BatchCreateSessions"},
Affinity: &grpcgcppb.AffinityConfig{
Command: grpcgcppb.AffinityConfig_BIND,
AffinityKey: "session.name",
},
},
{
Name: []string{"/google.spanner.v1.Spanner/DeleteSession"},
Affinity: &grpcgcppb.AffinityConfig{
Command: grpcgcppb.AffinityConfig_UNBIND,
AffinityKey: "name",
},
},
{
Name: []string{"/google.spanner.v1.Spanner/GetSession"},
Affinity: &grpcgcppb.AffinityConfig{
Command: grpcgcppb.AffinityConfig_BOUND,
AffinityKey: "name",
},
},
{
Name: []string{
"/google.spanner.v1.Spanner/BeginTransaction",
"/google.spanner.v1.Spanner/Commit",
"/google.spanner.v1.Spanner/ExecuteBatchDml",
"/google.spanner.v1.Spanner/ExecuteSql",
"/google.spanner.v1.Spanner/ExecuteStreamingSql",
"/google.spanner.v1.Spanner/PartitionQuery",
"/google.spanner.v1.Spanner/PartitionRead",
"/google.spanner.v1.Spanner/Read",
"/google.spanner.v1.Spanner/Rollback",
"/google.spanner.v1.Spanner/StreamingRead",
},
Affinity: &grpcgcppb.AffinityConfig{
Command: grpcgcppb.AffinityConfig_BOUND,
AffinityKey: "session",
},
},
}
}
// Append emulator options if SPANNER_EMULATOR_HOST has been set.
if emulatorAddr := os.Getenv("SPANNER_EMULATOR_HOST"); emulatorAddr != "" {
emulatorOpts := []option.ClientOption{
option.WithEndpoint(emulatorAddr),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
internaloption.SkipDialSettingsValidation(),
}
opts = append(opts, emulatorOpts...)
// Replace all endpoints with emulator target.
for _, meo := range cfg.MultiEndpoints {
meo.Endpoints = []string{emulatorAddr}
}
}

// Set the number of channels to the default value if not specified.
if cfg.GRPCgcpConfig.GetChannelPool() == nil || cfg.GRPCgcpConfig.GetChannelPool().GetMaxSize() == 0 {
cfg.GRPCgcpConfig.ChannelPool = &grpcgcppb.ChannelPoolConfig{
MinSize: numChannels,
MaxSize: numChannels,
}
}
// Set MinSize equal to MaxSize to create all the channels beforehand.
cfg.GRPCgcpConfig.ChannelPool.MinSize = cfg.GRPCgcpConfig.ChannelPool.GetMaxSize()

cfg.GRPCgcpConfig.ChannelPool.BindPickStrategy = grpcgcppb.ChannelPoolConfig_ROUND_ROBIN

cfg.DialFunc = func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
copts := opts

for _, do := range dopts {
copts = append(copts, option.WithGRPCDialOption(do))
}

allOpts := allClientOpts(1, config.Compression, copts...)

// Overwrite endpoint and pool config.
allOpts = append(allOpts,
option.WithEndpoint(target),
option.WithGRPCConnectionPool(1),
option.WithGRPCConn(nil),
)

return gtransport.Dial(ctx, allOpts...)
}

gme, err := grpcgcp.NewGCPMultiEndpoint(cfg)
return gme, err
}

// To use GCPMultiEndpoint in gtransport.Dial (via gtransport.WithConnPool option)
// we implement gtransport.ConnPool interface using this wrapper.
type gmeWrapper struct {
*grpcgcp.GCPMultiEndpoint
}

// Make sure gmeWrapper implements ConnPool interface.
var _ gtransport.ConnPool = (*gmeWrapper)(nil)

func (gw *gmeWrapper) Conn() *grpc.ClientConn {
// GCPMultiEndpoint does not expose any ClientConn.
// This is safe because Cloud Spanner client doesn't use this function and instead
// makes calls directly using Invoke and NewStream from the grpc.ClientConnInterface
// which GCPMultiEndpoint implements.
return nil
}

func (gw *gmeWrapper) Num() int {
return int(gw.GCPMultiEndpoint.GCPConfig().GetChannelPool().GetMaxSize())
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
// NumChannels is the number of gRPC channels.
Expand Down Expand Up @@ -241,6 +369,10 @@ func NewClient(ctx context.Context, database string, opts ...option.ClientOption
// NewClientWithConfig creates a client to a database. A valid database name has
// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
return newClientWithConfig(ctx, database, config, nil, opts...)
}

func newClientWithConfig(ctx context.Context, database string, config ClientConfig, gme *grpcgcp.GCPMultiEndpoint, opts ...option.ClientOption) (c *Client, err error) {
// Validate database path.
if err := validDatabaseName(database); err != nil {
return nil, err
Expand All @@ -265,16 +397,25 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
if config.NumChannels == 0 {
config.NumChannels = numChannels
}
// gRPC options.
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
pool, err := gtransport.DialPool(ctx, allOpts...)
if err != nil {
return nil, err
}

if hasNumChannelsConfig && pool.Num() != config.NumChannels {
pool.Close()
return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
var pool gtransport.ConnPool

if gme != nil {
// Use GCPMultiEndpoint if provided.
pool = &gmeWrapper{gme}
} else {
// Create gtransport ConnPool as usual if MultiEndpoint is not used.
// gRPC options.
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
pool, err = gtransport.DialPool(ctx, allOpts...)
if err != nil {
return nil, err
}

if hasNumChannelsConfig && pool.Num() != config.NumChannels {
pool.Close()
return nil, spannerErrorf(codes.InvalidArgument, "Connection pool mismatch: NumChannels=%v, WithGRPCConnectionPool=%v. Only set one of these options, or set both to the same value.", config.NumChannels, pool.Num())
}
}

// TODO(loite): Remove as the original map cannot be changed by the user
Expand Down Expand Up @@ -343,6 +484,48 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
return c, nil
}

// NewMultiEndpointClient is the same as NewMultiEndpointClientWithConfig with
// the default client configuration.
//
// A valid database name has the
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func NewMultiEndpointClient(ctx context.Context, database string, gmeCfg *grpcgcp.GCPMultiEndpointOptions, opts ...option.ClientOption) (*Client, *grpcgcp.GCPMultiEndpoint, error) {
return NewMultiEndpointClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, DisableRouteToLeader: false}, gmeCfg, opts...)
}

// NewMultiEndpointClientWithConfig creates a client to a database using GCPMultiEndpoint.
//
// The purposes of GCPMultiEndpoint are:
//
// - Fallback to an alternative endpoint (host:port) when the original
// endpoint is completely unavailable.
// - Be able to route a Cloud Spanner call to a specific group of endpoints.
// - Be able to reconfigure endpoints in runtime.
//
// The GRPCgcpConfig and DialFunc in the GCPMultiEndpointOptions are optional
// and will be configured automatically.
//
// For GCPMultiEndpoint the number of channels is configured via MaxSize of the
// ChannelPool config in the GRPCgcpConfig.
//
// The GCPMultiEndpoint returned can be used to update the endpoints in runtime.
//
// A valid database name has the
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func NewMultiEndpointClientWithConfig(ctx context.Context, database string, config ClientConfig, gmeCfg *grpcgcp.GCPMultiEndpointOptions, opts ...option.ClientOption) (c *Client, gme *grpcgcp.GCPMultiEndpoint, err error) {
gme, err = createGCPMultiEndpoint(gmeCfg, config, opts...)
if err != nil {
return nil, nil, err
}
// Align number of channels.
config.NumChannels = int(gme.GCPConfig().GetChannelPool().GetMaxSize())
c, err = newClientWithConfig(ctx, database, config, gme, opts...)
if err != nil {
return nil, nil, err
}
return
}

// Combines the default options from the generated client, the default options
// of the hand-written client and the user options to one list of options.
// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
Expand Down
Loading

0 comments on commit 0ac0d26

Please sign in to comment.