
Commit

Check pod events via informer in user-configured namespaces (#2653)
* informer changes for the event checker across multiple namespaces

* run informers in a wait group
shubham-bansal96 committed Dec 5, 2022
1 parent 9612bae commit ee623d3
Showing 2 changed files with 86 additions and 83 deletions.
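Taken together, the two changes follow one pattern: build one Pod-event informer per configured namespace, then run them all concurrently under a wait.Group with a shared stop channel. Below is a minimal standalone sketch of that pattern, assuming the informer map comes from a helper like the utils.GetInformerEventChecker added in this commit; the package name, runEventCheckers, and the logging handler are illustrative only.

package example

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
)

// runEventCheckers wires a handler into each per-namespace informer and runs
// them all concurrently. Like the checkers in this commit, it blocks for the
// life of the process: stopper is only closed when the function returns.
func runEventCheckers(informers map[string]cache.SharedInformer) {
	stopper := make(chan struct{})
	defer close(stopper)

	var wg wait.Group
	for _, informer := range informers {
		informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				// Illustrative handler: just print the event name.
				fmt.Println("pod event seen:", obj.(metav1.Object).GetName())
			},
		})
		// StartWithChannel runs informer.Run(stopper) in its own goroutine;
		// wg.Wait returns only after all of those goroutines have exited.
		wg.StartWithChannel(stopper, informer.Run)
	}
	wg.Wait()
}

In the diff below, WebsocketStartEventChecker and NoActiveConnectionEventChecker each follow this shape with their own handler logic.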
142 changes: 59 additions & 83 deletions pkg/executor/executortype/poolmgr/gpm.go
@@ -33,10 +33,8 @@ import (
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
k8sTypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
k8sInformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@@ -171,10 +169,12 @@ func MakeGenericPoolManager(ctx context.Context,
}

func (gpm *GenericPoolManager) Run(ctx context.Context) {
waitSynced := make([]k8sCache.InformerSynced, 0)
for _, podListerSynced := range gpm.podListerSynced {
if ok := k8sCache.WaitForCacheSync(ctx.Done(), podListerSynced); !ok {
gpm.logger.Fatal("failed to wait for caches to sync")
}
waitSynced = append(waitSynced, podListerSynced)
}
if ok := k8sCache.WaitForCacheSync(ctx.Done(), waitSynced...); !ok {
gpm.logger.Fatal("failed to wait for caches to sync")
}
go gpm.service()
gpm.poolPodC.InjectGpm(gpm)
@@ -680,93 +680,69 @@ func (gpm *GenericPoolManager) doIdleObjectReaper(ctx context.Context) {

// WebsocketStartEventChecker checks if the pod has emitted a websocket connection start event
func (gpm *GenericPoolManager) WebsocketStartEventChecker(ctx context.Context, kubeClient kubernetes.Interface) {

informer := k8sCache.NewSharedInformer(
&k8sCache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = "involvedObject.kind=Pod,type=Normal,reason=WsConnectionStarted"
return kubeClient.CoreV1().Events(apiv1.NamespaceAll).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = "involvedObject.kind=Pod,type=Normal,reason=WsConnectionStarted"
return kubeClient.CoreV1().Events(apiv1.NamespaceAll).Watch(ctx, options)
},
},
&apiv1.Event{},
0,
)

stopper := make(chan struct{})
defer close(stopper)
informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(metav1.Object)
gpm.logger.Info("Websocket event detected for pod",
zap.String("Pod name", mObj.GetName()))

podName := strings.SplitAfter(mObj.GetName(), ".")
if fsvc, ok := gpm.fsCache.PodToFsvc.Load(strings.TrimSuffix(podName[0], ".")); ok {
fsvc, ok := fsvc.(*fscache.FuncSvc)
if !ok {
gpm.logger.Error("could not convert item from PodToFsvc")
return
}
gpm.fsCache.WebsocketFsvc.Store(fsvc.Name, true)
}
},
})
informer.Run(stopper)

var wg wait.Group
for _, informer := range utils.GetInformerEventChecker(ctx, kubeClient, "WsConnectionStarted") {
informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(metav1.Object)
gpm.logger.Info("Websocket event detected for pod",
zap.String("Pod name", mObj.GetName()))

podName := strings.SplitAfter(mObj.GetName(), ".")
if fsvc, ok := gpm.fsCache.PodToFsvc.Load(strings.TrimSuffix(podName[0], ".")); ok {
fsvc, ok := fsvc.(*fscache.FuncSvc)
if !ok {
gpm.logger.Error("could not convert item from PodToFsvc")
return
}
gpm.fsCache.WebsocketFsvc.Store(fsvc.Name, true)
}
},
})
wg.StartWithChannel(stopper, informer.Run)
}
wg.Wait()
}

// NoActiveConnectionEventChecker checks if the pod has emitted an inactive event
func (gpm *GenericPoolManager) NoActiveConnectionEventChecker(ctx context.Context, kubeClient kubernetes.Interface) {

informer := k8sCache.NewSharedInformer(
&k8sCache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = "involvedObject.kind=Pod,type=Normal,reason=NoActiveConnections"
return kubeClient.CoreV1().Events(apiv1.NamespaceAll).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = "involvedObject.kind=Pod,type=Normal,reason=NoActiveConnections"
return kubeClient.CoreV1().Events(apiv1.NamespaceAll).Watch(ctx, options)
},
},
&apiv1.Event{},
0,
)

stopper := make(chan struct{})
defer close(stopper)
informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(metav1.Object)
gpm.logger.Info("Inactive event detected for pod",
zap.String("Pod name", mObj.GetName()))

podName := strings.SplitAfter(mObj.GetName(), ".")
if fsvc, ok := gpm.fsCache.PodToFsvc.Load(strings.TrimSuffix(podName[0], ".")); ok {
fsvc, ok := fsvc.(*fscache.FuncSvc)
if !ok {
gpm.logger.Error("could not convert value from PodToFsvc")
return
}
gpm.fsCache.DeleteFunctionSvc(ctx, fsvc)
for i := range fsvc.KubernetesObjects {
gpm.logger.Info("release idle function resources due to inactivity",
zap.String("function", fsvc.Function.Name),
zap.String("address", fsvc.Address),
zap.String("executor", string(fsvc.Executor)),
zap.String("pod", fsvc.Name),
)
reaper.CleanupKubeObject(ctx, gpm.logger, gpm.kubernetesClient, &fsvc.KubernetesObjects[i])
time.Sleep(50 * time.Millisecond)
}
}

},
})
informer.Run(stopper)
var wg wait.Group
for _, informer := range utils.GetInformerEventChecker(ctx, kubeClient, "WsConnectionStarted") {
informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
mObj := obj.(metav1.Object)
gpm.logger.Info("Inactive event detected for pod",
zap.String("Pod name", mObj.GetName()))

podName := strings.SplitAfter(mObj.GetName(), ".")
if fsvc, ok := gpm.fsCache.PodToFsvc.Load(strings.TrimSuffix(podName[0], ".")); ok {
fsvc, ok := fsvc.(*fscache.FuncSvc)
if !ok {
gpm.logger.Error("could not convert value from PodToFsvc")
return
}
gpm.fsCache.DeleteFunctionSvc(ctx, fsvc)
for i := range fsvc.KubernetesObjects {
gpm.logger.Info("release idle function resources due to inactivity",
zap.String("function", fsvc.Function.Name),
zap.String("address", fsvc.Address),
zap.String("executor", string(fsvc.Executor)),
zap.String("pod", fsvc.Name),
)
reaper.CleanupKubeObject(ctx, gpm.logger, gpm.kubernetesClient, &fsvc.KubernetesObjects[i])
time.Sleep(50 * time.Millisecond)
}
}

},
})
wg.StartWithChannel(stopper, informer.Run)
}
wg.Wait()
}
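The handlers above recover the pod name from the Event object's name, which has the form <pod-name>.<unique-suffix>, before looking it up in PodToFsvc. A small standalone illustration of that parse; the event name used here is made up:

package example

import (
	"fmt"
	"strings"
)

// podNameFromEvent mirrors the SplitAfter/TrimSuffix calls in the handlers
// above: it keeps everything before the first "." of the Event name.
func podNameFromEvent(eventName string) string {
	parts := strings.SplitAfter(eventName, ".") // e.g. ["poolmgr-abc-123.", "172f0c9d8e"]
	return strings.TrimSuffix(parts[0], ".")    // "poolmgr-abc-123"
}

func demoPodNameParse() {
	// Prints "poolmgr-abc-123" for this made-up event name.
	fmt.Println(podNameFromEvent("poolmgr-abc-123.172f0c9d8e"))
}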
27 changes: 27 additions & 0 deletions pkg/utils/informer.go
@@ -1,11 +1,16 @@
package utils

import (
"context"
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
k8sInformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@@ -69,6 +74,28 @@ func GetK8sInformersForNamespaces(client kubernetes.Interface, defaultSync time.
return informers
}

func GetInformerEventChecker(ctx context.Context, client kubernetes.Interface, reason string) map[string]cache.SharedInformer {
informers := make(map[string]cache.SharedInformer)
namespaces := DefaultNSResolver()
for _, ns := range namespaces.FissionNSWithOptions(WithBuilderNs(), WithFunctionNs(), WithDefaultNs()) {
ns := ns // shadow the range variable so each informer's List/Watch closures keep their own namespace
informers[ns] = cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fmt.Sprintf("involvedObject.kind=Pod,type=Normal,reason=%s", reason)
return client.CoreV1().Events(ns).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fmt.Sprintf("involvedObject.kind=Pod,type=Normal,reason=%s", reason)
return client.CoreV1().Events(ns).Watch(ctx, options)
},
},
&apiv1.Event{},
0,
)
}
return informers
}

func GetInformerFactoryByExecutor(client kubernetes.Interface, labels labels.Selector, defaultResync time.Duration) map[string]k8sInformers.SharedInformerFactory {
informerFactory := make(map[string]k8sInformers.SharedInformerFactory)

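The informers built in GetInformerEventChecker filter Events server-side with the field selector involvedObject.kind=Pod,type=Normal,reason=<reason>. The same selector can be exercised with a one-off list call to see what a checker would observe in a given namespace; a hedged sketch, where previewCheckerEvents is illustrative and not part of this change:

package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// previewCheckerEvents lists the Pod events that an event-checker informer for
// the given namespace and reason would observe, using the same field selector.
func previewCheckerEvents(ctx context.Context, client kubernetes.Interface, ns, reason string) error {
	events, err := client.CoreV1().Events(ns).List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("involvedObject.kind=Pod,type=Normal,reason=%s", reason),
	})
	if err != nil {
		return err
	}
	for _, ev := range events.Items {
		fmt.Printf("%s -> pod %s\n", ev.Name, ev.InvolvedObject.Name)
	}
	return nil
}

Calling this with one of the resolved namespaces and either "WsConnectionStarted" or "NoActiveConnections" shows the events the corresponding checker reacts to.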
