Skip to content

Commit

Permalink
Handle VolumeAssignment from dispatcher
Browse files Browse the repository at this point in the history
Signed-off-by: Ameya Gawde <agawde@mirantis.com>
  • Loading branch information
ameyag committed Aug 12, 2020
1 parent 537edd8 commit 6ed3b4d
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 4 deletions.
24 changes: 24 additions & 0 deletions agent/csi/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,27 @@ func (np *NodePlugin) NodeGetInfo(ctx context.Context) (*api.NodeCSIInfo, error)

return makeNodeInfo(resp), nil
}

func (np *NodePlugin) NodeStageVolume(ctx context.Context, req *api.VolumeAssignment) (*api.VolumeAttachment, error) {
// Message for NodeStageVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1128-L1130
return &api.VolumeAttachment{}, nil
}

func (np *NodePlugin) NodeUnstageVolume(ctx context.Context, req *api.VolumeAssignment) (*api.VolumeAttachment, error) {
// Message for NodeStageVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1141-L1143
return &api.VolumeAttachment{}, nil
}

func (np *NodePlugin) NodePublishVolume(ctx context.Context, req *api.VolumeAssignment) (*api.VolumeAttachment, error) {
// Message for NodePublishVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#1198-L1200
return &api.VolumeAttachment{}, nil
}

func (np *NodePlugin) NodeUnPublishVolume(ctx context.Context, req *api.Assignment) (*api.VolumeAttachment, error) {
// Message for NodeUnPublishVolumeResponse is empty
// https://github.com/container-storage-interface/spec/blob/v1.2.0/csi.proto#L1212-L1214
return &api.VolumeAttachment{}, nil
}
63 changes: 63 additions & 0 deletions agent/csi/volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package csi

import (
"context"
"fmt"
"sync"

"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
)

// volumes is a map that keeps all the currently available volumes to the agent
// mapped by volume ID.
type volumes struct {
mu sync.RWMutex
m map[string]*api.VolumeAssignment
plugin NodePlugin
}

// NewManager returns a place to store volumes.
func NewManager() exec.VolumesManager {
return &volumes{
m: make(map[string]*api.VolumeAssignment),
}
}

// Get returns a volume by ID. If the volume doesn't exist, returns nil.
func (r *volumes) Get(volumesID string) (*api.VolumeAssignment, error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r, ok := r.m[volumesID]; ok {
return r, nil
}
return nil, fmt.Errorf("volume %s not found", volumesID)
}

// Add adds one or more volumes to the volume map.
func (r *volumes) Add(volumes ...api.VolumeAssignment) {
r.mu.Lock()
defer r.mu.Unlock()
for _, volume := range volumes {
r.plugin.NodeStageVolume(context.Background(), &volume)
r.m[volume.VolumeID] = volume.Copy()
}
}

// Remove removes one or more volumes by ID from the volumes map. Succeeds
// whether or not the given IDs are in the map.
func (r *volumes) Remove(volumes []string) {
r.mu.Lock()
defer r.mu.Unlock()
for _, volume := range volumes {
r.plugin.NodeUnstageVolume(context.Background(), r.m[volume])
delete(r.m, volume)
}
}

