Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): enable client to server compression #7899

Merged
merged 3 commits into from May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 26 additions & 3 deletions spanner/client.go
Expand Up @@ -31,6 +31,7 @@ import (
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"

vkit "cloud.google.com/go/spanner/apiv1"
Expand All @@ -51,6 +52,8 @@ const (
// requests need to route to leader.
routeToLeaderHeader = "x-goog-spanner-route-to-leader"

requestsCompressionHeader = "x-response-encoding"

// numChannels is the default value for NumChannels of client.
numChannels = 4
)
Expand Down Expand Up @@ -161,6 +164,18 @@ type ClientConfig struct {
// Logger is the logger to use for this client. If it is nil, all logging
// will be directed to the standard logger.
Logger *log.Logger

//
// Sets the compression to use for all gRPC calls. The compressor must be a valid name.
// This will enable compression both from the client to the
// server and from the server to the client.
//
// Supported values are:
// gzip: Enable gzip compression
// identity: Disable compression
//
// Default: identity
Compression string
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -209,7 +224,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
config.NumChannels = numChannels
}
// gRPC options.
allOpts := allClientOpts(config.NumChannels, opts...)
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
pool, err := gtransport.DialPool(ctx, allOpts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -237,8 +252,12 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
if config.incStep == 0 {
config.incStep = DefaultSessionPoolConfig.incStep
}
md := metadata.Pairs(resourcePrefixHeader, database)
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, metadata.Pairs(resourcePrefixHeader, database), config.Logger, config.CallOptions)
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.Logger, config.CallOptions)
// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand All @@ -263,13 +282,17 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
// Combines the default options from the generated client, the default options
// of the hand-written client and the user options to one list of options.
// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.ClientOption {
func allClientOpts(numChannels int, compression string, userOpts ...option.ClientOption) []option.ClientOption {
generatedDefaultOpts := vkit.DefaultClientOptions()
clientDefaultOpts := []option.ClientOption{
option.WithGRPCConnectionPool(numChannels),
option.WithUserAgent(fmt.Sprintf("spanner-go/v%s", internal.Version)),
internaloption.EnableDirectPath(true),
}
if compression == "gzip" {
userOpts = append(userOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
grpc.UseCompressor(gzip.Name))))
}
allDefaultOpts := append(generatedDefaultOpts, clientDefaultOpts...)
return append(allDefaultOpts, userOpts...)
}
Expand Down
15 changes: 15 additions & 0 deletions spanner/client_test.go
Expand Up @@ -35,6 +35,7 @@ import (
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/spanner/apiv1"
Expand Down Expand Up @@ -70,6 +71,20 @@ func setupMockedTestServerWithConfigAndClientOptions(t *testing.T, config Client
},
},
}
if config.Compression == gzip.Name {
grpcHeaderChecker.Checkers = append(grpcHeaderChecker.Checkers, &itestutil.HeaderChecker{
Key: "x-response-encoding",
ValuesValidator: func(token ...string) error {
if len(token) != 1 {
return status.Errorf(codes.Internal, "unexpected number of compression headers: %v", len(token))
}
if token[0] != gzip.Name {
return status.Errorf(codes.Internal, "unexpected compression: %v", token[0])
}
return nil
},
})
}
clientOptions = append(clientOptions, grpcHeaderChecker.CallOptions()...)
server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
opts = append(opts, clientOptions...)
Expand Down
20 changes: 13 additions & 7 deletions spanner/integration_test.go
Expand Up @@ -1462,9 +1462,15 @@ func TestIntegration_Reads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
_, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
defer cleanup()

client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, Compression: "gzip"})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
var ms []*Mutation
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -2271,7 +2277,7 @@ func TestIntegration_InvalidDatabase(t *testing.T) {
}
ctx := context.Background()
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
c, err := createClient(ctx, dbPath, SessionPoolConfig{})
c, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}})
// Client creation should succeed even if the database is invalid.
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3060,7 +3066,7 @@ func TestIntegration_BatchQuery(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -3144,7 +3150,7 @@ func TestIntegration_BatchRead(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -4376,7 +4382,7 @@ func prepareDBAndClient(ctx context.Context, t *testing.T, spc SessionPoolConfig
t.Fatalf("timeout creating testing table %v: %v", dbPath, err)
}
}
client, err := createClient(ctx, dbPath, spc)
client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
if err != nil {
t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down Expand Up @@ -4546,15 +4552,15 @@ func isNaN(x interface{}) bool {
}

// createClient creates Cloud Spanner data client.
func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
func createClient(ctx context.Context, dbPath string, config ClientConfig) (client *Client, err error) {
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
client, err = NewClientWithConfig(ctx, dbPath, config, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down
3 changes: 2 additions & 1 deletion spanner/pdml_test.go
Expand Up @@ -26,6 +26,7 @@ import (
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
. "cloud.google.com/go/spanner/internal/testutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -150,7 +151,7 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) {
}

ctx := context.Background()
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client})
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: tt.client, Compression: gzip.Name})
defer teardown()

var err error
Expand Down