Skip to content

Commit

Permalink
run one buildkit build locally and up to 5 skopeo transfers before sc…
Browse files Browse the repository at this point in the history
…heduling to kubernetes
  • Loading branch information
damoon committed Nov 22, 2020
1 parent 40d3ec0 commit 94875d2
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 27 deletions.
30 changes: 18 additions & 12 deletions pkg/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"golang.org/x/sync/semaphore"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -58,8 +57,20 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) {
return
}

// s.buildInKubernetes(w, r, cfg)
buildLocally(w, r, cfg)
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

scheduler := s.buildInKubernetes
err = semBuild.Acquire(ctx, 1)
if err == nil {
log.Printf("build locally %v", cfg.tags)
defer semBuild.Release(1)
scheduler = buildLocally
} else {
log.Printf("build scheduled %v", cfg.tags)
}

scheduler(w, r, cfg)
}

func buildParameters(r *http.Request) (*buildConfig, error) {
Expand Down Expand Up @@ -410,6 +421,7 @@ buildctl-daemonless.sh \
d := &digestParser{w: o}
err = s.executePod(ctx, pod, d)
if err != nil {
log.Printf("execute build: %v", err)
o.Errorf("execute build: %v", err)
return err
}
Expand All @@ -428,26 +440,20 @@ func buildLocally(w http.ResponseWriter, r *http.Request, cfg *buildConfig) {

err := buildLocallyError(r.Context(), d, r.Body, cfg)
if err != nil {
log.Printf("execute build: %v", err)
o.Errorf("execute build: %v", err)
return
}

err = d.publish(w)
if err != nil {
log.Printf("publish ID: %v", err)
o.Errorf("publish ID: %v", err)
return
}
}

var sem = semaphore.NewWeighted(1)

func buildLocallyError(ctx context.Context, w io.Writer, r io.Reader, cfg *buildConfig) error {
err := sem.Acquire(ctx, 1)
if err != nil {
return fmt.Errorf("acquire build semaphore: %v", err)
}
defer sem.Release(1)

defer os.RemoveAll("/root/context")

script := `
Expand All @@ -468,7 +474,7 @@ tar -xf -
cmd.Stderr = w
cmd.Stdin = r

err = cmd.Run()
err := cmd.Run()
if err != nil {
return fmt.Errorf("extract context: %v", err)
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package wedding

import (
"bytes"
"context"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"time"

"github.com/gorilla/mux"
)
Expand All @@ -23,11 +25,21 @@ skopeo inspect dir://%s
rm -r %s
`, randomID, image, randomID, randomID, randomID)

scheduler := scheduleLocal
// scheduler = s.scheduleInKubernetes
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

scheduler := s.scheduleInKubernetes
err := semSkopeo.Acquire(ctx, 1)
if err == nil {
log.Printf("inspect locally %s", vars["name"])
defer semSkopeo.Release(1)
scheduler = scheduleLocal
} else {
log.Printf("inspect scheduled %s", vars["name"])
}

o := &bytes.Buffer{}
err := scheduler(r.Context(), o, "inspect", script, "")
err = scheduler(r.Context(), o, "inspect", script, "")
if err != nil {
log.Printf("execute inspect: %v", err)
w.WriteHeader(http.StatusNotFound)
Expand Down
4 changes: 0 additions & 4 deletions pkg/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
)

Expand All @@ -21,7 +20,6 @@ func (o output) Write(b []byte) (int, error) {
}

msg := fmt.Sprintf(`{"stream": %s}`, b)
log.Printf("msg: %v", msg)

_, err = o.w.Write([]byte(msg))
if err != nil {
Expand All @@ -47,8 +45,6 @@ func (o output) Error(e string) error {
return err
}

log.Printf("error: %s", b)

msg := fmt.Sprintf(`{"error": %s, "errorDetail": {"code": %d, "message": %s}}`, b, 1, b)

_, err = o.w.Write([]byte(msg))
Expand Down
17 changes: 15 additions & 2 deletions pkg/pull.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package wedding

import (
"context"
"fmt"
"log"
"net/http"
"time"
)

func (s Service) pullImage(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -60,12 +62,23 @@ func (s Service) pullImage(w http.ResponseWriter, r *http.Request) {

script := fmt.Sprintf(`skopeo copy --retry-times 3 --dest-tls-verify=false docker://%s docker://%s`, from, to)

scheduler := scheduleLocal
// scheduler = s.scheduleInKubernetes
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

scheduler := s.scheduleInKubernetes
err = semSkopeo.Acquire(ctx, 1)
if err == nil {
log.Printf("pull locally %s", from)
defer semSkopeo.Release(1)
scheduler = scheduleLocal
} else {
log.Printf("pull scheduled %s", from)
}

o := &output{w: w}
err = scheduler(r.Context(), o, "pull", script, dockerCfg.mustToJSON())
if err != nil {
log.Printf("execute pull: %v", err)
o.Errorf("execute pull: %v", err)
}
}
17 changes: 15 additions & 2 deletions pkg/push.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package wedding

import (
"context"
"fmt"
"log"
"net/http"
"time"

"github.com/gorilla/mux"
)
Expand All @@ -26,12 +28,23 @@ func (s Service) pushImage(w http.ResponseWriter, r *http.Request) {
// TODO only use --dest-tls-verify=false for local registry
script := fmt.Sprintf(`skopeo copy --retry-times 3 --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s`, from, to)

scheduler := scheduleLocal
// scheduler = s.scheduleInKubernetes
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

scheduler := s.scheduleInKubernetes
err = semSkopeo.Acquire(ctx, 1)
if err == nil {
log.Printf("push locally %s", to)
defer semSkopeo.Release(1)
scheduler = scheduleLocal
} else {
log.Printf("push scheduled %s", to)
}

o := &output{w: w}
err = scheduler(r.Context(), o, "push", script, dockerCfg.mustToJSON())
if err != nil {
log.Printf("execute push: %v", err)
o.Errorf("execute push: %v", err)
}
}
13 changes: 12 additions & 1 deletion pkg/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/gorilla/mux"
"golang.org/x/sync/semaphore"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -24,6 +25,11 @@ const (
skopeoCPU = "200m"
)

var (
semBuild = semaphore.NewWeighted(1)
semSkopeo = semaphore.NewWeighted(5)
)

// Service runs the wedding server.
type Service struct {
router http.Handler
Expand Down Expand Up @@ -61,6 +67,7 @@ func (s *Service) routes(gitHash, gitRef string) {
router.HandleFunc("/{apiVersion}/images/{name:.+}/push", s.pushImage).Methods(http.MethodPost)
router.HandleFunc("/{apiVersion}/images/{name:.+}/json", s.inspect).Methods(http.MethodGet)
router.HandleFunc("/{apiVersion}/images/create", s.pullImage).Methods(http.MethodPost)
router.HandleFunc("/{apiVersion}/containers/{name:.+}/json", missing).Methods(http.MethodGet)

router.HandleFunc("/{apiVersion}/containers/prune", containersPrune).Methods(http.MethodPost)
router.HandleFunc("/{apiVersion}/images/json", imagesJSON).Methods(http.MethodGet)
Expand All @@ -78,7 +85,7 @@ func (s *Service) routes(gitHash, gitRef string) {
func loggingMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.Header.Get("User-Agent"), "kube-probe/") {
log.Printf("HTTP Request %s %s\n", r.Method, r.URL)
// log.Printf("HTTP Request %s %s\n", r.Method, r.URL)
}

next.ServeHTTP(w, r)
Expand All @@ -88,3 +95,7 @@ func loggingMiddleware(next http.Handler) http.Handler {
func ignored(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func missing(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}
18 changes: 15 additions & 3 deletions pkg/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package wedding

import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"regexp"
"strings"
"time"

"github.com/gorilla/mux"
)
Expand All @@ -34,11 +36,21 @@ func (s Service) tagImage(w http.ResponseWriter, r *http.Request) {

script := fmt.Sprintf(`skopeo copy --retry-times 3 --src-tls-verify=false --dest-tls-verify=false docker://%s docker://%s`, from, to)

scheduler := scheduleLocal
// scheduler = s.scheduleInKubernetes
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()

scheduler := s.scheduleInKubernetes
err := semSkopeo.Acquire(ctx, 1)
if err == nil {
log.Printf("tag locally %s to %s", tag, to)
defer semSkopeo.Release(1)
scheduler = scheduleLocal
} else {
log.Printf("tag scheduled %s to %s", tag, to)
}

o := &bytes.Buffer{}
err := scheduler(r.Context(), o, "tag", script, "")
err = scheduler(r.Context(), o, "tag", script, "")
if err != nil {
log.Printf("execute tag: %v", err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down

0 comments on commit 94875d2

Please sign in to comment.