-
Notifications
You must be signed in to change notification settings - Fork 0
/
proxy.go
360 lines (313 loc) · 12 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
package main
import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/pnet"
"github.com/multiformats/go-multiaddr"
"github.com/PhysarumSM/common/p2pnode"
"github.com/PhysarumSM/common/p2putil"
"github.com/PhysarumSM/common/util"
"github.com/PhysarumSM/service-registry/registry"
"github.com/PhysarumSM/service-manager/conf"
"github.com/PhysarumSM/service-manager/lca"
"github.com/PhysarumSM/service-manager/pcache"
"github.com/PhysarumSM/service-manager/rcache"
)
const defaultKeyFile = "~/.privKeyProxy"
// init configures the standard logger so that every message is prefixed with
// the date, a microsecond-resolution time, and the short file name and line
// of the call site.
func init() {
	const logFlags = log.Ldate | log.Lmicroseconds | log.Lshortfile
	log.SetFlags(logFlags)
}
// Global LCA Manager instance to handle peer search and allocation
var manager lca.LCAManager
// Global Peer Cache instance to cache connected peers
var peerCache *pcache.PeerCache
// Global Registry Cache instance to cache service registry info
var registryCache *rcache.RegistryCache
// runRequest forwards req to a peer hosting the service described by servInfo
// and returns that peer's response.
//
// A peer cached in the global peerCache for the service's content hash is
// used when available. Otherwise the global LCA manager is asked to find an
// existing instance, allocating a new one when none is found, for up to
// maxAllocAttempts rounds. A peer that was found without error is added to
// the cache. If the request to the chosen peer fails, that peer is removed
// from the cache.
//
// servName is used for logging and as the service name of the cache entry.
// An error is returned when no peer could be found, or when the P2P request
// fails (in which case whatever response manager.Request produced is
// returned alongside the error).
func runRequest(servName string, servInfo registry.ServiceInfo, req *http.Request) (*http.Response, error) {
	// Maximum number of find/allocate rounds before giving up.
	const maxAllocAttempts = 3

	var err error
	var id peer.ID
	var perf p2putil.PerfInd

	serviceHash := servInfo.ContentHash
	dockerHash := servInfo.DockerHash

	// 2. Search for cached instances, allocate new instance if none found
	id, err = peerCache.GetPeer(serviceHash)
	if err == nil {
		log.Printf("Found cached peer with ID %s for service %s\n", id, servName)
	} else {
		// Search for an instance in the network, allocating a new one if need be.
		// TODO: It's totally possible for an allocation attempt to succeed,
		// but the service takes a long time to come up, leading to subsequent
		// allocation attempts. This is a future problem to solve.
		startTime := time.Now()
		for attempts := 0; attempts < maxAllocAttempts && id == peer.ID(""); attempts++ {
			if attempts > 0 {
				log.Printf("Unable to successfully find or allocate, retrying...")
			}

			// TODO: Always pass perf req into AllocService()
			// Need to combine AllocService and AllocBetterService
			// Need to obtain perf req from registry-service
			log.Println("Finding best existing service instance")
			id, perf, err = manager.FindService(serviceHash)
			if err != nil {
				log.Println("Could not find, creating new service instance")
				_, _, err = manager.AllocService(dockerHash)
				if err != nil {
					log.Println("Service allocation failed\n", err)
					continue // Or return error right away?
				}

				// Re-do FindService() to ensure the new instance is connected
				// to the network. Sleep 200ms or so to allow the service to
				// come up. If not found, back off exponentially and attempt
				// to re-find it (max 5 times). If it's still not found, there
				// may be something wrong with it (or it's taking too long to
				// boot).
				wait := 200 * time.Millisecond
				time.Sleep(wait)

				// Use a dedicated error variable here: shadowing err would
				// hide the outcome of the FindService retries below from the
				// caching check at the end of this iteration.
				backoff, backoffErr := util.NewExpoBackoffAttempts(wait, time.Second, 5)
				if backoffErr != nil {
					// The backoff value is not usable after a failed
					// construction; move on to the next round instead.
					log.Printf("ERROR: Unable to create ExpoBackoffAttempts\n%v\n", backoffErr)
					continue
				}
				for backoff.Attempt() {
					id, perf, err = manager.FindService(serviceHash)
					if err == nil {
						break
					}
				}
			} else if servInfo.NetworkSoftReq.LessThan(perf) {
				log.Printf("Found service's performance (%s) does not meet requirements (%s)\n", perf, servInfo.NetworkSoftReq)
				// Only the error of AllocBetterService is inspected; its other
				// results are discarded and the peer found above stays selected.
				// On failure err remains set, so that peer is used for this
				// request but is not added to the cache.
				_, _, err = manager.AllocBetterService(dockerHash, perf)
				if err != nil {
					log.Println("No services able to be created, using previously found peer")
				}
			}

			if err == nil && id != peer.ID("") {
				// Cache peer information
				peerCache.AddPeer(p2putil.PeerInfo{
					ID:       id,
					ServName: servName,
					ServHash: serviceHash,
				})
			}
		}
		elapsedTime := time.Since(startTime)
		log.Println("Find/alloc service took:", elapsedTime)
	}

	if id == peer.ID("") {
		return nil, errors.New("Not found")
	}

	log.Printf("Running request to peer ID %s\n", id)
	resp, err := manager.Request(id, req)
	if err != nil {
		log.Printf("ERROR: HTTP request over P2P failed\n%v\n", err)
		// Evict the peer that failed to serve the request.
		go peerCache.RemovePeer(id)
	}
	return resp, err
}
// httpRequestHandler handles the "proxying" part of proxy.
//
// The first path segment of the request URI names the target service, e.g.
// "/hello/world?x=1" targets the service "hello". The name is resolved
// through the global registryCache and the request is passed to runRequest.
// On success the service's headers, status code, and body are copied back to
// the client.
//
// Error responses:
//   - 400 when the request URI contains no "/" and so has no service segment
//   - 404 (followed by the lookup error text) when the registry lookup fails
//   - the service's own status when runRequest fails but returned a response
//   - 502 when runRequest fails without a response
//
// TODO: Refactor this function
func httpRequestHandler(w http.ResponseWriter, r *http.Request) {
	// URL.RequestURI() includes path?query (URL.Path only has the path)
	uri := r.URL.RequestURI()
	log.Println("Got request:", strings.TrimPrefix(uri, "/"))

	tokens := strings.SplitN(uri, "/", 3)
	log.Println(tokens)
	// tokens[0] should be an empty string from parsing the initial "/".
	// Without any "/" there is only one token and no service name to read.
	if len(tokens) < 2 {
		http.Error(w, "400 Bad Request", http.StatusBadRequest)
		return
	}

	// The request URI carries the query string, which ends up inside the
	// service token for URIs such as "/svc?x=1" or "/svc?x=a/b". Drop it so
	// the registry lookup sees only the service name.
	serviceName := tokens[1]
	if i := strings.IndexByte(serviceName, '?'); i >= 0 {
		serviceName = serviceName[:i]
	}

	info, err := registryCache.GetOrRequestService(serviceName)
	if err != nil {
		http.Error(w, "404 Not Found", http.StatusNotFound)
		fmt.Fprintf(w, "%s\n", err)
		log.Printf("ERROR: Registry lookup failed\n%s\n", err)
		return
	}

	// Run request
	resp, err := runRequest(serviceName, info, r)
	if resp != nil {
		defer resp.Body.Close()
	}
	if err != nil {
		log.Println("Request to service returned an error:\n", err)

		// Propagate error code and message to client
		if resp != nil {
			http.Error(w, "Service error: "+resp.Status, resp.StatusCode)
		} else {
			http.Error(w, "Service error", http.StatusBadGateway)
		}
		return
	}

	// Return result
	// This returns errors as well
	// Ideally this would find another instance if there is an error
	// but just keep this behaviour for now
	log.Println("Sending response back to requester")

	// Copy any headers
	for k, v := range resp.Header {
		for _, s := range v {
			w.Header().Add(k, s)
		}
	}
	w.WriteHeader(resp.StatusCode)

	// Copy body. The status line is already written at this point, so a
	// failure can only be logged.
	if _, err := io.Copy(w, resp.Body); err != nil {
		log.Printf("ERROR: Failed to copy response body to requester\n%v\n", err)
	}
}
// customUsage is a custom usage func to support positional arguments (not
// trivial in golang). It writes to the flag package's configured output:
// a usage line, the registered option flags (via flag.PrintDefaults), the
// positional parameters in command-line order (PORT, SERVICE, ADDRESS), and
// closing notes on argument ordering and anonymous mode.
func customUsage() {
	out := flag.CommandLine.Output()

	fmt.Fprintf(out, "Usage of %s:\n", os.Args[0])
	fmt.Fprintf(out, "$ %s [OPTIONS ...] PORT [SERVICE] [ADDRESS]\n", os.Args[0])

	fmt.Fprintf(out, "\nOPTIONS:\n")
	flag.PrintDefaults()

	fmt.Fprintf(out, "\nPOSITIONAL PARAMETERS:\n")
	// An ordered slice rather than a map: map iteration order is unspecified,
	// which would shuffle the help text between runs.
	posArgs := []struct {
		name  string
		usage string
	}{
		{"PORT", "Local port for proxy to listen on"},
		{"SERVICE", "Human-readable name of the service for proxy to represent"},
		{"ADDRESS", "A string in \"IP:SERVICE_PORT\" format of the address for the proxy to advertise"},
	}
	for _, arg := range posArgs {
		// Altered from golang's flag.go's PrintDefaults() implementation
		s := " " + arg.name
		if len(s) <= 4 {
			s += "\t"
		} else {
			s += "\n \t"
		}
		s += strings.ReplaceAll(arg.usage, "\n", "\n \t")
		fmt.Fprint(out, s+"\n")
	}

	s := "NOTE: PORT, SERVICE, and ADDRESS *must* come after any OPTIONS flag arguments."
	fmt.Fprint(out, "\n"+s+"\n")

	s = "If service and address are omitted, proxy launches in anonymous mode.\n" +
		"Using \"anonymous mode\" allows a client to access the network without\n" +
		"having to register and advertise itself in the network."
	fmt.Fprint(out, "\n"+s+"\n")
}
// main parses the command line, loads the JSON configuration file, resolves
// the bootstrap peers and PSK, starts the global LCA manager together with
// the registry and peer caches, and finally serves the HTTP proxy on
// 127.0.0.1:PORT.
//
// With a single positional argument (PORT) the LCA manager is created with
// empty service and address strings ("anonymous mode"); with three (PORT,
// SERVICE, ADDRESS) those values are passed through. Any other argument
// count prints the usage text and exits with status 1.
func main() {
	var (
		err        error
		keyFlags   util.KeyFlags
		bootstraps *[]multiaddr.Multiaddr
		psk        *pnet.PSK
	)

	// Argument options
	configPath := flag.String("configfile", "../conf/conf.json", "Path to configuration file to use")
	rcacheTTL := flag.Int("rcache-ttl", 3600, "Time-to-live in seconds for registry cache entries")

	keyFlags, err = util.AddKeyFlags(defaultKeyFile)
	if err != nil {
		log.Fatalln(err)
	}
	bootstraps, err = util.AddBootstrapFlags()
	if err != nil {
		log.Fatalln(err)
	}
	psk, err = util.AddPSKFlag()
	if err != nil {
		log.Fatalln(err)
	}

	flag.Usage = customUsage // Do this only after adding all flags
	flag.Parse()             // Parse flag arguments

	// Parse positional arguments. service and address keep their zero value
	// (empty string) when only PORT is given.
	var port, service, address string
	positional := flag.Args()
	switch len(positional) {
	case 1:
		port = positional[0]
	case 3:
		port, service, address = positional[0], positional[1], positional[2]
	default:
		flag.Usage()
		os.Exit(1)
	}
	anonymous := len(positional) == 1

	priv, err := util.CreateOrLoadKey(keyFlags)
	if err != nil {
		log.Fatalln(err)
	}

	// Read in config file. The handle is released as soon as its contents
	// have been read, before any error is reported.
	configFile, err := os.Open(*configPath)
	if err != nil {
		log.Fatalf("ERROR: Unable to open configuration file\n%s\n", err)
	}
	configBytes, err := ioutil.ReadAll(configFile)
	configFile.Close()
	if err != nil {
		log.Fatalf("ERROR: Unable to read configuration file\n%s\n", err)
	}
	var config conf.Config
	if err = json.Unmarshal(configBytes, &config); err != nil {
		log.Fatalf("ERROR: Unable to parse configuration file\n%s\n", err)
	}

	// Bootstrap peers: command line first, then the configuration file, then
	// the environment.
	if len(*bootstraps) == 0 {
		if len(config.Bootstraps) > 0 {
			*bootstraps, err = util.StringsToMultiaddrs(config.Bootstraps)
			if err != nil {
				log.Fatalln(err)
			}
		} else {
			fromEnv, envErr := util.GetEnvBootstraps()
			if envErr != nil {
				log.Fatalln(envErr)
			}
			if len(fromEnv) == 0 {
				log.Fatalf("ERROR: Must specify at least one bootstrap node "+
					"through a command line flag, the configuration file, or "+
					"setting the %s environment variable.", util.ENV_KEY_BOOTSTRAPS)
			}
			*bootstraps = fromEnv
		}
	}

	// If CLI didn't specify a PSK, check the environment variables
	if *psk == nil {
		fromEnv, envErr := util.GetEnvPSK()
		if envErr != nil {
			log.Fatalln(envErr)
		}
		*psk = fromEnv
	}

	// Set node configuration
	nodeConfig := p2pnode.NewConfig()
	nodeConfig.PrivKey = priv
	nodeConfig.BootstrapPeers = *bootstraps
	nodeConfig.PSK = *psk

	// Setup LCA Manager. In anonymous mode service and address are both the
	// empty string, so one call covers both modes.
	ctx := context.Background()
	if anonymous {
		log.Println("Starting LCA Manager in anonymous mode")
	} else {
		log.Println("Starting LCA Manager in service mode with arguments",
			service, address)
	}
	manager, err = lca.NewLCAManager(ctx, nodeConfig, service, address)
	if err != nil {
		log.Fatalf("ERROR: Unable to create LCA Manager\n%s", err)
	}

	// Setup registry cache
	registryCache = rcache.NewRegistryCache(manager.Host.Ctx, manager.Host.Host,
		manager.Host.RoutingDiscovery, *rcacheTTL)

	// Create peer cache instance and start cache update loop
	log.Println("Launching proxy PeerCache instance")
	peerCache = pcache.NewPeerCache(&manager.Host, registryCache)
	go peerCache.UpdateCache()

	// Setup HTTP proxy service
	// This port number must be fixed in order for the proxy to be portable
	// Docker must route this port to an available one externally
	listenAddr := "127.0.0.1:" + port
	log.Println("Starting HTTP Proxy on " + listenAddr)
	http.HandleFunc("/", httpRequestHandler)
	log.Fatal(http.ListenAndServe(listenAddr, nil))
}