Skip to content

Commit

Permalink
extract image digest from buildkit logs
Browse files Browse the repository at this point in the history
  • Loading branch information
damoon committed Nov 14, 2020
1 parent b0d8b2a commit eb1d469
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 101 deletions.
8 changes: 5 additions & 3 deletions hack/sniffer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
func main() {
http.HandleFunc("/", sniffer)

log.Println("proxy http traffic from :23765 to http://127.0.0.1:2376")

if err := http.ListenAndServe(":23765", nil); err != nil {
log.Fatalf("listen and serve: %v", err)
}
Expand Down Expand Up @@ -53,7 +55,7 @@ func serveReverseProxy(target string, w http.ResponseWriter, r *http.Request) er
return printRespose(os.Stdout, w)
}

proxy.ServeHTTP(r, w)
proxy.ServeHTTP(w, r)

return nil
}
Expand All @@ -66,7 +68,7 @@ func printRequest(w io.Writer, r *http.Request) (*bytes.Reader, error) {
return nil, fmt.Errorf("reading body: %v", err)
}

w.Write([]byte(fmt.Sprintf("req: %v\n\n", req)))
w.Write([]byte(fmt.Sprintf("req: %v\n\n", r)))

r.Body = ioutil.NopCloser(bytes.NewBuffer(body))
reader := bytes.NewReader(body)
Expand All @@ -75,7 +77,7 @@ func printRequest(w io.Writer, r *http.Request) (*bytes.Reader, error) {
}

func printRespose(w io.Writer, resp *http.Response) error {
b, err := ioutil.ReadAll(wp.Body)
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
Expand Down
57 changes: 34 additions & 23 deletions pkg/build.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package wedding

import (
"bytes"
"context"
"encoding/json"
"fmt"
Expand All @@ -9,6 +10,7 @@ import (
"log"
"net/http"
"os"
"regexp"
"strconv"
"time"

Expand Down Expand Up @@ -67,13 +69,9 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) {

err = s.executeBuild(ctx, cfg, w)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("execute build: %v", err)))
log.Printf("execute build: %v", err)
return
}

w.Write([]byte(`{"aux":{"ID":"sha256:42341736246f8e99122d49e4c0e414f0a3e5f69a024e72a2ac1a39a2093d483f"}}`))
}

func buildParameters(r *http.Request) (*buildConfig, error) {
Expand Down Expand Up @@ -316,9 +314,9 @@ func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, w http.Resp
imageNames += fmt.Sprintf("wedding-registry:5000/images/%s", tag)
}

output := "--output type=image,push=true,name=wedding-registry:5000/digests"
destination := "--output type=image,push=true,name=wedding-registry:5000/digests"
if imageNames != "" {
output = fmt.Sprintf("--output type=image,push=true,\"name=%s\"", imageNames)
destination = fmt.Sprintf("--output type=image,push=true,\"name=%s\"", imageNames)
}

// TODO add timeout for script
Expand All @@ -340,7 +338,7 @@ buildctl-daemonless.sh \
%s \
--export-cache=type=registry,ref=wedding-registry:5000/cache-repo,mode=max \
--import-cache=type=registry,ref=wedding-registry:5000/cache-repo
`, presignedContextURL, output)
`, presignedContextURL, destination)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -392,38 +390,51 @@ buildctl-daemonless.sh \
},
}

streamer := streamer{w: w}
o := &output{w: w}
d := &digestParser{w: o}
err = s.executePod(ctx, pod, d)
if err != nil {
o.Errorf("execute build: %v", err)
return err
}

err = s.executePod(ctx, pod, streamer)
err = d.publish(w)
if err != nil {
return err
}

return nil
}

