Skip to content

Commit

Permalink
removed code for context dedup / dinner deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
damoon committed Jan 10, 2022
1 parent 4d0ae8d commit 37815f7
Show file tree
Hide file tree
Showing 28 changed files with 61 additions and 1,453 deletions.
41 changes: 0 additions & 41 deletions Tiltfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,4 @@ disable_snapshots()
analytics_settings(enable=False)
allow_k8s_contexts(os.getenv("TILT_ALLOW_CONTEXT"))

include('./service-dependencies/Tiltfile')
include('./tests/Tiltfile')

k8s_yaml('deployment/kubernetes.yaml')

target='prod'
live_update=[]
if os.environ.get('PROD', '') == '':
target='build-env'
live_update=[
sync('pkg', '/app/pkg'),
sync('cmd', '/app/cmd'),
sync('go.mod', '/app/go.mod'),
sync('go.sum', '/app/go.sum'),
run('go install -v ./cmd/dinner'),
]

docker_build(
'davedamoon/dinner:latest',
'.',
dockerfile='deployment/Dockerfile',
target=target,
build_args={"SOURCE_BRANCH":"development", "SOURCE_COMMIT":"development"},
only=[ 'go.mod'
, 'go.sum'
, 'pkg'
, 'cmd'
, 'deployment'
],
ignore=[ '.git'
, '*/*_test.go'
, 'deployment/kubernetes.yaml'
],
live_update=live_update,
)

k8s_resource(
'dinner',
port_forwards=['12375:2375'],
resource_deps=['minio-buckets', 'dind'],
labels=["application"],
)
213 changes: 11 additions & 202 deletions cmd/d8s/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,11 @@ package main

import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
Expand Down Expand Up @@ -41,9 +36,8 @@ const (
)

var (
gitHash string
gitRef = "latest"
uploadBottlenecks = MutexMap{}
gitHash string
gitRef = "latest"
)

