Skip to content

Commit

Permalink
Handle reowned nodes linking
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-innis committed Apr 22, 2023
1 parent 84fa21c commit 72aae0d
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (c *CloudProvider) Link(ctx context.Context, machine *v1alpha5.Machine) err
return fmt.Errorf("getting instance ID, %w", err)
}
ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("id", id))
return c.instanceProvider.Link(ctx, id)
return c.instanceProvider.Link(ctx, id, machine.Labels[v1alpha5.ProvisionerNameLabelKey])
}

func (c *CloudProvider) List(ctx context.Context) ([]*v1alpha5.Machine, error) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/controllers/machine/link/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -67,10 +68,23 @@ func (c *Controller) Reconcile(ctx context.Context, _ reconcile.Request) (reconc
if err := c.kubeClient.List(ctx, machineList); err != nil {
return reconcile.Result{}, err
}
nodeList := &v1.NodeList{}
if err := c.kubeClient.List(ctx, nodeList, client.HasLabels{v1alpha5.ProvisionerNameLabelKey}); err != nil {
return reconcile.Result{}, err
}
retrieved, err := c.cloudProvider.List(ctx)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing cloudprovider machines, %w", err)
}
retrievedIDs := sets.NewString(lo.Map(retrieved, func(m *v1alpha5.Machine, _ int) string { return m.Status.ProviderID })...)
// Inject any nodes that are re-owned using karpenter.sh/provisioner-name but aren't found from the cloudprovider.List() call
for i := range nodeList.Items {
if _, ok := lo.Find(retrieved, func(r *v1alpha5.Machine) bool {
return retrievedIDs.Has(nodeList.Items[i].Spec.ProviderID)
}); !ok {
retrieved = append(retrieved, machineutil.NewFromNode(&nodeList.Items[i]))
}
}
// Filter out any machines that shouldn't be linked
retrieved = lo.Filter(retrieved, func(m *v1alpha5.Machine, _ int) bool {
_, ok := m.Labels[v1alpha5.ManagedByLabelKey]
Expand Down
42 changes: 42 additions & 0 deletions pkg/controllers/machine/link/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
. "github.com/onsi/gomega"
"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
. "knative.dev/pkg/logging/testing"
Expand Down Expand Up @@ -329,6 +330,47 @@ var _ = Describe("MachineLink", func() {
instance := ExpectInstanceExists(awsEnv.EC2API, instanceID)
ExpectManagedByTagExists(instance)
})
It("should link an instance that was re-owned with a provisioner-name label", func() {
awsEnv.EC2API.Reset() // Reset so we don't store the extra instance

// Don't include the provisioner-name tag
awsEnv.EC2API.EC2Behavior.Instances.Store(
instanceID,
&ec2.Instance{
State: &ec2.InstanceState{
Name: aws.String(ec2.InstanceStateNameRunning),
},
Tags: []*ec2.Tag{
{
Key: aws.String(fmt.Sprintf("kubernetes.io/cluster/%s", settings.FromContext(ctx).ClusterName)),
Value: aws.String("owned"),
},
},
PrivateDnsName: aws.String(fake.PrivateDNSName()),
Placement: &ec2.Placement{
AvailabilityZone: aws.String("test-zone-1a"),
},
InstanceId: aws.String(instanceID),
InstanceType: aws.String("m5.large"),
},
)
node := coretest.Node(coretest.NodeOptions{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
v1alpha5.ProvisionerNameLabelKey: provisioner.Name,
},
},
ProviderID: providerID,
})
ExpectApplied(ctx, env.Client, node, provisioner, nodeTemplate)
ExpectReconcileSucceeded(ctx, linkController, client.ObjectKey{})

machineList := &v1alpha5.MachineList{}
Expect(env.Client.List(ctx, machineList)).To(Succeed())
Expect(machineList.Items).To(HaveLen(1))
machine := machineList.Items[0]
Expect(machine.Annotations).To(HaveKeyWithValue(v1alpha5.MachineLinkedAnnotationKey, providerID))
})
It("should not link an instance without a provisioner tag", func() {
instance := ExpectInstanceExists(awsEnv.EC2API, instanceID)
instance.Tags = lo.Reject(instance.Tags, func(t *ec2.Tag, _ int) bool {
Expand Down
10 changes: 9 additions & 1 deletion pkg/providers/instance/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,22 @@ func (p *Provider) Create(ctx context.Context, nodeTemplate *v1alpha1.AWSNodeTem
return NewInstanceFromFleet(fleetInstance, tags), nil
}

func (p *Provider) Link(ctx context.Context, id string) error {
func (p *Provider) Link(ctx context.Context, id, provisionerName string) error {
_, err := p.ec2api.CreateTagsWithContext(ctx, &ec2.CreateTagsInput{
Resources: aws.StringSlice([]string{id}),
Tags: []*ec2.Tag{
{
Key: aws.String(v1alpha5.ManagedByLabelKey),
Value: aws.String(settings.FromContext(ctx).ClusterName),
},
{
Key: aws.String(fmt.Sprintf("kubernetes.io/cluster/%s", settings.FromContext(ctx).ClusterName)),
Value: aws.String("owned"),
},
{
Key: aws.String(v1alpha5.ProvisionerNameLabelKey),
Value: aws.String(provisionerName),
},
},
})
if err != nil {
Expand Down
71 changes: 71 additions & 0 deletions test/suites/machine/link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/samber/lo"
"sigs.k8s.io/controller-runtime/pkg/client"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -204,4 +205,74 @@ var _ = Describe("MachineLink", func() {
g.Expect(aws.StringValue(tag.Value)).To(Equal(settings.FromContext(env.Context).ClusterName))
}, time.Minute, time.Second).Should(Succeed())
})
It("should succeed to link a Machine for an existing instance re-owned by Karpenter", func() {
provider := awstest.AWSNodeTemplate(v1alpha1.AWSNodeTemplateSpec{AWS: v1alpha1.AWS{
AMIFamily: &v1alpha1.AMIFamilyAL2,
SecurityGroupSelector: map[string]string{"karpenter.sh/discovery": settings.FromContext(env.Context).ClusterName},
SubnetSelector: map[string]string{"karpenter.sh/discovery": settings.FromContext(env.Context).ClusterName},
}})
provisioner := test.Provisioner(test.ProvisionerOptions{
ProviderRef: &v1alpha5.MachineTemplateRef{Name: provider.Name},
})
env.ExpectCreated(provisioner, provider)

// Update the userData for the instance input with the correct provisionerName
rawContent, err := os.ReadFile("testdata/al2_userdata_input.sh")
Expect(err).ToNot(HaveOccurred())

// No tag specifications since we're mocking an instance not launched by Karpenter
instanceInput.TagSpecifications = nil
instanceInput.UserData = lo.ToPtr(base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(string(rawContent), settings.FromContext(env.Context).ClusterName,
settings.FromContext(env.Context).ClusterEndpoint, env.ExpectCABundle(), provisioner.Name))))

// Create an instance manually to mock Karpenter launching an instance
out, err := env.EC2API.RunInstances(instanceInput)
Expect(err).ToNot(HaveOccurred())
Expect(out.Instances).To(HaveLen(1))

// Always ensure that we cleanup the instance
DeferCleanup(func() {
_, err := env.EC2API.TerminateInstances(&ec2.TerminateInstancesInput{
InstanceIds: []*string{out.Instances[0].InstanceId},
})
if awserrors.IsNotFound(err) {
return
}
Expect(err).ToNot(HaveOccurred())
})

// Wait for the node to register with the cluster
node := env.EventuallyExpectCreatedNodeCount("==", 1)[0]

// Add the provisioner-name label to the node to re-own it
stored := node.DeepCopy()
node.Labels[v1alpha5.ProvisionerNameLabelKey] = provisioner.Name
Expect(env.Client.Patch(env.Context, node, client.MergeFrom(stored))).To(Succeed())

// Restart Karpenter to start the linking process
env.ExpectKarpenterPodsDeleted()

// Expect that the Machine is created when Karpenter starts up
machines := env.EventuallyExpectCreatedMachineCount("==", 1)
machine := machines[0]

// Expect the machine's fields are properly populated
Expect(machine.Spec.Requirements).To(Equal(provisioner.Spec.Requirements))
Expect(machine.Spec.MachineTemplateRef.Name).To(Equal(provider.Name))

// Expect the instance to have the karpenter.sh/managed-by tag and the karpenter.sh/provisioner-name tag
Eventually(func(g Gomega) {
instance := env.GetInstanceByID(aws.StringValue(out.Instances[0].InstanceId))
tag, ok := lo.Find(instance.Tags, func(t *ec2.Tag) bool {
return aws.StringValue(t.Key) == v1alpha5.ManagedByLabelKey
})
g.Expect(ok).To(BeTrue())
g.Expect(aws.StringValue(tag.Value)).To(Equal(settings.FromContext(env.Context).ClusterName))
tag, ok = lo.Find(instance.Tags, func(t *ec2.Tag) bool {
return aws.StringValue(t.Key) == v1alpha5.ProvisionerNameLabelKey
})
g.Expect(ok).To(BeTrue())
g.Expect(aws.StringValue(tag.Value)).To(Equal(provisioner.Name))
}, time.Minute, time.Second).Should(Succeed())
})
})

0 comments on commit 72aae0d

Please sign in to comment.