type streamer struct {
w io.Writer
type digestParser struct {
buf bytes.Buffer
w io.Writer
}

func (s streamer) Write(b []byte) (int, error) {
i := len(b)
func (d *digestParser) publish(w io.Writer) error {
patterns := regexp.
MustCompile(`exporting manifest (sha256:[0-9a-f]+)`).
FindStringSubmatch(d.buf.String())

b, err := json.Marshal(string(b))
if err != nil {
panic(err) // encode a string to json should not fail
if len(patterns) != 2 || patterns[1] == "" {
return fmt.Errorf("digest not found")
}

_, err = s.w.Write([]byte(fmt.Sprintf(`{"stream": %s}`, b)))
log.Printf("found digest: %s", patterns[1])

_, err := w.Write([]byte(fmt.Sprintf(`{"aux":{"ID":"%s"}}`, patterns[1])))
if err != nil {
return 0, err
return err
}

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
} else {
return 0, fmt.Errorf("stream can not be flushed")
return nil
}

func (d *digestParser) Write(bb []byte) (int, error) {
_, err := d.buf.Write(bb)
if err != nil {
return 0, err
}

return i, nil
return d.w.Write(bb)
}
78 changes: 78 additions & 0 deletions pkg/build_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package wedding

import (
"bytes"
"io"
"io/ioutil"
"testing"
)

func Test_digestParser_publish(t *testing.T) {
type fields struct {
buf bytes.Buffer
w io.Writer
}
tests := []struct {
name string
fields fields
input string
wantW string
wantErr bool
}{
{
name: "found",
fields: fields{
buf: bytes.Buffer{},
w: ioutil.Discard,
},
input: `#5 [2/2] RUN sleep 1
#5 DONE 1.2s
#7 exporting to image
#7 exporting layers
#7 exporting layers 0.4s done
#7 exporting manifest sha256:d8438874a02b14e2ad7be50f7505ec3d9fe645964e6987101179ef42f8bed5b6 0.0s done
#7 exporting config sha256:0d3cc5d5b92a708fbabb79b63b59839ca87012742a1b5e741cf51fd1ad14b804 0.0s done
#7 pushing layers
#7 pushing layers 0.2s done
#7 pushing manifest for wedding-registry:5000/digests:latest
#7 pushing manifest for wedding-registry:5000/digests:latest 0.1s done
#7 DONE 0.7s
#8 exporting cache
#8 preparing build cache for export 0.1s done
#8 writing layer sha256:166a2418f7e86fa48d87bf6807b4e5b35f078acb2ad1cbf10444a7025913c24f
#8 writing layer sha256:166a2418f7e86fa48d87bf6807b4e5b35f078acb2ad1cbf10444a7025913c24f done
#8 writing layer sha256:1966ea362d2394e7c5c508ebf3695f039dd3825bd1e7a07449ae530aea3c4cd1 done
#8 writing layer sha256:5a9f1c0027a73bc0e66a469f90e47a59e23ab3472126ed28e6a4e7b1a98d1eb5 done
#8 writing layer sha256:b5c20b2b484f5ca9bc9d98dc79f8f1381ee0c063111ea0ddf42d1ae5ea942d50 done
#8 writing layer sha256:bb79b6b2107fea8e8a47133a660b78e3a546998fcf0427be39ac9a0af4a97e90 done
#8 writing layer sha256:e2eabaeb95d9574853154f705dc7ebce6184a95cd153e3ff87108e200267aa0a 0.1s done
#8 writing config sha256:d7e07a2c74d972a2a645ea9ba4d4970a71b2795611236ff61810cb30b60d7725 0.1s done
#8 writing manifest sha256:3acb5d16e32a8cf7094e195a5d24ca15d4fbe8a433a8bd5cc2365040739eb2dc`,
wantW: `{"aux":{"ID":"sha256:d8438874a02b14e2ad7be50f7505ec3d9fe645964e6987101179ef42f8bed5b6"}}`,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := digestParser{
buf: tt.fields.buf,
w: tt.fields.w,
}

if _, err := d.Write([]byte(tt.input)); err != nil {
t.Errorf("digestParser.Write() error = %v", err)
return
}

w := &bytes.Buffer{}
if err := d.publish(w); (err != nil) != tt.wantErr {
t.Errorf("digestParser.publish() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotW := w.String(); gotW != tt.wantW {
t.Errorf("digestParser.publish() = %v, want %v", gotW, tt.wantW)
}
})
}
}
2 changes: 0 additions & 2 deletions pkg/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) e
return
}

return

w.Write([]byte("Deleting pod.\n"))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down
52 changes: 52 additions & 0 deletions pkg/mock.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,62 @@
package wedding

import (
"encoding/json"
"fmt"
"io"
"net/http"
)

type output struct {
w io.Writer
}

func (o output) Write(b []byte) (int, error) {
i := len(b)

b, err := json.Marshal(string(b))
if err != nil {
return 0, err
}

_, err = o.w.Write([]byte(fmt.Sprintf(`{"stream": %s}`, b)))
if err != nil {
return 0, err
}

if f, ok := o.w.(http.Flusher); ok {
f.Flush()
} else {
return 0, fmt.Errorf("stream can not be flushed")
}

return i, nil
}

func (o output) Errorf(e string, args ...interface{}) error {
return o.Error(fmt.Sprintf(e, args...))
}

func (o output) Error(e string) error {
b, err := json.Marshal(string(e))
if err != nil {
return err
}

_, err = o.w.Write([]byte(fmt.Sprintf(`{"errorDetail": {"code": %d, "message": %s}, "error": %s}`, 1, b, b)))
if err != nil {
return err
}

if f, ok := o.w.(http.Flusher); ok {
f.Flush()
} else {
return fmt.Errorf("stream can not be flushed")
}

return nil
}

func ping(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Api-Version", apiVersion)
w.Header().Set("Docker-Experimental", "false")
Expand Down
44 changes: 4 additions & 40 deletions pkg/pull.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package wedding

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -139,43 +136,10 @@ skopeo copy --dest-tls-verify=false docker://%s docker://%s
},
}

