Skip to content

Commit

Permalink
Inject mounter into volume plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
pmorie committed May 4, 2015
1 parent 72f9e94 commit b538642
Show file tree
Hide file tree
Showing 22 changed files with 109 additions and 87 deletions.
7 changes: 6 additions & 1 deletion cmd/kubelet/app/server.go
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"

"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
Expand Down Expand Up @@ -309,6 +310,7 @@ func (s *KubeletServer) Run(_ []string) error {
ResourceContainer: s.ResourceContainer,
CgroupRoot: s.CgroupRoot,
ContainerRuntime: s.ContainerRuntime,
Mounter: mount.New(),
}

RunKubelet(&kcfg, nil)
Expand Down Expand Up @@ -419,6 +421,7 @@ func SimpleKubelet(client *client.Client,
OSInterface: osInterface,
CgroupRoot: "",
ContainerRuntime: "docker",
Mounter: mount.New(),
}
return &kcfg
}
Expand Down Expand Up @@ -548,6 +551,7 @@ type KubeletConfig struct {
OSInterface kubecontainer.OSInterface
CgroupRoot string
ContainerRuntime string
Mounter mount.Interface
}

func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
Expand Down Expand Up @@ -594,7 +598,8 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
kc.ResourceContainer,
kc.OSInterface,
kc.CgroupRoot,
kc.ContainerRuntime)
kc.ContainerRuntime,
kc.Mounter)

if err != nil {
return nil, nil, err
Expand Down
9 changes: 8 additions & 1 deletion pkg/kubelet/kubelet.go
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
Expand Down Expand Up @@ -121,7 +122,8 @@ func NewMainKubelet(
resourceContainer string,
osInterface kubecontainer.OSInterface,
cgroupRoot string,
containerRuntime string) (*Kubelet, error) {
containerRuntime string,
mounter mount.Interface) (*Kubelet, error) {
if rootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
}
Expand Down Expand Up @@ -234,6 +236,7 @@ func NewMainKubelet(
os: osInterface,
oomWatcher: oomWatcher,
cgroupRoot: cgroupRoot,
mounter: mounter,
}

if plug, err := network.InitNetworkPlugin(networkPlugins, networkPluginName, &networkHost{klet}); err != nil {
Expand Down Expand Up @@ -401,6 +404,7 @@ type Kubelet struct {
// status. Kubelet may fail to update node status reliablly if the value is too small,
// as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency time.Duration

// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
Expand All @@ -412,6 +416,9 @@ type Kubelet struct {

// If non-empty, pass this to the container runtime as the root cgroup.
cgroupRoot string

// Mounter to use for volumes.
mounter mount.Interface
}

// getRootDir returns the full path to the directory under which kubelet can
Expand Down
21 changes: 11 additions & 10 deletions pkg/kubelet/volumes.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
Expand Down Expand Up @@ -54,15 +55,15 @@ func (vh *volumeHost) GetKubeClient() client.Interface {
return vh.kubelet.kubeClient
}

func (vh *volumeHost) NewWrapperBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef, opts)
func (vh *volumeHost) NewWrapperBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
b, err := vh.kubelet.newVolumeBuilderFromPlugins(spec, podRef, opts, mounter)
if err == nil && b == nil {
return nil, errUnsupportedVolumeType
}
return b, nil
}

func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) {
func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
plugin, err := vh.kubelet.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, err
Expand All @@ -71,14 +72,14 @@ func (vh *volumeHost) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (vo
// Not found but not an error
return nil, nil
}
c, err := plugin.NewCleaner(spec.Name, podUID)
c, err := plugin.NewCleaner(spec.Name, podUID, mounter)
if err == nil && c == nil {
return nil, errUnsupportedVolumeType
}
return c, nil
}

