-
Notifications
You must be signed in to change notification settings - Fork 75
/
apiaxle-proxy.coffee
executable file
·577 lines (440 loc) · 16.8 KB
/
apiaxle-proxy.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
#!/usr/bin/env coffee
# This code is covered by the GPL version 3.
# Copyright 2011-2013 Philip Jackson.
_ = require "lodash"
qs = require "querystring"
urllib = require "url"
async = require "async"
crypto = require "crypto"
http = require "http"
https = require "https"
cluster = require "cluster"
cpus = require("os").cpus()
{ tconst } = require "apiaxle-base"
{ AxleApp } = require "apiaxle-base"
{ ApiUnknown,
KeyError,
ApiDisabled,
KeyDisabled,
EndpointMissingError,
EndpointTimeoutError,
ConnectionError,
DNSError } = require "./lib/error"
{ PathGlobs } = require "./lib/path_globs"
{ ApiaxleQueueProcessor } = require "./apiaxle-proxy-event-subscriber"
class exports.ApiaxleProxy extends AxleApp
  @plugins = {}

  # map low-level socket/dns error codes onto the user-facing error
  # classes we know how to report with.
  @ENDPOINT_ERROR_MAP =
    ETIMEDOUT: ( ) -> new EndpointTimeoutError "API endpoint timed out."
    ENOTFOUND: ( ) -> new EndpointMissingError "API endpoint could not be found."
    EADDRINFO: ( ) -> new DNSError "API endpoint could not be resolved."
    ECONNREFUSED: ( ) -> new ConnectionError "API endpoint could not be reached."

  # we don't use the constructor in scarf because we don't want to use
  # express in this instance.
  constructor: ( options ) ->
    @setOptions options

    # per-process lookup caches (pathname -> api, hostname -> api,
    # endpoint -> static request options)
    @pathname_caches = {}
    @hostname_caches = {}
    @endpoint_caches = {}

    # setup the queue processor if we need it
    if @options.processQueue
      @queue_proc = new ApiaxleQueueProcessor
        name: options.name
        disableTimings: options.disableTimings

    @path_globs = new PathGlobs()

  # work out which API this request is aimed at. Tries, in order: the
  # hostname cache, a `<name>.api.` subdomain match, the pathname
  # cache and finally any configured path -> api regex routes.
  getApiName: ( req, res, next ) =>
    { host } = req.headers

    # we have cache hit
    if req.api_name = @hostname_caches[host]
      return next()

    if parts = /^(.+?)\.api\./.exec host
      @hostname_caches[host] = req.api_name = parts[1]
      return next()

    if api = @pathname_caches[req.parsed_url.path]
      req.api_name = api
      return next()

    # we've not got the API via the host, try the configured routes
    routes = @config.routing?.path_to_api
    for path, api of routes
      re = new RegExp path
      if re.exec req.parsed_url.path
        req.api_name = api
        @pathname_caches[req.parsed_url.path] = api
        return next()

    return next new ApiUnknown "No api specified (via subdomain)"

  # collect the names of the keyrings req.key belongs to (used later
  # for stats attribution).
  getKeyringNames: ( req, res, next ) ->
    req.key.supportedKeyrings ( err, names ) ->
      return next err if err
      req.keyring_names = names
      return next()

  # fetch the API object named by req.api_name, erroring if it is
  # unknown or has been disabled.
  getApi: ( req, res, next ) =>
    @model( "apifactory" ).find [ req.api_name ], ( err, results ) =>
      return next err if err

      api = results[req.api_name]
      if not api?
        # no api found
        return next new ApiUnknown "'#{ req.api_name }' is not known to us."

      if api.isDisabled()
        return next new ApiDisabled "This API has been disabled."

      req.api = api
      return next()

  # create a key (which expires) from the IP address and use
  # that. Assumes req.api.data.extractKeyRegex has already been
  # checked.
  createKeyBasedOnIp: ( req, cb ) ->
    # try to get the ip address
    ip = req.headers["x-forwarded-for"] or
         req.connection.remoteAddress or
         req.socket.remoteAddress or
         req.connection.socket.remoteAddress

    # x-forwarded-for can carry a comma separated list of addresses;
    # the left-most one is the originating client
    ip = ip.split( "," )[0].trim() if ip and ip.indexOf( "," ) > -1

    key_name = "ip-#{ req.api_name }-#{ ip }"

    model = @model "keyfactory"
    model.find [ key_name ], ( err, results ) =>
      return cb err if err

      # we've a hit, return the key
      return cb null, results[key_name] if results[key_name]

      create_link = [
        # create the key
        ( cb ) ->
          { keylessQps, keylessQpd } = req.api.data
          model.create key_name, { qps: keylessQps, qpd: keylessQpd }, cb

        # now link the key
        ( cb ) -> req.api.linkKey key_name, cb
      ]

      # return the new key
      async.series create_link, ( err, [ new_key ] ) -> cb err, new_key

  # find the api key name for this request. Checked in order: query
  # string params, extractKeyRegex against the url and (if the api
  # allows keyless use) a generated ip-based key.
  getKeyName: ( req, res, next ) =>
    { apiaxle_key, api_key, key } = req.parsed_url.query

    if req.key_name = ( apiaxle_key or api_key or key )
      # e.g. an array when the param was supplied more than once
      if typeof req.key_name isnt "string"
        return next new Error "Api key is malformed."
      return next()

    # if the key isn't a query param, check a regexp on the url
    if req.key_name = @getRegexKey req.url, req.api.data.extractKeyRegex
      return next()

    # base the keys on ip addresses
    if req.api.data.allowKeylessUse
      # notify the stats system that this is a keyless request
      req.is_keyless = true

      return @createKeyBasedOnIp req, ( err, key ) ->
        # setting req.key here means we won't try to fetch it from
        # redis again later
        req.key = key
        req.key_name = key.id

        return next()

    return next new KeyError "No api_key specified."

  # fetch the key object for req.key_name (unless the keyless
  # machinery already supplied one).
  getKey: ( req, res, next ) =>
    # it's possible we've already got this thanks to the keyless
    # stuff
    return next() if req.key

    @model( "keyfactory" ).find [ req.key_name ], ( err, results ) =>
      return next err if err

      if not results[req.key_name]
        return next new KeyError "'#{ req.key_name }' is not a valid key."

      req.key = results[req.key_name]
      return next()

  # check providedToken is a valid hmac-sha1 of (unix timestamp +
  # key_name) under sharedSecret, allowing up to skewCount seconds of
  # clock skew either side of now.
  validateToken: ( skewCount, providedToken, key_name, sharedSecret, cb ) ->
    now = Date.now() / 1000

    # add more potential calls. Note the guard: a coffeescript range
    # like [1..0] counts *down*, so without it a skewCount of 0 would
    # push bogus skew offsets.
    potentials = [ now ]
    if skewCount > 0
      potentials.push( now - c, now + c ) for c in [ 1..skewCount ]

    for potential in potentials
      date = Math.floor( potential ).toString()

      hmac = crypto.createHmac "sha1", sharedSecret
      hmac.update date
      hmac.update key_name

      processed = hmac.digest "hex"

      if processed is providedToken
        return cb null, processed

    # none of the potential matches matched
    return cb new KeyError "Invalid signature (got #{providedToken})."

  # Attempts to parse regex from url. Returns the first capture group
  # or null if there's no match.
  getRegexKey: ( url, regex ) ->
    matches = url.match new RegExp( regex )

    if matches and matches.length > 1
      return matches[1]

    return null

  # make sure the key is enabled, passes signature validation (when
  # the key has a shared secret) and is actually linked to this api.
  authenticateWithKey: ( req, res, next ) =>
    all = []

    # outright disabled
    if req.key.isDisabled()
      return next new KeyDisabled "This API key has been disabled."

    # there's a shared secret, do the token thing
    if req.key.data.sharedSecret
      { apiaxle_sig, api_sig } = req.parsed_url.query

      if not providedToken = ( apiaxle_sig or api_sig )
        return next new KeyError "A signature is required for this API."

      all.push ( cb ) =>
        @validateToken req.api.data.tokenSkewProtectionCount, providedToken, req.key_name, req.key.data.sharedSecret, cb

    # check the req.key is for this req.api
    all.push ( cb ) ->
      req.api.supportsKey req.key.id, ( err, supported ) =>
        return cb err if err

        # this API doesn't know about the key
        if not supported
          return cb new KeyError "'#{ req.key.id }' is not a valid key for '#{ req.api.id }'"

        return cb()

    return async.series all, ( err ) ->
      return next err if err
      return next()

  # build the node http/https request options for the backend call,
  # caching the static host/port/timeout part per endpoint.
  getHttpProxyOptions: ( req ) ->
    key = "#{ req.api.data.protocol }#{ req.api.data.endPoint }"

    if not @endpoint_caches[key]
      [ host, port ] = req.api.data.endPoint.split ":"

      @endpoint_caches[key] =
        host: host

      @endpoint_caches[key].port = port if port
      @endpoint_caches[key].timeout = ( req.api.data.endPointTimeout * 1000 )

    options = @endpoint_caches[key]
    options.path = @buildPath req

    # the backend shouldn't see our own host header
    delete req.headers.host
    options.headers = req.headers
    options.method = req.method

    return options

  # assemble the path (plus query string) the backend should receive.
  buildPath: ( req ) =>
    endpointUrl = ""

    # here we support a default path for the request. This makes
    # sense with people like the BBC who have many APIs all sitting
    # on the one domain.
    if ( defaultPath = req.api.data.defaultPath )
      endpointUrl += defaultPath

    endpointUrl += req.parsed_url.pathname

    query = req.parsed_url.query

    # optionally strip the apiaxle-specific params before proxying
    if not req.api.data.sendThroughApiSig
      delete query.apiaxle_sig
      delete query.api_sig

    if not req.api.data.sendThroughApiKey
      delete query.apiaxle_key
      delete query.api_key
      delete query.key

    if not _.isEmpty query
      endpointUrl += "?#{ qs.stringify query }"

    # here's the actual setting
    return endpointUrl

  # register the hit against the key's qps/qpd limits and tell the
  # client how much they have left via response headers.
  applyLimits: ( req, res, next ) =>
    args = [
      req.key.id
      req.key.data.qps
      req.key.data.qpd
    ]

    @model( "apilimits" ).apiHit args..., ( err, [ newQpd, newQps ] ) ->
      return next err if err

      # let the user know what they have left
      res.setHeader "X-ApiaxleProxy-Qps-Left", newQps
      res.setHeader "X-ApiaxleProxy-Qpd-Left", newQpd

      return next()

  # shut the http server down. Pass cb straight through so callers
  # are actually told when the server has finished closing (the
  # callback used to be silently dropped).
  close: ( cb ) -> @server.close cb

  # puts the parsed url (including query params) on the request
  parseUrl: ( req, res, next ) =>
    req.parsed_url = urllib.parse req.url, true
    next()

  # returns middleware which records `name` as a millisecond offset
  # from the first timing taken for this request.
  setTiming: ( name ) ->
    return ( req, res, next ) ->
      now = Date.now()
      req.timing ||= { first: now }
      req.timing[name] = now - req.timing.first
      next()

  logRequest: ( req, res, next ) =>
    @logger.debug "%s %s", req.method, req.url
    next()

  # the ordered middleware chain every proxied request runs through.
  middleware: ->
    return [
      @logRequest,

      # puts the query params on req
      @setTiming( "start-url-parsed" ),
      @parseUrl,
      @setTiming( "end-url-parsed" ),

      # handle getting the API. If the api is invalid an error will be
      # thrown.
      @setTiming( "start-api-fetched" ),
      @getApiName,
      @getApi,
      @setTiming( "end-api-fetched" ),

      # check if CORS are enabled on API and set response headers as needed
      @setTiming( "start-applying-cors" ),
      @applyCors,
      @setTiming( "end-applying-cors" ),

      # get the valid key and keyrings. If the key is invalid an error
      # will be thrown.
      @setTiming( "start-key-fetched" ),
      @getKeyName,
      @getKey,
      @setTiming( "end-key-fetched" ),

      @setTiming( "start-key-authenticated" ),
      @authenticateWithKey,
      @setTiming( "end-key-authenticated" ),

      @setTiming( "start-keyrings-fetched" ),
      @getKeyringNames,
      @setTiming( "end-keyrings-fetched" ),

      # make sure the key still has the right to use the api (that
      # limits/quotas haven't been met yet)
      @setTiming( "start-limits-applied" ),
      @applyLimits,
      @setTiming( "end-limits-applied" ),

      @setTiming( "start-request" )
    ]

  applyCors: ( req, res, next ) =>
    # If CORS is not enabled, proceed
    if req.api.data.corsEnabled
      res.setHeader "Access-Control-Allow-Origin", "*"
      res.setHeader "Access-Control-Allow-Credentials", "true"
      res.setHeader "Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS, PATCH, HEAD"
      res.setHeader "Access-Control-Allow-Headers", "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"
      res.setHeader "Access-Control-Expose-Headers", "content-type, content-length, X-ApiaxleProxy-Qps-Left, X-ApiaxleProxy-Qpd-Left"

    return next()

  # start the proxy: bind the http server and, per request, run the
  # middleware chain then stream the request to the backend.
  run: ( cb ) ->
    # takes the request, a name and a cb. Used to make a suitable
    # callback replacement that can assign to req[thing_name]
    assReq = ( req, res, name, cb ) =>
      return ( err, result ) =>
        return cb @error( err, req, res ) if err
        return cb null, ( req[name] = result )

    # share the plugins with the queue proc
    if @queue_proc?
      @queue_proc.plugins = {}
      @queue_proc.plugins.models = @plugins.models

    @server = http.createServer ( req, res ) =>
      # run the middleware, this will populate req.api etc
      ittr = ( f, cb ) -> f( req, res, cb )

      # run the middleware above
      async.eachSeries @middleware(), ittr, ( err ) =>
        return @error err, req, res if err

        # use the correct module and create the correct agent (http vs
        # https)
        mod = if req.api.data.protocol is "https" then https else http

        req_options = @getHttpProxyOptions req
        req_options.agent ||= new mod.Agent
          maxSockets: 100
          rejectUnauthorized: req.api.data.strictSSL

        @logger.debug "Backend: #{ req_options.method } to " +
          "'#{ req.api.data.protocol }://" +
          "#{ req_options.host}:#{ req_options.port }#{ req_options.path }"

        proxyReq = mod.request req_options

        # make sure we timeout if asked to
        proxyReq.setTimeout ( req.api.data.endPointTimeout * 1000 ), ->
          e = new Error "ETIMEDOUT"
          e.code = "ETIMEDOUT"

          proxyReq.emit "error", e
          proxyReq.abort()

        ended = no
        proxyReq.on "response", ( proxyRes ) =>
          # a premature close still counts as an end for stats
          proxyRes.on "close", ->
            proxyRes.emit "end" if not ended

          proxyRes.on "end", =>
            ended = yes

            @setTiming( "end-request" ) req, res, =>
              hit_options =
                api_name: req.api_name
                key_name: req.key_name
                is_keyless: ( not not req.is_keyless )
                keyring_names: req.keyring_names
                timing: req.timing
                parsed_url: req.parsed_url
                status_code: proxyRes.statusCode

              # we leave the processing to the queue handler. Just
              # fire the message and forget.
              if not @options.processQueue
                return @model( "queue" ).rpush "queue", JSON.stringify( hit_options )

              # we need to handle the queue ourselves
              @queue_proc.processHit hit_options

          # copy the response headers
          res.writeHead proxyRes.statusCode, proxyRes.headers

          # pipe the actual request
          proxyRes.pipe res

        proxyReq.on "error", ( err ) =>
          @logger.debug "Proxy error: #{ err.message }"
          @handleProxyError err, req, res

        return req.pipe proxyReq

    @server.listen @options.port, @options.host, cb

  # report an error: record the hit (with the error attached) for
  # stats, then delegate to the superclass error handler for the
  # actual client response.
  error: ( err, req, res ) =>
    @setTiming( "end-request" )( req, res, -> )

    req.error = err
    details =
      api_name: req.api_name
      key_name: req.key_name
      is_keyless: ( not not req.is_keyless )
      keyring_names: req.keyring_names
      timing: req.timing
      parsed_url: req.parsed_url
      error: req.error

    run = []

    if not @options.processQueue
      run.push ( cb ) =>
        @model( "queue" ).rpush "queue", JSON.stringify( details ), cb
    else
      run.push ( cb ) =>
        @queue_proc.processHit details, cb

    # now for the actual response
    async.series run, ( subErr, results ) =>
      spr = exports.ApiaxleProxy.__super__.error

      if subErr
        return spr.apply this, [ subErr, req, res ]

      return spr.apply this, [ err, req, res ]

  # translate low-level backend errors (via ENDPOINT_ERROR_MAP) into
  # our own error types and respond with them.
  handleProxyError: ( err, req, res ) =>
    # I'm not sure what the right thing to do here is. There could be
    # floods of these from dodgy clients. Perhaps a counter in the
    # future?
    if err.code is "ECONNRESET"
      @logger.debug "Received an #{ err.code }."
      return

    # if we know how to handle an error then we also log it
    error = if err_func = @constructor.ENDPOINT_ERROR_MAP[ err.code ]
      err_func()
    else
      # if we're here its a new kind of error, don't want to call
      # statsModel.hit without knowing what it is for now
      @logger.warn "Error won't be statistically logged: '#{ err.code }, #{ err.message }'"
      new Error "Unrecognised error: '#{ err.message }'."

    return @error error, req, res
