
Conversation

@apesternikov
Contributor

Comment on lines 128 to 157
// listenForEvents listens for events and prints them to stdout. if event reason is "Failed" it will call the failure callback
func listenForEvents(ctx context.Context, clientset *kubernetes.Clientset, onFailure func(*v1.Event)) {

	kubeInformerFactory := informers.NewFilteredSharedInformerFactory(clientset, time.Second*30, *namespace, nil)
	eventsInformer := kubeInformerFactory.Core().V1().Events().Informer()

	fn := func(obj interface{}) {
		event, ok := obj.(*v1.Event)
		if !ok {
			log.Print("Event informer received unexpected object")
			return
		}
		log.Printf("EVENT %s %s %s %s", event.Namespace, event.InvolvedObject.Name, event.Reason, event.Message)
		if event.Reason == "Failed" {
			onFailure(event)
		}
	}

	handler := &cache.ResourceEventHandlerFuncs{
		AddFunc:    fn,
		DeleteFunc: fn,
		UpdateFunc: func(old interface{}, new interface{}) {
			fn(new)
		},
	}

	eventsInformer.AddEventHandler(handler)

	go kubeInformerFactory.Start(ctx.Done())
}


The function listenForEvents starts a goroutine with kubeInformerFactory.Start(ctx.Done()) but there is no mechanism to ensure that this goroutine has completed before the function returns. This could lead to a data race if the context is cancelled or the function returns before the goroutine has finished executing.

To fix this, consider using a sync.WaitGroup to ensure that the goroutine has completed before returning from the function. Add a WaitGroup variable, call Add(1) before starting the goroutine, and call Done() at the end of the goroutine. Then, call Wait() before returning from the function to block until the goroutine has completed.
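A minimal sketch of that suggestion, assuming listenForEvents is meant to block until the factory's Start goroutine has returned (variable names are taken from the snippet above; this requires a sync import):

var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	// Start launches the informers; Done() marks this goroutine finished.
	kubeInformerFactory.Start(ctx.Done())
}()
wg.Wait() // block until the goroutine above has completed before returning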

Comment on lines +60 to 64
stream, err := req.Stream(t.ctx)
if err != nil {
	log.Printf("Error opening stream to %s/%s/%s: %s", t.Namespace, t.PodName, t.ContainerName, err)
	return
}


The error handling in this code is not robust. When an error occurs while opening the stream, the function simply logs the error and returns. This could lead to silent failures where the function fails to perform its intended task but the rest of the program continues to run as if nothing happened. It would be better to propagate the error up to the caller so that it can decide how to handle it. This could be done by modifying the function to return an error, or by passing an error channel as a parameter to the function.
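One possible shape of that change, as a sketch; consumeStream is a hypothetical name for the enclosing method, and the field names are taken from the snippet above:

// Hypothetical wrapper: return the error instead of only logging it.
func (t *Target) consumeStream(req *rest.Request) error {
	stream, err := req.Stream(t.ctx)
	if err != nil {
		return fmt.Errorf("opening stream to %s/%s/%s: %w", t.Namespace, t.PodName, t.ContainerName, err)
	}
	defer stream.Close()
	// ... read and forward the log lines as before ...
	return nil
}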

Comment on lines +75 to 78
line, err := reader.ReadString('\n')
if err != nil {
	return
}


The error handling in this loop is also not robust. If an error occurs while reading a line from the stream, the function simply returns. This could lead to silent failures where some lines are not read from the stream but the rest of the program continues to run as if nothing happened. It would be better to handle this error in a more meaningful way. For example, you could log the error and continue reading the next lines, or you could propagate the error up to the caller.
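A sketch of the logging variant, assuming io.EOF is the expected end-of-stream condition and anything else should be surfaced:

line, err := reader.ReadString('\n')
if len(line) > 0 {
	// ReadString can return data together with an error, so handle the line first.
	fmt.Print(line)
}
if err != nil {
	if err != io.EOF {
		log.Printf("error reading log stream: %s", err)
	}
	return
}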

Comment on lines 44 to 111
func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector, onAdded, onRemoved func(*Target)) error {
	watcher, err := i.Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labelSelector.String()})
	if err != nil {
		return fmt.Errorf("failed to set up watch: %s", err)
	}
	defer watcher.Stop()

	for {
		select {
		case e := <-watcher.ResultChan():
			if e.Object == nil {
				// watcher channel was closed (because of error)
				return nil
			}

			var (
				pod *corev1.Pod
				ok  bool
			)
			if pod, ok = e.Object.(*corev1.Pod); !ok {
				continue
			}

			if !podFilter.MatchString(pod.Name) {
				continue
			}
			log.Printf("pod %s/%s event %s", pod.Namespace, pod.Name, e.Type)

			switch e.Type {
			case watch.Added, watch.Modified:
				var statuses []corev1.ContainerStatus
				statuses = append(statuses, pod.Status.InitContainerStatuses...)
				statuses = append(statuses, pod.Status.ContainerStatuses...)

				for _, c := range statuses {
					// if c.RestartCount > 0 {
					// 	log.Print("container ", c.Name, " has restart count ", c.RestartCount)
					// 	return
					// }

					log.Print("container ", c.Name, " has state ", c.State)

					if containerState.Match(c.State) {
						onAdded(&Target{
							Namespace: pod.Namespace,
							Pod:       pod.Name,
							Container: c.Name,
						})
					}
				}
			case watch.Deleted:
				var containers []corev1.Container
				containers = append(containers, pod.Spec.Containers...)
				containers = append(containers, pod.Spec.InitContainers...)

				for _, c := range containers {
					onRemoved(&Target{
						Namespace: pod.Namespace,
						Pod:       pod.Name,
						Container: c.Name,
					})
				}
			}
		case <-ctx.Done():
			return nil
		}
	}
}


The function Watch does not handle the case when the watcher.ResultChan() is closed. In Go, receiving from a closed channel always immediately returns the zero value for the channel type. In this case, it would be a watch.Event{} with Object set to nil. The current implementation assumes that if Object is nil, the channel was closed due to an error and returns nil without any error handling. This could lead to silent failures where the watch stops working without any indication of an error.

To fix this, you should check if the channel is closed by using a comma-ok idiom. If the channel is closed, ok will be false. You can then return an error indicating that the watch unexpectedly stopped. This will allow the caller to handle this situation appropriately, for example by restarting the watch.
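As a sketch, the comma-ok form the comment describes would look roughly like this inside the select:

case e, ok := <-watcher.ResultChan():
	if !ok {
		// The result channel was closed; surface this instead of returning nil.
		return fmt.Errorf("pod watch stopped unexpectedly")
	}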

Comment on lines 44 to 111
[same Watch snippet as shown above]


The function Watch does not handle potential panics that could occur if the provided onAdded or onRemoved functions panic. If these callbacks panic, it would cause the entire Watch function to stop and potentially bring down the entire application depending on how it's used.

To improve this, you should add a deferred function at the start of the Watch function that recovers from any panics and logs an error message. This would prevent a panic in a callback from stopping the watch and provide visibility into any issues with the callbacks.
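A hedged sketch of that idea, using a named error return so a recovered panic is reported to the caller (the signature is copied from the PR):

func Watch(ctx context.Context, i v1.PodInterface, containerState ContainerState, labelSelector labels.Selector, onAdded, onRemoved func(*Target)) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// A panicking onAdded/onRemoved callback no longer kills the process;
			// it is reported to the caller instead.
			err = fmt.Errorf("watch callback panicked: %v", r)
		}
	}()
	// ... existing watch loop from the PR ...
	return nil
}

Note that a deferred recover only catches panics raised on the goroutine running the watch loop, which is the case here since the callbacks are invoked inline.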

Comment on lines +57 to +60
if e.Object == nil {
	// watcher channel was closed (because of error)
	return nil
}


The code does not handle the case when the watcher channel is closed due to an error. It simply returns nil, which might lead to silent failures. It would be better to return an error or at least log a message when the watcher channel is closed.
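A small sketch of the logging-plus-error variant:

if e.Object == nil {
	// Channel closed; make the stop visible and report it to the caller.
	log.Print("pod watch: result channel closed unexpectedly")
	return fmt.Errorf("pod watch closed unexpectedly")
}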

Comment on lines +66 to +68
if pod, ok = e.Object.(*corev1.Pod); !ok {
	continue
}


The code silently ignores events that are not of type *corev1.Pod. This might lead to missed events if the type assertion fails. It would be better to log a message or return an error when the type assertion fails.
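A small sketch of the logging variant:

pod, ok := e.Object.(*corev1.Pod)
if !ok {
	// Make unexpected object types visible instead of dropping them silently.
	log.Printf("pod watch: ignoring %s event with unexpected object type %T", e.Type, e.Object)
	continue
}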

apesternikov and others added 2 commits February 10, 2024 09:41
Co-authored-by: michaelschiff <schiff.michael@gmail.com>
Co-authored-by: michaelschiff <schiff.michael@gmail.com>
Comment on lines 159 to 162
func waitForPods(ctx context.Context, clientset *kubernetes.Clientset) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	events := make(chan interface{})
	fn := func(obj interface{}) {
		events <- obj


The waitForPods function initializes a channel events to receive events but does not appear to have a corresponding listener to consume these events, which could lead to a goroutine leak if the channel is written to but never read from. This can eventually lead to out-of-memory issues if the channel is continuously written to without being drained.

Recommended solution: Ensure that there is a corresponding goroutine that reads from the events channel and processes the events. If the channel is meant to be used outside of this function, it should be returned or passed to the relevant consumer. If it is not used, it should be removed to avoid confusion and potential resource leaks.
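A sketch of one way to guarantee the channel is drained, assuming waitForPods is meant to block until some readiness condition derived from the informer events is met:

for {
	select {
	case obj := <-events:
		// Evaluate the event here; return nil once every awaited pod is ready.
		_ = obj
	case <-ctx.Done():
		return ctx.Err()
	}
}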


Co-authored-by: michaelschiff <schiff.michael@gmail.com>
Comment on lines +375 to +379
listenForEvents(ctx, clientset, func(event *v1.Event) {
	if !allowErrors {
		log.Println("Terminate due to failure")
		cancel()
	}


The listenForEvents function is set up to terminate the context if allowErrors is false and an event is received. However, there is no check to determine if the event is actually an error or a different type of event. This could lead to premature termination of the context even for non-error events. The recommended solution is to add a check to ensure that the event being processed is indeed an error before deciding to terminate the context.
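Note that listenForEvents already filters on event.Reason == "Failed" before invoking the callback, but the check can also be made explicit at the call site. A sketch, where treating only Warning-type events as fatal is an assumption:

listenForEvents(ctx, clientset, func(event *v1.Event) {
	if !allowErrors && event.Type == v1.EventTypeWarning && event.Reason == "Failed" {
		log.Printf("Terminating due to failure event on %s/%s: %s",
			event.Namespace, event.InvolvedObject.Name, event.Message)
		cancel()
	}
})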

Comment on lines 382 to 384
if len(waitForApps) > 0 {
	err = waitForPods(ctx, clientset)
	if err != nil {


The error handling after waitForPods is incomplete. If an error occurs, it is stored in err but not handled or logged. This could lead to situations where the error is silently ignored, and the program continues execution without proper handling of the failure condition. The recommended solution is to add error handling logic after the waitForPods call, such as logging the error and potentially terminating the program if the error is critical.
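A sketch of the suggested handling; whether the program should exit or merely cancel its context on this error is an assumption:

if len(waitForApps) > 0 {
	if err := waitForPods(ctx, clientset); err != nil {
		log.Printf("error waiting for pods: %s", err)
		cancel() // assumed: treat a failed wait as fatal for the sidecar
	}
}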

@apesternikov apesternikov merged commit af2cbd4 into main Feb 20, 2024
@apesternikov apesternikov deleted the ap/it_fail_fast branch February 20, 2024 16:51
shraykay pushed a commit to shraykay/rules_gitops that referenced this pull request Jan 22, 2025
# This is the 1st commit message:

app name first

# This is the commit message nazzzzz#2:

clean up file

# This is the commit message nazzzzz#3:

fix reference to proto package

# This is the commit message fasterci#4:

better parse

# This is the commit message fasterci#5:

fix again

# This is the commit message fasterci#6:

fix again

# This is the commit message fasterci#7:

fix again

# This is the commit message fasterci#8:

fix again

# This is the commit message fasterci#9:

fix again

# This is the commit message fasterci#10:

fix again

# This is the commit message fasterci#11:

add debug statements for seeing if branch is dirty

# This is the commit message fasterci#12:

fix again

# This is the commit message fasterci#13:

fix again

# This is the commit message fasterci#14:

fix again

# This is the commit message fasterci#15:

fix again

# This is the commit message fasterci#16:

fix again

# This is the commit message fasterci#17:

fix again

# This is the commit message fasterci#18:

fix again

# This is the commit message fasterci#19:

fix again

# This is the commit message fasterci#20:

fix again

# This is the commit message fasterci#21:

fix again

# This is the commit message fasterci#22:

fix again

# This is the commit message fasterci#23:

fix again