type MutexMap struct {
Expand Down Expand Up @@ -71,8 +65,8 @@ func main() {
Usage: "The client for dinner.",
Commands: []*cli.Command{
{
Name: "run",
Usage: "Connect to dinner server and set DOCKER_HOST for started process.",
Name: "up",
Usage: "Connect to docker in docker and set DOCKER_HOST for started process.",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "verbose",
Expand All @@ -97,7 +91,7 @@ func main() {
EnvVars: []string{"D8S_NAMESPACE"},
},
},
Action: run,
Action: up,
},
{
Name: "version",
Expand Down Expand Up @@ -129,7 +123,7 @@ func version(c *cli.Context) error {
return nil
}

func run(c *cli.Context) error {
func up(c *cli.Context) error {
args := c.Args()
if args.First() == "" {
return fmt.Errorf("command missing")
Expand All @@ -153,12 +147,7 @@ func run(c *cli.Context) error {
localAddr, stopCh := portForward(pod, config, verbose)
defer close(stopCh)

localPort, err := localServer("http://"+localAddr, verbose)
if err != nil {
return fmt.Errorf("parse local address: %v", err)
}

err = executeCommand(c.Args(), fmt.Sprintf("127.0.0.1:%d", localPort))
err = executeCommand(c.Args(), localAddr)
if err != nil {
return fmt.Errorf("command failed with %s", err)
}
Expand All @@ -168,6 +157,10 @@ func run(c *cli.Context) error {

// https://stackoverflow.com/questions/50435564/use-kubectl-context-in-kubernetes-client-go
func setupKubernetesClient(kubeconfig, context, namespace string) (*kubernetes.Clientset, *rest.Config, string, string, error) {
// TODO: verify kubernetes context is ok
// https://github.com/tilt-dev/tilt/blob/fe386b5cc967383972bf73f8cbe6514c604100f8/internal/k8s/env.go#L38
// https://github.com/turbine-kreuzberg/dind-nurse/blob/main/Tiltfile#L3

configLoader := clientcmd.NewDefaultClientConfigLoadingRules()

configLoader.ExplicitPath = kubeconfig
Expand Down Expand Up @@ -378,187 +371,3 @@ func executeCommand(args cli.Args, localAddr string) error {

return nil
}

func localServer(localAddr string, verbose bool) (int, error) {
targetURL, err := url.Parse(localAddr)
if err != nil {
return 0, err
}

proxy := httputil.NewSingleHostReverseProxy(targetURL)

mux := http.NewServeMux()
mux.HandleFunc("/", uploadContextHandlerFunc(proxy, localAddr, verbose))

listener, err := net.Listen("tcp", ":0")
if err != nil {
return 0, err
}

go http.Serve(listener, mux)

return listener.Addr().(*net.TCPAddr).Port, nil
}

func uploadContextHandlerFunc(proxy *httputil.ReverseProxy, localAddr string, verbose bool) http.HandlerFunc {
re := regexp.MustCompile(`^/[^/]+/build$`)

return func(w http.ResponseWriter, r *http.Request) {
if !re.MatchString(r.URL.Path) {
proxy.ServeHTTP(w, r)
return
}

proxy.ServeHTTP(w, r)
return

chunker := chunker.New(r.Body, staticPol)
chunksList := &bytes.Buffer{}

for {
c, err := chunker.Next(nil)

if err == io.EOF {
break
}

if err != nil {
log.Printf("searching next chunk: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

ok := verifyChunk(w, r, chunksList, c, localAddr, verbose)
if !ok {
return
}
}

r.Body = io.NopCloser(bytes.NewReader(chunksList.Bytes()))
r.Header.Add("d8s-chunked", "true")

proxy.ServeHTTP(w, r)
}
}

func verifyChunk(w http.ResponseWriter, r *http.Request, chunksList *bytes.Buffer, c chunker.Chunk, localAddr string, verbose bool) bool {
hash, err := hashData(bytes.NewBuffer(c.Data))

// lock on hash to avoid race condition and double uploads
unlock := uploadBottlenecks.Lock(hash)
defer unlock()

chunksList.Write(hash)

found, err := chunkExists(r.Context(), localAddr+"/_chunks", hash)
if err != nil {
log.Printf("chunk #%d deduplication: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return false
}

if found {
if verbose {
log.Printf("SKIP uploading %d bytes", c.Length)
}
return true
}

if verbose {
log.Printf("uploading %d bytes", c.Length)
}

err = uploadChunk(r.Context(), localAddr+"/_chunks", bytes.NewBuffer(c.Data))
if err != nil {
log.Printf("uploading chunk #%d: %v", c.Cut, err)
w.WriteHeader(http.StatusInternalServerError)
return false
}

return true
}

func chunkExists(ctx context.Context, target string, hash []byte) (bool, error) {
client := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
if err != nil {
return false, fmt.Errorf("create request for chunk deduplication: %v", err)
}

hashHex := make([]byte, hex.EncodedLen(len(hash)))
hex.Encode(hashHex, hash)

q := req.URL.Query()
q.Add("hash", string(hashHex))
req.URL.RawQuery = q.Encode()

resp, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("chunk deduplication request: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusOK {
return true, nil
}

if resp.StatusCode == http.StatusNotFound {
return false, nil
}

return false, fmt.Errorf("chunk deduplication request returned status code %d", resp.StatusCode)

}

func hashData(r io.Reader) ([]byte, error) {
h := sha256.New()

_, err := io.Copy(h, r)
if err != nil {
return []byte{}, err
}

return h.Sum(nil), nil
}

func uploadChunk(ctx context.Context, target string, data io.Reader) error {
client := &http.Client{
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
},
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, data)
if err != nil {
return fmt.Errorf("create request for chunk upload: %v", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("chunk upload: %v", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("chunk upload returned status code %d", resp.StatusCode)
}

return nil
}
Loading

0 comments on commit 37815f7

Please sign in to comment.