Skip to content

Commit

Permalink
only upload new chunks / restore context server side
Browse files Browse the repository at this point in the history
  • Loading branch information
damoon committed May 24, 2021
1 parent d90bec4 commit 48de666
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 41 deletions.
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
tilt_modules
vendor
53 changes: 14 additions & 39 deletions cmd/d8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

const (
weddingPort = 2376
testPol = chunker.Pol(0x3DA3358B4DC173)
staticPol = chunker.Pol(0x3DA3358B4DC173)
)

var (
Expand Down Expand Up @@ -333,35 +333,14 @@ func localServer(localAddr string) (int, error) {

func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) http.HandlerFunc {
re := regexp.MustCompile(`^/[^/]+/build$`)
chunksList := bytes.Buffer{}

return func(w http.ResponseWriter, r *http.Request) {
if !re.MatchString(r.URL.Path) {
proxy.ServeHTTP(w, r)
}

tempFile, err := os.CreateTemp("", "d8s-build-context")
if err != nil {
log.Printf("create temp file to store context: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer os.Remove(tempFile.Name())

_, err = io.Copy(tempFile, r.Body)
if err != nil {
log.Printf("copy build context to temp file: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
log.Printf("seek temp context to beginning: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

chnker := chunker.New(tempFile, testPol)
chnker := chunker.New(r.Body, staticPol)

for {
c, err := chnker.Next(nil)
Expand All @@ -376,21 +355,23 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht
return
}

log.Printf("chunk: Length: %d", c.Length)
hash, err := hashData(bytes.NewBuffer(c.Data))

found, err := chunkExists(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
chunksList.Write(hash)

found, err := chunkExists(r.Context(), localAddr+"/_chunks", hash)
if err != nil {
log.Printf("chunk #%d deduplication: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if found {
log.Printf("SKIP uploading chunk #%d: %v", c.Cut, err)
log.Printf("SKIP uploading %d bytes", c.Length)
continue
}

log.Printf("upload chunk #%d: %v", c.Cut, err)
log.Printf("uploading %d bytes", c.Length)

err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
Expand All @@ -400,20 +381,14 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht
}
}

_, err = tempFile.Seek(0, io.SeekStart)
if err != nil {
log.Printf("seek temp context to beginning: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

r.Body = tempFile
r.Body = io.NopCloser(bytes.NewReader(chunksList.Bytes()))
r.Header.Add("d8s-chunked", "true")

proxy.ServeHTTP(w, r)
}
}

func chunkExists(ctx context.Context, target string, data io.Reader) (bool, error) {
func chunkExists(ctx context.Context, target string, hash []byte) (bool, error) {
client := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Expand All @@ -431,8 +406,6 @@ func chunkExists(ctx context.Context, target string, data io.Reader) (bool, erro
return false, fmt.Errorf("create request for chunk deduplication: %v", err)
}

hash, err := hashData(data)

hashHex := make([]byte, hex.EncodedLen(len(hash)))
hex.Encode(hashHex, hash)

Expand All @@ -444,6 +417,7 @@ func chunkExists(ctx context.Context, target string, data io.Reader) (bool, erro
if err != nil {
return false, fmt.Errorf("chunk deduplication request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
return true, nil
Expand Down Expand Up @@ -490,6 +464,7 @@ func uploadChunk(ctx context.Context, target string, data io.Reader) error {
if err != nil {
return fmt.Errorf("chunk upload: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("chunk upload returned status code %d", resp.StatusCode)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/damoon/wedding

go 1.15
go 1.16

require (
github.com/aws/aws-sdk-go v1.38.45
Expand Down
56 changes: 55 additions & 1 deletion pkg/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
Expand Down Expand Up @@ -55,8 +57,39 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) {
}

ctx := r.Context()
chunked := r.Header.Get("d8s-chunked") != ""

err = s.objectStore.storeContext(ctx, r.Body, cfg)
buildContext := r.Body
if chunked {
tempfile, err := ioutil.TempFile("", "build-context-restore")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("restore context: %v", err)))
log.Printf("restore context: %v", err)
return
}
defer os.Remove(tempfile.Name())

err = s.restoreContext(ctx, r.Body, tempfile)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("restore context: %v", err)))
log.Printf("restore context: %v", err)
return
}

_, err = tempfile.Seek(0, io.SeekStart)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("seek restored context file: %v", err)))
log.Printf("seek restored context file: %v", err)
return
}

buildContext = tempfile
}

err = s.objectStore.storeContext(ctx, buildContext, cfg)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(fmt.Sprintf("store context: %v", err)))
Expand All @@ -74,6 +107,27 @@ func (s Service) build(w http.ResponseWriter, r *http.Request) {
}
}

// restoreContext rebuilds a build context from a list of chunk hashes.
//
// chunkList is read as a sequence of fixed-size 32-byte hashes with no
// separators. For every hash the matching chunk is fetched via restoreChunk
// and its content is appended to w, in list order.
//
// It returns nil once chunkList is exhausted on a hash boundary (an empty list
// is valid and writes nothing). It returns an error when the list ends in the
// middle of a hash, when reading the list fails, when a chunk cannot be
// fetched, or when copying a chunk to w fails.
func (s Service) restoreContext(ctx context.Context, chunkList io.Reader, w io.Writer) error {
	hash := make([]byte, 32)

	for {
		// io.ReadFull is required here: a plain Read may return fewer than
		// len(hash) bytes (or data together with io.EOF), which would yield a
		// hash made of fresh and stale bytes or drop the final entry.
		_, err := io.ReadFull(chunkList, hash)
		if err == io.EOF {
			// Clean end of list: no bytes were read for the next hash.
			return nil
		}
		if err != nil {
			// Includes io.ErrUnexpectedEOF for a list truncated mid-hash.
			return fmt.Errorf("read chunk hash: %w", err)
		}

		chunk, err := s.restoreChunk(ctx, hash)
		if err != nil {
			return fmt.Errorf("restore chunk %x: %w", hash, err)
		}

		_, err = io.Copy(w, chunk)

		// restoreChunk only promises an io.Reader, but the underlying value
		// may hold a resource (e.g. a response body). Release it right away
		// instead of deferring, so resources do not pile up across the loop.
		// The close error is ignored on purpose: the data has already been
		// read, and a copy error (if any) is the more useful one to report.
		if closer, ok := chunk.(io.Closer); ok {
			closer.Close()
		}

		if err != nil {
			return fmt.Errorf("copy chunk %x: %w", hash, err)
		}
	}
}

func buildParameters(r *http.Request) (*buildConfig, error) {
cfg := &buildConfig{}

Expand Down
18 changes: 18 additions & 0 deletions pkg/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,21 @@ func hashData(r io.Reader) ([]byte, error) {

return h.Sum(nil), nil
}

// restoreChunk fetches a previously stored chunk from the object store.
//
// The object key is "chunks" joined with the lowercase hex encoding of hash,
// looked up in the service's configured bucket. On success the object's body
// is returned for reading; on failure an empty buffer is returned together
// with the error from the object store client.
//
// NOTE(review): the body is returned as a plain io.Reader, so callers cannot
// see a Close method on it without a type assertion — confirm who is expected
// to release it.
func (s Service) restoreChunk(ctx context.Context, hash []byte) (io.Reader, error) {
	key := filepath.Join("chunks", hex.EncodeToString(hash))

	input := &s3.GetObjectInput{
		Bucket: aws.String(s.objectStore.Bucket),
		Key:    aws.String(key),
	}

	result, err := s.objectStore.Client.GetObjectWithContext(ctx, input)
	if err == nil {
		return result.Body, nil
	}

	return &bytes.Buffer{}, err
}

0 comments on commit 48de666

Please sign in to comment.