Skip to content

Commit

Permalink
UPSTREAM: 39732: Fix issue kubernetes#34242: Attach/detach should rec…
Browse files Browse the repository at this point in the history
…over from a crash

:100644 100644 d3de5fdf98... 01658bd9b3... M	pkg/controller/volume/attachdetach/BUILD
:100644 100644 01d2adc016... 66cac888ca... M	pkg/controller/volume/attachdetach/attach_detach_controller.go
:100644 100644 4a7a8ebfd2... a1a2266d65... M	pkg/controller/volume/attachdetach/attach_detach_controller_test.go
:100644 100644 5387bec0d9... db40529822... M	pkg/controller/volume/attachdetach/cache/actual_state_of_world.go
:100644 100644 86f0461493... fa19728b33... M	pkg/controller/volume/attachdetach/cache/actual_state_of_world_test.go
:100644 100644 505e11e071... 08ce7effc1... M	pkg/controller/volume/attachdetach/reconciler/reconciler.go
:100644 100644 7911072557... baf67d9ca7... M	pkg/controller/volume/attachdetach/reconciler/reconciler_test.go
:100644 100644 b484cfa8ce... 2b954e6b79... M	pkg/controller/volume/attachdetach/testing/testvolumespec.go
:100644 100644 b78c76d2f9... 89b29be2a5... M	pkg/volume/plugins.go
:100644 100644 8e28405786... f8ae260244... M	pkg/volume/util/operationexecutor/operation_executor.go
:100644 100644 f1aff52c81... f6a9eb092b... M	pkg/volume/util/operationexecutor/operation_generator.go
:100644 100644 c55c8db60e... d4dd45dfe3... M	pkg/volume/util/volumehelper/volumehelper.go
  • Loading branch information
wongma7 authored and deads2k committed Jun 7, 2017
1 parent 4573ea8 commit a154733
Show file tree
Hide file tree
Showing 12 changed files with 843 additions and 128 deletions.
6 changes: 6 additions & 0 deletions pkg/controller/volume/attachdetach/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/volume/util/operationexecutor:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
Expand All @@ -45,9 +46,14 @@ go_test(
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/volume/attachdetach/cache:go_default_library",
"//pkg/controller/volume/attachdetach/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

Expand Down
156 changes: 143 additions & 13 deletions pkg/controller/volume/attachdetach/attach_detach_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/golang/glog"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -101,27 +102,29 @@ func NewAttachDetachController(
// dropped pods so they are continuously processed until it is accepted or
// deleted (probably can't do this with sharedInformer), etc.
adc := &attachDetachController{
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
cloud: cloud,
kubeClient: kubeClient,
pvcLister: pvcInformer.Lister(),
pvcsSynced: pvcInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
pvsSynced: pvInformer.Informer().HasSynced,
podLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
cloud: cloud,
}

podInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.podAdd,
UpdateFunc: adc.podUpdate,
DeleteFunc: adc.podDelete,
})
adc.podsSynced = podInformer.Informer().HasSynced

nodeInformer.Informer().AddEventHandler(kcache.ResourceEventHandlerFuncs{
AddFunc: adc.nodeAdd,
UpdateFunc: adc.nodeUpdate,
DeleteFunc: adc.nodeDelete,
})
adc.nodesSynced = nodeInformer.Informer().HasSynced

if err := adc.volumePluginMgr.InitPlugins(plugins, adc); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for Attach/Detach Controller: %+v", err)
Expand Down Expand Up @@ -183,7 +186,10 @@ type attachDetachController struct {
pvLister corelisters.PersistentVolumeLister
pvsSynced kcache.InformerSynced

podsSynced kcache.InformerSynced
podLister corelisters.PodLister
podsSynced kcache.InformerSynced

nodeLister corelisters.NodeLister
nodesSynced kcache.InformerSynced

// cloud provider used by volume host
Expand Down Expand Up @@ -237,13 +243,137 @@ func (adc *attachDetachController) Run(stopCh <-chan struct{}) {
return
}

err := adc.populateActualStateOfWorld()
if err != nil {
glog.Errorf("Error populating the actual state of world: %v", err)
}
err = adc.populateDesiredStateOfWorld()
if err != nil {
glog.Errorf("Error populating the desired state of world: %v", err)
}
go adc.reconciler.Run(stopCh)
go adc.desiredStateOfWorldPopulator.Run(stopCh)

<-stopCh
glog.Infof("Shutting down Attach Detach Controller")
}

