-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
357 lines (292 loc) · 10.5 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
'use strict';
// core modules
const events = require('events');
const util = require('util');
// external modules
const assert = require('assert-plus');
// internal files
const bind = require('./bind');
//------------------------------------------------------------------------------
// class constructor
//------------------------------------------------------------------------------
/**
* Reissue object. Wraps context for setTimeout and behavior
* @class
* @constructor
* @param {Object} opts an options object
* @param {Object} opts.func an the function to execute
* @param {Object | Function} opts.interval the interval to execute at, or a
* function that returns an interval. function allows for dynamic intervals.
* @param {Number} [opts.timeout] an optional timeout value that causes the
* `timeout` event to be fired when a given invocation exceeds this value.
* function that returns an interval. function allows for dynamic intervals.
* @param {Object} [opts.context] the context to bind the function to
* @param {Object} [opts.args] any arguments to pass to the function
*/
function Reissue(opts) {
// assert options
assert.object(opts, 'opts');
assert.func(opts.func, 'func');
assert.optionalObject(opts.context, 'context');
assert.optionalArray(opts.args, 'args');
assert.optionalNumber(opts.timeout, 'timeout');
// assert options of different types
const typeofInterval = typeof opts.interval;
assert.equal(typeofInterval === 'function' || typeofInterval === 'number',
true);
const self = this;
//--------------------------------------------------------------------------
// user supplied properties
//--------------------------------------------------------------------------
/**
* the function to execute
* @type {Function}
*/
self._func = opts.func;
/**
* the interval to execute at. if user passed in static value, wrap it in
* a function to normalize the dynamic interval scenario.
* @type {Number | Function}
* @return {Number}
*/
self._interval = (typeofInterval === 'number') ?
function _returnInterval() {
return opts.interval;
} : opts.interval;
/**
* `this` context for the function
* @type {Object}
*/
self._funcContext = opts.context || null;
/**
* arguments to pass to the function. append the internal done callback.
* @type {Array}
*/
const boundDone = bind(self._done, self);
self._funcArgs = (opts.args) ?
opts.args.concat(boundDone) : [boundDone];
/**
* schedule an optional timeout which will trigger timeout event if a
* given invocation exceeds this number.
* @type {Number}
*/
self._timeoutMs = opts.timeout || null;
//--------------------------------------------------------------------------
// internal properties
//--------------------------------------------------------------------------
/**
* internal flag used to determine if the process is active.
* @type {Boolean}
*/
self._active = false;
/**
* keeps track of time elapsed since last invocation.
* @type {Number}
*/
self._startTime = 0;
/**
* boolean flag set when we are waiting for user supplied function to
* complete. technically we should know this if self._nextHandlerId had
* a flag to show whether or not it had already been executed, but this is
* a safer way to do it.
* @type {Boolean}
*/
self._inUserFunc = false;
/**
* setTimeout handler of next invocation
* @type {Function}
*/
self._nextHandlerId = null;
/*
* setTimeout handler of an internal timeout implementation. used to fire
* the timeout event if a user supplied function takes too long.
* @type {Function}
*/
self._timeoutHandlerId = null;
}
util.inherits(Reissue, events.EventEmitter);
//------------------------------------------------------------------------------
// private methods
//------------------------------------------------------------------------------
/**
* run the function.
* @private
* @method _execute
* @return {undefined}
*/
Reissue.prototype._execute = function _execute() {
const self = this;
// start invocation timer
self._startTime = Date.now();
// set flag so we know we're currently in user supplied func
self._inUserFunc = true;
// execute their func on a setImmediate, such that we can schedule the
// timeout first. to be clear though, user func could be sync and our
// timeout may never fire.
setImmediate(function _executeImmediately() {
self._func.apply(self._funcContext, self._funcArgs);
});
// if timeout option is specified, schedule one here. basically, we
// execute the next invocation immediately above, then schedule a
// timeout handler, so we basically have two set timeout functions
// being scheduled.
if (self._timeoutMs !== null) {
// assign timeout to self so that we can cancel it if we complete
// on time.
self._timeoutHandlerId = setTimeout(
bind(self._onTimeout, self),
self._timeoutMs
);
}
};
/**
* callback on completion of user supplied function. this is where we determine
* the timeout of the next invocation based on how long it took.
* @private
* @method _done
* @param {Object} err an error returned by user function
* @return {undefined}
*/
Reissue.prototype._done = function _done(err) {
// calculate delta interval
const self = this;
const interval = self._interval();
const elapsedTime = Date.now() - self._startTime;
// we're out of user supplied func now
self._inUserFunc = false;
// clear out the handler id
self._nextHandlerId = null;
// re-emit error
if (err) {
self.emit('error', err);
}
// in every other case, we're fine, since we've finished before the
// timeout event has occurred. call _internalDone where we will clear
// the timeout event.
return _internalDone();
// common completion function called by forked code above
function _internalDone() {
// clear any timeout handlers
if (self._timeoutHandlerId) {
clearTimeout(self._timeoutHandlerId);
self._timeoutHandlerId = null;
}
// if user called stop() sometime during last invocation, we're done!
// don't queue up another invocation.
if (self._active === false) {
self._stop();
} else {
// start invocation immediately if: the elapsedTime is greater than
// the interval, which means the last execution took longer than
// the interval itself. otherwise, subtract the time the previous
// invocation took.
const timeToInvocation = (elapsedTime >= interval) ?
0 : (interval - elapsedTime);
self._nextHandlerId = setTimeout(function _nextInvocation() {
self._execute();
}, timeToInvocation);
}
}
};
/**
* internal implementation of stop. clears all timeout handlers and emits the
* stop event.
* @private
* @method _stop
* @returns {undefined}
*/
Reissue.prototype._stop = function _stop() {
const self = this;
// clear the next invocation if one exists
if (self._nextHandlerId) {
clearTimeout(self._nextHandlerId);
self._nextHandlerId = null;
}
// no need to clear timeout handlers, as they're already cleared
// in _done before we get here.
// emit stop, and we're done!
self.emit('stop');
};
/**
* called when the interval function "times out", or in other words takes
* longer than then specified timeout interval. this blocks the next invocation
* of the interval function until user calls the callback on the timeout
* event.
* @private
* @method _onTimeout
* @return {undefined}
*/
Reissue.prototype._onTimeout = function _onTimeout() {
const self = this;
// we might have called stop during current invocation. emit timeout event
// only if we're still active.
if (self._active === true) {
self.emit('timeout');
}
};
//------------------------------------------------------------------------------
// public methods
//------------------------------------------------------------------------------
/**
* start the interval execution.
* @public
* @method start
* @param {Number} [delay] optional delay in ms before starting interval
* @return {undefined}
*/
Reissue.prototype.start = function start(delay) {
assert.optionalNumber(delay);
const self = this;
const realDelay = (typeof delay === 'number' && delay >= 0) ? delay : 0;
// before starting, see if reissue is already active. if so, throw an
// error.
if (self._active === true) {
throw new Error('cannot reissue, function already active!');
}
// set the flag and off we go!
self._active = true;
// can't to truthy check since 0 is falsy. if a delay is passed in, then
// schedule it. otherwise, it's synchronous and you can't stop it.
if (typeof delay === 'number') {
self._nextHandlerId = setTimeout(function _nextInvocation() {
self._execute();
}, realDelay);
} else {
self._execute();
}
};
/**
* manually stop the exeuction queue.
* @public
* @method stop
* @return {undefined}
*/
Reissue.prototype.stop = function stop() {
const self = this;
// NOTE: while the below if statements could be collapsed to be more more
// terse, this logic is easier to read in terms of maintainability.
// check if we are currently active. if not, we can stop now.
if (self._active === false) {
self._stop();
} else {
// in the else case, we are still active. there are two possibilities
// here, we are either:
// 1) queued up waiting for the next invocation
// 2) waiting for user supplied function to complete
if (self._inUserFunc === false) {
// case #1
// if we're just waiting for the next invocation, call stop now
// which will clear out the next invocation.
self._stop();
} else {
// case #2
// set active flag to false, when we come back from user function
// we will check this flag and call internal _stop()
self._active = false;
}
}
};
// export a factory function
module.exports.create = function create(opts) {
return new Reissue(opts);
};