Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1. Change response processing to copy out of a channel to array #30

Merged
merged 7 commits into from
Sep 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The [Service Registry] is a utility that will listen on the network for services
## RackHD Proxy
The [RackHD Proxy] is a utility that acts as a proxy to the RackHD API.



[Service Registry]: https://github.com/RackHD/neighborhood-manager/tree/master/registry
[RackHD Proxy]: https://github.com/RackHD/neighborhood-manager/tree/master/rackhd

Expand Down
46 changes: 38 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package: github.com/RackHD/neighborhood-manager
import:
- package: github.com/hashicorp/consul
version: ~0.7.0
subpackages:
- api
- package: github.com/hashicorp/go-cleanhttp
- package: github.com/king-jam/gossdp
- package: github.com/spf13/viper
testImport:
- package: github.com/onsi/ginkgo
version: ~1.2.0
- package: github.com/onsi/gomega
version: ~1.0.0
138 changes: 105 additions & 33 deletions rackhd/proxy/controller.go
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
package proxy

import (
"encoding/json"
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
"time"

regStore "github.com/RackHD/neighborhood-manager/libreg/registry"
"github.com/RackHD/neighborhood-manager/libreg/registry/consul"
"github.com/RackHD/neighborhood-manager/rackhd/watcher"
"github.com/hashicorp/go-cleanhttp"
)

// Responses is an array of Response structs
type Responses []Response

// Err creates an error message to print
type Err struct {
Msg string `json:"msg"`
}

// Server is the proxy server struct
type Server struct {
Address string
Expand Down Expand Up @@ -52,65 +63,97 @@ func (p *Server) HandleTest(w http.ResponseWriter, r *http.Request) {

// HandleNodes sends, recieves, and processes all the data
func (p *Server) HandleNodes(w http.ResponseWriter, r *http.Request) {
start := time.Now()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
addrMap := p.GetAddresses(w, r)
cr, _ := p.GetResp(r, addrMap)
p.RespCheck(w, cr)
addrMap, err := p.GetAddresses(w, r)
if len(addrMap) == 0 {
w.WriteHeader(200)
w.Write([]byte("[]"))
return
}
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("%s", err)))
return
}
if (r.Method != "GET") && (len(addrMap) > 1) {
w.WriteHeader(400)
msg := Err{Msg: "Unsupported api call to multiple hosts. Use query string method."}
json.NewEncoder(w).Encode(msg)
return
}
ar := p.GetResp(r, addrMap)
p.RespCheck(r, w, ar)
elapsed := time.Since(start)
fmt.Printf("Total Request Time => %v\n", elapsed)
}

