67 changes: 45 additions & 22 deletions testing/it_sidecar/it_sidecar.go
@@ -70,15 +70,16 @@ var (
timeout = flag.Duration("timeout", time.Second*30, "execution timeout")
deleteNamespace = flag.Bool("delete_namespace", false, "delete namespace as part of the cleanup")
pfconfig = portForwardConf{services: make(map[string][]uint16)}
signalChannel chan os.Signal
kubeconfig string
waitForApps arrayFlags
allowErrors bool
)

func init() {
flag.Var(&pfconfig, "portforward", "set a port forward item in form of servicename:port")
flag.StringVar(&kubeconfig, "kubeconfig", os.Getenv("KUBECONFIG"), "path to kubernetes config file")
flag.Var(&waitForApps, "waitforapp", "wait for pods with label app=<this parameter>")
flag.BoolVar(&allowErrors, "allow_errors", false, "do not treat Failed in events as error. Use only if crashloop is expected")
}

// contains returns true if slice v contains an item
@@ -90,7 +91,6 @@ func contains(v []string, item string) bool {
}
return false
}

// listReadyApps converts a list returned from podsInformer.GetStore().List() to a map containing apps with ready status
// app is determined by app label
func listReadyApps(list []interface{}) (readypods, notReady []string) {
@@ -125,9 +125,38 @@ func listReadyApps(list []interface{}) (readypods, notReady []string) {
return
}

// listenForEvents listens for events and prints them to stdout. If an event's reason is "Failed", it calls the onFailure callback.
func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) {

kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil)
eventsInformer := kubeInformerFactory.Core().V1().Events().Informer()

fn := func(obj interface{}) {
event, ok := obj.(*v1.Event)
if !ok {
log.Println("Event informer received unexpected object")
return
}
log.Printf("EVENT %s %s %s %s\n", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message)
if event.Reason == "Failed" {
onFailure(event)
}
}

handler := &cache.ResourceEventHandlerFuncs{
AddFunc: fn,
DeleteFunc: fn,
UpdateFunc: func(old interface{}, new interface{}) {
fn(new)
},
}

eventsInformer.AddEventHandler(handler)

go kubeInformerFactory.Start(ctx.Done())
}

func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
events := make(chan interface{})
fn := func(obj interface{}) {
events <- obj
Comment on lines 159 to 162

The waitForPods function initializes a channel events to receive events but does not appear to have a corresponding listener to consume these events, which could lead to a goroutine leak if the channel is written to but never read from. This can eventually lead to out-of-memory issues if the channel is continuously written to without being drained.

Recommended solution: Ensure that there is a corresponding goroutine that reads from the events channel and processes the events. If the channel is meant to be used outside of this function, it should be returned or passed to the relevant consumer. If it is not used, it should be removed to avoid confusion and potential resource leaks.
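
For illustration only, a minimal sketch of that suggestion; consumeEvents is a hypothetical helper that is not part of this PR and assumes only the standard library context package:

// consumeEvents drains an informer-fed channel so handler callbacks never
// block on an unbuffered send, forwarding each object to onEvent.
func consumeEvents(ctx context.Context, events <-chan interface{}, onEvent func(interface{})) {
	for {
		select {
		case obj, ok := <-events:
			if !ok {
				return
			}
			onEvent(obj) // e.g. re-evaluate pod readiness via listReadyApps
		case <-ctx.Done():
			return
		}
	}
}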

@@ -266,14 +295,14 @@ func portForward(ctx context.Context, clientset *kubernetes.Clientset, config *r
url := clientset.CoreV1().RESTClient().Post().Resource("pods").Namespace(podnamespace).Name(podname).SubResource("portforward").URL()
transport, upgrader, err := spdy.RoundTripperFor(config)
if err != nil {
return fmt.Errorf("Could not create round tripper: %v", err)
return fmt.Errorf("could not create round tripper: %v", err)
}
dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
ports := []string{fmt.Sprintf(":%d", port)}
readyChan := make(chan struct{}, 1)
pf, err := portforward.New(dialer, ports, ctx.Done(), readyChan, os.Stderr, os.Stderr)
if err != nil {
return fmt.Errorf("Could not port forward into pod: %v", err)
return fmt.Errorf("could not port forward into pod: %v", err)
}
go func(port uint16) {
err := pf.ForwardPorts()
@@ -311,23 +340,10 @@ func cleanup(clientset *kubernetes.Clientset) {
func main() {
flag.Parse()
log.SetOutput(os.Stdout)

ctx, cancel := context.WithTimeout(context.Background(), *timeout)

signalChannel = make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
defer func() {
signal.Stop(signalChannel)
cancel()
}()
// cancel context if signal is received
go func() {
select {
case <-signalChannel:
cancel()
case <-ctx.Done():
}
}()
defer cancel()
ctx, stopSignal := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stopSignal()
// cancel context if stdin is closed
go func() {
reader := bufio.NewReader(os.Stdin)
@@ -356,6 +372,13 @@ func main() {

go stern.Run(ctx, *namespace, clientset)

listenForEvents(ctx, clientset, func(event *v1.Event) {
if !allowErrors {
log.Println("Terminate due to failure")
cancel()
}
Comment on lines +375 to +379

The listenForEvents function is set up to terminate the context if allowErrors is false and an event is received. However, there is no check to determine if the event is actually an error or a different type of event. This could lead to premature termination of the context even for non-error events. The recommended solution is to add a check to ensure that the event being processed is indeed an error before deciding to terminate the context.

})
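
For illustration, a hypothetical variant of the callback above that follows this suggestion by also filtering on the event type (v1.EventTypeWarning is the "Warning" constant from k8s.io/api/core/v1); the PR's own handler relies on listenForEvents firing only for events with Reason == "Failed":

listenForEvents(ctx, clientset, func(event *v1.Event) {
	// Only warning-level "Failed" events are treated as fatal.
	if allowErrors || event.Type != v1.EventTypeWarning {
		return
	}
	log.Printf("Terminate due to failure: %s/%s: %s", event.Namespace, event.InvolvedObject.Name, event.Message)
	cancel()
})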

if len(waitForApps) > 0 {
err = waitForPods(ctx, clientset)
if err != nil {
Comment on lines 382 to 384

The error handling after waitForPods is incomplete. If an error occurs, it is stored in err but not handled or logged. This could lead to situations where the error is silently ignored, and the program continues execution without proper handling of the failure condition. The recommended solution is to add error handling logic after the waitForPods call, such as logging the error and potentially terminating the program if the error is critical.
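
For illustration, one hypothetical shape that handling could take (the collapsed lines below may already do something similar; log and the deferred cancel/stopSignal come from the surrounding main):

if len(waitForApps) > 0 {
	if err := waitForPods(ctx, clientset); err != nil {
		// Log the failure and return so deferred cleanup runs, rather
		// than continuing as if the pods had become ready.
		log.Printf("error waiting for pods: %v", err)
		return
	}
}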

46 changes: 17 additions & 29 deletions testing/it_sidecar/stern/main.go
@@ -17,7 +17,6 @@ package stern
import (
"context"
"fmt"
"regexp"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
@@ -26,39 +25,28 @@ import (
// Run starts the main run loop
func Run(ctx context.Context, namespace string, clientset *kubernetes.Clientset) error {

added, removed, err := Watch(ctx, clientset.CoreV1().Pods(namespace), regexp.MustCompile(".*"), regexp.MustCompile(".*"), RUNNING, labels.Everything())
if err != nil {
return fmt.Errorf("failed to set up watch: %v", err)
}

tails := make(map[string]*Tail)

go func() {
for p := range added {
id := p.GetID()
if tails[id] != nil {
continue
}

tail := NewTail(p.Namespace, p.Pod, p.Container)
tails[id] = tail

tail.Start(ctx, clientset.CoreV1().Pods(p.Namespace))
err := Watch(ctx, clientset.CoreV1().Pods(namespace), RUNNING, labels.Everything(), func(p *Target) {
id := p.GetID()
if tails[id] != nil {
return
}
}()

go func() {
for p := range removed {
id := p.GetID()
if tails[id] == nil {
continue
}
tails[id].Close()
delete(tails, id)
tail := NewTail(ctx, p.Namespace, p.Pod, p.Container)
tails[id] = tail
tail.Start(clientset.CoreV1().Pods(p.Namespace))
}, func(p *Target) {
id := p.GetID()
if tails[id] == nil {
return
}
}()

<-ctx.Done()
tails[id].Close()
delete(tails, id)
})
if err != nil {
return fmt.Errorf("failed to set up watch: %v", err)
}

return nil
}
30 changes: 13 additions & 17 deletions testing/it_sidecar/stern/tail.go
@@ -23,29 +23,30 @@ import (

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
)

type Tail struct {
Namespace string
PodName string
ContainerName string
req *rest.Request
closed chan struct{}
ctx context.Context
cancel context.CancelFunc
}

// NewTail returns a new tail for a Kubernetes container inside a pod
func NewTail(namespace, podName, containerName string) *Tail {
func NewTail(ctx context.Context, namespace, podName, containerName string) *Tail {
ctx, cancel := context.WithCancel(ctx)
return &Tail{
Namespace: namespace,
PodName: podName,
ContainerName: containerName,
closed: make(chan struct{}),
ctx: ctx,
cancel: cancel,
}
}

// Start starts tailing
func (t *Tail) Start(ctx context.Context, i v1.PodInterface) {
func (t *Tail) Start(i v1.PodInterface) {

go func() {
fmt.Fprintf(os.Stderr, "+ %s/%s\n", t.PodName, t.ContainerName)
@@ -56,44 +57,39 @@ func (t *Tail) Start(ctx context.Context, i v1.PodInterface) {
Container: t.ContainerName,
})

stream, err := req.Stream(ctx)
stream, err := req.Stream(t.ctx)
if err != nil {
log.Printf("Error opening stream to %s/%s/%s: %s", t.Namespace, t.PodName, t.ContainerName, err)
return
}
Comment on lines +60 to 64

The error handling in this code is not robust. When an error occurs while opening the stream, the function simply logs the error and returns. This could lead to silent failures where the function fails to perform its intended task but the rest of the program continues to run as if nothing happened. It would be better to propagate the error up to the caller so that it can decide how to handle it. This could be done by modifying the function to return an error, or by passing an error channel as a parameter to the function.
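
For illustration, a hypothetical variant of Start that reports the failure to the caller through an error channel, as suggested here; it is not part of this PR, reuses the corev1 and fmt imports already present in tail.go, and the Follow option is assumed since the original PodLogOptions are partly collapsed in this diff:

func (t *Tail) Start(i v1.PodInterface) <-chan error {
	errc := make(chan error, 1)
	go func() {
		defer close(errc)
		req := i.GetLogs(t.PodName, &corev1.PodLogOptions{
			Follow:    true, // assumed
			Container: t.ContainerName,
		})
		stream, err := req.Stream(t.ctx)
		if err != nil {
			errc <- fmt.Errorf("opening stream to %s/%s/%s: %w", t.Namespace, t.PodName, t.ContainerName, err)
			return
		}
		defer stream.Close()
		// ... read loop unchanged from the diff above ...
	}()
	return errc
}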

defer stream.Close()

go func() {
<-t.closed
<-t.ctx.Done()
stream.Close()
}()

reader := bufio.NewReader(stream)

for {
line, err := reader.ReadBytes('\n')
line, err := reader.ReadString('\n')
if err != nil {
return
}
Comment on lines +75 to 78

The error handling in this loop is also not robust. If an error occurs while reading a line from the stream, the function simply returns. This could lead to silent failures where some lines are not read from the stream but the rest of the program continues to run as if nothing happened. It would be better to handle this error in a more meaningful way. For example, you could log the error and continue reading the next lines, or you could propagate the error up to the caller.
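
For illustration, a hypothetical version of just this loop that distinguishes expected termination (EOF or a cancelled context) from unexpected read errors; it assumes the io package is imported, and printing a partial final line is an extra nicety not present in the original:

for {
	line, err := reader.ReadString('\n')
	if len(line) > 0 {
		t.Print(line) // emit partial final line, if any
	}
	if err != nil {
		if err != io.EOF && t.ctx.Err() == nil {
			log.Printf("Error reading logs for %s/%s/%s: %s", t.Namespace, t.PodName, t.ContainerName, err)
		}
		return
	}
}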


str := string(line)
t.Print(str)
t.Print(line)
}
}()

go func() {
<-ctx.Done()
close(t.closed)
}()
}

// Close stops tailing
func (t *Tail) Close() {
fmt.Fprintf(os.Stderr, "Log finished %s\n", t.PodName)
close(t.closed)
t.cancel()
}

// Print prints a color coded log message with the pod and container names
// Print prints a log message with the pod and container names
func (t *Tail) Print(msg string) {
fmt.Fprintf(os.Stderr, "[%s/%s]: %s", t.PodName, t.ContainerName, msg)
}