Skip to content

Commit

Permalink
adds internal cached fetch (download) a file from remote url
Browse files Browse the repository at this point in the history
  • Loading branch information
danmux committed May 9, 2018
1 parent 64556fb commit 1274146
Show file tree
Hide file tree
Showing 23 changed files with 1,574 additions and 22 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/cavaliercoder/grab"
version = "2.0.0"
2 changes: 1 addition & 1 deletion cmd/floe/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func start(sc srvConf, conf []byte, addr chan string) error {
// TODO - implement other stores e.g. s3

q := &event.Queue{}
hub := hub.New(sc.HostName, sc.Tags, filepath.Join(sc.Root, "spaces"), sc.AdminToken, c, s, q)
hub := hub.New(sc.HostName, sc.Tags, sc.Root, sc.AdminToken, c, s, q)
server.AdminToken = sc.AdminToken

server.LaunchWeb(sc.Conf, c.Common.BaseURL, hub, q, addr, sc.WebDev)
Expand Down
41 changes: 29 additions & 12 deletions config/nodetype/exec_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nodetype

import (
"io/ioutil"
"strings"
"testing"
)
Expand Down Expand Up @@ -50,6 +51,14 @@ func TestCmdAndArgs(t *testing.T) {
}

func TestEnvVars(t *testing.T) {
opts := Opts{
"shell": "export",
"env": []string{"DAN=fart"},
}
testNode(t, "exe env vars", exec{}, opts, []string{`DAN="fart"`, `FLOEWS="."`})
}

func testNode(t *testing.T, msg string, nt NodeType, opts Opts, expected []string) bool {
op := make(chan string)
var out []string
captured := make(chan bool)
Expand All @@ -60,19 +69,21 @@ func TestEnvVars(t *testing.T) {
captured <- true
}()

e := exec{}
e.Execute(&Workspace{
BasePath: ".",
}, Opts{
"shell": "export",
"env": []string{"DAN=fart"},
}, op)
tmp, err := ioutil.TempDir("", "floe-test")
if err != nil {
t.Fatal("can't create tmp dir")
}

nt.Execute(&Workspace{
BasePath: ".",
FetchCache: tmp,
}, opts, op)

close(op)

<-captured

expected := []string{`DAN="fart"`, `FLOEWS="."`}
prob := false
for _, x := range expected {
found := false
for _, l := range out {
Expand All @@ -82,12 +93,18 @@ func TestEnvVars(t *testing.T) {
}
}
if !found {
t.Error("did not find env var:", x)
for _, o := range out {
println(o)
}
prob = true
t.Error(msg, "did not find expected:", x)
}
}
// output the output if there was a problem
if prob {
t.Log("cache is at:", tmp)
for _, o := range out {
t.Log(o)
}
}
return prob
}

func TestExpandEnvOpts(t *testing.T) {
Expand Down
121 changes: 121 additions & 0 deletions config/nodetype/fetch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package nodetype

import (
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"encoding/hex"
"fmt"
"hash"
"strings"
"time"

"github.com/cavaliercoder/grab"
)

type fetchOpts struct {
URL string `json:"url"` // url of the file to download
Checksum string `json:"checksum"` // the checksum (and typically filename)
ChecksumAlgo string `json:"checksum-algo"` // the checksum algorithm
Location string `json:"location"` // where to download
}

// fetch downloads stuff if it is not in the cache
type fetch struct{}

func (g fetch) Match(ol, or Opts) bool {
return true
}

func (g fetch) Execute(ws *Workspace, in Opts, output chan string) (int, Opts, error) {

fop := fetchOpts{}
err := decode(in, &fop)
if err != nil {
return 255, nil, err
}

if fop.URL == "" {
return 255, nil, fmt.Errorf("problem getting fetch url option")
}
if fop.Checksum == "" {
output <- "(N.B. fetch without a checksum can not be trusted)"
}
if fop.Location == "" {
fop.Location = "${ws}"
}
fop.Location = strings.Replace(fop.Location, "${ws}", ws.BasePath, -1)

client := grab.NewClient()
req, err := grab.NewRequest(ws.FetchCache, fop.URL)
if err != nil {
output <- fmt.Sprintf("Error setting up the download %v", err)
return 255, nil, err
}

// set up any checksum
if len(fop.Checksum) > 0 {
// is it in the sum filename format ...
// ba411cafee2f0f702572369da0b765e2 bodhi-4.1.0-64.iso
parts := strings.Split(fop.Checksum, " ")
if len(parts) > 1 {
fop.Checksum = parts[0]
}
checksum, err := hex.DecodeString(fop.Checksum)
if err != nil {
output <- fmt.Sprintf("Error decoding hex checksum: %s", fop.Checksum)
return 255, nil, err
}

var h hash.Hash
switch fop.ChecksumAlgo {
case "sha256":
h = sha256.New()
case "sha1":
h = sha1.New()
case "md5":
h = md5.New()
}
req.SetChecksum(h, checksum, true)
}

started := time.Now()
// start download
output <- fmt.Sprintf("Downloading %v...", req.URL())
resp := client.Do(req)
output <- fmt.Sprintf(" %v", resp.HTTPResponse.Status)

// start UI loop
t := time.NewTicker(300 * time.Millisecond)
defer t.Stop()

Loop:
for {
select {
case <-t.C:
output <- fmt.Sprintf(" %v / %v bytes (%.2f%%)", resp.BytesComplete(), resp.Size, 100*resp.Progress())
case <-resp.Done:
break Loop
}
}

// check for errors
if err := resp.Err(); err != nil {
output <- fmt.Sprintf("Download failed: %v", err)
} else {
output <- fmt.Sprintf(" %v / %v bytes (%.2f%%) in %v", resp.BytesComplete(), resp.Size, 100*resp.Progress(), time.Since(started))
output <- fmt.Sprintf("Download saved to %v", resp.Filename)
}

// TODO - copy to Location

// Output:
// Downloading http://www.golang-book.com/public/pdf/gobook.pdf...
// 200 OK
// transferred 42970 / 2893557 bytes (1.49%)
// transferred 1207474 / 2893557 bytes (41.73%)
// transferred 2758210 / 2893557 bytes (95.32%)
// Download saved to ./gobook.pdf

return 0, nil, nil
}
93 changes: 93 additions & 0 deletions config/nodetype/fetch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package nodetype

import (
"fmt"
"net"
"net/http"
"testing"
)

func TestFetch(t *testing.T) {
portCh := make(chan int)

go serveFiles(portCh)

port := <-portCh

success := []string{`(100.00%)`, `Downloading`, "200 OK"}
fail := []string{"404", "Not Found"}
fixtures := []struct {
url string
algo string
checksum string
expected []string
}{
{ // simple dl
url: fmt.Sprintf("http://127.0.0.1:%d/get-file.txt", port),
expected: success,
},
{ // dl with sha256 check
url: fmt.Sprintf("http://127.0.0.1:%d/get-file.txt", port),
algo: "sha256",
checksum: "864d6473d56d235de9ffb9d404e76f23e4d596ce77eae5b7ce5106f454fa7ee4 get-file.txt",
expected: success,
},
{ // dl with sha1 check
url: fmt.Sprintf("http://127.0.0.1:%d/get-file.txt", port),
algo: "sha1",
checksum: "bb3357153aa8e2c0b22fef75a7f21969abb7c2b4",
expected: success,
},
{ // dl with sha256 check
url: fmt.Sprintf("http://127.0.0.1:%d/get-file.txt", port),
algo: "md5",
checksum: "f35ff35df6efc82e474e97eaf10e7ff6",
expected: success,
},
{ // good dl bad checksum
url: fmt.Sprintf("http://127.0.0.1:%d/get-file.txt", port),
algo: "sha256",
checksum: "load-of bollox",
expected: []string{"Error", "hex", "checksum"},
},
{ // bad dl
url: fmt.Sprintf("http://127.0.0.1:%d/wont_be_found", port),
expected: fail,
},
{ // good external check
url: "https://dl.google.com/go/go1.10.2.src.tar.gz",
algo: "sha256",
checksum: "6264609c6b9cd8ed8e02ca84605d727ce1898d74efa79841660b2e3e985a98bd go1.10.2.src.tar.gz",
expected: success,
},
}

for i, fx := range fixtures {
opts := Opts{
"url": fx.url,
"checksum": fx.checksum,
"checksum-algo": fx.algo,
}
testNode(t, fmt.Sprintf("fetch test: %d", i), fetch{}, opts, fx.expected)
}
}

func sayHello(w http.ResponseWriter, r *http.Request) {
message := "this is a file with known hashes"
w.Write([]byte(message))
}

func serveFiles(portChan chan int) {

listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}

mux := http.NewServeMux()
mux.HandleFunc("/get-file.txt", sayHello)

portChan <- listener.Addr().(*net.TCPAddr).Port

http.Serve(listener, mux)
}
2 changes: 2 additions & 0 deletions config/nodetype/node_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
NtData NType = "data"
NtTimer NType = "timer"
NtExec NType = "exec"
NtFetch NType = "fetch"
NtGitMerge NType = "git-merge"
NtGitCheckout NType = "git-checkout"
)
Expand All @@ -26,6 +27,7 @@ var nts = map[NType]NodeType{
NtData: data{},
NtTimer: timer{},
NtExec: exec{},
NtFetch: fetch{},
NtGitMerge: gitMerge{},
NtGitCheckout: gitCheckout{},
}
Expand Down
5 changes: 3 additions & 2 deletions config/nodetype/opts.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package nodetype

// Workspace is anything specific to a workspace for a single run
// Workspace is anything specific to a workspace for a single run or any locations common between runs
type Workspace struct {
BasePath string
BasePath string //
FetchCache string
}

// Opts are the options on the node type that will be compared to those on the event
Expand Down
2 changes: 1 addition & 1 deletion hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestExecuteNode(t *testing.T) {
didExec := false
exec := func(ws *nt.Workspace, updates chan string) {
didExec = true
if ws.BasePath != "/foo/bar/testflow/ws/h1-5" {
if ws.BasePath != "/foo/bar/spaces/testflow/ws/h1-5" {
t.Errorf("base path is wrong <%s>", ws.BasePath)
}
}
Expand Down
Loading

0 comments on commit 1274146

Please sign in to comment.