Skip to content

Commit

Permalink
Fixed bug in httpworker, added gcpworker
Browse files Browse the repository at this point in the history
  • Loading branch information
dave committed Nov 1, 2017
1 parent 70fdcb5 commit 30d7925
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 16 deletions.
9 changes: 5 additions & 4 deletions dummyworker/dummyworker.go
Expand Up @@ -9,9 +9,10 @@ import (

"fmt"

"errors"

"github.com/dave/blast/blaster"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func New() blaster.Worker {
Expand All @@ -28,7 +29,7 @@ func (w *Worker) Start(ctx context.Context, raw map[string]interface{}) error {

var config workerConfig
if err := mapstructure.Decode(raw, &config); err != nil {
return errors.WithStack(err)
return err
}

w.base = config.Base
Expand All @@ -46,7 +47,7 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri

var payload payloadConfig
if err := mapstructure.Decode(raw, &payload); err != nil {
return nil, errors.WithStack(err)
return map[string]interface{}{"status": "Error decoding payload"}, err
}

if w.print {
Expand Down Expand Up @@ -74,7 +75,7 @@ func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[stri
case context.Canceled:
status = "Cancelled"
default:
status = fmt.Sprintf("(%s)", err.Error())
status = err.Error()
}
return map[string]interface{}{"status": status}, err
}
Expand Down
84 changes: 84 additions & 0 deletions gcsworker/gcsworker.go
@@ -0,0 +1,84 @@
package gcsworker

import (
"context"

"net/http"

"bytes"

"errors"

"net/url"

"github.com/dave/blast/blaster"
"github.com/mitchellh/mapstructure"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
)

func New() blaster.Worker {
return &Worker{}
}

type Worker struct {
client *http.Client
}

func (w *Worker) Start(ctx context.Context, payload map[string]interface{}) error {
src, err := google.DefaultTokenSource(ctx)
if err != nil {
return err
}
w.client = oauth2.NewClient(ctx, src)
return nil
}

func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[string]interface{}, error) {

var payload def
if err := mapstructure.Decode(raw, &payload); err != nil {
return map[string]interface{}{"status": "Error decoding payload"}, err
}

request, err := http.NewRequest(payload.Method, payload.Url, bytes.NewBufferString(payload.Body))
if err != nil {
return map[string]interface{}{"status": "Error creating request"}, err
}

request = request.WithContext(ctx)

for k, v := range payload.Headers {
request.Header.Add(k, v)
}

response, err := w.client.Do(request)
if err != nil {
var status interface{}
ue, ok := err.(*url.Error)
switch {
case response != nil:
status = response.StatusCode
case ok && ue.Err == context.DeadlineExceeded:
status = "Timeout"
case ok && ue.Err == context.Canceled:
status = "Cancelled"
case ok:
status = ue.Err.Error()
default:
status = err.Error()
}
return map[string]interface{}{"status": status}, err
}
if response.StatusCode != 200 {
return map[string]interface{}{"status": response.StatusCode}, errors.New("non 200 status")
}
return map[string]interface{}{"status": 200}, nil
}

type def struct {
Method string `mapstructure:"method"`
Url string `mapstructure:"url"`
Body string `mapstructure:"body"`
Headers map[string]string `mapstructure:"headers"`
}
50 changes: 38 additions & 12 deletions httpworker/httpworker.go
Expand Up @@ -7,9 +7,12 @@ import (

"bytes"

"net/url"

"errors"

"github.com/dave/blast/blaster"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

func New() blaster.Worker {
Expand All @@ -18,28 +21,51 @@ func New() blaster.Worker {

type Worker struct{}

func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (response map[string]interface{}, err error) {
func (w *Worker) Send(ctx context.Context, raw map[string]interface{}) (map[string]interface{}, error) {

var payload def
if err := mapstructure.Decode(&payload, raw); err != nil {
return nil, errors.WithStack(err)
if err := mapstructure.Decode(raw, &payload); err != nil {
return map[string]interface{}{"status": "Error decoding payload"}, err
}

request, err := http.NewRequest(payload.Method, payload.Url, bytes.NewBufferString(payload.Body))
if err != nil {
return nil, errors.WithStack(err)
return map[string]interface{}{"status": "Error creating request"}, err
}

request = request.WithContext(ctx)
r, err := http.DefaultClient.Do(request)

for k, v := range payload.Headers {
request.Header.Add(k, v)
}

response, err := http.DefaultClient.Do(request)
if err != nil {
return map[string]interface{}{"status": r.StatusCode}, errors.WithStack(err)
var status interface{}
ue, ok := err.(*url.Error)
switch {
case response != nil:
status = response.StatusCode
case ok && ue.Err == context.DeadlineExceeded:
status = "Timeout"
case ok && ue.Err == context.Canceled:
status = "Cancelled"
case ok:
status = ue.Err.Error()
default:
status = err.Error()
}
return map[string]interface{}{"status": status}, err
}
if r.StatusCode != 200 {
return map[string]interface{}{"status": r.StatusCode}, errors.New("Non 200 status")
if response.StatusCode != 200 {
return map[string]interface{}{"status": response.StatusCode}, errors.New("non 200 status")
}
return map[string]interface{}{"status": 200}, nil
}

type def struct {
Method string `mapstructure:"method"`
Url string `mapstructure:"url"`
Body string `mapstructure:"body"`
Method string `mapstructure:"method"`
Url string `mapstructure:"url"`
Body string `mapstructure:"body"`
Headers map[string]string `mapstructure:"headers"`
}
2 changes: 2 additions & 0 deletions main.go
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/dave/blast/blaster"
"github.com/dave/blast/dummyworker"
"github.com/dave/blast/gcsworker"
"github.com/dave/blast/httpworker"
)

Expand All @@ -24,6 +25,7 @@ func main() {

b.RegisterWorkerType("dummy", dummyworker.New)
b.RegisterWorkerType("http", httpworker.New)
b.RegisterWorkerType("gcs", gcsworker.New)

if err := b.Start(ctx); err != nil {
if DEBUG {
Expand Down

0 comments on commit 30d7925

Please sign in to comment.