Skip to content

Commit

Permalink
csi: refactor gRPC client to connect on first use
Browse files Browse the repository at this point in the history
The gRPC client for CSI connected to the socket during initialization
so that we can ensure the gRPC service interfaces have been
initialized before use. But this design makes it challenging to
correctly retry and timeout the initial connection in the plugin
supervisor and then reuse that same client for fingerprinting.

Lazily connect to the socket on the first RPC call, allowing the
caller's context to be used to control retries and timeouts.

This also gives the CSI plugin manager instance the ability to retry
its initial connection setup.
  • Loading branch information
tgross committed Feb 15, 2022
1 parent 787acdf commit e898121
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 115 deletions.
10 changes: 2 additions & 8 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), err

// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
if err != nil {
return nil, fmt.Errorf("failed to create csi client: %v", err)
}
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()

info, err := client.PluginInfo()
Expand Down Expand Up @@ -360,10 +357,7 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socket
return false, fmt.Errorf("failed to stat socket: %v", err)
}

client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
if err != nil {
return false, fmt.Errorf("failed to create csi client: %v", err)
}
client := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()

healthy, err := client.PluginProbe(ctx)
Expand Down
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,11 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.dynamicRegistry =
dynamicplugins.NewRegistry(c.stateDB, map[string]dynamicplugins.PluginDispenser{
dynamicplugins.PluginTypeCSIController: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller"))
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "controller")), nil
},
dynamicplugins.PluginTypeCSINode: func(info *dynamicplugins.PluginInfo) (interface{}, error) {
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client"))
}, // TODO(tgross): refactor these dispenser constructors into csimanager to tidy it up
return csi.NewClient(info.ConnectionInfo.SocketPath, logger.Named("csi_client").With("plugin.name", info.Name, "plugin.type", "client")), nil
},
})

// Setup the clients RPC server
Expand Down
7 changes: 1 addition & 6 deletions client/pluginmanager/csimanager/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ func newInstanceManager(logger hclog.Logger, eventer TriggerNodeEvent, updater U
}

func (i *instanceManager) run() {
c, err := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
if err != nil {
i.logger.Error("failed to setup instance manager client", "error", err)
close(i.shutdownCh)
return
}
c := csi.NewClient(i.info.ConnectionInfo.SocketPath, i.logger)
i.client = c
i.fp.client = c

Expand Down
Loading

0 comments on commit e898121

Please sign in to comment.