diff --git a/pkg/util/provider/drain/drain.go b/pkg/util/provider/drain/drain.go index d33b0c265..4face60b4 100644 --- a/pkg/util/provider/drain/drain.go +++ b/pkg/util/provider/drain/drain.go @@ -626,6 +626,14 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod, return } +// checkAndDeleteWorker is a helper method that checks if volumeAttachmentHandler +// is supported and deletes the worker from the list of event handlers +func (o *Options) checkAndDeleteWorker(volumeAttachmentEventCh chan *storagev1.VolumeAttachment) { + if o.volumeAttachmentHandler != nil { + o.volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) + } +} + func (o *Options) evictPodsWithPVInternal( attemptEvict bool, pods []*corev1.Pod, @@ -678,19 +686,23 @@ func (o *Options) evictPodsWithPVInternal( pdbErr := fmt.Errorf("error while evicting pod %q: pod disruption budget %s/%s is misconfigured and requires zero voluntary evictions", pod.Name, pdb.Namespace, pdb.Name) returnCh <- pdbErr + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } } retryPods = append(retryPods, pod) + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } else if apierrors.IsNotFound(err) { klog.V(3).Info("\t", pod.Name, " from node ", pod.Spec.NodeName, " is already gone") returnCh <- nil + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } else if err != nil { klog.V(4).Infof("Error when evicting pod: %v/%v from node %v. Will be retried. Err: %v", pod.Namespace, pod.Name, pod.Spec.NodeName, err) retryPods = append(retryPods, pod) + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } @@ -711,10 +723,12 @@ func (o *Options) evictPodsWithPVInternal( if apierrors.IsNotFound(err) { klog.V(3).Info("Node not found anymore") returnCh <- nil + o.checkAndDeleteWorker(volumeAttachmentEventCh) return append(retryPods, pods[i+1:]...), true } else if err != nil { klog.Errorf("Error when waiting for volume to detach from node. 
Err: %v", err) returnCh <- err + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } klog.V(4).Infof( @@ -735,14 +749,12 @@ func (o *Options) evictPodsWithPVInternal( } else { klog.Errorf("Error when waiting for volume reattachment. Err: %v", err) returnCh <- err + o.checkAndDeleteWorker(volumeAttachmentEventCh) continue } } - if o.volumeAttachmentHandler != nil { - o.volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) - } - + o.checkAndDeleteWorker(volumeAttachmentEventCh) klog.V(3).Infof( "Pod + volume detachment from node %s + volume reattachment to another node for Pod %s/%s took %v", o.nodeName, diff --git a/pkg/util/provider/drain/volume_attachment.go b/pkg/util/provider/drain/volume_attachment.go index 5d01e43ad..43dd7dc6e 100644 --- a/pkg/util/provider/drain/volume_attachment.go +++ b/pkg/util/provider/drain/volume_attachment.go @@ -52,13 +52,19 @@ func (v *VolumeAttachmentHandler) dispatch(obj interface{}) { } klog.V(4).Infof("Dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName) + defer klog.V(4).Infof("Done dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName) v.Lock() defer v.Unlock() for i, worker := range v.workers { - klog.V(4).Infof("Dispatching request for PV %s to worker %d", *volumeAttachment.Spec.Source.PersistentVolumeName, i) - worker <- volumeAttachment + klog.V(4).Infof("Dispatching request for PV %s to worker %d/%v", *volumeAttachment.Spec.Source.PersistentVolumeName, i, worker) + + select { + case worker <- volumeAttachment: + default: + klog.Warningf("Worker %d/%v is full. Discarding value.", i, worker) + } } } @@ -78,9 +84,10 @@ func (v *VolumeAttachmentHandler) UpdateVolumeAttachment(oldObj, newObj interfac func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment { // chanSize is the channel buffer size to hold requests. // This assumes that not more than 20 unprocessed objects would exist at a given time. 
+ // On buffering requests beyond this, the channel will start dropping writes const chanSize = 20 - klog.V(4).Infof("Adding new worker. Current active workers %d", len(v.workers)) + klog.V(4).Infof("Adding new worker. Current active workers %d - %v", len(v.workers), v.workers) v.Lock() defer v.Unlock() @@ -88,13 +95,13 @@ func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment { newWorker := make(chan *storagev1.VolumeAttachment, chanSize) v.workers = append(v.workers, newWorker) - klog.V(4).Infof("Successfully added new worker %v. Current active workers %d", newWorker, len(v.workers)) + klog.V(4).Infof("Successfully added new worker %v. Current active workers %d - %v", newWorker, len(v.workers), v.workers) return newWorker } // DeleteWorker is the method used to delete an existing worker func (v *VolumeAttachmentHandler) DeleteWorker(desiredWorker chan *storagev1.VolumeAttachment) { - klog.V(4).Infof("Deleting an existing worker %v. Current active workers %d", desiredWorker, len(v.workers)) + klog.V(4).Infof("Deleting an existing worker %v. Current active workers %d - %v", desiredWorker, len(v.workers), v.workers) v.Lock() defer v.Unlock() @@ -111,5 +118,5 @@ func (v *VolumeAttachmentHandler) DeleteWorker(desiredWorker chan *storagev1.Vol } v.workers = finalWorkers - klog.V(4).Infof("Successfully removed worker. Current active workers %d", len(v.workers)) + klog.V(4).Infof("Successfully removed worker. Current active workers %d - %v", len(v.workers), v.workers) }