Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions pkg/api/ci.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package api

import (
"container/list"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"time"

"connectrpc.com/connect"
Expand All @@ -13,6 +19,14 @@ import (

var baseURLFunc = getBaseURL

const ciStreamLogDedupeSize = 4096

var (
ciStreamInitialBackoff = 250 * time.Millisecond
ciStreamMaxBackoff = 30 * time.Second
ciStreamSleep = sleepWithContext
)

func newCIServiceClient() civ1connect.CIServiceClient {
baseURL := baseURLFunc()
return civ1connect.NewCIServiceClient(getHTTPClient(baseURL), baseURL, WithUserAgent())
Expand Down Expand Up @@ -70,6 +84,169 @@ func CIGetJobAttemptLogs(ctx context.Context, token, orgID, attemptID string) ([
return allLines, nil
}

type CILogStreamTarget struct {
AttemptID string
JobID string
}

// CIStreamJobAttemptLogs streams log lines for a job attempt or the latest
// attempt of a job, resuming from the last cursor after transient stream errors.
// If onStatus is non-nil, it receives attempt status updates from the stream.
func CIStreamJobAttemptLogs(ctx context.Context, token, orgID string, target CILogStreamTarget, w io.Writer, onStatus func(string)) error {
if target.AttemptID == "" && target.JobID == "" {
return fmt.Errorf("exactly one of attempt ID or job ID is required")
}
if target.AttemptID != "" && target.JobID != "" {
return fmt.Errorf("exactly one of attempt ID or job ID is required")
}

client := newCIServiceClient()
cursor := ""
backoff := ciStreamInitialBackoff
seen := newLogLineDedupe(ciStreamLogDedupeSize)

for {
req := &civ1.StreamJobAttemptLogsRequest{AttemptId: target.AttemptID, JobId: target.JobID, Cursor: cursor}
stream, err := client.StreamJobAttemptLogs(ctx, WithAuthenticationAndOrg(connect.NewRequest(req), token, orgID))
if err != nil {
if !isTransientConnectError(err) {
return err
}
if err := ciStreamSleep(ctx, backoff); err != nil {
return err
}
backoff = nextCIStreamBackoff(backoff)
continue
}

for stream.Receive() {
msg := stream.Msg()
backoff = ciStreamInitialBackoff
if status := msg.GetAttemptStatus(); status != "" && onStatus != nil {
onStatus(status)
}

line := msg.GetLine()
if line == nil {
continue
}

identity := logLineIdentity(line)
if !seen.Contains(identity) {
if err := writeLogLine(w, line); err != nil {
stream.Close()
return err
}
seen.Add(identity)
if msg.GetNextCursor() != "" {
cursor = msg.GetNextCursor()
}
}
}

err = stream.Err()
stream.Close()
if err == nil {
return nil
}
if !isTransientConnectError(err) {
return err
}
if err := ciStreamSleep(ctx, backoff); err != nil {
return err
}
backoff = nextCIStreamBackoff(backoff)
}
}

func writeLogLine(w io.Writer, line *civ1.LogLine) error {
text := line.GetBody() + "\n"
n, err := io.WriteString(w, text)
if err != nil {
return err
}
if n != len(text) {
return io.ErrShortWrite
}
return nil
}

func isTransientConnectError(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
switch connect.CodeOf(err) {
case connect.CodeUnavailable, connect.CodeDeadlineExceeded, connect.CodeAborted:
return true
default:
return false
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

func sleepWithContext(ctx context.Context, d time.Duration) error {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return nil
}
}

func nextCIStreamBackoff(current time.Duration) time.Duration {
next := current * 2
if next > ciStreamMaxBackoff {
return ciStreamMaxBackoff
}
return next
}

func logLineIdentity(line *civ1.LogLine) string {
sum := sha256.Sum256([]byte(line.GetBody()))
return fmt.Sprintf("%s:%d:%d:%d:%s", line.GetStepId(), line.GetTimestampMs(), line.GetLineNumber(), line.GetStream(), hex.EncodeToString(sum[:]))
}

type logLineDedupe struct {
capacity int
entries map[string]*list.Element
order *list.List
}

func newLogLineDedupe(capacity int) *logLineDedupe {
return &logLineDedupe{
capacity: capacity,
entries: make(map[string]*list.Element, capacity),
order: list.New(),
}
}

func (d *logLineDedupe) Contains(key string) bool {
if elem, ok := d.entries[key]; ok {
d.order.MoveToFront(elem)
return true
}
return false
}

func (d *logLineDedupe) Add(key string) {
if elem, ok := d.entries[key]; ok {
d.order.MoveToFront(elem)
return
}
elem := d.order.PushFront(key)
d.entries[key] = elem
if d.order.Len() <= d.capacity {
return
}
oldest := d.order.Back()
if oldest == nil {
return
}
d.order.Remove(oldest)
delete(d.entries, oldest.Value.(string))
}

// CIRun triggers a CI run.
func CIRun(ctx context.Context, token, orgID string, req *civ1.RunRequest) (*civ1.RunResponse, error) {
client := newCIServiceClient()
Expand Down
Loading
Loading