Skip to content

Commit

Permalink
Handle VolumeAssignment from dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Ameya Gawde <agawde@mirantis.com>
  • Loading branch information
ameyag committed Aug 13, 2020
1 parent 537edd8 commit 8b59acc
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 4 deletions.
64 changes: 64 additions & 0 deletions agent/csi/mounter.go
@@ -0,0 +1,64 @@
package csi

import (
"errors"
"fmt"
"os"
"os/exec"
"strings"
)

type MounterInterface interface {

// Mount will mount source to target
Mount(source string, target string, opts ...string) error
// Unmount will unmount given target
Unmount(target string) error
}

type Mounter struct{}

func (m *Mounter) Mount(source, target string, opts ...string) error {

if source == "" {
return errors.New("mount source voule is not available")
}

if target == "" {
return errors.New("mount target volume is not available")
}

// Build mount arguments
var mountArgs []string
if len(opts) > 0 {
mountArgs = append(mountArgs, "-o", strings.Join(opts, ","))
}
mountArgs = append(mountArgs, source)
mountArgs = append(mountArgs, target)

// Create the target
err := os.MkdirAll(target, 0750)
if err != nil {
return err
}

out, err := exec.Command("mount", mountArgs...).CombinedOutput()
if err != nil {
return fmt.Errorf("failure:%v cmd: 'mount %s' output: %q", err, strings.Join(mountArgs, " "), string(out))
}

return nil
}

func (m *Mounter) Unmount(target string) error {
if target == "" {
return errors.New("mount target volume is not available")
}

out, err := exec.Command("umount", target).CombinedOutput()
if err != nil {
return fmt.Errorf("failure: %v cmd: 'umount %s' output: %q", err, target, string(out))
}

return nil
}
45 changes: 45 additions & 0 deletions agent/csi/plugin.go
Expand Up @@ -11,6 +11,8 @@ import (

type NodePluginInterface interface {
NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error)
NodeStageVolume(ctx context.Context, req []*api.VolumeAssignment) ([]*api.VolumeAttachment, error)
NodeUnstageVolume(ctx context.Context, req []*api.VolumeAssignment) ([]*api.VolumeAttachment, error)
}

// plugin represents an individual CSI node plugin
Expand All @@ -33,6 +35,9 @@ type NodePlugin struct {

// NodeClient is the node service client
NodeClient csi.NodeClient

// Mounter exposes mount/unmount operation
Mounter Mounter
}

func (np *NodePlugin) NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error) {
Expand All @@ -42,3 +47,43 @@ func (np *NodePlugin) NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error)

return makeNodeInfo(resp), nil
}

func (np *NodePlugin) NodeStageVolume(ctx context.Context, req []*api.VolumeAssignment) ([]*api.VolumeAttachment, error) {
// Message for NodeStageVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1128-L1130
var volumeAttachments []*api.VolumeAttachment
err := np.Mounter.Mount("/var/lib/docker", "/var/target")
if err != nil {
return nil, err
}
for range req {
volumeAttachments = append(volumeAttachments, &api.VolumeAttachment{})
}
return volumeAttachments, nil
}

func (np *NodePlugin) NodeUnstageVolume(ctx context.Context, req []*api.VolumeAssignment) ([]*api.VolumeAttachment, error) {
// Message for NodeStageVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1141-L1143
var volumeAttachments []*api.VolumeAttachment
err := np.Mounter.Unmount("/var/target")
if err != nil {
return nil, err
}
for range req {
volumeAttachments = append(volumeAttachments, &api.VolumeAttachment{})
}
return volumeAttachments, nil
}

func (np *NodePlugin) NodePublishVolume(ctx context.Context, req ...api.VolumeAssignment) (*api.VolumeAttachment, error) {
// Message for NodePublishVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#1198-L1200
return &api.VolumeAttachment{}, nil
}

func (np *NodePlugin) NodeUnPublishVolume(ctx context.Context, req ...api.Assignment) (*api.VolumeAttachment, error) {
// Message for NodeUnPublishVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1212-L1214
return &api.VolumeAttachment{}, nil
}
75 changes: 75 additions & 0 deletions agent/csi/volumes.go
@@ -0,0 +1,75 @@
package csi

