Skip to content

Commit

Permalink
Fix error in k8s after job completes
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Jun 5, 2024
1 parent 3880115 commit 8c4946a
Showing 1 changed file with 22 additions and 5 deletions.
27 changes: 22 additions & 5 deletions internal/job/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -59,7 +60,9 @@ type Executor struct {
cleanupDirs []string

// A channel to track cancellation
cancelCh chan struct{}
cancelMu sync.Mutex
cancelCh chan struct{}
cancelled bool

// redactors for the job logs. They will be populated with values both from environment variables and through the Job API.
// In order for the latter to happen, a reference is passed into the Job API server as well
Expand Down Expand Up @@ -271,9 +274,18 @@ func (e *Executor) includePhase(phase string) bool {
return slices.Contains(e.Phases, phase)
}

// Cancel interrupts any running shell processes and causes the job to stop.
//
// It closes e.cancelCh, which broadcasts the cancellation to every goroutine
// receiving from that channel. The first call returns nil; any later call
// leaves the channel untouched and returns an "already cancelled" error.
func (e *Executor) Cancel() error {
	// Double-closing a channel is a panic, so the close is guarded by the
	// cancelled flag, and the flag itself is guarded by cancelMu so that
	// concurrent callers cannot both reach the close.
	e.cancelMu.Lock()
	defer e.cancelMu.Unlock()
	if e.cancelled {
		return errors.New("already cancelled")
	}
	e.cancelled = true
	// Only close the channel; do not send on it first. A send notifies at
	// most one receiver and can block the caller when no receiver is ready,
	// whereas a close is observed by every receiver and never blocks.
	close(e.cancelCh)
	return nil
}

Expand Down Expand Up @@ -1203,10 +1215,15 @@ func (e *Executor) startKubernetesClient(ctx context.Context, kubernetesClient *
}

go func() {
if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil {
// If the k8s client is interrupted because the "server" agent is
// stopped or unreachable, we should stop running the job.
// If the k8s client is interrupted because our own ctx was cancelled,
// then the job is already stopping, so there's no point logging an
// error.
if err := kubernetesClient.Await(ctx, kubernetes.RunStateInterrupt); err != nil && !errors.Is(err, context.Canceled) {
e.shell.Errorf("Error waiting for client interrupt: %v", err)
}
e.cancelCh <- struct{}{}
e.Cancel()
}()
return nil
}

0 comments on commit 8c4946a

Please sign in to comment.