Skip to content

Commit

Permalink
Added Per-pod eviction backoff and retry logic (#515)
Browse files Browse the repository at this point in the history
* added goroutine eviction logic

* Added goroutine eviction queue

* addressed comments
  • Loading branch information
njtran committed Jul 22, 2021
1 parent 311cec3 commit c0fd259
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 60 deletions.
32 changes: 18 additions & 14 deletions pkg/controllers/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,28 @@ import (

// Controller for the resource
type Controller struct {
terminator *Terminator
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
Terminator *Terminator
KubeClient client.Client
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
terminator: &Terminator{kubeClient: kubeClient, cloudProvider: cloudProvider, coreV1Client: coreV1Client},
cloudProvider: cloudProvider,
kubeClient: kubeClient,
KubeClient: kubeClient,
Terminator: &Terminator{
KubeClient: kubeClient,
CoreV1Client: coreV1Client,
CloudProvider: cloudProvider,
EvictionQueue: NewEvictionQueue(coreV1Client),
},
}
}

// Reconcile executes a termination control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
// 1. Retrieve node from reconcile request
node := &v1.Node{}
if err := c.kubeClient.Get(ctx, req.NamespacedName, node); err != nil {
if err := c.KubeClient.Get(ctx, req.NamespacedName, node); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
Expand All @@ -67,21 +70,22 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, nil
}
// 3. Cordon node
if err := c.terminator.cordon(ctx, node); err != nil {
if err := c.Terminator.cordon(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("cordoning node %s, %w", node.Name, err)
}
// 4. Drain node
drained, err := c.terminator.drain(ctx, node)
drained, err := c.Terminator.drain(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("draining node %s, %w", node.Name, err)
}
if !drained {
return reconcile.Result{Requeue: true}, nil
}
// 5. If fully drained, terminate the node
if drained {
if err := c.terminator.terminate(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating node %s, %w", node.Name, err)
}
if err := c.Terminator.terminate(ctx, node); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating node %s, %w", node.Name, err)
}
return reconcile.Result{Requeue: !drained}, nil
return reconcile.Result{}, nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
Expand Down
112 changes: 112 additions & 0 deletions pkg/controllers/termination/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package termination

import (
"context"
"sync"
"time"

set "github.com/deckarep/golang-set"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Bounds passed to the per-item exponential failure rate limiter that backs
// the eviction queue (see NewEvictionQueue).
const (
// evictionQueueBaseDelay is the base delay handed to the rate limiter.
evictionQueueBaseDelay = 100 * time.Millisecond
// evictionQueueMaxDelay is the maximum delay handed to the rate limiter.
evictionQueueMaxDelay = 10 * time.Second
)

// EvictionQueue tracks pods that are waiting to be evicted and retries failed
// evictions through a rate-limited work queue.
type EvictionQueue struct {
// Work queue of pod keys; failed evictions are re-added with AddRateLimited.
workqueue.RateLimitingInterface
// Keys of pods currently tracked by the queue; consulted by Add to skip
// pods that are already enqueued and cleared once an eviction succeeds.
set.Set

// Client used to issue the Evict calls.
coreV1Client corev1.CoreV1Interface
// Guards the one-time start of the background run loop in Add.
once sync.Once
}

// NewEvictionQueue builds an EvictionQueue that evicts pods through the given
// core/v1 client. The queue is backed by a per-item exponential failure rate
// limiter bounded by evictionQueueBaseDelay and evictionQueueMaxDelay, and
// starts with an empty tracking set.
func NewEvictionQueue(coreV1Client corev1.CoreV1Interface) *EvictionQueue {
	limiter := workqueue.NewItemExponentialFailureRateLimiter(evictionQueueBaseDelay, evictionQueueMaxDelay)

	queue := &EvictionQueue{coreV1Client: coreV1Client}
	queue.RateLimitingInterface = workqueue.NewRateLimitingQueue(limiter)
	queue.Set = set.NewSet()
	return queue
}

// Add enqueues the given pods for eviction, skipping any pod whose key is
// already tracked by the queue.
//
// The first call lazily starts the background goroutine (run) that drains the
// queue; sync.Once ensures it is started at most once.
func (e *EvictionQueue) Add(pods []*v1.Pod) {
	// Start processing eviction queue if it hasn't started already
	e.once.Do(func() { go e.run() })

	for _, pod := range pods {
		// Set.Add reports whether the key was newly inserted, so the
		// membership test and the insert are a single call rather than a
		// separate Contains/Add pair that concurrent callers could interleave.
		if nn := client.ObjectKeyFromObject(pod); e.Set.Add(nn) {
			e.RateLimitingInterface.Add(nn)
		}
	}
}

// run is the queue's processing loop. It blocks on the work queue, attempts to
// evict each pod it receives, and either clears the pod from the queue on
// success or re-adds it with rate limiting on failure. The loop only exits
// when the underlying work queue reports shutdown.
func (e *EvictionQueue) run() {
	for {
		// Blocks until an item is available or the queue is shut down.
		item, shutdown := e.RateLimitingInterface.Get()
		if shutdown {
			break
		}
		key := item.(types.NamespacedName)

		if !e.evict(key) {
			// Eviction failed: release the item, then schedule a retry.
			e.RateLimitingInterface.Done(key)
			e.RateLimitingInterface.AddRateLimited(key)
			continue
		}

		// Eviction succeeded: reset the retry history and stop tracking the pod.
		zap.S().Debugf("Evicted pod %s", key.String())
		e.RateLimitingInterface.Forget(key)
		e.Set.Remove(key)
		e.RateLimitingInterface.Done(key)
	}
	zap.S().Errorf("EvictionQueue is broken and has shutdown.")
}

// evict issues an Eviction for the pod identified by nn and reports whether
// the pod is finished with the queue.
//
// It returns true when the eviction call succeeded or the pod was not found
// (404). It returns false for every other error so that the caller can requeue
// the pod. No error value is returned; failures are only logged.
func (e *EvictionQueue) evict(nn types.NamespacedName) bool {
	err := e.coreV1Client.Pods(nn.Namespace).Evict(context.Background(), &v1beta1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: nn.Name, Namespace: nn.Namespace},
	})
	switch {
	case errors.IsInternalError(err): // 500
		zap.S().Debugf("Failed to evict pod %s due to PDB misconfiguration error.", nn.String())
		return false
	case errors.IsTooManyRequests(err): // 429
		zap.S().Debugf("Failed to evict pod %s due to PDB violation.", nn.String())
		return false
	case errors.IsNotFound(err): // 404
		// The pod is already gone, so there is nothing left to evict.
		return true
	case err != nil:
		// Any other failure is retried as well, but log it so that a
		// persistent, unexpected error does not go unnoticed.
		zap.S().Errorf("Failed to evict pod %s, %s", nn.String(), err.Error())
		return false
	}
	return true
}
126 changes: 104 additions & 22 deletions pkg/controllers/termination/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package termination_test

import (
"context"
"fmt"
"testing"

"github.com/Pallinder/go-randomdata"
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/cloudprovider/fake"
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
Expand All @@ -30,6 +32,7 @@ import (
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/ptr"
)

func TestAPIs(t *testing.T) {
Expand All @@ -38,14 +41,22 @@ func TestAPIs(t *testing.T) {
}

var controller *termination.Controller
var evictionQueue *termination.EvictionQueue

var env = test.NewEnvironment(func(e *test.Environment) {
cloudProvider := &fake.CloudProvider{}
registry.RegisterOrDie(cloudProvider)
controller = termination.NewController(
e.Client,
corev1.NewForConfigOrDie(e.Config),
cloudProvider,
)
coreV1Client := corev1.NewForConfigOrDie(e.Config)
evictionQueue = termination.NewEvictionQueue(coreV1Client)
controller = &termination.Controller{
KubeClient: e.Client,
Terminator: &termination.Terminator{
KubeClient: e.Client,
CoreV1Client: coreV1Client,
CloudProvider: cloudProvider,
EvictionQueue: evictionQueue,
},
}
})

var _ = BeforeSuite(func() {
Expand Down Expand Up @@ -90,13 +101,16 @@ var _ = Describe("Termination", func() {
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))

// Expect podToEvict to be evicting, and delete it
podEvict = ExpectPodExists(env.Client, podEvict.Name, podEvict.Namespace)
Expect(podEvict.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeFalse())
// Expect podEvict to be enqueued for eviction
ExpectEvicting(evictionQueue, podEvict)

// Expect node to exist, but be cordoned
node = ExpectNodeExists(env.Client, node.Name)
Expect(node.Spec.Unschedulable).To(BeTrue())

// Expect podEvict to be evicting, and delete it
ExpectEvictingSucceeded(env.Client, podEvict)
ExpectDeleted(env.Client, podEvict)
// Expect podToSkip to not be evicting
podSkip = ExpectPodExists(env.Client, podSkip.Name, podSkip.Namespace)
Expect(podSkip.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeTrue())

// Reconcile to delete node
node = ExpectNodeExists(env.Client, node.Name)
Expand All @@ -116,32 +130,100 @@ var _ = Describe("Termination", func() {
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))

// Expect no pod to be enqueued for eviction
ExpectNotEvicting(evictionQueue, podEvict, podNoEvict)

// Expect node to exist, but be cordoned
node = ExpectNodeExists(env.Client, node.Name)
Expect(node.Spec.Unschedulable).To(Equal(true))

// Expect pods to not be evicting
podEvict = ExpectPodExists(env.Client, podEvict.Name, podEvict.Namespace)
Expect(podEvict.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeTrue())
podNoEvict = ExpectPodExists(env.Client, podNoEvict.Name, podNoEvict.Namespace)
Expect(podNoEvict.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeTrue())
Expect(node.Spec.Unschedulable).To(BeTrue())

// Delete do-not-evict pod
ExpectDeleted(env.Client, podNoEvict)

// Reconcile node to evict pod
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))
pod := ExpectPodExists(env.Client, podEvict.Name, podEvict.Namespace)
Expect(pod.GetObjectMeta().GetDeletionTimestamp().IsZero()).To(BeFalse())

// Expect podEvict to be enqueued for eviction then be successful
ExpectEvicting(evictionQueue, podEvict)
ExpectEvictingSucceeded(env.Client, podEvict)

// Delete pod to simulate successful eviction
ExpectDeleted(env.Client, pod)
ExpectDeleted(env.Client, podEvict)

// Terminate Node
// Reconcile to delete node
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))
ExpectNotFound(env.Client, node)
})
It("should fail to evict pods that violate a PDB", func() {
key, value := randomdata.SillyName(), randomdata.SillyName()
pdb := test.PodDisruptionBudget(test.PDBOptions{
Labels: map[string]string{key: value},
// Don't let any pod evict
MinAvailableNum: ptr.Int64(1),
})
podNoEvict := test.Pod(test.PodOptions{
NodeName: node.Name,
Labels: map[string]string{key: value},
})

ExpectCreated(env.Client, node, podNoEvict, pdb)

// Trigger Termination Controller
Expect(env.Client.Delete(ctx, node)).To(Succeed())
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))

