Skip to content

Commit

Permalink
Merge pull request #2461 from buildkite/pdp-1572-improve-build-log-me…
Browse files Browse the repository at this point in the history
…ssaging-for-verification-successfail
  • Loading branch information
triarius committed Nov 2, 2023
2 parents a3c3ca8 + 4619a37 commit 5a43e6f
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 140 deletions.
30 changes: 15 additions & 15 deletions agent/integration/job_verification_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestJobVerification(t *testing.T) {
mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().NotCalled() },
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{"⚠️ ERROR"},
expectLogsContain: []string{"+++ ⛔"},
},
{
name: "when job signature is invalid, and JobVerificationFailureBehaviour is warn, it warns and runs the job",
Expand All @@ -275,7 +275,7 @@ func TestJobVerification(t *testing.T) {
verificationJWKS: jwksFromKeys(t, symmetricJWKFor(t, signingKeyAlpacas)),
mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().Once().AndExitWith(0) },
expectedExitStatus: "0",
expectLogsContain: []string{"⚠️ WARNING"},
expectLogsContain: []string{"+++ ⚠️"},
},
{
name: "when job signature is valid, it runs the job",
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestJobVerification(t *testing.T) {
mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().NotCalled() },
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{"⚠️ ERROR"},
expectLogsContain: []string{"+++ ⛔"},
},
{
name: "when job signature is missing, and JobVerificationFailureBehaviour is warn, it warns and runs the job",
Expand All @@ -331,7 +331,7 @@ func TestJobVerification(t *testing.T) {
verificationJWKS: jwksFromKeys(t, symmetricJWKFor(t, signingKeyLlamas)),
mockBootstrapExpectation: func(bt *bintest.Mock) { bt.Expect().Once().AndExitWith(0) },
expectedExitStatus: "0",
expectLogsContain: []string{"⚠️ WARNING"},
expectLogsContain: []string{"+++ ⚠️"},
},
{
name: "when the step signature matches, but the job doesn't match the step, it fails signature verification",
Expand All @@ -344,7 +344,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand All @@ -359,7 +359,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand All @@ -374,7 +374,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand All @@ -389,8 +389,8 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"but no verification key was provided, so the job can't be verified",
"+++ ⛔",
"but no verification key was provided",
},
},
{
Expand All @@ -404,7 +404,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"signature verification failed",
},
},
Expand All @@ -419,7 +419,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand All @@ -434,7 +434,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"signature verification failed",
},
},
Expand All @@ -460,7 +460,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand All @@ -475,7 +475,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"job does not match signed step",
},
},
Expand Down Expand Up @@ -509,7 +509,7 @@ func TestJobVerification(t *testing.T) {
expectedExitStatus: "-1",
expectedSignalReason: agent.SignalReasonSignatureRejected,
expectLogsContain: []string{
"⚠️ ERROR",
"+++ ⛔",
"signature verification failed",
},
},
Expand Down
83 changes: 43 additions & 40 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ type JobRunner struct {
// How the JobRunner should respond when job verification fails (one of `block` or `warn`)
VerificationFailureBehavior string

// The logger to use
logger logger.Logger
// agentLogger is a agentLogger that outputs to the agent logs
agentLogger logger.Logger

// The APIClient that will be used when updating the job
apiClient APIClient
Expand All @@ -133,9 +133,12 @@ type JobRunner struct {
// The internal header time streamer
headerTimesStreamer *headerTimesStreamer

// The internal log streamer
// The internal log streamer. Don't write to this directly, use `jobLogs` instead
logStreamer *LogStreamer

// jobLogs is an io.Writer that sends data to the job logs
jobLogs io.Writer

// If the job is being cancelled
cancelled bool

Expand Down Expand Up @@ -166,9 +169,9 @@ var _ jobRunner = (*JobRunner)(nil)
// Initializes the job runner
func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, conf JobRunnerConfig) (jobRunner, error) {
r := &JobRunner{
logger: l,
conf: conf,
apiClient: apiClient,
agentLogger: l,
conf: conf,
apiClient: apiClient,
}

var err error
Expand All @@ -186,15 +189,15 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
if r.conf.Job.Token != "" {
clientConf := r.apiClient.Config()
clientConf.Token = r.conf.Job.Token
r.apiClient = api.NewClient(r.logger, clientConf)
r.apiClient = api.NewClient(r.agentLogger, clientConf)
}

// Create our header times struct
r.headerTimesStreamer = newHeaderTimesStreamer(r.logger, r.onUploadHeaderTime)
r.headerTimesStreamer = newHeaderTimesStreamer(r.agentLogger, r.onUploadHeaderTime)

// The log streamer that will take the output chunks, and send them to
// the Buildkite Agent API
r.logStreamer = NewLogStreamer(r.logger, r.onUploadChunk, LogStreamerConfig{
r.logStreamer = NewLogStreamer(r.agentLogger, r.onUploadChunk, LogStreamerConfig{
Concurrency: 3,
MaxChunkSizeBytes: r.conf.Job.ChunksMaxSizeBytes,
MaxSizeBytes: r.conf.Job.LogMaxSizeBytes,
Expand All @@ -213,7 +216,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
if file, err := os.CreateTemp(tempDir, fmt.Sprintf("job-env-%s", r.conf.Job.ID)); err != nil {
return r, err
} else {
r.logger.Debug("[JobRunner] Created env file: %s", file.Name())
r.agentLogger.Debug("[JobRunner] Created env file: %s", file.Name())
r.envFile = file
}

Expand All @@ -237,7 +240,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
jobLogDir := ""
if conf.AgentConfiguration.JobLogPath != "" {
jobLogDir = conf.AgentConfiguration.JobLogPath
r.logger.Debug("[JobRunner] Job Log Path: %s", jobLogDir)
r.agentLogger.Debug("[JobRunner] Job Log Path: %s", jobLogDir)
}
tmpFile, err = os.CreateTemp(jobLogDir, "buildkite_job_log")
if err != nil {
Expand Down Expand Up @@ -270,7 +273,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con

go func() {
// Use a scanner to process output line by line
err := process.NewScanner(r.logger).ScanLines(pr, func(line string) {
err := process.NewScanner(r.agentLogger).ScanLines(pr, func(line string) {
// Send to our header streamer and determine if it's a header
// or header expansion.
isHeaderOrExpansion := r.headerTimesStreamer.Scan(line)
Expand All @@ -284,7 +287,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
_, _ = outputWriter.Write([]byte(line + "\n"))
})
if err != nil {
r.logger.Error("[JobRunner] Encountered error %v", err)
r.agentLogger.Error("[JobRunner] Encountered error %v", err)
}
}()

Expand All @@ -297,11 +300,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con

// Use a scanner to process output for headers only
go func() {
err := process.NewScanner(r.logger).ScanLines(pr, func(line string) {
err := process.NewScanner(r.agentLogger).ScanLines(pr, func(line string) {
r.headerTimesStreamer.Scan(line)
})
if err != nil {
r.logger.Error("[JobRunner] Encountered error %v", err)
r.agentLogger.Error("[JobRunner] Encountered error %v", err)
}
}()
}
Expand All @@ -322,7 +325,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
}

// The writer that output from the process goes into
processWriter := io.MultiWriter(allWriters...)
r.jobLogs = io.MultiWriter(allWriters...)

// Copy the current processes ENV and merge in the new ones. We do this
// so the sub process gets PATH and stuff. We merge our path in over
Expand All @@ -336,10 +339,10 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
if err != nil {
return nil, fmt.Errorf("failed to parse BUILDKITE_CONTAINER_COUNT: %w", err)
}
r.process = kubernetes.New(r.logger, kubernetes.Config{
r.process = kubernetes.New(r.agentLogger, kubernetes.Config{
AccessToken: r.apiClient.Config().Token,
Stdout: processWriter,
Stderr: processWriter,
Stdout: r.jobLogs,
Stderr: r.jobLogs,
ClientCount: containerCount,
})
} else {
Expand All @@ -349,14 +352,14 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
return nil, fmt.Errorf("splitting bootstrap-script (%q) into tokens: %w", conf.AgentConfiguration.BootstrapScript, err)
}

r.process = process.New(r.logger, process.Config{
r.process = process.New(r.agentLogger, process.Config{
Path: cmd[0],
Args: cmd[1:],
Dir: conf.AgentConfiguration.BuildPath,
Env: processEnv,
PTY: conf.AgentConfiguration.RunInPty,
Stdout: processWriter,
Stderr: processWriter,
Stdout: r.jobLogs,
Stderr: r.jobLogs,
InterruptSignal: conf.CancelSignal,
SignalGracePeriod: conf.AgentConfiguration.SignalGracePeriod,
})
Expand All @@ -366,11 +369,11 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient APIClient, con
go func() {
<-r.process.Done()
if err := pw.Close(); err != nil {
r.logger.Error("%v", err)
r.agentLogger.Error("%v", err)
}
if tmpFile != nil {
if err := os.Remove(tmpFile.Name()); err != nil {
r.logger.Error("%v", err)
r.agentLogger.Error("%v", err)
}
}
}()
Expand Down Expand Up @@ -520,8 +523,8 @@ func (r *JobRunner) createEnvironment(ctx context.Context) ([]string, error) {
}

// see documentation for BuildkiteMessageMax
if err := truncateEnv(r.logger, env, BuildkiteMessageName, BuildkiteMessageMax); err != nil {
r.logger.Warn("failed to truncate %s: %v", BuildkiteMessageName, err)
if err := truncateEnv(r.agentLogger, env, BuildkiteMessageName, BuildkiteMessageMax); err != nil {
r.agentLogger.Warn("failed to truncate %s: %v", BuildkiteMessageName, err)
// attempt to continue anyway
}

Expand Down Expand Up @@ -597,7 +600,7 @@ func (r *JobRunner) checkPlugins(ctx context.Context) error {
}

func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (bool, error) {
r.logger.Info("Running pre-bootstrap hook %q", hook)
r.agentLogger.Info("Running pre-bootstrap hook %q", hook)

sh, err := shell.New()
if err != nil {
Expand All @@ -609,16 +612,16 @@ func (r *JobRunner) executePreBootstrapHook(ctx context.Context, hook string) (b
sh.Env.Set("BUILDKITE_ENV_FILE", r.envFile.Name())

sh.Writer = LogWriter{
l: r.logger,
l: r.agentLogger,
}

if err := sh.RunWithoutPrompt(ctx, hook); err != nil {
fmt.Printf("err: %s\n", err)
r.logger.Error("Finished pre-bootstrap hook %q: job rejected", hook)
r.agentLogger.Error("Finished pre-bootstrap hook %q: job rejected", hook)
return false, err
}

r.logger.Info("Finished pre-bootstrap hook %q: job accepted", hook)
r.agentLogger.Info("Finished pre-bootstrap hook %q: job accepted", hook)
return true, nil
}

Expand All @@ -637,11 +640,11 @@ func (r *JobRunner) startJob(ctx context.Context, startedAt time.Time) error {

if err != nil {
if response != nil && api.IsRetryableStatus(response) {
r.logger.Warn("%s (%s)", err, rtr)
r.agentLogger.Warn("%s (%s)", err, rtr)
} else if api.IsRetryableError(err) {
r.logger.Warn("%s (%s)", err, rtr)
r.agentLogger.Warn("%s (%s)", err, rtr)
} else {
r.logger.Warn("Buildkite rejected the call to start the job (%s)", err)
r.agentLogger.Warn("Buildkite rejected the call to start the job (%s)", err)
rtr.Break()
}
}
Expand All @@ -662,7 +665,7 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
// Mark this routine as done in the wait group
wg.Done()

r.logger.Debug("[JobRunner] Routine that refreshes the job has finished")
r.agentLogger.Debug("[JobRunner] Routine that refreshes the job has finished")
}()

select {
Expand All @@ -679,10 +682,10 @@ func (r *JobRunner) jobCancellationChecker(ctx context.Context, wg *sync.WaitGro
if err != nil {
// We don't really care if it fails, we'll just
// try again soon anyway
r.logger.Warn("Problem with getting job state %s (%s)", r.conf.Job.ID, err)
r.agentLogger.Warn("Problem with getting job state %s (%s)", r.conf.Job.ID, err)
} else if jobState.State == "canceling" || jobState.State == "canceled" {
if err := r.Cancel(); err != nil {
r.logger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err)
r.agentLogger.Error("Unexpected error canceling process as requested by server (job: %s) (err: %s)", r.conf.Job.ID, err)
}
}

Expand All @@ -707,10 +710,10 @@ func (r *JobRunner) onUploadHeaderTime(ctx context.Context, cursor, total int, t
response, err := r.apiClient.SaveHeaderTimes(ctx, r.conf.Job.ID, &api.HeaderTimes{Times: times})
if err != nil {
if response != nil && (response.StatusCode >= 400 && response.StatusCode <= 499) {
r.logger.Warn("Buildkite rejected the header times (%s)", err)
r.agentLogger.Warn("Buildkite rejected the header times (%s)", err)
retrier.Break()
} else {
r.logger.Warn("%s (%s)", err, retrier)
r.agentLogger.Warn("%s (%s)", err, retrier)
}
}

Expand Down Expand Up @@ -745,10 +748,10 @@ func (r *JobRunner) onUploadChunk(ctx context.Context, chunk *LogStreamerChunk)
})
if err != nil {
if response != nil && (response.StatusCode >= 400 && response.StatusCode <= 499) {
r.logger.Warn("Buildkite rejected the chunk upload (%s)", err)
r.agentLogger.Warn("Buildkite rejected the chunk upload (%s)", err)
retrier.Break()
} else {
r.logger.Warn("%s (%s)", err, retrier)
r.agentLogger.Warn("%s (%s)", err, retrier)
}
}

Expand Down

0 comments on commit 5a43e6f

Please sign in to comment.