Skip to content

Commit

Permalink
Deployments to work with specific namespaces instead of all namespaces (
Browse files Browse the repository at this point in the history
#2635)

* remove namespaceAll in deployments
* remove GetNamespace function from informer
* added logger and improvement for naming
* move reaper logic to utils

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
Co-authored-by: Sanket Sudake <sanketsudake@gmail.com>
  • Loading branch information
shubham-bansal96 and sanketsudake committed Nov 29, 2022
1 parent 0aec9e1 commit 526b5f0
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 231 deletions.
2 changes: 1 addition & 1 deletion cmd/preupgradechecks/checks.go
Expand Up @@ -123,7 +123,7 @@ func (client *PreUpgradeTaskClient) VerifyFunctionSpecReferences(ctx context.Con
var fList *fv1.FunctionList
errs := &multierror.Error{}

for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
for i := 0; i < maxRetries; i++ {
fList, err = client.fissionClient.CoreV1().Functions(namespace).List(ctx, metav1.ListOptions{})
if err == nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/executor/executor.go
Expand Up @@ -290,7 +290,7 @@ func StartExecutor(ctx context.Context, logger *zap.Logger, port int) error {
envInformer := make(map[string]finformerv1.EnvironmentInformer, 0)
pkgInformer := make(map[string]finformerv1.PackageInformer, 0)

for _, ns := range utils.GetNamespaces() {
for _, ns := range utils.DefaultNSResolver().FissionResourceNS {
factory := genInformer.NewFilteredSharedInformerFactory(fissionClient, time.Minute*30, ns, nil)
funcInformer[ns] = factory.Core().V1().Functions()
envInformer[ns] = factory.Core().V1().Environments()
Expand Down Expand Up @@ -369,7 +369,7 @@ func StartExecutor(ctx context.Context, logger *zap.Logger, port int) error {
configMapInformer := make(map[string]k8sInformersv1.ConfigMapInformer, 0)
secretInformer := make(map[string]k8sInformersv1.SecretInformer, 0)

for _, ns := range utils.GetNamespaces() {
for _, ns := range utils.DefaultNSResolver().FissionResourceNS {
factory := k8sInformers.NewFilteredSharedInformerFactory(kubernetesClient, time.Minute*30, ns, nil)
configMapInformer[ns] = factory.Core().V1().ConfigMaps()
secretInformer[ns] = factory.Core().V1().Secrets()
Expand Down
5 changes: 3 additions & 2 deletions pkg/executor/executortype/container/containermgr.go
Expand Up @@ -242,7 +242,8 @@ func (caaf *Container) RefreshFuncPods(ctx context.Context, logger *zap.Logger,

funcLabels := caaf.getDeployLabels(f.ObjectMeta)

dep, err := caaf.kubernetesClient.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
nsResolver := utils.DefaultNSResolver()
dep, err := caaf.kubernetesClient.AppsV1().Deployments(nsResolver.GetFunctionNS(f.ObjectMeta.Namespace)).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(funcLabels).AsSelector().String(),
})
if err != nil {
Expand Down Expand Up @@ -273,7 +274,7 @@ func (caaf *Container) RefreshFuncPods(ctx context.Context, logger *zap.Logger,
func (caaf *Container) AdoptExistingResources(ctx context.Context) {
wg := &sync.WaitGroup{}

for _, namepsace := range utils.GetNamespaces() {
for _, namepsace := range utils.DefaultNSResolver().FissionResourceNS {
fnList, err := caaf.fissionClient.CoreV1().Functions(namepsace).List(ctx, metav1.ListOptions{})
if err != nil {
caaf.logger.Error("error getting function list", zap.Error(err))
Expand Down
6 changes: 3 additions & 3 deletions pkg/executor/executortype/newdeploy/newdeploymgr.go
Expand Up @@ -260,7 +260,7 @@ func (deploy *NewDeploy) RefreshFuncPods(ctx context.Context, logger *zap.Logger
UID: env.ObjectMeta.UID,
})

dep, err := deploy.kubernetesClient.AppsV1().Deployments(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
dep, err := deploy.kubernetesClient.AppsV1().Deployments(deploy.nsResolver.GetFunctionNS(f.ObjectMeta.Namespace)).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(funcLabels).AsSelector().String(),
})

Expand Down Expand Up @@ -292,7 +292,7 @@ func (deploy *NewDeploy) RefreshFuncPods(ctx context.Context, logger *zap.Logger
func (deploy *NewDeploy) AdoptExistingResources(ctx context.Context) {
wg := &sync.WaitGroup{}

for _, namepsace := range utils.GetNamespaces() {
for _, namepsace := range utils.DefaultNSResolver().FissionResourceNS {
fnList, err := deploy.fissionClient.CoreV1().Functions(namepsace).List(ctx, metav1.ListOptions{})
if err != nil {
deploy.logger.Error("error getting function list", zap.Error(err))
Expand Down Expand Up @@ -769,7 +769,7 @@ func (deploy *NewDeploy) idleObjectReaper(ctx context.Context) {

func (deploy *NewDeploy) doIdleObjectReaper(ctx context.Context) {
envList := make(map[k8sTypes.UID]struct{})
for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
envs, err := deploy.fissionClient.CoreV1().Environments(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
deploy.logger.Fatal("failed to get environment list", zap.Error(err), zap.String("namespace", namespace))
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/executortype/newdeploy/newdeploymgr_test.go
Expand Up @@ -92,6 +92,7 @@ func TestRefreshFuncPods(t *testing.T) {
nsResolver := utils.NamespaceResolver{
FunctionNamespace: functionNamespace,
BuiderNamespace: builderNamespace,
DefaultNamespace: defaultNamespace,
}
ndm.nsResolver = &nsResolver

Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/executortype/poolmgr/gpm.go
Expand Up @@ -317,7 +317,7 @@ func (gpm *GenericPoolManager) AdoptExistingResources(ctx context.Context) {
envMap := make(map[string]fv1.Environment)
wg := &sync.WaitGroup{}

for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
envs, err := gpm.fissionClient.CoreV1().Environments(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
gpm.logger.Error("error getting environment list", zap.Error(err))
Expand Down Expand Up @@ -351,7 +351,7 @@ func (gpm *GenericPoolManager) AdoptExistingResources(ctx context.Context) {
fv1.EXECUTOR_TYPE: string(fv1.ExecutorTypePoolmgr),
}

for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
podList, err := gpm.kubernetesClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(l).AsSelector().String(),
})
Expand Down Expand Up @@ -588,7 +588,7 @@ func (gpm *GenericPoolManager) idleObjectReaper(ctx context.Context) {

func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
envList := make(map[k8sTypes.UID]struct{})
for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
envs, err := gpm.fissionClient.CoreV1().Environments(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
gpm.logger.Error("failed to get environment list", zap.Error(err), zap.String("namespace", namespace))
Expand All @@ -601,7 +601,7 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {
}

fnList := make(map[k8sTypes.UID]fv1.Function)
for _, namespace := range utils.GetNamespaces() {
for _, namespace := range utils.DefaultNSResolver().FissionResourceNS {
fns, err := gpm.fissionClient.CoreV1().Functions(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
gpm.logger.Error("failed to get environment list", zap.Error(err), zap.String("namespace", namespace))
Expand Down

0 comments on commit 526b5f0

Please sign in to comment.