From aafe19bff90178173f71f022564950b7381161a6 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 07:53:44 +1100 Subject: [PATCH 01/13] Header for verification fail message --- agent/verify_job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/verify_job.go b/agent/verify_job.go index 12564ba4e8..3b26591106 100644 --- a/agent/verify_job.go +++ b/agent/verify_job.go @@ -41,7 +41,7 @@ func (r *JobRunner) verificationFailureLogs(err error, behavior string) { } r.logger.Warn("Job verification failed: %s", err.Error()) - r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ %s: Job verification failed: %s\n", label, err.Error()))) + r.logStreamer.Process([]byte(fmt.Sprintf("+++ ⚠️ %s: Job verification failed: %s\n", label, err.Error()))) if behavior == VerificationBehaviourWarn { r.logger.Warn("Job will be run whether or not it can be verified - this is not recommended. You can change this behavior with the `job-verification-failure-behavior` agent configuration option.") From 4c74422ee64367dd4df329a1e0118a8221e2085a Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 14:52:26 +1100 Subject: [PATCH 02/13] Consolidate verification failure logging into a single method and add timestamps --- agent/run_job.go | 59 ++++++++++++++++++++++++++++++++++++++++++--- agent/verify_job.go | 15 ------------ 2 files changed, 56 insertions(+), 18 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index f78a918d01..cd14eb35cc 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -11,6 +11,7 @@ import ( "github.com/buildkite/agent/v3/hook" "github.com/buildkite/agent/v3/kubernetes" + "github.com/buildkite/agent/v3/logger" "github.com/buildkite/agent/v3/metrics" "github.com/buildkite/agent/v3/process" "github.com/buildkite/agent/v3/status" @@ -25,6 +26,14 @@ const ( SignalReasonProcessRunError = "process_run_error" ) +type missingKeyError struct { + signature string +} + +func (e *missingKeyError) Error() string { + return fmt.Sprintf("job was signed with signature %q, but no verification key was provided", e.signature) +} + // Runs the job func (r *JobRunner) Run(ctx context.Context) error { r.logger.Info("Starting job %s", r.conf.Job.ID) @@ -77,8 +86,8 @@ func (r *JobRunner) Run(ctx context.Context) error { if r.conf.JWKS == nil && job.Step.Signature != nil { r.verificationFailureLogs( - fmt.Errorf("job %q was signed with signature %q, but no verification key was provided, so the job can't be verified", job.ID, job.Step.Signature.Value), VerificationBehaviourBlock, + &missingKeyError{signature: job.Step.Signature.Value}, ) exit.Status = -1 exit.SignalReason = SignalReasonSignatureRejected @@ -89,7 +98,7 @@ func (r *JobRunner) Run(ctx context.Context) error { ise := &invalidSignatureError{} switch err := r.verifyJob(r.conf.JWKS); { case errors.Is(err, ErrNoSignature) || errors.As(err, &ise): - r.verificationFailureLogs(err, r.VerificationFailureBehavior) + r.verificationFailureLogs(r.VerificationFailureBehavior, err) if r.VerificationFailureBehavior == VerificationBehaviourBlock { exit.Status = -1 exit.SignalReason = SignalReasonSignatureRejected @@ -97,7 +106,7 @@ func (r *JobRunner) Run(ctx context.Context) error { } case err != nil: // some other error - r.verificationFailureLogs(err, VerificationBehaviourBlock) // errors in verification are always fatal + r.verificationFailureLogs(VerificationBehaviourBlock, err) // errors in verification are always fatal exit.Status = -1 exit.SignalReason = SignalReasonSignatureRejected return nil @@ -155,6 +164,50 @@ func (r *JobRunner) Run(ctx context.Context) error { return nil } +func (r *JobRunner) prependTimestampForLogs(s string, args ...any) []byte { + switch { + case r.conf.AgentConfiguration.ANSITimestamps: + return []byte(fmt.Sprintf( + "\x1b_bk;t=%d\x07%s", + time.Now().UnixNano()/int64(time.Millisecond), + fmt.Sprintf(s, args...), + )) + case r.conf.AgentConfiguration.TimestampLines: + return []byte(fmt.Sprintf( + "[%s] %s", + time.Now().UTC().Format(time.RFC3339), + fmt.Sprintf(s, args...), + )) + default: + return []byte(fmt.Sprintf(s, args...)) + } +} + +func (r *JobRunner) verificationFailureLogs(behavior string, err error) { + l := r.logger.WithFields(logger.StringField("jobID", r.conf.Job.ID), logger.StringField("error", err.Error())) + prefix := "~~~ ⚠️" + if behavior == VerificationBehaviourBlock { + prefix = "+++ ⛔" + } + + l.Warn("Job verification failed") + r.logStreamer.Process([]byte(r.prependTimestampForLogs("%s Job verification failed\n", prefix))) + r.logStreamer.Process([]byte(r.prependTimestampForLogs("error: %s\n", err))) + + if errors.Is(err, ErrNoSignature) { + r.logStreamer.Process([]byte(r.prependTimestampForLogs("no signature in job\n"))) + } else if ise := new(invalidSignatureError); errors.As(err, &ise) { + r.logStreamer.Process([]byte(r.prependTimestampForLogs("signature: %s\n", r.conf.Job.Step.Signature.Value))) + } else if mke := new(missingKeyError); errors.As(err, &mke) { + r.logStreamer.Process([]byte(r.prependTimestampForLogs("signature: %s\n", mke.signature))) + } + + if behavior == VerificationBehaviourWarn { + r.logger.Warn("Job will be run whether or not it can be verified - this is not recommended. You can change this behavior with the `job-verification-failure-behavior` agent configuration option.") + r.logStreamer.Process(r.prependTimestampForLogs("Job will be run without verification\n")) + } +} + type processExit struct { Status int Signal string diff --git a/agent/verify_job.go b/agent/verify_job.go index 3b26591106..76a7c7ff0d 100644 --- a/agent/verify_job.go +++ b/agent/verify_job.go @@ -34,21 +34,6 @@ func (e *invalidSignatureError) Unwrap() error { return e.underlying } -func (r *JobRunner) verificationFailureLogs(err error, behavior string) { - label := "WARNING" - if behavior == VerificationBehaviourBlock { - label = "ERROR" - } - - r.logger.Warn("Job verification failed: %s", err.Error()) - r.logStreamer.Process([]byte(fmt.Sprintf("+++ ⚠️ %s: Job verification failed: %s\n", label, err.Error()))) - - if behavior == VerificationBehaviourWarn { - r.logger.Warn("Job will be run whether or not it can be verified - this is not recommended. You can change this behavior with the `job-verification-failure-behavior` agent configuration option.") - r.logStreamer.Process([]byte(fmt.Sprintf("⚠️ %s: Job will be run without verification\n", label))) - } -} - func (r *JobRunner) verifyJob(keySet jwk.Set) error { step := r.conf.Job.Step From 1a763402b8e9c42828c9986ad703c38d4d9a1086 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 14:52:59 +1100 Subject: [PATCH 03/13] Add timestamps to verification and log group headers to success message --- agent/run_job.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index cd14eb35cc..d4695c71fa 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -112,8 +112,10 @@ func (r *JobRunner) Run(ctx context.Context) error { return nil default: // no error, all good, keep going - r.logger.Info("Successfully verified job %s with signature %s", job.ID, job.Step.Signature.Value) - r.logStreamer.Process([]byte(fmt.Sprintf("✅ Verified job with signature %s\n", job.Step.Signature.Value))) + l := r.logger.WithFields(logger.StringField("jobID", job.ID), logger.StringField("signature", job.Step.Signature.Value)) + l.Info("Successfully verified job") + r.logStreamer.Process(r.prependTimestampForLogs("~~~ ✅ Job verified\n")) + r.logStreamer.Process(r.prependTimestampForLogs("signature: %s\n", job.Step.Signature.Value)) } } From 1095f5aa073b22cea37a67149ba818337525e95e Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 14:53:26 +1100 Subject: [PATCH 04/13] Add log group header to Job API logs --- jobapi/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/jobapi/server.go b/jobapi/server.go index 2e22d3a162..779008928c 100644 --- a/jobapi/server.go +++ b/jobapi/server.go @@ -57,7 +57,8 @@ func (s *Server) Start() error { return fmt.Errorf("starting socket server: %w", err) } - s.Logger.Commentf("Job API server listening on %s", s.SocketPath) + s.Logger.Printf("~~~ Job API") + s.Logger.Printf("Server listening on %s", s.SocketPath) return nil } From 6df583c3c99349c46b1d43ce2d7bbbefa2511372 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 18:03:11 +1100 Subject: [PATCH 05/13] Specify that it's the signature that's being verified Co-authored-by: Ben Moskovitz --- agent/run_job.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index d4695c71fa..c8876d2bfe 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -114,7 +114,7 @@ func (r *JobRunner) Run(ctx context.Context) error { default: // no error, all good, keep going l := r.logger.WithFields(logger.StringField("jobID", job.ID), logger.StringField("signature", job.Step.Signature.Value)) l.Info("Successfully verified job") - r.logStreamer.Process(r.prependTimestampForLogs("~~~ ✅ Job verified\n")) + r.logStreamer.Process(r.prependTimestampForLogs("~~~ ✅ Job signature verified\n")) r.logStreamer.Process(r.prependTimestampForLogs("signature: %s\n", job.Step.Signature.Value)) } } @@ -193,7 +193,7 @@ func (r *JobRunner) verificationFailureLogs(behavior string, err error) { } l.Warn("Job verification failed") - r.logStreamer.Process([]byte(r.prependTimestampForLogs("%s Job verification failed\n", prefix))) + r.logStreamer.Process([]byte(r.prependTimestampForLogs("%s Job signature verification failed\n", prefix))) r.logStreamer.Process([]byte(r.prependTimestampForLogs("error: %s\n", err))) if errors.Is(err, ErrNoSignature) { From 0e7e61c9e7c35ba18de01652b42ad9b99c430d02 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Mon, 30 Oct 2023 18:05:36 +1100 Subject: [PATCH 06/13] Expand warnings too Co-authored-by: Ben Moskovitz --- agent/run_job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/run_job.go b/agent/run_job.go index c8876d2bfe..4374f42405 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -187,7 +187,7 @@ func (r *JobRunner) prependTimestampForLogs(s string, args ...any) []byte { func (r *JobRunner) verificationFailureLogs(behavior string, err error) { l := r.logger.WithFields(logger.StringField("jobID", r.conf.Job.ID), logger.StringField("error", err.Error())) - prefix := "~~~ ⚠️" + prefix := "+++ ⚠️" if behavior == VerificationBehaviourBlock { prefix = "+++ ⛔" } From 52d882508df20ff5e1ae2116c921a0f082a813e6 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Wed, 1 Nov 2023 17:55:02 +1100 Subject: [PATCH 07/13] Use an io.Writer to write to the combined stream instead of the logStreamer directly --- agent/job_runner.go | 83 ++++++++++++++++--------------- agent/log_streamer.go | 4 +- agent/run_job.go | 111 +++++++++++++++++------------------------- agent/verify_job.go | 28 +++++------ 4 files changed, 103 insertions(+), 123 deletions(-) diff --git a/agent/job_runner.go b/agent/job_runner.go index f406c3a952..7c1caa829e 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -118,8 +118,8 @@ type JobRunner struct { // How the JobRunner should respond when job verification fails (one of `block` or `warn`) VerificationFailureBehavior string - // The logger to use - logger logger.Logger + // agentLogger is a agentLogger that outputs to the agent logs + agentLogger logger.Logger // The APIClient that will be used when updating the job apiClient APIClient @@ -133,9 +133,12 @@ type JobRunner struct { // The internal header time streamer headerTimesStreamer *headerTimesStreamer - // The internal log streamer + // The internal log streamer. Don't write to this directly, use `jobLogs` instead logStreamer *LogStreamer + // jobLogs is a logger that outputs to the job logs + jobLogs io.Writer + // If the job is being cancelled cancelled bool @@ -166,9 +169,9 @@ var _ jobRunner = (*JobRunner)(nil) // Initializes the job runner func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) { r := &JobRunner{ - logger: l, - conf: conf, - apiClient: apiClient, + agentLogger: l, + conf: conf, + apiClient: apiClient, } var err error @@ -186,15 +189,15 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con if r.conf.Job.Token != "" { clientConf := r.apiClient.Config() clientConf.Token = r.conf.Job.Token - r.apiClient = api.NewClient(r.logger, clientConf) + r.apiClient = api.NewClient(r.agentLogger, clientConf) } // Create our header times struct - r.headerTimesStreamer = newHeaderTimesStreamer(r.logger, r.onUploadHeaderTime) + r.headerTimesStreamer = newHeaderTimesStreamer(r.agentLogger, r.onUploadHeaderTime) // The log streamer that will take the output chunks, and send them to // the Buildkite Agent API - r.logStreamer = NewLogStreamer(r.logger, r.onUploadChunk, LogStreamerConfig{ + r.logStreamer = NewLogStreamer(r.agentLogger, r.onUploadChunk, LogStreamerConfig{ Concurrency: 3, MaxChunkSizeBytes: r.conf.Job.ChunksMaxSizeBytes, MaxSizeBytes: r.conf.Job.LogMaxSizeBytes, @@ -213,7 +216,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", r.conf.Job.ID)); err != nil { return r, err } else { - r.logger.Debug("[JobRunner] Created env file: %s", file.Name()) + r.agentLogger.Debug("[JobRunner] Created env file: %s", file.Name()) r.envFile = file } @@ -237,7 +240,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con jobLogDir := "" if conf.AgentConfiguration.JobLogPath != "" { jobLogDir = conf.AgentConfiguration.JobLogPath - r.logger.Debug("[JobRunner] Job Log Path: %s", jobLogDir) + r.agentLogger.Debug("[JobRunner] Job Log Path: %s", jobLogDir) } tmpFile, err = os.CreateTemp(jobLogDir, "buildkite_job_log") if err != nil { @@ -270,7 +273,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con go func() { // Use a scanner to process output line by line - err := process.NewScanner(r.logger).ScanLines(pr, func(line string) { + err := process.NewScanner(r.agentLogger).ScanLines(pr, func(line string) { // Send to our header streamer and determine if it's a header // or header expansion. isHeaderOrExpansion := r.headerTimesStreamer.Scan(line) @@ -284,7 +287,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con _, _ = outputWriter.Write([]byte(line + "\n")) }) if err != nil { - r.logger.Error("[JobRunner] Encountered error %v", err) + r.agentLogger.Error("[JobRunner] Encountered error %v", err) } }() @@ -297,11 +300,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con // Use a scanner to process output for headers only go func() { - err := process.NewScanner(r.logger).ScanLines(pr, func(line string) { + err := process.NewScanner(r.agentLogger).ScanLines(pr, func(line string) { r.headerTimesStreamer.Scan(line) }) if err != nil { - r.logger.Error("[JobRunner] Encountered error %v", err) + r.agentLogger.Error("[JobRunner] Encountered error %v", err) } }() } @@ -322,7 +325,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con } // The writer that output from the process goes into - processWriter := io.MultiWriter(allWriters...) + r.jobLogs = io.MultiWriter(allWriters...) // Copy the current processes ENV and merge in the new ones. We do this // so the sub process gets PATH and stuff. We merge our path in over @@ -336,10 +339,10 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con if err != nil { return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err) } - r.process = kubernetes.New(r.logger, kubernetes.Config{ + r.process = kubernetes.New(r.agentLogger, kubernetes.Config{ AccessToken: r.apiClient.Config().Token, - Stdout: processWriter, - Stderr: processWriter, + Stdout: r.jobLogs, + Stderr: r.jobLogs, ClientCount: containerCount, }) } else { @@ -349,14 +352,14 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con return nil, fmt.Errorf("splitting bootstrap-script (%q) into tokens: %w", conf.AgentConfiguration.BootstrapScript, err) } - r.process = process.New(r.logger, process.Config{ + r.process = process.New(r.agentLogger, process.Config{ Path: cmd[0], Args: cmd[1:], Dir: conf.AgentConfiguration.BuildPath, Env: processEnv, PTY: conf.AgentConfiguration.RunInPty, - Stdout: processWriter, - Stderr: processWriter, + Stdout: r.jobLogs, + Stderr: r.jobLogs, InterruptSignal: conf.CancelSignal, SignalGracePeriod: conf.AgentConfiguration.SignalGracePeriod, }) @@ -366,11 +369,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con go func() { <-r.process.Done() if err := pw.Close(); err != nil { - r.logger.Error("%v", err) + r.agentLogger.Error("%v", err) } if tmpFile != nil { if err := os.Remove(tmpFile.Name()); err != nil { - r.logger.Error("%v", err) + r.agentLogger.Error("%v", err) } } }() @@ -520,8 +523,8 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) { } // see documentation for BuildkiteMessageMax - if err := truncateEnv(r.logger, env, BuildkiteMessageName, BuildkiteMessageMax); err != nil { - r.logger.Warn("failed to truncate %s: %v", BuildkiteMessageName, err) + if err := truncateEnv(r.agentLogger, env, BuildkiteMessageName, BuildkiteMessageMax); err != nil { + r.agentLogger.Warn("failed to truncate %s: %v", BuildkiteMessageName, err) // attempt to continue anyway } @@ -597,7 +600,7 @@ func (r *JobRunner) checkPlugins(ctx context.Context) error { } func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (bool, error) { - r.logger.Info("Running pre-bootstrap hook %q", hook) + r.agentLogger.Info("Running pre-bootstrap hook %q", hook) sh, err := shell.New() if err != nil { @@ -609,16 +612,16 @@ func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (b sh.Env.Set("BUILDKITE_ENV_FILE", r.envFile.Name()) sh.Writer = LogWriter{ - l: r.logger, + l: r.agentLogger, } if err := sh.RunWithoutPrompt(ctx, hook); err != nil { fmt.Printf("err: %s\n", err) - r.logger.Error("Finished pre-bootstrap hook %q: job rejected", hook) + r.agentLogger.Error("Finished pre-bootstrap hook %q: job rejected", hook) return false, err } - r.logger.Info("Finished pre-bootstrap hook %q: job accepted", hook) + r.agentLogger.Info("Finished pre-bootstrap hook %q: job accepted", hook) return true, nil } @@ -637,11 +640,11 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error { if err != nil { if response != nil && api.IsRetryableStatus(response) { - r.logger.Warn("%s (%s)", err, rtr) + r.agentLogger.Warn("%s (%s)", err, rtr) } else if api.IsRetryableError(err) { - r.logger.Warn("%s (%s)", err, rtr) + r.agentLogger.Warn("%s (%s)", err, rtr) } else { - r.logger.Warn("Buildkite rejected the call to start the job (%s)", err) + r.agentLogger.Warn("Buildkite rejected the call to start the job (%s)", err) rtr.Break() } } @@ -662,7 +665,7 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro // Mark this routine as done in the wait group wg.Done() - r.logger.Debug("[JobRunner] Routine that refreshes the job has finished") + r.agentLogger.Debug("[JobRunner] Routine that refreshes the job has finished") }() select { @@ -679,10 +682,10 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro if err != nil { // We don't really care if it fails, we'll just // try again soon anyway - r.logger.Warn("Problem with getting job state %s (%s)", r.conf.Job.ID, err) + r.agentLogger.Warn("Problem with getting job state %s (%s)", r.conf.Job.ID, err) } else if jobState.State == "canceling" || jobState.State == "canceled" { if err := r.Cancel(); err != nil { - r.logger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err) + r.agentLogger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err) } } @@ -707,10 +710,10 @@ func (r *JobRunner) onUploadHeaderTime(ctx context.Context, cursor, total int, t response, err := r.apiClient.SaveHeaderTimes(ctx, r.conf.Job.ID, &api.HeaderTimes{Times: times}) if err != nil { if response != nil && (response.StatusCode >= 400 && response.StatusCode <= 499) { - r.logger.Warn("Buildkite rejected the header times (%s)", err) + r.agentLogger.Warn("Buildkite rejected the header times (%s)", err) retrier.Break() } else { - r.logger.Warn("%s (%s)", err, retrier) + r.agentLogger.Warn("%s (%s)", err, retrier) } } @@ -745,10 +748,10 @@ func (r *JobRunner) onUploadChunk(ctx context.Context, chunk *LogStreamerChunk) }) if err != nil { if response != nil && (response.StatusCode >= 400 && response.StatusCode <= 499) { - r.logger.Warn("Buildkite rejected the chunk upload (%s)", err) + r.agentLogger.Warn("Buildkite rejected the chunk upload (%s)", err) retrier.Break() } else { - r.logger.Warn("%s (%s)", err, retrier) + r.agentLogger.Warn("%s (%s)", err, retrier) } } diff --git a/agent/log_streamer.go b/agent/log_streamer.go index bcc03592c6..57a2929293 100644 --- a/agent/log_streamer.go +++ b/agent/log_streamer.go @@ -102,7 +102,7 @@ func (ls *LogStreamer) FailedChunks() int { } // Process streams the output. -func (ls *LogStreamer) Process(output []byte) error { +func (ls *LogStreamer) Process(output []byte) { // Only allow one streamer process at a time ls.processMutex.Lock() defer ls.processMutex.Unlock() @@ -150,8 +150,6 @@ func (ls *LogStreamer) Process(output []byte) error { // Save the new amount of bytes ls.bytes += size } - - return nil } // Waits for all the chunks to be uploaded, then shuts down all the workers diff --git a/agent/run_job.go b/agent/run_job.go index 4374f42405..42459356ce 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -36,7 +36,7 @@ func (e *missingKeyError) Error() string { // Runs the job func (r *JobRunner) Run(ctx context.Context) error { - r.logger.Info("Starting job %s", r.conf.Job.ID) + r.agentLogger.Info("Starting job %s", r.conf.Job.ID) ctx, done := status.AddItem(ctx, "Job Runner", "", nil) defer done() @@ -55,7 +55,7 @@ func (r *JobRunner) Run(ctx context.Context) error { if r.conf.Job.RunnableAt != "" { runnableAt, err := time.Parse(time.RFC3339Nano, r.conf.Job.RunnableAt) if err != nil { - r.logger.Error("Metric submission failed to parse %s", r.conf.Job.RunnableAt) + r.agentLogger.Error("Metric submission failed to parse %s", r.conf.Job.RunnableAt) } else { r.conf.MetricsScope.Timing("queue.duration", r.startedAt.Sub(runnableAt)) } @@ -112,18 +112,18 @@ func (r *JobRunner) Run(ctx context.Context) error { return nil default: // no error, all good, keep going - l := r.logger.WithFields(logger.StringField("jobID", job.ID), logger.StringField("signature", job.Step.Signature.Value)) + l := r.agentLogger.WithFields(logger.StringField("jobID", job.ID), logger.StringField("signature", job.Step.Signature.Value)) l.Info("Successfully verified job") - r.logStreamer.Process(r.prependTimestampForLogs("~~~ ✅ Job signature verified\n")) - r.logStreamer.Process(r.prependTimestampForLogs("signature: %s\n", job.Step.Signature.Value)) + fmt.Fprintln(r.jobLogs, "~~~ ✅ Job signature verified") + fmt.Fprintf(r.jobLogs, "signature: %s\n", job.Step.Signature.Value) } } // Validate the repository if the list of allowed repositories is set. err := validateJobValue(r.conf.AgentConfiguration.AllowedRepositories, job.Env["BUILDKITE_REPO"]) if err != nil { - r.logStreamer.Process([]byte(fmt.Sprintf("%s", err))) - r.logger.Error("Repo %s", err) + fmt.Fprintln(r.jobLogs, err.Error()) + r.agentLogger.Error("Failed to validate repo: %s", err) exit.Status = -1 exit.SignalReason = SignalReasonAgentRefused return nil @@ -131,8 +131,8 @@ func (r *JobRunner) Run(ctx context.Context) error { // Validate the plugins if the list of allowed plugins is set. err = r.checkPlugins(ctx) if err != nil { - r.logStreamer.Process([]byte(fmt.Sprintf("%s", err))) - r.logger.Error("Plugin %s", err) + fmt.Fprintln(r.jobLogs, err.Error()) + r.agentLogger.Error("Failed to validate plugins: %s", err) exit.Status = -1 exit.SignalReason = SignalReasonAgentRefused return nil @@ -145,9 +145,9 @@ func (r *JobRunner) Run(ctx context.Context) error { ok, err := r.executePreBootstrapHook(ctx, hook) if !ok { // Ensure the Job UI knows why this job resulted in failure - r.logStreamer.Process([]byte("pre-bootstrap hook rejected this job, see the buildkite-agent logs for more details")) + fmt.Fprintln(r.jobLogs, "pre-bootstrap hook rejected this job, see the buildkite-agent logs for more details") // But disclose more information in the agent logs - r.logger.Error("pre-bootstrap hook rejected this job: %s", err) + r.agentLogger.Error("pre-bootstrap hook rejected this job: %s", err) exit.Status = -1 exit.SignalReason = SignalReasonAgentRefused @@ -166,47 +166,29 @@ func (r *JobRunner) Run(ctx context.Context) error { return nil } -func (r *JobRunner) prependTimestampForLogs(s string, args ...any) []byte { - switch { - case r.conf.AgentConfiguration.ANSITimestamps: - return []byte(fmt.Sprintf( - "\x1b_bk;t=%d\x07%s", - time.Now().UnixNano()/int64(time.Millisecond), - fmt.Sprintf(s, args...), - )) - case r.conf.AgentConfiguration.TimestampLines: - return []byte(fmt.Sprintf( - "[%s] %s", - time.Now().UTC().Format(time.RFC3339), - fmt.Sprintf(s, args...), - )) - default: - return []byte(fmt.Sprintf(s, args...)) - } -} - func (r *JobRunner) verificationFailureLogs(behavior string, err error) { - l := r.logger.WithFields(logger.StringField("jobID", r.conf.Job.ID), logger.StringField("error", err.Error())) + l := r.agentLogger.WithFields(logger.StringField("jobID", r.conf.Job.ID), logger.StringField("error", err.Error())) prefix := "+++ ⚠️" if behavior == VerificationBehaviourBlock { prefix = "+++ ⛔" } l.Warn("Job verification failed") - r.logStreamer.Process([]byte(r.prependTimestampForLogs("%s Job signature verification failed\n", prefix))) - r.logStreamer.Process([]byte(r.prependTimestampForLogs("error: %s\n", err))) + fmt.Fprintf(r.jobLogs, "%s Job signature verification failed\n", prefix) + fmt.Fprintf(r.jobLogs, "error: %s\n", err) if errors.Is(err, ErrNoSignature) { - r.logStreamer.Process([]byte(r.prependTimestampForLogs("no signature in job\n"))) + fmt.Fprintln(r.jobLogs, "no signature in job") } else if ise := new(invalidSignatureError); errors.As(err, &ise) { - r.logStreamer.Process([]byte(r.prependTimestampForLogs("signature: %s\n", r.conf.Job.Step.Signature.Value))) + fmt.Fprintf(r.jobLogs, "signature: %s\n", r.conf.Job.Step.Signature.Value) } else if mke := new(missingKeyError); errors.As(err, &mke) { - r.logStreamer.Process([]byte(r.prependTimestampForLogs("signature: %s\n", mke.signature))) + fmt.Fprintf(r.jobLogs, "signature: %s\n", mke.signature) } if behavior == VerificationBehaviourWarn { - r.logger.Warn("Job will be run whether or not it can be verified - this is not recommended. You can change this behavior with the `job-verification-failure-behavior` agent configuration option.") - r.logStreamer.Process(r.prependTimestampForLogs("Job will be run without verification\n")) + l.Warn("Job will be run whether or not it can be verified - this is not recommended.") + l.Warn("You can change this behavior with the `job-verification-failure-behavior` agent configuration option.") + fmt.Fprintln(r.jobLogs, "Job will be run without verification") } } @@ -220,8 +202,8 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { exit := processExit{} // Run the process. This will block until it finishes. if err := r.process.Run(ctx); err != nil { - // Send the error as output - r.logStreamer.Process([]byte(err.Error())) + // Send the error as to job logs + fmt.Fprintf(r.jobLogs, "Error running job: %s\n", err) // The process did not run at all, so make sure it fails return processExit{ @@ -234,13 +216,14 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { // to the user as they may be the caused by errors in the pipeline definition. k8sProcess, ok := r.process.(*kubernetes.Runner) if ok && r.cancelled && !r.stopped && k8sProcess.ClientStateUnknown() { - r.logStreamer.Process([]byte( + fmt.Fprintln( + r.jobLogs, "Some containers had unknown exit statuses. Perhaps they were in ImagePullBackOff.", - )) + ) } // Add the final output to the streamer - r.logStreamer.Process(r.output.ReadAndTruncate()) + r.jobLogs.Write(r.output.ReadAndTruncate()) // Collect the finished process' exit status exit.Status = r.process.WaitStatus().ExitStatus() @@ -274,19 +257,19 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit proces // Warn about failed chunks if count := r.logStreamer.FailedChunks(); count > 0 { - r.logger.Warn("%d chunks failed to upload for this job", count) + r.agentLogger.Warn("%d chunks failed to upload for this job", count) } // Wait for the routines that we spun up to finish - r.logger.Debug("[JobRunner] Waiting for all other routines to finish") + r.agentLogger.Debug("[JobRunner] Waiting for all other routines to finish") wg.Wait() // Remove the env file, if any if r.envFile != nil { if err := os.Remove(r.envFile.Name()); err != nil { - r.logger.Warn("[JobRunner] Error cleaning up env file: %s", err) + r.agentLogger.Warn("[JobRunner] Error cleaning up env file: %s", err) } - r.logger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name()) + r.agentLogger.Debug("[JobRunner] Deleted env file: %s", r.envFile.Name()) } // Write some metrics about the job run @@ -304,7 +287,7 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit proces // Once we tell the API we're finished it might assign us new work, so make sure everything else is done first. r.finishJob(ctx, finishedAt, exit, r.logStreamer.FailedChunks()) - r.logger.Info("Finished job %s", r.conf.Job.ID) + r.agentLogger.Info("Finished job %s", r.conf.Job.ID) } // finishJob finishes the job in the Buildkite Agent API. If the FinishJob call @@ -316,7 +299,7 @@ func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit pr r.conf.Job.SignalReason = exit.SignalReason r.conf.Job.ChunksFailedCount = failedChunkCount - r.logger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s", + r.agentLogger.Debug("[JobRunner] Finishing job with exit_status=%s, signal=%s and signal_reason=%s", r.conf.Job.ExitStatus, r.conf.Job.Signal, r.conf.Job.SignalReason) ctx, cancel := context.WithTimeout(ctx, 48*time.Hour) @@ -339,10 +322,10 @@ func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit pr // to finish the job forever so we'll just bail out and // go find some more work to do. if response != nil && response.StatusCode == 422 { - r.logger.Warn("Buildkite rejected the call to finish the job (%s)", err) + r.agentLogger.Warn("Buildkite rejected the call to finish the job (%s)", err) retrier.Break() } else { - r.logger.Warn("%s (%s)", err, retrier) + r.agentLogger.Warn("%s (%s)", err, retrier) } } @@ -359,7 +342,7 @@ func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { defer func() { wg.Done() - r.logger.Debug("[JobRunner] Routine that processes the log has finished") + r.agentLogger.Debug("[JobRunner] Routine that processes the log has finished") }() select { @@ -371,17 +354,8 @@ func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { for { setStat("📨 Sending process output to log streamer") - // Send the output of the process to the log streamer - // for processing - chunk := r.output.ReadAndTruncate() - if err := r.logStreamer.Process(chunk); err != nil { - r.logger.Error("Could not stream the log output: %v", err) - // So far, the only error from logStreamer.Process is if the log has - // reached the limit. - // Since we're not writing any more, close the buffer, which will - // cause future Writes to return an error. - r.output.Close() - } + // Send the output of the process to the log streamer for processing + r.logStreamer.Process(r.output.ReadAndTruncate()) setStat("😴 Sleeping for a bit") @@ -413,7 +387,7 @@ func (r *JobRunner) Cancel() error { } if r.process == nil { - r.logger.Error("No process to kill") + r.agentLogger.Error("No process to kill") return nil } @@ -422,7 +396,12 @@ func (r *JobRunner) Cancel() error { reason = "(agent stopping)" } - r.logger.Info("Canceling job %s with a grace period of %ds %s", r.conf.Job.ID, r.conf.AgentConfiguration.CancelGracePeriod, reason) + r.agentLogger.Info( + "Canceling job %s with a grace period of %ds %s", + r.conf.Job.ID, + r.conf.AgentConfiguration.CancelGracePeriod, + reason, + ) r.cancelled = true @@ -434,7 +413,7 @@ func (r *JobRunner) Cancel() error { select { // Grace period for cancelling case <-time.After(time.Second * time.Duration(r.conf.AgentConfiguration.CancelGracePeriod)): - r.logger.Info("Job %s hasn't stopped in time, terminating", r.conf.Job.ID) + r.agentLogger.Info("Job %s hasn't stopped in time, terminating", r.conf.Job.ID) // Terminate the process as we've exceeded our context return r.process.Terminate() diff --git a/agent/verify_job.go b/agent/verify_job.go index 76a7c7ff0d..129c27454c 100644 --- a/agent/verify_job.go +++ b/agent/verify_job.go @@ -38,7 +38,7 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { step := r.conf.Job.Step if step.Signature == nil { - r.logger.Debug("verifyJob: Job.Step.Signature == nil") + r.agentLogger.Debug("verifyJob: Job.Step.Signature == nil") return ErrNoSignature } @@ -53,14 +53,14 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { // Verify the signature if err := step.Signature.Verify(r.conf.JWKS, r.conf.Job.Env, stepWithInvariants); err != nil { - r.logger.Debug("verifyJob: step.Signature.Verify(Job.Env, stepWithInvariants, JWKS) = %v", err) + r.agentLogger.Debug("verifyJob: step.Signature.Verify(Job.Env, stepWithInvariants, JWKS) = %v", err) return newInvalidSignatureError(ErrVerificationFailed) } // Interpolate the matrix permutation (validating the permutation in the // process). if err := step.InterpolateMatrixPermutation(r.conf.Job.MatrixPermutation); err != nil { - r.logger.Debug("verifyJob: step.InterpolateMatrixPermutation(% #v) = %v", r.conf.Job.MatrixPermutation, err) + r.agentLogger.Debug("verifyJob: step.InterpolateMatrixPermutation(% #v) = %v", r.conf.Job.MatrixPermutation, err) return newInvalidSignatureError(ErrInvalidJob) } @@ -112,7 +112,7 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { case "command": // compare directly jobCommand := r.conf.Job.Env["BUILDKITE_COMMAND"] if step.Command != jobCommand { - r.logger.Debug("verifyJob: BUILDKITE_COMMAND = %q != %q = step.Command", jobCommand, step.Command) + r.agentLogger.Debug("verifyJob: BUILDKITE_COMMAND = %q != %q = step.Command", jobCommand, step.Command) return newInvalidSignatureError(ErrInvalidJob) } @@ -122,11 +122,11 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { for name, stepEnvValue := range step.Env { jobEnvValue, has := r.conf.Job.Env[name] if !has { - r.logger.Debug("verifyJob: %q missing from Job.Env; step.Env[%q] = %q", name, name, stepEnvValue) + r.agentLogger.Debug("verifyJob: %q missing from Job.Env; step.Env[%q] = %q", name, name, stepEnvValue) return newInvalidSignatureError(ErrInvalidJob) } if jobEnvValue != stepEnvValue { - r.logger.Debug("verifyJob: Job.Env[%q] = %q != %q = step.Env[%q]", name, jobEnvValue, stepEnvValue, name) + r.agentLogger.Debug("verifyJob: Job.Env[%q] = %q != %q = step.Env[%q]", name, jobEnvValue, stepEnvValue, name) return newInvalidSignatureError(ErrInvalidJob) } } @@ -140,33 +140,33 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { emptyJobPlugins := (jobPluginsJSON == "" || jobPluginsJSON == "null" || jobPluginsJSON == "[]") if emptyStepPlugins && emptyJobPlugins { - r.logger.Debug("verifyJob: both BUILDKITE_PLUGINS and step.Plugins are empty/null") + r.agentLogger.Debug("verifyJob: both BUILDKITE_PLUGINS and step.Plugins are empty/null") continue // both empty } if emptyStepPlugins != emptyJobPlugins { // one is empty but the other is not - r.logger.Debug("verifyJob: emptyJobPlugins = %t != %t = emptyStepPlugins", emptyJobPlugins, emptyStepPlugins) + r.agentLogger.Debug("verifyJob: emptyJobPlugins = %t != %t = emptyStepPlugins", emptyJobPlugins, emptyStepPlugins) return newInvalidSignatureError(ErrInvalidJob) } stepPluginsJSON, err := json.Marshal(step.Plugins) if err != nil { - r.logger.Debug("verifyJob: json.Marshal(step.Plugins) = %v", err) + r.agentLogger.Debug("verifyJob: json.Marshal(step.Plugins) = %v", err) return newInvalidSignatureError(ErrInvalidJob) } stepPluginsNorm, err := jcs.Transform(stepPluginsJSON) if err != nil { - r.logger.Debug("verifyJob: jcs.Transform(stepPluginsJSON) = %v", err) + r.agentLogger.Debug("verifyJob: jcs.Transform(stepPluginsJSON) = %v", err) return newInvalidSignatureError(ErrInvalidJob) } jobPluginsNorm, err := jcs.Transform([]byte(jobPluginsJSON)) if err != nil { - r.logger.Debug("verifyJob: jcs.Transform(jobPluginsJSON) = %v", err) + r.agentLogger.Debug("verifyJob: jcs.Transform(jobPluginsJSON) = %v", err) return newInvalidSignatureError(ErrInvalidJob) } if !bytes.Equal(jobPluginsNorm, stepPluginsNorm) { - r.logger.Debug("verifyJob: jobPluginsNorm = %q != %q = stepPluginsNorm", jobPluginsNorm, stepPluginsNorm) + r.agentLogger.Debug("verifyJob: jobPluginsNorm = %q != %q = stepPluginsNorm", jobPluginsNorm, stepPluginsNorm) return newInvalidSignatureError(ErrInvalidJob) } @@ -183,7 +183,7 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { if name, isEnv := strings.CutPrefix(field, pipeline.EnvNamespacePrefix); isEnv { if _, has := r.conf.Job.Env[name]; !has { // A pipeline env var that is now missing. - r.logger.Debug("verifyJob: %q missing from Job.Env", name) + r.agentLogger.Debug("verifyJob: %q missing from Job.Env", name) return newInvalidSignatureError(ErrInvalidJob) } // The env var is present. Signature.Verify used the value from @@ -193,7 +193,7 @@ func (r *JobRunner) verifyJob(keySet jwk.Set) error { // We don't know this field, so we cannot ensure it is consistent // with the job. - r.logger.Debug("verifyJob: mystery signed field %q", field) + r.agentLogger.Debug("verifyJob: mystery signed field %q", field) return newInvalidSignatureError(ErrInvalidJob) } } From fef91272ab3a6e0f1559ed8dcb0dd4e65d3e236b Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 00:02:57 +1100 Subject: [PATCH 08/13] Remove error return value from a method that does not use it --- agent/log_streamer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/agent/log_streamer.go b/agent/log_streamer.go index 57a2929293..069aa76382 100644 --- a/agent/log_streamer.go +++ b/agent/log_streamer.go @@ -71,7 +71,11 @@ type LogStreamerChunk struct { } // Creates a new instance of the log streamer -func NewLogStreamer(l logger.Logger, cb func(context.Context, *LogStreamerChunk) error, c LogStreamerConfig) *LogStreamer { +func NewLogStreamer( + l logger.Logger, + cb func(context.Context, *LogStreamerChunk) error, + c LogStreamerConfig, +) *LogStreamer { return &LogStreamer{ logger: l, conf: c, @@ -153,7 +157,7 @@ func (ls *LogStreamer) Process(output []byte) { } // Waits for all the chunks to be uploaded, then shuts down all the workers -func (ls *LogStreamer) Stop() error { +func (ls *LogStreamer) Stop() { ls.logger.Debug("[LogStreamer] Waiting for all the chunks to be uploaded") ls.chunkWaitGroup.Wait() @@ -163,8 +167,6 @@ func (ls *LogStreamer) Stop() error { for n := 0; n < ls.conf.Concurrency; n++ { ls.queue <- nil } - - return nil } // The actual log streamer worker From f1c84c5360cb9ea04e5b948987753e38b33da7a1 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 00:03:17 +1100 Subject: [PATCH 09/13] Flush job logs when the job never started --- agent/run_job.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index 42459356ce..c647307526 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -158,7 +158,7 @@ func (r *JobRunner) Run(ctx context.Context) error { // Kick off log streaming and job status checking when the process starts. wg.Add(2) - go r.jobLogStreamer(cctx, &wg) + go r.streamJobLogsAfterProcessStart(cctx, &wg) go r.jobCancellationChecker(cctx, &wg) exit = r.runJob(cctx) @@ -167,7 +167,10 @@ func (r *JobRunner) Run(ctx context.Context) error { } func (r *JobRunner) verificationFailureLogs(behavior string, err error) { - l := r.agentLogger.WithFields(logger.StringField("jobID", r.conf.Job.ID), logger.StringField("error", err.Error())) + l := r.agentLogger.WithFields( + logger.StringField("jobID", r.conf.Job.ID), + logger.StringField("error", err.Error()), + ) prefix := "+++ ⚠️" if behavior == VerificationBehaviourBlock { prefix = "+++ ⛔" @@ -249,6 +252,10 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit processExit) { finishedAt := time.Now() + // Flush the job logs. These should have been flushed already if the process started, but if it + // never started, then logs from prior to the attempt to start the process will still be buffered. + r.logStreamer.Process(r.output.ReadAndTruncate()) + // Stop the header time streamer. This will block until all the chunks have been uploaded r.headerTimesStreamer.Stop() @@ -333,9 +340,9 @@ func (r *JobRunner) finishJob(ctx context.Context, finishedAt time.Time, exit pr }) } -// jobLogStreamer waits for the process to start, then grabs the job output +// streamJobLogsAfterProcessStart waits for the process to start, then grabs the job output // every few seconds and sends it back to Buildkite. -func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { +func (r *JobRunner) streamJobLogsAfterProcessStart(ctx context.Context, wg *sync.WaitGroup) { ctx, setStat, done := status.AddSimpleItem(ctx, "Job Log Streamer") defer done() setStat("🏃 Starting...") @@ -371,6 +378,7 @@ func (r *JobRunner) jobLogStreamer(ctx context.Context, wg *sync.WaitGroup) { // The final output after the process has finished is processed in Run(). } + func (r *JobRunner) CancelAndStop() error { r.cancelLock.Lock() r.stopped = true From aed166908cb66da06e7220c507d1c0f31e08852c Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 00:03:37 +1100 Subject: [PATCH 10/13] Update tests for new error messages --- .../job_verification_integration_test.go | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/agent/integration/job_verification_integration_test.go b/agent/integration/job_verification_integration_test.go index b90053f0a3..2537335a76 100644 --- a/agent/integration/job_verification_integration_test.go +++ b/agent/integration/job_verification_integration_test.go @@ -264,7 +264,7 @@ func TestJobVerification(t *testing.T) { mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().NotCalled() }, expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, - expectLogsContain: []string{"⚠️ ERROR"}, + expectLogsContain: []string{"+++ ⛔"}, }, { name: "when job signature is invalid, and JobVerificationFailureBehaviour is warn, it warns and runs the job", @@ -275,7 +275,7 @@ func TestJobVerification(t *testing.T) { verificationJWKS: jwksFromKeys(t, symmetricJWKFor(t, signingKeyAlpacas)), mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().Once().AndExitWith(0) }, expectedExitStatus: "0", - expectLogsContain: []string{"⚠️ WARNING"}, + expectLogsContain: []string{"+++ ⚠️"}, }, { name: "when job signature is valid, it runs the job", @@ -320,7 +320,7 @@ func TestJobVerification(t *testing.T) { mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().NotCalled() }, expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, - expectLogsContain: []string{"⚠️ ERROR"}, + expectLogsContain: []string{"+++ ⛔"}, }, { name: "when job signature is missing, and JobVerificationFailureBehaviour is warn, it warns and runs the job", @@ -331,7 +331,7 @@ func TestJobVerification(t *testing.T) { verificationJWKS: jwksFromKeys(t, symmetricJWKFor(t, signingKeyLlamas)), mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().Once().AndExitWith(0) }, expectedExitStatus: "0", - expectLogsContain: []string{"⚠️ WARNING"}, + expectLogsContain: []string{"+++ ⚠️"}, }, { name: "when the step signature matches, but the job doesn't match the step, it fails signature verification", @@ -344,7 +344,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -359,7 +359,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -374,7 +374,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -389,8 +389,8 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", - "but no verification key was provided, so the job can't be verified", + "+++ ⛔", + "but no verification key was provided", }, }, { @@ -404,7 +404,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "signature verification failed", }, }, @@ -419,7 +419,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -434,7 +434,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "signature verification failed", }, }, @@ -460,7 +460,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -475,7 +475,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "job does not match signed step", }, }, @@ -509,7 +509,7 @@ func TestJobVerification(t *testing.T) { expectedExitStatus: "-1", expectedSignalReason: agent.SignalReasonSignatureRejected, expectLogsContain: []string{ - "⚠️ ERROR", + "+++ ⛔", "signature verification failed", }, }, From abbf57c382622607377d7003a3a4b78c1a93bc7d Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 08:37:00 +1100 Subject: [PATCH 11/13] Fix typos in comments --- agent/job_runner.go | 2 +- agent/run_job.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/job_runner.go b/agent/job_runner.go index 7c1caa829e..114e68932d 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -136,7 +136,7 @@ type JobRunner struct { // The internal log streamer. Don't write to this directly, use `jobLogs` instead logStreamer *LogStreamer - // jobLogs is a logger that outputs to the job logs + // jobLogs is an io.Writer that sends data to the job logs jobLogs io.Writer // If the job is being cancelled diff --git a/agent/run_job.go b/agent/run_job.go index c647307526..a4de692dbe 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -205,7 +205,7 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { exit := processExit{} // Run the process. This will block until it finishes. if err := r.process.Run(ctx); err != nil { - // Send the error as to job logs + // Send the error to job logs fmt.Fprintf(r.jobLogs, "Error running job: %s\n", err) // The process did not run at all, so make sure it fails From 2dfc6f8c823a6889286dd7743111b4174e1321e4 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 08:56:33 +1100 Subject: [PATCH 12/13] Consolidate jobLog flushes into the cleanup method --- agent/run_job.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index a4de692dbe..b6f4a47820 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -225,9 +225,6 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { ) } - // Add the final output to the streamer - r.jobLogs.Write(r.output.ReadAndTruncate()) - // Collect the finished process' exit status exit.Status = r.process.WaitStatus().ExitStatus() @@ -252,8 +249,9 @@ func (r *JobRunner) runJob(ctx context.Context) processExit { func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit processExit) { finishedAt := time.Now() - // Flush the job logs. These should have been flushed already if the process started, but if it - // never started, then logs from prior to the attempt to start the process will still be buffered. + // Flush the job logs. If the process is never started, then logs from prior to the attempt to + // start the process will still be buffered. Also, there may still be logs in the buffer that + // were left behind because the uploader goroutine exited before it could flush them. r.logStreamer.Process(r.output.ReadAndTruncate()) // Stop the header time streamer. This will block until all the chunks have been uploaded From 4619a3741fe40afe7bf98b259e82cdba341c0165 Mon Sep 17 00:00:00 2001 From: Narthana Epa Date: Thu, 2 Nov 2023 09:04:36 +1100 Subject: [PATCH 13/13] Stop the log stream before the header time streamer --- agent/run_job.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agent/run_job.go b/agent/run_job.go index b6f4a47820..7d6e3d120c 100644 --- a/agent/run_job.go +++ b/agent/run_job.go @@ -254,12 +254,12 @@ func (r *JobRunner) cleanup(ctx context.Context, wg *sync.WaitGroup, exit proces // were left behind because the uploader goroutine exited before it could flush them. r.logStreamer.Process(r.output.ReadAndTruncate()) - // Stop the header time streamer. This will block until all the chunks have been uploaded - r.headerTimesStreamer.Stop() - // Stop the log streamer. This will block until all the chunks have been uploaded r.logStreamer.Stop() + // Stop the header time streamer. This will block until all the chunks have been uploaded + r.headerTimesStreamer.Stop() + // Warn about failed chunks if count := r.logStreamer.FailedChunks(); count > 0 { r.agentLogger.Warn("%d chunks failed to upload for this job", count)