import (
"context"
"fmt"
"sync"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
)

// volumes is a map that keeps all the currently available volumes to the agent
// mapped by volume ID.
type volumes struct {
mu sync.RWMutex
m map[string]*api.VolumeAssignment
plugin NodePlugin
}

// NewManager returns a place to store volumes.
func NewManager() exec.VolumesManager {
return &volumes{
m: make(map[string]*api.VolumeAssignment),
}
}

// Get returns a volume by ID. If the volume doesn't exist, returns nil.
func (r *volumes) Get(volumesID string) (*api.VolumeAssignment, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r, ok := r.m[volumesID]; ok {
return r, nil
}
return nil, fmt.Errorf("volume %s not found", volumesID)
}

// Add adds one or more volumes to the volume map.
func (r *volumes) Add(volumes ...api.VolumeAssignment) {
r.mu.Lock()
var volumeObjects []*api.VolumeAssignment
defer r.mu.Unlock()
for _, volume := range volumes {
v := volume.Copy()
r.m[volume.VolumeID] = v
volumeObjects = append(volumeObjects, v)
}
go r.plugin.NodeStageVolume(context.Background(), volumeObjects)
}

// Remove removes one or more volumes by ID from the volumes map. Succeeds
// whether or not the given IDs are in the map.
func (r *volumes) Remove(volumes []string) {
r.mu.Lock()
var volumeObjects []*api.VolumeAssignment
defer r.mu.Unlock()
for _, volume := range volumes {
v := r.m[volume]
volumeObjects = append(volumeObjects, v)
delete(r.m, volume)
}

go r.plugin.NodeUnstageVolume(context.Background(), volumeObjects)
}