// GetResp makes channels for the response and errors from http.Get.
// A go func is spun up for each http.Get and the responses are fed
// into their respective channels.
func (p *Server) GetResp(r *http.Request, addrs map[string]struct{}) (chan *Response, chan error) {
func (p *Server) GetResp(r *http.Request, addrs map[string]struct{}) Responses {
cr := make(chan *Response, len(addrs))
errs := make(chan error, len(addrs))
defer close(cr)
defer close(errs)
for entry := range addrs {
p.wg.Add(1)
go func(entry string) {
go func(entry string, r *http.Request) {
defer p.wg.Done()
r.URL.Host = entry
r.URL.Scheme = "http"
fmt.Printf("url string %s\n", r.URL.String())
respGet, err := http.Get(r.URL.String())
req, err := NewRequest(r, entry)
if err != nil {
cr <- NewResponseFromError(err)
return
}
client := cleanhttp.DefaultClient()
start := time.Now()
respGet, err := client.Do(req)
elapsed := time.Since(start)
fmt.Printf("Response time: %v => %s\n", elapsed, entry)
if err != nil {
errs <- fmt.Errorf("Could not send any HTTP Get requests: %s\n", err)
cr <- NewResponseFromError(err)
return
}
defer respGet.Body.Close()
responseCopy, err := NewResponse(respGet)
if err != nil {
log.Printf("Error copying response => %s\n", err)
cr <- NewResponseFromError(err)
return
}
cr <- responseCopy
respGet.Body.Close()
}(entry)
}(entry, r)
}
p.wg.Wait()
return cr, errs
close(cr)
var ar Responses
for entry := range cr {
ar = append(ar, *entry)
}
return ar
}

// GetAddresses decides from where to retrieve the addresses
func (p *Server) GetAddresses(w http.ResponseWriter, r *http.Request) map[string]struct{} {
func (p *Server) GetAddresses(w http.ResponseWriter, r *http.Request) (map[string]struct{}, error) {
if err := r.ParseForm(); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return nil, err
}
querySlice := r.URL.Query()
if len(querySlice["ip"]) > 0 {
addrMap := p.GetQueryAddresses(querySlice["ip"])
return addrMap
return addrMap, nil
}
addrMap := p.GetStoredAddresses()
return addrMap
addrMap, err := p.GetStoredAddresses()
if err != nil {
return nil, err
}
return addrMap, nil
}

// GetStoredAddresses calls GetAddresses and returns a map of addresses
func (p *Server) GetStoredAddresses() map[string]struct{} {
func (p *Server) GetStoredAddresses() (map[string]struct{}, error) {
addresses, err := p.Store.GetAddresses()
if err != nil {
log.Printf("Did not get IP List ==> %s\n", err)
}
return addresses
return addresses, err
}

// GetQueryAddresses retrives a url flag and returns a map of address(es)
Expand All @@ -130,18 +173,47 @@ func (p *Server) GetQueryAddresses(querySlice []string) map[string]struct{} {
return queryMap
}

// RespHeaderWriter writes the StatusCode and Headers
func (p *Server) RespHeaderWriter(r *http.Request, w http.ResponseWriter, ar Responses) {
var status int
status = 500
if len(ar) <= 1 {
for k, v := range ar[0].Header {
for _, value := range v {
w.Header().Set(k, value)
}
}
}
for _, r := range ar {
if r.StatusCode < status {
status = r.StatusCode
}
}
w.WriteHeader(status)
}

// RespCheck identifies the type of initialResp.Body and calls the appropriate
// helper function to write to the ResponseWriter.
func (p *Server) RespCheck(w http.ResponseWriter, cr chan *Response) {
initialResp := <-cr
if initialResp.Body[0] != '[' {
w.Write(initialResp.Body)
return
}
w.Write(initialResp.Body[0 : len(initialResp.Body)-2])
for r := range cr {
w.Write([]byte(","))
w.Write(r.Body[1 : len(r.Body)-2])
func (p *Server) RespCheck(r *http.Request, w http.ResponseWriter, ar Responses) {
var cutSize int

p.RespHeaderWriter(r, w, ar)
w.Write([]byte("["))
for i, r := range ar {
if r.Body == nil {
continue
}
if r.Body[0] == '[' {
cutSize = 1
} else if r.Body[0] == '{' {
cutSize = 0
} else {
continue
}
w.Write(r.Body[cutSize : len(r.Body)-cutSize])
if i != len(ar)-1 {
w.Write([]byte(","))
}
}
w.Write([]byte("]"))
}
56 changes: 56 additions & 0 deletions rackhd/proxy/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package proxy

import (
"io/ioutil"
"log"
"net/http"
)

// Response is the internal proxy response object
type Response struct {
Header http.Header
StatusCode int
Body []byte
RequestURL string
Error error
}

// NewResponse copies a http.Response into a proxy Response
func NewResponse(resp *http.Response) (*Response, error) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading Response.Body %s\n", err)
return nil, err
}
proxyResponse := &Response{
Header: resp.Header,
StatusCode: resp.StatusCode,
Body: body,
RequestURL: resp.Request.URL.String(),
Error: nil,
}
return proxyResponse, err
}

// NewResponseFromError sets errors
func NewResponseFromError(err error) *Response {
proxyRespnse := &Response{
StatusCode: 500,
Error: err,
}
return proxyRespnse
}

// NewRequest copies a http.Request & Header and sets the new host
func NewRequest(r *http.Request, host string) (*http.Request, error) {
req, err := http.NewRequest(r.Method, "http://"+host+r.URL.Path, r.Body)
if err != nil {
return nil, err
}
for k, v := range r.Header {
for _, value := range v {
req.Header.Set(k, value)
}
}
return req, nil
}
Loading