diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..cb6bc20c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +tilt_modules +vendor diff --git a/cmd/d8s/main.go b/cmd/d8s/main.go index eb46bec2..67368698 100644 --- a/cmd/d8s/main.go +++ b/cmd/d8s/main.go @@ -35,7 +35,7 @@ import ( const ( weddingPort = 2376 - testPol = chunker.Pol(0x3DA3358B4DC173) + staticPol = chunker.Pol(0x3DA3358B4DC173) ) var ( @@ -333,35 +333,14 @@ func localServer(localAddr string) (int, error) { func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) http.HandlerFunc { re := regexp.MustCompile(`^/[^/]+/build$`) + chunksList := bytes.Buffer{} return func(w http.ResponseWriter, r *http.Request) { if !re.MatchString(r.URL.Path) { proxy.ServeHTTP(w, r) } - tempFile, err := os.CreateTemp("", "d8s-build-context") - if err != nil { - log.Printf("create temp file to store context: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - defer os.Remove(tempFile.Name()) - - _, err = io.Copy(tempFile, r.Body) - if err != nil { - log.Printf("copy build context to temp file: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - _, err = tempFile.Seek(0, io.SeekStart) - if err != nil { - log.Printf("seek temp context to beginning: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - chnker := chunker.New(tempFile, testPol) + chnker := chunker.New(r.Body, staticPol) for { c, err := chnker.Next(nil) @@ -376,9 +355,11 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht return } - log.Printf("chunk: Length: %d", c.Length) + hash, err := hashData(bytes.NewBuffer(c.Data)) - found, err := chunkExists(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data)) + chunksList.Write(hash) + + found, err := chunkExists(r.Context(), localAddr+"/_chunks", hash) if err != nil { log.Printf("chunk #%d deduplication: %v", c.Cut, err) w.WriteHeader(http.StatusInternalServerError) @@ 
-386,11 +367,11 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht } if found { - log.Printf("SKIP uploading chunk #%d: %v", c.Cut, err) + log.Printf("SKIP uploading %d bytes", c.Length) continue } - log.Printf("upload chunk #%d: %v", c.Cut, err) + log.Printf("uploading %d bytes", c.Length) err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data)) if err != nil { @@ -400,20 +381,14 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht } } - _, err = tempFile.Seek(0, io.SeekStart) - if err != nil { - log.Printf("seek temp context to beginning: %v", err) - w.WriteHeader(http.StatusInternalServerError) - return - } - - r.Body = tempFile + r.Body = io.NopCloser(bytes.NewReader(chunksList.Bytes())) + r.Header.Add("d8s-chunked", "true") proxy.ServeHTTP(w, r) } } -func chunkExists(ctx context.Context, target string, data io.Reader) (bool, error) { +func chunkExists(ctx context.Context, target string, hash []byte) (bool, error) { client := &http.Client{ Transport: &http.Transport{ Dial: (&net.Dialer{ @@ -431,8 +406,6 @@ func chunkExists(ctx context.Context, target string, data io.Reader) (bool, erro return false, fmt.Errorf("create request for chunk deduplication: %v", err) } - hash, err := hashData(data) - hashHex := make([]byte, hex.EncodedLen(len(hash))) hex.Encode(hashHex, hash) @@ -444,6 +417,7 @@ func chunkExists(ctx context.Context, target string, data io.Reader) (bool, erro if err != nil { return false, fmt.Errorf("chunk deduplication request: %v", err) } + defer resp.Body.Close() if resp.StatusCode == http.StatusOK { return true, nil @@ -490,6 +464,7 @@ func uploadChunk(ctx context.Context, target string, data io.Reader) error { if err != nil { return fmt.Errorf("chunk upload: %v", err) } + defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("chunk upload returned status code %d", resp.StatusCode) diff --git a/go.mod b/go.mod index 755f3035..b2296758 
100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/damoon/wedding -go 1.15 +go 1.16 require ( github.com/aws/aws-sdk-go v1.38.45 diff --git a/pkg/build.go b/pkg/build.go index c4acc047..5c3296ff 100644 --- a/pkg/build.go +++ b/pkg/build.go @@ -6,8 +6,10 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "log" "net/http" + "os" "path/filepath" "regexp" "strconv" @@ -55,8 +57,39 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) { } ctx := r.Context() + chunked := r.Header.Get("d8s-chunked") != "" - err = s.objectStore.storeContext(ctx, r.Body, cfg) + buildContext := r.Body + if chunked { + tempfile, err := ioutil.TempFile("", "build-context-restore") + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf("restore context: %v", err))) + log.Printf("restore context: %v", err) + return + } + defer os.Remove(tempfile.Name()) + + err = s.restoreContext(ctx, r.Body, tempfile) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf("restore context: %v", err))) + log.Printf("restore context: %v", err) + return + } + + _, err = tempfile.Seek(0, io.SeekStart) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(fmt.Sprintf("seek restored context file: %v", err))) + log.Printf("seek restored context file: %v", err) + return + } + + buildContext = tempfile + } + + err = s.objectStore.storeContext(ctx, buildContext, cfg) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(fmt.Sprintf("store context: %v", err))) @@ -74,6 +107,27 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) { } } +func (s Service) restoreContext(ctx context.Context, chunkList io.Reader, w io.Writer) error { + hash := make([]byte, 32) + + for { + _, err := chunkList.Read(hash) + if err == io.EOF { + return nil + } + + chunk, err := s.restoreChunk(ctx, hash) + if err != nil { + return err + } + + _, err = io.Copy(w, chunk) + if 
err != nil { + return err + } + } +} + func buildParameters(r *http.Request) (*buildConfig, error) { cfg := &buildConfig{} diff --git a/pkg/chunks.go b/pkg/chunks.go index c07c881f..8aa3dc21 100644 --- a/pkg/chunks.go +++ b/pkg/chunks.go @@ -106,3 +106,21 @@ func hashData(r io.Reader) ([]byte, error) { return h.Sum(nil), nil } + +// restoreChunk fetches the chunk stored under chunks/<hex hash> from the object store. +func (s Service) restoreChunk(ctx context.Context, hash []byte) (io.Reader, error) { + hashHex := make([]byte, hex.EncodedLen(len(hash))) + hex.Encode(hashHex, hash) + + path := filepath.Join("chunks", string(hashHex)) + + object, err := s.objectStore.Client.GetObjectWithContext(ctx, &s3.GetObjectInput{ + Bucket: aws.String(s.objectStore.Bucket), + Key: aws.String(path), + }) + if err != nil { + return &bytes.Buffer{}, err + } + + return object.Body, nil +}