From 69db5d33f538aa5a47c3bc83ec852539aa26ea42 Mon Sep 17 00:00:00 2001 From: Aleksey Pesternikov Date: Sat, 2 Dec 2023 18:07:08 -0800 Subject: [PATCH 1/6] early termination on errors --- testing/it_sidecar/it_sidecar.go | 81 ++++++++++++++++++------------- testing/it_sidecar/stern/main.go | 3 +- testing/it_sidecar/stern/watch.go | 35 ++++++------- 3 files changed, 64 insertions(+), 55 deletions(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index 0d4252c4..17f8d40f 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -12,6 +12,7 @@ import ( "os" "os/signal" "path/filepath" + "slices" "strconv" "strings" "sync" @@ -70,25 +71,16 @@ var ( timeout = flag.Duration("timeout", time.Second*30, "execution timeout") deleteNamespace = flag.Bool("delete_namespace", false, "delete namespace as part of the cleanup") pfconfig = portForwardConf{services: make(map[string][]uint16)} - signalChannel chan os.Signal kubeconfig string waitForApps arrayFlags + allowErrors bool ) func init() { flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port") flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file") flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=") -} - -// contains returns true if slice v contains an item -func contains(v []string, item string) bool { - for _, s := range v { - if s == item { - return true - } - } - return false + flag.BoolVar(&allowErrors, "allow_errors", false, "do not treat Failed in events as error. Use only if crashloop is expected") } // listReadyApps converts a list returned from podsInformer.GetStore().List() to a map containing apps with ready status @@ -118,16 +110,45 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { } } for _, app := range waitForApps { - if !contains(readyApps, app) { + if !slices.Contains(readyApps, app) { notReady = append(notReady, app) } } return } +// listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback +func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) { + + kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) + eventsInformer := kubeInformerFactory.Core().V1().Events().Informer() + + fn := func(obj interface{}) { + event, ok := obj.(*v1.Event) + if !ok { + log.Print("Event informer received unexpected object") + return + } + log.Printf("EVENT %s %s %s %s", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message) + if event.Reason == "Failed" { + onFailure(event) + } + } + + handler := &cache.ResourceEventHandlerFuncs{ + AddFunc: fn, + DeleteFunc: fn, + UpdateFunc: func(old interface{}, new interface{}) { + fn(new) + }, + } + + eventsInformer.AddEventHandler(handler) + + go kubeInformerFactory.Start(ctx.Done()) +} + func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() events := make(chan interface{}) fn := func(obj interface{}) { events <- obj @@ -181,7 +202,7 @@ func listReadyServices(list []interface{}) (ready, notReady []string) { } } for service, _ := range pfconfig.services { - if !contains(ready, service) { + if !slices.Contains(ready, service) { notReady = append(notReady, service) } } @@ -266,14 +287,14 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r url := clientset.CoreV1().RESTClient().Post().Resource("pods").Namespace(podnamespace).Name(podname).SubResource("portforward").URL() transport, upgrader, err := spdy.RoundTripperFor(config) if err != nil { - return fmt.Errorf("Could not create round tripper: %v", err) + return fmt.Errorf("could not create round tripper: %v", err) } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) ports := []string{fmt.Sprintf(":%d", port)} readyChan := make(chan struct{}, 1) pf, err := portforward.New(dialer, ports, ctx.Done(), readyChan, os.Stderr, os.Stderr) if err != nil { - return fmt.Errorf("Could not port forward into pod: %v", err) + return fmt.Errorf("could not port forward into pod: %v", err) } go func(port uint16) { err := pf.ForwardPorts() @@ -311,23 +332,10 @@ func cleanup(clientset *kubernetes.Clientset) { func main() { flag.Parse() log.SetOutput(os.Stdout) - ctx, cancel := context.WithTimeout(context.Background(), *timeout) - - signalChannel = make(chan os.Signal, 1) - signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) - defer func() { - signal.Stop(signalChannel) - cancel() - }() - // cancel context if signal is received - go func() { - select { - case <-signalChannel: - cancel() - case <-ctx.Done(): - } - }() + defer cancel() + ctx, stopSignal := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stopSignal() // cancel context if stdin is closed go func() { reader := bufio.NewReader(os.Stdin) @@ -356,6 +364,13 @@ func main() { go stern.Run(ctx, *namespace, clientset) + listenForEvents(ctx, clientset, func(event *v1.Event) { + if !allowErrors { + log.Print("Terminate due to failure") + cancel() + } + }) + if len(waitForApps) > 0 { err = waitForPods(ctx, clientset) if err != nil { diff --git a/testing/it_sidecar/stern/main.go b/testing/it_sidecar/stern/main.go index 192efaa9..25a2b17e 100644 --- a/testing/it_sidecar/stern/main.go +++ b/testing/it_sidecar/stern/main.go @@ -17,7 +17,6 @@ package stern import ( "context" "fmt" - "regexp" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -26,7 +25,7 @@ import ( // Run starts the main run loop func Run(ctx context.Context, namespace string, clientset *kubernetes.Clientset) error { - added, removed, err := Watch(ctx, clientset.CoreV1().Pods(namespace), regexp.MustCompile(".*"), regexp.MustCompile(".*"), RUNNING, labels.Everything()) + added, removed, err := Watch(ctx, clientset.CoreV1().Pods(namespace), RUNNING, labels.Everything()) if err != nil { return fmt.Errorf("failed to set up watch: %v", err) } diff --git a/testing/it_sidecar/stern/watch.go b/testing/it_sidecar/stern/watch.go index d27e7a87..8978aec8 100644 --- a/testing/it_sidecar/stern/watch.go +++ b/testing/it_sidecar/stern/watch.go @@ -17,13 +17,13 @@ package stern import ( "context" "fmt" - "regexp" + "log" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/typed/core/v1" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) // Target is a target to watch @@ -41,7 +41,7 @@ func (t *Target) GetID() string { // Watch starts listening to Kubernetes events and emits modified // containers/pods. The first result is targets added, the second is targets // removed -func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, containerFilter *regexp.Regexp, containerState ContainerState, labelSelector labels.Selector) (chan *Target, chan *Target, error) { +func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector) (chan *Target, chan *Target, error) { watcher, err := i.Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labelSelector.String()}) if err != nil { return nil, nil, fmt.Errorf("failed to set up watch: %s", err) @@ -50,6 +50,12 @@ func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, con added := make(chan *Target) removed := make(chan *Target) + defer func() { + watcher.Stop() + close(added) + close(removed) + }() + go func() { for { select { @@ -67,9 +73,7 @@ func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, con continue } - if !podFilter.MatchString(pod.Name) { - continue - } + log.Printf("pod %s/%s event %s", pod.Namespace, pod.Name, e.Type) switch e.Type { case watch.Added, watch.Modified: @@ -78,13 +82,13 @@ func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, con statuses = append(statuses, pod.Status.ContainerStatuses...) for _, c := range statuses { - if !containerFilter.MatchString(c.Name) { - continue - } - // if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue + // if c.RestartCount > 0 { + // log.Print("container ", c.Name, " has restart count ", c.RestartCount) + // return // } + log.Print("container ", c.Name, " has state ", c.State) + if containerState.Match(c.State) { added <- &Target{ Namespace: pod.Namespace, @@ -99,12 +103,6 @@ func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, con containers = append(containers, pod.Spec.InitContainers...) for _, c := range containers { - if !containerFilter.MatchString(c.Name) { - continue - } - // if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue - // } removed <- &Target{ Namespace: pod.Namespace, @@ -114,9 +112,6 @@ func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, con } } case <-ctx.Done(): - watcher.Stop() - close(added) - close(removed) return } } From 439fced311c1877ea9721fd33118f051c8b078fc Mon Sep 17 00:00:00 2001 From: Aleksey Pesternikov Date: Sat, 2 Dec 2023 18:59:29 -0800 Subject: [PATCH 2/6] fix 2 race conditions in tailer --- testing/it_sidecar/it_sidecar.go | 14 +++- testing/it_sidecar/stern/main.go | 47 +++++-------- testing/it_sidecar/stern/tail.go | 30 ++++---- testing/it_sidecar/stern/watch.go | 112 ++++++++++++++---------------- 4 files changed, 93 insertions(+), 110 deletions(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index 17f8d40f..da9f3329 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -12,7 +12,6 @@ import ( "os" "os/signal" "path/filepath" - "slices" "strconv" "strings" "sync" @@ -83,6 +82,15 @@ func init() { flag.BoolVar(&allowErrors, "allow_errors", false, "do not treat Failed in events as error. Use only if crashloop is expected") } +// contains returns true if slice v contains an item +func contains(v []string, item string) bool { + for _, s := range v { + if s == item { + return true + } + } + return false +} // listReadyApps converts a list returned from podsInformer.GetStore().List() to a map containing apps with ready status // app is determined by app label func listReadyApps(list []interface{}) (readypods, notReady []string) { @@ -110,7 +118,7 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { } } for _, app := range waitForApps { - if !slices.Contains(readyApps, app) { + if !contains(readyApps, app) { notReady = append(notReady, app) } } @@ -202,7 +210,7 @@ func listReadyServices(list []interface{}) (ready, notReady []string) { } } for service, _ := range pfconfig.services { - if !slices.Contains(ready, service) { + if !contains(ready, service) { notReady = append(notReady, service) } } diff --git a/testing/it_sidecar/stern/main.go b/testing/it_sidecar/stern/main.go index 25a2b17e..5bc5d6b6 100644 --- a/testing/it_sidecar/stern/main.go +++ b/testing/it_sidecar/stern/main.go @@ -25,39 +25,28 @@ import ( // Run starts the main run loop func Run(ctx context.Context, namespace string, clientset *kubernetes.Clientset) error { - added, removed, err := Watch(ctx, clientset.CoreV1().Pods(namespace), RUNNING, labels.Everything()) - if err != nil { - return fmt.Errorf("failed to set up watch: %v", err) - } - tails := make(map[string]*Tail) - go func() { - for p := range added { - id := p.GetID() - if tails[id] != nil { - continue - } - - tail := NewTail(p.Namespace, p.Pod, p.Container) - tails[id] = tail - - tail.Start(ctx, clientset.CoreV1().Pods(p.Namespace)) + err := Watch(ctx, clientset.CoreV1().Pods(namespace), RUNNING, labels.Everything(), func(p *Target) { + id := p.GetID() + if tails[id] != nil { + return } - }() - - go func() { - for p := range removed { - id := p.GetID() - if tails[id] == nil { - continue - } - tails[id].Close() - delete(tails, id) - } - }() - <-ctx.Done() + tail := NewTail(ctx, p.Namespace, p.Pod, p.Container) + tails[id] = tail + tail.Start(clientset.CoreV1().Pods(p.Namespace)) + }, func(p *Target) { + id := p.GetID() + if tails[id] == nil { + return + } + tails[id].Close() + delete(tails, id) + }) + if err != nil { + return fmt.Errorf("failed to set up watch: %v", err) + } return nil } diff --git a/testing/it_sidecar/stern/tail.go b/testing/it_sidecar/stern/tail.go index d766e1d8..6ea69f25 100644 --- a/testing/it_sidecar/stern/tail.go +++ b/testing/it_sidecar/stern/tail.go @@ -23,29 +23,30 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" ) type Tail struct { Namespace string PodName string ContainerName string - req *rest.Request - closed chan struct{} + ctx context.Context + cancel context.CancelFunc } // NewTail returns a new tail for a Kubernetes container inside a pod -func NewTail(namespace, podName, containerName string) *Tail { +func NewTail(ctx context.Context, namespace, podName, containerName string) *Tail { + ctx, cancel := context.WithCancel(ctx) return &Tail{ Namespace: namespace, PodName: podName, ContainerName: containerName, - closed: make(chan struct{}), + ctx: ctx, + cancel: cancel, } } // Start starts tailing -func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { +func (t *Tail) Start(i v1.PodInterface) { go func() { fmt.Fprintf(os.Stderr, "+ %s/%s\n", t.PodName, t.ContainerName) @@ -56,7 +57,7 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { Container: t.ContainerName, }) - stream, err := req.Stream(ctx) + stream, err := req.Stream(t.ctx) if err != nil { log.Printf("Error opening stream to %s/%s/%s: %s", t.Namespace, t.PodName, t.ContainerName, err) return @@ -64,36 +65,31 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { defer stream.Close() go func() { - <-t.closed + <-t.ctx.Done() stream.Close() }() reader := bufio.NewReader(stream) for { - line, err := reader.ReadBytes('\n') + line, err := reader.ReadString('\n') if err != nil { return } - str := string(line) - t.Print(str) + t.Print(line) } }() - go func() { - <-ctx.Done() - close(t.closed) - }() } // Close stops tailing func (t *Tail) Close() { fmt.Fprintf(os.Stderr, "Log finished %s\n", t.PodName) - close(t.closed) + t.cancel() } -// Print prints a color coded log message with the pod and container names +// Print prints a log message with the pod and container names func (t *Tail) Print(msg string) { fmt.Fprintf(os.Stderr, "[%s/%s]: %s", t.PodName, t.ContainerName, msg) } diff --git a/testing/it_sidecar/stern/watch.go b/testing/it_sidecar/stern/watch.go index 8978aec8..a6024db5 100644 --- a/testing/it_sidecar/stern/watch.go +++ b/testing/it_sidecar/stern/watch.go @@ -41,81 +41,71 @@ func (t *Target) GetID() string { // Watch starts listening to Kubernetes events and emits modified // containers/pods. The first result is targets added, the second is targets // removed -func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector) (chan *Target, chan *Target, error) { +func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector, onAdded, onRemoved func(*Target)) error { watcher, err := i.Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labelSelector.String()}) if err != nil { - return nil, nil, fmt.Errorf("failed to set up watch: %s", err) + return fmt.Errorf("failed to set up watch: %s", err) } - added := make(chan *Target) - removed := make(chan *Target) - - defer func() { - watcher.Stop() - close(added) - close(removed) - }() - - go func() { - for { - select { - case e := <-watcher.ResultChan(): - if e.Object == nil { - // Closed because of error - return - } + defer watcher.Stop() - var ( - pod *corev1.Pod - ok bool - ) - if pod, ok = e.Object.(*corev1.Pod); !ok { - continue - } + for { + select { + case e := <-watcher.ResultChan(): + if e.Object == nil { + // watcher channel was closed (because of error) + return nil + } - log.Printf("pod %s/%s event %s", pod.Namespace, pod.Name, e.Type) - - switch e.Type { - case watch.Added, watch.Modified: - var statuses []corev1.ContainerStatus - statuses = append(statuses, pod.Status.InitContainerStatuses...) - statuses = append(statuses, pod.Status.ContainerStatuses...) - - for _, c := range statuses { - // if c.RestartCount > 0 { - // log.Print("container ", c.Name, " has restart count ", c.RestartCount) - // return - // } - - log.Print("container ", c.Name, " has state ", c.State) - - if containerState.Match(c.State) { - added <- &Target{ - Namespace: pod.Namespace, - Pod: pod.Name, - Container: c.Name, - } - } - } - case watch.Deleted: - var containers []corev1.Container - containers = append(containers, pod.Spec.Containers...) - containers = append(containers, pod.Spec.InitContainers...) + var ( + pod *corev1.Pod + ok bool + ) + if pod, ok = e.Object.(*corev1.Pod); !ok { + continue + } - for _, c := range containers { + log.Printf("pod %s/%s event %s", pod.Namespace, pod.Name, e.Type) - removed <- &Target{ + switch e.Type { + case watch.Added, watch.Modified: + var statuses []corev1.ContainerStatus + statuses = append(statuses, pod.Status.InitContainerStatuses...) + statuses = append(statuses, pod.Status.ContainerStatuses...) + + for _, c := range statuses { + // if c.RestartCount > 0 { + // log.Print("container ", c.Name, " has restart count ", c.RestartCount) + // return + // } + + log.Print("container ", c.Name, " has state ", c.State) + + if containerState.Match(c.State) { + onAdded(&Target{ Namespace: pod.Namespace, Pod: pod.Name, Container: c.Name, - } + }) } } - case <-ctx.Done(): - return + case watch.Deleted: + var containers []corev1.Container + containers = append(containers, pod.Spec.Containers...) + containers = append(containers, pod.Spec.InitContainers...) + + for _, c := range containers { + + onRemoved(&Target{ + Namespace: pod.Namespace, + Pod: pod.Name, + Container: c.Name, + }) + } } + case <-ctx.Done(): + return nil } - }() + } - return added, removed, nil } From 349de4d1519556849b5aca951abde5bacca68a4f Mon Sep 17 00:00:00 2001 From: Aleksey Pesternikov Date: Sat, 2 Dec 2023 19:20:12 -0800 Subject: [PATCH 3/6] prio to ctx.Done() --- testing/it_sidecar/stern/watch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/it_sidecar/stern/watch.go b/testing/it_sidecar/stern/watch.go index a6024db5..b52328da 100644 --- a/testing/it_sidecar/stern/watch.go +++ b/testing/it_sidecar/stern/watch.go @@ -51,6 +51,8 @@ func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState for { select { + case <-ctx.Done(): + return nil case e := <-watcher.ResultChan(): if e.Object == nil { // watcher channel was closed (because of error) @@ -103,8 +105,6 @@ func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState }) } } - case <-ctx.Done(): - return nil } } From 13d028383f2201db0b78320b2d59573e832cafe9 Mon Sep 17 00:00:00 2001 From: apesternikov Date: Sat, 10 Feb 2024 09:41:30 -0800 Subject: [PATCH 4/6] Update testing/it_sidecar/it_sidecar.go Co-authored-by: michaelschiff --- testing/it_sidecar/it_sidecar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index da9f3329..0b7a4478 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -134,7 +134,7 @@ func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFai fn := func(obj interface{}) { event, ok := obj.(*v1.Event) if !ok { - log.Print("Event informer received unexpected object") + log.Println("Event informer received unexpected object") return } log.Printf("EVENT %s %s %s %s", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message) From 2847ef87f597746297f0ed42c2a9664d3daa7d6f Mon Sep 17 00:00:00 2001 From: apesternikov Date: Sat, 10 Feb 2024 09:41:41 -0800 Subject: [PATCH 5/6] Update testing/it_sidecar/it_sidecar.go Co-authored-by: michaelschiff --- testing/it_sidecar/it_sidecar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index 0b7a4478..ae4da535 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -137,7 +137,7 @@ func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFai log.Println("Event informer received unexpected object") return } - log.Printf("EVENT %s %s %s %s", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message) + log.Printf("EVENT %s %s %s %s\n", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message) if event.Reason == "Failed" { onFailure(event) } From af7bf0589de4c0e5c8d3b3ebfb9d929e27caeea8 Mon Sep 17 00:00:00 2001 From: apesternikov Date: Sat, 10 Feb 2024 09:41:49 -0800 Subject: [PATCH 6/6] Update testing/it_sidecar/it_sidecar.go Co-authored-by: michaelschiff --- testing/it_sidecar/it_sidecar.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index ae4da535..e03e66cb 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -374,7 +374,7 @@ func main() { listenForEvents(ctx, clientset, func(event *v1.Event) { if !allowErrors { - log.Print("Terminate due to failure") + log.Println("Terminate due to failure") cancel() } })