func (adc *attachDetachController) populateActualStateOfWorld() error {
glog.V(5).Infof("Populating ActualStateOfworld")
nodes, err := adc.nodeLister.List(labels.Everything())
if err != nil {
return err
}

for _, node := range nodes {
nodeName := types.NodeName(node.Name)
for _, attachedVolume := range node.Status.VolumesAttached {
uniqueName := attachedVolume.Name
// The nil VolumeSpec is safe only in the case the volume is not in use by any pod.
// In such a case it should be detached in the first reconciliation cycle and the
// volume spec is not needed to detach a volume. If the volume is used by a pod, it
// its spec can be: this would happen during in the populateDesiredStateOfWorld which
// scans the pods and updates their volumes in the ActualStateOfWorld too.
err = adc.actualStateOfWorld.MarkVolumeAsAttached(uniqueName, nil /* VolumeSpec */, nodeName, attachedVolume.DevicePath)
if err != nil {
glog.Errorf("Failed to mark the volume as attached: %v", err)
continue
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, true /* forceUnmount */)
if _, exists := node.Annotations[volumehelper.ControllerManagedAttachAnnotation]; exists {
// Node specifies annotation indicating it should be managed by
// attach detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(types.NodeName(node.Name)) // Needed for DesiredStateOfWorld population
}
}
}
return nil
}

func (adc *attachDetachController) getNodeVolumeDevicePath(
volumeName v1.UniqueVolumeName, nodeName types.NodeName) (string, error) {
var devicePath string
var found bool
node, err := adc.nodeLister.Get(string(nodeName))
if err != nil {
return devicePath, err
}
for _, attachedVolume := range node.Status.VolumesAttached {
if volumeName == attachedVolume.Name {
devicePath = attachedVolume.DevicePath
found = true
break
}
}
if !found {
err = fmt.Errorf("Volume %s not found on node %s", volumeName, nodeName)
}

return devicePath, err
}

func (adc *attachDetachController) populateDesiredStateOfWorld() error {
glog.V(5).Infof("Populating DesiredStateOfworld")

pods, err := adc.podLister.List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods {
podToAdd := pod
adc.podAdd(&podToAdd)
for _, podVolume := range podToAdd.Spec.Volumes {
// The volume specs present in the ActualStateOfWorld are nil, let's replace those
// with the correct ones found on pods. The present in the ASW with no corresponding
// pod will be detached and the spec is irrelevant.
volumeSpec, err := util.CreateVolumeSpec(podVolume, podToAdd.Namespace, adc.pvcLister, adc.pvLister)
if err != nil {
glog.Errorf(
"Error creating spec for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
nodeName := types.NodeName(podToAdd.Spec.NodeName)
plugin, err := adc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
if err != nil || plugin == nil {
glog.V(10).Infof(
"Skipping volume %q for pod %q/%q: it does not implement attacher interface. err=%v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
volumeName, err := volumehelper.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
if err != nil {
glog.Errorf(
"Failed to find unique name for volume %q, pod %q/%q: %v",
podVolume.Name,
podToAdd.Namespace,
podToAdd.Name,
err)
continue
}
if adc.actualStateOfWorld.VolumeNodeExists(volumeName, nodeName) {
devicePath, err := adc.getNodeVolumeDevicePath(volumeName, nodeName)
if err != nil {
glog.Errorf("Failed to find device path: %v", err)
continue
}
err = adc.actualStateOfWorld.MarkVolumeAsAttached(volumeName, volumeSpec, nodeName, devicePath)
if err != nil {
glog.Errorf("Failed to update volume spec for node %s: %v", nodeName, err)
}
}
}
}

return nil
}

func (adc *attachDetachController) podAdd(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if pod == nil || !ok {
Expand Down Expand Up @@ -307,7 +437,7 @@ func (adc *attachDetachController) nodeUpdate(oldObj, newObj interface{}) {
// detach controller. Add it to desired state of world.
adc.desiredStateOfWorld.AddNode(nodeName)
}
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

func (adc *attachDetachController) nodeDelete(obj interface{}) {
Expand All @@ -321,15 +451,15 @@ func (adc *attachDetachController) nodeDelete(obj interface{}) {
glog.V(10).Infof("%v", err)
}

adc.processVolumesInUse(nodeName, node.Status.VolumesInUse)
adc.processVolumesInUse(nodeName, node.Status.VolumesInUse, false /* forceUnmount */)
}

// processVolumesInUse processes the list of volumes marked as "in-use"
// according to the specified Node's Status.VolumesInUse and updates the
// corresponding volume in the actual state of the world to indicate that it is
// mounted.
func (adc *attachDetachController) processVolumesInUse(
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName) {
nodeName types.NodeName, volumesInUse []v1.UniqueVolumeName, forceUnmount bool) {
glog.V(4).Infof("processVolumesInUse for node %q", nodeName)
for _, attachedVolume := range adc.actualStateOfWorld.GetAttachedVolumesForNode(nodeName) {
mounted := false
Expand All @@ -340,7 +470,7 @@ func (adc *attachDetachController) processVolumesInUse(
}
}
err := adc.actualStateOfWorld.SetVolumeMountedByNode(
attachedVolume.VolumeName, nodeName, mounted)
attachedVolume.VolumeName, nodeName, mounted, forceUnmount)
if err != nil {
glog.Warningf(
"SetVolumeMountedByNode(%q, %q, %q) returned an error: %v",
Expand Down

0 comments on commit a154733

Please sign in to comment.