From d90bec4f492a6a4e94148a349cad1ad37e00b27b Mon Sep 17 00:00:00 2001 From: David Sauer Date: Mon, 24 May 2021 19:05:28 +0200 Subject: [PATCH] upload in context in chunks / deduplicate build context chunks to reduce upload volume --- cmd/d8s/main.go | 184 +++++++++++++++++++++++++++++++++++++++++++++--- go.mod | 1 + go.sum | 2 + 3 files changed, 179 insertions(+), 8 deletions(-) diff --git a/cmd/d8s/main.go b/cmd/d8s/main.go index 9dadc5dc..eb46bec2 100644 --- a/cmd/d8s/main.go +++ b/cmd/d8s/main.go @@ -1,14 +1,11 @@ package main -/* - https://restic.readthedocs.io/en/stable/040_backup.html?highlight=password#environment-variables - https://restic.readthedocs.io/en/stable/030_preparing_a_new_repo.html#minio-server - https://restic.readthedocs.io/en/stable/100_references.html?highlight=deduplication#backups-and-deduplication -*/ - import ( "bufio" + "bytes" "context" + "crypto/sha256" + "encoding/hex" "fmt" "io" "log" @@ -22,6 +19,7 @@ import ( "strings" "time" + "github.com/restic/chunker" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -35,7 +33,10 @@ import ( "github.com/urfave/cli/v2" ) -const weddingPort = 2376 +const ( + weddingPort = 2376 + testPol = chunker.Pol(0x3DA3358B4DC173) +) var ( gitHash string @@ -318,7 +319,7 @@ func localServer(localAddr string) (int, error) { proxy := httputil.NewSingleHostReverseProxy(targetURL) mux := http.NewServeMux() - mux.HandleFunc("/", proxy.ServeHTTP) + mux.HandleFunc("/", uploadContextHandlerFunc(proxy, localAddr)) listener, err := net.Listen("tcp", ":0") if err != nil { @@ -329,3 +330,170 @@ func localServer(localAddr string) (int, error) { return listener.Addr().(*net.TCPAddr).Port, nil } + +func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) http.HandlerFunc { + re := regexp.MustCompile(`^/[^/]+/build$`) + + return func(w http.ResponseWriter, r *http.Request) { + if !re.MatchString(r.URL.Path) { + proxy.ServeHTTP(w, r) + } + + tempFile, err := os.CreateTemp("", "d8s-build-context") + if err != nil { + log.Printf("create temp file to store context: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer os.Remove(tempFile.Name()) + + _, err = io.Copy(tempFile, r.Body) + if err != nil { + log.Printf("copy build context to temp file: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + _, err = tempFile.Seek(0, io.SeekStart) + if err != nil { + log.Printf("seek temp context to beginning: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + chnker := chunker.New(tempFile, testPol) + + for { + c, err := chnker.Next(nil) + + if err == io.EOF { + break + } + + if err != nil { + log.Printf("searching next chunk: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + log.Printf("chunk: Length: %d", c.Length) + + found, err := chunkExists(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data)) + if err != nil { + log.Printf("chunk #%d deduplication: %v", c.Cut, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + if found { + log.Printf("SKIP uploading chunk #%d: %v", c.Cut, err) + continue + } + + log.Printf("upload chunk #%d: %v", c.Cut, err) + + err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data)) + if err != nil { + log.Printf("uploading chunk #%d: %v", c.Cut, err) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + + _, err = tempFile.Seek(0, io.SeekStart) + if err != nil { + log.Printf("seek temp context to beginning: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + r.Body = tempFile + + proxy.ServeHTTP(w, r) + } +} + +func chunkExists(ctx context.Context, target string, data io.Reader) (bool, error) { + client := &http.Client{ + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if err != nil { + return false, fmt.Errorf("create request for chunk deduplication: %v", err) + } + + hash, err := hashData(data) + + hashHex := make([]byte, hex.EncodedLen(len(hash))) + hex.Encode(hashHex, hash) + + q := req.URL.Query() + q.Add("hash", string(hashHex)) + req.URL.RawQuery = q.Encode() + + resp, err := client.Do(req) + if err != nil { + return false, fmt.Errorf("chunk deduplication request: %v", err) + } + + if resp.StatusCode == http.StatusOK { + return true, nil + } + + if resp.StatusCode == http.StatusNotFound { + return false, nil + } + + return false, fmt.Errorf("chunk deduplication request returned status code %d", resp.StatusCode) + +} + +func hashData(r io.Reader) ([]byte, error) { + h := sha256.New() + + _, err := io.Copy(h, r) + if err != nil { + return []byte{}, err + } + + return h.Sum(nil), nil +} + +func uploadChunk(ctx context.Context, target string, data io.Reader) error { + client := &http.Client{ + Transport: &http.Transport{ + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + }, + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, data) + if err != nil { + return fmt.Errorf("create request for chunk upload: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("chunk upload: %v", err) + } + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("chunk upload returned status code %d", resp.StatusCode) + } + + return nil +} diff --git a/go.mod b/go.mod index ed18dbdc..755f3035 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.15 require ( github.com/aws/aws-sdk-go v1.38.45 github.com/gorilla/mux v1.8.0 + github.com/restic/chunker v0.4.0 github.com/urfave/cli/v2 v2.3.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c k8s.io/api v0.21.1 diff --git a/go.sum b/go.sum index baac8bf6..a46efe78 100644 --- a/go.sum +++ b/go.sum @@ -171,6 +171,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/restic/chunker v0.4.0 h1:YUPYCUn70MYP7VO4yllypp2SjmsRhRJaad3xKu1QFRw= +github.com/restic/chunker v0.4.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=