Xml/s3 bucket transfer support #96

Merged: 4 commits, Jan 24, 2018. Changes from 3 commits.
5 changes: 4 additions & 1 deletion README.md
@@ -41,7 +41,7 @@ Fortio can be an http or grpc load generator, gathering statistics using the `lo

```
$ fortio
-Φορτίο 0.6.4 usage:
+Φορτίο 0.6.5 usage:
fortio command [flags] target
where command is one of: load (load testing), server (starts grpc ping and
http echo/ui/redirect servers), grpcping (grpc client), report (report only UI
@@ -124,6 +124,8 @@ tests) or host:port (grpc health test) and flags are:
Absolute path to the dir containing the static files dir
-stdclient
Use the slower net/http standard client (works for TLS)
-sync string
index.tsv or s3/gcs bucket xml URL to fetch at startup for server modes.
-t duration
How long to run the test or 0 to run until ^C (default 5s)
-ui-path string
@@ -236,6 +238,7 @@ For instance `curl -d abcdef http://localhost:8080/` returns `abcdef` back. It s
- Proxy/fetch other URLs
- `/fortio/data/index.tsv` a tab-separated value file conforming to the Google Cloud Storage [URL list data transfer format](https://cloud.google.com/storage/transfer/create-url-list) so you can export/backup local results to the cloud.
- Download/sync peer to peer JSON results files from other Fortio servers (using their `index.tsv` URLs)
- Download/sync from Amazon S3 or Google Cloud compatible bucket listings ([XML URLs](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html))

The `report` mode is a readonly subset of the above directly on `/`.

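For reference, the `index.tsv` index named above is Google Cloud Storage's URL-list format: a `TsvHttpData-1.0` header line, then one tab-separated line per object with URL, size, and base64 MD5 columns. A made-up sample (the sync code only consumes the first, URL, column):

```
TsvHttpData-1.0
https://example.com/fortio/data/run1.json	1234	MGYyNzJiNWRmMmU4Y2Q2Zg==
https://example.com/fortio/data/run2.json	5678	N2E1NGM1ZDFhOGI0ZTJmMw==
```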
8 changes: 8 additions & 0 deletions fortio_main.go
@@ -112,6 +112,7 @@ var (
"Run for exactly this number of calls instead of duration. Default (0) is to use duration (-t). "+
"Default is 1 when used as grpc ping count.")
quietFlag = flag.Bool("quiet", false, "Quiet mode: sets the loglevel to Error and reduces the output.")
syncFlag = flag.String("sync", "", "index.tsv or s3/gcs bucket xml URL to fetch at startup for server modes.")
)

func main() {
@@ -139,6 +140,13 @@ func main() {
usage("Unable to extract percentiles from -p: ", err)
}

sync := strings.TrimSpace(*syncFlag)
if sync != "" {
if !ui.Sync(os.Stdout, sync) {
os.Exit(1)
}
}

switch command {
case "load":
fortioLoad()
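Illustrative usage of the new flag (the bucket URL here is the example one from the `ui/uihandler.go` comments, not a requirement): if the startup fetch fails, `ui.Sync` returns false and the process exits with status 1 before the chosen command runs.

```
$ fortio server -sync "https://storage.googleapis.com/fortio-data?prefix=fortio.istio.io/"
```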
2 changes: 1 addition & 1 deletion periodic/periodic.go
@@ -37,7 +37,7 @@ import (

const (
// Version is the overall package version (used to version json output too).
Version = "0.6.4"
Version = "0.6.5"
)

// DefaultRunnerOptions are the default values for options (do not mutate!).
3 changes: 2 additions & 1 deletion ui/templates/main.html
@@ -71,7 +71,8 @@ <h1>Φορτίο (fortio) v{{.Version}}{{if not .DoLoad}} control UI{{end}}</h1>
<p><i>Or</i></p>
<form action="sync">
<div>
-Sync/download tsv from: <input type="text" name="url" value="" size=50/>
+Sync/download tsv or xml s3 bucket data from:<br />
+<input type="text" name="url" value="" size=50/>
</div>
</form>
<p><i>Or</i></p>
216 changes: 179 additions & 37 deletions ui/uihandler.go
@@ -21,6 +21,7 @@ import (
"crypto/md5" // nolint: gas
"encoding/base64"
"encoding/json"
"encoding/xml"
"fmt"
"html"
"html/template"
@@ -564,6 +565,43 @@ func onBehalfOf(o *fhttp.HTTPOptions, r *http.Request) {
_ = o.AddAndValidateExtraHeader("X-On-Behalf-Of: " + r.RemoteAddr) // nolint: gas
}

// TODO: move tsv/xml sync handling to their own file (and possibly package)

// http.ResponseWriter + Flusher emulator
type outHTTPWriter struct {
CodePtr *int // Needed because that interface is somehow pass by value
Out io.Writer
header http.Header
}

func (o outHTTPWriter) Header() http.Header {
return o.header
}

func (o outHTTPWriter) Write(b []byte) (int, error) {
return o.Out.Write(b)
}

func (o outHTTPWriter) WriteHeader(code int) {
*o.CodePtr = code
o.Out.Write([]byte(fmt.Sprintf("\n*** result code: %d\n", code)))
}

func (o outHTTPWriter) Flush() {
// nothing
}

// Sync is the non http equivalent of fortio/sync?url=u.
func Sync(out io.Writer, u string) bool {
v := url.Values{}
v.Set("url", u)
req, _ := http.NewRequest("GET", "/sync-function?"+v.Encode(), nil)
code := http.StatusOK // default
w := outHTTPWriter{Out: out, CodePtr: &code}
SyncHandler(w, req)
return (code == http.StatusOK)
}
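`outHTTPWriter` is what lets the command line reuse `SyncHandler`, which is written against `http.ResponseWriter` and `http.Flusher`, without running an HTTP server. A self-contained sketch of the same adapter pattern (hypothetical names, not fortio code):

```
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

// fakeWriter satisfies http.ResponseWriter on top of any io.Writer.
// The methods use value receivers, so the status code lives behind a
// pointer to survive copies, the same reason outHTTPWriter has CodePtr.
type fakeWriter struct {
	code   *int
	out    io.Writer
	header http.Header
}

func (f fakeWriter) Header() http.Header         { return f.header }
func (f fakeWriter) Write(b []byte) (int, error) { return f.out.Write(b) }
func (f fakeWriter) WriteHeader(code int)        { *f.code = code }
func (f fakeWriter) Flush()                      {} // also satisfies http.Flusher

func hello(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusAccepted)
	fmt.Fprintf(w, "hello %s\n", r.URL.Query().Get("name"))
}

func main() {
	code := http.StatusOK
	w := fakeWriter{code: &code, out: os.Stdout, header: make(http.Header)}
	req, _ := http.NewRequest("GET", "/hello?name=cli", nil)
	hello(w, req) // drive the handler directly, no server or network
	fmt.Println("status:", code)
}
```

The sketch allocates the header map eagerly; the PR's `outHTTPWriter` leaves it nil, which stays safe only as long as handlers on this path never write to `Header()`.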

// SyncHandler handles syncing/downloading from tsv url.
func SyncHandler(w http.ResponseWriter, r *http.Request) {
LogRequest(r, "Sync")
@@ -572,15 +610,17 @@ func SyncHandler(w http.ResponseWriter, r *http.Request) {
log.Fatalf("expected http.ResponseWriter to be an http.Flusher")
}
uStr := strings.TrimSpace(r.FormValue("url"))
-err := syncTemplate.Execute(w, &struct {
-Version string
-LogoPath string
-URL string
-}{periodic.Version, logoPath, uStr})
-if err != nil {
-log.Critf("Sync template execution failed: %v", err)
+if syncTemplate != nil {
+err := syncTemplate.Execute(w, &struct {
+Version string
+LogoPath string
+URL string
+}{periodic.Version, logoPath, uStr})
+if err != nil {
+log.Critf("Sync template execution failed: %v", err)
}
+}
-w.Write([]byte("Fetch of index url ... ")) // nolint: gas, errcheck
+w.Write([]byte("Fetch of index/bucket url ... ")) // nolint: gas, errcheck
flusher.Flush()
o := fhttp.NewHTTPOptions(uStr)
onBehalfOf(o, r)
@@ -590,62 +630,164 @@ func SyncHandler(w http.ResponseWriter, r *http.Request) {
client := fhttp.NewStdClient(o)
if client == nil {
w.Write([]byte("invalid url!<script>setPB(1,1)</script></body></html>\n")) // nolint: gas, errcheck
+w.WriteHeader(422 /*Unprocessable Entity*/)
return
}
code, data, _ := client.Fetch()
if code != http.StatusOK {
w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script>", code))) // nolint: gas, errcheck
w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script></body></html>\n", code))) // nolint: gas, errcheck
w.WriteHeader(424 /*Failed Dependency*/)
return
}
+sdata := strings.TrimSpace(string(data))
+if strings.HasPrefix(sdata, "TsvHttpData-1.0") {
+processTSV(w, client, sdata)
+} else {
-sdata := strings.TrimSpace(string(data))
-lines := strings.Split(sdata, "\n")
-n := len(lines)
-w.Write([]byte(fmt.Sprintf("success! fetching %d referenced URLs:<script>setPB(1,%d)</script>\n", n, n))) // nolint: gas, errcheck
-w.Write([]byte("<table>")) // nolint: gas, errcheck
-flusher.Flush()
-for i, l := range lines[1:] {
-parts := strings.Split(l, "\t")
-u := parts[0]
-w.Write([]byte("<tr><td>")) // nolint: gas, errcheck
-w.Write([]byte(template.HTMLEscapeString(u))) // nolint: gas, errcheck
-ur, err := url.Parse(u)
-if err != nil {
-w.Write([]byte("<td>skipped (not a valid url)")) // nolint: gas, errcheck
-} else {
-uPath := ur.Path
-pathParts := strings.Split(uPath, "/")
-name := pathParts[len(pathParts)-1]
-downloadOne(w, client, name, u)
-}
-w.Write([]byte(fmt.Sprintf("</tr><script>setPB(%d)</script>\n", i+2))) // nolint: gas, errcheck
-flusher.Flush()
+if !processXML(w, client, data, uStr, 0) {
+return
+}
}
-w.Write([]byte("</table>")) // nolint: gas, errcheck
+}
+w.Write([]byte("</table>")) // nolint: gas, errcheck
w.Write([]byte("\n</body></html>\n")) // nolint: gas, errcheck
}
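Dispatch between the two index formats is done by sniffing the body rather than the URL or Content-Type: the TSV transfer format mandates a magic first line, so anything that does not start with it is treated as a bucket XML listing. The check in isolation (hypothetical helper; the PR inlines it):

```
package main

import (
	"fmt"
	"strings"
)

// isTSVIndex reports whether body looks like a Google URL-list index;
// anything else is assumed to be an S3/GCS ListBucketResult document.
func isTSVIndex(body []byte) bool {
	return strings.HasPrefix(strings.TrimSpace(string(body)), "TsvHttpData-1.0")
}

func main() {
	fmt.Println(isTSVIndex([]byte("TsvHttpData-1.0\nhttps://example.com/a.json\n"))) // true
	fmt.Println(isTSVIndex([]byte(`<?xml version="1.0"?><ListBucketResult/>`)))      // false
}
```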

-func downloadOne(w io.Writer, client *fhttp.Client, name string, u string) {
+func processTSV(w http.ResponseWriter, client *fhttp.Client, sdata string) {
flusher := w.(http.Flusher)
lines := strings.Split(sdata, "\n")
n := len(lines)
// nolint: gas, errcheck
w.Write([]byte(fmt.Sprintf("success tsv fetch! Now fetching %d referenced URLs:<script>setPB(1,%d)</script>\n",
n-1, n)))
w.Write([]byte("<table>")) // nolint: gas, errcheck
flusher.Flush()
for i, l := range lines[1:] {
parts := strings.Split(l, "\t")
u := parts[0]
w.Write([]byte("<tr><td>")) // nolint: gas, errcheck
w.Write([]byte(template.HTMLEscapeString(u))) // nolint: gas, errcheck
ur, err := url.Parse(u)
if err != nil {
w.Write([]byte("<td>skipped (not a valid url)")) // nolint: gas, errcheck
} else {
uPath := ur.Path
pathParts := strings.Split(uPath, "/")
name := pathParts[len(pathParts)-1]
downloadOne(w, client, name, u)
}
w.Write([]byte(fmt.Sprintf("</tr><script>setPB(%d)</script>\n", i+2))) // nolint: gas, errcheck
flusher.Flush()
}
}

// ListBucketResult is the minimum we need out of s3 xml results.
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
// e.g. https://storage.googleapis.com/fortio-data?max-keys=2&prefix=fortio.istio.io/
type ListBucketResult struct {
NextMarker string `xml:"NextMarker"`
Names []string `xml:"Contents>Key"`
}
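The `Contents>Key` tag uses `encoding/xml`'s nested-element path syntax: it gathers the `<Key>` child of every repeated `<Contents>` element into one slice. A runnable sketch against an abridged, made-up listing:

```
package main

import (
	"encoding/xml"
	"fmt"
)

// Same minimal shape as the PR's ListBucketResult.
type ListBucketResult struct {
	NextMarker string   `xml:"NextMarker"`
	Names      []string `xml:"Contents>Key"`
}

// Abridged response shaped like an S3/GCS bucket listing (keys made up).
const sample = `<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
  <Name>fortio-data</Name>
  <Contents><Key>fortio.istio.io/run1.json</Key></Contents>
  <Contents><Key>fortio.istio.io/run2.json</Key></Contents>
  <NextMarker>fortio.istio.io/run2.json</NextMarker>
</ListBucketResult>`

func main() {
	var l ListBucketResult
	if err := xml.Unmarshal([]byte(sample), &l); err != nil {
		panic(err)
	}
	fmt.Printf("keys=%v next=%q\n", l.Names, l.NextMarker)
	// keys=[fortio.istio.io/run1.json fortio.istio.io/run2.json]
	// next="fortio.istio.io/run2.json"
}
```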

// @returns true if started a table successfully - false is error
func processXML(w http.ResponseWriter, client *fhttp.Client, data []byte, baseURL string, level int) bool {
// We already know this parses as we just fetched it:
bu, _ := url.Parse(baseURL) // nolint: gas, errcheck
flusher := w.(http.Flusher)
l := ListBucketResult{}
err := xml.Unmarshal(data, &l)
if err != nil {
log.Errf("xml unmarshal error %v", err)
// don't show the error / would need html escape to avoid CSS attacks
w.Write([]byte("xml parsing error, check logs<script>setPB(1,1)</script></body></html>\n")) // nolint: gas, errcheck
w.WriteHeader(http.StatusInternalServerError)
return false
}
n := len(l.Names)
log.Infof("Parsed %+v", l)
// nolint: gas, errcheck
w.Write([]byte(fmt.Sprintf("success xml fetch #%d! Now fetching %d referenced objects:<script>setPB(1,%d)</script>\n",
level+1, n, n+1)))
if level == 0 {
w.Write([]byte("<table>")) // nolint: gas, errcheck
}
for i, el := range l.Names {
w.Write([]byte("<tr><td>")) // nolint: gas, errcheck
w.Write([]byte(template.HTMLEscapeString(el))) // nolint: gas, errcheck
pathParts := strings.Split(el, "/")
name := pathParts[len(pathParts)-1]
newURL := *bu // copy
newURL.Path = newURL.Path + "/" + el
fullURL := newURL.String()
downloadOne(w, client, name, fullURL)
w.Write([]byte(fmt.Sprintf("</tr><script>setPB(%d)</script>\n", i+2))) // nolint: gas, errcheck
flusher.Flush()
}
flusher.Flush()
// Is there more data ? (NextMarker present)
if len(l.NextMarker) == 0 {
return true
}
if level > 100 {
log.Errf("Too many chunks, stopping after 100")
w.WriteHeader(509 /* Bandwidth Limit Exceeded */)
return true
}
q := bu.Query()
if q.Get("marker") == l.NextMarker {
log.Errf("Loop with same marker %+v", bu)
w.WriteHeader(508 /* Loop Detected */)
return true
}
q.Set("marker", l.NextMarker)
bu.RawQuery = q.Encode()
newBaseURL := bu.String()
// url already validated
w.Write([]byte("<tr><td>")) // nolint: gas, errcheck
w.Write([]byte(template.HTMLEscapeString(newBaseURL))) // nolint: gas, errcheck
w.Write([]byte("<td>")) // nolint: gas, errcheck
_ = client.ChangeURL(newBaseURL) // nolint: gas
ncode, ndata, _ := client.Fetch()
if ncode != http.StatusOK {
log.Errf("Can't fetch continuation with marker %+v", bu)
// nolint: gas, errcheck
w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script></table></body></html>\n", ncode)))
w.WriteHeader(424 /*Failed Dependency*/)
return false
}
return processXML(w, client, ndata, newBaseURL, level+1) // recurse
}
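`processXML` pages through large buckets by recursing with an updated `marker=` query parameter until no `NextMarker` comes back, guarding against a repeated marker (loop) and against more than 100 chunks. The same control flow written iteratively, as a standalone sketch (the `fetch` callback stands in for the HTTP GET plus `xml.Unmarshal` step; none of these names are fortio's):

```
package main

import (
	"fmt"
	"net/url"
)

type listing struct {
	NextMarker string
	Names      []string
}

// fetchAll accumulates object keys across pages, mirroring the marker
// pagination that processXML performs via recursion.
func fetchAll(base *url.URL, fetch func(u string) (listing, error)) ([]string, error) {
	var keys []string
	for page := 0; page <= 100; page++ { // same cap as the PR's level check
		l, err := fetch(base.String())
		if err != nil {
			return keys, err
		}
		keys = append(keys, l.Names...)
		if l.NextMarker == "" {
			return keys, nil // no NextMarker: that was the last page
		}
		q := base.Query()
		if q.Get("marker") == l.NextMarker {
			return keys, fmt.Errorf("loop: marker %q repeats", l.NextMarker)
		}
		q.Set("marker", l.NextMarker) // resume listing after this key
		base.RawQuery = q.Encode()
	}
	return keys, fmt.Errorf("too many pages, giving up")
}

func main() {
	// Two fake pages keyed by the marker parameter.
	pages := map[string]listing{
		"":       {Names: []string{"a.json"}, NextMarker: "a.json"},
		"a.json": {Names: []string{"b.json"}},
	}
	base, _ := url.Parse("https://bucket.example.com/?prefix=x/")
	keys, err := fetchAll(base, func(u string) (listing, error) {
		parsed, _ := url.Parse(u)
		return pages[parsed.Query().Get("marker")], nil
	})
	fmt.Println(keys, err) // [a.json b.json] <nil>
}
```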

func downloadOne(w http.ResponseWriter, client *fhttp.Client, name string, u string) {
log.Infof("downloadOne(%s,%s)", name, u)
if !strings.HasSuffix(name, ".json") {
w.Write([]byte("<td>skipped (not json)")) // nolint: gas, errcheck
return
}
localPath := path.Join(dataDir, name)
-if _, err := os.Stat(localPath); err == nil || !os.IsNotExist(err) {
-log.Infof("check %s : %v", localPath, err)
-w.Write([]byte("<td>skipped (already exist or other error)")) // nolint: gas, errcheck
+_, err := os.Stat(localPath)
+if err == nil {
+w.Write([]byte("<td>skipped (already exists)")) // nolint: gas, errcheck
return
}
+// note that if data dir doesn't exist this will trigger too - TODO: check datadir earlier
+if !os.IsNotExist(err) {
+log.Warnf("check %s : %v", localPath, err)
+// don't return the details of the error to not leak local data dir etc
+w.Write([]byte("<td>skipped (access error)")) // nolint: gas, errcheck
+return
+}
// url already validated
_ = client.ChangeURL(u) // nolint: gas
code1, data1, _ := client.Fetch()
if code1 != http.StatusOK {
w.Write([]byte(fmt.Sprintf("<td>Http error, code %d", code1))) // nolint: gas, errcheck
+w.WriteHeader(424 /*Failed Dependency*/)
return
}
-err := ioutil.WriteFile(localPath, data1, 0644)
+err = ioutil.WriteFile(localPath, data1, 0644)
if err != nil {
log.Errf("Unable to save %s: %v", localPath, err)
w.Write([]byte("<td>skipped (write error)")) // nolint: gas, errcheck
w.WriteHeader(http.StatusInternalServerError)
return
}
// finally ! success !
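One design note on `downloadOne`: the `os.Stat` check followed by `ioutil.WriteFile` is check-then-act, so two concurrent syncs could both pass the check and write the same file (harmless here, but worth knowing). A sketch of the atomic variant of the same skip-if-present idea, using `O_EXCL` (an alternative, not what the PR does):

```
package main

import (
	"fmt"
	"os"
)

// writeIfAbsent creates path only if it does not already exist;
// O_EXCL makes the existence check and the create a single atomic step.
func writeIfAbsent(path string, data []byte) (created bool, err error) {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
	if os.IsExist(err) {
		return false, nil // already there: skip, as downloadOne does
	}
	if err != nil {
		return false, err
	}
	defer f.Close()
	_, err = f.Write(data)
	return true, err
}

func main() {
	created, err := writeIfAbsent("/tmp/run1.json", []byte("{}"))
	fmt.Println(created, err)
}
```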