-
-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
http.go
244 lines (218 loc) · 8.75 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package addtransport
import (
"bytes"
"context"
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/url"
"strings"
"time"
"golang.org/x/time/rate"
stdopentracing "github.com/opentracing/opentracing-go"
stdzipkin "github.com/openzipkin/zipkin-go"
"github.com/sony/gobreaker"
"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/tracing/zipkin"
httptransport "github.com/go-kit/kit/transport/http"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
)
// NewHTTPHandler returns an HTTP handler that makes a set of endpoints
// available on predefined paths.
func NewHTTPHandler(endpoints addendpoint.Set, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) http.Handler {
// Zipkin HTTP Server Trace can either be instantiated per endpoint with a
// provided operation name or a global tracing service can be instantiated
// without an operation name and fed to each Go kit endpoint as ServerOption.
// In the latter case, the operation name will be the endpoint's http method.
// We demonstrate a global tracing service here.
zipkinServer := zipkin.HTTPServerTrace(zipkinTracer)
options := []httptransport.ServerOption{
httptransport.ServerErrorEncoder(errorEncoder),
httptransport.ServerErrorLogger(logger),
zipkinServer,
}
m := http.NewServeMux()
m.Handle("/sum", httptransport.NewServer(
endpoints.SumEndpoint,
decodeHTTPSumRequest,
encodeHTTPGenericResponse,
append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Sum", logger)))...,
))
m.Handle("/concat", httptransport.NewServer(
endpoints.ConcatEndpoint,
decodeHTTPConcatRequest,
encodeHTTPGenericResponse,
append(options, httptransport.ServerBefore(opentracing.HTTPToContext(otTracer, "Concat", logger)))...,
))
return m
}
// NewHTTPClient returns an AddService backed by an HTTP server living at the
// remote instance. We expect instance to come from a service discovery system,
// so likely of the form "host:port". We bake-in certain middlewares,
// implementing the client library pattern.
func NewHTTPClient(instance string, otTracer stdopentracing.Tracer, zipkinTracer *stdzipkin.Tracer, logger log.Logger) (addservice.Service, error) {
// Quickly sanitize the instance string.
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
u, err := url.Parse(instance)
if err != nil {
return nil, err
}
// We construct a single ratelimiter middleware, to limit the total outgoing
// QPS from this client to all methods on the remote instance. We also
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))
// Zipkin HTTP Client Trace can either be instantiated per endpoint with a
// provided operation name or a global tracing client can be instantiated
// without an operation name and fed to each Go kit endpoint as ClientOption.
// In the latter case, the operation name will be the endpoint's http method.
zipkinClient := zipkin.HTTPClientTrace(zipkinTracer)
// global client middlewares
options := []httptransport.ClientOption{
zipkinClient,
}
// Each individual endpoint is an http/transport.Client (which implements
// endpoint.Endpoint) that gets wrapped with various middlewares. If you
// made your own client library, you'd do this work there, so your server
// could rely on a consistent set of client behavior.
var sumEndpoint endpoint.Endpoint
{
sumEndpoint = httptransport.NewClient(
"POST",
copyURL(u, "/sum"),
encodeHTTPGenericRequest,
decodeHTTPSumResponse,
append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
).Endpoint()
sumEndpoint = opentracing.TraceClient(otTracer, "Sum")(sumEndpoint)
sumEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Sum")(sumEndpoint)
sumEndpoint = limiter(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Sum",
Timeout: 30 * time.Second,
}))(sumEndpoint)
}
// The Concat endpoint is the same thing, with slightly different
// middlewares to demonstrate how to specialize per-endpoint.
var concatEndpoint endpoint.Endpoint
{
concatEndpoint = httptransport.NewClient(
"POST",
copyURL(u, "/concat"),
encodeHTTPGenericRequest,
decodeHTTPConcatResponse,
append(options, httptransport.ClientBefore(opentracing.ContextToHTTP(otTracer, logger)))...,
).Endpoint()
concatEndpoint = opentracing.TraceClient(otTracer, "Concat")(concatEndpoint)
concatEndpoint = zipkin.TraceEndpoint(zipkinTracer, "Concat")(concatEndpoint)
concatEndpoint = limiter(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Concat",
Timeout: 10 * time.Second,
}))(concatEndpoint)
}
// Returning the endpoint.Set as a service.Service relies on the
// endpoint.Set implementing the Service methods. That's just a simple bit
// of glue code.
return addendpoint.Set{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}, nil
}
func copyURL(base *url.URL, path string) *url.URL {
next := *base
next.Path = path
return &next
}
func errorEncoder(_ context.Context, err error, w http.ResponseWriter) {
w.WriteHeader(err2code(err))
json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()})
}
func err2code(err error) int {
switch err {
case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow:
return http.StatusBadRequest
}
return http.StatusInternalServerError
}
func errorDecoder(r *http.Response) error {
var w errorWrapper
if err := json.NewDecoder(r.Body).Decode(&w); err != nil {
return err
}
return errors.New(w.Error)
}
type errorWrapper struct {
Error string `json:"error"`
}
// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a
// JSON-encoded sum request from the HTTP request body. Primarily useful in a
// server.
func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) {
var req addendpoint.SumRequest
err := json.NewDecoder(r.Body).Decode(&req)
return req, err
}
// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a
// JSON-encoded concat request from the HTTP request body. Primarily useful in a
// server.
func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) {
var req addendpoint.ConcatRequest
err := json.NewDecoder(r.Body).Decode(&req)
return req, err
}
// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a
// JSON-encoded sum response from the HTTP response body. If the response has a
// non-200 status code, we will interpret that as an error and attempt to decode
// the specific error message from the response body. Primarily useful in a
// client.
func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) {
if r.StatusCode != http.StatusOK {
return nil, errors.New(r.Status)
}
var resp addendpoint.SumResponse
err := json.NewDecoder(r.Body).Decode(&resp)
return resp, err
}
// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes
// a JSON-encoded concat response from the HTTP response body. If the response
// has a non-200 status code, we will interpret that as an error and attempt to
// decode the specific error message from the response body. Primarily useful in
// a client.
func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) {
if r.StatusCode != http.StatusOK {
return nil, errors.New(r.Status)
}
var resp addendpoint.ConcatResponse
err := json.NewDecoder(r.Body).Decode(&resp)
return resp, err
}
// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that
// JSON-encodes any request to the request body. Primarily useful in a client.
func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(request); err != nil {
return err
}
r.Body = ioutil.NopCloser(&buf)
return nil
}
// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes
// the response as JSON to the response writer. Primarily useful in a server.
func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil {
errorEncoder(ctx, f.Failed(), w)
return nil
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
return json.NewEncoder(w).Encode(response)
}