-
Notifications
You must be signed in to change notification settings - Fork 832
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: instance tagging controller (#4611)
Co-authored-by: Jonathan Innis <jonathan.innis.ji@gmail.com>
- Loading branch information
1 parent
7d74908
commit 5ab60b2
Showing
7 changed files
with
556 additions
and
91 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
/* | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package tagging | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/api/equality" | ||
"knative.dev/pkg/logging" | ||
controllerruntime "sigs.k8s.io/controller-runtime" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/manager" | ||
"sigs.k8s.io/controller-runtime/pkg/predicate" | ||
"sigs.k8s.io/controller-runtime/pkg/reconcile" | ||
|
||
"github.com/samber/lo" | ||
|
||
"github.com/aws/karpenter/pkg/apis/v1beta1" | ||
"github.com/aws/karpenter/pkg/providers/instance" | ||
"github.com/aws/karpenter/pkg/utils" | ||
|
||
corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1" | ||
"github.com/aws/karpenter-core/pkg/cloudprovider" | ||
corecontroller "github.com/aws/karpenter-core/pkg/operator/controller" | ||
) | ||
|
||
const ( | ||
// TagNodeClaim is the instance tag key under which tagInstance records the NodeClaim's name.
TagNodeClaim = corev1beta1.Group + "/nodeclaim" | ||
// TagName is the instance tag key under which tagInstance records the NodeClaim's node name.
TagName = "Name" | ||
) | ||
|
||
// Controller tags the instance behind a NodeClaim and then annotates the
// NodeClaim as tagged (see Reconcile).
type Controller struct { | ||
// kubeClient is used to patch the NodeClaim after tagging.
kubeClient client.Client | ||
// instanceProvider is used to read the instance's current tags and to create missing ones.
instanceProvider *instance.Provider | ||
} | ||
|
||
func NewController(kubeClient client.Client, instanceProvider *instance.Provider) corecontroller.Controller { | ||
return corecontroller.Typed[*corev1beta1.NodeClaim](kubeClient, &Controller{ | ||
kubeClient: kubeClient, | ||
instanceProvider: instanceProvider, | ||
}) | ||
} | ||
|
||
func (c *Controller) Name() string { | ||
return "nodeclaim.tagging" | ||
} | ||
|
||
func (c *Controller) Reconcile(ctx context.Context, nodeClaim *corev1beta1.NodeClaim) (reconcile.Result, error) { | ||
stored := nodeClaim.DeepCopy() | ||
|
||
if !isTaggable(nodeClaim) { | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("provider-id", nodeClaim.Status.ProviderID)) | ||
id, err := utils.ParseInstanceID(nodeClaim.Status.ProviderID) | ||
if err != nil { | ||
// We don't throw an error here since we don't want to retry until the ProviderID has been updated. | ||
logging.FromContext(ctx).Errorf("failed to parse instance ID, %w", err) | ||
return reconcile.Result{}, nil | ||
} | ||
|
||
if err := c.tagInstance(ctx, nodeClaim, id); err != nil { | ||
return reconcile.Result{}, cloudprovider.IgnoreNodeClaimNotFoundError(err) | ||
} | ||
|
||
nodeClaim.Annotations = lo.Assign(nodeClaim.Annotations, map[string]string{v1beta1.AnnotationInstanceTagged: "true"}) | ||
if !equality.Semantic.DeepEqual(nodeClaim, stored) { | ||
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil { | ||
return reconcile.Result{}, client.IgnoreNotFound(err) | ||
} | ||
} | ||
|
||
return reconcile.Result{}, nil | ||
} | ||
|
||
func (c *Controller) Builder(_ context.Context, m manager.Manager) corecontroller.Builder { | ||
return corecontroller.Adapt( | ||
controllerruntime. | ||
NewControllerManagedBy(m). | ||
For(&corev1beta1.NodeClaim{}). | ||
WithEventFilter(predicate.NewPredicateFuncs(func(o client.Object) bool { | ||
return isTaggable(o.(*corev1beta1.NodeClaim)) | ||
})), | ||
) | ||
} | ||
|
||
func (c *Controller) tagInstance(ctx context.Context, nc *corev1beta1.NodeClaim, id string) error { | ||
tags := map[string]string{ | ||
TagName: nc.Status.NodeName, | ||
TagNodeClaim: nc.Name, | ||
} | ||
|
||
// Remove tags which have been already populated | ||
instance, err := c.instanceProvider.Get(ctx, id) | ||
if err != nil { | ||
return fmt.Errorf("tagging nodeclaim, %w", err) | ||
} | ||
tags = lo.OmitByKeys(tags, lo.Keys(instance.Tags)) | ||
if len(tags) == 0 { | ||
return nil | ||
} | ||
|
||
// Ensures that no more than 1 CreateTags call is made per second. Rate limiting is required since CreateTags | ||
// shares a pool with other mutating calls (e.g. CreateFleet). | ||
defer time.Sleep(time.Second) | ||
if err := c.instanceProvider.CreateTags(ctx, id, tags); err != nil { | ||
return fmt.Errorf("tagging nodeclaim, %w", err) | ||
} | ||
return nil | ||
} | ||
|
||
func isTaggable(nc *corev1beta1.NodeClaim) bool { | ||
// Instance has already been tagged | ||
if val := nc.Annotations[v1beta1.AnnotationInstanceTagged]; val == "true" { | ||
return false | ||
} | ||
// Node name is not yet known | ||
if nc.Status.NodeName == "" { | ||
return false | ||
} | ||
// NodeClaim is currently terminating | ||
if !nc.DeletionTimestamp.IsZero() { | ||
return false | ||
} | ||
return true | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
/* | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package tagging_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"testing" | ||
|
||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
"github.com/samber/lo" | ||
v1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
||
. "github.com/aws/karpenter-core/pkg/test/expectations" | ||
|
||
"github.com/aws/aws-sdk-go/aws" | ||
"github.com/aws/aws-sdk-go/service/ec2" | ||
. "knative.dev/pkg/logging/testing" | ||
|
||
coresettings "github.com/aws/karpenter-core/pkg/apis/settings" | ||
corev1beta1 "github.com/aws/karpenter-core/pkg/apis/v1beta1" | ||
coretest "github.com/aws/karpenter-core/pkg/test" | ||
"github.com/aws/karpenter/pkg/apis" | ||
"github.com/aws/karpenter/pkg/apis/settings" | ||
"github.com/aws/karpenter/pkg/apis/v1beta1" | ||
"github.com/aws/karpenter/pkg/controllers/nodeclaim/tagging" | ||
"github.com/aws/karpenter/pkg/fake" | ||
"github.com/aws/karpenter/pkg/providers/instance" | ||
"github.com/aws/karpenter/pkg/test" | ||
|
||
"github.com/aws/karpenter-core/pkg/operator/controller" | ||
"github.com/aws/karpenter-core/pkg/operator/scheme" | ||
) | ||
|
||
// Suite-wide state: ctx is set in TestAPIs and extended in BeforeSuite; the
// environments and the controller under test are created in BeforeSuite.
var ctx context.Context | ||
var awsEnv *test.Environment | ||
var env *coretest.Environment | ||
var taggingController controller.Controller | ||
|
||
func TestAPIs(t *testing.T) { | ||
ctx = TestContextWithLogger(t) | ||
RegisterFailHandler(Fail) | ||
RunSpecs(t, "TaggingController") | ||
} | ||
|
||
// Suite setup: create the core test environment with the project's CRDs, load
// core and provider settings into ctx, build the AWS test environment, and
// construct the controller under test from the environment's client and
// instance provider.
var _ = BeforeSuite(func() { | ||
env = coretest.NewEnvironment(scheme.Scheme, coretest.WithCRDs(apis.CRDs...)) | ||
ctx = coresettings.ToContext(ctx, coretest.Settings()) | ||
ctx = settings.ToContext(ctx, test.Settings()) | ||
awsEnv = test.NewEnvironment(ctx, env) | ||
taggingController = tagging.NewController(env.Client, awsEnv.InstanceProvider) | ||
}) | ||
// Suite teardown: stopping the test environment must succeed.
var _ = AfterSuite(func() { | ||
Expect(env.Stop()).To(Succeed(), "Failed to stop environment") | ||
}) | ||
|
||
// Reset the AWS test environment before every spec.
var _ = BeforeEach(func() { | ||
awsEnv.Reset() | ||
}) | ||
|
||
// Run the shared cleanup expectation against the test client after every spec.
var _ = AfterEach(func() { | ||
ExpectCleanedUp(ctx, env.Client) | ||
}) | ||
|
||
var _ = Describe("TaggingController", func() { | ||
var ec2Instance *ec2.Instance | ||
|
||
// Build a running m5.large instance carrying the cluster-ownership, nodepool and
// managed-by tags (but neither the Name nor the NodeClaim tag) and store it in the
// fake EC2 API under its instance ID.
BeforeEach(func() { | ||
ec2Instance = &ec2.Instance{ | ||
State: &ec2.InstanceState{ | ||
Name: aws.String(ec2.InstanceStateNameRunning), | ||
}, | ||
Tags: []*ec2.Tag{ | ||
{ | ||
Key: aws.String(fmt.Sprintf("kubernetes.io/cluster/%s", settings.FromContext(ctx).ClusterName)), | ||
Value: aws.String("owned"), | ||
}, | ||
{ | ||
Key: aws.String(corev1beta1.NodePoolLabelKey), | ||
Value: aws.String("default"), | ||
}, | ||
{ | ||
Key: aws.String(corev1beta1.ManagedByAnnotationKey), | ||
Value: aws.String(settings.FromContext(ctx).ClusterName), | ||
}, | ||
}, | ||
PrivateDnsName: aws.String(fake.PrivateDNSName()), | ||
Placement: &ec2.Placement{ | ||
// NOTE(review): AvailabilityZone is filled from fake.DefaultRegion — confirm a region value is intended here.
AvailabilityZone: aws.String(fake.DefaultRegion), | ||
}, | ||
InstanceId: aws.String(fake.InstanceID()), | ||
InstanceType: aws.String("m5.large"), | ||
} | ||
|
||
awsEnv.EC2API.Instances.Store(*ec2Instance.InstanceId, ec2Instance) | ||
}) | ||
|
||
// A NodeClaim with no node name must be left untouched: no tagged annotation on
// the NodeClaim and no Name tag on the instance.
It("shouldn't tag instances without a Node", func() {
	nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{
		Status: corev1beta1.NodeClaimStatus{
			ProviderID: fake.ProviderID(*ec2Instance.InstanceId),
		},
	})

	ExpectApplied(ctx, env.Client, nodeClaim)
	ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim))
	// Re-read the NodeClaim from the API server: the reconcile is driven by object key, so the
	// local object is never modified and asserting on it alone could not fail.
	nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
	Expect(nodeClaim.Annotations).To(Not(HaveKey(v1beta1.AnnotationInstanceTagged)))
	Expect(lo.ContainsBy(ec2Instance.Tags, func(tag *ec2.Tag) bool {
		return *tag.Key == tagging.TagName
	})).To(BeFalse())
})
|
||
// A NodeClaim whose provider ID cannot be parsed must reconcile without error and
// without tagging anything.
It("shouldn't tag nodeclaim with a malformed providerID", func() {
	nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{
		Status: corev1beta1.NodeClaimStatus{
			ProviderID: "Bad providerID",
			NodeName:   "default",
		},
	})

	ExpectApplied(ctx, env.Client, nodeClaim)
	ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim))
	// Re-read the NodeClaim from the API server: the reconcile is driven by object key, so the
	// local object is never modified and asserting on it alone could not fail.
	nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
	Expect(nodeClaim.Annotations).To(Not(HaveKey(v1beta1.AnnotationInstanceTagged)))
	Expect(lo.ContainsBy(ec2Instance.Tags, func(tag *ec2.Tag) bool {
		return *tag.Key == tagging.TagName
	})).To(BeFalse())
})
|
||
// The NodeClaim is created and then deleted before reconciling, so the reconcile
// runs against a key whose object no longer exists and must still succeed.
It("should gracefully handle missing NodeClaim", func() { | ||
nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{ | ||
Status: corev1beta1.NodeClaimStatus{ | ||
ProviderID: fake.ProviderID(*ec2Instance.InstanceId), | ||
NodeName: "default", | ||
}, | ||
}) | ||
|
||
ExpectApplied(ctx, env.Client, nodeClaim) | ||
ExpectDeleted(ctx, env.Client, nodeClaim) | ||
ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim)) | ||
}) | ||
|
||
// When the instance has been removed from the fake EC2 API, the reconcile must
// still succeed and the NodeClaim must not be marked as tagged.
It("should gracefully handle missing instance", func() {
	nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{
		Status: corev1beta1.NodeClaimStatus{
			ProviderID: fake.ProviderID(*ec2Instance.InstanceId),
			NodeName:   "default",
		},
	})

	ExpectApplied(ctx, env.Client, nodeClaim)
	awsEnv.EC2API.Instances.Delete(*ec2Instance.InstanceId)
	ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim))
	// Re-read the NodeClaim from the API server: the reconcile is driven by object key, so the
	// local object is never modified and asserting on it alone could not fail.
	nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
	Expect(nodeClaim.Annotations).To(Not(HaveKey(v1beta1.AnnotationInstanceTagged)))
})
|
||
// A terminating NodeClaim must not be tagged. The finalizer keeps the object
// around after Delete so that it is reconciled with a deletion timestamp set.
It("shouldn't tag nodeclaim with deletion timestamp set", func() {
	nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{
		Status: corev1beta1.NodeClaimStatus{
			ProviderID: fake.ProviderID(*ec2Instance.InstanceId),
			NodeName:   "default",
		},
		ObjectMeta: v1.ObjectMeta{
			Finalizers: []string{"testing/finalizer"},
		},
	})

	ExpectApplied(ctx, env.Client, nodeClaim)
	Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed())
	ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim))
	// Re-read the NodeClaim from the API server: the reconcile is driven by object key, so the
	// local object is never modified and asserting on it alone could not fail.
	nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
	Expect(nodeClaim.Annotations).To(Not(HaveKey(v1beta1.AnnotationInstanceTagged)))
	Expect(lo.ContainsBy(ec2Instance.Tags, func(tag *ec2.Tag) bool {
		return *tag.Key == tagging.TagName
	})).To(BeFalse())
})
|
||
// Table of positive cases. Each entry passes the tag keys that are pre-populated on the
// instance (with value "custom-tag") before reconciling; afterwards the NodeClaim must be
// annotated as tagged and the instance must carry both tags.
// NOTE(review): the entry descriptions name "karpenter.k8s.aws/nodeclaim", while the key under
// test is tagging.TagNodeClaim (corev1beta1.Group + "/nodeclaim") — confirm they refer to the same key.
DescribeTable( | ||
"should tag taggable instances", | ||
func(customTags ...string) { | ||
nodeClaim := coretest.NodeClaim(corev1beta1.NodeClaim{ | ||
Status: corev1beta1.NodeClaimStatus{ | ||
ProviderID: fake.ProviderID(*ec2Instance.InstanceId), | ||
NodeName: "default", | ||
}, | ||
}) | ||
|
||
// Pre-populate the requested tag keys on the instance and store it again in the fake EC2 API.
for _, tag := range customTags { | ||
ec2Instance.Tags = append(ec2Instance.Tags, &ec2.Tag{ | ||
Key: aws.String(tag), | ||
Value: aws.String("custom-tag"), | ||
}) | ||
} | ||
awsEnv.EC2API.Instances.Store(*ec2Instance.InstanceId, ec2Instance) | ||
|
||
ExpectApplied(ctx, env.Client, nodeClaim) | ||
ExpectReconcileSucceeded(ctx, taggingController, client.ObjectKeyFromObject(nodeClaim)) | ||
// Re-read the NodeClaim so the annotation check sees what the controller patched.
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) | ||
Expect(nodeClaim.Annotations).To(HaveKey(v1beta1.AnnotationInstanceTagged)) | ||
|
||
expectedTags := map[string]string{ | ||
tagging.TagName: nodeClaim.Status.NodeName, | ||
tagging.TagNodeClaim: nodeClaim.Name, | ||
} | ||
instanceTags := instance.NewInstance(ec2Instance).Tags | ||
for tag, value := range expectedTags { | ||
// A pre-populated key is expected to keep its "custom-tag" value rather than be overwritten.
if lo.Contains(customTags, tag) { | ||
value = "custom-tag" | ||
} | ||
Expect(instanceTags).To(HaveKeyWithValue(tag, value)) | ||
} | ||
}, | ||
Entry("with only karpenter.k8s.aws/nodeclaim tag", tagging.TagName), | ||
Entry("with only Name tag", tagging.TagNodeClaim), | ||
Entry("with both Name and karpenter.k8s.aws/nodeclaim tags"), | ||
Entry("with nothing to tag", tagging.TagName, tagging.TagNodeClaim), | ||
) | ||
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.