From 94875d2e0c92f2136285793507039e53077f5c5f Mon Sep 17 00:00:00 2001 From: David Sauer Date: Sun, 22 Nov 2020 22:46:22 +0100 Subject: [PATCH] run one buildkit build locally and up to 5 skopeo transfers before scheduling to kubernetes --- pkg/build.go | 30 ++++++++++++++++++------------ pkg/inspect.go | 18 +++++++++++++++--- pkg/mock.go | 4 ---- pkg/pull.go | 17 +++++++++++++++-- pkg/push.go | 17 +++++++++++++++-- pkg/service.go | 13 ++++++++++++- pkg/tag.go | 18 +++++++++++++++--- 7 files changed, 90 insertions(+), 27 deletions(-) diff --git a/pkg/build.go b/pkg/build.go index a24e86f9..02b036e3 100644 --- a/pkg/build.go +++ b/pkg/build.go @@ -19,7 +19,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3manager" - "golang.org/x/sync/semaphore" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -58,8 +57,20 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) { return } - // s.buildInKubernetes(w, r, cfg) - buildLocally(w, r, cfg) + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + scheduler := s.buildInKubernetes + err = semBuild.Acquire(ctx, 1) + if err == nil { + log.Printf("build locally %v", cfg.tags) + defer semBuild.Release(1) + scheduler = buildLocally + } else { + log.Printf("build scheduled %v", cfg.tags) + } + + scheduler(w, r, cfg) } func buildParameters(r *http.Request) (*buildConfig, error) { @@ -410,6 +421,7 @@ buildctl-daemonless.sh \ d := &digestParser{w: o} err = s.executePod(ctx, pod, d) if err != nil { + log.Printf("execute build: %v", err) o.Errorf("execute build: %v", err) return err } @@ -428,26 +440,20 @@ func buildLocally(w http.ResponseWriter, r *http.Request, cfg *buildConfig) { err := buildLocallyError(r.Context(), d, r.Body, cfg) if err != nil { + log.Printf("execute build: %v", err) o.Errorf("execute build: %v", err) return } err = d.publish(w) if err != nil { + log.Printf("publish ID: %v", err) o.Errorf("publish ID: %v", err) return } } -var sem = semaphore.NewWeighted(1) - func buildLocallyError(ctx context.Context, w io.Writer, r io.Reader, cfg *buildConfig) error { - err := sem.Acquire(ctx, 1) - if err != nil { - return fmt.Errorf("acquire build semaphore: %v", err) - } - defer sem.Release(1) - defer os.RemoveAll("/root/context") script := ` @@ -468,7 +474,7 @@ tar -xf - cmd.Stderr = w cmd.Stdin = r - err = cmd.Run() + err := cmd.Run() if err != nil { return fmt.Errorf("extract context: %v", err) } diff --git a/pkg/inspect.go b/pkg/inspect.go index 2d5a0464..9014dbc0 100644 --- a/pkg/inspect.go +++ b/pkg/inspect.go @@ -2,11 +2,13 @@ package wedding import ( "bytes" + "context" "fmt" "io" "log" "math/rand" "net/http" + "time" "github.com/gorilla/mux" ) @@ -23,11 +25,21 @@ skopeo inspect dir://%s rm -r %s `, randomID, image, randomID, randomID, randomID) - scheduler := scheduleLocal - // scheduler = s.scheduleInKubernetes + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + scheduler := s.scheduleInKubernetes + err := semSkopeo.Acquire(ctx, 1) + if err == nil { + log.Printf("inspect locally %s", vars["name"]) + defer semSkopeo.Release(1) + scheduler = scheduleLocal + } else { + log.Printf("inspect scheduled %s", vars["name"]) + } o := &bytes.Buffer{} - err := scheduler(r.Context(), o, "inspect", script, "") + err = scheduler(r.Context(), o, "inspect", script, "") if err != nil { log.Printf("execute inspect: %v", err) w.WriteHeader(http.StatusNotFound) diff --git a/pkg/mock.go b/pkg/mock.go index a6b8bc9b..38d96ef1 100644 --- a/pkg/mock.go +++ b/pkg/mock.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" ) @@ -21,7 +20,6 @@ func (o output) Write(b []byte) (int, error) { } msg := fmt.Sprintf(`{"stream": %s}`, b) - log.Printf("msg: %v", msg) _, err = o.w.Write([]byte(msg)) if err != nil { @@ -47,8 +45,6 @@ func (o output) Error(e string) error { return err } - log.Printf("error: %s", b) - msg := fmt.Sprintf(`{"error": %s, "errorDetail": {"code": %d, "message": %s}}`, b, 1, b) _, err = o.w.Write([]byte(msg)) diff --git a/pkg/pull.go b/pkg/pull.go index d9460a6f..9af5d7f9 100644 --- a/pkg/pull.go +++ b/pkg/pull.go @@ -1,9 +1,11 @@ package wedding import ( + "context" "fmt" "log" "net/http" + "time" ) func (s Service) pullImage(w http.ResponseWriter, r *http.Request) { @@ -60,12 +62,23 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) { script := fmt.Sprintf(`skopeo copy --retry-times 3 --dest-tls-verify=false docker://%s docker://%s`, from, to) - scheduler := scheduleLocal - // scheduler = s.scheduleInKubernetes + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + scheduler := s.scheduleInKubernetes + err = semSkopeo.Acquire(ctx, 1) + if err == nil { + log.Printf("pull locally %s", from) + defer semSkopeo.Release(1) + scheduler = scheduleLocal + } else { + log.Printf("pull scheduled %s", from) + } o := &output{w: w} err = scheduler(r.Context(), o, "pull", script, dockerCfg.mustToJSON()) if err != nil { + log.Printf("execute pull: %v", err) o.Errorf("execute pull: %v", err) } } diff --git a/pkg/push.go b/pkg/push.go index 3c8e4cb2..4143ae47 100644 --- a/pkg/push.go +++ b/pkg/push.go @@ -1,9 +1,11 @@ package wedding import ( + "context" "fmt" "log" "net/http" + "time" "github.com/gorilla/mux" ) @@ -26,12 +28,23 @@ func (s Service) pushImage(w http.ResponseWriter, r *http.Request) { // TODO only use --dest-tls-verify=false for local registry script := fmt.Sprintf(`skopeo copy --retry-times 3 --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s`, from, to) - scheduler := scheduleLocal - // scheduler = s.scheduleInKubernetes + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + scheduler := s.scheduleInKubernetes + err = semSkopeo.Acquire(ctx, 1) + if err == nil { + log.Printf("push locally %s", to) + defer semSkopeo.Release(1) + scheduler = scheduleLocal + } else { + log.Printf("push scheduled %s", to) + } o := &output{w: w} err = scheduler(r.Context(), o, "push", script, dockerCfg.mustToJSON()) if err != nil { + log.Printf("execute push: %v", err) o.Errorf("execute push: %v", err) } } diff --git a/pkg/service.go b/pkg/service.go index 0dbf7e0d..949b923d 100644 --- a/pkg/service.go +++ b/pkg/service.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gorilla/mux" + "golang.org/x/sync/semaphore" "k8s.io/client-go/kubernetes" ) @@ -24,6 +25,11 @@ const ( skopeoCPU = "200m" ) +var ( + semBuild = semaphore.NewWeighted(1) + semSkopeo = semaphore.NewWeighted(5) +) + // Service runs the wedding server. type Service struct { router http.Handler @@ -61,6 +67,7 @@ func (s *Service) routes(gitHash, gitRef string) { router.HandleFunc("/{apiVersion}/images/{name:.+}/push", s.pushImage).Methods(http.MethodPost) router.HandleFunc("/{apiVersion}/images/{name:.+}/json", s.inspect).Methods(http.MethodGet) router.HandleFunc("/{apiVersion}/images/create", s.pullImage).Methods(http.MethodPost) + router.HandleFunc("/{apiVersion}/containers/{name:.+}/json", missing).Methods(http.MethodGet) router.HandleFunc("/{apiVersion}/containers/prune", containersPrune).Methods(http.MethodPost) router.HandleFunc("/{apiVersion}/images/json", imagesJSON).Methods(http.MethodGet) @@ -78,7 +85,7 @@ func (s *Service) routes(gitHash, gitRef string) { func loggingMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/") { - log.Printf("HTTP Request %s %s\n", r.Method, r.URL) + // log.Printf("HTTP Request %s %s\n", r.Method, r.URL) } next.ServeHTTP(w, r) @@ -88,3 +95,7 @@ func loggingMiddleware(next http.Handler) http.Handler { func ignored(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } + +func missing(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) +} diff --git a/pkg/tag.go b/pkg/tag.go index 8a9a8636..14c1e6ac 100644 --- a/pkg/tag.go +++ b/pkg/tag.go @@ -2,12 +2,14 @@ package wedding import ( "bytes" + "context" "fmt" "io" "log" "net/http" "regexp" "strings" + "time" "github.com/gorilla/mux" ) @@ -34,11 +36,21 @@ func (s Service) tagImage(w http.ResponseWriter, r *http.Request) { script := fmt.Sprintf(`skopeo copy --retry-times 3 --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s`, from, to) - scheduler := scheduleLocal - // scheduler = s.scheduleInKubernetes + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + scheduler := s.scheduleInKubernetes + err := semSkopeo.Acquire(ctx, 1) + if err == nil { + log.Printf("tag locally %s to %s", tag, to) + defer semSkopeo.Release(1) + scheduler = scheduleLocal + } else { + log.Printf("tag scheduled %s to %s", tag, to) + } o := &bytes.Buffer{} - err := scheduler(r.Context(), o, "tag", script, "") + err = scheduler(r.Context(), o, "tag", script, "") if err != nil { log.Printf("execute tag: %v", err) w.WriteHeader(http.StatusInternalServerError)