Skip to content

Commit

Permalink
xds: make xds client a singleton (#4015)
Browse files Browse the repository at this point in the history
- xdsclient.New() no longer takes any input, all configs are from bootstrap file
  - added a NewForTesting()
- The returned *Client is a wrapper of the underlying client implementation, for ref-couting
- xds-resolver and xds-server no longer calls bootstrap.NewConfig. It only calls xdsclient.New()
  • Loading branch information
menghanl committed Nov 12, 2020
1 parent 28c130f commit 6caf9d8
Show file tree
Hide file tree
Showing 21 changed files with 381 additions and 324 deletions.
10 changes: 5 additions & 5 deletions xds/internal/client/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ var gRPCVersion = fmt.Sprintf("%s %s", gRPCUserAgentName, grpc.Version)
var bootstrapFileReadFunc = ioutil.ReadFile

// Config provides the xDS client with several key bits of information that it
// requires in its interaction with an xDS server. The Config is initialized
// from the bootstrap file.
// requires in its interaction with the management server. The Config is
// initialized from the bootstrap file.
type Config struct {
// BalancerName is the name of the xDS server to connect to.
// BalancerName is the name of the management server to connect to.
//
// The bootstrap file contains a list of servers (with name+creds), but we
// pick the first one.
Expand Down Expand Up @@ -96,7 +96,7 @@ type xdsServer struct {
// The format of the bootstrap file will be as follows:
// {
// "xds_server": {
// "server_uri": <string containing URI of xds server>,
// "server_uri": <string containing URI of management server>,
// "channel_creds": [
// {
// "type": <string containing channel cred type>,
Expand Down Expand Up @@ -168,7 +168,7 @@ func NewConfig() (*Config, error) {
return nil, fmt.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
}
if len(servers) < 1 {
return nil, fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to")
return nil, fmt.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any management server to connect to")
}
xs := servers[0]
config.BalancerName = xs.ServerURI
Expand Down
96 changes: 32 additions & 64 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,28 +277,6 @@ type EndpointsUpdate struct {
Localities []Locality
}

// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
Config bootstrap.Config
// DialOpts contains dial options to be used when dialing the xDS server.
DialOpts []grpc.DialOption
// TargetName is the target of the parent ClientConn.
TargetName string
// WatchExpiryTimeout is the amount of time the client is willing to wait
// for the first response from the server for any resource being watched.
// Expiry will not cause cancellation of the watch. It will only trigger the
// invocation of the registered callback and it is left up to the caller to
// decide whether or not they want to cancel the watch.
//
// If this field is left unspecified, a default value of 15 seconds will be
// used. This is based on the default value of the initial_fetch_timeout
// field in corepb.ConfigSource proto.
WatchExpiryTimeout time.Duration
}

// Function to be overridden in tests.
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
cb := getAPIClientBuilder(apiVersion)
Expand All @@ -308,23 +286,19 @@ var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, op
return cb.Build(cc, opts)
}

// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources.
//
// A single client object will be shared by the xds resolver and balancer
// implementations. But the same client can only be shared by the same parent
// ClientConn.
// clientImpl is the real implementation of the xds client. The exported Client
// is a wrapper of this struct with a ref count.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type Client struct {
done *grpcsync.Event
opts Options
cc *grpc.ClientConn // Connection to the xDS server
apiClient APIClient
type clientImpl struct {
done *grpcsync.Event
config *bootstrap.Config
cc *grpc.ClientConn // Connection to the management server.
apiClient APIClient
watchExpiryTimeout time.Duration

logger *grpclog.PrefixLogger

Expand All @@ -345,46 +319,40 @@ type Client struct {
lrsClients map[string]*lrsClient
}

// New returns a new xdsClient configured with opts.
func New(opts Options) (*Client, error) {
// newWithConfig returns a new xdsClient with the given config.
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*clientImpl, error) {
switch {
case opts.Config.BalancerName == "":
case config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
case opts.Config.Creds == nil:
case config.Creds == nil:
return nil, errors.New("xds: no credentials provided in options")
case opts.Config.NodeProto == nil:
case config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}

switch opts.Config.TransportAPI {
switch config.TransportAPI {
case version.TransportV2:
if _, ok := opts.Config.NodeProto.(*v2corepb.Node); !ok {
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
if _, ok := config.NodeProto.(*v2corepb.Node); !ok {
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
}
case version.TransportV3:
if _, ok := opts.Config.NodeProto.(*v3corepb.Node); !ok {
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", opts.Config.NodeProto, opts.Config.TransportAPI)
if _, ok := config.NodeProto.(*v3corepb.Node); !ok {
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
}
}

dopts := []grpc.DialOption{
opts.Config.Creds,
config.Creds,
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 5 * time.Minute,
Timeout: 20 * time.Second,
}),
}
dopts = append(dopts, opts.DialOpts...)

if opts.WatchExpiryTimeout == 0 {
// This is based on the default value of the initial_fetch_timeout field
// in corepb.ConfigSource proto.
opts.WatchExpiryTimeout = 15 * time.Second
}

c := &Client{
done: grpcsync.NewEvent(),
opts: opts,
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,

updateCh: buffer.NewUnbounded(),
ldsWatchers: make(map[string]map[*watchInfo]bool),
Expand All @@ -398,18 +366,18 @@ func New(opts Options) (*Client, error) {
lrsClients: make(map[string]*lrsClient),
}

cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
cc, err := grpc.Dial(config.BalancerName, dopts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", config.BalancerName, err)
}
c.cc = cc
c.logger = prefixLogger((c))
c.logger.Infof("Created ClientConn to xDS server: %s", opts.Config.BalancerName)
c.logger.Infof("Created ClientConn to xDS management server: %s", config.BalancerName)

apiClient, err := newAPIClient(opts.Config.TransportAPI, cc, BuildOptions{
apiClient, err := newAPIClient(config.TransportAPI, cc, BuildOptions{
Parent: c,
NodeProto: opts.Config.NodeProto,
NodeProto: config.NodeProto,
Backoff: backoff.DefaultExponential.Backoff,
Logger: c.logger,
})
Expand All @@ -426,7 +394,7 @@ func New(opts Options) (*Client, error) {
// "certificate_providers" field of the bootstrap file. The key in the returned
// map is the plugin_instance_name. Callers must not modify the returned map.
func (c *Client) CertProviderConfigs() map[string]*certprovider.BuildableConfig {
return c.opts.Config.CertProviderConfigs
return c.config.CertProviderConfigs
}

// run is a goroutine for all the callbacks.
Expand All @@ -435,7 +403,7 @@ func (c *Client) CertProviderConfigs() map[string]*certprovider.BuildableConfig
// goroutine, the callback will be called inline, which might cause a deadlock
// in user's code. Callbacks also cannot be simple `go callback()` because the
// order matters.
func (c *Client) run() {
func (c *clientImpl) run() {
for {
select {
case t := <-c.updateCh.Get():
Expand All @@ -450,8 +418,8 @@ func (c *Client) run() {
}
}

// Close closes the gRPC connection to the xDS server.
func (c *Client) Close() {
// Close closes the gRPC connection to the management server.
func (c *clientImpl) Close() {
if c.done.HasFired() {
return
}
Expand Down
12 changes: 6 additions & 6 deletions xds/internal/client/client_callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ type watcherInfoWithUpdate struct {

// scheduleCallback should only be called by methods of watchInfo, which checks
// for watcher states and maintain consistency.
func (c *Client) scheduleCallback(wi *watchInfo, update interface{}, err error) {
func (c *clientImpl) scheduleCallback(wi *watchInfo, update interface{}, err error) {
c.updateCh.Put(&watcherInfoWithUpdate{
wi: wi,
update: update,
err: err,
})
}

func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
func (c *clientImpl) callCallback(wiu *watcherInfoWithUpdate) {
c.mu.Lock()
// Use a closure to capture the callback and type assertion, to save one
// more switch case.
Expand Down Expand Up @@ -74,7 +74,7 @@ func (c *Client) callCallback(wiu *watcherInfoWithUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) NewListeners(updates map[string]ListenerUpdate) {
func (c *clientImpl) NewListeners(updates map[string]ListenerUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -109,7 +109,7 @@ func (c *Client) NewListeners(updates map[string]ListenerUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
func (c *clientImpl) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -130,7 +130,7 @@ func (c *Client) NewRouteConfigs(updates map[string]RouteConfigUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) NewClusters(updates map[string]ClusterUpdate) {
func (c *clientImpl) NewClusters(updates map[string]ClusterUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -165,7 +165,7 @@ func (c *Client) NewClusters(updates map[string]ClusterUpdate) {
//
// A response can contain multiple resources. They will be parsed and put in a
// map from resource name to the resource content.
func (c *Client) NewEndpoints(updates map[string]EndpointsUpdate) {
func (c *clientImpl) NewEndpoints(updates map[string]EndpointsUpdate) {
c.mu.Lock()
defer c.mu.Unlock()

Expand Down
22 changes: 11 additions & 11 deletions xds/internal/client/client_loadreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ import (
)

// ReportLoad starts an load reporting stream to the given server. If the server
// is not an empty string, and is different from the xds server, a new
// is not an empty string, and is different from the management server, a new
// ClientConn will be created.
//
// The same options used for creating the Client will be used (including
// NodeProto, and dial options if necessary).
//
// It returns a Store for the user to report loads, a function to cancel the
// load reporting stream.
func (c *Client) ReportLoad(server string) (*load.Store, func()) {
func (c *clientImpl) ReportLoad(server string) (*load.Store, func()) {
c.lrsMu.Lock()
defer c.lrsMu.Unlock()

Expand All @@ -58,20 +58,21 @@ func (c *Client) ReportLoad(server string) (*load.Store, func()) {
}

// lrsClient maps to one lrsServer. It contains:
// - a ClientConn to this server (only if it's different from the xds server)
// - a ClientConn to this server (only if it's different from the management
// server)
// - a load.Store that contains loads only for this server
type lrsClient struct {
parent *Client
parent *clientImpl
server string

cc *grpc.ClientConn // nil if the server is same as the xds server
cc *grpc.ClientConn // nil if the server is same as the management server
refCount int
cancelStream func()
loadStore *load.Store
}

// newLRSClient creates a new LRS stream to the server.
func newLRSClient(parent *Client, server string) *lrsClient {
func newLRSClient(parent *clientImpl, server string) *lrsClient {
return &lrsClient{
parent: parent,
server: server,
Expand Down Expand Up @@ -109,18 +110,17 @@ func (lrsC *lrsClient) unRef() (closed bool) {
}

// startStream starts the LRS stream to the server. If server is not the same
// xDS server from the parent, it also creates a ClientConn.
// management server from the parent, it also creates a ClientConn.
func (lrsC *lrsClient) startStream() {
var cc *grpc.ClientConn

lrsC.parent.logger.Infof("Starting load report to server: %s", lrsC.server)
if lrsC.server == "" || lrsC.server == lrsC.parent.opts.Config.BalancerName {
if lrsC.server == "" || lrsC.server == lrsC.parent.config.BalancerName {
// Reuse the xDS client if server is the same.
cc = lrsC.parent.cc
} else {
lrsC.parent.logger.Infof("LRS server is different from xDS server, starting a new ClientConn")
dopts := append([]grpc.DialOption{lrsC.parent.opts.Config.Creds}, lrsC.parent.opts.DialOpts...)
ccNew, err := grpc.Dial(lrsC.server, dopts...)
lrsC.parent.logger.Infof("LRS server is different from management server, starting a new ClientConn")
ccNew, err := grpc.Dial(lrsC.server, lrsC.parent.config.Creds)
if err != nil {
// An error from a non-blocking dial indicates something serious.
lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Expand Down
16 changes: 8 additions & 8 deletions xds/internal/client/client_loadreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.

defaultClientWatchExpiryTimeout = 15 * time.Second
)

type s struct {
Expand All @@ -61,14 +63,12 @@ func (s) TestLRSClient(t *testing.T) {
}
defer sCleanup()

xdsC, err := client.New(client.Options{
Config: bootstrap.Config{
BalancerName: fs.Address,
Creds: grpc.WithInsecure(),
NodeProto: &v2corepb.Node{},
TransportAPI: version.TransportV2,
},
})
xdsC, err := client.NewWithConfigForTesting(&bootstrap.Config{
BalancerName: fs.Address,
Creds: grpc.WithInsecure(),
NodeProto: &v2corepb.Node{},
TransportAPI: version.TransportV2,
}, defaultClientWatchExpiryTimeout)
if err != nil {
t.Fatalf("failed to create xds client: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/client/client_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ const prefix = "[xds-client %p] "

var logger = grpclog.Component("xds")

func prefixLogger(p *Client) *internalgrpclog.PrefixLogger {
func prefixLogger(p *clientImpl) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

0 comments on commit 6caf9d8

Please sign in to comment.