Skip to content

Commit

Permalink
poolmgr: stop pod specialization when pod namespace and cm/secret nam…
Browse files Browse the repository at this point in the history
…espace is mismatched (#2703)

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>
  • Loading branch information
NikhilSharmaWe committed Mar 11, 2023
1 parent 0de8923 commit a93e9b4
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 19 deletions.
60 changes: 46 additions & 14 deletions pkg/executor/executortype/poolmgr/gp.go
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
k8s_err "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -61,7 +62,7 @@ type (
logger *zap.Logger
env *fv1.Environment
deployment *appsv1.Deployment // kubernetes deployment
namespace string // namespace to keep our resources
fnNamespace string // namespace to keep our resources
podReadyTimeout time.Duration // timeout for generic pods to become ready
fsCache *fscache.FunctionServiceCache // cache funcSvc's by function, address and podname
useSvc bool // create k8s service for specialized pods
Expand Down Expand Up @@ -90,7 +91,7 @@ func MakeGenericPool(
kubernetesClient kubernetes.Interface,
metricsClient metricsclient.Interface,
env *fv1.Environment,
namespace string,
fnNamespace string,
fsCache *fscache.FunctionServiceCache,
fetcherConfig *fetcherConfig.Config,
instanceID string,
Expand Down Expand Up @@ -119,7 +120,7 @@ func MakeGenericPool(
fissionClient: fissionClient,
kubernetesClient: kubernetesClient,
metricsClient: metricsClient,
namespace: namespace,
fnNamespace: fnNamespace,
podReadyTimeout: podReadyTimeout,
fsCache: fsCache,
fetcherConfig: fetcherConfig,
Expand Down Expand Up @@ -186,7 +187,7 @@ func (gp *GenericPool) updateCPUUtilizationSvc(ctx context.Context) {
}

serviceFunc := func(ctx context.Context) {
podMetricsList, err := gp.metricsClient.MetricsV1beta1().PodMetricses(gp.namespace).List(ctx, metav1.ListOptions{
podMetricsList, err := gp.metricsClient.MetricsV1beta1().PodMetricses(gp.fnNamespace).List(ctx, metav1.ListOptions{
LabelSelector: "managed=false",
})
if err != nil {
Expand Down Expand Up @@ -338,12 +339,12 @@ func (gp *GenericPool) scheduleDeletePod(ctx context.Context, name string) {
// cleaned up. (We need a better solutions for both those things; log
// aggregation and storage will help.)
gp.logger.Error("error in pod - scheduling cleanup", zap.String("pod", name))
err := gp.kubernetesClient.CoreV1().Pods(gp.namespace).Delete(ctx, name, metav1.DeleteOptions{})
err := gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).Delete(ctx, name, metav1.DeleteOptions{})
if err != nil {
gp.logger.Error(
"error deleting pod",
zap.String("name", name),
zap.String("namespace", gp.namespace),
zap.String("namespace", gp.fnNamespace),
zap.Error(err),
)
}
Expand Down Expand Up @@ -387,10 +388,41 @@ func (gp *GenericPool) specializePod(ctx context.Context, pod *apiv1.Pod, fn *fv
if len(podIP) == 0 {
return errors.Errorf("Pod %s in namespace %s has no IP", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace)
}
for _, cm := range fn.Spec.ConfigMaps {
_, err := gp.kubernetesClient.CoreV1().ConfigMaps(gp.fnNamespace).Get(ctx, cm.Name, metav1.GetOptions{})
if err != nil {
if k8s_err.IsNotFound(err) {
logger.Error("configmap namespace mismatch", zap.String("error", "configmap must be in same namespace as function namespace"),
zap.String("configmap_name", cm.Name),
zap.String("configmap_namespace", cm.Namespace),
zap.String("function_name", fn.ObjectMeta.Name),
zap.String("function_namespace", gp.fnNamespace))
return fmt.Errorf(fmt.Sprintf("configmap %s must be in same namespace as function namespace", cm.Name))
} else {
return err
}
}
}
for _, sec := range fn.Spec.Secrets {
_, err := gp.kubernetesClient.CoreV1().Secrets(gp.fnNamespace).Get(ctx, sec.Name, metav1.GetOptions{})
if err != nil {
if k8s_err.IsNotFound(err) {
logger.Error("secret namespace mismatch", zap.String("error", "secret must be in same namespace as function namespace"),
zap.String("secret_name", sec.Name),
zap.String("secret_namespace", sec.Namespace),
zap.String("function_name", fn.ObjectMeta.Name),
zap.String("function_namespace", gp.fnNamespace))
return fmt.Errorf(fmt.Sprintf("secret %s must be in same namespace as function namespace", sec.Name))
} else {
return err
}

}
}
// specialize pod with service
if gp.useIstio {
svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace)
podIP = fmt.Sprintf("%v.%v", svc, gp.namespace)
podIP = fmt.Sprintf("%v.%v", svc, gp.fnNamespace)
}

// tell fetcher to get the function.
Expand Down Expand Up @@ -432,7 +464,7 @@ func (gp *GenericPool) createSvc(ctx context.Context, name string, labels map[st
Selector: labels,
},
}
svc, err := gp.kubernetesClient.CoreV1().Services(gp.namespace).Create(ctx, &service, metav1.CreateOptions{})
svc, err := gp.kubernetesClient.CoreV1().Services(gp.fnNamespace).Create(ctx, &service, metav1.CreateOptions{})
return svc, err
}

Expand Down Expand Up @@ -470,7 +502,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac
"functionName": fn.ObjectMeta.Name,
"functionUid": string(fn.ObjectMeta.UID),
}
podList, err := gp.kubernetesClient.CoreV1().Pods(gp.namespace).List(ctx, metav1.ListOptions{
podList, err := gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(sel).AsSelector().String(),
})
if err != nil {
Expand All @@ -480,7 +512,7 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac
// Remove old versions function pods
for _, pod := range podList.Items {
// Delete pod no matter what status it is
gp.kubernetesClient.CoreV1().Pods(gp.namespace).Delete(ctx, pod.ObjectMeta.Name, metav1.DeleteOptions{}) //nolint errcheck
gp.kubernetesClient.CoreV1().Pods(gp.fnNamespace).Delete(ctx, pod.ObjectMeta.Name, metav1.DeleteOptions{}) //nolint errcheck
}
}

Expand Down Expand Up @@ -515,10 +547,10 @@ func (gp *GenericPool) getFuncSvc(ctx context.Context, fn *fv1.Function) (*fscac

// the fission router isn't in the same namespace, so return a
// namespace-qualified hostname
svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.namespace)
svcHost = fmt.Sprintf("%v.%v:8888", svcName, gp.fnNamespace)
} else if gp.useIstio {
svc := utils.GetFunctionIstioServiceName(fn.ObjectMeta.Name, fn.ObjectMeta.Namespace)
svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.namespace)
svcHost = fmt.Sprintf("%v.%v:8888", svc, gp.fnNamespace)
} else {
svcHost = fmt.Sprintf("%v:8888", pod.Status.PodIP)
}
Expand Down Expand Up @@ -605,12 +637,12 @@ func (gp *GenericPool) destroy(ctx context.Context) error {
}

err := gp.kubernetesClient.AppsV1().
Deployments(gp.namespace).Delete(ctx, gp.deployment.ObjectMeta.Name, delOpt)
Deployments(gp.fnNamespace).Delete(ctx, gp.deployment.ObjectMeta.Name, delOpt)
if err != nil {
gp.logger.Error("error destroying deployment",
zap.Error(err),
zap.String("deployment_name", gp.deployment.ObjectMeta.Name),
zap.String("deployment_namespace", gp.namespace))
zap.String("deployment_namespace", gp.fnNamespace))
return err
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/executortype/poolmgr/gp_deployment.go
Expand Up @@ -205,13 +205,13 @@ func (gp *GenericPool) createPoolDeployment(ctx context.Context, env *fv1.Enviro
ObjectMeta: deploymentMeta,
Spec: *deploymentSpec,
}
depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Get(ctx, deployment.Name, metav1.GetOptions{})
depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Get(ctx, deployment.Name, metav1.GetOptions{})
if err == nil {
if depl.Annotations[fv1.EXECUTOR_INSTANCEID_LABEL] != gp.instanceID {
deployment.Annotations[fv1.EXECUTOR_INSTANCEID_LABEL] = gp.instanceID
// Update with the latest deployment spec. Kubernetes will trigger
// rolling update if spec is different from the one in the cluster.
depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Update(ctx, deployment, metav1.UpdateOptions{})
depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Update(ctx, deployment, metav1.UpdateOptions{})
}
gp.deployment = depl
return err
Expand All @@ -220,7 +220,7 @@ func (gp *GenericPool) createPoolDeployment(ctx context.Context, env *fv1.Enviro
return err
}

depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Create(ctx, deployment, metav1.CreateOptions{})
depl, err = gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Create(ctx, deployment, metav1.CreateOptions{})
if err != nil {
gp.logger.Error("error creating deployment in kubernetes", zap.Error(err), zap.String("deployment", deployment.Name))
return err
Expand Down Expand Up @@ -256,7 +256,7 @@ func (gp *GenericPool) updatePoolDeployment(ctx context.Context, env *fv1.Enviro
}
newDeployment.Spec.Replicas = &poolsize

depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.namespace).Update(ctx, newDeployment, metav1.UpdateOptions{})
depl, err := gp.kubernetesClient.AppsV1().Deployments(gp.fnNamespace).Update(ctx, newDeployment, metav1.UpdateOptions{})
if err != nil {
logger.Error("error updating deployment in kubernetes", zap.Error(err), zap.String("deployment", depl.Name))
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/executortype/poolmgr/readyPodController.go
Expand Up @@ -31,7 +31,7 @@ func (gp *GenericPool) readyPodEventHandlers() k8sCache.ResourceEventHandlerFunc

func (gp *GenericPool) setupReadyPodController() error {
gp.readyPodQueue = workqueue.NewDelayingQueue()
informerFactory, err := utils.GetInformerFactoryByReadyPod(gp.kubernetesClient, gp.namespace, gp.deployment.Spec.Selector)
informerFactory, err := utils.GetInformerFactoryByReadyPod(gp.kubernetesClient, gp.fnNamespace, gp.deployment.Spec.Selector)
if err != nil {
return err
}
Expand Down

0 comments on commit a93e9b4

Please sign in to comment.