Readjusted reallocation controller for nodes that fail to come up (#534)
* Reallocation deletes nodes that failed to become ready and does not TTL NotReady nodes

* added robust failedToJoin logic and tests

* rebasing

* addressed comments
njtran committed Jul 29, 2021
1 parent 9e718d3 commit 0abf03e
Showing 11 changed files with 194 additions and 58 deletions.
2 changes: 1 addition & 1 deletion cmd/controller/main.go
@@ -83,7 +83,7 @@ func main() {
if err := manager.RegisterControllers(ctx,
expiration.NewController(manager.GetClient()),
allocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
reallocation.NewController(manager.GetClient(), clientSet.CoreV1(), cloudProvider),
reallocation.NewController(manager.GetClient(), cloudProvider),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), cloudProvider),
).Start(ctx); err != nil {
panic(fmt.Sprintf("Unable to start manager, %s", err.Error()))
3 changes: 2 additions & 1 deletion go.mod
@@ -3,6 +3,7 @@ module github.com/awslabs/karpenter
go 1.16

require (
bou.ke/monkey v1.0.2
github.com/Pallinder/go-randomdata v1.2.0
github.com/avast/retry-go v2.7.0+incompatible
github.com/aws/aws-sdk-go v1.38.69
@@ -14,7 +15,7 @@ require (
github.com/onsi/gomega v1.13.0
github.com/patrickmn/go-cache v2.1.0+incompatible
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.18.1
go.uber.org/zap v1.18.1 // indirect
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
k8s.io/api v0.20.7
k8s.io/apimachinery v0.20.7
2 changes: 2 additions & 0 deletions go.sum
@@ -1,3 +1,5 @@
bou.ke/monkey v1.0.2 h1:kWcnsrCNUatbxncxR/ThdYqbytgOIArtYWqcQLQzKLI=
bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
36 changes: 21 additions & 15 deletions pkg/controllers/reallocation/controller.go
@@ -25,7 +25,6 @@ import (
"knative.dev/pkg/logging"

"k8s.io/apimachinery/pkg/api/errors"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/util/workqueue"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -36,17 +35,17 @@ import (

// Controller for the resource
type Controller struct {
utilization *Utilization
cloudProvider cloudprovider.CloudProvider
kubeClient client.Client
Utilization *Utilization
CloudProvider cloudprovider.CloudProvider
KubeClient client.Client
}

// NewController constructs a controller instance
func NewController(kubeClient client.Client, coreV1Client corev1.CoreV1Interface, cloudProvider cloudprovider.CloudProvider) *Controller {
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
return &Controller{
utilization: &Utilization{kubeClient: kubeClient},
cloudProvider: cloudProvider,
kubeClient: kubeClient,
Utilization: &Utilization{KubeClient: kubeClient},
CloudProvider: cloudProvider,
KubeClient: kubeClient,
}
}

@@ -56,31 +55,38 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco

// 1. Retrieve provisioner from reconcile request
provisioner := &v1alpha3.Provisioner{}
if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil {
if err := c.KubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

// 2. Delete any node that has been unable to join.
if err := c.Utilization.terminateFailedToJoin(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("terminating nodes that failed to join, %w", err)
}

// Skip reconciliation if utilization ttl is not defined.
if provisioner.Spec.TTLSecondsAfterEmpty == nil {
return reconcile.Result{}, nil
}
// 2. Set TTL on TTLable Nodes
if err := c.utilization.markUnderutilized(ctx, provisioner); err != nil {

// 3. Set TTL on TTLable Nodes
if err := c.Utilization.markUnderutilized(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("adding ttl and underutilized label, %w", err)
}

// 3. Remove TTL from Utilized Nodes
if err := c.utilization.clearUnderutilized(ctx, provisioner); err != nil {
// 4. Remove TTL from Utilized Nodes
if err := c.Utilization.clearUnderutilized(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("removing ttl from node, %w", err)
}

// 4. Delete any node past its TTL
if err := c.utilization.terminateExpired(ctx, provisioner); err != nil {
// 5. Delete any node past its TTL
if err := c.Utilization.terminateExpired(ctx, provisioner); err != nil {
return reconcile.Result{}, fmt.Errorf("marking nodes terminable, %w", err)
}

return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
}

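
With this change, the failed-to-join cleanup (step 2) runs on every reconcile, before the early return for provisioners that leave TTLSecondsAfterEmpty unset; steps 3 through 5 still run only when that field is set. Below is a minimal sketch of a Provisioner that opts into the empty-node path; the ProvisionerSpec type name and the concrete values are illustrative assumptions, not taken from this commit.

package example

import (
	"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"knative.dev/pkg/ptr"
)

// exampleProvisioner enables the empty-node TTL path. If TTLSecondsAfterEmpty
// were left nil, the controller above would still delete nodes that failed to
// join, but would skip the underutilization mark/clear/terminate steps.
func exampleProvisioner() *v1alpha3.Provisioner {
	return &v1alpha3.Provisioner{
		ObjectMeta: metav1.ObjectMeta{Name: "default"},
		Spec: v1alpha3.ProvisionerSpec{
			TTLSecondsAfterEmpty: ptr.Int64(30), // delete empty nodes roughly 30s after they drain
		},
	}
}
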
107 changes: 97 additions & 10 deletions pkg/controllers/reallocation/suite_test.go
@@ -26,21 +26,22 @@ import (
"github.com/awslabs/karpenter/pkg/cloudprovider/registry"
"github.com/awslabs/karpenter/pkg/controllers/reallocation"
"github.com/awslabs/karpenter/pkg/test"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/ptr"

"bou.ke/monkey"
. "github.com/awslabs/karpenter/pkg/test/expectations"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var ctx context.Context
var controller *reallocation.Controller
var env *test.Environment

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
@@ -51,11 +52,11 @@ var _ = BeforeSuite(func() {
env = test.NewEnvironment(ctx, func(e *test.Environment) {
cloudProvider := &fake.CloudProvider{}
registry.RegisterOrDie(cloudProvider)
controller = reallocation.NewController(
e.Client,
corev1.NewForConfigOrDie(e.Config),
cloudProvider,
)
controller = &reallocation.Controller{
Utilization: &reallocation.Utilization{KubeClient: e.Client},
CloudProvider: cloudProvider,
KubeClient: e.Client,
}
})
Expect(env.Start()).To(Succeed(), "Failed to start environment")
})
@@ -82,6 +83,34 @@ var _ = Describe("Reallocation", func() {
})

Context("Reconciliation", func() {
It("should not TTL nodes that have ready status unknown", func() {
node := test.Node(test.NodeOptions{
ReadyStatus: v1.ConditionUnknown,
})

ExpectCreated(env.Client, provisioner)
ExpectCreatedWithStatus(env.Client, node)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha3.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha3.ProvisionerTTLAfterEmptyKey))
})
It("should not TTL nodes that have ready status false", func() {
node := test.Node(test.NodeOptions{
ReadyStatus: v1.ConditionFalse,
})

ExpectCreated(env.Client, provisioner)
ExpectCreatedWithStatus(env.Client, node)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha3.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha3.ProvisionerTTLAfterEmptyKey))
})
It("should label nodes as underutilized and add TTL", func() {
node := test.Node(test.NodeOptions{
Labels: map[string]string{
@@ -104,7 +133,7 @@ var _ = Describe("Reallocation", func() {
v1alpha3.ProvisionerUnderutilizedLabelKey: "true",
},
Annotations: map[string]string{
v1alpha3.ProvisionerTTLAfterEmptyKey: time.Now().Add(time.Duration(100) * time.Second).Format(time.RFC3339),
v1alpha3.ProvisionerTTLAfterEmptyKey: time.Now().Add(100 * time.Second).Format(time.RFC3339),
},
})
ExpectCreated(env.Client, provisioner)
@@ -122,5 +151,63 @@ var _ = Describe("Reallocation", func() {
Expect(updatedNode.Labels).ToNot(HaveKey(v1alpha3.ProvisionerUnderutilizedLabelKey))
Expect(updatedNode.Annotations).ToNot(HaveKey(v1alpha3.ProvisionerTTLAfterEmptyKey))
})
It("should terminate underutilized nodes past their TTL", func() {
node := test.Node(test.NodeOptions{
Finalizers: []string{v1alpha3.KarpenterFinalizer},
Labels: map[string]string{
v1alpha3.ProvisionerNameLabelKey: provisioner.Name,
v1alpha3.ProvisionerUnderutilizedLabelKey: "true",
},
Annotations: map[string]string{
v1alpha3.ProvisionerTTLAfterEmptyKey: time.Now().Add(-100 * time.Second).Format(time.RFC3339),
},
})
ExpectCreated(env.Client, provisioner, node)
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

updatedNode := &v1.Node{}
Expect(env.Client.Get(ctx, client.ObjectKey{Name: node.Name}, updatedNode)).To(Succeed())
Expect(updatedNode.DeletionTimestamp.IsZero()).To(BeFalse())
})
It("should only terminate nodes that failed to join with all pods terminating after 5 minutes", func() {
node := test.Node(test.NodeOptions{
Finalizers: []string{v1alpha3.KarpenterFinalizer},
Labels: map[string]string{
v1alpha3.ProvisionerNameLabelKey: provisioner.Name,
v1alpha3.ProvisionerUnderutilizedLabelKey: "true",
},
ReadyStatus: v1.ConditionUnknown,
})
pod := test.Pod(test.PodOptions{
Finalizers: []string{"fake.sh/finalizer"},
NodeName: node.Name,
})
ExpectCreated(env.Client, provisioner, pod)
ExpectCreatedWithStatus(env.Client, node)

ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

// Expect node not deleted
updatedNode := ExpectNodeExists(env.Client, node.Name)
Expect(updatedNode.DeletionTimestamp.IsZero()).To(BeTrue())

// Set pod DeletionTimestamp and do another reconcile
Expect(env.Client.Delete(ctx, pod)).To(Succeed())
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

// Expect node not deleted
updatedNode = ExpectNodeExists(env.Client, node.Name)
Expect(updatedNode.DeletionTimestamp.IsZero()).To(BeTrue())

// Simulate time passing and a node failing to join
future := time.Now().Add(reallocation.FailedToJoinTimeout)
monkey.Patch(time.Now, func() time.Time {
return future
})
ExpectReconcileSucceeded(ctx, controller, client.ObjectKeyFromObject(provisioner))

updatedNode = ExpectNodeExists(env.Client, node.Name)
Expect(updatedNode.DeletionTimestamp.IsZero()).To(BeFalse())
})
})
})
48 changes: 34 additions & 14 deletions pkg/controllers/reallocation/utilization.go
@@ -22,14 +22,18 @@ import (
"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha3"
"github.com/awslabs/karpenter/pkg/utils/functional"
utilsnode "github.com/awslabs/karpenter/pkg/utils/node"
"github.com/awslabs/karpenter/pkg/utils/pod"
"github.com/awslabs/karpenter/pkg/utils/ptr"

v1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const FailedToJoinTimeout = 5 * time.Minute

type Utilization struct {
kubeClient client.Client
KubeClient client.Client
}

// markUnderutilized adds a TTL to underutilized nodes
@@ -40,20 +44,21 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph
if err != nil {
return err
}

// 2. Get underutilized nodes
for _, node := range nodes {
if !utilsnode.IsReady(node) {
continue
}
pods, err := u.getPods(ctx, node)
if err != nil {
return fmt.Errorf("getting pods for node %s, %w", node.Name, err)
}
if utilsnode.IsEmpty(node, pods) {
if pod.IgnoredForUnderutilization(pods) {
if _, ok := node.Annotations[v1alpha3.ProvisionerTTLAfterEmptyKey]; !ok {
ttlable = append(ttlable, node)
}
}
}

// 3. Set TTL for each underutilized node
for _, node := range ttlable {
persisted := node.DeepCopy()
@@ -65,7 +70,7 @@ func (u *Utilization) markUnderutilized(ctx context.Context, provisioner *v1alph
node.Annotations,
map[string]string{v1alpha3.ProvisionerTTLAfterEmptyKey: time.Now().Add(time.Duration(ptr.Int64Value(provisioner.Spec.TTLSecondsAfterEmpty)) * time.Second).Format(time.RFC3339)},
)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
if err := u.KubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
return fmt.Errorf("patching node %s, %w", node.Name, err)
}
logging.FromContext(ctx).Infof("Added TTL and label to underutilized node %s", node.Name)
@@ -80,19 +85,17 @@ func (u *Utilization) clearUnderutilized(ctx context.Context, provisioner *v1alp
if err != nil {
return fmt.Errorf("listing labeled underutilized nodes, %w", err)
}

// 2. Clear underutilized label if node is utilized
for _, node := range nodes {
pods, err := u.getPods(ctx, node)
if err != nil {
return fmt.Errorf("listing pods on node %s, %w", node.Name, err)
}

if !utilsnode.IsEmpty(node, pods) {
if !pod.IgnoredForUnderutilization(pods) {
persisted := node.DeepCopy()
delete(node.Labels, v1alpha3.ProvisionerUnderutilizedLabelKey)
delete(node.Annotations, v1alpha3.ProvisionerTTLAfterEmptyKey)
if err := u.kubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
if err := u.KubeClient.Patch(ctx, node, client.MergeFrom(persisted)); err != nil {
return fmt.Errorf("removing underutilized label on %s, %w", node.Name, err)
} else {
logging.FromContext(ctx).Infof("Removed TTL from node %s", node.Name)
@@ -109,13 +112,30 @@ func (u *Utilization) terminateExpired(ctx context.Context, provisioner *v1alpha
if err != nil {
return fmt.Errorf("listing underutilized nodes, %w", err)
}

// 2. Trigger termination workflow if past TTLAfterEmpty
for _, node := range nodes {
if utilsnode.IsPastEmptyTTL(node) {
logging.FromContext(ctx).Infof("Triggering termination for empty node %s", node.Name)
if err := u.kubeClient.Delete(ctx, node); err != nil {
return fmt.Errorf("sending delete for node %s, %w", node.Name, err)
if err := u.KubeClient.Delete(ctx, node); err != nil {
return fmt.Errorf("deleting node %s, %w", node.Name, err)
}
}
}
return nil
}

func (u *Utilization) terminateFailedToJoin(ctx context.Context, provisioner *v1alpha3.Provisioner) error {
// 1. Get nodes
nodes, err := u.getNodes(ctx, provisioner, map[string]string{})
if err != nil {
return fmt.Errorf("listing nodes, %w", err)
}
// 2. Trigger termination workflow if node has failed to become ready for 5 minutes
for _, node := range nodes {
if utilsnode.FailedToJoin(node, FailedToJoinTimeout) {
logging.FromContext(ctx).Infof("Triggering termination for node that failed to join %s", node.Name)
if err := u.KubeClient.Delete(ctx, node); err != nil {
return fmt.Errorf("deleting node %s, %w", node.Name, err)
}
}
}
@@ -125,7 +145,7 @@ func (u *Utilization) terminateExpired(ctx context.Context, provisioner *v1alpha
// getNodes returns a list of nodes with the provisioner's labels and given labels
func (u *Utilization) getNodes(ctx context.Context, provisioner *v1alpha3.Provisioner, additionalLabels map[string]string) ([]*v1.Node, error) {
nodes := &v1.NodeList{}
if err := u.kubeClient.List(ctx, nodes, client.MatchingLabels(functional.UnionStringMaps(map[string]string{
if err := u.KubeClient.List(ctx, nodes, client.MatchingLabels(functional.UnionStringMaps(map[string]string{
v1alpha3.ProvisionerNameLabelKey: provisioner.Name,
}, additionalLabels))); err != nil {
return nil, fmt.Errorf("listing nodes, %w", err)
@@ -136,7 +156,7 @@ func (u *Utilization) getPods(ctx context.Context, node *v1.Node) ([]*v1.Pod, er
// getPods returns a list of pods scheduled to a node
func (u *Utilization) getPods(ctx context.Context, node *v1.Node) ([]*v1.Pod, error) {
pods := &v1.PodList{}
if err := u.kubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
if err := u.KubeClient.List(ctx, pods, client.MatchingFields{"spec.nodeName": node.Name}); err != nil {
return nil, fmt.Errorf("listing pods on node %s, %w", node.Name, err)
}
return ptr.PodListToSlice(pods), nil
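
terminateFailedToJoin relies on utilsnode.FailedToJoin(node, FailedToJoinTimeout), which lives in pkg/utils/node and is not part of this excerpt. Below is a hypothetical sketch of what such a check could look like, inferred only from how it is called here; the real helper may also consider the pods still assigned to the node, as the "all pods terminating" test above suggests, and the package name is assumed from the import path.

package node

import (
	"time"

	v1 "k8s.io/api/core/v1"
)

// FailedToJoinSketch is an illustrative stand-in for utilsnode.FailedToJoin,
// not the actual implementation: a node is treated as "failed to join" when it
// has not reported Ready and has existed for longer than the given timeout.
func FailedToJoinSketch(node *v1.Node, timeout time.Duration) bool {
	for _, condition := range node.Status.Conditions {
		if condition.Type == v1.NodeReady && condition.Status == v1.ConditionTrue {
			return false // the node is Ready, so it joined
		}
	}
	// Not Ready, or no Ready condition reported yet: fall back to node age.
	return time.Since(node.GetCreationTimestamp().Time) > timeout
}

Given the controller's 5-second requeue and the 5-minute FailedToJoinTimeout, a node that never becomes ready is handed to the termination controller within a few seconds of crossing that threshold.
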
1 change: 1 addition & 0 deletions pkg/controllers/termination/suite_test.go
@@ -40,6 +40,7 @@ var ctx context.Context
var controller *termination.Controller
var evictionQueue *termination.EvictionQueue
var env *test.Environment

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
RegisterFailHandler(Fail)
