diff --git a/go.mod b/go.mod index 0ba548a..b50593e 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/moby/moby/api v1.54.2 github.com/moby/moby/client v0.4.1 github.com/tidwall/jsonc v0.3.3 + google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 2973c52..5665a66 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,8 @@ golang.org/x/sys v0.43.0 h1:Rlag2XtaFTxp19wS8MXlJwTvoh8ArU6ezoyFsMyCTNI= golang.org/x/sys v0.43.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/runtime/docker/build.go b/runtime/docker/build.go index 0b56473..22d7cff 100644 --- a/runtime/docker/build.go +++ b/runtime/docker/build.go @@ -107,29 +107,29 @@ func (r *Runtime) BuildImage(ctx context.Context, spec runtime.BuildSpec, events // // - Classic-builder-style records with `stream` (log lines) and // `status` (layer/pull progress) fields. Pre-BuildKit format. +// Kept as a fallback — modern dockerd never emits these under +// BuildKit, but harmless if a future daemon falls back. // // - BuildKit records of the form `{"id":"moby.buildkit.trace", -// "aux":""}` for per-step progress and -// `{"id":"moby.image.id","aux":{"ID":"sha256:..."}}` for the -// final image. The aux protobuf is buildkit's `SolveStatus` — -// decoding requires the buildkit module. We intentionally don't -// pull that dep in: per-step progress events are silently dropped -// under BuildKit; BuildStart / BuildCompleted (emitted by the -// caller and at the end of BuildImage) still fire correctly, and -// errors still propagate via `errorDetail` / `error` fields. -// A future PR can revisit if vertex-level progress is needed. +// "aux":""}` for per-step progress. The aux +// payload is decoded by buildkitTraceDecoder via protowire — no +// dependency on github.com/moby/buildkit. See buildkit_trace.go +// for the subset of the StatusResponse schema we parse. func streamBuildOutput(ctx context.Context, body io.ReadCloser, events chan<- runtime.BuildEvent) error { defer body.Close() type buildMsg struct { - Stream string `json:"stream,omitempty"` - Status string `json:"status,omitempty"` + Stream string `json:"stream,omitempty"` + Status string `json:"status,omitempty"` + ID string `json:"id,omitempty"` + Aux json.RawMessage `json:"aux,omitempty"` ErrorDetail *struct { Message string `json:"message"` } `json:"errorDetail,omitempty"` Error string `json:"error,omitempty"` } + trace := newBuildkitTraceDecoder() dec := json.NewDecoder(body) for { select { @@ -162,6 +162,9 @@ func streamBuildOutput(ctx context.Context, body io.ReadCloser, events chan<- ru Message: msg.Status, }) } + if msg.ID == "moby.buildkit.trace" && len(msg.Aux) > 0 { + trace.handleAux(msg.Aux, events) + } } } diff --git a/runtime/docker/buildkit_trace.go b/runtime/docker/buildkit_trace.go new file mode 100644 index 0000000..b6d7798 --- /dev/null +++ b/runtime/docker/buildkit_trace.go @@ -0,0 +1,230 @@ +package docker + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "strings" + + "google.golang.org/protobuf/encoding/protowire" + + "github.com/crunchloop/devcontainer/runtime" +) + +// buildkitTraceDecoder decodes the `moby.buildkit.trace` aux records +// dockerd emits when building under BuildKit. The aux payload is a +// base64-encoded `controlapi.StatusResponse` protobuf: +// +// message StatusResponse { +// repeated Vertex vertexes = 1; +// repeated VertexStatus statuses = 2; +// repeated VertexLog logs = 3; +// } +// message Vertex { +// string digest = 1; repeated string inputs = 2; string name = 3; +// bool cached = 4; Timestamp started = 5; Timestamp completed = 6; +// string error = 7; ... +// } +// message VertexLog { +// string vertex = 1; Timestamp timestamp = 2; int64 stream = 3; +// bytes msg = 4; +// } +// +// We parse the wire format directly via protowire — buildkit's own Go +// types live in github.com/moby/buildkit, which would pull ~250 +// transitive modules (containerd, k8s, sigstore, AWS/Azure SDKs). The +// fields we care about (name, cached, started, completed, error, log +// msg) are stable and a hand-roll is ~150 LOC. +// +// State is tracked across StatusResponse updates: BuildKit re-sends the +// same vertex digest with incremental field updates. We dedupe by +// emitting BuildLayerEvent only on start- and complete-transitions per +// digest; VertexLog records always emit BuildLogEvent. +type buildkitTraceDecoder struct { + seenStart map[string]bool + seenComplete map[string]bool +} + +func newBuildkitTraceDecoder() *buildkitTraceDecoder { + return &buildkitTraceDecoder{ + seenStart: map[string]bool{}, + seenComplete: map[string]bool{}, + } +} + +// handleAux base64-decodes and parses a moby.buildkit.trace aux +// payload. Best-effort: malformed records are silently ignored — the +// build's authoritative success/failure is reported via the outer +// JSON-line stream's `error`/`errorDetail` fields, not via aux. +func (d *buildkitTraceDecoder) handleAux(aux json.RawMessage, events chan<- runtime.BuildEvent) { + var b64 string + if err := json.Unmarshal(aux, &b64); err != nil { + return + } + raw, err := base64.StdEncoding.DecodeString(b64) + if err != nil { + return + } + d.decodeStatus(raw, events) +} + +func (d *buildkitTraceDecoder) decodeStatus(buf []byte, events chan<- runtime.BuildEvent) { + for len(buf) > 0 { + num, typ, n := protowire.ConsumeTag(buf) + if n < 0 { + return + } + buf = buf[n:] + switch { + case num == 1 && typ == protowire.BytesType: // Vertex + v, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + d.decodeVertex(v, events) + buf = buf[m:] + case num == 3 && typ == protowire.BytesType: // VertexLog + v, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + d.decodeLog(v, events) + buf = buf[m:] + default: + m := protowire.ConsumeFieldValue(num, typ, buf) + if m < 0 { + return + } + buf = buf[m:] + } + } +} + +func (d *buildkitTraceDecoder) decodeVertex(buf []byte, events chan<- runtime.BuildEvent) { + var ( + digest, name, vErr string + cached bool + hasStarted, hasCompleted bool + ) + for len(buf) > 0 { + num, typ, n := protowire.ConsumeTag(buf) + if n < 0 { + return + } + buf = buf[n:] + switch { + case num == 1 && typ == protowire.BytesType: + s, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + digest = string(s) + buf = buf[m:] + case num == 3 && typ == protowire.BytesType: + s, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + name = string(s) + buf = buf[m:] + case num == 4 && typ == protowire.VarintType: + v, m := protowire.ConsumeVarint(buf) + if m < 0 { + return + } + cached = v != 0 + buf = buf[m:] + case num == 5 && typ == protowire.BytesType: + _, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + hasStarted = true + buf = buf[m:] + case num == 6 && typ == protowire.BytesType: + _, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + hasCompleted = true + buf = buf[m:] + case num == 7 && typ == protowire.BytesType: + s, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + vErr = string(s) + buf = buf[m:] + default: + m := protowire.ConsumeFieldValue(num, typ, buf) + if m < 0 { + return + } + buf = buf[m:] + } + } + if digest == "" || name == "" { + return + } + if hasStarted && !d.seenStart[digest] { + d.seenStart[digest] = true + emitBuildEvent(events, runtime.BuildEvent{ + Kind: runtime.BuildEventLayer, + Message: fmt.Sprintf("START %s", name), + LayerID: digest, + }) + } + if hasCompleted && !d.seenComplete[digest] { + d.seenComplete[digest] = true + status := "DONE" + switch { + case vErr != "": + status = "ERROR: " + vErr + case cached: + status = "CACHED" + } + emitBuildEvent(events, runtime.BuildEvent{ + Kind: runtime.BuildEventLayer, + Message: fmt.Sprintf("%s %s", status, name), + LayerID: digest, + }) + } +} + +func (d *buildkitTraceDecoder) decodeLog(buf []byte, events chan<- runtime.BuildEvent) { + var msg []byte + for len(buf) > 0 { + num, typ, n := protowire.ConsumeTag(buf) + if n < 0 { + return + } + buf = buf[n:] + switch { + case num == 4 && typ == protowire.BytesType: + s, m := protowire.ConsumeBytes(buf) + if m < 0 { + return + } + msg = s + buf = buf[m:] + default: + m := protowire.ConsumeFieldValue(num, typ, buf) + if m < 0 { + return + } + buf = buf[m:] + } + } + if len(msg) == 0 { + return + } + for _, line := range strings.Split(strings.TrimRight(string(msg), "\n"), "\n") { + if line == "" { + continue + } + emitBuildEvent(events, runtime.BuildEvent{ + Kind: runtime.BuildEventLog, + Message: line, + }) + } +} diff --git a/runtime/docker/buildkit_trace_test.go b/runtime/docker/buildkit_trace_test.go new file mode 100644 index 0000000..61da001 --- /dev/null +++ b/runtime/docker/buildkit_trace_test.go @@ -0,0 +1,231 @@ +package docker + +import ( + "encoding/base64" + "encoding/json" + "testing" + + "google.golang.org/protobuf/encoding/protowire" + + "github.com/crunchloop/devcontainer/runtime" +) + +// Wire-format helpers — build the minimal subset of StatusResponse we +// need to exercise the decoder, without depending on buildkit's +// generated Go types. + +func appendString(buf []byte, fieldNum int, s string) []byte { + buf = protowire.AppendTag(buf, protowire.Number(fieldNum), protowire.BytesType) + return protowire.AppendString(buf, s) +} + +func appendBool(buf []byte, fieldNum int, b bool) []byte { + buf = protowire.AppendTag(buf, protowire.Number(fieldNum), protowire.VarintType) + v := uint64(0) + if b { + v = 1 + } + return protowire.AppendVarint(buf, v) +} + +func appendSubmessage(buf []byte, fieldNum int, sub []byte) []byte { + buf = protowire.AppendTag(buf, protowire.Number(fieldNum), protowire.BytesType) + return protowire.AppendBytes(buf, sub) +} + +// vertex builds a Vertex submessage payload. +func vertex(digest, name string, cached, started, completed bool, vErr string) []byte { + var b []byte + if digest != "" { + b = appendString(b, 1, digest) + } + if name != "" { + b = appendString(b, 3, name) + } + if cached { + b = appendBool(b, 4, true) + } + if started { + // Empty Timestamp submessage — presence is what we test for. + b = appendSubmessage(b, 5, nil) + } + if completed { + b = appendSubmessage(b, 6, nil) + } + if vErr != "" { + b = appendString(b, 7, vErr) + } + return b +} + +func vertexLog(vertex string, msg []byte) []byte { + var b []byte + if vertex != "" { + b = appendString(b, 1, vertex) + } + if len(msg) > 0 { + b = protowire.AppendTag(b, 4, protowire.BytesType) + b = protowire.AppendBytes(b, msg) + } + return b +} + +func statusResponse(vertexes [][]byte, logs [][]byte) []byte { + var b []byte + for _, v := range vertexes { + b = appendSubmessage(b, 1, v) + } + for _, l := range logs { + b = appendSubmessage(b, 3, l) + } + return b +} + +// drain collects events from a channel until it's empty (non-blocking). +func drain(ch chan runtime.BuildEvent) []runtime.BuildEvent { + var out []runtime.BuildEvent + for { + select { + case e := <-ch: + out = append(out, e) + default: + return out + } + } +} + +func auxJSON(t *testing.T, payload []byte) json.RawMessage { + t.Helper() + b64 := base64.StdEncoding.EncodeToString(payload) + raw, err := json.Marshal(b64) + if err != nil { + t.Fatal(err) + } + return raw +} + +func TestBuildkitTrace_VertexStartCompleteOnce(t *testing.T) { + d := newBuildkitTraceDecoder() + ch := make(chan runtime.BuildEvent, 16) + + // First update: vertex appears in "started" state. + d.handleAux(auxJSON(t, statusResponse( + [][]byte{vertex("sha256:aaa", "[1/2] RUN echo hi", false, true, false, "")}, + nil, + )), ch) + + // Second update: same vertex, now also completed. + d.handleAux(auxJSON(t, statusResponse( + [][]byte{vertex("sha256:aaa", "[1/2] RUN echo hi", false, true, true, "")}, + nil, + )), ch) + + // Third update: same vertex again — should emit nothing. + d.handleAux(auxJSON(t, statusResponse( + [][]byte{vertex("sha256:aaa", "[1/2] RUN echo hi", false, true, true, "")}, + nil, + )), ch) + + got := drain(ch) + if len(got) != 2 { + t.Fatalf("event count = %d, want 2: %+v", len(got), got) + } + if got[0].Kind != runtime.BuildEventLayer || got[0].Message != "START [1/2] RUN echo hi" || got[0].LayerID != "sha256:aaa" { + t.Errorf("start event = %+v", got[0]) + } + if got[1].Kind != runtime.BuildEventLayer || got[1].Message != "DONE [1/2] RUN echo hi" { + t.Errorf("done event = %+v", got[1]) + } +} + +func TestBuildkitTrace_CachedAndError(t *testing.T) { + d := newBuildkitTraceDecoder() + ch := make(chan runtime.BuildEvent, 16) + + d.handleAux(auxJSON(t, statusResponse( + [][]byte{ + vertex("sha256:cached", "[cached] FROM alpine", true, true, true, ""), + vertex("sha256:err", "[2/2] RUN false", false, true, true, "exit code 1"), + }, + nil, + )), ch) + + got := drain(ch) + if len(got) != 4 { + t.Fatalf("event count = %d, want 4: %+v", len(got), got) + } + // Order within a single StatusResponse: start then complete per vertex, + // vertexes in field-order. We assert by digest+kind, not strict order. + byMsg := map[string]runtime.BuildEvent{} + for _, e := range got { + byMsg[e.Message] = e + } + if e, ok := byMsg["CACHED [cached] FROM alpine"]; !ok || e.LayerID != "sha256:cached" { + t.Errorf("missing CACHED event: %+v", byMsg) + } + if e, ok := byMsg["ERROR: exit code 1 [2/2] RUN false"]; !ok || e.LayerID != "sha256:err" { + t.Errorf("missing ERROR event: %+v", byMsg) + } +} + +func TestBuildkitTrace_LogSplitsLines(t *testing.T) { + d := newBuildkitTraceDecoder() + ch := make(chan runtime.BuildEvent, 16) + + d.handleAux(auxJSON(t, statusResponse( + nil, + [][]byte{vertexLog("sha256:x", []byte("line one\nline two\n\nline three\n"))}, + )), ch) + + got := drain(ch) + if len(got) != 3 { + t.Fatalf("event count = %d, want 3: %+v", len(got), got) + } + for i, want := range []string{"line one", "line two", "line three"} { + if got[i].Kind != runtime.BuildEventLog || got[i].Message != want { + t.Errorf("event[%d] = %+v, want log %q", i, got[i], want) + } + } +} + +func TestBuildkitTrace_MalformedSilentlyIgnored(t *testing.T) { + d := newBuildkitTraceDecoder() + ch := make(chan runtime.BuildEvent, 4) + + // Not valid base64. + d.handleAux(json.RawMessage(`"!!!not-base64!!!"`), ch) + // Not a JSON string. + d.handleAux(json.RawMessage(`{"oops":true}`), ch) + // Valid base64 but garbage protobuf — protowire's Consume returns + // negative lengths and the decoder bails. No panic, no events. + d.handleAux(auxJSON(t, []byte{0xff, 0xff, 0xff, 0xff}), ch) + + if got := drain(ch); len(got) != 0 { + t.Errorf("expected no events on malformed input, got %+v", got) + } +} + +func TestBuildkitTrace_UnknownFieldsSkipped(t *testing.T) { + // Forward-compat: a StatusResponse with an unknown high-numbered + // field type should be skipped without breaking known field decoding. + d := newBuildkitTraceDecoder() + ch := make(chan runtime.BuildEvent, 4) + + payload := statusResponse( + [][]byte{vertex("sha256:fwd", "[fwd] step", false, true, true, "")}, + nil, + ) + // Append an unknown varint field (field number 99). + payload = protowire.AppendTag(payload, 99, protowire.VarintType) + payload = protowire.AppendVarint(payload, 42) + // And an unknown length-delimited field (100). + payload = protowire.AppendTag(payload, 100, protowire.BytesType) + payload = protowire.AppendString(payload, "future-field-value") + + d.handleAux(auxJSON(t, payload), ch) + + got := drain(ch) + if len(got) != 2 { + t.Errorf("expected 2 events despite unknown fields, got %d: %+v", len(got), got) + } +} diff --git a/test/integration/buildkit_trace_events_test.go b/test/integration/buildkit_trace_events_test.go new file mode 100644 index 0000000..ff24c8a --- /dev/null +++ b/test/integration/buildkit_trace_events_test.go @@ -0,0 +1,117 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + devcontainer "github.com/crunchloop/devcontainer" + "github.com/crunchloop/devcontainer/events" +) + +// TestBuildKit_PerVertexEventsEmitted asserts that BuildKit's +// per-vertex progress (moby.buildkit.trace aux records) is decoded and +// surfaced as BuildLayerEvent + BuildLogEvent for the dockerfile build. +// +// Regression guard for runtime/docker/buildkit_trace.go: if the aux +// decoder is removed or streamBuildOutput stops routing aux records to +// it, this test goes silent — zero dockerfile layer events and the RUN +// marker never appears in the log stream. +// +// The Dockerfile embeds a unique per-run marker in the RUN step so the +// build can't satisfy that vertex from cache; otherwise the test would +// still pass via "CACHED" layer events but the log assertion would +// flake based on prior runs. +func TestBuildKit_PerVertexEventsEmitted(t *testing.T) { + if testing.Short() { + t.Skip() + } + + eng, rt := newEngine(t) + defer rt.Close() + + dir := t.TempDir() + marker := fmt.Sprintf("BUILDKIT_TRACE_PROBE_%d", time.Now().UnixNano()) + mustWrite(t, filepath.Join(dir, "Dockerfile"), fmt.Sprintf(` +FROM alpine:3.20 +RUN echo %s +`, marker)) + mustWrite(t, filepath.Join(dir, ".devcontainer", "devcontainer.json"), `{ + "build": { "dockerfile": "Dockerfile", "context": ".." } + }`) + + evCh := make(chan events.Event, 1024) + var ( + mu sync.Mutex + dockerfileLayer int + dockerfileLogs []string + ) + consumerDone := make(chan struct{}) + go func() { + defer close(consumerDone) + for ev := range evCh { + switch e := ev.(type) { + case events.BuildLayerEvent: + if e.Source == events.BuildSourceDockerfile { + mu.Lock() + dockerfileLayer++ + mu.Unlock() + } + case events.BuildLogEvent: + if e.Source == events.BuildSourceDockerfile { + mu.Lock() + dockerfileLogs = append(dockerfileLogs, e.Line) + mu.Unlock() + } + } + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + wsObj, err := eng.Up(ctx, devcontainer.UpOptions{ + LocalWorkspaceFolder: dir, + Recreate: true, + SkipLifecycle: true, + Events: evCh, + }) + // Per UpOptions.Events doc: close only after Up returns. + close(evCh) + <-consumerDone + if err != nil { + t.Fatalf("Up: %v", err) + } + defer func() { _ = eng.Down(context.Background(), wsObj, devcontainer.DownOptions{Remove: true}) }() + + mu.Lock() + gotLayer := dockerfileLayer + gotLogs := append([]string(nil), dockerfileLogs...) + mu.Unlock() + + // A 2-step BuildKit graph emits multiple vertex transitions (resolve + // image config, FROM, internal load-build-context, RUN — start + + // done per vertex). 2 is a conservative floor that proves the + // decoder is firing at all. + if gotLayer < 2 { + t.Errorf("expected >=2 BuildLayerEvent from dockerfile build, got %d (aux decoder dead?)", gotLayer) + } + + var sawMarker bool + for _, line := range gotLogs { + if strings.Contains(line, marker) { + sawMarker = true + break + } + } + if !sawMarker { + t.Errorf("RUN marker %q missing from %d log events (VertexLog decoder dead?): %v", + marker, len(gotLogs), gotLogs) + } +}