Skip to content

Commit

Permalink
Avoids blocking of drain call
Browse files Browse the repository at this point in the history
- There was a deadlock condition where a worker's buffer was full and no more requests could be processed. This change fixes it.
- It also makes sure the worker is deleted (via DeleteWorker) in failure/negative cases
  • Loading branch information
prashanth26 committed Jun 17, 2021
1 parent 98c295c commit 16e66de
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 10 deletions.
20 changes: 16 additions & 4 deletions pkg/util/provider/drain/drain.go
Expand Up @@ -626,6 +626,14 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod,
return
}

// checkAndDeleteWorker is a helper that hands the given worker channel to the
// volumeAttachmentHandler's DeleteWorker method. It does nothing when no
// volumeAttachmentHandler is set on o.
func (o *Options) checkAndDeleteWorker(volumeAttachmentEventCh chan *storagev1.VolumeAttachment) {
	handler := o.volumeAttachmentHandler
	if handler == nil {
		// No handler configured, so there is no worker to deregister.
		return
	}

	handler.DeleteWorker(volumeAttachmentEventCh)
}

func (o *Options) evictPodsWithPVInternal(
attemptEvict bool,
pods []*corev1.Pod,
Expand Down Expand Up @@ -678,19 +686,23 @@ func (o *Options) evictPodsWithPVInternal(
pdbErr := fmt.Errorf("error while evicting pod %q: pod disruption budget %s/%s is misconfigured and requires zero voluntary evictions",
pod.Name, pdb.Namespace, pdb.Name)
returnCh <- pdbErr
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}
}

retryPods = append(retryPods, pod)
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
} else if apierrors.IsNotFound(err) {
klog.V(3).Info("\t", pod.Name, " from node ", pod.Spec.NodeName, " is already gone")
returnCh <- nil
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
} else if err != nil {
klog.V(4).Infof("Error when evicting pod: %v/%v from node %v. Will be retried. Err: %v", pod.Namespace, pod.Name, pod.Spec.NodeName, err)
retryPods = append(retryPods, pod)
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}

Expand All @@ -711,10 +723,12 @@ func (o *Options) evictPodsWithPVInternal(
if apierrors.IsNotFound(err) {
klog.V(3).Info("Node not found anymore")
returnCh <- nil
o.checkAndDeleteWorker(volumeAttachmentEventCh)
return append(retryPods, pods[i+1:]...), true
} else if err != nil {
klog.Errorf("Error when waiting for volume to detach from node. Err: %v", err)
returnCh <- err
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}
klog.V(4).Infof(
Expand All @@ -735,14 +749,12 @@ func (o *Options) evictPodsWithPVInternal(
} else {
klog.Errorf("Error when waiting for volume reattachment. Err: %v", err)
returnCh <- err
o.checkAndDeleteWorker(volumeAttachmentEventCh)
continue
}
}

if o.volumeAttachmentHandler != nil {
o.volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh)
}

o.checkAndDeleteWorker(volumeAttachmentEventCh)
klog.V(3).Infof(
"Pod + volume detachment from node %s + volume reattachment to another node for Pod %s/%s took %v",
o.nodeName,
Expand Down
19 changes: 13 additions & 6 deletions pkg/util/provider/drain/volume_attachment.go
Expand Up @@ -52,13 +52,19 @@ func (v *VolumeAttachmentHandler) dispatch(obj interface{}) {
}

klog.V(4).Infof("Dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName)
defer klog.V(4).Infof("Done dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName)

v.Lock()
defer v.Unlock()

for i, worker := range v.workers {
klog.V(4).Infof("Dispatching request for PV %s to worker %d", *volumeAttachment.Spec.Source.PersistentVolumeName, i)
worker <- volumeAttachment
klog.V(4).Infof("Dispatching request for PV %s to worker %d/%v", *volumeAttachment.Spec.Source.PersistentVolumeName, i, worker)

select {
case worker <- volumeAttachment:
default:
klog.Warningf("Worker %d/%v is full. Discarding value.", i, worker)
}
}
}

Expand All @@ -78,23 +84,24 @@ func (v *VolumeAttachmentHandler) UpdateVolumeAttachment(oldObj, newObj interfac
// AddWorker creates a new buffered worker channel, appends it to v.workers,
// and returns it to the caller.
func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment {
	// chanSize is the channel buffer size to hold requests.
	// This assumes that not more than 20 unprocessed objects would exist at a given time.
	// Once the buffer is full, further events for this worker are discarded by
	// the non-blocking send in dispatch rather than blocking the dispatcher.
	const chanSize = 20

	// Acquire the lock before touching v.workers, including for logging:
	// reading the slice unlocked would race with concurrent AddWorker and
	// DeleteWorker calls that reassign it.
	v.Lock()
	defer v.Unlock()

	klog.V(4).Infof("Adding new worker. Current active workers %d - %v", len(v.workers), v.workers)

	newWorker := make(chan *storagev1.VolumeAttachment, chanSize)
	v.workers = append(v.workers, newWorker)

	klog.V(4).Infof("Successfully added new worker %v. Current active workers %d - %v", newWorker, len(v.workers), v.workers)
	return newWorker
}

// DeleteWorker is the method used to delete an existing worker
func (v *VolumeAttachmentHandler) DeleteWorker(desiredWorker chan *storagev1.VolumeAttachment) {
klog.V(4).Infof("Deleting an existing worker %v. Current active workers %d", desiredWorker, len(v.workers))
klog.V(4).Infof("Deleting an existing worker %v. Current active workers %d - %v", desiredWorker, len(v.workers), v.workers)

v.Lock()
defer v.Unlock()
Expand All @@ -111,5 +118,5 @@ func (v *VolumeAttachmentHandler) DeleteWorker(desiredWorker chan *storagev1.Vol
}

v.workers = finalWorkers
klog.V(4).Infof("Successfully removed worker. Current active workers %d", len(v.workers))
klog.V(4).Infof("Successfully removed worker. Current active workers %d - %v", len(v.workers), v.workers)
}

0 comments on commit 16e66de

Please sign in to comment.