Skip to content

Commit

Permalink
refactored http package for more reliability
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Nov 17, 2017
1 parent ba5d20e commit 3a35013
Show file tree
Hide file tree
Showing 78 changed files with 22,089 additions and 75 deletions.
14 changes: 14 additions & 0 deletions broker/status.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
/**********************************************************************************
* Copyright (c) 2009-2017 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package broker

import (
Expand Down
164 changes: 123 additions & 41 deletions network/http/http.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
/**********************************************************************************
* Copyright (c) 2009-2017 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package http

import (
"bytes"
"encoding/json"
"errors"
"io"
"net/http"
"net"
"net/url"
"strings"
"time"
)

// DefaultClient used for http with a shorter timeout.
var defaultClient = &http.Client{
Timeout: 5 * time.Second,
}
"github.com/emitter-io/emitter/utils"
"github.com/valyala/fasthttp"
)

// HeaderValue represents a header with a value attached.
type HeaderValue struct {
Expand All @@ -25,66 +36,137 @@ func NewHeader(header, value string) HeaderValue {
return HeaderValue{Header: header, Value: value}
}

// Get is a utility function which issues an HTTP Get on a specified URL. The encoding is JSON.
var Get = func(url string, output interface{}, headers ...HeaderValue) error {
req, err := http.NewRequest("GET", url, nil)
// Client represents an HTTP client which can be used for issuing requests concurrently.
type Client interface {
Get(url string, output interface{}, headers ...HeaderValue) error
Post(url string, body []byte, output interface{}, headers ...HeaderValue) error
PostJSON(url string, body interface{}, output interface{}) (err error)
PostBinary(url string, body interface{}, output interface{}) (err error)
}

// Client implementation.
type client struct {
host string // The host name of the client.
http *fasthttp.HostClient // The underlying client.
}

// NewClient creates a new HTTP Client for the provided host. This will use round-robin
// to load-balance the requests to the addresses resolved by the host.
func NewClient(host string, timeout time.Duration) (Client, error) {

u, err := url.Parse(host)
if err != nil {
return err
return nil, err
}

// Get the addresses by performing a DNS lookup, this should not fail
addr, err := net.LookupHost(u.Hostname())
if err != nil {
return nil, err
}

// Add port to each address
for i, a := range addr {
addr[i] = a + ":" + u.Port()
}

// Construct a new client
c := new(client)
c.host = host
c.http = &fasthttp.HostClient{
Addr: strings.Join(addr, ","),
ReadTimeout: timeout,
WriteTimeout: timeout,
}
return c, nil
}

// Get issues an HTTP Get on a specified URL and decodes the payload as JSON.
func (c *client) Get(url string, output interface{}, headers ...HeaderValue) error {

// Prepare the request
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
req.SetRequestURI(url)

// Set the headers provided
// Set the headers
req.Header.Set("Accept", "application/json, application/binary")
for _, h := range headers {
req.Header.Set(h.Header, h.Value)
}

// Acquire a response
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res)

// Issue the request
resp, err := defaultClient.Do(req)
err := c.http.Do(req, res)
if err != nil {
return err
}

return UnmarshalJSON(resp.Body, output)
// Get the content type
mime := string(res.Header.ContentType())
switch mime {
case "application/binary":
return utils.Decode(res.Body(), output)
}

// Always default to JSON here
return json.Unmarshal(res.Body(), output)
}

// Post is a utility function which marshals and issues an HTTP post on a specified URL. The
// encoding is JSON.
var Post = func(url string, body interface{}, output interface{}, headers ...HeaderValue) error {
b, err := json.Marshal(body)
if err != nil {
return err
}
// Post is a utility function which marshals and issues an HTTP post on a specified URL.
func (c *client) Post(url string, body []byte, output interface{}, headers ...HeaderValue) error {

// Build a new request
req, err := http.NewRequest("POST", url, bytes.NewReader(b))
if err != nil {
return err
}
// Prepare the request
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
req.SetRequestURI(url)
req.SetBody(body)

// Set the headers provided
// Set the headers
req.Header.SetMethod("POST")
req.Header.Set("Accept", "application/json, application/binary")
for _, h := range headers {
req.Header.Set(h.Header, h.Value)
}

// Acquire a response
res := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(res)

// Issue the request
resp, err := defaultClient.Do(req)
err := c.http.Do(req, res)
if err != nil {
return err
}

return UnmarshalJSON(resp.Body, output)
// Get the content type
mime := string(res.Header.ContentType())
switch mime {
case "application/binary":
return utils.Decode(res.Body(), output)
}

// Always default to JSON here
return json.Unmarshal(res.Body(), output)
}

// UnmarshalJSON unmarshals the given io.Reader pointing to a JSON, into a desired object
var UnmarshalJSON = func(r io.Reader, out interface{}) error {
if r == nil {
return errors.New("'io.Reader' being decoded is nil")
// PostJSON is a helper function which posts a JSON body with an appropriate content type.
func (c *client) PostJSON(url string, body interface{}, output interface{}) (err error) {
var buffer []byte
if buffer, err = json.Marshal(body); err == nil {
err = c.Post(url, buffer, output, NewHeader("Content-Type", "application/json"))
}
return
}

if out == nil {
return errors.New("output parameter 'out' is nil")
// PostBinary is a helper function which posts a binary body with an appropriate content type.
func (c *client) PostBinary(url string, body interface{}, output interface{}) (err error) {
var buffer []byte
if buffer, err = utils.Encode(body); err == nil {
err = c.Post(url, buffer, output, NewHeader("Content-Type", "application/binary"))
}

// Decode the json
dec := json.NewDecoder(r)
return dec.Decode(out)
return
}
55 changes: 40 additions & 15 deletions network/http/http_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package http

import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"

"encoding/json"
"github.com/stretchr/testify/assert"
Expand All @@ -17,26 +16,32 @@ type testObject struct {
Field string `json:"field"`
}

func TestUnmarshalJSON(t *testing.T) {
input := `{"test":"data","validation":"process"}`
expected := map[string]interface{}{
"test": "data",
"validation": "process",
func TestNewClient(t *testing.T) {
tests := []struct {
url string
ok bool
}{
{url: "http://google.com/123", ok: true},
{url: "google.com/123", ok: false},
{url: "235235", ok: false},
}

var actual map[string]interface{}
err := UnmarshalJSON(bytes.NewReader([]byte(input)), &actual)
if err != nil {
fmt.Printf("decoding err: %v\n", err)
for _, tc := range tests {
c, err := NewClient(tc.url, time.Second)
if tc.ok {
assert.NotNil(t, c)
assert.NoError(t, err)
} else {
assert.Nil(t, c)
}
}

assert.EqualValues(t, expected, actual)
}

func (h *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
b, _ := json.Marshal(&testObject{
Field: "response",
})
w.Header().Set("Content-Type", "application/json")
w.Write(b)
}

Expand All @@ -46,16 +51,36 @@ func TestPostGet(t *testing.T) {
body := testObject{Field: "hello"}
expect := &testObject{Field: "response"}

jsonBody, _ := json.Marshal(body)

// Reuse the client
c, err := NewClient(s.URL, time.Second)
assert.NoError(t, err)

{
output := new(testObject)
err := c.Get(s.URL, output)
assert.NoError(t, err)
assert.EqualValues(t, expect, output)
}

{
output := new(testObject)
err := c.Post(s.URL, jsonBody, output)
assert.NoError(t, err)
assert.EqualValues(t, expect, output)
}

{
output := new(testObject)
err := Post(s.URL, body, output)
err := c.PostJSON(s.URL, body, output)
assert.NoError(t, err)
assert.EqualValues(t, expect, output)
}

{
output := new(testObject)
err := Get(s.URL, output)
err := c.PostBinary(s.URL, body, output)
assert.NoError(t, err)
assert.EqualValues(t, expect, output)
}
Expand Down
55 changes: 55 additions & 0 deletions network/http/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**********************************************************************************
* Copyright (c) 2009-2017 Misakai Ltd.
* This program is free software: you can redistribute it and/or modify it under the
* terms of the GNU Affero General Public License as published by the Free Software
* Foundation, either version 3 of the License, or(at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License along
* with this program. If not, see<http://www.gnu.org/licenses/>.
************************************************************************************/

