From 30d7925ea18b8773b5c6c851fe54455020a05288 Mon Sep 17 00:00:00 2001 From: Dave Brophy Date: Tue, 31 Oct 2017 21:03:05 -0700 Subject: [PATCH] Fixed bug in httpworker, added gcpworker --- dummyworker/dummyworker.go | 9 ++-- gcsworker/gcsworker.go | 84 ++++++++++++++++++++++++++++++++++++++ httpworker/httpworker.go | 50 +++++++++++++++++------ main.go | 2 + 4 files changed, 129 insertions(+), 16 deletions(-) create mode 100644 gcsworker/gcsworker.go diff --git a/dummyworker/dummyworker.go b/dummyworker/dummyworker.go index dae5d22..ea1b246 100644 --- a/dummyworker/dummyworker.go +++ b/dummyworker/dummyworker.go @@ -9,9 +9,10 @@ import ( "fmt" + "errors" + "github.com/dave/blast/blaster" "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" ) func New() blaster.Worker { @@ -28,7 +29,7 @@ func (w *Worker) Start(ctx context.Context, raw map[string]interface{}) error { var config workerConfig if err := mapstructure.Decode(raw, &config); err != nil { - return errors.WithStack(err) + return err } w.base = config.Base @@ -46,7 +47,7 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri var payload payloadConfig if err := mapstructure.Decode(raw, &payload); err != nil { - return nil, errors.WithStack(err) + return map[string]interface{}{"status": "Error decoding payload"}, err } if w.print { @@ -74,7 +75,7 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri case context.Canceled: status = "Cancelled" default: - status = fmt.Sprintf("(%s)", err.Error()) + status = err.Error() } return map[string]interface{}{"status": status}, err } diff --git a/gcsworker/gcsworker.go b/gcsworker/gcsworker.go new file mode 100644 index 0000000..dc22929 --- /dev/null +++ b/gcsworker/gcsworker.go @@ -0,0 +1,84 @@ +package gcsworker + +import ( + "context" + + "net/http" + + "bytes" + + "errors" + + "net/url" + + "github.com/dave/blast/blaster" + "github.com/mitchellh/mapstructure" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" +) + +func New() blaster.Worker { + return &Worker{} +} + +type Worker struct { + client *http.Client +} + +func (w *Worker) Start(ctx context.Context, payload map[string]interface{}) error { + src, err := google.DefaultTokenSource(ctx) + if err != nil { + return err + } + w.client = oauth2.NewClient(ctx, src) + return nil +} + +func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[string]interface{}, error) { + + var payload def + if err := mapstructure.Decode(raw, &payload); err != nil { + return map[string]interface{}{"status": "Error decoding payload"}, err + } + + request, err := http.NewRequest(payload.Method, payload.Url, bytes.NewBufferString(payload.Body)) + if err != nil { + return map[string]interface{}{"status": "Error creating request"}, err + } + + request = request.WithContext(ctx) + + for k, v := range payload.Headers { + request.Header.Add(k, v) + } + + response, err := w.client.Do(request) + if err != nil { + var status interface{} + ue, ok := err.(*url.Error) + switch { + case response != nil: + status = response.StatusCode + case ok && ue.Err == context.DeadlineExceeded: + status = "Timeout" + case ok && ue.Err == context.Canceled: + status = "Cancelled" + case ok: + status = ue.Err.Error() + default: + status = err.Error() + } + return map[string]interface{}{"status": status}, err + } + if response.StatusCode != 200 { + return map[string]interface{}{"status": response.StatusCode}, errors.New("non 200 status") + } + return map[string]interface{}{"status": 200}, nil +} + +type def struct { + Method string `mapstructure:"method"` + Url string `mapstructure:"url"` + Body string `mapstructure:"body"` + Headers map[string]string `mapstructure:"headers"` +} diff --git a/httpworker/httpworker.go b/httpworker/httpworker.go index d4d5e82..c94085b 100644 --- a/httpworker/httpworker.go +++ b/httpworker/httpworker.go @@ -7,9 +7,12 @@ import ( "bytes" + "net/url" + + "errors" + "github.com/dave/blast/blaster" "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" ) func New() blaster.Worker { @@ -18,28 +21,51 @@ func New() blaster.Worker { type Worker struct{} -func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (response map[string]interface{}, err error) { +func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[string]interface{}, error) { + var payload def - if err := mapstructure.Decode(&payload, raw); err != nil { - return nil, errors.WithStack(err) + if err := mapstructure.Decode(raw, &payload); err != nil { + return map[string]interface{}{"status": "Error decoding payload"}, err } + request, err := http.NewRequest(payload.Method, payload.Url, bytes.NewBufferString(payload.Body)) if err != nil { - return nil, errors.WithStack(err) + return map[string]interface{}{"status": "Error creating request"}, err } + request = request.WithContext(ctx) - r, err := http.DefaultClient.Do(request) + + for k, v := range payload.Headers { + request.Header.Add(k, v) + } + + response, err := http.DefaultClient.Do(request) if err != nil { - return map[string]interface{}{"status": r.StatusCode}, errors.WithStack(err) + var status interface{} + ue, ok := err.(*url.Error) + switch { + case response != nil: + status = response.StatusCode + case ok && ue.Err == context.DeadlineExceeded: + status = "Timeout" + case ok && ue.Err == context.Canceled: + status = "Cancelled" + case ok: + status = ue.Err.Error() + default: + status = err.Error() + } + return map[string]interface{}{"status": status}, err } - if r.StatusCode != 200 { - return map[string]interface{}{"status": r.StatusCode}, errors.New("Non 200 status") + if response.StatusCode != 200 { + return map[string]interface{}{"status": response.StatusCode}, errors.New("non 200 status") } return map[string]interface{}{"status": 200}, nil } type def struct { - Method string `mapstructure:"method"` - Url string `mapstructure:"url"` - Body string `mapstructure:"body"` + Method string `mapstructure:"method"` + Url string `mapstructure:"url"` + Body string `mapstructure:"body"` + Headers map[string]string `mapstructure:"headers"` } diff --git a/main.go b/main.go index 6723f69..6022bb3 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "github.com/dave/blast/blaster" "github.com/dave/blast/dummyworker" + "github.com/dave/blast/gcsworker" "github.com/dave/blast/httpworker" ) @@ -24,6 +25,7 @@ func main() { b.RegisterWorkerType("dummy", dummyworker.New) b.RegisterWorkerType("http", httpworker.New) + b.RegisterWorkerType("gcs", gcsworker.New) if err := b.Start(ctx); err != nil { if DEBUG {