Skip to content

Commit

Permalink
upload in context in chunks / deduplicate build context chunks to red…
Browse files Browse the repository at this point in the history
…uce upload volume
  • Loading branch information
damoon committed May 24, 2021
1 parent 1a22a82 commit d90bec4
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 8 deletions.
184 changes: 176 additions & 8 deletions cmd/d8s/main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package main

/*
https://restic.readthedocs.io/en/stable/040_backup.html?highlight=password#environment-variables
https://restic.readthedocs.io/en/stable/030_preparing_a_new_repo.html#minio-server
https://restic.readthedocs.io/en/stable/100_references.html?highlight=deduplication#backups-and-deduplication
*/

import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
Expand All @@ -22,6 +19,7 @@ import (
"strings"
"time"

"github.com/restic/chunker"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -35,7 +33,10 @@ import (
"github.com/urfave/cli/v2"
)

const weddingPort = 2376
const (
weddingPort = 2376
testPol = chunker.Pol(0x3DA3358B4DC173)
)

var (
gitHash string
Expand Down Expand Up @@ -318,7 +319,7 @@ func localServer(localAddr string) (int, error) {
proxy := httputil.NewSingleHostReverseProxy(targetURL)

mux := http.NewServeMux()
mux.HandleFunc("/", proxy.ServeHTTP)
mux.HandleFunc("/", uploadContextHandlerFunc(proxy, localAddr))

listener, err := net.Listen("tcp", ":0")
if err != nil {
Expand All @@ -329,3 +330,170 @@ func localServer(localAddr string) (int, error) {

return listener.Addr().(*net.TCPAddr).Port, nil
}

func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) http.HandlerFunc {
re := regexp.MustCompile(`^/[^/]+/build$`)

return func(w http.ResponseWriter, r *http.Request) {
if !re.MatchString(r.URL.Path) {
proxy.ServeHTTP(w, r)
}

tempFile, err := os.CreateTemp("", "d8s-build-context")
if err != nil {
log.Printf("create temp file to store context: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer os.Remove(tempFile.Name())

_, err = io.Copy(tempFile, r.Body)
if err != nil {
log.Printf("copy build context to temp file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
log.Printf("seek temp context to beginning: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

chnker := chunker.New(tempFile, testPol)

for {
c, err := chnker.Next(nil)

if err == io.EOF {
break
}

if err != nil {
log.Printf("searching next chunk: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

log.Printf("chunk: Length: %d", c.Length)

found, err := chunkExists(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
log.Printf("chunk #%d deduplication: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if found {
log.Printf("SKIP uploading chunk #%d: %v", c.Cut, err)
continue
}

log.Printf("upload chunk #%d: %v", c.Cut, err)

err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
log.Printf("uploading chunk #%d: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
log.Printf("seek temp context to beginning: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

r.Body = tempFile

proxy.ServeHTTP(w, r)
}
}

func chunkExists(ctx context.Context, target string, data io.Reader) (bool, error) {
client := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
if err != nil {
return false, fmt.Errorf("create request for chunk deduplication: %v", err)
}

hash, err := hashData(data)

hashHex := make([]byte, hex.EncodedLen(len(hash)))
hex.Encode(hashHex, hash)

q := req.URL.Query()
q.Add("hash", string(hashHex))
req.URL.RawQuery = q.Encode()

resp, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("chunk deduplication request: %v", err)
}

if resp.StatusCode == http.StatusOK {
return true, nil
}

if resp.StatusCode == http.StatusNotFound {
return false, nil
}

return false, fmt.Errorf("chunk deduplication request returned status code %d", resp.StatusCode)

}

func hashData(r io.Reader) ([]byte, error) {
h := sha256.New()

_, err := io.Copy(h, r)
if err != nil {
return []byte{}, err
}

return h.Sum(nil), nil
}

func uploadChunk(ctx context.Context, target string, data io.Reader) error {
client := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, data)
if err != nil {
return fmt.Errorf("create request for chunk upload: %v", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("chunk upload: %v", err)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("chunk upload returned status code %d", resp.StatusCode)
}

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.15
require (
github.com/aws/aws-sdk-go v1.38.45
github.com/gorilla/mux v1.8.0
github.com/restic/chunker v0.4.0
github.com/urfave/cli/v2 v2.3.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
k8s.io/api v0.21.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/restic/chunker v0.4.0 h1:YUPYCUn70MYP7VO4yllypp2SjmsRhRJaad3xKu1QFRw=
github.com/restic/chunker v0.4.0/go.mod h1:z0cH2BejpW636LXw0R/BGyv+Ey8+m9QGiOanDHItzyw=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down

0 comments on commit d90bec4

Please sign in to comment.