Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions pkg/api/ci.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,22 @@ type CILogStreamTarget struct {
// attempt of a job, resuming from the last cursor after transient stream errors.
// If onStatus is non-nil, it receives attempt status updates from the stream.
func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CILogStreamTarget, w io.Writer, onStatus func(string)) error {
var onStatusWithError func(string) error
if onStatus != nil {
onStatusWithError = func(status string) error {
onStatus(status)
return nil
}
}
return CIStreamJobAttemptLogLines(ctx, token, orgID, target, func(line *civ1.LogLine) error {
return writeLogLine(w, line)
}, onStatusWithError)
}

// CIStreamJobAttemptLogLines streams log line metadata for a job attempt or the
// latest attempt of a job, resuming from the last cursor after transient stream
// errors. Duplicate replayed lines are suppressed before onLine is called.
func CIStreamJobAttemptLogLines(ctx context.Context, token, orgID string, target CILogStreamTarget, onLine func(*civ1.LogLine) error, onStatus func(string) error) error {
if target.AttemptID == "" && target.JobID == "" {
return fmt.Errorf("exactly one of attempt ID or job ID is required")
}
Expand Down Expand Up @@ -123,7 +139,10 @@ func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CIL
msg := stream.Msg()
backoff = ciStreamInitialBackoff
if status := msg.GetAttemptStatus(); status != "" && onStatus != nil {
onStatus(status)
if err := onStatus(status); err != nil {
stream.Close()
return err
}
}

line := msg.GetLine()
Expand All @@ -133,9 +152,11 @@ func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CIL

identity := logLineIdentity(line)
if !seen.Contains(identity) {
if err := writeLogLine(w, line); err != nil {
stream.Close()
return err
if onLine != nil {
if err := onLine(line); err != nil {
stream.Close()
return err
}
}
seen.Add(identity)
if msg.GetNextCursor() != "" {
Expand Down
68 changes: 68 additions & 0 deletions pkg/api/ci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,74 @@ func TestCIStreamJobAttemptLogsReconnectsFromLastWrittenCursorAndSuppressesDupli
}
}

func TestCIStreamJobAttemptLogLinesReturnsMetadataAndSuppressesDuplicates(t *testing.T) {
recorder := &streamLogsRecorder{}
_, handler := civ1connect.NewCIServiceHandler(recorder)
server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{}))
t.Cleanup(server.Close)

originalBaseURLFunc := baseURLFunc
baseURLFunc = func() string { return server.URL }
t.Cleanup(func() { baseURLFunc = originalBaseURLFunc })

originalInitialBackoff := ciStreamInitialBackoff
ciStreamInitialBackoff = 0
t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff })

var lines []*civ1.LogLine
var statuses []string
if err := CIStreamJobAttemptLogLines(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, func(line *civ1.LogLine) error {
lines = append(lines, proto.Clone(line).(*civ1.LogLine))
return nil
}, func(status string) error {
statuses = append(statuses, status)
return nil
}); err != nil {
t.Fatal(err)
}

if got, want := len(lines), 3; got != want {
t.Fatalf("lines = %d, want %d", got, want)
}
for i, wantBody := range []string{"first", "second", "third"} {
if got := lines[i].GetBody(); got != wantBody {
t.Fatalf("line %d body = %q, want %q", i, got, wantBody)
}
if got, want := lines[i].GetLineNumber(), uint32(i+1); got != want {
t.Fatalf("line %d line number = %d, want %d", i, got, want)
}
if got, want := lines[i].GetTimestampMs(), int64(i+1); got != want {
t.Fatalf("line %d timestamp_ms = %d, want %d", i, got, want)
}
}
if got, want := statuses, []string{"running", "running", "running", "running", "finished"}; !slices.Equal(got, want) {
t.Fatalf("statuses = %v, want %v", got, want)
}
}

func TestCIStreamJobAttemptLogLinesPropagatesLineCallbackError(t *testing.T) {
recorder := &streamLogsRecorder{}
_, handler := civ1connect.NewCIServiceHandler(recorder)
server := httptest.NewServer(h2c.NewHandler(handler, &http2.Server{}))
t.Cleanup(server.Close)

originalBaseURLFunc := baseURLFunc
baseURLFunc = func() string { return server.URL }
t.Cleanup(func() { baseURLFunc = originalBaseURLFunc })

originalInitialBackoff := ciStreamInitialBackoff
ciStreamInitialBackoff = 0
t.Cleanup(func() { ciStreamInitialBackoff = originalInitialBackoff })

callbackErr := errors.New("callback failed")
err := CIStreamJobAttemptLogLines(context.Background(), "token-123", "org-123", CILogStreamTarget{AttemptID: "attempt-123"}, func(*civ1.LogLine) error {
return callbackErr
}, nil)
if !errors.Is(err, callbackErr) {
t.Fatalf("expected callback error, got %v", err)
}
}

type statusOnlyStreamRecorder struct {
civ1connect.UnimplementedCIServiceHandler
requests []*civ1.StreamJobAttemptLogsRequest
Expand Down
Loading
Loading