Skip to content

Commit

Permalink
added mutex in a per chunk base to avoid double upload
Browse files Browse the repository at this point in the history
  • Loading branch information
damoon committed Dec 9, 2021
1 parent 9be9fff commit 3deb221
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 35 deletions.
110 changes: 76 additions & 34 deletions cmd/d8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/restic/chunker"
Expand All @@ -40,10 +41,25 @@ const (
)

var (
gitHash string
gitRef = "latest"
gitHash string
gitRef = "latest"
uploadBottlenecks = MutexMap{}
)

type MutexMap struct {
mutexes sync.Map
}

func (mm *MutexMap) Lock(key []byte) func() {
value, _ := mm.mutexes.LoadOrStore(string(key), &sync.Mutex{})
mu := value.(*sync.Mutex)
mu.Lock()
unlock := func() {
mu.Unlock()
}
return unlock
}

func main() {
homeDir, err := os.UserHomeDir()
if err != nil {
Expand All @@ -58,6 +74,12 @@ func main() {
Name: "run",
Usage: "Connect to wedding server and set DOCKER_HOST for started process.",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "verbose",
Aliases: []string{"v"},
Usage: "Print verbose logs.",
EnvVars: []string{"WEDDING_VERBOSE"},
},
&cli.StringFlag{
Name: "kubeconfig",
Usage: "Kubeconfig file to use.",
Expand Down Expand Up @@ -113,6 +135,7 @@ func run(c *cli.Context) error {
return fmt.Errorf("command missing")
}

verbose := c.Bool("verbose")
kubeconfig := c.String("kubeconfig")
context := c.String("context")
namespace := c.String("namespace")
Expand All @@ -127,10 +150,10 @@ func run(c *cli.Context) error {
return fmt.Errorf("find wedding pod: %v", err)
}

localAddr, stopCh := portForward(pod, config)
localAddr, stopCh := portForward(pod, config, verbose)
defer close(stopCh)

localPort, err := localServer("http://" + localAddr)
localPort, err := localServer("http://"+localAddr, verbose)
if err != nil {
return fmt.Errorf("parse local address: %v", err)
}
Expand Down Expand Up @@ -232,7 +255,7 @@ PODS:
return nil
}

func portForward(pod *v1.Pod, cfg *rest.Config) (string, chan struct{}) {
func portForward(pod *v1.Pod, cfg *rest.Config, verbose bool) (string, chan struct{}) {
stopCh := make(chan struct{}, 1)

readyCh := make(chan struct{})
Expand All @@ -251,7 +274,9 @@ func portForward(pod *v1.Pod, cfg *rest.Config) (string, chan struct{}) {
addrCh <- addr
}
}
fmt.Println(ln)
if verbose {
fmt.Println(ln)
}
}
if err := scanner.Err(); err != nil {
log.Printf("reading from port forward logs: %v", err)
Expand Down Expand Up @@ -354,7 +379,7 @@ func executeCommand(args cli.Args, localAddr string) error {
return nil
}

func localServer(localAddr string) (int, error) {
func localServer(localAddr string, verbose bool) (int, error) {
targetURL, err := url.Parse(localAddr)
if err != nil {
return 0, err
Expand All @@ -363,7 +388,7 @@ func localServer(localAddr string) (int, error) {
proxy := httputil.NewSingleHostReverseProxy(targetURL)

mux := http.NewServeMux()
mux.HandleFunc("/", uploadContextHandlerFunc(proxy, localAddr))
mux.HandleFunc("/", uploadContextHandlerFunc(proxy, localAddr, verbose))

listener, err := net.Listen("tcp", ":0")
if err != nil {
Expand All @@ -375,7 +400,7 @@ func localServer(localAddr string) (int, error) {
return listener.Addr().(*net.TCPAddr).Port, nil
}

func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) http.HandlerFunc {
func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string, verbose bool) http.HandlerFunc {
re := regexp.MustCompile(`^/[^/]+/build$`)

return func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -384,11 +409,11 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht
return
}

chnker := chunker.New(r.Body, staticPol)
chunksList := bytes.Buffer{}
chunker := chunker.New(r.Body, staticPol)
chunksList := &bytes.Buffer{}

for {
c, err := chnker.Next(nil)
c, err := chunker.Next(nil)

if err == io.EOF {
break
Expand All @@ -400,28 +425,8 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht
return
}

hash, err := hashData(bytes.NewBuffer(c.Data))

chunksList.Write(hash)

found, err := chunkExists(r.Context(), localAddr+"/_chunks", hash)
if err != nil {
log.Printf("chunk #%d deduplication: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return
}

if found {
log.Printf("SKIP uploading %d bytes", c.Length)
continue
}

log.Printf("uploading %d bytes", c.Length)

err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
log.Printf("uploading chunk #%d: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
ok := verifyChunk(w, r, chunksList, c, localAddr, verbose)
if !ok {
return
}
}
Expand All @@ -433,6 +438,43 @@ func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string) ht
}
}

func verifyChunk(w http.ResponseWriter, r *http.Request, chunksList *bytes.Buffer, c chunker.Chunk, localAddr string, verbose bool) bool {
hash, err := hashData(bytes.NewBuffer(c.Data))

// lock on hash to avoid race condition and double uploads
unlock := uploadBottlenecks.Lock(hash)
defer unlock()

chunksList.Write(hash)

found, err := chunkExists(r.Context(), localAddr+"/_chunks", hash)
if err != nil {
log.Printf("chunk #%d deduplication: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return false
}

if found {
if verbose {
log.Printf("SKIP uploading %d bytes", c.Length)
}
return true
}

if verbose {
log.Printf("uploading %d bytes", c.Length)
}

err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
log.Printf("uploading chunk #%d: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return false
}

return true
}

func chunkExists(ctx context.Context, target string, hash []byte) (bool, error) {
client := &http.Client{
Transport: &http.Transport{
Expand Down
3 changes: 2 additions & 1 deletion tests/d8s-build.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!bash
set -uexo pipefail

go run ../cmd/d8s run docker build -t wedding-d8s-build-test ./d8s-build -f ./d8s-build/Dockerfile
go run ../cmd/d8s run \
docker build -t wedding-d8s-build-test ./d8s-build -f ./d8s-build/Dockerfile

0 comments on commit 3deb221

Please sign in to comment.