// Reset removes all the volumes.
func (r *volumes) Reset() {
r.mu.Lock()
var volumeObjects []*api.VolumeAssignment
defer r.mu.Unlock()
for _, v := range r.m {
volumeObjects = append(volumeObjects, v)
}
r.m = make(map[string]*api.VolumeAssignment)
go r.plugin.NodeUnstageVolume(context.Background(), volumeObjects)
}
12 changes: 12 additions & 0 deletions agent/dependency.go
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"github.com/docker/swarmkit/agent/configs"
"github.com/docker/swarmkit/agent/csi"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/agent/secrets"
"github.com/docker/swarmkit/api"
Expand All @@ -10,6 +11,7 @@ import (
type dependencyManager struct {
secrets exec.SecretsManager
configs exec.ConfigsManager
volumes exec.VolumesManager
}

// NewDependencyManager creates a dependency manager object that wraps
Expand All @@ -18,6 +20,7 @@ func NewDependencyManager() exec.DependencyManager {
return &dependencyManager{
secrets: secrets.NewManager(),
configs: configs.NewManager(),
volumes: csi.NewManager(),
}
}

Expand All @@ -29,9 +32,14 @@ func (d *dependencyManager) Configs() exec.ConfigsManager {
return d.configs
}

func (d *dependencyManager) Volumes() exec.VolumesManager {
return d.volumes
}

type dependencyGetter struct {
secrets exec.SecretGetter
configs exec.ConfigGetter
volumes exec.VolumeGetter
}

func (d *dependencyGetter) Secrets() exec.SecretGetter {
Expand All @@ -42,6 +50,10 @@ func (d *dependencyGetter) Configs() exec.ConfigGetter {
return d.configs
}

func (d *dependencyGetter) Volumes() exec.VolumeGetter {
return d.volumes
}

// Restrict provides getters that only allows access to the dependencies
// referenced by the task.
func Restrict(dependencies exec.DependencyManager, t *api.Task) exec.DependencyGetter {
Expand Down
22 changes: 22 additions & 0 deletions agent/exec/executor.go
Expand Up @@ -35,6 +35,12 @@ type ConfigsProvider interface {
Configs() ConfigsManager
}

// VolumesProvider is implemented by objects that can store volumes,
// typically an executor.
type VolumesProvider interface {
Volumes() VolumesManager
}

// DependencyManager is a meta-object that can keep track of typed objects
// such as secrets and configs.
type DependencyManager interface {
Expand Down Expand Up @@ -80,3 +86,19 @@ type ConfigsManager interface {
Remove(configs []string) // remove the configs by ID
Reset() // remove all configs
}

// VolumeGetter contains volume data necessary for the Controller.
type VolumeGetter interface {
// Get returns the the volume with a specific volume ID, if available.
// When the volume is not available, the return will be nil.
Get(volumeID string) (*api.VolumeAssignment, error)
}

// VolumesManager is the interface for volume storage and updates.
type VolumesManager interface {
VolumeGetter

Add(VolumeAssignment ...api.VolumeAssignment) // add one or more volumes
Remove(volumes []string) // remove the volumes by ID
Reset() // remove all volumes
}
66 changes: 62 additions & 4 deletions agent/worker.go
Expand Up @@ -23,15 +23,20 @@ type Worker interface {
// It is not safe to call any worker function after that.
Close()

// Assign assigns a complete set of tasks and configs/secrets to a
// Assign assigns a complete set of tasks and configs/secrets/volumes to a
// worker. Any items not included in this set will be removed.
Assign(ctx context.Context, assignments []*api.AssignmentChange) error

// Updates updates an incremental set of tasks or configs/secrets of
// Updates updates an incremental set of tasks or configs/secrets/volumes of
// the worker. Any items not included either in added or removed will
// remain untouched.
Update(ctx context.Context, assignments []*api.AssignmentChange) error

// PublishVolume updates an incremental set of CSI volumes of
// the worker. Any items not included either in added or removed will
// remain untouched.
// PublishVolume(ctx context.Context, assignments []*api.AssignmentChange) error

// Listen to updates about tasks controlled by the worker. When first
// called, the reporter will receive all updates for all tasks controlled
// by the worker.
Expand Down Expand Up @@ -152,7 +157,12 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
return err
}

return reconcileTaskState(ctx, w, assignments, true)
err = reconcileTaskState(ctx, w, assignments, true)
if err != nil {
return err
}

return reconcileVolumes(ctx, w, assignments, true)
}

// Update updates the set of tasks, configs, and secrets for the worker.
Expand Down Expand Up @@ -184,7 +194,12 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange
return err
}

return reconcileTaskState(ctx, w, assignments, false)
err = reconcileTaskState(ctx, w, assignments, false)
if err != nil {
return err
}

return reconcileVolumes(ctx, w, assignments, false)
}

func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
Expand Down Expand Up @@ -409,6 +424,49 @@ func reconcileConfigs(ctx context.Context, w *worker, assignments []*api.Assignm
return nil
}

func reconcileVolumes(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
var (
updatedVolumes []api.VolumeAssignment
removedVolumes []string
)
for _, a := range assignments {
if r := a.Assignment.GetVolume(); r != nil {
switch a.Action {
case api.AssignmentChange_AssignmentActionUpdate:
updatedVolumes = append(updatedVolumes, *r)
case api.AssignmentChange_AssignmentActionRemove:
removedVolumes = append(removedVolumes, r.ID)
}

}
}

volumesProvider, ok := w.executor.(exec.VolumesProvider)
if !ok {
if len(updatedVolumes) != 0 || len(removedVolumes) != 0 {
log.G(ctx).Warn("volumes update ignored; executor does not support volumes")
}
return nil
}

volumes := volumesProvider.Volumes()

log.G(ctx).WithFields(logrus.Fields{
"len(updatedVolumes)": len(updatedVolumes),
"len(removedVolumes)": len(removedVolumes),
}).Debug("(*worker).reconcileVolumes")

// If this was a complete set of volumes, we're going to clear the volumes map and add all of them
if fullSnapshot {
volumes.Reset()
} else {
volumes.Remove(removedVolumes)
}
volumes.Add(updatedVolumes...)

return nil
}

func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down

0 comments on commit 8b59acc

Please sign in to comment.