forked from st3v/go-plugins
/
http.go
267 lines (233 loc) · 5.95 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// Package http provides a micro to http proxy
package http
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
"net/url"
"path/filepath"
"strings"
"github.com/micro/go-micro"
"github.com/micro/go-micro/errors"
"github.com/micro/go-micro/server"
)
// Router will proxy rpc requests as http POST requests. It is a server.Router
type Router struct {
// Converts RPC Foo.Bar to /foo/bar
Resolver *Resolver
// The http backend to call
Backend string
// first request
first bool
// rpc ep / http ep mapping
eps map[string]string
}
// Resolver resolves rpc to http. It explicity maps Foo.Bar to /foo/bar
type Resolver struct{}
var (
// The default backend
DefaultBackend = "http://localhost:9090"
// The default router
DefaultRouter = &Router{}
)
// Foo.Bar becomes /foo/bar
func (r *Resolver) Resolve(ep string) string {
// replace . with /
ep = strings.Replace(ep, ".", "/", -1)
// lowercase the whole thing
ep = strings.ToLower(ep)
// prefix with "/"
return filepath.Join("/", ep)
}
// set the nil things
func (p *Router) setup() {
if p.Resolver == nil {
p.Resolver = new(Resolver)
}
if p.Backend == "" {
p.Backend = DefaultBackend
}
if p.eps == nil {
p.eps = map[string]string{}
}
}
// Endpoint returns the http endpoint for an rpc endpoint.
// Endpoint("Foo.Bar") returns http://localhost:9090/foo/bar
func (p *Router) Endpoint(rpcEp string) (string, error) {
p.setup()
// get http endpoint
ep, ok := p.eps[rpcEp]
if !ok {
// get default
ep = p.Resolver.Resolve(rpcEp)
}
// already full qualified URL
if strings.HasPrefix(ep, "http://") || strings.HasPrefix(ep, "https://") {
return ep, nil
}
// parse into url
// full path to call
u, err := url.Parse(p.Backend)
if err != nil {
return "", err
}
// set path
u.Path = filepath.Join(u.Path, ep)
// set scheme
if len(u.Scheme) == 0 {
u.Scheme = "http"
}
// set host
if len(u.Host) == 0 {
u.Host = "localhost"
}
// create ep
return u.String(), nil
}
// RegisterEndpoint registers a http endpoint against an RPC endpoint.
// It converts relative paths into backend:endpoint. Anything prefixed
// with http:// or https:// will be left as is.
// RegisterEndpoint("Foo.Bar", "/foo/bar")
// RegisterEndpoint("Greeter.Hello", "/helloworld")
// RegisterEndpoint("Greeter.Hello", "http://localhost:8080/")
func (p *Router) RegisterEndpoint(rpcEp, httpEp string) error {
p.setup()
// create ep
p.eps[rpcEp] = httpEp
return nil
}
// ServeRequest honours the server.Router interface
func (p *Router) ServeRequest(ctx context.Context, req server.Request, rsp server.Response) error {
// rudimentary post based streaming
for {
// get data
body, err := req.Read()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
var rpcEp string
// get rpc endpoint
if p.first {
p.first = false
rpcEp = req.Endpoint()
} else {
hdr := req.Header()
rpcEp = hdr["X-Micro-Endpoint"]
}
// get http endpoint
ep, err := p.Endpoint(rpcEp)
if err != nil {
return errors.NotFound(req.Service(), err.Error())
}
// no stream support currently
// TODO: lookup host
hreq, err := http.NewRequest("POST", ep, bytes.NewReader(body))
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// get the header
hdr := req.Header()
// set the headers
for k, v := range hdr {
hreq.Header.Set(k, v)
}
// make the call
hrsp, err := http.DefaultClient.Do(hreq)
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// read body
b, err := ioutil.ReadAll(hrsp.Body)
hrsp.Body.Close()
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
// set response headers
hdr = map[string]string{}
for k, _ := range hrsp.Header {
hdr[k] = hrsp.Header.Get(k)
}
// write the header
rsp.WriteHeader(hdr)
// write the body
err = rsp.Write(b)
if err == io.EOF {
return nil
}
if err != nil {
return errors.InternalServerError(req.Service(), err.Error())
}
}
return nil
}
// NewSingleHostRouter returns a router which sends requests a single http backend
//
// It is used by setting it in a new micro service to act as a proxy for a http backend.
//
// Usage:
//
// Create a new router to the http backend
//
// r := NewSingleHostRouter("http://localhost:10001")
//
// // Add additional routes
// r.RegisterEndpoint("Hello.World", "/helloworld")
//
// // Create your new service
// service := micro.NewService(
// micro.Name("greeter"),
// // Set the router
// http.WithRouter(r),
// )
//
// // Run the service
// service.Run()
func NewSingleHostRouter(url string) *Router {
return &Router{
Resolver: new(Resolver),
Backend: url,
eps: map[string]string{},
}
}
// NewService returns a new http proxy. It acts as a micro service and proxies to a http backend.
// Routes are dynamically set e.g Foo.Bar routes to /foo/bar. The default backend is http://localhost:9090.
// Optionally specify the backend endpoint url or the router. Also choose to register specific endpoints.
//
// Usage:
//
// service := NewService(
// micro.Name("greeter"),
// // Sets the default http endpoint
// http.WithBackend("http://localhost:10001"),
// )
//
// Set fixed backend endpoints
//
// // register an endpoint
// http.RegisterEndpoint("Hello.World", "/helloworld")
//
// service := NewService(
// micro.Name("greeter"),
// // Set the http endpoint
// http.WithBackend("http://localhost:10001"),
// )
func NewService(opts ...micro.Option) micro.Service {
// prepend router to opts
opts = append([]micro.Option{
WithRouter(DefaultRouter),
}, opts...)
// create the new service
return micro.NewService(opts...)
}
// RegisterEndpoint registers a http endpoint against an RPC endpoint
// RegisterEndpoint("Foo.Bar", "/foo/bar")
// RegisterEndpoint("Greeter.Hello", "/helloworld")
// RegisterEndpoint("Greeter.Hello", "http://localhost:8080/")
func RegisterEndpoint(rpcEp string, httpEp string) error {
return DefaultRouter.RegisterEndpoint(rpcEp, httpEp)
}