diff --git a/pkg/api/ci.go b/pkg/api/ci.go index 3ba40504..addf3d57 100644 --- a/pkg/api/ci.go +++ b/pkg/api/ci.go @@ -93,6 +93,22 @@ type CILogStreamTarget struct { // attempt of a job, resuming from the last cursor after transient stream errors. // If onStatus is non-nil, it receives attempt status updates from the stream. func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CILogStreamTarget, w io.Writer, onStatus func(string)) error { + var onStatusWithError func(string) error + if onStatus != nil { + onStatusWithError = func(status string) error { + onStatus(status) + return nil + } + } + return CIStreamJobAttemptLogLines(ctx, token, orgID, target, func(line *civ1.LogLine) error { + return writeLogLine(w, line) + }, onStatusWithError) +} + +// CIStreamJobAttemptLogLines streams log line metadata for a job attempt or the +// latest attempt of a job, resuming from the last cursor after transient stream +// errors. Duplicate replayed lines are suppressed before onLine is called. +func CIStreamJobAttemptLogLines(ctx context.Context, token, orgID string, target CILogStreamTarget, onLine func(*civ1.LogLine) error, onStatus func(string) error) error { if target.AttemptID == "" && target.JobID == "" { return fmt.Errorf("exactly one of attempt ID or job ID is required") } @@ -123,7 +139,10 @@ func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CIL msg := stream.Msg() backoff = ciStreamInitialBackoff if status := msg.GetAttemptStatus(); status != "" && onStatus != nil { - onStatus(status) + if err := onStatus(status); err != nil { + stream.Close() + return err + } } line := msg.GetLine() @@ -133,9 +152,11 @@ func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CIL identity := logLineIdentity(line) if !seen.Contains(identity) { - if err := writeLogLine(w, line); err != nil { - stream.Close() - return err + if onLine != nil { + if err := onLine(line); err != nil { + stream.Close() + return err + } } seen.Add(identity) if msg.GetNextCursor() != "" { diff --git a/pkg/api/ci_test.go b/pkg/api/ci_test.go index 8222af4c..24bd623e 100644 --- a/pkg/api/ci_test.go +++ b/pkg/api/ci_test.go @@ -424,6 +424,74 @@ func TestCIStreamJobAttemptLogsReconnectsFromLastWrittenCursorAndSuppressesDupli } } +func TestCIStreamJobAttemptLogLinesReturnsMetadataAndSuppressesDuplicates(t *testing.T) { + recorder := &streamLogsRecorder{} + _, handler := civ1connect.NewCIServiceHandler(recorder) + server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{})) + t.Cleanup(server.Close) + + originalBaseURLFunc := baseURLFunc + baseURLFunc = func() string { return server.URL } + t.Cleanup(func() { baseURLFunc = originalBaseURLFunc }) + + originalInitialBackoff := ciStreamInitialBackoff + ciStreamInitialBackoff = 0 + t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff }) + + var lines []*civ1.LogLine + var statuses []string + if err := CIStreamJobAttemptLogLines(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, func(line *civ1.LogLine) error { + lines = append(lines, proto.Clone(line).(*civ1.LogLine)) + return nil + }, func(status string) error { + statuses = append(statuses, status) + return nil + }); err != nil { + t.Fatal(err) + } + + if got, want := len(lines), 3; got != want { + t.Fatalf("lines = %d, want %d", got, want) + } + for i, wantBody := range []string{"first", "second", "third"} { + if got := lines[i].GetBody(); got != wantBody { + t.Fatalf("line %d body = %q, want %q", i, got, wantBody) + } + if got, want := lines[i].GetLineNumber(), uint32(i+1); got != want { + t.Fatalf("line %d line number = %d, want %d", i, got, want) + } + if got, want := lines[i].GetTimestampMs(), int64(i+1); got != want { + t.Fatalf("line %d timestamp_ms = %d, want %d", i, got, want) + } + } + if got, want := statuses, []string{"running", "running", "running", "running", "finished"}; !slices.Equal(got, want) { + t.Fatalf("statuses = %v, want %v", got, want) + } +} + +func TestCIStreamJobAttemptLogLinesPropagatesLineCallbackError(t *testing.T) { + recorder := &streamLogsRecorder{} + _, handler := civ1connect.NewCIServiceHandler(recorder) + server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{})) + t.Cleanup(server.Close) + + originalBaseURLFunc := baseURLFunc + baseURLFunc = func() string { return server.URL } + t.Cleanup(func() { baseURLFunc = originalBaseURLFunc }) + + originalInitialBackoff := ciStreamInitialBackoff + ciStreamInitialBackoff = 0 + t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff }) + + callbackErr := errors.New("callback failed") + err := CIStreamJobAttemptLogLines(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, func(*civ1.LogLine) error { + return callbackErr + }, nil) + if !errors.Is(err, callbackErr) { + t.Fatalf("expected callback error, got %v", err) + } +} + type statusOnlyStreamRecorder struct { civ1connect.UnimplementedCIServiceHandler requests []*civ1.StreamJobAttemptLogsRequest diff --git a/pkg/cmd/ci/logs.go b/pkg/cmd/ci/logs.go index fbd77699..49e8789e 100644 --- a/pkg/cmd/ci/logs.go +++ b/pkg/cmd/ci/logs.go @@ -2,6 +2,7 @@ package ci import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -21,17 +22,25 @@ const ( followAttemptRetryTimeout = 30 * time.Second followAttemptRetryInterval = 1 * time.Second followLogIdleDelay = 2 * time.Second + logOutputJSON = "json" ) -var ciStreamJobAttemptLogs = api.CIStreamJobAttemptLogs +var ( + ciGetRunStatus = api.CIGetRunStatus + ciGetJobAttemptLogs = api.CIGetJobAttemptLogs + ciStreamJobAttemptLogs = api.CIStreamJobAttemptLogs + ciStreamJobAttemptLogLines = api.CIStreamJobAttemptLogLines +) func NewCmdLogs() *cobra.Command { var ( - orgID string - token string - job string - workflow string - follow bool + orgID string + token string + job string + workflow string + follow bool + output string + timestamps bool ) cmd := &cobra.Command{ @@ -58,12 +67,23 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, depot ci logs --job build --workflow ci.yml # Follow live logs for a job's latest attempt - depot ci logs --follow`, + depot ci logs --follow + + # Prefix log lines with persisted UTC timestamps + depot ci logs --timestamps + + # Emit newline-delimited JSON log events + depot ci logs --output json`, RunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { return cmd.Help() } + outputOptions := logOutputOptions{timestamps: timestamps, output: output} + if err := outputOptions.validate(); err != nil { + return err + } + ctx := cmd.Context() id := args[0] @@ -79,14 +99,21 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, return fmt.Errorf("missing API token, please run `depot login`") } - reporter := newLogFollowReporter(cmd.ErrOrStderr(), follow && helpers.IsTerminal()) + reporterWriter := cmd.ErrOrStderr() + reporterInteractive := follow && helpers.IsTerminal() + if outputOptions.json() { + reporterWriter = io.Discard + reporterInteractive = false + } + reporter := newLogFollowReporter(reporterWriter, reporterInteractive) // First, try resolving as a run ID (or job ID — the API accepts both). - resp, runErr := api.CIGetRunStatus(ctx, tokenVal, orgID, id) + resolutionOptions := logTargetResolutionOptionsForOutput(outputOptions) + resp, runErr := ciGetRunStatus(ctx, tokenVal, orgID, id) if runErr == nil { - target, err := resolveLogTarget(resp, id, job, workflow) + target, err := resolveLogTargetWithOptions(resp, id, job, workflow, resolutionOptions) if follow && err != nil { - target, err = resolveLogTargetWithFollowRetry(ctx, tokenVal, orgID, id, job, workflow, err, reporter) + target, err = resolveLogTargetWithFollowRetry(ctx, tokenVal, orgID, id, job, workflow, err, reporter, resolutionOptions) } if err != nil { return err @@ -97,12 +124,12 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, } if follow { - if err := streamLogsWithFollowUX(ctx, tokenVal, orgID, target, cmd.OutOrStdout(), reporter); err != nil { + if err := streamLogsWithFollowUX(ctx, tokenVal, orgID, target, cmd.OutOrStdout(), reporter, outputOptions); err != nil { return fmt.Errorf("failed to stream logs: %w", err) } } else { reportLogTargetSelection(target, reporter, false) - lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, target.attemptID) + lines, err := ciGetJobAttemptLogs(ctx, tokenVal, orgID, target.attemptID) if err != nil { return fmt.Errorf("failed to get logs: %w", err) } @@ -110,7 +137,9 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, reporter.Message(noLogsProducedMessage(target.jobKey, target.jobStatus)) return nil } - printLogLines(cmd.OutOrStdout(), lines) + if err := printLogLines(cmd.OutOrStdout(), lines, outputOptions); err != nil { + return err + } } return nil } @@ -123,7 +152,7 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, } if follow { - if err := streamUnresolvedLogsWithFollowUX(ctx, tokenVal, orgID, id, cmd.OutOrStdout(), reporter); err != nil { + if err := streamUnresolvedLogsWithFollowUX(ctx, tokenVal, orgID, id, cmd.OutOrStdout(), reporter, outputOptions); err != nil { if unresolvedErr, ok := err.(*unresolvedLogStreamError); ok { return fmt.Errorf( "could not resolve %q as a run, job, or attempt ID:\n as run: %v\n as job: %v\n as attempt: %v", @@ -136,7 +165,7 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, return fmt.Errorf("failed to stream logs: %w", err) } } else { - lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, id) + lines, err := ciGetJobAttemptLogs(ctx, tokenVal, orgID, id) if err != nil { // Both paths failed — show both errors so the user can // distinguish "bad ID" from "auth/network failure". @@ -151,7 +180,9 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, reporter.Message("No logs were produced.") return nil } - printLogLines(cmd.OutOrStdout(), lines) + if err := printLogLines(cmd.OutOrStdout(), lines, outputOptions); err != nil { + return err + } } return nil }, @@ -162,13 +193,132 @@ ID, use --job and --workflow to disambiguate by workflow job key.`, cmd.Flags().StringVar(&job, "job", "", "Workflow job key to select when using a run ID") cmd.Flags().StringVar(&workflow, "workflow", "", "Workflow path to filter jobs (e.g. ci.yml)") cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow live logs") + cmd.Flags().BoolVar(×tamps, "timestamps", false, "Prefix plain log lines with UTC timestamps") + cmd.Flags().StringVarP(&output, "output", "o", "", "Output format (json)") return cmd } -func printLogLines(w io.Writer, lines []*civ1.LogLine) { +type logOutputOptions struct { + timestamps bool + output string +} + +func (o logOutputOptions) validate() error { + switch o.output { + case "", logOutputJSON: + return nil + default: + return fmt.Errorf("unsupported output %q (valid: json)", o.output) + } +} + +func (o logOutputOptions) json() bool { + return o.output == logOutputJSON +} + +type logLineEvent struct { + Type string `json:"type"` + Timestamp string `json:"timestamp"` + TimestampMs int64 `json:"timestamp_ms"` + Stream string `json:"stream"` + StepKey string `json:"step_key"` + LineNumber uint32 `json:"line_number"` + Body string `json:"body"` +} + +type logStatusEvent struct { + Type string `json:"type"` + Status string `json:"status"` +} + +type logEndEvent struct { + Type string `json:"type"` + Status string `json:"status,omitempty"` + LineCount int `json:"line_count"` +} + +type logJSONEventWriter struct { + enc *json.Encoder +} + +func newLogJSONEventWriter(w io.Writer) *logJSONEventWriter { + return &logJSONEventWriter{enc: json.NewEncoder(w)} +} + +func (w *logJSONEventWriter) Line(line *civ1.LogLine) error { + return w.enc.Encode(logLineEventFromLine(line)) +} + +func (w *logJSONEventWriter) Status(status string) error { + if status == "" { + return nil + } + return w.enc.Encode(logStatusEvent{Type: "status", Status: status}) +} + +func (w *logJSONEventWriter) End(status string, lineCount int) error { + return w.enc.Encode(logEndEvent{Type: "end", Status: status, LineCount: lineCount}) +} + +func logLineEventFromLine(line *civ1.LogLine) logLineEvent { + return logLineEvent{ + Type: "line", + Timestamp: formatLogTimestamp(line), + TimestampMs: line.GetTimestampMs(), + Stream: logStreamName(line.GetStream()), + StepKey: line.GetStepId(), + LineNumber: line.GetLineNumber(), + Body: line.GetBody(), + } +} + +func printLogLines(w io.Writer, lines []*civ1.LogLine, options logOutputOptions) error { + if options.json() { + writer := newLogJSONEventWriter(w) + for _, line := range lines { + if err := writer.Line(line); err != nil { + return err + } + } + return nil + } + for _, line := range lines { - fmt.Fprintln(w, line.Body) + if err := writePlainLogLine(w, line, options.timestamps); err != nil { + return err + } + } + return nil +} + +func writePlainLogLine(w io.Writer, line *civ1.LogLine, timestamps bool) error { + text := line.GetBody() + "\n" + if timestamps { + text = formatLogTimestamp(line) + " " + line.GetBody() + "\n" + } + n, err := io.WriteString(w, text) + if err != nil { + return err + } + if n != len(text) { + return io.ErrShortWrite + } + return nil +} + +func formatLogTimestamp(line *civ1.LogLine) string { + return time.UnixMilli(line.GetTimestampMs()).UTC().Format(time.RFC3339Nano) +} + +func logStreamName(stream uint32) string { + switch stream { + case 0: + return "stdout" + case 1: + return "stderr" + default: + return fmt.Sprintf("stream_%d", stream) } } @@ -318,6 +468,16 @@ func (w *followLogWriter) Write(p []byte) (int, error) { return n, err } +func (w *followLogWriter) WriteLine(line *civ1.LogLine, timestamps bool) error { + w.lines++ + w.reporter.pause() + err := writePlainLogLine(w.w, line, timestamps) + if err == nil { + w.reporter.SawLogs() + } + return err +} + func streamLogsWithFollowUX( ctx context.Context, tokenVal string, @@ -325,14 +485,17 @@ func streamLogsWithFollowUX( target logTarget, out io.Writer, reporter *logFollowReporter, + options logOutputOptions, ) error { - reportLogTargetSelection(target, reporter, true) + if !options.json() { + reportLogTargetSelection(target, reporter, true) + } streamTarget := api.CILogStreamTarget{AttemptID: target.attemptID} if target.streamJobID != "" { streamTarget = api.CILogStreamTarget{JobID: target.streamJobID} } - return streamLogTargetWithFollowUX(ctx, tokenVal, orgID, streamTarget, target, out, reporter) + return streamLogTargetWithFollowUX(ctx, tokenVal, orgID, streamTarget, target, out, reporter, options) } type unresolvedLogStreamError struct { @@ -351,8 +514,11 @@ func streamUnresolvedLogsWithFollowUX( id string, out io.Writer, reporter *logFollowReporter, + options logOutputOptions, ) error { - reporter.Message(fmt.Sprintf("Following logs for %s.", id)) + if !options.json() { + reporter.Message(fmt.Sprintf("Following logs for %s.", id)) + } jobErr := streamLogTargetWithFollowUX( ctx, @@ -362,6 +528,7 @@ func streamUnresolvedLogsWithFollowUX( logTarget{}, out, reporter, + options, ) if jobErr == nil { return nil @@ -378,6 +545,7 @@ func streamUnresolvedLogsWithFollowUX( logTarget{}, out, reporter, + options, ) if attemptErr == nil { return nil @@ -401,17 +569,36 @@ func streamLogTargetWithFollowUX( target logTarget, out io.Writer, reporter *logFollowReporter, + options logOutputOptions, ) error { + if options.json() { + return streamLogTargetAsJSON(ctx, tokenVal, orgID, streamTarget, target, out) + } + reporter.Status(logStreamWaitingMessage(target)) logWriter := &followLogWriter{w: out, reporter: reporter} - err := ciStreamJobAttemptLogs(ctx, tokenVal, orgID, streamTarget, logWriter, func(status string) { - if logWriter.lines > 0 && status == target.attemptStatus { - return - } - target.attemptStatus = status - reporter.Status(logStreamWaitingMessage(target)) - }) + var err error + if options.timestamps { + err = ciStreamJobAttemptLogLines(ctx, tokenVal, orgID, streamTarget, func(line *civ1.LogLine) error { + return logWriter.WriteLine(line, true) + }, func(status string) error { + if logWriter.lines > 0 && status == target.attemptStatus { + return nil + } + target.attemptStatus = status + reporter.Status(logStreamWaitingMessage(target)) + return nil + }) + } else { + err = ciStreamJobAttemptLogs(ctx, tokenVal, orgID, streamTarget, logWriter, func(status string) { + if logWriter.lines > 0 && status == target.attemptStatus { + return + } + target.attemptStatus = status + reporter.Status(logStreamWaitingMessage(target)) + }) + } reporter.Stop() if err != nil { return err @@ -425,6 +612,43 @@ func streamLogTargetWithFollowUX( return nil } +func streamLogTargetAsJSON( + ctx context.Context, + tokenVal string, + orgID string, + streamTarget api.CILogStreamTarget, + target logTarget, + out io.Writer, +) error { + writer := newLogJSONEventWriter(out) + lineCount := 0 + lastStatus := "" + + if status := logTargetStatus(target); status != "" { + if err := writer.Status(status); err != nil { + return err + } + lastStatus = status + } + + err := ciStreamJobAttemptLogLines(ctx, tokenVal, orgID, streamTarget, func(line *civ1.LogLine) error { + lineCount++ + return writer.Line(line) + }, func(status string) error { + target.attemptStatus = status + if status == "" || status == lastStatus { + return nil + } + lastStatus = status + return writer.Status(status) + }) + if err != nil { + return err + } + + return writer.End(logTargetStatus(target), lineCount) +} + func resolveLogTargetWithFollowRetry( ctx context.Context, tokenVal string, @@ -434,6 +658,7 @@ func resolveLogTargetWithFollowRetry( workflow string, initialErr error, reporter *logFollowReporter, + options logTargetResolutionOptions, ) (logTarget, error) { if !isFollowRetryableResolutionError(initialErr) { return logTarget{}, initialErr @@ -458,11 +683,11 @@ func resolveLogTargetWithFollowRetry( return logTarget{}, lastErr case <-ticker.C: var err error - resp, err := api.CIGetRunStatus(ctx, tokenVal, orgID, id) + resp, err := ciGetRunStatus(ctx, tokenVal, orgID, id) if err != nil { return logTarget{}, err } - target, err := resolveLogTarget(resp, id, job, workflow) + target, err := resolveLogTargetWithOptions(resp, id, job, workflow, options) if err == nil { return target, nil } @@ -492,6 +717,14 @@ type jobCandidate struct { workflowName string } +type logTargetResolutionOptions struct { + allowInteractive bool +} + +func logTargetResolutionOptionsForOutput(outputOptions logOutputOptions) logTargetResolutionOptions { + return logTargetResolutionOptions{allowInteractive: !outputOptions.json()} +} + // jobKeyShort returns the short form of a job key (after the first colon), // or the full key if there's no colon. func jobKeyShort(key string) string { @@ -524,7 +757,11 @@ func jobDisplayNames(candidates []jobCandidate) map[string]string { } func resolveLogTarget(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string) (logTarget, error) { - targetJob, workflowPath, err := findLogsJob(resp, originalID, jobKey, workflowFilter) + return resolveLogTargetWithOptions(resp, originalID, jobKey, workflowFilter, logTargetResolutionOptions{allowInteractive: true}) +} + +func resolveLogTargetWithOptions(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string, options logTargetResolutionOptions) (logTarget, error) { + targetJob, workflowPath, err := findLogsJobWithOptions(resp, originalID, jobKey, workflowFilter, options) if err != nil { if isRetryableLogJobError(err) { if isActiveRunStatus(resp.Status) { @@ -740,6 +977,10 @@ func reportLogTargetSelection(target logTarget, reporter *logFollowReporter, fol // findLogsJob locates the target job in the run status response. // Returns the job and the workflow path it belongs to. func findLogsJob(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string) (*civ1.JobStatus, string, error) { + return findLogsJobWithOptions(resp, originalID, jobKey, workflowFilter, logTargetResolutionOptions{allowInteractive: true}) +} + +func findLogsJobWithOptions(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string, options logTargetResolutionOptions) (*civ1.JobStatus, string, error) { var candidates []jobCandidate for _, wf := range resp.Workflows { if workflowFilter != "" && !workflowPathMatches(wf.WorkflowPath, workflowFilter) { @@ -819,8 +1060,8 @@ func findLogsJob(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFi return candidates[0].job, candidates[0].workflowPath, nil } - // Interactive fuzzy picker when terminal is available. - if helpers.IsTerminal() { + // Interactive fuzzy picker when terminal is available and interactive mode is allowed. + if options.allowInteractive && helpers.IsTerminal() { displayNames := jobDisplayNames(candidates) items := make([]PickJobItem, len(candidates)) for i, c := range candidates { diff --git a/pkg/cmd/ci/logs_test.go b/pkg/cmd/ci/logs_test.go index 6162ecdb..82f6832a 100644 --- a/pkg/cmd/ci/logs_test.go +++ b/pkg/cmd/ci/logs_test.go @@ -3,6 +3,7 @@ package ci import ( "bytes" "context" + "encoding/json" "errors" "io" "strings" @@ -58,6 +59,34 @@ func TestFindLogsJob_MultipleJobsRequiresFlag(t *testing.T) { } } +func TestResolveLogTargetJSONOptionsReturnNonInteractiveAmbiguity(t *testing.T) { + resp := &civ1.GetRunStatusResponse{ + RunId: "run-1", + Workflows: []*civ1.WorkflowStatus{ + { + WorkflowPath: ".depot/workflows/ci.yml", + Jobs: []*civ1.JobStatus{ + {JobId: "job-1", JobKey: "build", Status: "finished"}, + {JobId: "job-2", JobKey: "test", Status: "running"}, + }, + }, + }, + } + + options := logTargetResolutionOptionsForOutput(logOutputOptions{output: logOutputJSON}) + if options.allowInteractive { + t.Fatal("json output should disable interactive job resolution") + } + + _, err := resolveLogTargetWithOptions(resp, "run-1", "", "", options) + if err == nil { + t.Fatal("expected ambiguity error") + } + if !strings.Contains(err.Error(), "run has multiple jobs, specify one with --job") { + t.Fatalf("expected multiple-jobs ambiguity error, got: %v", err) + } +} + func TestFindLogsJob_MatchByJobKey(t *testing.T) { resp := &civ1.GetRunStatusResponse{ RunId: "run-1", @@ -355,6 +384,372 @@ func TestLogStreamWaitingMessageIncludesUnresolvedStatus(t *testing.T) { } } +func TestLogsCommandHistoricalDirectAttemptDefaultOutputUnchanged(t *testing.T) { + originalGetRunStatus := ciGetRunStatus + originalGetJobAttemptLogs := ciGetJobAttemptLogs + t.Cleanup(func() { + ciGetRunStatus = originalGetRunStatus + ciGetJobAttemptLogs = originalGetJobAttemptLogs + }) + + ciGetRunStatus = func(context.Context, string, string, string) (*civ1.GetRunStatusResponse, error) { + return nil, errors.New("not a run") + } + ciGetJobAttemptLogs = func(_ context.Context, _, _, attemptID string) ([]*civ1.LogLine, error) { + if attemptID != "attempt-1" { + t.Fatalf("attemptID = %q, want attempt-1", attemptID) + } + return []*civ1.LogLine{ + {Body: "first"}, + {Body: "second"}, + }, nil + } + + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd := NewCmdLogs() + cmd.SetArgs([]string{"attempt-1", "--token", "token-123", "--org", "org-123"}) + cmd.SetOut(&stdout) + cmd.SetErr(&stderr) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + if got, want := stdout.String(), "first\nsecond\n"; got != want { + t.Fatalf("stdout = %q, want %q", got, want) + } + if got := stderr.String(); got != "" { + t.Fatalf("stderr = %q, want empty", got) + } +} + +func TestLogsCommandHistoricalJSONSuppressesHumanTargetMessages(t *testing.T) { + originalGetRunStatus := ciGetRunStatus + originalGetJobAttemptLogs := ciGetJobAttemptLogs + t.Cleanup(func() { + ciGetRunStatus = originalGetRunStatus + ciGetJobAttemptLogs = originalGetJobAttemptLogs + }) + + ciGetRunStatus = func(context.Context, string, string, string) (*civ1.GetRunStatusResponse, error) { + return &civ1.GetRunStatusResponse{ + RunId: "run-1", + Workflows: []*civ1.WorkflowStatus{ + { + WorkflowPath: ".depot/workflows/ci.yml", + Jobs: []*civ1.JobStatus{ + { + JobId: "job-1", + JobKey: "ci.yml:build", + Status: "finished", + Attempts: []*civ1.AttemptStatus{ + {AttemptId: "attempt-1", Attempt: 1, Status: "finished"}, + }, + }, + }, + }, + }, + }, nil + } + ciGetJobAttemptLogs = func(context.Context, string, string, string) ([]*civ1.LogLine, error) { + return []*civ1.LogLine{ + testCmdLogLine("step-1", 7, 123, 1, `quoted "body" \ path`), + }, nil + } + + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd := NewCmdLogs() + cmd.SetArgs([]string{"run-1", "--token", "token-123", "--org", "org-123", "-o", "json"}) + cmd.SetOut(&stdout) + cmd.SetErr(&stderr) + + if err := cmd.Execute(); err != nil { + t.Fatal(err) + } + + events := decodeLogEvents(t, stdout.String()) + if len(events) != 1 { + t.Fatalf("events = %d, want 1: %s", len(events), stdout.String()) + } + assertLogLineEvent(t, events[0], map[string]any{ + "type": "line", + "timestamp": "1970-01-01T00:00:00.123Z", + "timestamp_ms": float64(123), + "stream": "stderr", + "step_key": "step-1", + "line_number": float64(7), + "body": `quoted "body" \ path`, + }) + if got := stderr.String(); got != "" { + t.Fatalf("stderr = %q, want empty", got) + } +} + +func TestPrintLogLinesDefaultOutputUnchanged(t *testing.T) { + lines := []*civ1.LogLine{ + {Body: "first"}, + {Body: "second"}, + } + + var out bytes.Buffer + if err := printLogLines(&out, lines, logOutputOptions{}); err != nil { + t.Fatal(err) + } + + if got, want := out.String(), "first\nsecond\n"; got != want { + t.Fatalf("output = %q, want %q", got, want) + } +} + +func TestPrintLogLinesTimestamps(t *testing.T) { + lines := []*civ1.LogLine{ + {TimestampMs: 0, Body: "first"}, + {TimestampMs: 123, Body: "second"}, + } + + var out bytes.Buffer + if err := printLogLines(&out, lines, logOutputOptions{timestamps: true}); err != nil { + t.Fatal(err) + } + + want := "1970-01-01T00:00:00Z first\n1970-01-01T00:00:00.123Z second\n" + if got := out.String(); got != want { + t.Fatalf("output = %q, want %q", got, want) + } +} + +func TestPrintLogLinesJSON(t *testing.T) { + largeBody := strings.Repeat("x", 5000) + ` "quoted" \ slash` + lines := []*civ1.LogLine{ + testCmdLogLine("step-1", 7, 123, 1, largeBody), + } + + var out bytes.Buffer + if err := printLogLines(&out, lines, logOutputOptions{output: logOutputJSON}); err != nil { + t.Fatal(err) + } + + events := decodeLogEvents(t, out.String()) + if len(events) != 1 { + t.Fatalf("events = %d, want 1: %s", len(events), out.String()) + } + assertLogLineEvent(t, events[0], map[string]any{ + "type": "line", + "timestamp": "1970-01-01T00:00:00.123Z", + "timestamp_ms": float64(123), + "stream": "stderr", + "step_key": "step-1", + "line_number": float64(7), + "body": largeBody, + }) +} + +func TestPrintLogLinesJSONIgnoresTimestampsOption(t *testing.T) { + lines := []*civ1.LogLine{ + testCmdLogLine("step-1", 7, 123, 1, "body"), + } + + var jsonOnly bytes.Buffer + if err := printLogLines(&jsonOnly, lines, logOutputOptions{output: logOutputJSON}); err != nil { + t.Fatal(err) + } + + var jsonWithTimestamps bytes.Buffer + if err := printLogLines(&jsonWithTimestamps, lines, logOutputOptions{output: logOutputJSON, timestamps: true}); err != nil { + t.Fatal(err) + } + + if jsonWithTimestamps.String() != jsonOnly.String() { + t.Fatalf("json with timestamps differs:\nwithout: %s\nwith: %s", jsonOnly.String(), jsonWithTimestamps.String()) + } +} + +func TestLogOutputOptionsValidateRejectsUnsupportedOutput(t *testing.T) { + err := (logOutputOptions{output: "yaml"}).validate() + if err == nil { + t.Fatal("expected validation error") + } + if !strings.Contains(err.Error(), `unsupported output "yaml"`) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestLogsCommandRejectsUnsupportedOutputBeforeAuth(t *testing.T) { + var stdout bytes.Buffer + var stderr bytes.Buffer + cmd := NewCmdLogs() + cmd.SetArgs([]string{"attempt-1", "--output", "yaml"}) + cmd.SetOut(&stdout) + cmd.SetErr(&stderr) + + err := cmd.Execute() + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), `unsupported output "yaml"`) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestStreamLogTargetWithFollowUXTimestamps(t *testing.T) { + original := ciStreamJobAttemptLogLines + t.Cleanup(func() { ciStreamJobAttemptLogLines = original }) + + ciStreamJobAttemptLogLines = func( + _ context.Context, + _, _ string, + target api.CILogStreamTarget, + onLine func(*civ1.LogLine) error, + onStatus func(string) error, + ) error { + if target.AttemptID != "attempt-1" { + t.Fatalf("attemptID = %q, want attempt-1", target.AttemptID) + } + if err := onStatus("running"); err != nil { + return err + } + return onLine(testCmdLogLine("step-1", 1, 123, 0, "build")) + } + + var stdout bytes.Buffer + var stderr bytes.Buffer + err := streamLogTargetWithFollowUX( + context.Background(), + "token-123", + "org-123", + api.CILogStreamTarget{AttemptID: "attempt-1"}, + logTarget{attemptID: "attempt-1", attemptStatus: "queued"}, + &stdout, + newLogFollowReporter(&stderr, false), + logOutputOptions{timestamps: true}, + ) + if err != nil { + t.Fatal(err) + } + + if got, want := stdout.String(), "1970-01-01T00:00:00.123Z build\n"; got != want { + t.Fatalf("stdout = %q, want %q", got, want) + } + if strings.Contains(stderr.String(), "1970-01-01") { + t.Fatalf("status output should not be timestamp-prefixed: %q", stderr.String()) + } +} + +func TestStreamLogTargetWithFollowUXJSONEmitsStatusLineAndEnd(t *testing.T) { + original := ciStreamJobAttemptLogLines + t.Cleanup(func() { ciStreamJobAttemptLogLines = original }) + + ciStreamJobAttemptLogLines = func( + _ context.Context, + _, _ string, + target api.CILogStreamTarget, + onLine func(*civ1.LogLine) error, + onStatus func(string) error, + ) error { + if target.AttemptID != "attempt-1" { + t.Fatalf("attemptID = %q, want attempt-1", target.AttemptID) + } + if err := onStatus("running"); err != nil { + return err + } + if err := onStatus("running"); err != nil { + return err + } + if err := onLine(testCmdLogLine("step-1", 1, 123, 0, "build")); err != nil { + return err + } + return onStatus("finished") + } + + var stdout bytes.Buffer + var stderr bytes.Buffer + err := streamLogTargetWithFollowUX( + context.Background(), + "token-123", + "org-123", + api.CILogStreamTarget{AttemptID: "attempt-1"}, + logTarget{attemptID: "attempt-1", attemptStatus: "running"}, + &stdout, + newLogFollowReporter(&stderr, false), + logOutputOptions{output: logOutputJSON}, + ) + if err != nil { + t.Fatal(err) + } + + events := decodeLogEvents(t, stdout.String()) + if len(events) != 4 { + t.Fatalf("events = %d, want 4: %s", len(events), stdout.String()) + } + assertEventFields(t, events[0], map[string]any{"type": "status", "status": "running"}) + assertLogLineEvent(t, events[1], map[string]any{ + "type": "line", + "timestamp": "1970-01-01T00:00:00.123Z", + "timestamp_ms": float64(123), + "stream": "stdout", + "step_key": "step-1", + "line_number": float64(1), + "body": "build", + }) + assertEventFields(t, events[2], map[string]any{"type": "status", "status": "finished"}) + assertEventFields(t, events[3], map[string]any{"type": "end", "status": "finished", "line_count": float64(1)}) + if got := stderr.String(); got != "" { + t.Fatalf("stderr = %q, want empty", got) + } +} + +func testCmdLogLine(stepID string, lineNumber uint32, timestampMs int64, stream uint32, body string) *civ1.LogLine { + return &civ1.LogLine{ + StepId: stepID, + TimestampMs: timestampMs, + LineNumber: lineNumber, + Stream: stream, + Body: body, + } +} + +func decodeLogEvents(t *testing.T, output string) []map[string]any { + t.Helper() + + dec := json.NewDecoder(strings.NewReader(output)) + var events []map[string]any + for { + var event map[string]any + err := dec.Decode(&event) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("invalid NDJSON: %v\n%s", err, output) + } + events = append(events, event) + } + return events +} + +func assertLogLineEvent(t *testing.T, got map[string]any, want map[string]any) { + t.Helper() + + assertEventFields(t, got, want) + if _, ok := got["step_id"]; ok { + t.Fatalf("line event should not include step_id: %#v", got) + } + if _, ok := got["step_name"]; ok { + t.Fatalf("line event should not include step_name: %#v", got) + } +} + +func assertEventFields(t *testing.T, got map[string]any, want map[string]any) { + t.Helper() + + for key, wantValue := range want { + if gotValue := got[key]; gotValue != wantValue { + t.Fatalf("%s = %#v, want %#v in event %#v", key, gotValue, wantValue, got) + } + } +} + func TestLogFollowReporterRestartsWaitingAfterIdleLogs(t *testing.T) { reporter := newLogFollowReporter(io.Discard, true) reporter.idleDelay = 10 * time.Millisecond @@ -424,6 +819,7 @@ func TestResolveLogTargetWithFollowRetryStopsReporterOnCancellation(t *testing.T "", &pendingLogTargetError{message: "Waiting for job to start..."}, reporter, + logTargetResolutionOptions{allowInteractive: true}, ) if !errors.Is(err, context.Canceled) { t.Fatalf("expected context cancellation, got %v", err) @@ -471,6 +867,7 @@ func TestStreamUnresolvedLogsWithFollowUXTriesJobThenAttempt(t *testing.T) { "id-123", &stdout, newLogFollowReporter(&stderr, false), + logOutputOptions{}, ) if err != nil { t.Fatal(err) @@ -516,6 +913,7 @@ func TestStreamUnresolvedLogsWithFollowUXPropagatesCancellation(t *testing.T) { "id-123", io.Discard, newLogFollowReporter(io.Discard, false), + logOutputOptions{}, ) if !errors.Is(err, context.Canceled) { t.Fatalf("expected context cancellation, got %v", err) @@ -552,6 +950,7 @@ func TestStreamUnresolvedLogsWithFollowUXReturnsBothTargetErrors(t *testing.T) { "id-123", io.Discard, newLogFollowReporter(io.Discard, false), + logOutputOptions{}, ) if err == nil { t.Fatal("expected error")