// Reset removes all the volumes.
func (r *volumes) Reset() {
r.mu.Lock()
defer r.mu.Unlock()
r.m = make(map[string]*api.VolumeAssignment)
}
12 changes: 12 additions & 0 deletions agent/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"github.com/docker/swarmkit/agent/configs"
"github.com/docker/swarmkit/agent/csi"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/agent/secrets"
"github.com/docker/swarmkit/api"
Expand All @@ -10,6 +11,7 @@ import (
type dependencyManager struct {
secrets exec.SecretsManager
configs exec.ConfigsManager
volumes exec.VolumesManager
}

// NewDependencyManager creates a dependency manager object that wraps
Expand All @@ -18,6 +20,7 @@ func NewDependencyManager() exec.DependencyManager {
return &dependencyManager{
secrets: secrets.NewManager(),
configs: configs.NewManager(),
volumes: csi.NewManager(),
}
}

Expand All @@ -29,9 +32,14 @@ func (d *dependencyManager) Configs() exec.ConfigsManager {
return d.configs
}

func (d *dependencyManager) Volumes() exec.VolumesManager {
return d.volumes
}

type dependencyGetter struct {
secrets exec.SecretGetter
configs exec.ConfigGetter
volumes exec.VolumeGetter
}

func (d *dependencyGetter) Secrets() exec.SecretGetter {
Expand All @@ -42,6 +50,10 @@ func (d *dependencyGetter) Configs() exec.ConfigGetter {
return d.configs
}

func (d *dependencyGetter) Volumes() exec.VolumeGetter {
return d.volumes
}

// Restrict provides getters that only allows access to the dependencies
// referenced by the task.
func Restrict(dependencies exec.DependencyManager, t *api.Task) exec.DependencyGetter {
Expand Down
22 changes: 22 additions & 0 deletions agent/exec/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type ConfigsProvider interface {
Configs() ConfigsManager
}

// VolumesProvider is implemented by objects that can store volumes,
// typically an executor.
type VolumesProvider interface {
Volumes() VolumesManager
}

// DependencyManager is a meta-object that can keep track of typed objects
// such as secrets and configs.
type DependencyManager interface {
Expand Down Expand Up @@ -80,3 +86,19 @@ type ConfigsManager interface {
Remove(configs []string) // remove the configs by ID
Reset() // remove all configs
}

// VolumeGetter contains volume data necessary for the Controller.
type VolumeGetter interface {
// Get returns the the volume with a specific volume ID, if available.
// When the volume is not available, the return will be nil.
Get(volumeID string) (*api.VolumeAssignment, error)
}

// VolumesManager is the interface for volume storage and updates.
type VolumesManager interface {
VolumeGetter

Add(VolumeAssignment ...api.VolumeAssignment) // add one or more volumes
Remove(volumes []string) // remove the volumes by ID
Reset() // remove all volumes
}
66 changes: 62 additions & 4 deletions agent/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@ type Worker interface {
// It is not safe to call any worker function after that.
Close()

// Assign assigns a complete set of tasks and configs/secrets to a
// Assign assigns a complete set of tasks and configs/secrets/volumes to a
// worker. Any items not included in this set will be removed.
Assign(ctx context.Context, assignments []*api.AssignmentChange) error

// Updates updates an incremental set of tasks or configs/secrets of
// Updates updates an incremental set of tasks or configs/secrets/volumes of
// the worker. Any items not included either in added or removed will
// remain untouched.
Update(ctx context.Context, assignments []*api.AssignmentChange) error

// PublishVolume updates an incremental set of CSI volumes of
// the worker. Any items not included either in added or removed will
// remain untouched.
// PublishVolume(ctx context.Context, assignments []*api.AssignmentChange) error

// Listen to updates about tasks controlled by the worker. When first
// called, the reporter will receive all updates for all tasks controlled
// by the worker.
Expand Down Expand Up @@ -152,7 +157,12 @@ func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange
return err
}

return reconcileTaskState(ctx, w, assignments, true)
err = reconcileTaskState(ctx, w, assignments, true)
if err != nil {
return err
}

return reconcileVolumes(ctx, w, assignments, true)
}

// Update updates the set of tasks, configs, and secrets for the worker.
Expand Down Expand Up @@ -184,7 +194,12 @@ func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange
return err
}

return reconcileTaskState(ctx, w, assignments, false)
err = reconcileTaskState(ctx, w, assignments, false)
if err != nil {
return err
}

return reconcileVolumes(ctx, w, assignments, false)
}

func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
Expand Down Expand Up @@ -409,6 +424,49 @@ func reconcileConfigs(ctx context.Context, w *worker, assignments []*api.Assignm
return nil
}

func reconcileVolumes(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
var (
updatedVolumes []api.VolumeAssignment
removedVolumes []string
)
for _, a := range assignments {
if r := a.Assignment.GetVolume(); r != nil {
switch a.Action {
case api.AssignmentChange_AssignmentActionUpdate:
updatedVolumes = append(updatedVolumes, *r)
case api.AssignmentChange_AssignmentActionRemove:
removedVolumes = append(removedVolumes, r.ID)
}

}
}

volumesProvider, ok := w.executor.(exec.VolumesProvider)
if !ok {
if len(updatedVolumes) != 0 || len(removedVolumes) != 0 {
log.G(ctx).Warn("volumes update ignored; executor does not support volumes")
}
return nil
}

volumes := volumesProvider.Volumes()

log.G(ctx).WithFields(logrus.Fields{
"len(updatedVolumes)": len(updatedVolumes),
"len(removedVolumes)": len(removedVolumes),
}).Debug("(*worker).reconcileVolumes")

// If this was a complete set of volumes, we're going to clear the volumes map and add all of them
if fullSnapshot {
volumes.Reset()
} else {
volumes.Remove(removedVolumes)
}
volumes.Add(updatedVolumes...)

return nil
}

func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
w.mu.Lock()
defer w.mu.Unlock()
Expand Down

0 comments on commit 6ed3b4d

Please sign in to comment.