diff --git a/testing/it_sidecar/it_sidecar.go b/testing/it_sidecar/it_sidecar.go index 0d4252c4..e03e66cb 100644 --- a/testing/it_sidecar/it_sidecar.go +++ b/testing/it_sidecar/it_sidecar.go @@ -70,15 +70,16 @@ var ( timeout = flag.Duration("timeout", time.Second*30, "execution timeout") deleteNamespace = flag.Bool("delete_namespace", false, "delete namespace as part of the cleanup") pfconfig = portForwardConf{services: make(map[string][]uint16)} - signalChannel chan os.Signal kubeconfig string waitForApps arrayFlags + allowErrors bool ) func init() { flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port") flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file") flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=") + flag.BoolVar(&allowErrors, "allow_errors", false, "do not treat Failed in events as error. Use only if crashloop is expected") } // contains returns true if slice v contains an item @@ -90,7 +91,6 @@ func contains(v []string, item string) bool { } return false } - // listReadyApps converts a list returned from podsInformer.GetStore().List() to a map containing apps with ready status // app is determined by app label func listReadyApps(list []interface{}) (readypods, notReady []string) { @@ -125,9 +125,38 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) { return } +// listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback +func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) { + + kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil) + eventsInformer := kubeInformerFactory.Core().V1().Events().Informer() + + fn := func(obj interface{}) { + event, ok := obj.(*v1.Event) + if !ok { + log.Println("Event informer received unexpected object") + return + } + log.Printf("EVENT %s %s %s %s\n", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message) + if event.Reason == "Failed" { + onFailure(event) + } + } + + handler := &cache.ResourceEventHandlerFuncs{ + AddFunc: fn, + DeleteFunc: fn, + UpdateFunc: func(old interface{}, new interface{}) { + fn(new) + }, + } + + eventsInformer.AddEventHandler(handler) + + go kubeInformerFactory.Start(ctx.Done()) +} + func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() events := make(chan interface{}) fn := func(obj interface{}) { events <- obj @@ -266,14 +295,14 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r url := clientset.CoreV1().RESTClient().Post().Resource("pods").Namespace(podnamespace).Name(podname).SubResource("portforward").URL() transport, upgrader, err := spdy.RoundTripperFor(config) if err != nil { - return fmt.Errorf("Could not create round tripper: %v", err) + return fmt.Errorf("could not create round tripper: %v", err) } dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) ports := []string{fmt.Sprintf(":%d", port)} readyChan := make(chan struct{}, 1) pf, err := portforward.New(dialer, ports, ctx.Done(), readyChan, os.Stderr, os.Stderr) if err != nil { - return fmt.Errorf("Could not port forward into pod: %v", err) + return fmt.Errorf("could not port forward into pod: %v", err) } go func(port uint16) { err := pf.ForwardPorts() @@ -311,23 +340,10 @@ func cleanup(clientset *kubernetes.Clientset) { func main() { flag.Parse() log.SetOutput(os.Stdout) - ctx, cancel := context.WithTimeout(context.Background(), *timeout) - - signalChannel = make(chan os.Signal, 1) - signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM) - defer func() { - signal.Stop(signalChannel) - cancel() - }() - // cancel context if signal is received - go func() { - select { - case <-signalChannel: - cancel() - case <-ctx.Done(): - } - }() + defer cancel() + ctx, stopSignal := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) + defer stopSignal() // cancel context if stdin is closed go func() { reader := bufio.NewReader(os.Stdin) @@ -356,6 +372,13 @@ func main() { go stern.Run(ctx, *namespace, clientset) + listenForEvents(ctx, clientset, func(event *v1.Event) { + if !allowErrors { + log.Println("Terminate due to failure") + cancel() + } + }) + if len(waitForApps) > 0 { err = waitForPods(ctx, clientset) if err != nil { diff --git a/testing/it_sidecar/stern/main.go b/testing/it_sidecar/stern/main.go index 192efaa9..5bc5d6b6 100644 --- a/testing/it_sidecar/stern/main.go +++ b/testing/it_sidecar/stern/main.go @@ -17,7 +17,6 @@ package stern import ( "context" "fmt" - "regexp" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" @@ -26,39 +25,28 @@ import ( // Run starts the main run loop func Run(ctx context.Context, namespace string, clientset *kubernetes.Clientset) error { - added, removed, err := Watch(ctx, clientset.CoreV1().Pods(namespace), regexp.MustCompile(".*"), regexp.MustCompile(".*"), RUNNING, labels.Everything()) - if err != nil { - return fmt.Errorf("failed to set up watch: %v", err) - } - tails := make(map[string]*Tail) - go func() { - for p := range added { - id := p.GetID() - if tails[id] != nil { - continue - } - - tail := NewTail(p.Namespace, p.Pod, p.Container) - tails[id] = tail - - tail.Start(ctx, clientset.CoreV1().Pods(p.Namespace)) + err := Watch(ctx, clientset.CoreV1().Pods(namespace), RUNNING, labels.Everything(), func(p *Target) { + id := p.GetID() + if tails[id] != nil { + return } - }() - go func() { - for p := range removed { - id := p.GetID() - if tails[id] == nil { - continue - } - tails[id].Close() - delete(tails, id) + tail := NewTail(ctx, p.Namespace, p.Pod, p.Container) + tails[id] = tail + tail.Start(clientset.CoreV1().Pods(p.Namespace)) + }, func(p *Target) { + id := p.GetID() + if tails[id] == nil { + return } - }() - - <-ctx.Done() + tails[id].Close() + delete(tails, id) + }) + if err != nil { + return fmt.Errorf("failed to set up watch: %v", err) + } return nil } diff --git a/testing/it_sidecar/stern/tail.go b/testing/it_sidecar/stern/tail.go index d766e1d8..6ea69f25 100644 --- a/testing/it_sidecar/stern/tail.go +++ b/testing/it_sidecar/stern/tail.go @@ -23,29 +23,30 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/rest" ) type Tail struct { Namespace string PodName string ContainerName string - req *rest.Request - closed chan struct{} + ctx context.Context + cancel context.CancelFunc } // NewTail returns a new tail for a Kubernetes container inside a pod -func NewTail(namespace, podName, containerName string) *Tail { +func NewTail(ctx context.Context, namespace, podName, containerName string) *Tail { + ctx, cancel := context.WithCancel(ctx) return &Tail{ Namespace: namespace, PodName: podName, ContainerName: containerName, - closed: make(chan struct{}), + ctx: ctx, + cancel: cancel, } } // Start starts tailing -func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { +func (t *Tail) Start(i v1.PodInterface) { go func() { fmt.Fprintf(os.Stderr, "+ %s/%s\n", t.PodName, t.ContainerName) @@ -56,7 +57,7 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { Container: t.ContainerName, }) - stream, err := req.Stream(ctx) + stream, err := req.Stream(t.ctx) if err != nil { log.Printf("Error opening stream to %s/%s/%s: %s", t.Namespace, t.PodName, t.ContainerName, err) return @@ -64,36 +65,31 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface) { defer stream.Close() go func() { - <-t.closed + <-t.ctx.Done() stream.Close() }() reader := bufio.NewReader(stream) for { - line, err := reader.ReadBytes('\n') + line, err := reader.ReadString('\n') if err != nil { return } - str := string(line) - t.Print(str) + t.Print(line) } }() - go func() { - <-ctx.Done() - close(t.closed) - }() } // Close stops tailing func (t *Tail) Close() { fmt.Fprintf(os.Stderr, "Log finished %s\n", t.PodName) - close(t.closed) + t.cancel() } -// Print prints a color coded log message with the pod and container names +// Print prints a log message with the pod and container names func (t *Tail) Print(msg string) { fmt.Fprintf(os.Stderr, "[%s/%s]: %s", t.PodName, t.ContainerName, msg) } diff --git a/testing/it_sidecar/stern/watch.go b/testing/it_sidecar/stern/watch.go index d27e7a87..b52328da 100644 --- a/testing/it_sidecar/stern/watch.go +++ b/testing/it_sidecar/stern/watch.go @@ -17,13 +17,13 @@ package stern import ( "context" "fmt" - "regexp" + "log" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes/typed/core/v1" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) // Target is a target to watch @@ -41,86 +41,71 @@ func (t *Target) GetID() string { // Watch starts listening to Kubernetes events and emits modified // containers/pods. The first result is targets added, the second is targets // removed -func Watch(ctx context.Context, i v1.PodInterface, podFilter *regexp.Regexp, containerFilter *regexp.Regexp, containerState ContainerState, labelSelector labels.Selector) (chan *Target, chan *Target, error) { +func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector, onAdded, onRemoved func(*Target)) error { watcher, err := i.Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labelSelector.String()}) if err != nil { - return nil, nil, fmt.Errorf("failed to set up watch: %s", err) + return fmt.Errorf("failed to set up watch: %s", err) } - added := make(chan *Target) - removed := make(chan *Target) + defer watcher.Stop() - go func() { - for { - select { - case e := <-watcher.ResultChan(): - if e.Object == nil { - // Closed because of error - return - } + for { + select { + case <-ctx.Done(): + return nil + case e := <-watcher.ResultChan(): + if e.Object == nil { + // watcher channel was closed (because of error) + return nil + } - var ( - pod *corev1.Pod - ok bool - ) - if pod, ok = e.Object.(*corev1.Pod); !ok { - continue - } + var ( + pod *corev1.Pod + ok bool + ) + if pod, ok = e.Object.(*corev1.Pod); !ok { + continue + } - if !podFilter.MatchString(pod.Name) { - continue - } + log.Printf("pod %s/%s event %s", pod.Namespace, pod.Name, e.Type) - switch e.Type { - case watch.Added, watch.Modified: - var statuses []corev1.ContainerStatus - statuses = append(statuses, pod.Status.InitContainerStatuses...) - statuses = append(statuses, pod.Status.ContainerStatuses...) - - for _, c := range statuses { - if !containerFilter.MatchString(c.Name) { - continue - } - // if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue - // } - - if containerState.Match(c.State) { - added <- &Target{ - Namespace: pod.Namespace, - Pod: pod.Name, - Container: c.Name, - } - } - } - case watch.Deleted: - var containers []corev1.Container - containers = append(containers, pod.Spec.Containers...) - containers = append(containers, pod.Spec.InitContainers...) - - for _, c := range containers { - if !containerFilter.MatchString(c.Name) { - continue - } - // if containerExcludeFilter != nil && containerExcludeFilter.MatchString(c.Name) { - // continue - // } - - removed <- &Target{ + switch e.Type { + case watch.Added, watch.Modified: + var statuses []corev1.ContainerStatus + statuses = append(statuses, pod.Status.InitContainerStatuses...) + statuses = append(statuses, pod.Status.ContainerStatuses...) + + for _, c := range statuses { + // if c.RestartCount > 0 { + // log.Print("container ", c.Name, " has restart count ", c.RestartCount) + // return + // } + + log.Print("container ", c.Name, " has state ", c.State) + + if containerState.Match(c.State) { + onAdded(&Target{ Namespace: pod.Namespace, Pod: pod.Name, Container: c.Name, - } + }) } } - case <-ctx.Done(): - watcher.Stop() - close(added) - close(removed) - return + case watch.Deleted: + var containers []corev1.Container + containers = append(containers, pod.Spec.Containers...) + containers = append(containers, pod.Spec.InitContainers...) + + for _, c := range containers { + + onRemoved(&Target{ + Namespace: pod.Namespace, + Pod: pod.Name, + Container: c.Name, + }) + } } } - }() + } - return added, removed, nil }