From a93e9b40743cc2ce3471c2d744b0736e234fd012 Mon Sep 17 00:00:00 2001 From: Nikhil Sharma Date: Sat, 11 Mar 2023 16:24:52 +0530 Subject: [PATCH] poolmgr: stop pod specialization when pod namespace and cm/secret namespace is mismatched (#2703) Signed-off-by: Nikhil Sharma --- pkg/executor/executortype/poolmgr/gp.go | 60 ++++++++++++++----- .../executortype/poolmgr/gp_deployment.go | 8 +-- .../poolmgr/readyPodController.go | 2 +- 3 files changed, 51 insertions(+), 19 deletions(-) diff --git a/pkg/executor/executortype/poolmgr/gp.go b/pkg/executor/executortype/poolmgr/gp.go index 563078194a..f77678cb88 100644 --- a/pkg/executor/executortype/poolmgr/gp.go +++ b/pkg/executor/executortype/poolmgr/gp.go @@ -32,6 +32,7 @@ import ( "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + k8s_err "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -61,7 +62,7 @@ type ( logger *zap.Logger env *fv1.Environment deployment *appsv1.Deployment // kubernetes deployment - namespace string // namespace to keep our resources + fnNamespace string // namespace to keep our resources podReadyTimeout time.Duration // timeout for generic pods to become ready fsCache *fscache.FunctionServiceCache // cache funcSvc's by function, address and podname useSvc bool // create k8s service for specialized pods @@ -90,7 +91,7 @@ func MakeGenericPool( kubernetesClient kubernetes.Interface, metricsClient metricsclient.Interface, env *fv1.Environment, - namespace string, + fnNamespace string, fsCache *fscache.FunctionServiceCache, fetcherConfig *fetcherConfig.Config, instanceID string, @@ -119,7 +120,7 @@ func MakeGenericPool( fissionClient: fissionClient, kubernetesClient: kubernetesClient, metricsClient: metricsClient, - namespace: namespace, + fnNamespace: fnNamespace, podReadyTimeout: podReadyTimeout, fsCache: fsCache, fetcherConfig: fetcherConfig, @@ -186,7 +187,7 @@ func (gp *GenericPool) updateCPUUtilizationSvc(ctx context.Context) { } serviceFunc := func(ctx context.Context) { - podMetricsList, err := gp.metricsClient.MetricsV1beta1().PodMetricses(gp.namespace).List(ctx, metav1.ListOptions{ + podMetricsList, err := gp.metricsClient.MetricsV1beta1().PodMetricses(gp.fnNamespace).List(ctx, metav1.ListOptions{ LabelSelector: "managed=false", }) if err != nil { @@ -338,12 +339,12 @@ func (gp *GenericPool) scheduleDeletePod(ctx context.Context, name string) { // cleaned up. (We need a better solutions for both those things; log // aggregation and storage will help.) gp.logger.Error("error in pod - scheduling cleanup", zap.String("pod", name)) - err := gp.kubernetesClient.CoreV1().Pods(gp.namespace).Delete(ctx, name, metav1.DeleteOptions{}) + err := gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).Delete(ctx, name, metav1.DeleteOptions{}) if err != nil { gp.logger.Error( "error deleting pod", zap.String("name", name), - zap.String("namespace", gp.namespace), + zap.String("namespace", gp.fnNamespace), zap.Error(err), ) } @@ -387,10 +388,41 @@ func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv if len(podIP) == 0 { return errors.Errorf("Pod %s in namespace %s has no IP", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace) } + for _, cm := range fn.Spec.ConfigMaps { + _, err := gp.kubernetesClient.CoreV1().ConfigMaps(gp.fnNamespace).Get(ctx, cm.Name, metav1.GetOptions{}) + if err != nil { + if k8s_err.IsNotFound(err) { + logger.Error("configmap namespace mismatch", zap.String("error", "configmap must be in same namespace as function namespace"), + zap.String("configmap_name", cm.Name), + zap.String("configmap_namespace", cm.Namespace), + zap.String("function_name", fn.ObjectMeta.Name), + zap.String("function_namespace", gp.fnNamespace)) + return fmt.Errorf(fmt.Sprintf("configmap %s must be in same namespace as function namespace", cm.Name)) + } else { + return err + } + } + } + for _, sec := range fn.Spec.Secrets { + _, err := gp.kubernetesClient.CoreV1().Secrets(gp.fnNamespace).Get(ctx, sec.Name, metav1.GetOptions{}) + if err != nil { + if k8s_err.IsNotFound(err) { + logger.Error("secret namespace mismatch", zap.String("error", "secret must be in same namespace as function namespace"), + zap.String("secret_name", sec.Name), + zap.String("secret_namespace", sec.Namespace), + zap.String("function_name", fn.ObjectMeta.Name), + zap.String("function_namespace", gp.fnNamespace)) + return fmt.Errorf(fmt.Sprintf("secret %s must be in same namespace as function namespace", sec.Name)) + } else { + return err + } + + } + } // specialize pod with service if gp.useIstio { svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace) - podIP = fmt.Sprintf("%v.%v", svc, gp.namespace) + podIP = fmt.Sprintf("%v.%v", svc, gp.fnNamespace) } // tell fetcher to get the function. @@ -432,7 +464,7 @@ func (gp *GenericPool) createSvc(ctx context.Context, name string, labels map[st Selector: labels, }, } - svc, err := gp.kubernetesClient.CoreV1().Services(gp.namespace).Create(ctx, &service, metav1.CreateOptions{}) + svc, err := gp.kubernetesClient.CoreV1().Services(gp.fnNamespace).Create(ctx, &service, metav1.CreateOptions{}) return svc, err } @@ -470,7 +502,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac "functionName": fn.ObjectMeta.Name, "functionUid": string(fn.ObjectMeta.UID), } - podList, err := gp.kubernetesClient.CoreV1().Pods(gp.namespace).List(ctx, metav1.ListOptions{ + podList, err := gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).List(ctx, metav1.ListOptions{ LabelSelector: labels.Set(sel).AsSelector().String(), }) if err != nil { @@ -480,7 +512,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac // Remove old versions function pods for _, pod := range podList.Items { // Delete pod no matter what status it is - gp.kubernetesClient.CoreV1().Pods(gp.namespace).Delete(ctx, pod.ObjectMeta.Name, metav1.DeleteOptions{}) //nolint errcheck + gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).Delete(ctx, pod.ObjectMeta.Name, metav1.DeleteOptions{}) //nolint errcheck } } @@ -515,10 +547,10 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac // the fission router isn't in the same namespace, so return a // namespace-qualified hostname - svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.namespace) + svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.fnNamespace) } else if gp.useIstio { svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace) - svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.namespace) + svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.fnNamespace) } else { svcHost = fmt.Sprintf("%v:8888", pod.Status.PodIP) } @@ -605,12 +637,12 @@ func (gp *GenericPool) destroy(ctx context.Context) error { } err := gp.kubernetesClient.AppsV1(). - Deployments(gp.namespace).Delete(ctx, gp.deployment.ObjectMeta.Name, delOpt) + Deployments(gp.fnNamespace).Delete(ctx, gp.deployment.ObjectMeta.Name, delOpt) if err != nil { gp.logger.Error("error destroying deployment", zap.Error(err), zap.String("deployment_name", gp.deployment.ObjectMeta.Name), - zap.String("deployment_namespace", gp.namespace)) + zap.String("deployment_namespace", gp.fnNamespace)) return err } return nil diff --git a/pkg/executor/executortype/poolmgr/gp_deployment.go b/pkg/executor/executortype/poolmgr/gp_deployment.go index 99d1b1d289..c44c440c4f 100644 --- a/pkg/executor/executortype/poolmgr/gp_deployment.go +++ b/pkg/executor/executortype/poolmgr/gp_deployment.go @@ -205,13 +205,13 @@ func (gp *GenericPool) createPoolDeployment(ctx context.Context, env *fv1.Enviro ObjectMeta: deploymentMeta, Spec: *deploymentSpec, } - depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Get(ctx, deployment.Name, metav1.GetOptions{}) + depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Get(ctx, deployment.Name, metav1.GetOptions{}) if err == nil { if depl.Annotations[fv1.EXECUTOR_INSTANCEID_LABEL] != gp.instanceID { deployment.Annotations[fv1.EXECUTOR_INSTANCEID_LABEL] = gp.instanceID // Update with the latest deployment spec. Kubernetes will trigger // rolling update if spec is different from the one in the cluster. - depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Update(ctx, deployment, metav1.UpdateOptions{}) + depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Update(ctx, deployment, metav1.UpdateOptions{}) } gp.deployment = depl return err @@ -220,7 +220,7 @@ func (gp *GenericPool) createPoolDeployment(ctx context.Context, env *fv1.Enviro return err } - depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Create(ctx, deployment, metav1.CreateOptions{}) + depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Create(ctx, deployment, metav1.CreateOptions{}) if err != nil { gp.logger.Error("error creating deployment in kubernetes", zap.Error(err), zap.String("deployment", deployment.Name)) return err @@ -256,7 +256,7 @@ func (gp *GenericPool) updatePoolDeployment(ctx context.Context, env *fv1.Enviro } newDeployment.Spec.Replicas = &poolsize - depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Update(ctx, newDeployment, metav1.UpdateOptions{}) + depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Update(ctx, newDeployment, metav1.UpdateOptions{}) if err != nil { logger.Error("error updating deployment in kubernetes", zap.Error(err), zap.String("deployment", depl.Name)) return err diff --git a/pkg/executor/executortype/poolmgr/readyPodController.go b/pkg/executor/executortype/poolmgr/readyPodController.go index f1946d02e5..a5d489dec7 100644 --- a/pkg/executor/executortype/poolmgr/readyPodController.go +++ b/pkg/executor/executortype/poolmgr/readyPodController.go @@ -31,7 +31,7 @@ func (gp *GenericPool) readyPodEventHandlers() k8sCache.ResourceEventHandlerFunc func (gp *GenericPool) setupReadyPodController() error { gp.readyPodQueue = workqueue.NewDelayingQueue() - informerFactory, err := utils.GetInformerFactoryByReadyPod(gp.kubernetesClient, gp.namespace, gp.deployment.Spec.Selector) + informerFactory, err := utils.GetInformerFactoryByReadyPod(gp.kubernetesClient, gp.fnNamespace, gp.deployment.Spec.Selector) if err != nil { return err }