From 5e9414abfcf8ef9a36d0b43038afb25fdf8fcdcd Mon Sep 17 00:00:00 2001 From: Aleksey Pesternikov Date: Sun, 18 Aug 2024 10:25:06 -0700 Subject: [PATCH 1/2] aviold event duplicates --- testing/it_sidecar/it_sidecar.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index 43904496..f9631577 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -130,7 +130,7 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { // listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) { - kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) + kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(*namespace)) eventsInformer := kubeInformerFactory.Core().V1().Events().Informer() fn := func(obj interface{}) { @@ -146,8 +146,7 @@ func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFai } handler := &cache.ResourceEventHandlerFuncs{ - AddFunc: fn, - DeleteFunc: fn, + AddFunc: fn, UpdateFunc: func(old interface{}, new interface{}) { fn(new) }, From 05923d59b41503adf0739cb976163780e12ceebb Mon Sep 17 00:00:00 2001 From: Aleksey Pesternikov Date: Sun, 18 Aug 2024 11:19:06 -0700 Subject: [PATCH 2/2] safer use of flag vars --- testing/it_sidecar/it_sidecar.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index f9631577..53d23328 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -66,8 +66,8 @@ func (i *arrayFlags) Set(value string) error { } var ( - namespace = flag.String("namespace", os.Getenv("NAMESPACE"), "kubernetes namespace") - timeout = flag.Duration("timeout", time.Second*30, "execution timeout") + namespace string + timeout time.Duration pfconfig = portForwardConf{services: make(map[string][]uint16)} kubeconfig string waitForApps arrayFlags @@ -76,6 +76,8 @@ var ( ) func init() { + flag.StringVar(&namespace, "namespace", os.Getenv("NAMESPACE"), "kubernetes namespace") + flag.DurationVar(&timeout, "timeout", time.Second*30, "execution timeout") flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port") flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file") flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=") @@ -130,7 +132,7 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { // listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) { - kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(*namespace)) + kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace)) eventsInformer := kubeInformerFactory.Core().V1().Events().Informer() fn := func(obj interface{}) { @@ -171,7 +173,7 @@ func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error { }, } - kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) + kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil) podsInformer := kubeInformerFactory.Core().V1().Pods().Informer() podsInformer.AddEventHandler(handler) go kubeInformerFactory.Start(ctx.Done()) @@ -232,7 +234,7 @@ func waitForEndpoints(ctx context.Context, clientset *kubernetes.Clientset, conf }, } - kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) + kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil) endpointsInformer := kubeInformerFactory.Core().V1().Endpoints().Informer() endpointsInformer.AddEventHandler(handler) go kubeInformerFactory.Start(ctx.Done()) @@ -275,7 +277,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r var wg sync.WaitGroup wg.Add(len(ports)) for _, port := range ports { - ep, err := clientset.CoreV1().Endpoints(*namespace).Get(ctx, serviceName, meta_v1.GetOptions{}) + ep, err := clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, meta_v1.GetOptions{}) if err != nil { return fmt.Errorf("error listing endpoints for service %s: %v", serviceName, err) } @@ -334,7 +336,7 @@ var ErrTermSignalReceived = errors.New("TERM signal received") func main() { flag.Parse() log.SetOutput(os.Stdout) - ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), *timeout, ErrTimedOut) + ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), timeout, ErrTimedOut) defer timeoutCancel() ctx, cancel := context.WithCancelCause(ctx) c := make(chan os.Signal, 1) @@ -371,7 +373,7 @@ func main() { clientset = kubernetes.NewForConfigOrDie(config) go func() { - err := stern.Run(ctx, *namespace, clientset, allowErrors, disablePodLogs) + err := stern.Run(ctx, namespace, clientset, allowErrors, disablePodLogs) if err != nil { log.Print(err) }