cache via registry / run pods instead of jobs
damoon committed Nov 10, 2020
1 parent 72b6ac2 commit 3c94ffd
Showing 5 changed files with 159 additions and 120 deletions.
187 changes: 85 additions & 102 deletions pkg/build.go
@@ -16,11 +16,9 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"

batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const (
@@ -299,156 +297,139 @@ func (s Service) executeBuild(ctx context.Context, cfg *buildConfig, res http.ResponseWriter
return err
}

// TODO add timeout for script
buildScript := fmt.Sprintf(`
cd
pwd
wget -O - "%s" | tar -xf -
ls -la
set -euo pipefail
mkdir ~/context && cd ~/context
mkdir -p ~/.config/buildkit/
echo "
[registry.\"cache-registry:5000\"]
http = true
insecure = true
" > ~/.config/buildkit/buildkitd.toml
echo Downloading context
wget -O - "%s" | tar -xf - # --quiet
export BUILDKITD_FLAGS="--oci-worker-no-process-sandbox"
export BUILDCTL_CONNECT_RETRIES_MAX=100
buildctl-daemonless.sh \
build \
--frontend dockerfile.v0 \
--local context=. \
--local dockerfile=. \
--opt filename=Dockerfile
`, presignedContextURL)

job := &batchv1.Job{
build \
--frontend dockerfile.v0 \
--local context=. \
--local dockerfile=. \
--opt filename=Dockerfile \
--output type=image,push=true,name=cache-registry:5000/cache-repo:latest \
--export-cache=type=registry,ref=cache-registry:5000/cache-repo,mode=max \
--import-cache=type=registry,ref=cache-registry:5000/cache-repo
`, presignedContextURL)

pod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "wedding-build-",
},
Spec: batchv1.JobSpec{
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "buildkit",
Image: "moby/buildkit:v0.7.2-rootless",
Command: []string{
"sh",
"-c",
buildScript,
// "date; sleep 1; date; sleep 1; date; sleep 1; date;",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "buildkit",
Image: "moby/buildkit:master-rootless",
ImagePullPolicy: apiv1.PullAlways,
// Image: "moby/buildkit:v0.8-beta",
// Image: "moby/buildkit:v0.7.2-rootless",
Command: []string{
"sh",
"-c",
buildScript,
// "date; sleep 1; date; sleep 1; date; sleep 1; date;",
},
RestartPolicy: apiv1.RestartPolicyOnFailure,
},
},
RestartPolicy: apiv1.RestartPolicyNever,
},
}

err = s.executeJob(ctx, job, res)
err = s.executePod(ctx, pod, res)
if err != nil {
return err
}

return nil
}

func (s Service) executeJob(ctx context.Context, job *batchv1.Job, res http.ResponseWriter) error {
jobsClient := s.kubernetesClient.BatchV1().Jobs(s.namespace)
func (s Service) executePod(ctx context.Context, pod *apiv1.Pod, res http.ResponseWriter) error {
podClient := s.kubernetesClient.CoreV1().Pods(s.namespace)

stream(res, "Creating new job.\n")
stream(res, "Creating new pod.\n")

newJob, err := jobsClient.Create(ctx, job, v1.CreateOptions{})
pod, err := podClient.Create(ctx, pod, v1.CreateOptions{})
if err != nil {
streamf(res, "Job creation failed: %v\n", err)
return fmt.Errorf("create job: %v", err)
streamf(res, "Pod creation failed: %v\n", err)
return fmt.Errorf("create pod: %v", err)
}

streamf(res, "Created job %v.\n", newJob.GetName())
streamf(res, "Created pod %v.\n", pod.Name)

failed := false

defer func() {
streamf(res, "Deleting job %v.\n", newJob.GetName())
if failed {
stream(res, "Pod failed. Skipping cleanup.\n")
return
}

stream(res, "Deleting pod.\n")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

propagationPolicy := v1.DeletePropagationBackground
err = jobsClient.Delete(ctx, newJob.GetName(), v1.DeleteOptions{
PropagationPolicy: &propagationPolicy,
})
err = podClient.Delete(ctx, pod.Name, v1.DeleteOptions{})
if err != nil {
streamf(res, "Job deletetion failed: %v\n", err)
log.Printf("delete job: %v", err)
streamf(res, "Pod deletetion failed: %v\n", err)
log.Printf("delete pod: %v", err)
}
}()

watchJob:
labelSelector := metav1.LabelSelector{MatchLabels: map[string]string{"job-name": newJob.GetName()}}
watchTimeout := int64(120)
jobWatch, err := s.kubernetesClient.BatchV1().Jobs(s.namespace).
Watch(ctx, v1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
TimeoutSeconds: &watchTimeout,
})
if err != nil {
return err
}
defer jobWatch.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("context timed out")

case e := <-jobWatch.ResultChan():
watchedJob, ok := e.Object.(*batchv1.Job)
if !ok {
stream(res, "Unexpected error.\n")
log.Panicf("unexpected type %v", e.Object)
}

if watchedJob.Status.Succeeded == 1 {
stream(res, "Job finished.\n")
return nil
}

if watchedJob.Status.Active == 1 {
stream(res, "Job started.\n")
goto showLogs
}
}
}
stream(res, "Waiting for pod execution.\n")

showLogs:
podList, err := s.kubernetesClient.CoreV1().Pods(s.namespace).List(ctx, metav1.ListOptions{
LabelSelector: labels.Set(labelSelector.MatchLabels).String(),
})
waitRunning:
pod, err = s.kubernetesClient.CoreV1().Pods(s.namespace).Get(ctx, pod.Name, metav1.GetOptions{})

if len(podList.Items) != 1 {
stream(res, "Pod not found.\n")
goto watchJob
}

pod := podList.Items[0]

if pod.Status.Phase != "Running" {
if err != nil {
streamf(res, "Looking up pod: %v.\n", err)
return fmt.Errorf("look up pod: %v", err)
}

switch pod.Status.Phase {
case "Failed":
failed = true
fallthrough
case "Succeeded":
fallthrough
case "Running":
goto printLogs
default:
time.Sleep(time.Second)
goto showLogs
goto waitRunning
}

stream(res, "Pod started.\n")
printLogs:
stream(res, "Streaming logs.\n")

req := s.kubernetesClient.CoreV1().Pods(s.namespace).GetLogs(pod.Name, &apiv1.PodLogOptions{Follow: true})
podLogs, err := req.Stream(ctx)
podLogs, err := s.kubernetesClient.CoreV1().Pods(s.namespace).
GetLogs(pod.Name, &apiv1.PodLogOptions{Follow: true}).
Stream(ctx)
if err != nil {
streamf(res, "Log streaming failed: %v\n", err)
return fmt.Errorf("streaming logs: %v", err)
}
defer podLogs.Close()

streamf(res, "Streaming logs from pod %s.\n", pod.Name)

buf := make([]byte, 1024)

for {
n, err := podLogs.Read(buf)
if n != 0 {
stream(res, string(buf[:n]))
}
if err != nil {
if err == io.EOF {
stream(res, "End of logs reached.\n")
@@ -457,6 +438,8 @@

return fmt.Errorf("read logs: %v", err)
}

stream(res, string(buf[:n]))
}
}
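
Both versions of this loop have to deal with a subtlety of Go's io.Reader contract: Read may return n > 0 together with a non-nil error, including io.EOF, so any returned bytes should be consumed before the error terminates the loop. A minimal standalone sketch of that drain pattern (drain and the sample input are illustrative only, not this commit's code):

package main

import (
	"fmt"
	"io"
	"strings"
)

// drain copies everything from r to stdout, forwarding the bytes of each
// Read before inspecting the error, since Read may return data and io.EOF
// in the same call.
func drain(r io.Reader) error {
	buf := make([]byte, 1024)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			fmt.Print(string(buf[:n]))
		}
		if err == io.EOF {
			return nil // clean end of stream
		}
		if err != nil {
			return fmt.Errorf("read: %v", err)
		}
	}
}

func main() {
	_ = drain(strings.NewReader("log line 1\nlog line 2\n"))
}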

2 changes: 1 addition & 1 deletion pkg/wedding.go
@@ -41,7 +41,7 @@ func (s *Service) routes() {
s.router.HandleFunc("/v"+apiVersion+"/containers/prune", containersPrune)
s.router.HandleFunc("/v"+apiVersion+"/build/prune", buildPrune)
s.router.HandleFunc("/v"+apiVersion+"/version", version)
s.router.HandleFunc("/v"+apiVersion+"/build", logReqResp(s.buildHandler()))
s.router.HandleFunc("/v"+apiVersion+"/build", s.buildHandler())
s.router.HandleFunc("/_ping", ping)
s.router.HandleFunc("/", logReqResp(unsupported))
}
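
The /build route loses its logReqResp wrapper here — plausibly because the handler now streams a long-running build's output, which a dump-style logger would buffer or interleave. The middleware's body is not part of this commit; the sketch below is only a guess at the usual shape of such a wrapper, and the use of httputil.DumpRequest is an assumption, not the project's actual code:

import (
	"log"
	"net/http"
	"net/http/httputil"
)

// Hypothetical sketch of a logReqResp-style middleware: log the incoming
// request, then delegate to the wrapped handler.
func logReqResp(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if dump, err := httputil.DumpRequest(r, true); err == nil { // true: include the body
			log.Printf("request: %s", dump)
		} else {
			log.Printf("dump request: %v", err)
		}
		next(w, r)
	}
}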
3 changes: 3 additions & 0 deletions services/Tiltfile
@@ -10,3 +10,6 @@ k8s_resource(
'delete-old-files',
resource_deps=['setup-s3-bucket'],
)

k8s_yaml('cache-registry.yaml')
k8s_resource('cache-registry', port_forwards=5000)
69 changes: 69 additions & 0 deletions services/cache-registry.yaml
@@ -0,0 +1,69 @@
apiVersion: v1
kind: Service
metadata:
name: cache-registry
spec:
ports:
- name: http
port: 5000
selector:
app: cache-registry
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: cache-registry
spec:
selector:
matchLabels:
app: cache-registry
strategy:
type: Recreate
replicas: 1
template:
metadata:
labels:
app: cache-registry
spec:
containers:
- name: cache-registry
image: registry:2.7.1
ports:
- name: http
containerPort: 5000
resources:
limits:
cpu: "100m"
memory: "500Mi"
requests:
cpu: "100m"
memory: "500Mi"
readinessProbe:
httpGet:
path: /v2/
port: 5000
lifecycle:
preStop:
exec:
command:
- sh
- -c
- "sleep 10"
volumeMounts:
- name: data
mountPath: /var/lib/registry
volumes:
- name: data
persistentVolumeClaim:
claimName: cache-registry
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: cache-registry
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
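
Once the manifest above is applied (and with the Tiltfile's port-forward on 5000 active), the cache registry speaks the standard Docker Registry HTTP API v2 — the same /v2/ endpoint the readinessProbe polls. A small illustrative check, assuming the localhost:5000 forward is running; the cache-repo name matches the buildctl flags earlier in this commit:

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

// Lists the repositories held by the cache registry via the Registry
// HTTP API v2 catalog endpoint. localhost:5000 assumes the Tilt
// port-forward is active.
func main() {
	resp, err := http.Get("http://localhost:5000/v2/_catalog")
	if err != nil {
		log.Fatalf("query registry: %v", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatalf("read response: %v", err)
	}
	fmt.Printf("%s\n", body) // expected to include "cache-repo" after a build
}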
18 changes: 1 addition & 17 deletions services/minio.yaml
@@ -57,16 +57,11 @@ spec:
- "sleep 10"
volumeMounts:
- name: data
mountPath: /home/shared
- name: queue
mountPath: /queue
mountPath: /home/shared
volumes:
- name: data
persistentVolumeClaim:
claimName: minio
- name: queue
persistentVolumeClaim:
claimName: minio-notification-queue
---
apiVersion: v1
kind: Secret
@@ -87,14 +82,3 @@ spec:
resources:
requests:
storage: 3Gi
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: minio-notification-queue
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
