-
Notifications
You must be signed in to change notification settings - Fork 17
aviold event duplicates #45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,8 +66,8 @@ func (i *arrayFlags) Set(value string) error { | |
| } | ||
|
|
||
| var ( | ||
| namespace = flag.String("namespace", os.Getenv("NAMESPACE"), "kubernetes namespace") | ||
| timeout = flag.Duration("timeout", time.Second*30, "execution timeout") | ||
| namespace string | ||
| timeout time.Duration | ||
| pfconfig = portForwardConf{services: make(map[string][]uint16)} | ||
| kubeconfig string | ||
| waitForApps arrayFlags | ||
|
|
@@ -76,6 +76,8 @@ var ( | |
| ) | ||
|
|
||
| func init() { | ||
| flag.StringVar(&namespace, "namespace", os.Getenv("NAMESPACE"), "kubernetes namespace") | ||
| flag.DurationVar(&timeout, "timeout", time.Second*30, "execution timeout") | ||
| flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port") | ||
| flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file") | ||
| flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=<this parameter>") | ||
|
|
@@ -130,7 +132,7 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { | |
| // listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback | ||
| func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) { | ||
|
|
||
| kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) | ||
| kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithNamespace(namespace)) | ||
| eventsInformer := kubeInformerFactory.Core().V1().Events().Informer() | ||
|
|
||
| fn := func(obj interface{}) { | ||
|
|
@@ -146,8 +148,7 @@ func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFai | |
| } | ||
|
|
||
| handler := &cache.ResourceEventHandlerFuncs{ | ||
| AddFunc: fn, | ||
| DeleteFunc: fn, | ||
| AddFunc: fn, | ||
| UpdateFunc: func(old interface{}, new interface{}) { | ||
| fn(new) | ||
| }, | ||
|
|
@@ -172,7 +173,7 @@ func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error { | |
| }, | ||
| } | ||
|
|
||
| kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) | ||
| kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil) | ||
| podsInformer := kubeInformerFactory.Core().V1().Pods().Informer() | ||
| podsInformer.AddEventHandler(handler) | ||
| go kubeInformerFactory.Start(ctx.Done()) | ||
|
|
@@ -233,7 +234,7 @@ func waitForEndpoints(ctx context.Context, clientset *kubernetes.Clientset, conf | |
| }, | ||
| } | ||
|
|
||
| kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) | ||
| kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, namespace, nil) | ||
| endpointsInformer := kubeInformerFactory.Core().V1().Endpoints().Informer() | ||
| endpointsInformer.AddEventHandler(handler) | ||
| go kubeInformerFactory.Start(ctx.Done()) | ||
|
|
@@ -276,7 +277,7 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r | |
| var wg sync.WaitGroup | ||
| wg.Add(len(ports)) | ||
| for _, port := range ports { | ||
| ep, err := clientset.CoreV1().Endpoints(*namespace).Get(ctx, serviceName, meta_v1.GetOptions{}) | ||
| ep, err := clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, meta_v1.GetOptions{}) | ||
| if err != nil { | ||
| return fmt.Errorf("error listing endpoints for service %s: %v", serviceName, err) | ||
| } | ||
|
|
@@ -335,7 +336,7 @@ var ErrTermSignalReceived = errors.New("TERM signal received") | |
| func main() { | ||
| flag.Parse() | ||
| log.SetOutput(os.Stdout) | ||
| ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), *timeout, ErrTimedOut) | ||
| ctx, timeoutCancel := context.WithTimeoutCause(context.Background(), timeout, ErrTimedOut) | ||
| defer timeoutCancel() | ||
| ctx, cancel := context.WithCancelCause(ctx) | ||
| c := make(chan os.Signal, 1) | ||
|
|
@@ -372,7 +373,7 @@ func main() { | |
| clientset = kubernetes.NewForConfigOrDie(config) | ||
|
|
||
| go func() { | ||
| err := stern.Run(ctx, *namespace, clientset, allowErrors, disablePodLogs) | ||
| err := stern.Run(ctx, namespace, clientset, allowErrors, disablePodLogs) | ||
| if err != nil { | ||
| log.Print(err) | ||
| } | ||
|
Comment on lines
373
to
379
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling in the goroutine that runs Recommended Solution: |
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
Getcall to retrieve endpoints for a service is made inside a loop without any rate limiting or retries. This can lead to throttling by the Kubernetes API server if there are many ports, causing potential failures.Recommended Solution:
Implement rate limiting and retries for the
Getcall to ensure that the function can handle cases where the API server throttles requests. Use a backoff strategy to retry the request in case of transient errors.