func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
plugin, err := kl.volumePluginMgr.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("can't use volume plugins for %s: %v", spew.Sprintf("%#v", *spec), err)
Expand All @@ -87,7 +88,7 @@ func (kl *Kubelet) newVolumeBuilderFromPlugins(spec *volume.Spec, podRef *api.Ob
// Not found but not an error
return nil, nil
}
builder, err := plugin.NewBuilder(spec, podRef, opts)
builder, err := plugin.NewBuilder(spec, podRef, opts, mounter)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s: %v", spew.Sprintf("%#v", *spec), err)
}
Expand All @@ -113,7 +114,7 @@ func (kl *Kubelet) mountExternalVolumes(pod *api.Pod) (volumeMap, error) {

// Try to use a plugin for this volume.
internal := volume.NewSpecFromVolume(volSpec)
builder, err := kl.newVolumeBuilderFromPlugins(internal, podRef, volume.VolumeOptions{rootContext})
builder, err := kl.newVolumeBuilderFromPlugins(internal, podRef, volume.VolumeOptions{rootContext}, kl.mounter)
if err != nil {
glog.Errorf("Could not create volume builder for pod %s: %v", pod.UID, err)
return nil, err
Expand Down Expand Up @@ -164,7 +165,7 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
// or volume objects.

// Try to use a plugin for this volume.
cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID)
cleaner, err := kl.newVolumeCleanerFromPlugins(volumeKind, volumeName, podUID, kl.mounter)
if err != nil {
glog.Errorf("Could not create volume cleaner for %s: %v", volumeNameDir.Name(), err)
continue
Expand All @@ -180,7 +181,7 @@ func (kl *Kubelet) getPodVolumesFromDisk() map[string]volume.Cleaner {
return currentVolumes
}

func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID) (volume.Cleaner, error) {
func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
plugName := util.UnescapeQualifiedNameForDisk(kind)
plugin, err := kl.volumePluginMgr.FindPluginByName(plugName)
if err != nil {
Expand All @@ -191,7 +192,7 @@ func (kl *Kubelet) newVolumeCleanerFromPlugins(kind string, name string, podUID
// Not found but not an error.
return nil, nil
}
cleaner, err := plugin.NewCleaner(name, podUID)
cleaner, err := plugin.NewCleaner(name, podUID, mounter)
if err != nil {
return nil, fmt.Errorf("failed to instantiate volume plugin for %s/%s: %v", podUID, kind, err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/volume/aws_ebs/aws_ebs.go
Expand Up @@ -68,9 +68,9 @@ func (plugin *awsElasticBlockStorePlugin) GetAccessModes() []api.AccessModeType
}
}

func (plugin *awsElasticBlockStorePlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, _ volume.VolumeOptions) (volume.Builder, error) {
func (plugin *awsElasticBlockStorePlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, _ volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
// Inject real implementations here, test through the internal function.
return plugin.newBuilderInternal(spec, podRef.UID, &AWSDiskUtil{}, mount.New())
return plugin.newBuilderInternal(spec, podRef.UID, &AWSDiskUtil{}, mounter)
}

func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Builder, error) {
Expand Down Expand Up @@ -103,9 +103,9 @@ func (plugin *awsElasticBlockStorePlugin) newBuilderInternal(spec *volume.Spec,
}, nil
}

func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
func (plugin *awsElasticBlockStorePlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
// Inject real implementations here, test through the internal function.
return plugin.newCleanerInternal(volName, podUID, &AWSDiskUtil{}, mount.New())
return plugin.newCleanerInternal(volName, podUID, &AWSDiskUtil{}, mounter)
}

func (plugin *awsElasticBlockStorePlugin) newCleanerInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Cleaner, error) {
Expand Down
20 changes: 6 additions & 14 deletions pkg/volume/empty_dir/empty_dir.go
Expand Up @@ -30,21 +30,14 @@ import (

// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return ProbeVolumePluginsWithMounter(mount.New())
}

// ProbePluginsWithMounter is a convenience for testing other plugins which wrap this one.
//FIXME: alternative: pass mount.Interface to all ProbeVolumePlugins() functions? Opinions?
func ProbeVolumePluginsWithMounter(mounter mount.Interface) []volume.VolumePlugin {
return []volume.VolumePlugin{
&emptyDirPlugin{nil, mounter, false},
&emptyDirPlugin{nil, mounter, true},
&emptyDirPlugin{nil, false},
&emptyDirPlugin{nil, true},
}
}

type emptyDirPlugin struct {
host volume.VolumeHost
mounter mount.Interface
legacyMode bool // if set, plugin answers to the legacy name
}

Expand Down Expand Up @@ -78,9 +71,8 @@ func (plugin *emptyDirPlugin) CanSupport(spec *volume.Spec) bool {
return false
}

func (plugin *emptyDirPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
// Inject real implementations here, test through the internal function.
return plugin.newBuilderInternal(spec, podRef, plugin.mounter, &realMountDetector{plugin.mounter}, opts)
func (plugin *emptyDirPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
return plugin.newBuilderInternal(spec, podRef, mounter, &realMountDetector{mounter}, opts)
}

func (plugin *emptyDirPlugin) newBuilderInternal(spec *volume.Spec, podRef *api.ObjectReference, mounter mount.Interface, mountDetector mountDetector, opts volume.VolumeOptions) (volume.Builder, error) {
Expand All @@ -104,9 +96,9 @@ func (plugin *emptyDirPlugin) newBuilderInternal(spec *volume.Spec, podRef *api.
}, nil
}

func (plugin *emptyDirPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
func (plugin *emptyDirPlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
// Inject real implementations here, test through the internal function.
return plugin.newCleanerInternal(volName, podUID, plugin.mounter, &realMountDetector{plugin.mounter})
return plugin.newCleanerInternal(volName, podUID, mounter, &realMountDetector{mounter})
}

func (plugin *emptyDirPlugin) newCleanerInternal(volName string, podUID types.UID, mounter mount.Interface, mountDetector mountDetector) (volume.Cleaner, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/volume/empty_dir/empty_dir_linux.go
@@ -1,3 +1,5 @@
// +build linux

/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Expand All @@ -21,6 +23,7 @@ import (
"syscall"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/golang/glog"
)

// Defined by Linux - the type number for tmpfs mounts.
Expand All @@ -32,6 +35,7 @@ type realMountDetector struct {
}

func (m *realMountDetector) GetMountMedium(path string) (storageMedium, bool, error) {
glog.V(5).Infof("Determining mount medium of %v", path)
isMnt, err := m.mounter.IsMountPoint(path)
if err != nil {
return 0, false, fmt.Errorf("IsMountPoint(%q): %v", path, err)
Expand All @@ -40,6 +44,8 @@ func (m *realMountDetector) GetMountMedium(path string) (storageMedium, bool, er
if err := syscall.Statfs(path, &buf); err != nil {
return 0, false, fmt.Errorf("statfs(%q): %v", path, err)
}

glog.V(5).Info("Statfs_t of %v: %+v", path, buf)
if buf.Type == linuxTmpfsMagic {
return mediumMemory, isMnt, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/volume/empty_dir/empty_dir_test.go
Expand Up @@ -197,7 +197,7 @@ func TestPluginBackCompat(t *testing.T) {
spec := &api.Volume{
Name: "vol1",
}
builder, err := plug.NewBuilder(volume.NewSpecFromVolume(spec), &api.ObjectReference{UID: types.UID("poduid")}, volume.VolumeOptions{""})
builder, err := plug.NewBuilder(volume.NewSpecFromVolume(spec), &api.ObjectReference{UID: types.UID("poduid")}, volume.VolumeOptions{""}, nil)
if err != nil {
t.Errorf("Failed to make a new Builder: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/volume/gce_pd/gce_pd.go
Expand Up @@ -75,9 +75,9 @@ func (plugin *gcePersistentDiskPlugin) GetAccessModes() []api.AccessModeType {
}
}

func (plugin *gcePersistentDiskPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, _ volume.VolumeOptions) (volume.Builder, error) {
func (plugin *gcePersistentDiskPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, _ volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
// Inject real implementations here, test through the internal function.
return plugin.newBuilderInternal(spec, podRef.UID, &GCEDiskUtil{}, mount.New())
return plugin.newBuilderInternal(spec, podRef.UID, &GCEDiskUtil{}, mounter)
}

func (plugin *gcePersistentDiskPlugin) newBuilderInternal(spec *volume.Spec, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Builder, error) {
Expand Down Expand Up @@ -116,9 +116,9 @@ func (plugin *gcePersistentDiskPlugin) newBuilderInternal(spec *volume.Spec, pod
}, nil
}

func (plugin *gcePersistentDiskPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
func (plugin *gcePersistentDiskPlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
// Inject real implementations here, test through the internal function.
return plugin.newCleanerInternal(volName, podUID, &GCEDiskUtil{}, mount.New())
return plugin.newCleanerInternal(volName, podUID, &GCEDiskUtil{}, mounter)
}

func (plugin *gcePersistentDiskPlugin) newCleanerInternal(volName string, podUID types.UID, manager pdManager, mounter mount.Interface) (volume.Cleaner, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/volume/gce_pd/gce_pd_test.go
Expand Up @@ -186,11 +186,11 @@ func TestPluginLegacy(t *testing.T) {
}

spec := &api.Volume{VolumeSource: api.VolumeSource{GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{}}}
if _, err := plug.NewBuilder(volume.NewSpecFromVolume(spec), &api.ObjectReference{UID: types.UID("poduid")}, volume.VolumeOptions{""}); err == nil {
if _, err := plug.NewBuilder(volume.NewSpecFromVolume(spec), &api.ObjectReference{UID: types.UID("poduid")}, volume.VolumeOptions{""}, nil); err == nil {
t.Errorf("Expected failiure")
}

cleaner, err := plug.NewCleaner("vol1", types.UID("poduid"))
cleaner, err := plug.NewCleaner("vol1", types.UID("poduid"), nil)
if err != nil {
t.Errorf("Failed to make a new Cleaner: %v", err)
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/volume/git_repo/git_repo.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
volumeutil "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/util"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func (plugin *gitRepoPlugin) CanSupport(spec *volume.Spec) bool {
return spec.VolumeSource.GitRepo != nil
}

func (plugin *gitRepoPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions) (volume.Builder, error) {
func (plugin *gitRepoPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectReference, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
if plugin.legacyMode {
// Legacy mode instances can be cleaned up but not created anew.
return nil, fmt.Errorf("legacy mode: can not create new instances")
Expand All @@ -80,10 +81,11 @@ func (plugin *gitRepoPlugin) NewBuilder(spec *volume.Spec, podRef *api.ObjectRef
plugin: plugin,
legacyMode: false,
opts: opts,
mounter: mounter,
}, nil
}

func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volume.Cleaner, error) {
func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
legacy := false
if plugin.legacyMode {
legacy = true
Expand All @@ -93,6 +95,7 @@ func (plugin *gitRepoPlugin) NewCleaner(volName string, podUID types.UID) (volum
volName: volName,
plugin: plugin,
legacyMode: legacy,
mounter: mounter,
}, nil
}

Expand All @@ -107,6 +110,7 @@ type gitRepo struct {
plugin *gitRepoPlugin
legacyMode bool
opts volume.VolumeOptions
mounter mount.Interface
}

// SetUp creates new directory and clones a git repo.
Expand All @@ -130,7 +134,7 @@ func (gr *gitRepo) SetUpAt(dir string) error {
}

// Wrap EmptyDir, let it do the setup.
wrapped, err := gr.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &gr.podRef, gr.opts)
wrapped, err := gr.plugin.host.NewWrapperBuilder(wrappedVolumeSpec, &gr.podRef, gr.opts, gr.mounter)
if err != nil {
return err
}
Expand Down Expand Up @@ -193,7 +197,7 @@ func (gr *gitRepo) TearDown() error {
// TearDownAt simply deletes everything in the directory.
func (gr *gitRepo) TearDownAt(dir string) error {
// Wrap EmptyDir, let it do the teardown.
wrapped, err := gr.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, gr.podRef.UID)
wrapped, err := gr.plugin.host.NewWrapperCleaner(wrappedVolumeSpec, gr.podRef.UID, gr.mounter)
if err != nil {
return err
}
Expand Down

0 comments on commit b538642

Please sign in to comment.