diff --git a/pkg/api/ci.go b/pkg/api/ci.go index 87f460ea..3ba40504 100644 --- a/pkg/api/ci.go +++ b/pkg/api/ci.go @@ -1,7 +1,13 @@ package api import ( + "container/list" "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "io" "time" "connectrpc.com/connect" @@ -13,6 +19,14 @@ import ( var baseURLFunc = getBaseURL +const ciStreamLogDedupeSize = 4096 + +var ( + ciStreamInitialBackoff = 250 * time.Millisecond + ciStreamMaxBackoff = 30 * time.Second + ciStreamSleep = sleepWithContext +) + func newCIServiceClient() civ1connect.CIServiceClient { baseURL := baseURLFunc() return civ1connect.NewCIServiceClient(getHTTPClient(baseURL), baseURL, WithUserAgent()) @@ -70,6 +84,169 @@ func CIGetJobAttemptLogs(ctx context.Context, token, orgID, attemptID string) ([ return allLines, nil } +type CILogStreamTarget struct { + AttemptID string + JobID string +} + +// CIStreamJobAttemptLogs streams log lines for a job attempt or the latest +// attempt of a job, resuming from the last cursor after transient stream errors. +// If onStatus is non-nil, it receives attempt status updates from the stream. +func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CILogStreamTarget, w io.Writer, onStatus func(string)) error { + if target.AttemptID == "" && target.JobID == "" { + return fmt.Errorf("exactly one of attempt ID or job ID is required") + } + if target.AttemptID != "" && target.JobID != "" { + return fmt.Errorf("exactly one of attempt ID or job ID is required") + } + + client := newCIServiceClient() + cursor := "" + backoff := ciStreamInitialBackoff + seen := newLogLineDedupe(ciStreamLogDedupeSize) + + for { + req := &civ1.StreamJobAttemptLogsRequest{AttemptId: target.AttemptID, JobId: target.JobID, Cursor: cursor} + stream, err := client.StreamJobAttemptLogs(ctx, WithAuthenticationAndOrg(connect.NewRequest(req), token, orgID)) + if err != nil { + if !isTransientConnectError(err) { + return err + } + if err := ciStreamSleep(ctx, backoff); err != nil { + return err + } + backoff = nextCIStreamBackoff(backoff) + continue + } + + for stream.Receive() { + msg := stream.Msg() + backoff = ciStreamInitialBackoff + if status := msg.GetAttemptStatus(); status != "" && onStatus != nil { + onStatus(status) + } + + line := msg.GetLine() + if line == nil { + continue + } + + identity := logLineIdentity(line) + if !seen.Contains(identity) { + if err := writeLogLine(w, line); err != nil { + stream.Close() + return err + } + seen.Add(identity) + if msg.GetNextCursor() != "" { + cursor = msg.GetNextCursor() + } + } + } + + err = stream.Err() + stream.Close() + if err == nil { + return nil + } + if !isTransientConnectError(err) { + return err + } + if err := ciStreamSleep(ctx, backoff); err != nil { + return err + } + backoff = nextCIStreamBackoff(backoff) + } +} + +func writeLogLine(w io.Writer, line *civ1.LogLine) error { + text := line.GetBody() + "\n" + n, err := io.WriteString(w, text) + if err != nil { + return err + } + if n != len(text) { + return io.ErrShortWrite + } + return nil +} + +func isTransientConnectError(err error) bool { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + switch connect.CodeOf(err) { + case connect.CodeUnavailable, connect.CodeDeadlineExceeded, connect.CodeAborted: + return true + default: + return false + } +} + +func sleepWithContext(ctx context.Context, d time.Duration) error { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + +func nextCIStreamBackoff(current time.Duration) time.Duration { + next := current * 2 + if next > ciStreamMaxBackoff { + return ciStreamMaxBackoff + } + return next +} + +func logLineIdentity(line *civ1.LogLine) string { + sum := sha256.Sum256([]byte(line.GetBody())) + return fmt.Sprintf("%s:%d:%d:%d:%s", line.GetStepId(), line.GetTimestampMs(), line.GetLineNumber(), line.GetStream(), hex.EncodeToString(sum[:])) +} + +type logLineDedupe struct { + capacity int + entries map[string]*list.Element + order *list.List +} + +func newLogLineDedupe(capacity int) *logLineDedupe { + return &logLineDedupe{ + capacity: capacity, + entries: make(map[string]*list.Element, capacity), + order: list.New(), + } +} + +func (d *logLineDedupe) Contains(key string) bool { + if elem, ok := d.entries[key]; ok { + d.order.MoveToFront(elem) + return true + } + return false +} + +func (d *logLineDedupe) Add(key string) { + if elem, ok := d.entries[key]; ok { + d.order.MoveToFront(elem) + return + } + elem := d.order.PushFront(key) + d.entries[key] = elem + if d.order.Len() <= d.capacity { + return + } + oldest := d.order.Back() + if oldest == nil { + return + } + d.order.Remove(oldest) + delete(d.entries, oldest.Value.(string)) +} + // CIRun triggers a CI run. func CIRun(ctx context.Context, token, orgID string, req *civ1.RunRequest) (*civ1.RunResponse, error) { client := newCIServiceClient() diff --git a/pkg/api/ci_test.go b/pkg/api/ci_test.go index 51ac5d05..8222af4c 100644 --- a/pkg/api/ci_test.go +++ b/pkg/api/ci_test.go @@ -1,11 +1,15 @@ package api import ( + "bytes" "context" + "errors" + "io" "net/http" "net/http/httptest" "slices" "testing" + "time" "connectrpc.com/connect" civ1 "github.com/depot/cli/pkg/proto/depot/ci/v1" @@ -79,6 +83,10 @@ func (h ciServiceTestHandler) GetJobAttemptLogs(context.Context, *connect.Reques return nil, connect.NewError(connect.CodeUnimplemented, nil) } +func (h ciServiceTestHandler) StreamJobAttemptLogs(context.Context, *connect.Request[civ1.StreamJobAttemptLogsRequest], *connect.ServerStream[civ1.StreamJobAttemptLogsResponse]) error { + return connect.NewError(connect.CodeUnimplemented, nil) +} + func (h ciServiceTestHandler) ListRuns(context.Context, *connect.Request[civ1.ListRunsRequest]) (*connect.Response[civ1.ListRunsResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, nil) } @@ -319,3 +327,241 @@ func TestCIListWorkflowsSendsRecentDiscoveryFilters(t *testing.T) { t.Fatalf("Pr = %q, want 42", request.GetPr()) } } + +type streamLogsRecorder struct { + civ1connect.UnimplementedCIServiceHandler + requests []*civ1.StreamJobAttemptLogsRequest +} + +func (r *streamLogsRecorder) StreamJobAttemptLogs(_ context.Context, req *connect.Request[civ1.StreamJobAttemptLogsRequest], stream *connect.ServerStream[civ1.StreamJobAttemptLogsResponse]) error { + r.requests = append(r.requests, proto.Clone(req.Msg).(*civ1.StreamJobAttemptLogsRequest)) + + if req.Msg.GetAttemptId() != "attempt-123" || req.Msg.GetJobId() != "" { + return connect.NewError(connect.CodeInvalidArgument, errors.New("unexpected stream target")) + } + + switch len(r.requests) { + case 1: + if err := stream.Send(&civ1.StreamJobAttemptLogsResponse{ + AttemptStatus: "running", + }); err != nil { + return err + } + if err := stream.Send(&civ1.StreamJobAttemptLogsResponse{ + Line: testLogLine("step-1", 1, "first"), + NextCursor: "cursor-1", + AttemptStatus: "running", + }); err != nil { + return err + } + if err := stream.Send(&civ1.StreamJobAttemptLogsResponse{ + Line: testLogLine("step-1", 2, "second"), + NextCursor: "cursor-2", + AttemptStatus: "running", + }); err != nil { + return err + } + return connect.NewError(connect.CodeUnavailable, errors.New("stream interrupted")) + case 2: + if err := stream.Send(&civ1.StreamJobAttemptLogsResponse{ + Line: testLogLine("step-1", 2, "second"), + NextCursor: "cursor-2-replay", + AttemptStatus: "running", + }); err != nil { + return err + } + return connect.NewError(connect.CodeUnavailable, errors.New("stream interrupted after duplicate")) + case 3: + return stream.Send(&civ1.StreamJobAttemptLogsResponse{ + Line: testLogLine("step-1", 3, "third"), + NextCursor: "cursor-3", + AttemptStatus: "finished", + }) + default: + return nil + } +} + +func TestCIStreamJobAttemptLogsReconnectsFromLastWrittenCursorAndSuppressesDuplicates(t *testing.T) { + recorder := &streamLogsRecorder{} + _, handler := civ1connect.NewCIServiceHandler(recorder) + server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{})) + t.Cleanup(server.Close) + + originalBaseURLFunc := baseURLFunc + baseURLFunc = func() string { return server.URL } + t.Cleanup(func() { baseURLFunc = originalBaseURLFunc }) + + originalInitialBackoff := ciStreamInitialBackoff + ciStreamInitialBackoff = 0 + t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff }) + + var output bytes.Buffer + var statuses []string + if err := CIStreamJobAttemptLogs(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, &output, func(status string) { + statuses = append(statuses, status) + }); err != nil { + t.Fatal(err) + } + + if got, want := output.String(), "first\nsecond\nthird\n"; got != want { + t.Fatalf("output = %q, want %q", got, want) + } + if len(recorder.requests) != 3 { + t.Fatalf("requests = %d, want 3", len(recorder.requests)) + } + if got := recorder.requests[0].GetCursor(); got != "" { + t.Fatalf("first cursor = %q, want empty", got) + } + if got := recorder.requests[1].GetCursor(); got != "cursor-2" { + t.Fatalf("second cursor = %q, want cursor-2", got) + } + if got := recorder.requests[2].GetCursor(); got != "cursor-2" { + t.Fatalf("third cursor = %q, want cursor-2", got) + } + if got, want := statuses, []string{"running", "running", "running", "running", "finished"}; !slices.Equal(got, want) { + t.Fatalf("statuses = %v, want %v", got, want) + } +} + +type statusOnlyStreamRecorder struct { + civ1connect.UnimplementedCIServiceHandler + requests []*civ1.StreamJobAttemptLogsRequest +} + +func (r *statusOnlyStreamRecorder) StreamJobAttemptLogs(_ context.Context, req *connect.Request[civ1.StreamJobAttemptLogsRequest], stream *connect.ServerStream[civ1.StreamJobAttemptLogsResponse]) error { + r.requests = append(r.requests, proto.Clone(req.Msg).(*civ1.StreamJobAttemptLogsRequest)) + + switch len(r.requests) { + case 1: + return connect.NewError(connect.CodeUnavailable, errors.New("stream unavailable")) + case 2: + if err := stream.Send(&civ1.StreamJobAttemptLogsResponse{AttemptStatus: "running"}); err != nil { + return err + } + return connect.NewError(connect.CodeUnavailable, errors.New("status-only stream interrupted")) + default: + return nil + } +} + +func TestCIStreamJobAttemptLogsResetsBackoffAfterStatusOnlyMessage(t *testing.T) { + recorder := &statusOnlyStreamRecorder{} + _, handler := civ1connect.NewCIServiceHandler(recorder) + server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{})) + t.Cleanup(server.Close) + + originalBaseURLFunc := baseURLFunc + baseURLFunc = func() string { return server.URL } + t.Cleanup(func() { baseURLFunc = originalBaseURLFunc }) + + originalInitialBackoff := ciStreamInitialBackoff + ciStreamInitialBackoff = 10 * time.Millisecond + t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff }) + + originalSleep := ciStreamSleep + var sleeps []time.Duration + ciStreamSleep = func(ctx context.Context, d time.Duration) error { + sleeps = append(sleeps, d) + return nil + } + t.Cleanup(func() { ciStreamSleep = originalSleep }) + + var statuses []string + if err := CIStreamJobAttemptLogs(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, io.Discard, func(status string) { + statuses = append(statuses, status) + }); err != nil { + t.Fatal(err) + } + + if len(recorder.requests) != 3 { + t.Fatalf("requests = %d, want 3", len(recorder.requests)) + } + if got, want := sleeps, []time.Duration{10 * time.Millisecond, 10 * time.Millisecond}; !slices.Equal(got, want) { + t.Fatalf("sleeps = %v, want %v", got, want) + } + if got, want := statuses, []string{"running"}; !slices.Equal(got, want) { + t.Fatalf("statuses = %v, want %v", got, want) + } +} + +func TestCIStreamJobAttemptLogsSendsJobIDTarget(t *testing.T) { + recorder := &streamLogsRecorder{} + _, handler := civ1connect.NewCIServiceHandler(recorder) + server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{})) + t.Cleanup(server.Close) + + originalBaseURLFunc := baseURLFunc + baseURLFunc = func() string { return server.URL } + t.Cleanup(func() { baseURLFunc = originalBaseURLFunc }) + + if err := CIStreamJobAttemptLogs(context.Background(), "token-123", "org-123", CILogStreamTarget{JobID: "job-123"}, io.Discard, nil); err == nil { + t.Fatal("expected test handler to reject the job ID target after recording it") + } + if len(recorder.requests) != 1 { + t.Fatalf("requests = %d, want 1", len(recorder.requests)) + } + if got := recorder.requests[0].GetJobId(); got != "job-123" { + t.Fatalf("job ID = %q, want job-123", got) + } + if got := recorder.requests[0].GetAttemptId(); got != "" { + t.Fatalf("attempt ID = %q, want empty", got) + } +} + +func TestIsTransientConnectError(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "context canceled", + err: context.Canceled, + want: false, + }, + { + name: "context deadline exceeded", + err: context.DeadlineExceeded, + want: false, + }, + { + name: "connect-wrapped context deadline exceeded", + err: connect.NewError(connect.CodeDeadlineExceeded, context.DeadlineExceeded), + want: false, + }, + { + name: "connect unavailable", + err: connect.NewError(connect.CodeUnavailable, errors.New("stream interrupted")), + want: true, + }, + { + name: "connect deadline exceeded", + err: connect.NewError(connect.CodeDeadlineExceeded, errors.New("server deadline exceeded")), + want: true, + }, + { + name: "connect invalid argument", + err: connect.NewError(connect.CodeInvalidArgument, errors.New("bad request")), + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := isTransientConnectError(tt.err); got != tt.want { + t.Fatalf("isTransientConnectError(%v) = %t, want %t", tt.err, got, tt.want) + } + }) + } +} + +func testLogLine(stepID string, lineNumber uint32, body string) *civ1.LogLine { + return &civ1.LogLine{ + StepId: stepID, + TimestampMs: int64(lineNumber), + LineNumber: lineNumber, + Stream: 0, + Body: body, + } +} diff --git a/pkg/cmd/ci/logs.go b/pkg/cmd/ci/logs.go index bb31d414..fbd77699 100644 --- a/pkg/cmd/ci/logs.go +++ b/pkg/cmd/ci/logs.go @@ -1,10 +1,15 @@ package ci import ( + "context" + "errors" "fmt" - "os" + "io" "strings" + "sync" + "time" + "github.com/briandowns/spinner" "github.com/depot/cli/pkg/api" "github.com/depot/cli/pkg/config" "github.com/depot/cli/pkg/helpers" @@ -12,12 +17,21 @@ import ( "github.com/spf13/cobra" ) +const ( + followAttemptRetryTimeout = 30 * time.Second + followAttemptRetryInterval = 1 * time.Second + followLogIdleDelay = 2 * time.Second +) + +var ciStreamJobAttemptLogs = api.CIStreamJobAttemptLogs + func NewCmdLogs() *cobra.Command { var ( orgID string token string job string workflow string + follow bool ) cmd := &cobra.Command{ @@ -26,19 +40,25 @@ func NewCmdLogs() *cobra.Command { Long: `Fetch and display log output for a CI job. Accepts a run ID, job ID, or attempt ID. When given a run or job ID, the -command resolves to the latest attempt automatically. Use --job and --workflow -to disambiguate when a run has multiple jobs.`, +command resolves to the latest attempt automatically. When starting from a run +ID, use --job and --workflow to disambiguate by workflow job key.`, Example: ` # Logs for a specific attempt depot ci logs + # Logs for the latest attempt of a job + depot ci logs + # Logs for a run (auto-selects job if only one) depot ci logs - # Logs for a specific job in a run + # Logs for a specific workflow job key in a run depot ci logs --job test # Disambiguate when multiple workflows define the same job key - depot ci logs --job build --workflow ci.yml`, + depot ci logs --job build --workflow ci.yml + + # Follow live logs for a job's latest attempt + depot ci logs --follow`, RunE: func(cmd *cobra.Command, args []string) error { if len(args) == 0 { return cmd.Help() @@ -59,41 +79,79 @@ to disambiguate when a run has multiple jobs.`, return fmt.Errorf("missing API token, please run `depot login`") } + reporter := newLogFollowReporter(cmd.ErrOrStderr(), follow && helpers.IsTerminal()) + // First, try resolving as a run ID (or job ID — the API accepts both). resp, runErr := api.CIGetRunStatus(ctx, tokenVal, orgID, id) if runErr == nil { - attemptID, err := resolveAttempt(resp, id, job, workflow) + target, err := resolveLogTarget(resp, id, job, workflow) + if follow && err != nil { + target, err = resolveLogTargetWithFollowRetry(ctx, tokenVal, orgID, id, job, workflow, err, reporter) + } if err != nil { return err } - - lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, attemptID) - if err != nil { - return fmt.Errorf("failed to get logs: %w", err) + if target.noLogsMessage != "" { + reporter.Message(target.noLogsMessage) + return nil } - for _, line := range lines { - fmt.Println(line.Body) + if follow { + if err := streamLogsWithFollowUX(ctx, tokenVal, orgID, target, cmd.OutOrStdout(), reporter); err != nil { + return fmt.Errorf("failed to stream logs: %w", err) + } + } else { + reportLogTargetSelection(target, reporter, false) + lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, target.attemptID) + if err != nil { + return fmt.Errorf("failed to get logs: %w", err) + } + if len(lines) == 0 { + reporter.Message(noLogsProducedMessage(target.jobKey, target.jobStatus)) + return nil + } + printLogLines(cmd.OutOrStdout(), lines) } return nil } - // Fall back to treating the ID as an attempt ID directly. + // Fall back to probing the positional ID directly. // Don't fall back if --job or --workflow were specified — those // only make sense for run-level resolution. if job != "" || workflow != "" { return fmt.Errorf("failed to look up run: %w", runErr) } - lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, id) - if err != nil { - // Both paths failed — show both errors so the user can - // distinguish "bad ID" from "auth/network failure". - return fmt.Errorf("could not resolve %q as a run, job, or attempt ID:\n as run: %v\n as attempt: %v", id, runErr, err) - } - - for _, line := range lines { - fmt.Println(line.Body) + if follow { + if err := streamUnresolvedLogsWithFollowUX(ctx, tokenVal, orgID, id, cmd.OutOrStdout(), reporter); err != nil { + if unresolvedErr, ok := err.(*unresolvedLogStreamError); ok { + return fmt.Errorf( + "could not resolve %q as a run, job, or attempt ID:\n as run: %v\n as job: %v\n as attempt: %v", + id, + runErr, + unresolvedErr.jobErr, + unresolvedErr.attemptErr, + ) + } + return fmt.Errorf("failed to stream logs: %w", err) + } + } else { + lines, err := api.CIGetJobAttemptLogs(ctx, tokenVal, orgID, id) + if err != nil { + // Both paths failed — show both errors so the user can + // distinguish "bad ID" from "auth/network failure". + return fmt.Errorf( + "could not resolve %q as a run, job, or attempt ID:\n as run/job: %v\n as attempt: %v", + id, + runErr, + err, + ) + } + if len(lines) == 0 { + reporter.Message("No logs were produced.") + return nil + } + printLogLines(cmd.OutOrStdout(), lines) } return nil }, @@ -101,12 +159,333 @@ to disambiguate when a run has multiple jobs.`, cmd.Flags().StringVar(&orgID, "org", "", "Organization ID (required when user is a member of multiple organizations)") cmd.Flags().StringVar(&token, "token", "", "Depot API token") - cmd.Flags().StringVar(&job, "job", "", "Job key to select (required when run has multiple jobs)") + cmd.Flags().StringVar(&job, "job", "", "Workflow job key to select when using a run ID") cmd.Flags().StringVar(&workflow, "workflow", "", "Workflow path to filter jobs (e.g. ci.yml)") + cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Follow live logs") return cmd } +func printLogLines(w io.Writer, lines []*civ1.LogLine) { + for _, line := range lines { + fmt.Fprintln(w, line.Body) + } +} + +type logTarget struct { + attemptID string + attemptNumber int32 + attemptStatus string + jobID string + streamJobID string + jobKey string + jobStatus string + workflowPath string + noLogsMessage string + hasAlternates bool + alternateLabel string +} + +type pendingLogTargetError struct { + message string + timeoutMessage string +} + +func (e *pendingLogTargetError) Error() string { + return e.message +} + +func (e *pendingLogTargetError) TimeoutError() error { + if e.timeoutMessage != "" { + return errors.New(e.timeoutMessage) + } + return fmt.Errorf("timed out while %s", strings.TrimSuffix(strings.ToLower(e.message), "...")) +} + +type logFollowReporter struct { + w io.Writer + interactive bool + spinner *spinner.Spinner + lastStatus string + mu sync.Mutex + idleTimer *time.Timer + idleDelay time.Duration +} + +func newLogFollowReporter(w io.Writer, interactive bool) *logFollowReporter { + return &logFollowReporter{w: w, interactive: interactive, idleDelay: followLogIdleDelay} +} + +func (r *logFollowReporter) Status(message string) { + if r == nil || message == "" { + return + } + r.mu.Lock() + defer r.mu.Unlock() + + if message == r.lastStatus && (!r.interactive || r.spinner != nil) { + return + } + r.lastStatus = message + + if !r.interactive { + fmt.Fprintln(r.w, message) + return + } + + r.stopLocked() + r.startLocked(message) +} + +func (r *logFollowReporter) Message(message string) { + if r == nil || message == "" { + return + } + r.Stop() + fmt.Fprintln(r.w, message) +} + +func (r *logFollowReporter) Stop() { + if r == nil { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.stopLocked() + r.lastStatus = "" +} + +func (r *logFollowReporter) pause() { + if r == nil { + return + } + r.mu.Lock() + defer r.mu.Unlock() + r.stopLocked() +} + +func (r *logFollowReporter) SawLogs() { + if r == nil || !r.interactive { + return + } + r.mu.Lock() + defer r.mu.Unlock() + + r.stopLocked() + if r.lastStatus == "" { + return + } + r.idleTimer = time.AfterFunc(r.idleDelay, func() { + r.mu.Lock() + defer r.mu.Unlock() + if r.spinner == nil && r.lastStatus != "" { + r.startLocked(r.lastStatus) + } + }) +} + +func (r *logFollowReporter) startLocked(message string) { + r.spinner = spinner.New(spinner.CharSets[14], 100*time.Millisecond, spinner.WithWriter(r.w)) + r.spinner.Prefix = message + " " + r.spinner.Start() +} + +func (r *logFollowReporter) stopLocked() { + if r.idleTimer != nil { + r.idleTimer.Stop() + r.idleTimer = nil + } + if r.spinner == nil { + return + } + r.spinner.Stop() + r.spinner = nil +} + +type followLogWriter struct { + w io.Writer + reporter *logFollowReporter + lines int +} + +func (w *followLogWriter) Write(p []byte) (int, error) { + w.lines++ + w.reporter.pause() + n, err := w.w.Write(p) + if err == nil { + w.reporter.SawLogs() + } + return n, err +} + +func streamLogsWithFollowUX( + ctx context.Context, + tokenVal string, + orgID string, + target logTarget, + out io.Writer, + reporter *logFollowReporter, +) error { + reportLogTargetSelection(target, reporter, true) + streamTarget := api.CILogStreamTarget{AttemptID: target.attemptID} + if target.streamJobID != "" { + streamTarget = api.CILogStreamTarget{JobID: target.streamJobID} + } + + return streamLogTargetWithFollowUX(ctx, tokenVal, orgID, streamTarget, target, out, reporter) +} + +type unresolvedLogStreamError struct { + jobErr error + attemptErr error +} + +func (e *unresolvedLogStreamError) Error() string { + return fmt.Sprintf("as job: %v; as attempt: %v", e.jobErr, e.attemptErr) +} + +func streamUnresolvedLogsWithFollowUX( + ctx context.Context, + tokenVal string, + orgID string, + id string, + out io.Writer, + reporter *logFollowReporter, +) error { + reporter.Message(fmt.Sprintf("Following logs for %s.", id)) + + jobErr := streamLogTargetWithFollowUX( + ctx, + tokenVal, + orgID, + api.CILogStreamTarget{JobID: id}, + logTarget{}, + out, + reporter, + ) + if jobErr == nil { + return nil + } + if isContextDoneError(jobErr) { + return jobErr + } + + attemptErr := streamLogTargetWithFollowUX( + ctx, + tokenVal, + orgID, + api.CILogStreamTarget{AttemptID: id}, + logTarget{}, + out, + reporter, + ) + if attemptErr == nil { + return nil + } + if isContextDoneError(attemptErr) { + return attemptErr + } + + return &unresolvedLogStreamError{jobErr: jobErr, attemptErr: attemptErr} +} + +func isContextDoneError(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) +} + +func streamLogTargetWithFollowUX( + ctx context.Context, + tokenVal string, + orgID string, + streamTarget api.CILogStreamTarget, + target logTarget, + out io.Writer, + reporter *logFollowReporter, +) error { + reporter.Status(logStreamWaitingMessage(target)) + + logWriter := &followLogWriter{w: out, reporter: reporter} + err := ciStreamJobAttemptLogs(ctx, tokenVal, orgID, streamTarget, logWriter, func(status string) { + if logWriter.lines > 0 && status == target.attemptStatus { + return + } + target.attemptStatus = status + reporter.Status(logStreamWaitingMessage(target)) + }) + reporter.Stop() + if err != nil { + return err + } + + if logWriter.lines == 0 { + reporter.Message(noStreamLogsReceivedMessage(target)) + return nil + } + + return nil +} + +func resolveLogTargetWithFollowRetry( + ctx context.Context, + tokenVal string, + orgID string, + id string, + job string, + workflow string, + initialErr error, + reporter *logFollowReporter, +) (logTarget, error) { + if !isFollowRetryableResolutionError(initialErr) { + return logTarget{}, initialErr + } + + timeout := time.NewTimer(followAttemptRetryTimeout) + defer timeout.Stop() + ticker := time.NewTicker(followAttemptRetryInterval) + defer ticker.Stop() + + lastErr := initialErr + reportPendingLogTarget(lastErr, reporter) + defer reporter.Stop() + for { + select { + case <-ctx.Done(): + return logTarget{}, ctx.Err() + case <-timeout.C: + if pending, ok := lastErr.(*pendingLogTargetError); ok { + return logTarget{}, pending.TimeoutError() + } + return logTarget{}, lastErr + case <-ticker.C: + var err error + resp, err := api.CIGetRunStatus(ctx, tokenVal, orgID, id) + if err != nil { + return logTarget{}, err + } + target, err := resolveLogTarget(resp, id, job, workflow) + if err == nil { + return target, nil + } + if !isFollowRetryableResolutionError(err) { + return logTarget{}, err + } + lastErr = err + reportPendingLogTarget(lastErr, reporter) + } + } +} + +func isFollowRetryableResolutionError(err error) bool { + _, ok := err.(*pendingLogTargetError) + return ok +} + +func reportPendingLogTarget(err error, reporter *logFollowReporter) { + if pending, ok := err.(*pendingLogTargetError); ok { + reporter.Status(pending.message) + } +} + type jobCandidate struct { job *civ1.JobStatus workflowPath string @@ -144,17 +523,35 @@ func jobDisplayNames(candidates []jobCandidate) map[string]string { return names } -// resolveAttempt finds the target attempt from a run status response. -// It selects a job (by --job flag, by job ID match, or auto-select), then -// picks the latest attempt and prints informational messages about what was chosen. -func resolveAttempt(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string) (string, error) { +func resolveLogTarget(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFilter string) (logTarget, error) { targetJob, workflowPath, err := findLogsJob(resp, originalID, jobKey, workflowFilter) if err != nil { - return "", err + if isRetryableLogJobError(err) { + if isActiveRunStatus(resp.Status) { + return logTarget{}, &pendingLogTargetError{ + message: pendingRunLogsMessage(resp.Status), + timeoutMessage: "Timed out waiting for jobs to be created. Try again after the run starts.", + } + } + return logTarget{noLogsMessage: terminalRunNoLogsMessage(resp.Status, err)}, nil + } + return logTarget{}, err } if len(targetJob.Attempts) == 0 { - return "", fmt.Errorf("job %q has no attempts yet", targetJob.JobKey) + if isActiveJobStatus(targetJob.Status) { + return logTarget{}, &pendingLogTargetError{ + message: pendingJobLogsMessage(targetJob), + timeoutMessage: fmt.Sprintf("Timed out waiting for job %q to start. Try again after the job is running.", jobKeyShort(targetJob.JobKey)), + } + } + return logTarget{ + jobKey: targetJob.JobKey, + jobID: targetJob.JobId, + jobStatus: targetJob.Status, + workflowPath: workflowPath, + noLogsMessage: noLogsProducedMessage(targetJob.JobKey, targetJob.Status), + }, nil } latest := targetJob.Attempts[0] @@ -164,16 +561,7 @@ func resolveAttempt(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflo } } - // Print what we auto-selected so the user knows. - var info []string - if jobKey == "" { - if workflowPath != "" { - info = append(info, fmt.Sprintf("job %q from %s", jobKeyShort(targetJob.JobKey), workflowPath)) - } else { - info = append(info, fmt.Sprintf("job %q", jobKeyShort(targetJob.JobKey))) - } - } - + var alternateLabel string if len(targetJob.Attempts) > 1 { var others []string for _, a := range targetJob.Attempts { @@ -181,14 +569,172 @@ func resolveAttempt(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflo others = append(others, fmt.Sprintf("#%d %s", a.Attempt, a.AttemptId)) } } - info = append(info, fmt.Sprintf("attempt #%d (also available: %s)", latest.Attempt, strings.Join(others, ", "))) + alternateLabel = strings.Join(others, ", ") + } + + return logTarget{ + attemptID: latest.AttemptId, + attemptNumber: latest.Attempt, + attemptStatus: latest.Status, + jobID: targetJob.JobId, + streamJobID: directJobIDStreamTarget(originalID, targetJob), + jobKey: targetJob.JobKey, + jobStatus: targetJob.Status, + workflowPath: workflowPath, + hasAlternates: len(targetJob.Attempts) > 1, + alternateLabel: alternateLabel, + }, nil +} + +func directJobIDStreamTarget(originalID string, job *civ1.JobStatus) string { + if job != nil && originalID == job.JobId { + return job.JobId + } + return "" +} + +func isRetryableLogJobError(err error) bool { + if err == nil { + return false + } + message := err.Error() + return strings.Contains(message, "has no jobs") || + strings.Contains(message, "no jobs found in workflow") +} + +func isActiveRunStatus(status string) bool { + switch status { + case "queued", "running": + return true + default: + return false + } +} + +func isActiveJobStatus(status string) bool { + switch status { + case "queued", "waiting", "running": + return true + default: + return false + } +} + +func isTerminalStatus(status string) bool { + return status == "finished" || status == "failed" || status == "cancelled" || status == "skipped" +} + +func pendingRunLogsMessage(status string) string { + if status == "" { + return "Waiting for jobs to be created..." + } + return fmt.Sprintf("Waiting for jobs to be created (run status: %s)...", status) +} + +func pendingJobLogsMessage(job *civ1.JobStatus) string { + return fmt.Sprintf("Waiting for job %q to start (status: %s)...", jobKeyShort(job.JobKey), job.Status) +} + +func terminalRunNoLogsMessage(status string, err error) string { + if status == "" { + return fmt.Sprintf("%s; no logs were produced.", err.Error()) + } + return fmt.Sprintf("%s (run status: %s); no logs were produced.", err.Error(), status) +} + +func noLogsProducedMessage(jobKey, status string) string { + if jobKey == "" { + return "No logs were produced." + } + name := jobKeyShort(jobKey) + switch { + case status == "skipped": + return fmt.Sprintf("Job %q was skipped; no logs were produced.", name) + case isTerminalStatus(status): + return fmt.Sprintf("Job %q is %s; no logs were produced.", name, status) + case status != "": + return fmt.Sprintf("No logs were produced yet for job %q (status: %s).", name, status) + default: + return "No logs were produced." + } +} + +func noStreamLogsReceivedMessage(target logTarget) string { + status := logTargetStatus(target) + if target.jobKey != "" { + if status != "" { + return fmt.Sprintf("Log stream ended for job %q (status: %s); no logs were produced.", jobKeyShort(target.jobKey), status) + } + return fmt.Sprintf("Log stream ended for job %q; no logs were produced.", jobKeyShort(target.jobKey)) } + if target.attemptID != "" { + if status != "" { + return fmt.Sprintf("Log stream ended for attempt %s (status: %s); no logs were produced.", target.attemptID, status) + } + return fmt.Sprintf("Log stream ended for attempt %s; no logs were produced.", target.attemptID) + } + if status != "" { + return fmt.Sprintf("Log stream ended (status: %s); no logs were produced.", status) + } + return "Log stream ended; no logs were produced." +} + +func logStreamWaitingMessage(target logTarget) string { + status := logTargetStatus(target) + if target.jobKey != "" { + if status != "" { + return fmt.Sprintf("Waiting for logs from job %q (status: %s)...", jobKeyShort(target.jobKey), status) + } + return fmt.Sprintf("Waiting for logs from job %q...", jobKeyShort(target.jobKey)) + } + if target.attemptID != "" { + if status != "" { + return fmt.Sprintf("Waiting for logs from attempt %s (status: %s)...", target.attemptID, status) + } + return fmt.Sprintf("Waiting for logs from attempt %s...", target.attemptID) + } + if status != "" { + return fmt.Sprintf("Waiting for logs (status: %s)...", status) + } + return "Waiting for logs..." +} - if len(info) > 0 { - fmt.Fprintf(os.Stderr, "Using %s\n", strings.Join(info, ", ")) +func logTargetStatus(target logTarget) string { + if target.attemptStatus != "" { + return target.attemptStatus + } + return target.jobStatus +} + +func reportLogTargetSelection(target logTarget, reporter *logFollowReporter, follow bool) { + if target.attemptID == "" { + return } - return latest.AttemptId, nil + action := "Fetching" + if follow { + action = "Following" + } + + if target.jobKey == "" { + reporter.Message(fmt.Sprintf("%s logs for attempt %s.", action, target.attemptID)) + return + } + + parts := []string{fmt.Sprintf("%s logs for job %q", action, jobKeyShort(target.jobKey))} + if target.workflowPath != "" { + parts = append(parts, fmt.Sprintf("from %s", target.workflowPath)) + } + if target.attemptNumber > 0 { + parts = append(parts, fmt.Sprintf("attempt #%d", target.attemptNumber)) + } + if target.attemptStatus != "" { + parts = append(parts, fmt.Sprintf("status: %s", target.attemptStatus)) + } + if target.hasAlternates && target.alternateLabel != "" { + parts = append(parts, fmt.Sprintf("other attempts: %s", target.alternateLabel)) + } + reporter.Message(strings.Join(parts, ", ") + ".") } // findLogsJob locates the target job in the run status response. @@ -215,7 +761,7 @@ func findLogsJob(resp *civ1.GetRunStatusResponse, originalID, jobKey, workflowFi return nil, "", fmt.Errorf("run %s has no jobs", resp.RunId) } - // Match by job key (--job flag): exact > suffix > segment, best tier wins. + // Match by workflow job key (--job flag): exact > suffix > segment, best tier wins. if jobKey != "" { bestTier := 0 tierMatches := map[int][]jobCandidate{} diff --git a/pkg/cmd/ci/logs_test.go b/pkg/cmd/ci/logs_test.go index 0efd94cf..6162ecdb 100644 --- a/pkg/cmd/ci/logs_test.go +++ b/pkg/cmd/ci/logs_test.go @@ -1,9 +1,15 @@ package ci import ( + "bytes" + "context" + "errors" + "io" "strings" "testing" + "time" + "github.com/depot/cli/pkg/api" civ1 "github.com/depot/cli/pkg/proto/depot/ci/v1" ) @@ -98,6 +104,38 @@ func TestFindLogsJob_MatchByJobID(t *testing.T) { } } +func TestResolveLogTarget_DirectJobIDUsesJobIDStreamTarget(t *testing.T) { + resp := &civ1.GetRunStatusResponse{ + RunId: "run-1", + Workflows: []*civ1.WorkflowStatus{ + { + WorkflowPath: ".depot/workflows/ci.yml", + Jobs: []*civ1.JobStatus{ + { + JobId: "job-2", + JobKey: "test", + Status: "running", + Attempts: []*civ1.AttemptStatus{ + {AttemptId: "att-2", Attempt: 2, Status: "running"}, + }, + }, + }, + }, + }, + } + + target, err := resolveLogTarget(resp, "job-2", "", "") + if err != nil { + t.Fatal(err) + } + if target.streamJobID != "job-2" { + t.Fatalf("expected stream job ID %q, got %q", "job-2", target.streamJobID) + } + if target.attemptID != "att-2" { + t.Fatalf("expected display attempt ID %q, got %q", "att-2", target.attemptID) + } +} + func TestFindLogsJob_DuplicateJobKeyRequiresWorkflow(t *testing.T) { resp := &civ1.GetRunStatusResponse{ RunId: "run-1", @@ -173,52 +211,360 @@ func TestFindLogsJob_WorkflowFilterNoMatch(t *testing.T) { } } -func TestResolveAttempt_LatestAttempt(t *testing.T) { +func TestResolveLogTarget_NoAttempts(t *testing.T) { resp := &civ1.GetRunStatusResponse{ RunId: "run-1", Workflows: []*civ1.WorkflowStatus{ { WorkflowPath: ".depot/workflows/ci.yml", Jobs: []*civ1.JobStatus{ - { - JobId: "job-1", - JobKey: "build", - Status: "finished", - Attempts: []*civ1.AttemptStatus{ - {AttemptId: "att-1", Attempt: 1, Status: "failed"}, - {AttemptId: "att-2", Attempt: 2, Status: "finished"}, - }, - }, + {JobId: "job-1", JobKey: "build", Status: "queued"}, }, }, }, } - attemptID, err := resolveAttempt(resp, "run-1", "build", "") + _, err := resolveLogTarget(resp, "run-1", "", "") + if err == nil { + t.Fatal("expected error for job with no attempts") + } + if !isFollowRetryableResolutionError(err) { + t.Fatalf("expected no-attempt error to be retryable for --follow, got: %v", err) + } +} + +func TestResolveLogTarget_QueuedRunWithoutJobsIsPending(t *testing.T) { + resp := &civ1.GetRunStatusResponse{ + RunId: "run-1", + Status: "queued", + } + + _, err := resolveLogTarget(resp, "run-1", "", "") + if err == nil { + t.Fatal("expected pending error") + } + pending, ok := err.(*pendingLogTargetError) + if !ok { + t.Fatalf("expected pendingLogTargetError, got %T: %v", err, err) + } + want := "Waiting for jobs to be created (run status: queued)..." + if pending.Error() != want { + t.Fatalf("expected %q, got %q", want, pending.Error()) + } +} + +func TestResolveLogTarget_TerminalRunWithoutJobsReturnsNoLogsMessage(t *testing.T) { + resp := &civ1.GetRunStatusResponse{ + RunId: "run-1", + Status: "finished", + } + + target, err := resolveLogTarget(resp, "run-1", "", "") if err != nil { t.Fatal(err) } - if attemptID != "att-2" { - t.Fatalf("expected attempt ID %q, got %q", "att-2", attemptID) + want := "run run-1 has no jobs (run status: finished); no logs were produced." + if target.noLogsMessage != want { + t.Fatalf("expected %q, got %q", want, target.noLogsMessage) } } -func TestResolveAttempt_NoAttempts(t *testing.T) { +func TestResolveLogTarget_QueuedJobWithoutAttemptsIsPending(t *testing.T) { resp := &civ1.GetRunStatusResponse{ - RunId: "run-1", + RunId: "run-1", + Status: "running", Workflows: []*civ1.WorkflowStatus{ { WorkflowPath: ".depot/workflows/ci.yml", Jobs: []*civ1.JobStatus{ - {JobId: "job-1", JobKey: "build", Status: "queued"}, + {JobId: "job-1", JobKey: "ci.yml:build", Status: "queued"}, }, }, }, } - _, err := resolveAttempt(resp, "run-1", "", "") + _, err := resolveLogTarget(resp, "run-1", "", "") if err == nil { - t.Fatal("expected error for job with no attempts") + t.Fatal("expected pending error") + } + pending, ok := err.(*pendingLogTargetError) + if !ok { + t.Fatalf("expected pendingLogTargetError, got %T: %v", err, err) + } + want := `Waiting for job "build" to start (status: queued)...` + if pending.Error() != want { + t.Fatalf("expected %q, got %q", want, pending.Error()) + } +} + +func TestResolveLogTarget_SkippedJobWithoutAttemptsReturnsNoLogsMessage(t *testing.T) { + resp := &civ1.GetRunStatusResponse{ + RunId: "run-1", + Status: "finished", + Workflows: []*civ1.WorkflowStatus{ + { + WorkflowPath: ".depot/workflows/ci.yml", + Jobs: []*civ1.JobStatus{ + {JobId: "job-1", JobKey: "ci.yml:build", Status: "skipped"}, + }, + }, + }, + } + + target, err := resolveLogTarget(resp, "run-1", "", "") + if err != nil { + t.Fatal(err) + } + want := `Job "build" was skipped; no logs were produced.` + if target.noLogsMessage != want { + t.Fatalf("expected %q, got %q", want, target.noLogsMessage) + } +} + +func TestFollowRetryableResolutionError(t *testing.T) { + if !isFollowRetryableResolutionError(&pendingLogTargetError{message: "Waiting for job to start..."}) { + t.Fatal("pending log target errors should be retryable") + } + + if isFollowRetryableResolutionError(errors.New(`job "deploy" not found`)) { + t.Fatal("plain errors should not be retryable") + } +} + +func TestNoStreamLogsReceivedMessage(t *testing.T) { + target := logTarget{attemptID: "att-1", attemptStatus: "failed", jobKey: "ci.yml:build"} + want := `Log stream ended for job "build" (status: failed); no logs were produced.` + if got := noStreamLogsReceivedMessage(target); got != want { + t.Fatalf("expected %q, got %q", want, got) + } +} + +func TestLogStreamWaitingMessageIncludesStatus(t *testing.T) { + target := logTarget{attemptID: "att-1", attemptStatus: "running"} + want := "Waiting for logs from attempt att-1 (status: running)..." + if got := logStreamWaitingMessage(target); got != want { + t.Fatalf("expected %q, got %q", want, got) + } +} + +func TestLogStreamWaitingMessageIncludesUnresolvedStatus(t *testing.T) { + target := logTarget{attemptStatus: "running"} + want := "Waiting for logs (status: running)..." + if got := logStreamWaitingMessage(target); got != want { + t.Fatalf("expected %q, got %q", want, got) + } +} + +func TestLogFollowReporterRestartsWaitingAfterIdleLogs(t *testing.T) { + reporter := newLogFollowReporter(io.Discard, true) + reporter.idleDelay = 10 * time.Millisecond + t.Cleanup(reporter.Stop) + + reporter.Status("Waiting for logs (status: running)...") + reporter.SawLogs() + + reporter.mu.Lock() + activeImmediately := reporter.spinner != nil + reporter.mu.Unlock() + if activeImmediately { + t.Fatal("spinner should stop while logs are being written") + } + + deadline := time.Now().Add(250 * time.Millisecond) + for { + reporter.mu.Lock() + active := reporter.spinner != nil + reporter.mu.Unlock() + if active { + return + } + if time.Now().After(deadline) { + t.Fatal("spinner did not restart after logs went idle") + } + time.Sleep(5 * time.Millisecond) + } +} + +func TestLogFollowReporterStopPreventsIdleRestart(t *testing.T) { + reporter := newLogFollowReporter(io.Discard, true) + reporter.idleDelay = 10 * time.Millisecond + + reporter.Status("Waiting for logs (status: running)...") + reporter.SawLogs() + reporter.Stop() + + time.Sleep(50 * time.Millisecond) + + reporter.mu.Lock() + active := reporter.spinner != nil + lastStatus := reporter.lastStatus + reporter.mu.Unlock() + + if active { + t.Fatal("spinner restarted after Stop") + } + if lastStatus != "" { + t.Fatalf("lastStatus = %q, want empty", lastStatus) + } +} + +func TestResolveLogTargetWithFollowRetryStopsReporterOnCancellation(t *testing.T) { + reporter := newLogFollowReporter(io.Discard, true) + t.Cleanup(reporter.Stop) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := resolveLogTargetWithFollowRetry( + ctx, + "token-123", + "org-123", + "run-123", + "", + "", + &pendingLogTargetError{message: "Waiting for job to start..."}, + reporter, + ) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context cancellation, got %v", err) + } + + reporter.mu.Lock() + active := reporter.spinner != nil + lastStatus := reporter.lastStatus + reporter.mu.Unlock() + + if active { + t.Fatal("spinner should stop when retry resolution is cancelled") + } + if lastStatus != "" { + t.Fatalf("lastStatus = %q, want empty", lastStatus) + } +} + +func TestStreamUnresolvedLogsWithFollowUXTriesJobThenAttempt(t *testing.T) { + original := ciStreamJobAttemptLogs + t.Cleanup(func() { ciStreamJobAttemptLogs = original }) + + var calls []api.CILogStreamTarget + ciStreamJobAttemptLogs = func( + _ context.Context, + _, _ string, + target api.CILogStreamTarget, + w io.Writer, + _ func(string), + ) error { + calls = append(calls, target) + if target.JobID != "" { + return errors.New("job not found") + } + _, err := w.Write([]byte("hello\n")) + return err + } + + var stdout bytes.Buffer + var stderr bytes.Buffer + err := streamUnresolvedLogsWithFollowUX( + context.Background(), + "token-123", + "org-123", + "id-123", + &stdout, + newLogFollowReporter(&stderr, false), + ) + if err != nil { + t.Fatal(err) + } + + if got, want := stdout.String(), "hello\n"; got != want { + t.Fatalf("stdout = %q, want %q", got, want) + } + if len(calls) != 2 { + t.Fatalf("calls = %d, want 2", len(calls)) + } + if calls[0].JobID != "id-123" || calls[0].AttemptID != "" { + t.Fatalf("first call = %+v, want job target", calls[0]) + } + if calls[1].AttemptID != "id-123" || calls[1].JobID != "" { + t.Fatalf("second call = %+v, want attempt target", calls[1]) + } + if strings.Contains(stderr.String(), "Following logs for attempt") { + t.Fatalf("stderr should not classify unresolved ID as attempt: %q", stderr.String()) + } +} + +func TestStreamUnresolvedLogsWithFollowUXPropagatesCancellation(t *testing.T) { + original := ciStreamJobAttemptLogs + t.Cleanup(func() { ciStreamJobAttemptLogs = original }) + + var calls []api.CILogStreamTarget + ciStreamJobAttemptLogs = func( + _ context.Context, + _, _ string, + target api.CILogStreamTarget, + _ io.Writer, + _ func(string), + ) error { + calls = append(calls, target) + return context.Canceled + } + + err := streamUnresolvedLogsWithFollowUX( + context.Background(), + "token-123", + "org-123", + "id-123", + io.Discard, + newLogFollowReporter(io.Discard, false), + ) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context cancellation, got %v", err) + } + if len(calls) != 1 { + t.Fatalf("calls = %d, want 1", len(calls)) + } + if calls[0].JobID != "id-123" || calls[0].AttemptID != "" { + t.Fatalf("first call = %+v, want job target", calls[0]) + } +} + +func TestStreamUnresolvedLogsWithFollowUXReturnsBothTargetErrors(t *testing.T) { + original := ciStreamJobAttemptLogs + t.Cleanup(func() { ciStreamJobAttemptLogs = original }) + + ciStreamJobAttemptLogs = func( + _ context.Context, + _, _ string, + target api.CILogStreamTarget, + _ io.Writer, + _ func(string), + ) error { + if target.JobID != "" { + return errors.New("job failed") + } + return errors.New("attempt failed") + } + + err := streamUnresolvedLogsWithFollowUX( + context.Background(), + "token-123", + "org-123", + "id-123", + io.Discard, + newLogFollowReporter(io.Discard, false), + ) + if err == nil { + t.Fatal("expected error") + } + unresolvedErr, ok := err.(*unresolvedLogStreamError) + if !ok { + t.Fatalf("expected unresolvedLogStreamError, got %T", err) + } + if unresolvedErr.jobErr.Error() != "job failed" { + t.Fatalf("job error = %v", unresolvedErr.jobErr) + } + if unresolvedErr.attemptErr.Error() != "attempt failed" { + t.Fatalf("attempt error = %v", unresolvedErr.attemptErr) } } diff --git a/pkg/proto/depot/ci/v1/ci.pb.go b/pkg/proto/depot/ci/v1/ci.pb.go index 289e32f9..ae8a3bc0 100644 --- a/pkg/proto/depot/ci/v1/ci.pb.go +++ b/pkg/proto/depot/ci/v1/ci.pb.go @@ -2620,6 +2620,140 @@ func (x *GetJobAttemptLogsResponse) GetNextPageToken() string { return "" } +type StreamJobAttemptLogsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // attempt_id identifies the concrete job attempt whose logs to stream. + AttemptId string `protobuf:"bytes,1,opt,name=attempt_id,json=attemptId,proto3" json:"attempt_id,omitempty"` + // Opaque cursor returned by a previous stream response. + Cursor string `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"` + // job_id identifies a CI job. The stream resolves it to that job's latest attempt. + JobId string `protobuf:"bytes,3,opt,name=job_id,json=jobId,proto3" json:"job_id,omitempty"` +} + +func (x *StreamJobAttemptLogsRequest) Reset() { + *x = StreamJobAttemptLogsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_depot_ci_v1_ci_proto_msgTypes[37] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamJobAttemptLogsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamJobAttemptLogsRequest) ProtoMessage() {} + +func (x *StreamJobAttemptLogsRequest) ProtoReflect() protoreflect.Message { + mi := &file_depot_ci_v1_ci_proto_msgTypes[37] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamJobAttemptLogsRequest.ProtoReflect.Descriptor instead. +func (*StreamJobAttemptLogsRequest) Descriptor() ([]byte, []int) { + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{37} +} + +func (x *StreamJobAttemptLogsRequest) GetAttemptId() string { + if x != nil { + return x.AttemptId + } + return "" +} + +func (x *StreamJobAttemptLogsRequest) GetCursor() string { + if x != nil { + return x.Cursor + } + return "" +} + +func (x *StreamJobAttemptLogsRequest) GetJobId() string { + if x != nil { + return x.JobId + } + return "" +} + +type StreamJobAttemptLogsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // line is set when this response carries a persisted log line. + // Status-only responses can omit it while the stream is waiting for rows. + Line *LogLine `protobuf:"bytes,1,opt,name=line,proto3" json:"line,omitempty"` + // Opaque cursor to resume after the emitted line. + NextCursor string `protobuf:"bytes,2,opt,name=next_cursor,json=nextCursor,proto3" json:"next_cursor,omitempty"` + // attempt_status is the current lowercase attempt state, e.g. "queued", + // "running", "finished", "failed", or "cancelled". + AttemptStatus string `protobuf:"bytes,3,opt,name=attempt_status,json=attemptStatus,proto3" json:"attempt_status,omitempty"` +} + +func (x *StreamJobAttemptLogsResponse) Reset() { + *x = StreamJobAttemptLogsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_depot_ci_v1_ci_proto_msgTypes[38] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamJobAttemptLogsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamJobAttemptLogsResponse) ProtoMessage() {} + +func (x *StreamJobAttemptLogsResponse) ProtoReflect() protoreflect.Message { + mi := &file_depot_ci_v1_ci_proto_msgTypes[38] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamJobAttemptLogsResponse.ProtoReflect.Descriptor instead. +func (*StreamJobAttemptLogsResponse) Descriptor() ([]byte, []int) { + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{38} +} + +func (x *StreamJobAttemptLogsResponse) GetLine() *LogLine { + if x != nil { + return x.Line + } + return nil +} + +func (x *StreamJobAttemptLogsResponse) GetNextCursor() string { + if x != nil { + return x.NextCursor + } + return "" +} + +func (x *StreamJobAttemptLogsResponse) GetAttemptStatus() string { + if x != nil { + return x.AttemptStatus + } + return "" +} + type LogLine struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -2635,7 +2769,7 @@ type LogLine struct { func (x *LogLine) Reset() { *x = LogLine{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[37] + mi := &file_depot_ci_v1_ci_proto_msgTypes[39] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2648,7 +2782,7 @@ func (x *LogLine) String() string { func (*LogLine) ProtoMessage() {} func (x *LogLine) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[37] + mi := &file_depot_ci_v1_ci_proto_msgTypes[39] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2661,7 +2795,7 @@ func (x *LogLine) ProtoReflect() protoreflect.Message { // Deprecated: Use LogLine.ProtoReflect.Descriptor instead. func (*LogLine) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{37} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{39} } func (x *LogLine) GetStepId() string { @@ -2724,7 +2858,7 @@ type ListRunsRequest struct { func (x *ListRunsRequest) Reset() { *x = ListRunsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[38] + mi := &file_depot_ci_v1_ci_proto_msgTypes[40] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2737,7 +2871,7 @@ func (x *ListRunsRequest) String() string { func (*ListRunsRequest) ProtoMessage() {} func (x *ListRunsRequest) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[38] + mi := &file_depot_ci_v1_ci_proto_msgTypes[40] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2750,7 +2884,7 @@ func (x *ListRunsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListRunsRequest.ProtoReflect.Descriptor instead. func (*ListRunsRequest) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{38} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{40} } func (x *ListRunsRequest) GetStatus() []string { @@ -2814,7 +2948,7 @@ type ListRunsResponse struct { func (x *ListRunsResponse) Reset() { *x = ListRunsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[39] + mi := &file_depot_ci_v1_ci_proto_msgTypes[41] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2827,7 +2961,7 @@ func (x *ListRunsResponse) String() string { func (*ListRunsResponse) ProtoMessage() {} func (x *ListRunsResponse) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[39] + mi := &file_depot_ci_v1_ci_proto_msgTypes[41] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2840,7 +2974,7 @@ func (x *ListRunsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListRunsResponse.ProtoReflect.Descriptor instead. func (*ListRunsResponse) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{39} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{41} } func (x *ListRunsResponse) GetRuns() []*ListRunsResponseRun { @@ -2876,7 +3010,7 @@ type ListRunsResponseRun struct { func (x *ListRunsResponseRun) Reset() { *x = ListRunsResponseRun{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[40] + mi := &file_depot_ci_v1_ci_proto_msgTypes[42] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2889,7 +3023,7 @@ func (x *ListRunsResponseRun) String() string { func (*ListRunsResponseRun) ProtoMessage() {} func (x *ListRunsResponseRun) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[40] + mi := &file_depot_ci_v1_ci_proto_msgTypes[42] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -2902,7 +3036,7 @@ func (x *ListRunsResponseRun) ProtoReflect() protoreflect.Message { // Deprecated: Use ListRunsResponseRun.ProtoReflect.Descriptor instead. func (*ListRunsResponseRun) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{40} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{42} } func (x *ListRunsResponseRun) GetRunId() string { @@ -2985,7 +3119,7 @@ type ListWorkflowsRequest struct { func (x *ListWorkflowsRequest) Reset() { *x = ListWorkflowsRequest{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[41] + mi := &file_depot_ci_v1_ci_proto_msgTypes[43] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -2998,7 +3132,7 @@ func (x *ListWorkflowsRequest) String() string { func (*ListWorkflowsRequest) ProtoMessage() {} func (x *ListWorkflowsRequest) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[41] + mi := &file_depot_ci_v1_ci_proto_msgTypes[43] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3011,7 +3145,7 @@ func (x *ListWorkflowsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use ListWorkflowsRequest.ProtoReflect.Descriptor instead. func (*ListWorkflowsRequest) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{41} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{43} } func (x *ListWorkflowsRequest) GetPageSize() int32 { @@ -3074,7 +3208,7 @@ type ListWorkflowsResponse struct { func (x *ListWorkflowsResponse) Reset() { *x = ListWorkflowsResponse{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[42] + mi := &file_depot_ci_v1_ci_proto_msgTypes[44] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3087,7 +3221,7 @@ func (x *ListWorkflowsResponse) String() string { func (*ListWorkflowsResponse) ProtoMessage() {} func (x *ListWorkflowsResponse) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[42] + mi := &file_depot_ci_v1_ci_proto_msgTypes[44] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3100,7 +3234,7 @@ func (x *ListWorkflowsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ListWorkflowsResponse.ProtoReflect.Descriptor instead. func (*ListWorkflowsResponse) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{42} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{44} } func (x *ListWorkflowsResponse) GetWorkflows() []*ListWorkflowsResponseWorkflow { @@ -3132,7 +3266,7 @@ type ListWorkflowsResponseWorkflow struct { func (x *ListWorkflowsResponseWorkflow) Reset() { *x = ListWorkflowsResponseWorkflow{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[43] + mi := &file_depot_ci_v1_ci_proto_msgTypes[45] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3145,7 +3279,7 @@ func (x *ListWorkflowsResponseWorkflow) String() string { func (*ListWorkflowsResponseWorkflow) ProtoMessage() {} func (x *ListWorkflowsResponseWorkflow) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[43] + mi := &file_depot_ci_v1_ci_proto_msgTypes[45] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3158,7 +3292,7 @@ func (x *ListWorkflowsResponseWorkflow) ProtoReflect() protoreflect.Message { // Deprecated: Use ListWorkflowsResponseWorkflow.ProtoReflect.Descriptor instead. func (*ListWorkflowsResponseWorkflow) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{43} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{45} } func (x *ListWorkflowsResponseWorkflow) GetWorkflowId() string { @@ -3256,7 +3390,7 @@ type ListWorkflowsResponseJobCounts struct { func (x *ListWorkflowsResponseJobCounts) Reset() { *x = ListWorkflowsResponseJobCounts{} if protoimpl.UnsafeEnabled { - mi := &file_depot_ci_v1_ci_proto_msgTypes[44] + mi := &file_depot_ci_v1_ci_proto_msgTypes[46] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3269,7 +3403,7 @@ func (x *ListWorkflowsResponseJobCounts) String() string { func (*ListWorkflowsResponseJobCounts) ProtoMessage() {} func (x *ListWorkflowsResponseJobCounts) ProtoReflect() protoreflect.Message { - mi := &file_depot_ci_v1_ci_proto_msgTypes[44] + mi := &file_depot_ci_v1_ci_proto_msgTypes[46] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3282,7 +3416,7 @@ func (x *ListWorkflowsResponseJobCounts) ProtoReflect() protoreflect.Message { // Deprecated: Use ListWorkflowsResponseJobCounts.ProtoReflect.Descriptor instead. func (*ListWorkflowsResponseJobCounts) Descriptor() ([]byte, []int) { - return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{44} + return file_depot_ci_v1_ci_proto_rawDescGZIP(), []int{46} } func (x *ListWorkflowsResponseJobCounts) GetTotal() int32 { @@ -3671,7 +3805,23 @@ var file_depot_ci_v1_ci_proto_rawDesc = []byte{ 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x67, 0x4c, 0x69, 0x6e, 0x65, 0x52, 0x05, 0x6c, 0x69, 0x6e, 0x65, 0x73, 0x12, 0x26, 0x0a, 0x0f, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x70, 0x61, 0x67, 0x65, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x6e, 0x65, 0x78, 0x74, - 0x50, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x92, 0x01, 0x0a, 0x07, 0x4c, 0x6f, + 0x50, 0x61, 0x67, 0x65, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x6b, 0x0a, 0x1b, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x4c, 0x6f, 0x67, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x61, 0x74, 0x74, 0x65, + 0x6d, 0x70, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x61, 0x74, + 0x74, 0x65, 0x6d, 0x70, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x15, 0x0a, 0x06, 0x6a, 0x6f, 0x62, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x22, 0x90, 0x01, 0x0a, 0x1c, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, + 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x6f, 0x67, 0x4c, 0x69, 0x6e, 0x65, 0x52, 0x04, 0x6c, 0x69, 0x6e, + 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x65, 0x78, 0x74, 0x5f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x6e, 0x65, 0x78, 0x74, 0x43, 0x75, 0x72, 0x73, + 0x6f, 0x72, 0x12, 0x25, 0x0a, 0x0e, 0x61, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x5f, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x74, 0x74, 0x65, + 0x6d, 0x70, 0x74, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x07, 0x4c, 0x6f, 0x67, 0x4c, 0x69, 0x6e, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x73, 0x74, 0x65, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x6d, 0x73, 0x18, 0x02, @@ -3770,7 +3920,7 @@ var file_depot_ci_v1_ci_proto_rawDesc = []byte{ 0x6c, 0x65, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x65, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x6c, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64, 0x18, 0x08, 0x20, 0x01, - 0x28, 0x05, 0x52, 0x07, 0x73, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64, 0x32, 0xa3, 0x09, 0x0a, 0x09, + 0x28, 0x05, 0x52, 0x07, 0x73, 0x6b, 0x69, 0x70, 0x70, 0x65, 0x64, 0x32, 0x94, 0x0a, 0x0a, 0x09, 0x43, 0x49, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3a, 0x0a, 0x03, 0x52, 0x75, 0x6e, 0x12, 0x17, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x18, 0x2e, 0x64, 0x65, 0x70, 0x6f, @@ -3834,42 +3984,49 @@ var file_depot_ci_v1_ci_proto_rawDesc = []byte{ 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x4c, 0x6f, - 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x08, - 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, - 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, - 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x58, 0x0a, 0x0d, 0x4c, 0x69, 0x73, 0x74, 0x57, - 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, - 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, 0x66, - 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64, 0x65, - 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x57, 0x6f, - 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, - 0x00, 0x32, 0xe1, 0x01, 0x0a, 0x10, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5e, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x73, - 0x74, 0x61, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x2e, 0x64, 0x65, 0x70, 0x6f, - 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, - 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, - 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, - 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6d, 0x0a, 0x14, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, - 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x73, 0x12, 0x28, - 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6d, 0x70, - 0x6f, 0x72, 0x74, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x72, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, - 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x53, 0x65, 0x63, - 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x99, 0x01, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x65, - 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x42, 0x07, 0x43, 0x69, 0x50, 0x72, 0x6f, - 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2f, 0x63, 0x69, 0x2f, 0x76, 0x31, - 0x3b, 0x63, 0x69, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x44, 0x43, 0x58, 0xaa, 0x02, 0x0b, 0x44, 0x65, - 0x70, 0x6f, 0x74, 0x2e, 0x43, 0x69, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0b, 0x44, 0x65, 0x70, 0x6f, - 0x74, 0x5c, 0x43, 0x69, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x17, 0x44, 0x65, 0x70, 0x6f, 0x74, 0x5c, - 0x43, 0x69, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, - 0x61, 0xea, 0x02, 0x0d, 0x44, 0x65, 0x70, 0x6f, 0x74, 0x3a, 0x3a, 0x43, 0x69, 0x3a, 0x3a, 0x56, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x67, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x14, + 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, + 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x28, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, + 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, + 0x6d, 0x70, 0x74, 0x4c, 0x6f, 0x67, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, + 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x4a, 0x6f, 0x62, 0x41, 0x74, 0x74, 0x65, 0x6d, 0x70, 0x74, 0x4c, 0x6f, 0x67, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x49, 0x0a, + 0x08, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, 0x12, 0x1c, 0x2e, 0x64, 0x65, 0x70, 0x6f, + 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, + 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x75, 0x6e, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x58, 0x0a, 0x0d, 0x4c, 0x69, 0x73, 0x74, + 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x12, 0x21, 0x2e, 0x64, 0x65, 0x70, 0x6f, + 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x57, 0x6f, 0x72, 0x6b, + 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x22, 0x2e, 0x64, + 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x57, + 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x32, 0xe1, 0x01, 0x0a, 0x10, 0x4d, 0x69, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5e, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x49, 0x6e, + 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x23, 0x2e, 0x64, 0x65, 0x70, + 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x49, 0x6e, 0x73, 0x74, + 0x61, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x24, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, + 0x74, 0x49, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x6d, 0x0a, 0x14, 0x49, 0x6d, 0x70, 0x6f, 0x72, + 0x74, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x73, 0x12, + 0x28, 0x2e, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6d, + 0x70, 0x6f, 0x72, 0x74, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x64, 0x65, 0x70, 0x6f, + 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x49, 0x6d, 0x70, 0x6f, 0x72, 0x74, 0x53, 0x65, + 0x63, 0x72, 0x65, 0x74, 0x73, 0x41, 0x6e, 0x64, 0x56, 0x61, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x99, 0x01, 0x0a, 0x0f, 0x63, 0x6f, 0x6d, 0x2e, 0x64, + 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x63, 0x69, 0x2e, 0x76, 0x31, 0x42, 0x07, 0x43, 0x69, 0x50, 0x72, + 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, 0x65, 0x70, 0x6f, 0x74, 0x2f, 0x63, 0x69, 0x2f, 0x76, + 0x31, 0x3b, 0x63, 0x69, 0x76, 0x31, 0xa2, 0x02, 0x03, 0x44, 0x43, 0x58, 0xaa, 0x02, 0x0b, 0x44, + 0x65, 0x70, 0x6f, 0x74, 0x2e, 0x43, 0x69, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x0b, 0x44, 0x65, 0x70, + 0x6f, 0x74, 0x5c, 0x43, 0x69, 0x5c, 0x56, 0x31, 0xe2, 0x02, 0x17, 0x44, 0x65, 0x70, 0x6f, 0x74, + 0x5c, 0x43, 0x69, 0x5c, 0x56, 0x31, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0xea, 0x02, 0x0d, 0x44, 0x65, 0x70, 0x6f, 0x74, 0x3a, 0x3a, 0x43, 0x69, 0x3a, 0x3a, + 0x56, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -3884,7 +4041,7 @@ func file_depot_ci_v1_ci_proto_rawDescGZIP() []byte { return file_depot_ci_v1_ci_proto_rawDescData } -var file_depot_ci_v1_ci_proto_msgTypes = make([]protoimpl.MessageInfo, 46) +var file_depot_ci_v1_ci_proto_msgTypes = make([]protoimpl.MessageInfo, 48) var file_depot_ci_v1_ci_proto_goTypes = []interface{}{ (*GetInstallationRequest)(nil), // 0: depot.ci.v1.GetInstallationRequest (*GetInstallationResponse)(nil), // 1: depot.ci.v1.GetInstallationResponse @@ -3923,68 +4080,73 @@ var file_depot_ci_v1_ci_proto_goTypes = []interface{}{ (*GetWorkflowJobAttempt)(nil), // 34: depot.ci.v1.GetWorkflowJobAttempt (*GetJobAttemptLogsRequest)(nil), // 35: depot.ci.v1.GetJobAttemptLogsRequest (*GetJobAttemptLogsResponse)(nil), // 36: depot.ci.v1.GetJobAttemptLogsResponse - (*LogLine)(nil), // 37: depot.ci.v1.LogLine - (*ListRunsRequest)(nil), // 38: depot.ci.v1.ListRunsRequest - (*ListRunsResponse)(nil), // 39: depot.ci.v1.ListRunsResponse - (*ListRunsResponseRun)(nil), // 40: depot.ci.v1.ListRunsResponseRun - (*ListWorkflowsRequest)(nil), // 41: depot.ci.v1.ListWorkflowsRequest - (*ListWorkflowsResponse)(nil), // 42: depot.ci.v1.ListWorkflowsResponse - (*ListWorkflowsResponseWorkflow)(nil), // 43: depot.ci.v1.ListWorkflowsResponseWorkflow - (*ListWorkflowsResponseJobCounts)(nil), // 44: depot.ci.v1.ListWorkflowsResponseJobCounts - nil, // 45: depot.ci.v1.DispatchWorkflowRequest.InputsEntry + (*StreamJobAttemptLogsRequest)(nil), // 37: depot.ci.v1.StreamJobAttemptLogsRequest + (*StreamJobAttemptLogsResponse)(nil), // 38: depot.ci.v1.StreamJobAttemptLogsResponse + (*LogLine)(nil), // 39: depot.ci.v1.LogLine + (*ListRunsRequest)(nil), // 40: depot.ci.v1.ListRunsRequest + (*ListRunsResponse)(nil), // 41: depot.ci.v1.ListRunsResponse + (*ListRunsResponseRun)(nil), // 42: depot.ci.v1.ListRunsResponseRun + (*ListWorkflowsRequest)(nil), // 43: depot.ci.v1.ListWorkflowsRequest + (*ListWorkflowsResponse)(nil), // 44: depot.ci.v1.ListWorkflowsResponse + (*ListWorkflowsResponseWorkflow)(nil), // 45: depot.ci.v1.ListWorkflowsResponseWorkflow + (*ListWorkflowsResponseJobCounts)(nil), // 46: depot.ci.v1.ListWorkflowsResponseJobCounts + nil, // 47: depot.ci.v1.DispatchWorkflowRequest.InputsEntry } var file_depot_ci_v1_ci_proto_depIdxs = []int32{ 6, // 0: depot.ci.v1.GetInstallationResponse.installations:type_name -> depot.ci.v1.Installation 4, // 1: depot.ci.v1.ImportSecretsAndVarsResponse.dry_run_result:type_name -> depot.ci.v1.DryRunResult 5, // 2: depot.ci.v1.ImportSecretsAndVarsResponse.run_result:type_name -> depot.ci.v1.RunResult - 45, // 3: depot.ci.v1.DispatchWorkflowRequest.inputs:type_name -> depot.ci.v1.DispatchWorkflowRequest.InputsEntry + 47, // 3: depot.ci.v1.DispatchWorkflowRequest.inputs:type_name -> depot.ci.v1.DispatchWorkflowRequest.InputsEntry 27, // 4: depot.ci.v1.GetRunStatusResponse.workflows:type_name -> depot.ci.v1.WorkflowStatus 28, // 5: depot.ci.v1.WorkflowStatus.jobs:type_name -> depot.ci.v1.JobStatus 29, // 6: depot.ci.v1.JobStatus.attempts:type_name -> depot.ci.v1.AttemptStatus 32, // 7: depot.ci.v1.GetWorkflowResponse.executions:type_name -> depot.ci.v1.GetWorkflowExecution 33, // 8: depot.ci.v1.GetWorkflowResponse.jobs:type_name -> depot.ci.v1.GetWorkflowJob 34, // 9: depot.ci.v1.GetWorkflowJob.attempts:type_name -> depot.ci.v1.GetWorkflowJobAttempt - 37, // 10: depot.ci.v1.GetJobAttemptLogsResponse.lines:type_name -> depot.ci.v1.LogLine - 40, // 11: depot.ci.v1.ListRunsResponse.runs:type_name -> depot.ci.v1.ListRunsResponseRun - 43, // 12: depot.ci.v1.ListWorkflowsResponse.workflows:type_name -> depot.ci.v1.ListWorkflowsResponseWorkflow - 44, // 13: depot.ci.v1.ListWorkflowsResponseWorkflow.job_counts:type_name -> depot.ci.v1.ListWorkflowsResponseJobCounts - 7, // 14: depot.ci.v1.CIService.Run:input_type -> depot.ci.v1.RunRequest - 9, // 15: depot.ci.v1.CIService.DispatchWorkflow:input_type -> depot.ci.v1.DispatchWorkflowRequest - 11, // 16: depot.ci.v1.CIService.RetryJob:input_type -> depot.ci.v1.RetryJobRequest - 13, // 17: depot.ci.v1.CIService.RerunWorkflow:input_type -> depot.ci.v1.RerunWorkflowRequest - 15, // 18: depot.ci.v1.CIService.RetryFailedJobs:input_type -> depot.ci.v1.RetryFailedJobsRequest - 17, // 19: depot.ci.v1.CIService.CancelJob:input_type -> depot.ci.v1.CancelJobRequest - 19, // 20: depot.ci.v1.CIService.CancelWorkflow:input_type -> depot.ci.v1.CancelWorkflowRequest - 21, // 21: depot.ci.v1.CIService.GetRun:input_type -> depot.ci.v1.GetRunRequest - 23, // 22: depot.ci.v1.CIService.CancelRun:input_type -> depot.ci.v1.CancelRunRequest - 25, // 23: depot.ci.v1.CIService.GetRunStatus:input_type -> depot.ci.v1.GetRunStatusRequest - 30, // 24: depot.ci.v1.CIService.GetWorkflow:input_type -> depot.ci.v1.GetWorkflowRequest - 35, // 25: depot.ci.v1.CIService.GetJobAttemptLogs:input_type -> depot.ci.v1.GetJobAttemptLogsRequest - 38, // 26: depot.ci.v1.CIService.ListRuns:input_type -> depot.ci.v1.ListRunsRequest - 41, // 27: depot.ci.v1.CIService.ListWorkflows:input_type -> depot.ci.v1.ListWorkflowsRequest - 0, // 28: depot.ci.v1.MigrationService.GetInstallation:input_type -> depot.ci.v1.GetInstallationRequest - 2, // 29: depot.ci.v1.MigrationService.ImportSecretsAndVars:input_type -> depot.ci.v1.ImportSecretsAndVarsRequest - 8, // 30: depot.ci.v1.CIService.Run:output_type -> depot.ci.v1.RunResponse - 10, // 31: depot.ci.v1.CIService.DispatchWorkflow:output_type -> depot.ci.v1.DispatchWorkflowResponse - 12, // 32: depot.ci.v1.CIService.RetryJob:output_type -> depot.ci.v1.RetryJobResponse - 14, // 33: depot.ci.v1.CIService.RerunWorkflow:output_type -> depot.ci.v1.RerunWorkflowResponse - 16, // 34: depot.ci.v1.CIService.RetryFailedJobs:output_type -> depot.ci.v1.RetryFailedJobsResponse - 18, // 35: depot.ci.v1.CIService.CancelJob:output_type -> depot.ci.v1.CancelJobResponse - 20, // 36: depot.ci.v1.CIService.CancelWorkflow:output_type -> depot.ci.v1.CancelWorkflowResponse - 22, // 37: depot.ci.v1.CIService.GetRun:output_type -> depot.ci.v1.GetRunResponse - 24, // 38: depot.ci.v1.CIService.CancelRun:output_type -> depot.ci.v1.CancelRunResponse - 26, // 39: depot.ci.v1.CIService.GetRunStatus:output_type -> depot.ci.v1.GetRunStatusResponse - 31, // 40: depot.ci.v1.CIService.GetWorkflow:output_type -> depot.ci.v1.GetWorkflowResponse - 36, // 41: depot.ci.v1.CIService.GetJobAttemptLogs:output_type -> depot.ci.v1.GetJobAttemptLogsResponse - 39, // 42: depot.ci.v1.CIService.ListRuns:output_type -> depot.ci.v1.ListRunsResponse - 42, // 43: depot.ci.v1.CIService.ListWorkflows:output_type -> depot.ci.v1.ListWorkflowsResponse - 1, // 44: depot.ci.v1.MigrationService.GetInstallation:output_type -> depot.ci.v1.GetInstallationResponse - 3, // 45: depot.ci.v1.MigrationService.ImportSecretsAndVars:output_type -> depot.ci.v1.ImportSecretsAndVarsResponse - 30, // [30:46] is the sub-list for method output_type - 14, // [14:30] is the sub-list for method input_type - 14, // [14:14] is the sub-list for extension type_name - 14, // [14:14] is the sub-list for extension extendee - 0, // [0:14] is the sub-list for field type_name + 39, // 10: depot.ci.v1.GetJobAttemptLogsResponse.lines:type_name -> depot.ci.v1.LogLine + 39, // 11: depot.ci.v1.StreamJobAttemptLogsResponse.line:type_name -> depot.ci.v1.LogLine + 42, // 12: depot.ci.v1.ListRunsResponse.runs:type_name -> depot.ci.v1.ListRunsResponseRun + 45, // 13: depot.ci.v1.ListWorkflowsResponse.workflows:type_name -> depot.ci.v1.ListWorkflowsResponseWorkflow + 46, // 14: depot.ci.v1.ListWorkflowsResponseWorkflow.job_counts:type_name -> depot.ci.v1.ListWorkflowsResponseJobCounts + 7, // 15: depot.ci.v1.CIService.Run:input_type -> depot.ci.v1.RunRequest + 9, // 16: depot.ci.v1.CIService.DispatchWorkflow:input_type -> depot.ci.v1.DispatchWorkflowRequest + 11, // 17: depot.ci.v1.CIService.RetryJob:input_type -> depot.ci.v1.RetryJobRequest + 13, // 18: depot.ci.v1.CIService.RerunWorkflow:input_type -> depot.ci.v1.RerunWorkflowRequest + 15, // 19: depot.ci.v1.CIService.RetryFailedJobs:input_type -> depot.ci.v1.RetryFailedJobsRequest + 17, // 20: depot.ci.v1.CIService.CancelJob:input_type -> depot.ci.v1.CancelJobRequest + 19, // 21: depot.ci.v1.CIService.CancelWorkflow:input_type -> depot.ci.v1.CancelWorkflowRequest + 21, // 22: depot.ci.v1.CIService.GetRun:input_type -> depot.ci.v1.GetRunRequest + 23, // 23: depot.ci.v1.CIService.CancelRun:input_type -> depot.ci.v1.CancelRunRequest + 25, // 24: depot.ci.v1.CIService.GetRunStatus:input_type -> depot.ci.v1.GetRunStatusRequest + 30, // 25: depot.ci.v1.CIService.GetWorkflow:input_type -> depot.ci.v1.GetWorkflowRequest + 35, // 26: depot.ci.v1.CIService.GetJobAttemptLogs:input_type -> depot.ci.v1.GetJobAttemptLogsRequest + 37, // 27: depot.ci.v1.CIService.StreamJobAttemptLogs:input_type -> depot.ci.v1.StreamJobAttemptLogsRequest + 40, // 28: depot.ci.v1.CIService.ListRuns:input_type -> depot.ci.v1.ListRunsRequest + 43, // 29: depot.ci.v1.CIService.ListWorkflows:input_type -> depot.ci.v1.ListWorkflowsRequest + 0, // 30: depot.ci.v1.MigrationService.GetInstallation:input_type -> depot.ci.v1.GetInstallationRequest + 2, // 31: depot.ci.v1.MigrationService.ImportSecretsAndVars:input_type -> depot.ci.v1.ImportSecretsAndVarsRequest + 8, // 32: depot.ci.v1.CIService.Run:output_type -> depot.ci.v1.RunResponse + 10, // 33: depot.ci.v1.CIService.DispatchWorkflow:output_type -> depot.ci.v1.DispatchWorkflowResponse + 12, // 34: depot.ci.v1.CIService.RetryJob:output_type -> depot.ci.v1.RetryJobResponse + 14, // 35: depot.ci.v1.CIService.RerunWorkflow:output_type -> depot.ci.v1.RerunWorkflowResponse + 16, // 36: depot.ci.v1.CIService.RetryFailedJobs:output_type -> depot.ci.v1.RetryFailedJobsResponse + 18, // 37: depot.ci.v1.CIService.CancelJob:output_type -> depot.ci.v1.CancelJobResponse + 20, // 38: depot.ci.v1.CIService.CancelWorkflow:output_type -> depot.ci.v1.CancelWorkflowResponse + 22, // 39: depot.ci.v1.CIService.GetRun:output_type -> depot.ci.v1.GetRunResponse + 24, // 40: depot.ci.v1.CIService.CancelRun:output_type -> depot.ci.v1.CancelRunResponse + 26, // 41: depot.ci.v1.CIService.GetRunStatus:output_type -> depot.ci.v1.GetRunStatusResponse + 31, // 42: depot.ci.v1.CIService.GetWorkflow:output_type -> depot.ci.v1.GetWorkflowResponse + 36, // 43: depot.ci.v1.CIService.GetJobAttemptLogs:output_type -> depot.ci.v1.GetJobAttemptLogsResponse + 38, // 44: depot.ci.v1.CIService.StreamJobAttemptLogs:output_type -> depot.ci.v1.StreamJobAttemptLogsResponse + 41, // 45: depot.ci.v1.CIService.ListRuns:output_type -> depot.ci.v1.ListRunsResponse + 44, // 46: depot.ci.v1.CIService.ListWorkflows:output_type -> depot.ci.v1.ListWorkflowsResponse + 1, // 47: depot.ci.v1.MigrationService.GetInstallation:output_type -> depot.ci.v1.GetInstallationResponse + 3, // 48: depot.ci.v1.MigrationService.ImportSecretsAndVars:output_type -> depot.ci.v1.ImportSecretsAndVarsResponse + 32, // [32:49] is the sub-list for method output_type + 15, // [15:32] is the sub-list for method input_type + 15, // [15:15] is the sub-list for extension type_name + 15, // [15:15] is the sub-list for extension extendee + 0, // [0:15] is the sub-list for field type_name } func init() { file_depot_ci_v1_ci_proto_init() } @@ -4438,7 +4600,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[37].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*LogLine); i { + switch v := v.(*StreamJobAttemptLogsRequest); i { case 0: return &v.state case 1: @@ -4450,7 +4612,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[38].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListRunsRequest); i { + switch v := v.(*StreamJobAttemptLogsResponse); i { case 0: return &v.state case 1: @@ -4462,7 +4624,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[39].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListRunsResponse); i { + switch v := v.(*LogLine); i { case 0: return &v.state case 1: @@ -4474,7 +4636,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[40].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListRunsResponseRun); i { + switch v := v.(*ListRunsRequest); i { case 0: return &v.state case 1: @@ -4486,7 +4648,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[41].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListWorkflowsRequest); i { + switch v := v.(*ListRunsResponse); i { case 0: return &v.state case 1: @@ -4498,7 +4660,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[42].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListWorkflowsResponse); i { + switch v := v.(*ListRunsResponseRun); i { case 0: return &v.state case 1: @@ -4510,7 +4672,7 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[43].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ListWorkflowsResponseWorkflow); i { + switch v := v.(*ListWorkflowsRequest); i { case 0: return &v.state case 1: @@ -4522,6 +4684,30 @@ func file_depot_ci_v1_ci_proto_init() { } } file_depot_ci_v1_ci_proto_msgTypes[44].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListWorkflowsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_depot_ci_v1_ci_proto_msgTypes[45].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ListWorkflowsResponseWorkflow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_depot_ci_v1_ci_proto_msgTypes[46].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ListWorkflowsResponseJobCounts); i { case 0: return &v.state @@ -4545,7 +4731,7 @@ func file_depot_ci_v1_ci_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_depot_ci_v1_ci_proto_rawDesc, NumEnums: 0, - NumMessages: 46, + NumMessages: 48, NumExtensions: 0, NumServices: 2, }, diff --git a/pkg/proto/depot/ci/v1/civ1connect/ci.connect.go b/pkg/proto/depot/ci/v1/civ1connect/ci.connect.go index 6ece23f1..e97d03e6 100644 --- a/pkg/proto/depot/ci/v1/civ1connect/ci.connect.go +++ b/pkg/proto/depot/ci/v1/civ1connect/ci.connect.go @@ -63,6 +63,9 @@ const ( // CIServiceGetJobAttemptLogsProcedure is the fully-qualified name of the CIService's // GetJobAttemptLogs RPC. CIServiceGetJobAttemptLogsProcedure = "/depot.ci.v1.CIService/GetJobAttemptLogs" + // CIServiceStreamJobAttemptLogsProcedure is the fully-qualified name of the CIService's + // StreamJobAttemptLogs RPC. + CIServiceStreamJobAttemptLogsProcedure = "/depot.ci.v1.CIService/StreamJobAttemptLogs" // CIServiceListRunsProcedure is the fully-qualified name of the CIService's ListRuns RPC. CIServiceListRunsProcedure = "/depot.ci.v1.CIService/ListRuns" // CIServiceListWorkflowsProcedure is the fully-qualified name of the CIService's ListWorkflows RPC. @@ -101,6 +104,8 @@ type CIServiceClient interface { GetWorkflow(context.Context, *connect.Request[v1.GetWorkflowRequest]) (*connect.Response[v1.GetWorkflowResponse], error) // GetJobAttemptLogs returns log lines for a job attempt GetJobAttemptLogs(context.Context, *connect.Request[v1.GetJobAttemptLogsRequest]) (*connect.Response[v1.GetJobAttemptLogsResponse], error) + // StreamJobAttemptLogs streams persisted log lines for a job attempt + StreamJobAttemptLogs(context.Context, *connect.Request[v1.StreamJobAttemptLogsRequest]) (*connect.ServerStreamForClient[v1.StreamJobAttemptLogsResponse], error) // ListRuns returns recent CI runs for the authenticated organization. // // This is a recent run discovery API, not a historical search API. Results are always @@ -180,6 +185,11 @@ func NewCIServiceClient(httpClient connect.HTTPClient, baseURL string, opts ...c baseURL+CIServiceGetJobAttemptLogsProcedure, opts..., ), + streamJobAttemptLogs: connect.NewClient[v1.StreamJobAttemptLogsRequest, v1.StreamJobAttemptLogsResponse]( + httpClient, + baseURL+CIServiceStreamJobAttemptLogsProcedure, + opts..., + ), listRuns: connect.NewClient[v1.ListRunsRequest, v1.ListRunsResponse]( httpClient, baseURL+CIServiceListRunsProcedure, @@ -195,20 +205,21 @@ func NewCIServiceClient(httpClient connect.HTTPClient, baseURL string, opts ...c // cIServiceClient implements CIServiceClient. type cIServiceClient struct { - run *connect.Client[v1.RunRequest, v1.RunResponse] - dispatchWorkflow *connect.Client[v1.DispatchWorkflowRequest, v1.DispatchWorkflowResponse] - retryJob *connect.Client[v1.RetryJobRequest, v1.RetryJobResponse] - rerunWorkflow *connect.Client[v1.RerunWorkflowRequest, v1.RerunWorkflowResponse] - retryFailedJobs *connect.Client[v1.RetryFailedJobsRequest, v1.RetryFailedJobsResponse] - cancelJob *connect.Client[v1.CancelJobRequest, v1.CancelJobResponse] - cancelWorkflow *connect.Client[v1.CancelWorkflowRequest, v1.CancelWorkflowResponse] - getRun *connect.Client[v1.GetRunRequest, v1.GetRunResponse] - cancelRun *connect.Client[v1.CancelRunRequest, v1.CancelRunResponse] - getRunStatus *connect.Client[v1.GetRunStatusRequest, v1.GetRunStatusResponse] - getWorkflow *connect.Client[v1.GetWorkflowRequest, v1.GetWorkflowResponse] - getJobAttemptLogs *connect.Client[v1.GetJobAttemptLogsRequest, v1.GetJobAttemptLogsResponse] - listRuns *connect.Client[v1.ListRunsRequest, v1.ListRunsResponse] - listWorkflows *connect.Client[v1.ListWorkflowsRequest, v1.ListWorkflowsResponse] + run *connect.Client[v1.RunRequest, v1.RunResponse] + dispatchWorkflow *connect.Client[v1.DispatchWorkflowRequest, v1.DispatchWorkflowResponse] + retryJob *connect.Client[v1.RetryJobRequest, v1.RetryJobResponse] + rerunWorkflow *connect.Client[v1.RerunWorkflowRequest, v1.RerunWorkflowResponse] + retryFailedJobs *connect.Client[v1.RetryFailedJobsRequest, v1.RetryFailedJobsResponse] + cancelJob *connect.Client[v1.CancelJobRequest, v1.CancelJobResponse] + cancelWorkflow *connect.Client[v1.CancelWorkflowRequest, v1.CancelWorkflowResponse] + getRun *connect.Client[v1.GetRunRequest, v1.GetRunResponse] + cancelRun *connect.Client[v1.CancelRunRequest, v1.CancelRunResponse] + getRunStatus *connect.Client[v1.GetRunStatusRequest, v1.GetRunStatusResponse] + getWorkflow *connect.Client[v1.GetWorkflowRequest, v1.GetWorkflowResponse] + getJobAttemptLogs *connect.Client[v1.GetJobAttemptLogsRequest, v1.GetJobAttemptLogsResponse] + streamJobAttemptLogs *connect.Client[v1.StreamJobAttemptLogsRequest, v1.StreamJobAttemptLogsResponse] + listRuns *connect.Client[v1.ListRunsRequest, v1.ListRunsResponse] + listWorkflows *connect.Client[v1.ListWorkflowsRequest, v1.ListWorkflowsResponse] } // Run calls depot.ci.v1.CIService.Run. @@ -271,6 +282,11 @@ func (c *cIServiceClient) GetJobAttemptLogs(ctx context.Context, req *connect.Re return c.getJobAttemptLogs.CallUnary(ctx, req) } +// StreamJobAttemptLogs calls depot.ci.v1.CIService.StreamJobAttemptLogs. +func (c *cIServiceClient) StreamJobAttemptLogs(ctx context.Context, req *connect.Request[v1.StreamJobAttemptLogsRequest]) (*connect.ServerStreamForClient[v1.StreamJobAttemptLogsResponse], error) { + return c.streamJobAttemptLogs.CallServerStream(ctx, req) +} + // ListRuns calls depot.ci.v1.CIService.ListRuns. func (c *cIServiceClient) ListRuns(ctx context.Context, req *connect.Request[v1.ListRunsRequest]) (*connect.Response[v1.ListRunsResponse], error) { return c.listRuns.CallUnary(ctx, req) @@ -307,6 +323,8 @@ type CIServiceHandler interface { GetWorkflow(context.Context, *connect.Request[v1.GetWorkflowRequest]) (*connect.Response[v1.GetWorkflowResponse], error) // GetJobAttemptLogs returns log lines for a job attempt GetJobAttemptLogs(context.Context, *connect.Request[v1.GetJobAttemptLogsRequest]) (*connect.Response[v1.GetJobAttemptLogsResponse], error) + // StreamJobAttemptLogs streams persisted log lines for a job attempt + StreamJobAttemptLogs(context.Context, *connect.Request[v1.StreamJobAttemptLogsRequest], *connect.ServerStream[v1.StreamJobAttemptLogsResponse]) error // ListRuns returns recent CI runs for the authenticated organization. // // This is a recent run discovery API, not a historical search API. Results are always @@ -382,6 +400,11 @@ func NewCIServiceHandler(svc CIServiceHandler, opts ...connect.HandlerOption) (s svc.GetJobAttemptLogs, opts..., ) + cIServiceStreamJobAttemptLogsHandler := connect.NewServerStreamHandler( + CIServiceStreamJobAttemptLogsProcedure, + svc.StreamJobAttemptLogs, + opts..., + ) cIServiceListRunsHandler := connect.NewUnaryHandler( CIServiceListRunsProcedure, svc.ListRuns, @@ -418,6 +441,8 @@ func NewCIServiceHandler(svc CIServiceHandler, opts ...connect.HandlerOption) (s cIServiceGetWorkflowHandler.ServeHTTP(w, r) case CIServiceGetJobAttemptLogsProcedure: cIServiceGetJobAttemptLogsHandler.ServeHTTP(w, r) + case CIServiceStreamJobAttemptLogsProcedure: + cIServiceStreamJobAttemptLogsHandler.ServeHTTP(w, r) case CIServiceListRunsProcedure: cIServiceListRunsHandler.ServeHTTP(w, r) case CIServiceListWorkflowsProcedure: @@ -479,6 +504,10 @@ func (UnimplementedCIServiceHandler) GetJobAttemptLogs(context.Context, *connect return nil, connect.NewError(connect.CodeUnimplemented, errors.New("depot.ci.v1.CIService.GetJobAttemptLogs is not implemented")) } +func (UnimplementedCIServiceHandler) StreamJobAttemptLogs(context.Context, *connect.Request[v1.StreamJobAttemptLogsRequest], *connect.ServerStream[v1.StreamJobAttemptLogsResponse]) error { + return connect.NewError(connect.CodeUnimplemented, errors.New("depot.ci.v1.CIService.StreamJobAttemptLogs is not implemented")) +} + func (UnimplementedCIServiceHandler) ListRuns(context.Context, *connect.Request[v1.ListRunsRequest]) (*connect.Response[v1.ListRunsResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("depot.ci.v1.CIService.ListRuns is not implemented")) } diff --git a/proto/depot/ci/v1/ci.proto b/proto/depot/ci/v1/ci.proto index 26bebebb..7f07f36e 100644 --- a/proto/depot/ci/v1/ci.proto +++ b/proto/depot/ci/v1/ci.proto @@ -41,6 +41,9 @@ service CIService { // GetJobAttemptLogs returns log lines for a job attempt rpc GetJobAttemptLogs(GetJobAttemptLogsRequest) returns (GetJobAttemptLogsResponse) {} + // StreamJobAttemptLogs streams persisted log lines for a job attempt + rpc StreamJobAttemptLogs(StreamJobAttemptLogsRequest) returns (stream StreamJobAttemptLogsResponse) {} + // ListRuns returns recent CI runs for the authenticated organization. // // This is a recent run discovery API, not a historical search API. Results are always @@ -396,6 +399,26 @@ message GetJobAttemptLogsResponse { string next_page_token = 2; } +message StreamJobAttemptLogsRequest { + // attempt_id identifies the concrete job attempt whose logs to stream. + string attempt_id = 1; + // Opaque cursor returned by a previous stream response. + string cursor = 2; + // job_id identifies a CI job. The stream resolves it to that job's latest attempt. + string job_id = 3; +} + +message StreamJobAttemptLogsResponse { + // line is set when this response carries a persisted log line. + // Status-only responses can omit it while the stream is waiting for rows. + LogLine line = 1; + // Opaque cursor to resume after the emitted line. + string next_cursor = 2; + // attempt_status is the current lowercase attempt state, e.g. "queued", + // "running", "finished", "failed", or "cancelled". + string attempt_status = 3; +} + message LogLine { string step_id = 1; int64 timestamp_ms = 2;