-
Notifications
You must be signed in to change notification settings - Fork 3
/
handler.js
217 lines (179 loc) · 5.48 KB
/
handler.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
//
// # RequestHandler
//
//
// ## Error Handling
//
// Since none of the RequestHandler's methods take callback functions, all error handling (like request handling) is
// facilitated by an 'error' event being emitted.
//
var EventEmitter = require('events').EventEmitter
, url = require('url')
, debug = require('debug')('shuttle:RequestHandler')
, mi = require('mi')
, zmqstream = require('zmq-stream')
//
// ## RequestHandler `RequestHandler(options)`
//
// Creates a new RequestHandler with the specified options:
//
// * `linger`: A duration, in ms, that the RequestHandler will wait for outgoing messages to be sent before releasing
// its resources after `close` is called. Outgoing messages take a non-zero time to be completely sent, and can be
// dropped by a subsequent call to `close`. A value of -1 indicates an infinite delay. Defaults to -1.
//
function RequestHandler(obj) {
if (!(this instanceof RequestHandler)) {
return new RequestHandler(obj)
}
EventEmitter.call(this)
obj = obj || {}
this.linger = (typeof obj.linger === 'number') ? obj.linger : -1
this._zrouter = null
}
RequestHandler.createHandler = RequestHandler
//
// ## EventEmitter API (`on`, `once`, `removeAllListeners`, etc.)
//
// RequestHandler inherits from EventEmitter to facilitate local subscriptions to remote events. See the Node.js
// Documentation's [Events API](http://nodejs.org/api/events.html) for more information.
//
// NOTE: Do not call `emit`. While it should work as expected, it can cause leaks in this abstraction.
//
mi.extend(RequestHandler, EventEmitter)
//
// ## listen `listen(options)`
//
// ### Also `listenForRequests`
//
// Synchronously listens for RequestEmitter connections. If **options.url** is provided, that URL will be used.
// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
//
RequestHandler.prototype.listenForRequests = listen
RequestHandler.prototype.listen = listen
function listen(options) {
var self = this
, opts = options || {}
, iface = options.url
if (!iface) {
iface = url.format(opts)
}
debug('Listening to %s.', iface)
self._initSocket()
self._zrouter.bind(iface)
}
//
// ## connect `connect(options)`
//
// ### Also `connectForRequests`
//
// Synchronously connects to a listening RequestEmitter. If **options.url** is provided, that URL will be used.
// Otherwise, **options** will be formatted as a URL as defined by the core `url` module.
//
RequestHandler.prototype.connectForRequests = connect
RequestHandler.prototype.connect = connect
function connect(options) {
var self = this
, opts = options || {}
, iface = options.url
if (!iface) {
iface = url.format(opts)
}
debug('Connecting to %s.', iface)
self._initSocket()
self._zrouter.connect(iface)
}
//
// ## close `close()`
//
// Synchonously releases the underlying resources, allowing the RequestHandler to be `connect`ed or `listen`ed again
// freely.
//
// Unless the RequestHandler was configured with a `linger` period, all pending outgoing messages will be dropped.
//
RequestHandler.prototype.close = close
function close() {
var self = this
if (!self._zrouter) {
return
}
debug('Closing.')
self._zrouter.close()
self._zrouter = null
}
//
// ## _initSocket `_initSocket()`
//
// Internal use only.
//
// Creates the underlying networking resources.
//
RequestHandler.prototype._initSocket = _initSocket
function _initSocket() {
var self = this
if (self._zrouter) {
return
}
self._zrouter = new zmqstream.Socket({
type: zmqstream.Type.ROUTER
})
self._zrouter.set(zmqstream.Option.LINGER, self.linger)
self._zrouter.on('readable', function () {
self._handle()
})
self._handle()
}
//
// ## _handle `_handle()`
//
// Internal use only.
//
// Polls the network for requests, re-emitting them locally for handling.
//
RequestHandler.prototype._handle = _handle
function _handle() {
var self = this
, messages = null
// 1. If the router socket is currently closed, it cannot be read from. Otherwise, read from it.
if (!self._zrouter) {
return
}
messages = self._zrouter.read(100)
// 1. If there are no messages, we'll come back to this later. For now, just leave.
if (!messages) {
return
}
// 1. For each message, we want to emit a local event to be handled.
messages.forEach(function (envelope) {
var payload
// 1. If we don't have four message frames or the body isn't valid JSON, this didn't come from a
// RequestEmitter. It's safe to ignore.
if (envelope.length !== 4) {
return
}
try {
payload = JSON.parse(envelope.pop().toString('utf8'))
} catch (e) {
return
}
debug('Emitting %s with %s.', payload.name, JSON.stringify(payload.data))
self.emit(payload.name, payload.data, function (err, data) {
// 1. Once we receive a response, write back to the router this response.
var response = JSON.stringify({
err: err,
data: data
})
envelope.push(new Buffer(response))
debug('Handling response to %s with %s.', payload.name, response)
// 1. If `write` returns false, we're out of resources to send more messages, and need to throw an error.
if (!self._zrouter.write(envelope)) {
// TODO: Put some place safe.
self.emit('error', new Error('Too many responses'))
}
})
})
// 1. We may have more messages to receive, so try reading again soon.
process.nextTick(function () {
self._handle()
})
}
module.exports = RequestHandler