Skip to content

Commit

Permalink
spanner: use BatchCreateSessions to init pool
Browse files Browse the repository at this point in the history
Adds a session client that is responsible for creating
sessions for the session pool and others that need a
session. The session client encapsulates the details of
spreading all sessions over all available gRPC channels.

The session client also contains functionality for creating
batches of sessions that can be used by the session pool to
speed up initialization of a large session pool or to create
a burst of sessions. These batches are also automatically
evenly distributed over all available channels.

The default MinOpened sessions configuration is changed to
100. The session pool uses the BatchCreateSessions method
to initialize the pool if MinOpened > 0.

Updates #1566.

Change-Id: I13e6fbc321688cdbd396913e4f4c01aa8631fb2c
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/45111
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
  • Loading branch information
olavloite committed Oct 18, 2019
1 parent b9197a0 commit b246326
Show file tree
Hide file tree
Showing 9 changed files with 907 additions and 154 deletions.
69 changes: 24 additions & 45 deletions spanner/client.go
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"os"
"regexp"
"sync/atomic"
"time"

"cloud.google.com/go/internal/trace"
Expand Down Expand Up @@ -64,16 +63,8 @@ func validDatabaseName(db string) error {
// Client is a client for reading and writing data to a Cloud Spanner database.
// A client is safe to use concurrently, except for its Close method.
type Client struct {
// rr must be accessed through atomic operations.
rr uint32
clients []*vkit.Client

database string
// Metadata to be sent with each request.
md metadata.MD
sc *sessionClient
idleSessions *sessionPool
// sessionLabels for the sessions created by this client.
sessionLabels map[string]string
}

// ClientConfig has configurations for the client.
Expand Down Expand Up @@ -110,23 +101,12 @@ func contextWithOutgoingMetadata(ctx context.Context, md metadata.MD) context.Co
// form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID. It uses
// a default configuration.
func NewClient(ctx context.Context, database string, opts ...option.ClientOption) (*Client, error) {
return NewClientWithConfig(ctx, database, ClientConfig{}, opts...)
return NewClientWithConfig(ctx, database, ClientConfig{SessionPoolConfig: DefaultSessionPoolConfig}, opts...)
}

// NewClientWithConfig creates a client to a database. A valid database name has
// the form projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID.
func NewClientWithConfig(ctx context.Context, database string, config ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
c = &Client{
database: database,
md: metadata.Pairs(resourcePrefixHeader, database),
}

// Make a copy of labels.
c.sessionLabels = make(map[string]string)
for k, v := range config.SessionLabels {
c.sessionLabels[k] = v
}

// Prepare gRPC channels.
if config.NumChannels == 0 {
config.NumChannels = numChannels
Expand All @@ -137,7 +117,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
config.MaxOpened = uint64(config.NumChannels * 100)
}
if config.MaxBurst == 0 {
config.MaxBurst = 10
config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
}

// Validate database path.
Expand Down Expand Up @@ -174,43 +154,44 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
// TODO(deklerk): This should be replaced with a balancer with
// config.NumChannels connections, instead of config.NumChannels
// clients.
var clients []*vkit.Client
for i := 0; i < config.NumChannels; i++ {
client, err := vkit.NewClient(ctx, allOpts...)
if err != nil {
return nil, errDial(i, err)
}
c.clients = append(c.clients, client)
clients = append(clients, client)
}

// Prepare session pool.
// TODO: support more loadbalancing options.
config.SessionPoolConfig.getRPCClient = func() (*vkit.Client, error) {
return c.rrNext(), nil
// TODO(loite): Remove as the original map cannot be changed by the user
// anyways, and the client library is also not changing it.
// Make a copy of labels.
sessionLabels := make(map[string]string)
for k, v := range config.SessionLabels {
sessionLabels[k] = v
}
config.SessionPoolConfig.sessionLabels = c.sessionLabels
sp, err := newSessionPool(database, config.SessionPoolConfig, c.md)
// Create a session client.
sc := newSessionClient(clients, database, sessionLabels, metadata.Pairs(resourcePrefixHeader, database))
// Create a session pool.
config.SessionPoolConfig.sessionLabels = sessionLabels
sp, err := newSessionPool(sc, config.SessionPoolConfig)
if err != nil {
c.Close()
sc.close()
return nil, err
}
c.idleSessions = sp
c = &Client{
sc: sc,
idleSessions: sp,
}
return c, nil
}

// rrNext returns the next available vkit Cloud Spanner RPC client in a
// round-robin manner.
func (c *Client) rrNext() *vkit.Client {
return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
}

// Close closes the client.
func (c *Client) Close() {
if c.idleSessions != nil {
c.idleSessions.close()
}
for _, gpc := range c.clients {
gpc.Close()
}
c.sc.close()
}

// Single provides a read-only snapshot transaction optimized for the case
Expand Down Expand Up @@ -273,8 +254,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
}()

// Create session.
sc := c.rrNext()
s, err = createSession(ctx, sc, c.database, c.sessionLabels, c.md)
s, err = c.sc.createSession(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -318,8 +298,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
// BatchReadOnlyTransactionFromID reconstruct a BatchReadOnlyTransaction from
// BatchReadOnlyTransactionID
func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) *BatchReadOnlyTransaction {
sc := c.rrNext()
s := &session{valid: true, client: sc, id: tid.sid, createTime: time.Now(), md: c.md}
s := c.sc.sessionWithID(tid.sid)
sh := &sessionHandle{session: s}

t := &BatchReadOnlyTransaction{
Expand Down

0 comments on commit b246326

Please sign in to comment.