diff --git a/Tiltfile b/Tiltfile index 27025fd6..cfeecb2c 100644 --- a/Tiltfile +++ b/Tiltfile @@ -7,6 +7,7 @@ min_tilt_version('0.15.0') # includes fix for auto_init+False with tilt ci include('./services/Tiltfile') include('./tests/Tiltfile') +k8s_yaml('deployment/config.yaml') k8s_yaml('deployment/kubernetes.yaml') if os.environ.get('PROD', '') == '': @@ -57,5 +58,5 @@ else: k8s_resource( 'wedding', port_forwards=['12376:2376'], - resource_deps=['setup-s3-bucket', 'wedding-registry'], + resource_deps=['setup-s3-bucket', 'wedding-registry', 'docker-io-mirror'], ) diff --git a/deployment/config.yaml b/deployment/config.yaml new file mode 100644 index 00000000..6108f2f0 --- /dev/null +++ b/deployment/config.yaml @@ -0,0 +1,22 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: buildkitd-config +data: + buildkitd.toml: | + debug = true + + [worker.oci] + rootless = true + noProcessSandbox = true + + [registry."docker.io"] + mirrors = ["docker-io-mirror:5000"] + + [registry."docker-io-mirror:5000"] + http = true + insecure = true + + [registry."wedding-registry:5000"] + http = true + insecure = true diff --git a/pkg/build.go b/pkg/build.go index 7f02a637..5272d93d 100644 --- a/pkg/build.go +++ b/pkg/build.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "log" "net/http" @@ -275,9 +276,6 @@ func (o ObjectStore) deleteContext(ctx context.Context, cfg *buildConfig) error } func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, w http.ResponseWriter) error { - - stream(w, fmt.Sprintf("%v", cfg)) - presignedContextURL, err := s.objectStore.presignContext(cfg) if err != nil { return err @@ -325,19 +323,11 @@ func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, w http.Resp // TODO add timeout for script buildScript := fmt.Sprintf(` -set -euo pipefail +set -euxo pipefail mkdir ~/context && cd ~/context -mkdir -p ~/.config/buildkit/ -echo " -[registry.\"wedding-registry:5000\"] -http = true -insecure = true -" > ~/.config/buildkit/buildkitd.toml - -echo Downloading context -wget -O - "%s" | tar -xf - # --quiet +wget -O - "%s" | tar -xf - export BUILDKITD_FLAGS="--oci-worker-no-process-sandbox" export BUILDCTL_CONNECT_RETRIES_MAX=100 @@ -352,8 +342,6 @@ buildctl-daemonless.sh \ --import-cache=type=registry,ref=wedding-registry:5000/cache-repo `, presignedContextURL, output) - stream(w, buildScript) - pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "wedding-build-", @@ -373,6 +361,10 @@ buildctl-daemonless.sh \ MountPath: "/home/user/.docker", Name: "docker-config", }, + { + MountPath: "/home/user/.config/buildkit", + Name: "buildkitd-config", + }, }, }, }, @@ -385,15 +377,53 @@ buildctl-daemonless.sh \ }, }, }, + { + Name: "buildkitd-config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "buildkitd-config", + }, + }, + }, + }, }, RestartPolicy: corev1.RestartPolicyNever, }, } - err = s.executePod(ctx, pod, w) + streamer := streamer{w: w} + + err = s.executePod(ctx, pod, streamer) if err != nil { return err } return nil } + +type streamer struct { + w io.Writer +} + +func (s streamer) Write(b []byte) (int, error) { + i := len(b) + + b, err := json.Marshal(string(b)) + if err != nil { + panic(err) // encode a string to json should not fail + } + + _, err = s.w.Write([]byte(fmt.Sprintf(`{"stream": %s}`, b))) + if err != nil { + return 0, err + } + + if f, ok := s.w.(http.Flusher); ok { + f.Flush() + } else { + return 0, fmt.Errorf("stream can not be flushed") + } + + return i, nil +} diff --git a/pkg/kubernetes.go b/pkg/kubernetes.go index 7a9f4e31..47fddc5a 100644 --- a/pkg/kubernetes.go +++ b/pkg/kubernetes.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "log" - "net/http" "time" corev1 "k8s.io/api/core/v1" @@ -16,7 +15,7 @@ import ( func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) error { podClient := s.kubernetesClient.CoreV1().Pods(s.namespace) - stream(w, "Creating new pod.\n") + w.Write([]byte("Creating new pod.\n")) pod, err := podClient.Create(ctx, pod, metav1.CreateOptions{}) if err != nil { @@ -30,11 +29,13 @@ func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) e defer func() { if failed { - stream(w, "Pod failed. Skipping cleanup.\n") + w.Write([]byte("Pod failed. Skipping cleanup.\n")) return } - stream(w, "Deleting pod.\n") + return + + w.Write([]byte("Deleting pod.\n")) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -42,18 +43,18 @@ func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) e err = podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err != nil { streamf(w, "Pod deletetion failed: %v\n", err) - log.Printf("delete pod: %v", err) + log.Printf("delete pod %s: %v", pod.Name, err) } }() - stream(w, "Waiting for pod execution.\n") + w.Write([]byte("Waiting for pod execution.\n")) waitRunning: pod, err = s.kubernetesClient.CoreV1().Pods(s.namespace).Get(ctx, pod.Name, metav1.GetOptions{}) if err != nil { streamf(w, "Looking up pod: %v.\n", err) - return fmt.Errorf("look up pod: %v", err) + return fmt.Errorf("look up pod %s: %v", pod.Name, err) } switch pod.Status.Phase { @@ -70,14 +71,14 @@ waitRunning: } printLogs: - stream(w, "Streaming logs.\n") + // w.Write([]byte("Streaming logs.\n")) podLogs, err := s.kubernetesClient.CoreV1().Pods(s.namespace). GetLogs(pod.Name, &corev1.PodLogOptions{Follow: true}). Stream(ctx) if err != nil { streamf(w, "Log streaming failed: %v\n", err) - return fmt.Errorf("streaming logs: %v", err) + return fmt.Errorf("streaming pod %s logs: %v", pod.Name, err) } defer podLogs.Close() @@ -87,17 +88,34 @@ printLogs: n, err := podLogs.Read(buf) if err != nil { if err == io.EOF { - stream(w, "End of logs reached.\n") + // w.Write([]byte("End of logs reached.\n")) if failed { - return fmt.Errorf("pod failed") + return fmt.Errorf("pod %s failed", pod.Name) + } + + for { + pod, err = s.kubernetesClient.CoreV1().Pods(s.namespace).Get(ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + streamf(w, "Looking up pod: %v.\n", err) + return fmt.Errorf("look up pod %s: %v", pod.Name, err) + } + + switch pod.Status.Phase { + case "Succeeded": + return nil + case "Failed": + return fmt.Errorf("pod %s failed", pod.Name) + default: + log.Printf("pod %s phase %s", pod.Name, pod.Status.Phase) + time.Sleep(time.Second) + } } - return nil } - return fmt.Errorf("read logs: %v", err) + return fmt.Errorf("read pod %s logs: %v", pod.Name, err) } - stream(w, string(buf[:n])) + w.Write([]byte(string(buf[:n]))) } } @@ -110,34 +128,11 @@ func (s Service) podStatus(ctx context.Context, podName string) (corev1.PodPhase return pod.Status.Phase, nil } -func message(w io.Writer, message string) error { - return flush(w, "message", message) -} - -func stream(w io.Writer, message string) error { - return flush(w, "stream", message) -} - -func flush(w io.Writer, kind, message string) error { - b, err := json.Marshal(message) +func streamf(w io.Writer, message string, args ...interface{}) []byte { + b, err := json.Marshal(fmt.Sprintf(message, args...)) if err != nil { panic(err) // encode a string to json should not fail } - _, err = w.Write([]byte(fmt.Sprintf(`{"%s": %s}`, kind, b))) - if err != nil { - return err - } - - if f, ok := w.(http.Flusher); ok { - f.Flush() - } else { - return fmt.Errorf("stream can not be flushed") - } - - return nil -} - -func streamf(w io.Writer, message string, args ...interface{}) error { - return stream(w, fmt.Sprintf(message, args...)) + return []byte(fmt.Sprintf(`{"stream": %s}`, b)) } diff --git a/pkg/pull.go b/pkg/pull.go index 30b7565a..dbd4285e 100644 --- a/pkg/pull.go +++ b/pkg/pull.go @@ -1,11 +1,13 @@ package wedding import ( + "bytes" "context" + "encoding/json" "fmt" + "io" "log" "net/http" - "net/url" "time" corev1 "k8s.io/api/core/v1" @@ -54,7 +56,8 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) { } from := fmt.Sprintf("%s:%s", fromImage, pullTag) - to := fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(from)) + // to := fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(from)) + to := fmt.Sprintf("wedding-registry:5000/images/%s", escapePort(from)) dockerCfg, err := xRegistryAuth(r.Header.Get("X-Registry-Auth")).toDockerConfig() if err != nil { @@ -95,13 +98,11 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) { // TODO add timeout for script buildScript := fmt.Sprintf(` -set -euo pipefail +set -euxo pipefail skopeo copy --dest-tls-verify=false docker://%s docker://%s `, from, to) - stream(w, buildScript) - pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ GenerateName: "wedding-pull-", @@ -138,13 +139,43 @@ skopeo copy --dest-tls-verify=false docker://%s docker://%s }, } - err = s.executePod(r.Context(), pod, w) + b := &bytes.Buffer{} + messanger := streamer{w: w} + err = s.executePod(r.Context(), pod, b) if err != nil { w.WriteHeader(http.StatusInternalServerError) - stream(w, fmt.Sprintf("execute push: %v", err)) + io.Copy(w, b) + w.Write([]byte(fmt.Sprintf("execute push: %v", err))) log.Printf("execute push: %v", err) return } w.WriteHeader(http.StatusOK) + io.Copy(messanger, b) +} + +type messanger struct { + w io.Writer +} + +func (m messanger) Write(b []byte) (int, error) { + i := len(b) + + b, err := json.Marshal(string(b)) + if err != nil { + panic(err) // encode a string to json should not fail + } + + _, err = m.w.Write([]byte(fmt.Sprintf(`{"message": %s}`, b))) + if err != nil { + return 0, err + } + + if f, ok := m.w.(http.Flusher); ok { + f.Flush() + } else { + return 0, fmt.Errorf("stream can not be flushed") + } + + return i, nil } diff --git a/pkg/push.go b/pkg/push.go index b60848d1..6caa6836 100644 --- a/pkg/push.go +++ b/pkg/push.go @@ -1,8 +1,10 @@ package wedding import ( + "bytes" "context" "fmt" + "io" "log" "net/http" "time" @@ -16,7 +18,7 @@ func (s Service) pushImage(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) args := r.URL.Query() - from := fmt.Sprintf("wedding-registry:5000/images/%s:%s", vars["name"], args.Get("tag")) + from := fmt.Sprintf("wedding-registry:5000/images/%s:%s", escapePort(vars["name"]), args.Get("tag")) to := fmt.Sprintf("%s:%s", vars["name"], args.Get("tag")) dockerCfg, err := xRegistryAuth(r.Header.Get("X-Registry-Auth")).toDockerConfig() @@ -58,7 +60,7 @@ func (s Service) pushImage(w http.ResponseWriter, r *http.Request) { // TODO add timeout for script buildScript := fmt.Sprintf(` -set -euo pipefail +set -euxo pipefail skopeo copy --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s `, from, to) @@ -99,13 +101,17 @@ skopeo copy --src-tls-verify=false --dest-tls-verify=false docker://%s docker:// }, } - err = s.executePod(r.Context(), pod, w) + b := &bytes.Buffer{} + messanger := streamer{w: w} + err = s.executePod(r.Context(), pod, b) if err != nil { w.WriteHeader(http.StatusInternalServerError) - stream(w, fmt.Sprintf("execute push: %v", err)) + io.Copy(w, b) + w.Write([]byte(fmt.Sprintf("execute push: %v", err))) log.Printf("execute push: %v", err) return } w.WriteHeader(http.StatusOK) + io.Copy(messanger, b) } diff --git a/pkg/service.go b/pkg/service.go index 9dced00e..9465467d 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -52,8 +52,8 @@ func (s *Service) routes() { router.HandleFunc("/{apiVersion}/build/prune", buildPrune).Methods(http.MethodPost) router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - stream(w, "This function is not supported by wedding.") w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("This function is not supported by wedding.")) log.Printf("501 - Not Implemented: %s %s", r.Method, r.URL) }) diff --git a/pkg/tag.go b/pkg/tag.go index 4d97bce7..fa922d1d 100644 --- a/pkg/tag.go +++ b/pkg/tag.go @@ -1,10 +1,11 @@ package wedding import ( + "bytes" "fmt" + "io" "log" "net/http" - "net/url" "regexp" "strings" @@ -19,7 +20,8 @@ func (s Service) tagImage(w http.ResponseWriter, r *http.Request) { from := fmt.Sprintf("wedding-registry:5000/digests@%s", vars["name"]) if !strings.HasPrefix(vars["name"], "sha256:") { - from = fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(escapePort(vars["name"]))) + // from = fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(escapePort(vars["name"]))) + from = fmt.Sprintf("wedding-registry:5000/images/%s", escapePort(vars["name"])) } tag := args.Get("tag") @@ -34,7 +36,7 @@ func (s Service) tagImage(w http.ResponseWriter, r *http.Request) { // TODO add timeout for script buildScript := fmt.Sprintf(` -set -euo pipefail +set -euxo pipefail skopeo copy --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s `, from, to) @@ -59,10 +61,12 @@ skopeo copy --src-tls-verify=false --dest-tls-verify=false docker://%s docker:// }, } - err := s.executePod(r.Context(), pod, w) + b := &bytes.Buffer{} + err := s.executePod(r.Context(), pod, b) if err != nil { w.WriteHeader(http.StatusInternalServerError) - message(w, fmt.Sprintf("execute tagging: %v", err)) + io.Copy(w, b) + w.Write([]byte(fmt.Sprintf("execute tagging: %v", err))) log.Printf("execute tagging: %v", err) return } diff --git a/services/Tiltfile b/services/Tiltfile index ef0031f6..9951784f 100644 --- a/services/Tiltfile +++ b/services/Tiltfile @@ -13,3 +13,6 @@ k8s_resource( k8s_yaml('wedding-registry.yaml') k8s_resource('wedding-registry', port_forwards=5000) + +k8s_yaml('docker-io-mirror.yaml') +k8s_resource('docker-io-mirror', port_forwards="5001:5000") diff --git a/services/docker.io-cache.yaml b/services/docker-io-mirror.yaml similarity index 66% rename from services/docker.io-cache.yaml rename to services/docker-io-mirror.yaml index 46731c93..cc00a431 100644 --- a/services/docker.io-cache.yaml +++ b/services/docker-io-mirror.yaml @@ -1,32 +1,32 @@ apiVersion: v1 kind: Service metadata: - name: docker.io-mirror + name: docker-io-mirror spec: ports: - name: http port: 5000 selector: - app: docker.io-mirror + app: docker-io-mirror --- apiVersion: apps/v1 kind: Deployment metadata: - name: docker.io-mirror + name: docker-io-mirror spec: selector: matchLabels: - app: docker.io-mirror + app: docker-io-mirror strategy: type: Recreate replicas: 1 template: metadata: labels: - app: docker.io-mirror + app: docker-io-mirror spec: containers: - - name: docker.io-mirror + - name: docker-io-mirror image: registry:2.7.1 ports: - name: http @@ -38,6 +38,15 @@ spec: requests: cpu: "100m" memory: "500Mi" + env: + - name: REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY + value: /var/lib/registry + - name: REGISTRY_PROXY_REMOTEURL + value: https://mirror.gcr.io + - name: REGISTRY_LOG_ACCESSLOG_DISABLED + value: "true" + - name: REGISTRY_LOG_LEVEL + value: "warn" readinessProbe: httpGet: path: /v2/ @@ -55,12 +64,12 @@ spec: volumes: - name: data persistentVolumeClaim: - claimName: docker.io-mirror + claimName: docker-io-mirror --- apiVersion: v1 kind: PersistentVolumeClaim metadata: - name: docker.io-mirror + name: docker-io-mirror spec: accessModes: - ReadWriteOnce diff --git a/tests/Tiltfile b/tests/Tiltfile index 9e985c16..952691de 100644 --- a/tests/Tiltfile +++ b/tests/Tiltfile @@ -2,11 +2,11 @@ k8s_yaml('docker-build.yaml') k8s_resource('test-docker-build', resource_deps=['wedding']) -k8s_yaml('docker-pull-tag-push.yaml') -k8s_resource('test-docker-pull-tag-push', resource_deps=['wedding']) +#k8s_yaml('docker-pull-tag-push.yaml') +#k8s_resource('test-docker-pull-tag-push', resource_deps=['wedding']) -k8s_yaml('tilt-ci.yaml') -k8s_resource('test-tilt-ci', resource_deps=['wedding']) +#k8s_yaml('tilt-ci.yaml') +#k8s_resource('test-tilt-ci', resource_deps=['wedding']) docker_build( 'testing-image', diff --git a/tests/docker-broken/Dockerfile b/tests/docker-broken/Dockerfile new file mode 100644 index 00000000..50179cee --- /dev/null +++ b/tests/docker-broken/Dockerfile @@ -0,0 +1,3 @@ +FROM alpine + +RUN false diff --git a/tests/docker-build.yaml b/tests/docker-build.yaml index 1a8030e9..97f63d7f 100644 --- a/tests/docker-build.yaml +++ b/tests/docker-build.yaml @@ -3,7 +3,7 @@ kind: Job metadata: name: test-docker-build spec: - backoffLimit: 1 + backoffLimit: 0 template: metadata: name: test-docker-build @@ -18,7 +18,7 @@ spec: set -euxo pipefail docker build ./docker - if docker build missing; then echo "this should fail"; false; else echo "exit code propagated"; fi + # if docker build ./docker-broken; then echo "this should fail"; false; else echo "exit code propagated"; fi echo "done" env: diff --git a/tests/docker-pull-tag-push.yaml b/tests/docker-pull-tag-push.yaml index bd3350bd..4c749916 100644 --- a/tests/docker-pull-tag-push.yaml +++ b/tests/docker-pull-tag-push.yaml @@ -3,7 +3,7 @@ kind: Job metadata: name: test-docker-pull-tag-push spec: - backoffLimit: 1 + backoffLimit: 0 template: metadata: name: test-docker-pull-tag-push @@ -17,10 +17,10 @@ spec: - | set -euxo pipefail - docker pull alpine - if docker pull missing; then echo "this should fail"; false; else echo "exit code propagated"; fi + docker pull mirror.gcr.io/library/alpine + if docker pull mirror.gcr.io/library/missing; then echo "this should fail"; false; else echo "exit code propagated"; fi - docker tag alpine wedding-registry:5000/test-push:alpine + docker tag mirror.gcr.io/library/alpine wedding-registry:5000/test-push:alpine if docker tag missing b; then echo "this should fail"; false; else echo "exit code propagated"; fi docker push wedding-registry:5000/test-push:alpine diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 13768300..d656096f 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,3 +1,3 @@ FROM alpine -RUN sleep 1 +RUN true diff --git a/tests/tilt-ci.yaml b/tests/tilt-ci.yaml index 7a2a1a7b..25986a8c 100644 --- a/tests/tilt-ci.yaml +++ b/tests/tilt-ci.yaml @@ -3,7 +3,7 @@ kind: Job metadata: name: test-tilt-ci spec: - backoffLimit: 1 + backoffLimit: 0 template: metadata: name: test-tilt-ci