package http

import (
"github.com/stretchr/testify/mock"
)

// MockClient is a mock implementation of Client
type MockClient struct {
mock.Mock
}

// NewMockClient returns a mock implementation of Client
func NewMockClient() *MockClient {
return &MockClient{
Mock: mock.Mock{},
}
}

// Get issues an HTTP Get on a specified URL and decodes the payload as JSON.
func (mock *MockClient) Get(url string, output interface{}, headers ...HeaderValue) error {
mockArgs := mock.Called(url, output, headers)
return mockArgs.Error(0)
}

// Post is a utility function which marshals and issues an HTTP post on a specified URL.
func (mock *MockClient) Post(url string, body []byte, output interface{}, headers ...HeaderValue) error {
mockArgs := mock.Called(url, body, output, headers)
return mockArgs.Error(0)
}

// PostJSON is a helper function which posts a JSON body with an appropriate content type.
func (mock *MockClient) PostJSON(url string, body interface{}, output interface{}) (err error) {
mockArgs := mock.Called(url, body, output)
return mockArgs.Error(0)
}

// PostBinary is a helper function which posts a binary body with an appropriate content type.
func (mock *MockClient) PostBinary(url string, body interface{}, output interface{}) (err error) {
mockArgs := mock.Called(url, body, output)
return mockArgs.Error(0)
}

0 comments on commit 3a35013

Please sign in to comment.