Skip to content

Commit

Permalink
added docker hub mirror from google / fixed exit codes for pull, tag,…
Browse files Browse the repository at this point in the history
… and push
  • Loading branch information
damoon committed Nov 14, 2020
1 parent a7b50b0 commit b0d8b2a
Show file tree
Hide file tree
Showing 16 changed files with 198 additions and 94 deletions.
3 changes: 2 additions & 1 deletion Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ min_tilt_version('0.15.0') # includes fix for auto_init+False with tilt ci
include('./services/Tiltfile')
include('./tests/Tiltfile')

k8s_yaml('deployment/config.yaml')
k8s_yaml('deployment/kubernetes.yaml')

if os.environ.get('PROD', '') == '':
Expand Down Expand Up @@ -57,5 +58,5 @@ else:
k8s_resource(
'wedding',
port_forwards=['12376:2376'],
resource_deps=['setup-s3-bucket', 'wedding-registry'],
resource_deps=['setup-s3-bucket', 'wedding-registry', 'docker-io-mirror'],
)
22 changes: 22 additions & 0 deletions deployment/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: buildkitd-config
data:
buildkitd.toml: |
debug = true
[worker.oci]
rootless = true
noProcessSandbox = true
[registry."docker.io"]
mirrors = ["docker-io-mirror:5000"]
[registry."docker-io-mirror:5000"]
http = true
insecure = true
[registry."wedding-registry:5000"]
http = true
insecure = true
62 changes: 46 additions & 16 deletions pkg/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
Expand Down Expand Up @@ -275,9 +276,6 @@ func (o ObjectStore) deleteContext(ctx context.Context, cfg *buildConfig) error
}

func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, w http.ResponseWriter) error {

stream(w, fmt.Sprintf("%v", cfg))

presignedContextURL, err := s.objectStore.presignContext(cfg)
if err != nil {
return err
Expand Down Expand Up @@ -325,19 +323,11 @@ func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, w http.Resp

// TODO add timeout for script
buildScript := fmt.Sprintf(`
set -euo pipefail
set -euxo pipefail
mkdir ~/context && cd ~/context
mkdir -p ~/.config/buildkit/
echo "
[registry.\"wedding-registry:5000\"]
http = true
insecure = true
" > ~/.config/buildkit/buildkitd.toml
echo Downloading context
wget -O - "%s" | tar -xf - # --quiet
wget -O - "%s" | tar -xf -
export BUILDKITD_FLAGS="--oci-worker-no-process-sandbox"
export BUILDCTL_CONNECT_RETRIES_MAX=100
Expand All @@ -352,8 +342,6 @@ buildctl-daemonless.sh \
--import-cache=type=registry,ref=wedding-registry:5000/cache-repo
`, presignedContextURL, output)

stream(w, buildScript)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "wedding-build-",
Expand All @@ -373,6 +361,10 @@ buildctl-daemonless.sh \
MountPath: "/home/user/.docker",
Name: "docker-config",
},
{
MountPath: "/home/user/.config/buildkit",
Name: "buildkitd-config",
},
},
},
},
Expand All @@ -385,15 +377,53 @@ buildctl-daemonless.sh \
},
},
},
{
Name: "buildkitd-config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: "buildkitd-config",
},
},
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
}

err = s.executePod(ctx, pod, w)
streamer := streamer{w: w}

err = s.executePod(ctx, pod, streamer)
if err != nil {
return err
}

return nil
}

type streamer struct {
w io.Writer
}

func (s streamer) Write(b []byte) (int, error) {
i := len(b)

b, err := json.Marshal(string(b))
if err != nil {
panic(err) // encode a string to json should not fail
}

_, err = s.w.Write([]byte(fmt.Sprintf(`{"stream": %s}`, b)))
if err != nil {
return 0, err
}

if f, ok := s.w.(http.Flusher); ok {
f.Flush()
} else {
return 0, fmt.Errorf("stream can not be flushed")
}

return i, nil
}
75 changes: 35 additions & 40 deletions pkg/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -16,7 +15,7 @@ import (
func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) error {
podClient := s.kubernetesClient.CoreV1().Pods(s.namespace)

stream(w, "Creating new pod.\n")
w.Write([]byte("Creating new pod.\n"))

pod, err := podClient.Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
Expand All @@ -30,30 +29,32 @@ func (s Service) executePod(ctx context.Context, pod *corev1.Pod, w io.Writer) e

defer func() {
if failed {
stream(w, "Pod failed. Skipping cleanup.\n")
w.Write([]byte("Pod failed. Skipping cleanup.\n"))
return
}

stream(w, "Deleting pod.\n")
return

w.Write([]byte("Deleting pod.\n"))

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

err = podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
streamf(w, "Pod deletetion failed: %v\n", err)
log.Printf("delete pod: %v", err)
log.Printf("delete pod %s: %v", pod.Name, err)
}
}()

stream(w, "Waiting for pod execution.\n")
w.Write([]byte("Waiting for pod execution.\n"))

waitRunning:
pod, err = s.kubernetesClient.CoreV1().Pods(s.namespace).Get(ctx, pod.Name, metav1.GetOptions{})

