Skip to content

Commit

Permalink
Merge pull request #41 from googlemaps/timeouts
Browse files Browse the repository at this point in the history
Properly cancel timers
  • Loading branch information
stephenmcd committed Sep 22, 2016
2 parents ef6dcc7 + 5224e78 commit 59e1fca
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 133 deletions.
15 changes: 2 additions & 13 deletions lib/internal/attempt.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,8 @@
* limitations under the License.
*/

var Task = require('./task');

exports.inject = function(setTimeout) {
/**
* Returns a task that waits for the given delay.
* @param {number} delayMs
* @return {Task<undefined>}
*/
function wait(delayMs) {
return Task.start(function(resolve) {
setTimeout(resolve, delayMs);
});
}
exports.inject = function(wait) {
var Task = require('./task');

return {
/**
Expand Down
53 changes: 26 additions & 27 deletions lib/internal/make-api-call.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ exports.inject = function(options) {

var makeUrlRequest = options.makeUrlRequest || require('./make-url-request');
var mySetTimeout = options.setTimeout || setTimeout;
var myClearTimeout = options.clearTimeout || clearTimeout;
var getTime = options.getTime || function() {return new Date().getTime();};
var attempt = require('./attempt').inject(mySetTimeout).attempt;
var ThrottledQueue = require('./throttled-queue').inject(mySetTimeout, getTime);
var wait = require('./wait').inject(mySetTimeout, myClearTimeout);
var attempt = require('./attempt').inject(wait).attempt;
var ThrottledQueue = require('./throttled-queue').inject(wait, getTime);
var requestQueue = ThrottledQueue.create(rateLimit, ratePeriod);

/**
Expand Down Expand Up @@ -84,32 +86,29 @@ exports.inject = function(options) {
});
}

var task = Task.start(function(resolve, reject) {
var requestTask = attempt({
'do': rateLimitedGet,
until: function(response) {
return !(
response == null
|| response.status === 500
|| response.status === 503
|| response.status === 504
|| response.json && response.json.status === 'OVER_QUERY_LIMIT');
},
interval: retryOptions.interval,
increment: retryOptions.increment,
jitter: retryOptions.jitter
});
var timeoutTask = wait(timeout).thenDo(function() {
throw 'timeout';
});
var requestTask = attempt({
'do': rateLimitedGet,
until: function(response) {
return !(
response == null
|| response.status === 500
|| response.status === 503
|| response.status === 504
|| response.json && response.json.status === 'OVER_QUERY_LIMIT');
},
interval: retryOptions.interval,
increment: retryOptions.increment,
jitter: retryOptions.jitter
});

// Race the request and the timeout.
requestTask.thenDo(resolve, reject);
mySetTimeout(function() {
reject('timeout');
requestTask.cancel();
}, timeout);
})
.thenDo(
function(response) { callback(null, response); },
function(err) { callback(err); });
var task =
Task.race([timeoutTask, requestTask])
.thenDo(
function(response) { callback(null, response); },
function(err) { callback(err); });

if (options.Promise) {
var originalCallback = callback;
Expand Down
48 changes: 39 additions & 9 deletions lib/internal/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
// (b) Promises aren't cancellable (yet?), and I want cancellability.
//
// This is very stripped down, compared to Promises.
// (a) You can only call .thenDo() once.
// (b) Tasks always complete with a pair (err, result).
// (c) Regardless of errors or cancellation, the argument of .thenDo() is
// *always* executed, and asynchronously.
// (d) The argument to .thenDo() must return either undefined or a Task. I don't
// (a) You can only call .thenDo() once. Because there's only one party waiting
// on the result of a task, cancelling always propagates backwards.
// (b) The argument to .thenDo() must return either undefined or a Task. I don't
// promote values to Tasks, like what happens with Promises.

var Task = exports;
Expand Down Expand Up @@ -55,11 +53,16 @@ Task.start = function(doSomething) {
// finished or onFinish.
var finished;
var onFinish;
var cleaners = [];

function finish(err, result) {
if (!finished) {
finished = {err: err, result: result};
if (onFinish) onFinish();
var cleanup;
while (cleanup = cleaners.pop()) {
cleanup();
}
}
}

Expand All @@ -76,8 +79,8 @@ Task.start = function(doSomething) {
* Cancels the task (unless the task has already finished, in which case
* this call is ignored).
*
* If there is a subsequent task scheduled (using #thenDo) it will be called
* with the pair ('cancelled', null).
* Subsequent tasks created with #thenDo will not be started. However, clean-
* up code added with #finished will run.
*/
me.cancel = function() {
if (!finished) {
Expand Down Expand Up @@ -131,9 +134,13 @@ Task.start = function(doSomething) {
* @return {THIS}
*/
me.finally = function(cleanup) {
setListener(function() {
if (!finished) {
cleaners.push(function() {
process.nextTick(cleanup);
});
} else {
process.nextTick(cleanup);
});
}
return me;
};

Expand All @@ -149,6 +156,29 @@ Task.withValue = function(result) {
});
};

/**
* Returns a new task that races the given tasks. Eventually finishes with the
* result or error of whichever task finishes first. If any task is cancelled,
* all of the tasks are cancelled.
*
* @param {Array<Task<T>>} tasks
* @return {Task<T>}
* @template T
*/
Task.race = function(tasks) {
return Task.start(function(resolve, reject) {
function cancelAll() {
tasks.forEach(function(task) {
task.cancel();
});
}
tasks.forEach(function(task) {
task.finally(cancelAll).thenDo(resolve, reject);
});
return cancelAll;
});
};

/**
* Creates a composite task, which uses the output of the first task to create
* a subsequent task, and represents the two tasks together.
Expand Down
61 changes: 22 additions & 39 deletions lib/internal/throttled-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
var CircularBuffer = require('./circular-buffer');
var Task = require('./task');

exports.inject = function(setTimeout, getTime) {
exports.inject = function(wait, getTime) {
return {
/**
* Creates a ThrottledQueue. The queue stores tasks, which will be executed
Expand All @@ -32,29 +32,8 @@ exports.inject = function(setTimeout, getTime) {
*/
create: function(limit, period) {
var me = {};
var queue = [];
var queue = Task.withValue();
var recentTimes = CircularBuffer.create(limit);
var scheduled = false;

function schedule() {
if (scheduled) return;

var lastTime = recentTimes.item(limit - 1);
var delay = lastTime + period - getTime();
delay = (lastTime != undefined && delay > 0) ? delay : 0;

scheduled = true;
setTimeout(function() {
scheduled = false;

var action = queue.shift();
if (action) action();

if (queue.length) {
schedule();
}
}, delay);
}

/**
* Adds a task to the work queue.
Expand All @@ -65,24 +44,28 @@ exports.inject = function(setTimeout, getTime) {
* @template T
*/
me.add = function(doSomething) {
schedule();

return Task.start(function(resolve) {
var cancelled;

// Call the callback when the action is popped off the queue
// (unless we were cancelled in the meantime).
queue.push(function() {
if (!cancelled) {
// Return a separate task from the queue, so that cancelling a task
// doesn't propagate back and cancel the whole queue.
var waitForMyTurn = Task
.start(function(resolve) {
queue.finally(resolve);
})
.thenDo(function() {
var lastTime = recentTimes.item(limit - 1);
if (lastTime == undefined) return;
return wait(Math.max(lastTime + period - getTime(), 0));
})
.thenDo(function() {
recentTimes.insert(getTime());
resolve();
}
});

queue = queue.thenDo(function() {
return Task.start(function(resolve) {
waitForMyTurn.finally(resolve);
});
return function onCancel() {
cancelled = true;
};
})
.thenDo(doSomething);
});

return waitForMyTurn.thenDo(doSomething);
};

return me;
Expand Down
34 changes: 34 additions & 0 deletions lib/internal/wait.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* @license
* Copyright 2016 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var Task = require('./task');

exports.inject = function(setTimeout, clearTimeout) {
/**
* Returns a task that waits for the given delay.
* @param {number} delayMs
* @return {Task<undefined>}
*/
return function wait(delayMs) {
return Task.start(function(resolve) {
var id = setTimeout(resolve, delayMs);
return function cancel() {
clearTimeout(id);
};
});
}
};
9 changes: 8 additions & 1 deletion spec/mock-clock.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ MockClock.create = function(opt_startTime) {
return b.time - a.time;
});
var nextId = 100;
var cancelledCallbacks = {};

var me = {};

Expand All @@ -43,6 +44,10 @@ MockClock.create = function(opt_startTime) {
return id;
};

me.clearTimeout = function(id) {
cancelledCallbacks[id] = true;
};

me.run = function(opt_duration) {
var endTime = opt_duration + theTime;

Expand All @@ -58,7 +63,9 @@ MockClock.create = function(opt_startTime) {
var item = queue.deq();

theTime = item.time;
item.callback();
if (!cancelledCallbacks[item.id]) {
item.callback();
}

// Wait a bit to allow process.nextTick() and setImmediate to happen.
return wait1ms().thenDo(loop);
Expand Down
6 changes: 3 additions & 3 deletions spec/unit/attempt-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ describe('attempt', function() {
var attempt;
beforeEach(function() {
clock = MockClock.create();
attempt = require('../../lib/internal/attempt')
.inject(clock.setTimeout)
.attempt;
var wait = require('../../lib/internal/wait')
.inject(clock.setTimeout, clock.clearTimeout);
attempt = require('../../lib/internal/attempt').inject(wait).attempt;
});

var equalTo200;
Expand Down
6 changes: 5 additions & 1 deletion spec/unit/index-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ describe('index.js:', function() {
createClient({
makeUrlRequest: requestAndFail,
getTime: clock.getTime,
setTimeout: clock.setTimeout
setTimeout: clock.setTimeout,
clearTimeout: clock.clearTimeout
})
.geocode({
address: 'Sydney Opera House',
Expand All @@ -116,6 +117,7 @@ describe('index.js:', function() {
makeUrlRequest: requestAndFail,
getTime: clock.getTime,
setTimeout: clock.setTimeout,
clearTimeout: clock.clearTimeout,
timeout: 100,
retryOptions: {
interval: 30,
Expand All @@ -139,6 +141,7 @@ describe('index.js:', function() {
makeUrlRequest: requestAndSucceed,
getTime: clock.getTime,
setTimeout: clock.setTimeout,
clearTimeout: clock.clearTimeout,
rate: {limit: 3, period: 30}
});

Expand All @@ -158,6 +161,7 @@ describe('index.js:', function() {
makeUrlRequest: requestAndSucceed,
getTime: clock.getTime,
setTimeout: clock.setTimeout,
clearTimeout: clock.clearTimeout,
rate: {period: 20}
});

Expand Down

0 comments on commit 59e1fca

Please sign in to comment.