// Expect the pod to be enqueued for eviction
ExpectEvicting(evictionQueue, podNoEvict)

// Expect node to exist, but be cordoned
node = ExpectNodeExists(env.Client, node.Name)
Expect(node.Spec.Unschedulable).To(BeTrue())

// Expect podNoEvict to fail eviction due to PDB
ExpectEvictingFailed(env.Client, evictionQueue, podNoEvict)

// Delete pod to simulate successful eviction
ExpectDeleted(env.Client, podNoEvict)

// Reconcile to delete node
node = ExpectNodeExists(env.Client, node.Name)
ExpectReconcileSucceeded(controller, client.ObjectKeyFromObject(node))
ExpectNotFound(env.Client, node)
})
})
})

// ExpectEvicting asserts that every given pod is currently tracked by the
// eviction queue.
func ExpectEvicting(e *termination.EvictionQueue, pods ...*v1.Pod) {
	for i := range pods {
		key := client.ObjectKeyFromObject(pods[i])
		Expect(e.Contains(key)).To(BeTrue())
	}
}

// ExpectNotEvicting asserts that none of the given pods is tracked by the
// eviction queue.
func ExpectNotEvicting(e *termination.EvictionQueue, pods ...*v1.Pod) {
	for i := range pods {
		key := client.ObjectKeyFromObject(pods[i])
		Expect(e.Contains(key)).To(BeFalse())
	}
}

