Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cross-cluster GC after delete #180

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions charts/multicluster-scheduler/templates/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ spec:
- name: controller-manager
args: ["--leader-elect"]
env:
- name: CLUSTER_NAME
value: {{ .Values.clusterName }}
# POD_NAME for leader election
- name: POD_NAME
valueFrom:
Expand Down Expand Up @@ -104,6 +106,9 @@ spec:
- name: proxy-scheduler
image: {{ .Values.scheduler.image.repository }}:{{ default .Chart.AppVersion .Values.scheduler.image.tag }}
args: ["--config", "/etc/admiralty/proxy-scheduler-config"]
env:
- name: CLUSTER_NAME
value: {{ .Values.clusterName }}
volumeMounts:
- name: config
mountPath: /etc/admiralty
Expand Down
2 changes: 2 additions & 0 deletions charts/multicluster-scheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ fullnameOverride: ""

#imagePullSecretName: ""

clusterName: ""

sourceController:
enabled: true

Expand Down
10 changes: 9 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import (
"context"
"flag"
"fmt"
"os"
"time"

"admiralty.io/multicluster-scheduler/pkg/controllers/cleanup"
Expand Down Expand Up @@ -125,6 +126,8 @@ func startOldStyleControllers(
var factories []startable
var controllers []runnable

clusterName := os.Getenv("CLUSTER_NAME")

for _, target := range agentCfg.Targets {
kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(k, time.Second*30, kubeinformers.WithNamespace(target.Namespace))
factories = append(factories, kubeInformerFactory)
Expand Down Expand Up @@ -156,6 +159,7 @@ func startOldStyleControllers(
controllers = append(
controllers,
follow.NewConfigMapController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -164,6 +168,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().ConfigMaps(),
),
service.NewController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -173,6 +178,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().Services(),
),
follow.NewSecretController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -181,6 +187,7 @@ func startOldStyleControllers(
targetKubeInformerFactory.Core().V1().Secrets(),
),
ingress.NewIngressController(
clusterName,
target,
k,
targetKubeClient,
Expand All @@ -193,6 +200,7 @@ func startOldStyleControllers(
controllers = append(
controllers,
feedback.NewController(
clusterName,
target,
k,
targetCustomClient,
Expand Down
10 changes: 6 additions & 4 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,9 +63,11 @@ var (

LabelKeyHasFinalizer = KeyPrefix + "has-finalizer"

LabelKeyParentUID = KeyPrefix + "parent-uid"
LabelKeyParentName = KeyPrefix + "parent-name"
LabelKeyParentNamespace = KeyPrefix + "parent-namespace"
LabelKeyParentClusterName = KeyPrefix + "parent-cluster-name"
// used to get pod chaperon (whose name is generated) given proxy pod ("list one" hack), without indexer
LabelKeyParentUID = KeyPrefix + "parent-uid"
AnnotationKeyParentName = KeyPrefix + "parent-name"
AnnotationKeyParentNamespace = KeyPrefix + "parent-namespace"

AnnotationKeyCiliumGlobalService = "io.cilium/global-service"

Expand Down
80 changes: 48 additions & 32 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -164,51 +164,67 @@ func (c *Controller) EnqueueController(ownerKind string, getOwner GetOwner) func
}
}

func (c *Controller) EnqueueRemoteController(ownerKind string, getOwner GetOwner) func(obj interface{}) {
func (c *Controller) EnqueueRemoteController(parentClusterName string) func(obj interface{}) {
return func(obj interface{}) {
object := obj.(metav1.Object)
a := object.GetAnnotations()
parentUID, ok := a[common.LabelKeyParentUID]
if !ok {
// for backward compatibility use labels instead,
// even though didn't work for parent names longer than 63 characters
a = object.GetLabels()
parentUID, ok = a[common.LabelKeyParentUID]
if IsRemoteControlled(object, parentClusterName) {
c.workqueue.Add(ParentKey(object))
return
}
if ok {
parentNamespace := a[common.LabelKeyParentNamespace]
if parentNamespace == "" {
parentNamespace = object.GetNamespace()
}
parentName := a[common.LabelKeyParentName]
if parentName == "" {
parentName = object.GetName()
}
owner, err := getOwner(parentNamespace, parentName)
if err != nil {
return
}
}
}

if string(owner.GetUID()) != parentUID {
// TODO handle unlikely yet possible cross-cluster UID conflict with signing
return
}
func IsRemoteControlled(object metav1.Object, parentClusterName string) bool {
v, ok := object.GetLabels()[common.LabelKeyParentClusterName]
// support empty parent cluster name
// check that label is present to filter out regular objects
return ok && v == parentClusterName
}

c.EnqueueObject(owner)
return
func ParentKey(child metav1.Object) string {
a := child.GetAnnotations()
parentNamespace := a[common.AnnotationKeyParentNamespace]
if parentNamespace == "" {
parentNamespace = child.GetNamespace()
}
parentName := a[common.AnnotationKeyParentName]
if parentName == "" {
parentName = child.GetName()
}
if parentNamespace != "" {
return parentNamespace + "/" + parentName
}
return parentName
}

func IndexByRemoteController(parentClusterName string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
meta, ok := obj.(metav1.Object)
if !ok {
return nil, nil
}
if !IsRemoteControlled(meta, parentClusterName) {
return nil, nil
}
return []string{ParentKey(meta)}, nil
}
}

func AddRemoteControllerReference(child metav1.Object, parent metav1.Object) {
func AddRemoteControllerReference(child metav1.Object, parent metav1.Object, parentClusterName string) {
l := child.GetLabels()
if l == nil {
l = map[string]string{}
child.SetLabels(l)
}
l[common.LabelKeyParentUID] = string(parent.GetUID())
l[common.LabelKeyParentClusterName] = parentClusterName
a := child.GetAnnotations()
if a == nil {
a = map[string]string{}
child.SetAnnotations(a)
}
a[common.LabelKeyParentUID] = string(parent.GetUID())
a[common.LabelKeyParentNamespace] = parent.GetNamespace()
a[common.LabelKeyParentName] = parent.GetName()
a[common.AnnotationKeyParentNamespace] = parent.GetNamespace()
a[common.AnnotationKeyParentName] = parent.GetName()
}

func ParentControlsChild(child metav1.Object, parent metav1.Object) bool {
Expand Down
67 changes: 37 additions & 30 deletions pkg/controllers/feedback/controller.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 The Multicluster-Scheduler Authors.
* Copyright 2023 The Multicluster-Scheduler Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,30 +50,24 @@ import (

// this file is modified from k8s.io/sample-controller

const controllerAgentName = "admiralty"

const (
// SuccessSynced is used as part of the Event 'reason' when a proxy pod is synced
SuccessSynced = "Synced"
// MessageResourceSynced is the message used for an Event fired when a proxy pod
// is synced successfully
MessageResourceSynced = "proxy pod synced successfully"
)
const podChaperonByPodNamespacedName = "podChaperonByPodNamespacedName"

type reconciler struct {
target agent.Target
clusterName string
target agent.Target

kubeclientset kubernetes.Interface
customclientset clientset.Interface

podsLister corelisters.PodLister
podChaperonsLister listers.PodChaperonLister

recorder record.EventRecorder
podChaperonIndex cache.Indexer
}

// NewController returns a new feedback controller
func NewController(
clusterName string,
target agent.Target,
kubeclientset kubernetes.Interface,
customclientset clientset.Interface,
Expand All @@ -85,21 +79,20 @@ func NewController(
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

r := &reconciler{
target: target,
clusterName: clusterName,
target: target,

kubeclientset: kubeclientset,
customclientset: customclientset,

podsLister: podInformer.Lister(),
podChaperonsLister: podChaperonInformer.Lister(),

recorder: recorder,
podChaperonIndex: podChaperonInformer.Informer().GetIndexer(),
}

getPod := func(namespace, name string) (metav1.Object, error) { return r.podsLister.Pods(namespace).Get(name) }
c := controller.New("feedback", r, podInformer.Informer().HasSynced, podChaperonInformer.Informer().HasSynced)

enqueueProxyPod := func(obj interface{}) {
Expand All @@ -110,7 +103,11 @@ func NewController(
}

podInformer.Informer().AddEventHandler(controller.HandleAddUpdateWith(enqueueProxyPod))
podChaperonInformer.Informer().AddEventHandler(controller.HandleAllWith(c.EnqueueRemoteController("Pod", getPod)))
podChaperonInformer.Informer().AddEventHandler(controller.HandleAllWith(c.EnqueueRemoteController(clusterName)))

utilruntime.Must(podChaperonInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
podChaperonByPodNamespacedName: controller.IndexByRemoteController(clusterName),
}))

return c
}
Expand All @@ -125,17 +122,26 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
proxyPod, err := c.podsLister.Pods(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("proxy pod '%s' in work queue no longer exists", key))
objs, err := c.podChaperonIndex.ByIndex(podChaperonByPodNamespacedName, key)
utilruntime.Must(err)
for _, obj := range objs {
candidate := obj.(*multiclusterv1alpha1.PodChaperon)
if err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Delete(ctx, candidate.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return nil, fmt.Errorf("cannot delete orphaned pod chaperon: %v", err)
}
}
return nil, nil
} else {
return nil, fmt.Errorf("cannot get proxy pod: %v", err)
}

return nil, err
}

proxyPodTerminating := proxyPod.DeletionTimestamp != nil

proxyPodHasFinalizer, j := controller.HasFinalizer(proxyPod.Finalizers, c.target.Finalizer)

// get pod chaperon by parent UID (when parent still exists) rather than using index
// for backward compatibility with existing pod chaperons
var candidate *multiclusterv1alpha1.PodChaperon
l, err := c.podChaperonsLister.PodChaperons(namespace).List(labels.SelectorFromSet(map[string]string{common.LabelKeyParentUID: string(proxyPod.UID)}))
if err != nil {
Expand All @@ -148,20 +154,16 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
candidate = l[0]
}

didSomething := false

virtualNodeName := proxypod.GetScheduledClusterName(proxyPod)
if proxyPodTerminating || virtualNodeName != "" && virtualNodeName != c.target.VirtualNodeName {
if candidate != nil {
if err := c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Delete(ctx, candidate.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return nil, err
}
didSomething = true
} else if proxyPodHasFinalizer {
if proxyPod, err = c.removeFinalizer(ctx, proxyPod, j); err != nil {
return nil, err
}
didSomething = true
}
}

Expand All @@ -183,7 +185,6 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
if proxyPod, err = c.kubeclientset.CoreV1().Pods(namespace).Update(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
return nil, err
}
didSomething = true
}

// we can't group annotation and status updates into an update,
Expand All @@ -198,17 +199,23 @@ func (c *reconciler) Handle(obj interface{}) (requeueAfter *time.Duration, err e
if proxyPod, err = c.kubeclientset.CoreV1().Pods(namespace).UpdateStatus(ctx, podCopy, metav1.UpdateOptions{}); err != nil {
return nil, err
}
didSomething = true
}
}

if didSomething {
c.recorder.Event(proxyPod, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
needRemoteUpdate := delegate.Labels[common.LabelKeyParentClusterName] != c.clusterName
if needRemoteUpdate {
delegateCopy := delegate.DeepCopy()
delegateCopy.Labels[common.LabelKeyParentClusterName] = c.clusterName
var err error
if delegate, err = c.customclientset.MulticlusterV1alpha1().PodChaperons(namespace).Update(ctx, delegateCopy, metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("cannot update candidate pod chaperon")
}
}
}

return nil, nil
}

func (c reconciler) removeFinalizer(ctx context.Context, pod *corev1.Pod, j int) (*corev1.Pod, error) {
func (c *reconciler) removeFinalizer(ctx context.Context, pod *corev1.Pod, j int) (*corev1.Pod, error) {
podCopy := pod.DeepCopy()
podCopy.Finalizers = append(podCopy.Finalizers[:j], podCopy.Finalizers[j+1:]...)
return c.kubeclientset.CoreV1().Pods(pod.Namespace).Update(ctx, podCopy, metav1.UpdateOptions{})
Expand Down
Loading