Skip to content

Commit

Permalink
feat: allow configuring batch timeout property in session client (#7965)
Browse files Browse the repository at this point in the history
Creating NewClientWithConfig supports BatchTimeout property.
  • Loading branch information
n0npax committed May 29, 2023
1 parent 65a9ba5 commit 3678dde
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
10 changes: 9 additions & 1 deletion spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ type ClientConfig struct {
//
// Default: identity
Compression string

// BatchTimeout specifies the timeout for a batch of sessions managed sessionClient.
BatchTimeout time.Duration
}

func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD, disableRouteToLeader bool) context.Context {
Expand Down Expand Up @@ -252,12 +255,17 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
if config.incStep == 0 {
config.incStep = DefaultSessionPoolConfig.incStep
}
if config.BatchTimeout == 0 {
config.BatchTimeout = time.Minute
}

md := metadata.Pairs(resourcePrefixHeader, database)
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.Logger, config.CallOptions)
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)

// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
Expand Down
32 changes: 32 additions & 0 deletions spanner/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2678,6 +2678,38 @@ func TestClient_WithGRPCConnectionPoolAndNumChannels_Misconfigured(t *testing.T)
}
}

func TestClient_WithCustomBatchTimeout(t *testing.T) {
t.Parallel()

_, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
defer serverTeardown()

wantBatchTimeout := time.Second * 42
client, err := NewClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", ClientConfig{BatchTimeout: wantBatchTimeout}, opts...)
if err != nil {
t.Fatalf("failed to get a client: %v", err)
}
if wantBatchTimeout != client.sc.batchTimeout {
t.Fatalf("mismatch in client configuration for property BatchTimeout: got %v, want %v", client.sc.batchTimeout, wantBatchTimeout)
}
}

func TestClient_WithoutCustomBatchTimeout(t *testing.T) {
t.Parallel()

_, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
defer serverTeardown()

wantBatchTimeout := time.Minute
client, err := NewClientWithConfig(context.Background(), "projects/p/instances/i/databases/d", ClientConfig{}, opts...)
if err != nil {
t.Fatalf("failed to get a client: %v", err)
}
if wantBatchTimeout != client.sc.batchTimeout {
t.Fatalf("mismatch in client configuration for property BatchTimeout: got %v, want %v", client.sc.batchTimeout, wantBatchTimeout)
}
}

func TestClient_CallOptions(t *testing.T) {
t.Parallel()
co := &vkit.CallOptions{
Expand Down
4 changes: 2 additions & 2 deletions spanner/sessionclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type sessionClient struct {
}

// newSessionClient creates a session client to use for a database.
func newSessionClient(connPool gtransport.ConnPool, database, userAgent string, sessionLabels map[string]string, databaseRole string, disableRouteToLeader bool, md metadata.MD, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
func newSessionClient(connPool gtransport.ConnPool, database, userAgent string, sessionLabels map[string]string, databaseRole string, disableRouteToLeader bool, md metadata.MD, batchTimeout time.Duration, logger *log.Logger, callOptions *vkit.CallOptions) *sessionClient {
return &sessionClient{
connPool: connPool,
database: database,
Expand All @@ -112,7 +112,7 @@ func newSessionClient(connPool gtransport.ConnPool, database, userAgent string,
databaseRole: databaseRole,
disableRouteToLeader: disableRouteToLeader,
md: md,
batchTimeout: time.Minute,
batchTimeout: batchTimeout,
logger: logger,
callOptions: callOptions,
}
Expand Down

0 comments on commit 3678dde

Please sign in to comment.