// ExpectEvictingSucceeded polls each given pod until its deletion timestamp is
// set, failing if that does not happen within ReconcilerPropagationTime.
func ExpectEvictingSucceeded(c client.Client, pods ...*v1.Pod) {
	for _, pod := range pods {
		// True while the pod has not yet been marked for deletion.
		notEvicting := func() bool {
			current := ExpectPodExists(c, pod.Name, pod.Namespace)
			return current.GetDeletionTimestamp().IsZero()
		}
		describe := func() string {
			return fmt.Sprintf("expected %s/%s to be evicting, but it isn't", pod.Namespace, pod.Name)
		}
		Eventually(notEvicting, ReconcilerPropagationTime, RequestInterval).Should(BeFalse(), describe)
	}
}

// ExpectEvictingFailed polls each given pod until it both has no deletion
// timestamp and has been requeued at least once by the eviction queue, failing
// if that does not happen within ReconcilerPropagationTime.
func ExpectEvictingFailed(c client.Client, e *termination.EvictionQueue, pods ...*v1.Pod) {
	for _, pod := range pods {
		requeuedWithoutDeletion := func() bool {
			// A pod that is already marked for deletion did not fail eviction.
			if !ExpectPodExists(c, pod.Name, pod.Namespace).GetDeletionTimestamp().IsZero() {
				return false
			}
			return e.NumRequeues(client.ObjectKeyFromObject(pod)) > 0
		}
		describe := func() string {
			return fmt.Sprintf("expected %s/%s to not be evicting, but it is", pod.Namespace, pod.Name)
		}
		Eventually(requeuedWithoutDeletion, ReconcilerPropagationTime, RequestInterval).Should(BeTrue(), describe)
	}
}

0 comments on commit c0fd259

Please sign in to comment.