
Commit

feature(config): add go-sockaddr templating support to nomad consul address
Nkmol committed Feb 24, 2022
1 parent 1ad08c2 commit 856fdcc
Showing 58 changed files with 1,137 additions and 678 deletions.
3 changes: 3 additions & 0 deletions .changelog/11600.txt
@@ -0,0 +1,3 @@
```release-note:improvement
core: The unused protocol_version agent configuration value has been removed.
```
11 changes: 11 additions & 0 deletions .changelog/12078.txt
@@ -0,0 +1,11 @@
```release-note:improvement
csi: Allow for concurrent plugin allocations
```

```release-note:breaking-change
client: The client state store will be automatically migrated to a new schema version when upgrading a client. Downgrading to a previous version of the client after upgrading it to Nomad 1.3 is not supported. To downgrade safely, users should erase the Nomad client's data directory.
```

```release-note:breaking-change
csi: The client filesystem layout for CSI plugins has been updated to correctly handle the lifecycle of multiple allocations serving the same plugin. Running plugin tasks will not be updated after upgrading the client, but it is recommended to redeploy CSI plugin jobs after upgrading the cluster.
```
3 changes: 3 additions & 0 deletions .changelog/12084.txt
@@ -0,0 +1,3 @@
```release-note:improvement
consul: add go-sockaddr templating support to nomad consul address
```
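For context, go-sockaddr templates are evaluated when the agent parses its configuration, so a value like `address = "{{ GetPrivateIP }}:8500"` in the agent's `consul` block can now resolve to a concrete interface address. A minimal sketch of the resolution step, assuming only the public `template.Parse` helper from go-sockaddr; the template string is illustrative, not taken from this commit:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/go-sockaddr/template"
)

func main() {
	// Expand a sockaddr template the way an agent would when loading its
	// config; resolves to something like "10.0.2.15:8500" depending on the
	// host's interfaces.
	addr, err := template.Parse(`{{ GetPrivateIP }}:8500`)
	if err != nil {
		log.Fatalf("parsing consul address template: %v", err)
	}
	fmt.Println(addr)
}
```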
63 changes: 57 additions & 6 deletions api/nodes.go
@@ -621,19 +621,70 @@ type CSITopology struct {
// CSINodeInfo is the fingerprinted data from a CSI Plugin that is specific to
// the Node API.
type CSINodeInfo struct {
ID string
MaxVolumes int64
AccessibleTopology *CSITopology

// RequiresNodeStageVolume indicates whether the client should Stage/Unstage
// volumes on this node.
RequiresNodeStageVolume bool

// SupportsStats indicates plugin support for GET_VOLUME_STATS
SupportsStats bool

// SupportsExpand indicates plugin support for EXPAND_VOLUME
SupportsExpand bool

// SupportsCondition indicates plugin support for VOLUME_CONDITION
SupportsCondition bool
}

// CSIControllerInfo is the fingerprinted data from a CSI Plugin that is specific to
// the Controller API.
type CSIControllerInfo struct {
// SupportsCreateDelete indicates plugin support for CREATE_DELETE_VOLUME
SupportsCreateDelete bool

// SupportsPublishVolume is true when the controller implements the
// methods required to attach and detach volumes. If this is false Nomad
// should skip the controller attachment flow.
SupportsAttachDetach bool

// SupportsListVolumes is true when the controller implements the
// ListVolumes RPC. NOTE: This does not guarantee that attached nodes will
// be returned unless SupportsListVolumesAttachedNodes is also true.
SupportsListVolumes bool

// SupportsGetCapacity indicates plugin support for GET_CAPACITY
SupportsGetCapacity bool

// SupportsCreateDeleteSnapshot indicates plugin support for
// CREATE_DELETE_SNAPSHOT
SupportsCreateDeleteSnapshot bool

// SupportsListSnapshots indicates plugin support for LIST_SNAPSHOTS
SupportsListSnapshots bool

// SupportsClone indicates plugin support for CLONE_VOLUME
SupportsClone bool

// SupportsReadOnlyAttach is set to true when the controller returns the
// ATTACH_READONLY capability.
SupportsReadOnlyAttach bool

// SupportsExpand indicates plugin support for EXPAND_VOLUME
SupportsExpand bool

// SupportsListVolumesAttachedNodes indicates whether the plugin will
// return attached nodes data when making ListVolume RPCs (plugin support
// for LIST_VOLUMES_PUBLISHED_NODES)
SupportsListVolumesAttachedNodes bool

// SupportsCondition indicates plugin support for VOLUME_CONDITION
SupportsCondition bool

// SupportsGet indicates plugin support for GET_VOLUME
SupportsGet bool
}

// CSIInfo is the current state of a single CSI Plugin. This is updated regularly
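Taken together, these fingerprinted flags let callers gate RPC flows on declared plugin capabilities instead of probing at call time. A hypothetical sketch of such a gate against the post-commit `api` package; the `supportsSnapshots` helper and the sample values are invented for illustration, only the `CSIControllerInfo` field names come from the diff above:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/api"
)

// supportsSnapshots reports whether a fingerprinted controller plugin can
// service on-demand snapshot requests.
func supportsSnapshots(info *api.CSIControllerInfo) bool {
	return info != nil && info.SupportsCreateDeleteSnapshot
}

func main() {
	// Sample values for illustration; real values come from fingerprinting.
	info := &api.CSIControllerInfo{
		SupportsAttachDetach:         true,
		SupportsCreateDeleteSnapshot: false,
	}
	if !supportsSnapshots(info) {
		fmt.Println("plugin lacks CREATE_DELETE_SNAPSHOT; skipping snapshot flow")
	}
}
```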
9 changes: 5 additions & 4 deletions client/allocrunner/csi_hook.go
@@ -9,6 +9,7 @@ import (
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
@@ -286,13 +287,13 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
defer cancel()
var err error
backoff := time.Second
t, stop := helper.NewSafeTimer(0)
defer stop()
for {
select {
case <-ctx.Done():
return err
case <-t.C:
}

err = c.unmountImpl(pair)
@@ -306,7 +307,7 @@ func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
backoff = c.maxBackoffInterval
}
}
t.Reset(backoff)
}
return nil
}
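The hunk above replaces a `time.Ticker` with Nomad's `helper.NewSafeTimer`, which, per the diff, returns a `*time.Timer` plus a stop function. A standalone sketch of the same retry-with-backoff shape using only the standard library; the function name and the backoff cap parameter are illustrative:

```go
package main

import (
	"context"
	"errors"
	"time"
)

// retryWithBackoff mirrors the loop shape in unmountWithRetry: fire
// immediately on the first pass, then back off exponentially up to a cap.
func retryWithBackoff(ctx context.Context, maxBackoff time.Duration, op func() error) error {
	backoff := time.Second
	t := time.NewTimer(0) // fires immediately on the first iteration
	defer t.Stop()

	var err error
	for {
		select {
		case <-ctx.Done():
			return err // context expired; report the last attempt's error
		case <-t.C:
		}

		if err = op(); err == nil {
			return nil
		}

		// Double the wait between attempts, capped at maxBackoff.
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
		t.Reset(backoff)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Stand-in for c.unmountImpl(pair); always fails, so the loop retries
	// until the context times out.
	_ = retryWithBackoff(ctx, 4*time.Second, func() error {
		return errors.New("unmount failed")
	})
}
```

A timer reset only after each attempt completes also guarantees the next attempt waits the full backoff, whereas a ticker could fire while an attempt was still in flight.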
100 changes: 76 additions & 24 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
@@ -21,18 +21,20 @@ import (
// tasks. These plugins will be fingerprinted and it will manage connecting them
// to their requisite plugin manager.
//
// It provides a few things to a plugin task running inside Nomad. These are:
// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock
// * A mount to `local/csi` that node plugins will use to stage volume mounts.
// * When the task has started, it starts a loop of attempting to connect to the
// plugin, to perform initial fingerprinting of the plugins capabilities before
// notifying the plugin manager of the plugin.
type csiPluginSupervisorHook struct {
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
socketMountPoint string
socketPath string

caps *drivers.Capabilities

@@ -73,20 +75,36 @@ var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}
// with the catalog and to ensure any mounts are cleaned up.
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}

// This hook creates a csi/ directory within the client's datadir used to
// manage plugins and volume mount points. The layout is as follows:

// plugins/
// {alloc-id}/csi.sock
// Per-allocation directories of unix domain sockets used to communicate
// with the CSI plugin. Nomad creates the directory and the plugin creates
// the socket file. This directory is bind-mounted to the
// csi_plugin.mount_dir in the plugin task.
//
// {plugin-type}/{plugin-id}/
// staging/
// {volume-id}/{usage-mode}/
// Intermediate mount point used by node plugins that support
// NODE_STAGE_UNSTAGE capability.
//
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode}/
// Mount point bound from the staging directory into tasks that use
// the mounted volumes.

func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
task := config.runner.Task()

pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)

socketMountPoint := filepath.Join(config.clientStateDirPath, "csi",
"plugins", config.runner.Alloc().ID)

shutdownCtx, cancelFn := context.WithCancel(context.Background())

hook := &csiPluginSupervisorHook{
@@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
logger: config.logger,
task: task,
mountPoint: pluginRoot,
socketMountPoint: socketMountPoint,
caps: config.capabilities,
shutdownCtx: shutdownCtx,
shutdownCancelFn: cancelFn,
@@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
return fmt.Errorf("failed to create mount point: %v", err)
}

if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create socket mount point: %v", err)
}

// where the socket will be mounted
configMount := &drivers.MountConfig{
TaskPath: h.task.CSIPluginConfig.MountDir,
HostPath: h.socketMountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// where the staging and per-alloc directories will be mounted
volumeStagingMounts := &drivers.MountConfig{
// TODO(tgross): add this TaskPath to the CSIPluginConfig as well
TaskPath: "/local/csi",
HostPath: h.mountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// devices from the host
devMount := &drivers.MountConfig{
TaskPath: "/dev",
HostPath: "/dev",
Readonly: false,
}

// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should be able to update the
// definition in the update hook

// For backwards compatibility, ensure that we don't overwrite the
// socketPath on client restart with existing plugin allocations.
pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc(
string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID)
if pluginInfo != nil {
h.socketPath = pluginInfo.ConnectionInfo.SocketPath
} else {
h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName)
}

switch h.caps.FSIsolation {
case drivers.FSIsolationNone:
// Plugin tasks with no filesystem isolation won't have the
@@ -142,13 +189,15 @@
// plugins will need to be aware of the csi directory layout
// in the client data dir
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
"CSI_ENDPOINT": h.socketPath}
default:
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
"CSI_ENDPOINT": filepath.Join(
h.task.CSIPluginConfig.MountDir, structs.CSISocketName)}
}

mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
mounts = ensureMountpointInserted(mounts, volumeStagingMounts)
mounts = ensureMountpointInserted(mounts, devMount)

h.runner.hookResources.setMounts(mounts)
@@ -203,9 +252,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Unlock()
}()

client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
@@ -249,7 +296,7 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
if err != nil {
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
@@ -317,7 +364,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
Options: map[string]string{
"Provider": info.Name, // vendor name
"MountPoint": h.mountPoint,
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
"ContainerMountPoint": "/local/csi",
},
}
}
@@ -348,8 +395,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
// closes over its own registration
rname := reg.Name
rtype := reg.Type
allocID := reg.AllocID
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
@@ -384,6 +432,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
// Stop hooks must be idempotent. The context is cancelled prematurely if the
// task is killed.
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
err := os.RemoveAll(h.socketMountPoint)
if err != nil {
h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err)
}
h.shutdownCancelFn()
return nil
}
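The net effect of these hook changes is that the plugin socket moves from the shared `{plugin-type}/{plugin-id}` directory to a per-allocation directory, so multiple allocations of the same plugin no longer collide on `csi.sock`. A sketch of the resulting path arithmetic, assuming the `{datadir}/csi/plugins/{alloc-id}` convention shown in the diff; the constant and helper names here are illustrative, not Nomad exports:

```go
package main

import (
	"fmt"
	"path/filepath"
)

const csiSocketName = "csi.sock" // stand-in for structs.CSISocketName

// hostSocketPath is where the plugin's unix socket now lives on the host:
// one directory per allocation, created by Nomad and bind-mounted into the
// plugin task.
func hostSocketPath(clientStateDir, allocID string) string {
	return filepath.Join(clientStateDir, "csi", "plugins", allocID, csiSocketName)
}

// taskSocketPath is where the same socket appears inside the plugin task,
// via the bind mount of the per-alloc directory onto csi_plugin.mount_dir.
func taskSocketPath(mountDir string) string {
	return filepath.Join(mountDir, csiSocketName)
}

func main() {
	fmt.Println(hostSocketPath("/var/lib/nomad/client", "b41f4c6d"))
	// /var/lib/nomad/client/csi/plugins/b41f4c6d/csi.sock
	fmt.Println(taskSocketPath("/csi"))
	// /csi/csi.sock
}
```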
14 changes: 1 addition & 13 deletions client/client.go
@@ -749,18 +749,6 @@ func (c *Client) secretNodeID() string {
return c.config.Node.SecretID
}

// RPCMajorVersion returns the structs.ApiMajorVersion supported by the
// client.
func (c *Client) RPCMajorVersion() int {
return structs.ApiMajorVersion
}

// RPCMinorVersion returns the structs.ApiMinorVersion supported by the
// client.
func (c *Client) RPCMinorVersion() int {
return structs.ApiMinorVersion
}

// Shutdown is used to tear down the client
func (c *Client) Shutdown() error {
c.shutdownLock.Lock()
@@ -2773,7 +2761,7 @@ DISCOLOOP:
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