if err != nil {
streamf(w, "Looking up pod: %v.\n", err)
return fmt.Errorf("look up pod: %v", err)
return fmt.Errorf("look up pod %s: %v", pod.Name, err)
}

switch pod.Status.Phase {
Expand All @@ -70,14 +71,14 @@ waitRunning:
}

printLogs:
stream(w, "Streaming logs.\n")
// w.Write([]byte("Streaming logs.\n"))

podLogs, err := s.kubernetesClient.CoreV1().Pods(s.namespace).
GetLogs(pod.Name, &corev1.PodLogOptions{Follow: true}).
Stream(ctx)
if err != nil {
streamf(w, "Log streaming failed: %v\n", err)
return fmt.Errorf("streaming logs: %v", err)
return fmt.Errorf("streaming pod %s logs: %v", pod.Name, err)
}
defer podLogs.Close()

Expand All @@ -87,17 +88,34 @@ printLogs:
n, err := podLogs.Read(buf)
if err != nil {
if err == io.EOF {
stream(w, "End of logs reached.\n")
// w.Write([]byte("End of logs reached.\n"))
if failed {
return fmt.Errorf("pod failed")
return fmt.Errorf("pod %s failed", pod.Name)
}

for {
pod, err = s.kubernetesClient.CoreV1().Pods(s.namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
streamf(w, "Looking up pod: %v.\n", err)
return fmt.Errorf("look up pod %s: %v", pod.Name, err)
}

switch pod.Status.Phase {
case "Succeeded":
return nil
case "Failed":
return fmt.Errorf("pod %s failed", pod.Name)
default:
log.Printf("pod %s phase %s", pod.Name, pod.Status.Phase)
time.Sleep(time.Second)
}
}
return nil
}

return fmt.Errorf("read logs: %v", err)
return fmt.Errorf("read pod %s logs: %v", pod.Name, err)
}

stream(w, string(buf[:n]))
w.Write([]byte(string(buf[:n])))
}
}

Expand All @@ -110,34 +128,11 @@ func (s Service) podStatus(ctx context.Context, podName string) (corev1.PodPhase
return pod.Status.Phase, nil
}

func message(w io.Writer, message string) error {
return flush(w, "message", message)
}

func stream(w io.Writer, message string) error {
return flush(w, "stream", message)
}

func flush(w io.Writer, kind, message string) error {
b, err := json.Marshal(message)
func streamf(w io.Writer, message string, args ...interface{}) []byte {
b, err := json.Marshal(fmt.Sprintf(message, args...))
if err != nil {
panic(err) // encode a string to json should not fail
}

_, err = w.Write([]byte(fmt.Sprintf(`{"%s": %s}`, kind, b)))
if err != nil {
return err
}

if f, ok := w.(http.Flusher); ok {
f.Flush()
} else {
return fmt.Errorf("stream can not be flushed")
}

return nil
}

func streamf(w io.Writer, message string, args ...interface{}) error {
return stream(w, fmt.Sprintf(message, args...))
return []byte(fmt.Sprintf(`{"stream": %s}`, b))
}
45 changes: 38 additions & 7 deletions pkg/pull.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package wedding

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -54,7 +56,8 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) {
}

from := fmt.Sprintf("%s:%s", fromImage, pullTag)
to := fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(from))
// to := fmt.Sprintf("wedding-registry:5000/images/%s", url.PathEscape(from))
to := fmt.Sprintf("wedding-registry:5000/images/%s", escapePort(from))

dockerCfg, err := xRegistryAuth(r.Header.Get("X-Registry-Auth")).toDockerConfig()
if err != nil {
Expand Down Expand Up @@ -95,13 +98,11 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) {

// TODO add timeout for script
buildScript := fmt.Sprintf(`
set -euo pipefail
set -euxo pipefail
skopeo copy --dest-tls-verify=false docker://%s docker://%s
`, from, to)

stream(w, buildScript)

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "wedding-pull-",
Expand Down Expand Up @@ -138,13 +139,43 @@ skopeo copy --dest-tls-verify=false docker://%s docker://%s
},
}

err = s.executePod(r.Context(), pod, w)
b := &bytes.Buffer{}
messanger := streamer{w: w}
err = s.executePod(r.Context(), pod, b)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
stream(w, fmt.Sprintf("execute push: %v", err))
io.Copy(w, b)
w.Write([]byte(fmt.Sprintf("execute push: %v", err)))
log.Printf("execute push: %v", err)
return
}

w.WriteHeader(http.StatusOK)
io.Copy(messanger, b)
}

type messanger struct {
w io.Writer
}

func (m messanger) Write(b []byte) (int, error) {
i := len(b)

b, err := json.Marshal(string(b))
if err != nil {
panic(err) // encode a string to json should not fail
}

_, err = m.w.Write([]byte(fmt.Sprintf(`{"message": %s}`, b)))
if err != nil {
return 0, err
}

if f, ok := m.w.(http.Flusher); ok {
f.Flush()
} else {
return 0, fmt.Errorf("stream can not be flushed")
}

return i, nil
}
Loading

0 comments on commit b0d8b2a

Please sign in to comment.