Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): enable client to server compression #7899

Merged
merged 3 commits into from May 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 19 additions & 2 deletions spanner/client.go
Expand Up @@ -31,6 +31,7 @@ import (
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"

vkit "cloud.google.com/go/spanner/apiv1"
Expand Down Expand Up @@ -161,6 +162,18 @@ type ClientConfig struct {
// Logger is the logger to use for this client. If it is nil, all logging
// will be directed to the standard logger.
Logger *log.Logger

//
// Sets the compression to use for all gRPC calls. The compressor must be a valid name.
// This will enable compression both from the client to the
// server and from the server to the client.
//
// Supported values are:
// gzip: Enable gzip compression
// identity: Disable compression
//
// Default: identity
Compression string
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -209,7 +222,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
config.NumChannels = numChannels
}
// gRPC options.
allOpts := allClientOpts(config.NumChannels, opts...)
allOpts := allClientOpts(config.NumChannels, config.Compression, opts...)
pool, err := gtransport.DialPool(ctx, allOpts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -263,13 +276,17 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
// Combines the default options from the generated client, the default options
// of the hand-written client and the user options to one list of options.
// Precedence: userOpts > clientDefaultOpts > generatedDefaultOpts
func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.ClientOption {
func allClientOpts(numChannels int, compression string, userOpts ...option.ClientOption) []option.ClientOption {
generatedDefaultOpts := vkit.DefaultClientOptions()
clientDefaultOpts := []option.ClientOption{
option.WithGRPCConnectionPool(numChannels),
option.WithUserAgent(fmt.Sprintf("spanner-go/v%s", internal.Version)),
internaloption.EnableDirectPath(true),
}
if compression == "gzip" {
clientDefaultOpts = append(clientDefaultOpts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to add a test with the mock Spanner server that verifies that we actually receive this header?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the unit test for checking the request headers, also adding screenshot to verify all the grpc requests will have the call option for setting compression
Screenshot 2023-05-19 at 11 55 13 AM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the extra info and adding the test. LGTM!

grpc.UseCompressor(gzip.Name))))
}
allDefaultOpts := append(generatedDefaultOpts, clientDefaultOpts...)
return append(allDefaultOpts, userOpts...)
}
Expand Down
20 changes: 13 additions & 7 deletions spanner/integration_test.go
Expand Up @@ -1462,9 +1462,15 @@ func TestIntegration_Reads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// Set up testing environment.
client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
_, dbPath, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, statements[testDialect][testTableDDLStatements])
defer cleanup()

client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig, Compression: "gzip"})
if err != nil {
t.Fatal(err)
}
defer client.Close()

// Includes k0..k14. Strings sort lexically, eg "k1" < "k10" < "k2".
var ms []*Mutation
for i := 0; i < 15; i++ {
Expand Down Expand Up @@ -2271,7 +2277,7 @@ func TestIntegration_InvalidDatabase(t *testing.T) {
}
ctx := context.Background()
dbPath := fmt.Sprintf("projects/%v/instances/%v/databases/invalid", testProjectID, testInstanceID)
c, err := createClient(ctx, dbPath, SessionPoolConfig{})
c, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}})
// Client creation should succeed even if the database is invalid.
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -3060,7 +3066,7 @@ func TestIntegration_BatchQuery(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -3144,7 +3150,7 @@ func TestIntegration_BatchRead(t *testing.T) {
if err = populate(ctx, client); err != nil {
t.Fatal(err)
}
if client2, err = createClient(ctx, dbPath, SessionPoolConfig{}); err != nil {
if client2, err = createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: SessionPoolConfig{}}); err != nil {
t.Fatal(err)
}
defer client2.Close()
Expand Down Expand Up @@ -4376,7 +4382,7 @@ func prepareDBAndClient(ctx context.Context, t *testing.T, spc SessionPoolConfig
t.Fatalf("timeout creating testing table %v: %v", dbPath, err)
}
}
client, err := createClient(ctx, dbPath, spc)
client, err := createClient(ctx, dbPath, ClientConfig{SessionPoolConfig: spc})
if err != nil {
t.Fatalf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down Expand Up @@ -4546,15 +4552,15 @@ func isNaN(x interface{}) bool {
}

// createClient creates Cloud Spanner data client.
func createClient(ctx context.Context, dbPath string, spc SessionPoolConfig) (client *Client, err error) {
func createClient(ctx context.Context, dbPath string, config ClientConfig) (client *Client, err error) {
opts := grpcHeaderChecker.CallOptions()
if spannerHost != "" {
opts = append(opts, option.WithEndpoint(spannerHost))
}
if dpConfig.attemptDirectPath {
opts = append(opts, option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.Peer(peerInfo))))
}
client, err = NewClientWithConfig(ctx, dbPath, ClientConfig{SessionPoolConfig: spc}, opts...)
client, err = NewClientWithConfig(ctx, dbPath, config, opts...)
if err != nil {
return nil, fmt.Errorf("cannot create data client on DB %v: %v", dbPath, err)
}
Expand Down