Skip to content

Commit

Permalink
Xml/s3 bucket transfer support (#96)
Browse files Browse the repository at this point in the history
* Xml/s3 bucket transfer support

example/test:
https://storage.googleapis.com/fortio-data?max-keys=2&prefix=istio.fortio.io/

(This feature is pretty big and would be 0.7.0 but… trying to not get
too much ahead of the monthly istio cycle)

* Adding -sync command line flag to the fortio server command

Fixes #92
As we can now add -sync to yaml to load data on startup
  • Loading branch information
ldemailly committed Jan 24, 2018
1 parent 785fb3b commit 44d1e6e
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 40 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Fortio can be an http or grpc load generator, gathering statistics using the `lo

```
$ fortio
Φορτίο 0.6.4 usage:
Φορτίο 0.6.5 usage:
fortio command [flags] target
where command is one of: load (load testing), server (starts grpc ping and
http echo/ui/redirect servers), grpcping (grpc client), report (report only UI
Expand Down Expand Up @@ -124,6 +124,8 @@ tests) or host:port (grpc health test) and flags are:
Absolute path to the dir containing the static files dir
-stdclient
Use the slower net/http standard client (works for TLS)
-sync string
index.tsv or s3/gcs bucket xml URL to fetch at startup for server modes.
-t duration
How long to run the test or 0 to run until ^C (default 5s)
-ui-path string
Expand Down Expand Up @@ -236,6 +238,7 @@ For instance `curl -d abcdef http://localhost:8080/` returns `abcdef` back. It s
- Proxy/fetch other URLs
- `/fortio/data/index.tsv` a tab-separated value file conforming to Google cloud storage [URL list data transfer format](https://cloud.google.com/storage/transfer/create-url-list) so you can export/backup local results to the cloud.
- Download/sync peer to peer JSON results files from other Fortio servers (using their `index.tsv` URLs)
- Download/sync from an Amazon S3 or Google Cloud compatible bucket listings [XML URLs](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html)

The `report` mode is a readonly subset of the above directly on `/`.

Expand Down
8 changes: 8 additions & 0 deletions fortio_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ var (
"Run for exactly this number of calls instead of duration. Default (0) is to use duration (-t). "+
"Default is 1 when used as grpc ping count.")
quietFlag = flag.Bool("quiet", false, "Quiet mode: sets the loglevel to Error and reduces the output.")
syncFlag = flag.String("sync", "", "index.tsv or s3/gcs bucket xml URL to fetch at startup for server modes.")
)

func main() {
Expand Down Expand Up @@ -139,6 +140,13 @@ func main() {
usage("Unable to extract percentiles from -p: ", err)
}

sync := strings.TrimSpace(*syncFlag)
if sync != "" {
if !ui.Sync(os.Stdout, sync) {
os.Exit(1)
}
}

switch command {
case "load":
fortioLoad()
Expand Down
2 changes: 1 addition & 1 deletion periodic/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

const (
// Version is the overall package version (used to version json output too).
Version = "0.6.4"
Version = "0.6.5"
)

// DefaultRunnerOptions are the default values for options (do not mutate!).
Expand Down
3 changes: 2 additions & 1 deletion ui/templates/main.html
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ <h1>Φορτίο (fortio) v{{.Version}}{{if not .DoLoad}} control UI{{end}}</h1>
<p><i>Or</i></p>
<form action="sync">
<div>
Sync/download tsv from: <input type="text" name="url" value="" size=50/>
Sync/download tsv or xml s3 bucket data from:<br />
<input type="text" name="url" value="" size=50/>
</div>
</form>
<p><i>Or</i></p>
Expand Down
217 changes: 180 additions & 37 deletions ui/uihandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/md5" // nolint: gas
"encoding/base64"
"encoding/json"
"encoding/xml"
"fmt"
"html"
"html/template"
Expand Down Expand Up @@ -564,6 +565,44 @@ func onBehalfOf(o *fhttp.HTTPOptions, r *http.Request) {
_ = o.AddAndValidateExtraHeader("X-On-Behalf-Of: " + r.RemoteAddr) // nolint: gas
}

// TODO: move tsv/xml sync handling to their own file (and possibly package)

// outHTTPWriter emulates an http.ResponseWriter + http.Flusher on top of a
// plain io.Writer, so the /sync handler can also be driven from the command
// line (see Sync below). If we refactor the code this should not be needed;
// on the other hand it's useful and could be reused.
type outHTTPWriter struct {
	CodePtr *int        // Status code destination; a pointer because the methods use value receivers.
	Out     io.Writer   // Destination for the response body bytes.
	header  http.Header // Response headers (collected but never actually transmitted).
}

// Header returns the header map, satisfying http.ResponseWriter.
func (o outHTTPWriter) Header() http.Header {
	return o.header
}

// Write forwards the response body to the underlying io.Writer,
// satisfying http.ResponseWriter.
func (o outHTTPWriter) Write(b []byte) (int, error) {
	return o.Out.Write(b)
}

// WriteHeader records the status code through CodePtr and also echoes it to
// the output so command line users can see non-200 results.
func (o outHTTPWriter) WriteHeader(code int) {
	*o.CodePtr = code
	// Write error deliberately ignored: this is best-effort console output.
	fmt.Fprintf(o.Out, "\n*** result code: %d\n", code) // nolint: gas, errcheck
}

// Flush satisfies http.Flusher. Writes go straight to Out, so there is
// nothing to flush.
func (o outHTTPWriter) Flush() {
	// nothing
}

// Sync is the non http equivalent of fortio/sync?url=u: it fetches the given
// index.tsv or bucket XML URL and downloads the referenced result files,
// writing progress to out. Returns true on success (HTTP 200 equivalent).
func Sync(out io.Writer, u string) bool {
	v := url.Values{}
	v.Set("url", u)
	req, err := http.NewRequest("GET", "/sync-function?"+v.Encode(), nil)
	if err != nil {
		// Shouldn't happen with our synthetic URL, but don't hand a nil
		// request to the handler (it would panic).
		return false
	}
	code := http.StatusOK // default unless WriteHeader is called
	w := outHTTPWriter{Out: out, CodePtr: &code}
	SyncHandler(w, req)
	return code == http.StatusOK
}

// SyncHandler handles syncing/downloading from tsv url.
func SyncHandler(w http.ResponseWriter, r *http.Request) {
LogRequest(r, "Sync")
Expand All @@ -572,15 +611,17 @@ func SyncHandler(w http.ResponseWriter, r *http.Request) {
log.Fatalf("expected http.ResponseWriter to be an http.Flusher")
}
uStr := strings.TrimSpace(r.FormValue("url"))
err := syncTemplate.Execute(w, &struct {
Version string
LogoPath string
URL string
}{periodic.Version, logoPath, uStr})
if err != nil {
log.Critf("Sync template execution failed: %v", err)
if syncTemplate != nil {
err := syncTemplate.Execute(w, &struct {
Version string
LogoPath string
URL string
}{periodic.Version, logoPath, uStr})
if err != nil {
log.Critf("Sync template execution failed: %v", err)
}
}
w.Write([]byte("Fetch of index url ... ")) // nolint: gas, errcheck
w.Write([]byte("Fetch of index/bucket url ... ")) // nolint: gas, errcheck
flusher.Flush()
o := fhttp.NewHTTPOptions(uStr)
onBehalfOf(o, r)
Expand All @@ -590,62 +631,164 @@ func SyncHandler(w http.ResponseWriter, r *http.Request) {
client := fhttp.NewStdClient(o)
if client == nil {
w.Write([]byte("invalid url!<script>setPB(1,1)</script></body></html>\n")) // nolint: gas, errcheck
w.WriteHeader(422 /*Unprocessable Entity*/)
return
}
code, data, _ := client.Fetch()
if code != http.StatusOK {
w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script>", code))) // nolint: gas, errcheck
w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script></body></html>\n", code))) // nolint: gas, errcheck
w.WriteHeader(424 /*Failed Dependency*/)
return
}
sdata := strings.TrimSpace(string(data))
if strings.HasPrefix(sdata, "TsvHttpData-1.0") {
processTSV(w, client, sdata)
} else {
sdata := strings.TrimSpace(string(data))
lines := strings.Split(sdata, "\n")
n := len(lines)
w.Write([]byte(fmt.Sprintf("success! fetching %d referenced URLs:<script>setPB(1,%d)</script>\n", n, n))) // nolint: gas, errcheck
w.Write([]byte("<table>")) // nolint: gas, errcheck
flusher.Flush()
for i, l := range lines[1:] {
parts := strings.Split(l, "\t")
u := parts[0]
w.Write([]byte("<tr><td>")) // nolint: gas, errcheck
w.Write([]byte(template.HTMLEscapeString(u))) // nolint: gas, errcheck
ur, err := url.Parse(u)
if err != nil {
w.Write([]byte("<td>skipped (not a valid url)")) // nolint: gas, errcheck
} else {
uPath := ur.Path
pathParts := strings.Split(uPath, "/")
name := pathParts[len(pathParts)-1]
downloadOne(w, client, name, u)
}
w.Write([]byte(fmt.Sprintf("</tr><script>setPB(%d)</script>\n", i+2))) // nolint: gas, errcheck
flusher.Flush()
if !processXML(w, client, data, uStr, 0) {
return
}
w.Write([]byte("</table>")) // nolint: gas, errcheck
}
w.Write([]byte("</table>")) // nolint: gas, errcheck
w.Write([]byte("\n</body></html>\n")) // nolint: gas, errcheck
}

func downloadOne(w io.Writer, client *fhttp.Client, name string, u string) {
// processTSV walks a TsvHttpData-1.0 index (already fetched into sdata) and
// downloads every referenced URL, emitting one HTML table row of progress
// per entry to w (which must also implement http.Flusher).
func processTSV(w http.ResponseWriter, client *fhttp.Client, sdata string) {
	flusher := w.(http.Flusher)
	entries := strings.Split(sdata, "\n")
	count := len(entries)
	// nolint: gas, errcheck
	fmt.Fprintf(w, "success tsv fetch! Now fetching %d referenced URLs:<script>setPB(1,%d)</script>\n",
		count-1, count)
	io.WriteString(w, "<table>") // nolint: gas, errcheck
	flusher.Flush()
	// entries[0] is the "TsvHttpData-1.0" header line, skip it.
	for idx, entry := range entries[1:] {
		target := strings.Split(entry, "\t")[0]
		io.WriteString(w, "<tr><td>")                        // nolint: gas, errcheck
		io.WriteString(w, template.HTMLEscapeString(target)) // nolint: gas, errcheck
		parsed, err := url.Parse(target)
		if err != nil {
			io.WriteString(w, "<td>skipped (not a valid url)") // nolint: gas, errcheck
		} else {
			segments := strings.Split(parsed.Path, "/")
			downloadOne(w, client, segments[len(segments)-1], target)
		}
		fmt.Fprintf(w, "</tr><script>setPB(%d)</script>\n", idx+2) // nolint: gas, errcheck
		flusher.Flush()
	}
}

// ListBucketResult is the minimum we need out of s3 xml results.
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
// e.g. https://storage.googleapis.com/fortio-data?max-keys=2&prefix=fortio.istio.io/
type ListBucketResult struct {
	// NextMarker, when non-empty, indicates a truncated listing and is the
	// pagination token for fetching the next chunk.
	NextMarker string `xml:"NextMarker"`
	// Names are the object keys extracted from <Contents><Key> elements.
	Names []string `xml:"Contents>Key"`
}

// processXML parses an S3/GCS bucket listing (data, fetched from baseURL),
// downloads every listed object via downloadOne, and follows NextMarker
// pagination by recursing with level+1. Returns true if the table was
// started/continued successfully, false on error (caller should stop and not
// emit the closing table/body tags again).
func processXML(w http.ResponseWriter, client *fhttp.Client, data []byte, baseURL string, level int) bool {
	// We already know this parses as we just fetched it:
	bu, _ := url.Parse(baseURL) // nolint: gas, errcheck
	flusher := w.(http.Flusher)
	l := ListBucketResult{}
	err := xml.Unmarshal(data, &l)
	if err != nil {
		log.Errf("xml unmarshal error %v", err)
		// don't show the error / would need html escape to avoid CSS attacks
		w.Write([]byte("xml parsing error, check logs<script>setPB(1,1)</script></body></html>\n")) // nolint: gas, errcheck
		w.WriteHeader(http.StatusInternalServerError)
		return false
	}
	n := len(l.Names)
	log.Infof("Parsed %+v", l)
	// nolint: gas, errcheck
	w.Write([]byte(fmt.Sprintf("success xml fetch #%d! Now fetching %d referenced objects:<script>setPB(1,%d)</script>\n",
		level+1, n, n+1)))
	// Only the first (level 0) chunk opens the table; continuations append rows.
	if level == 0 {
		w.Write([]byte("<table>")) // nolint: gas, errcheck
	}
	for i, el := range l.Names {
		w.Write([]byte("<tr><td>"))                    // nolint: gas, errcheck
		w.Write([]byte(template.HTMLEscapeString(el))) // nolint: gas, errcheck
		// Object key's last path segment becomes the local file name.
		pathParts := strings.Split(el, "/")
		name := pathParts[len(pathParts)-1]
		newURL := *bu // copy
		newURL.Path = newURL.Path + "/" + el
		fullURL := newURL.String()
		downloadOne(w, client, name, fullURL)
		w.Write([]byte(fmt.Sprintf("</tr><script>setPB(%d)</script>\n", i+2))) // nolint: gas, errcheck
		flusher.Flush()
	}
	flusher.Flush()
	// Is there more data ? (NextMarker present)
	if len(l.NextMarker) == 0 {
		return true
	}
	// Guard against runaway pagination (recursion depth bound).
	if level > 100 {
		log.Errf("Too many chunks, stopping after 100")
		w.WriteHeader(509 /* Bandwidth Limit Exceeded */)
		return true
	}
	q := bu.Query()
	// A server returning the marker we already used would loop forever.
	if q.Get("marker") == l.NextMarker {
		log.Errf("Loop with same marker %+v", bu)
		w.WriteHeader(508 /* Loop Detected */)
		return true
	}
	q.Set("marker", l.NextMarker)
	bu.RawQuery = q.Encode()
	newBaseURL := bu.String()
	// url already validated
	w.Write([]byte("<tr><td>"))                            // nolint: gas, errcheck
	w.Write([]byte(template.HTMLEscapeString(newBaseURL))) // nolint: gas, errcheck
	w.Write([]byte("<td>"))                                // nolint: gas, errcheck
	_ = client.ChangeURL(newBaseURL)                       // nolint: gas
	ncode, ndata, _ := client.Fetch()
	if ncode != http.StatusOK {
		log.Errf("Can't fetch continuation with marker %+v", bu)
		// nolint: gas, errcheck
		w.Write([]byte(fmt.Sprintf("http error, code %d<script>setPB(1,1)</script></table></body></html>\n", ncode)))
		w.WriteHeader(424 /*Failed Dependency*/)
		return false
	}
	return processXML(w, client, ndata, newBaseURL, level+1) // recurse
}

func downloadOne(w http.ResponseWriter, client *fhttp.Client, name string, u string) {
log.Infof("downloadOne(%s,%s)", name, u)
if !strings.HasSuffix(name, ".json") {
w.Write([]byte("<td>skipped (not json)")) // nolint: gas, errcheck
return
}
localPath := path.Join(dataDir, name)
if _, err := os.Stat(localPath); err == nil || !os.IsNotExist(err) {
log.Infof("check %s : %v", localPath, err)
w.Write([]byte("<td>skipped (already exist or other error)")) // nolint: gas, errcheck
_, err := os.Stat(localPath)
if err == nil {
w.Write([]byte("<td>skipped (already exists)")) // nolint: gas, errcheck
return
}
// note that if data dir doesn't exist this will trigger too - TODO: check datadir earlier
if !os.IsNotExist(err) {
log.Warnf("check %s : %v", localPath, err)
// don't return the details of the error to not leak local data dir etc
w.Write([]byte("<td>skipped (access error)")) // nolint: gas, errcheck
return
}
// url already validated
_ = client.ChangeURL(u) // nolint: gas
code1, data1, _ := client.Fetch()
if code1 != http.StatusOK {
w.Write([]byte(fmt.Sprintf("<td>Http error, code %d", code1))) // nolint: gas, errcheck
w.WriteHeader(424 /*Failed Dependency*/)
return
}
err := ioutil.WriteFile(localPath, data1, 0644)
err = ioutil.WriteFile(localPath, data1, 0644)
if err != nil {
log.Errf("Unable to save %s: %v", localPath, err)
w.Write([]byte("<td>skipped (write error)")) // nolint: gas, errcheck
w.WriteHeader(http.StatusInternalServerError)
return
}
// finally ! success !
Expand Down

0 comments on commit 44d1e6e

Please sign in to comment.