# command-line entry point: parse options, fork one worker per CPU
# (or --fork-count) and run a proxy instance in each worker.
if not module.parent
  optimism = require( "optimist" ).options
    p:
      alias: "port"
      default: 4000
      describe: "Port to bind the proxy to."
    h:
      alias: "host"
      default: "127.0.0.1"
      describe: "Host to bind the proxy to."
    f:
      alias: "fork-count"
      default: cpus.length
      describe: "How many internal processes to fork"
    q:
      alias: "process-queue"
      default: false
      describe: "If you're willing to take the performance penalty, process " +
                "each request from this system, rather than using " +
                "apiaxle-proxy-event-subscriber."
    t:
      alias: "disable-timings"
      default: false
      describe: "Disable timing processing only makes sense with -q."

  optimism.boolean "help"
  optimism.describe "help", "Show this help screen"

  if optimism.argv.help or optimism.argv._.length > 0
    optimism.showHelp()
    process.exit 0

  # taking a port from the commandline makes it much easier to cluster
  # the app
  { port, host } = optimism.argv

  if cluster.isMaster
    # fork for each CPU or the specified amount. Guard against a
    # non-positive fork-count: a coffeescript range like [1..0]
    # counts *down* and would fork two workers.
    fork_count = optimism.argv["fork-count"]
    if fork_count > 0
      cluster.fork() for i in [ 1..fork_count ]

    cluster.on "exit", ( worker, code, signal ) ->
      console.log( "Worker #{ worker.process.pid } died." )
  else
    api = new exports.ApiaxleProxy
      name: "apiaxle"
      port: port
      host: host
      processQueue: optimism.argv["process-queue"]
      disableTimings: optimism.argv["disable-timings"]

    all = []

    all.push ( cb ) -> api.configure cb
    all.push ( cb ) -> api.redisConnect "redisClient", cb
    all.push ( cb ) -> api.loadAndInstansiatePlugins cb
    all.push ( cb ) -> api.run cb

    async.series all, ( err ) ->
      throw err if err