b := &bytes.Buffer{}
messanger := streamer{w: w}
err = s.executePod(r.Context(), pod, b)
o := &output{w: w}
err = s.executePod(r.Context(), pod, o)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.Copy(w, b)
w.Write([]byte(fmt.Sprintf("execute push: %v", err)))
log.Printf("execute push: %v", err)
return
}

w.WriteHeader(http.StatusOK)
io.Copy(messanger, b)
}

type messanger struct {
w io.Writer
}

func (m messanger) Write(b []byte) (int, error) {
i := len(b)

b, err := json.Marshal(string(b))
if err != nil {
panic(err) // encode a string to json should not fail
}

_, err = m.w.Write([]byte(fmt.Sprintf(`{"message": %s}`, b)))
if err != nil {
return 0, err
}

if f, ok := m.w.(http.Flusher); ok {
f.Flush()
} else {
return 0, fmt.Errorf("stream can not be flushed")
log.Printf("execute pull: %v", err)
o.Errorf("execute pull: %v", err)
}

return i, nil
}
15 changes: 3 additions & 12 deletions pkg/push.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package wedding

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"time"
Expand Down Expand Up @@ -101,17 +99,10 @@ skopeo copy --src-tls-verify=false --dest-tls-verify=false docker://%s docker://
},
}

b := &bytes.Buffer{}
messanger := streamer{w: w}
err = s.executePod(r.Context(), pod, b)
o := &output{w: w}
err = s.executePod(r.Context(), pod, o)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
io.Copy(w, b)
w.Write([]byte(fmt.Sprintf("execute push: %v", err)))
log.Printf("execute push: %v", err)
return
o.Errorf("execute push: %v", err)
}

w.WriteHeader(http.StatusOK)
io.Copy(messanger, b)
}
Loading

0 comments on commit eb1d469

Please sign in to comment.