Permalink
Browse files

Added throttling of Overpass API requests (partially implements #39)

  • Loading branch information...
1 parent 6238767 commit 309a931307d8863bd111b7a30ae013fe4da3ea14 @robhawkes robhawkes committed Feb 25, 2014
Showing with 331 additions and 4 deletions.
  1. +1 −0 Gruntfile.js
  2. +1 −0 README.md
  3. +1 −1 examples/index.html
  4. +10 −3 src/client/data/DataOverpass.js
  5. +318 −0 src/shared/vendor/throat.js
View
1 Gruntfile.js
@@ -31,6 +31,7 @@ module.exports = function(grunt) {
'src/shared/vendor/fpsmeter.js',
'src/shared/vendor/moment.js',
'src/shared/vendor/simplify.js',
+ 'src/shared/vendor/throat.js',
'src/client/Vizi.js',
'src/client/Log.js',
'src/client/Mediator.js',
View
1 README.md
@@ -155,6 +155,7 @@ Communicate with the ViziCities team via email ([hello@vizicities.com](mailto:he
* [D3.js](http://d3js.org) – Geographic coordinate conversion
* [Underscore.js](http://underscorejs.org) – General helpers
* [Q](https://github.com/kriskowal/q) – Promises
+* [Throat](https://github.com/ForbesLindesay/throat) - Limiting concurrency
* [Catiline](http://catilinejs.com) – Web Workers
* [Dat.gui](https://code.google.com/p/dat-gui) – Debug control panel
* [FPSMeter](http://darsa.in/fpsmeter) – FPS meter
View
2 examples/index.html
@@ -26,7 +26,7 @@
<script>
// Debug output
- VIZI.DEBUG = false;
+ VIZI.DEBUG = true;
var city = new VIZI.City();
city.init({
View
13 src/client/data/DataOverpass.js
@@ -1,4 +1,4 @@
-/* globals window, _, VIZI, Q, d3, simplify */
+/* globals window, _, VIZI, Q, d3, simplify, throat */
(function() {
"use strict";
@@ -98,11 +98,18 @@
var tileBoundsLonLat = self.grid.getBoundsLonLat(tileBounds);
var cacheKey = tileCoords[0] + ":" + tileCoords[1];
- promiseQueue.push(self.load(url, tileBoundsLonLat, cacheKey));
+
+ // TODO: Handle load promise without actually running the function
+ // - At the moment, the load function is run in at this point
+ promiseQueue.push([self.load, [url, tileBoundsLonLat, cacheKey]]);
}
}
- Q.all(promiseQueue).done(function() {
+ // Use throat to limit simultaneous Overpass requests
+ // Without limitation the Overpass API will rate-limit
+ Q.all(promiseQueue.map(throat(2, function(promiseFunc) {
+ return promiseFunc[0].apply(self, promiseFunc[1]);
+ }))).done(function() {
deferred.resolve();
}, function(error) {
deferred.reject(error);
View
318 src/shared/vendor/throat.js
@@ -0,0 +1,318 @@
+!function(e){if("object"==typeof exports)module.exports=e();else if("function"==typeof define&&define.amd)define(e);else{var f;"undefined"!=typeof window?f=window:"undefined"!=typeof global?f=global:"undefined"!=typeof self&&(f=self),f.throat=e()}}(function(){var define,module,exports;return (function e(t,n,r){function s(o,u){if(!n[o]){if(!t[o]){var a=typeof require=="function"&&require;if(!u&&a)return a(o,!0);if(i)return i(o,!0);throw new Error("Cannot find module '"+o+"'")}var f=n[o]={exports:{}};t[o][0].call(f.exports,function(e){var n=t[o][1][e];return s(n?n:e)},f,f.exports,e,t,n,r)}return n[o].exports}var i=typeof require=="function"&&require;for(var o=0;o<r.length;o++)s(r[o]);return s})({1:[function(_dereq_,module,exports){
+'use strict'
+
+var Promise = _dereq_('promise')
+
+module.exports = throat
+function throat(size, fn) {
+ var queue = []
+ function run(fn, self, args) {
+ if (size) {
+ size--
+ var result = new Promise(function (resolve) {
+ resolve(fn.apply(self, args))
+ })
+ result.done(release, release)
+ return result
+ } else {
+ return new Promise(function (resolve) {
+ queue.push(new Delayed(resolve, fn, self, args))
+ })
+ }
+ }
+ function release() {
+ size++
+ if (queue.length) {
+ var next = queue.shift()
+ next.resolve(run(next.fn, next.self, next.args))
+ }
+ }
+ if (typeof fn === 'function') {
+ return function () {
+ var args = arguments
+ return run(fn, this, arguments)
+ }
+ } else {
+ return function (fn) {
+ return run(fn, this, Array.prototype.slice.call(arguments, 1))
+ }
+ }
+}
+
+function Delayed(resolve, fn, self, args) {
+ this.resolve = resolve
+ this.fn = fn
+ this.self = self || null
+ this.args = args || null
+}
+},{"promise":3}],2:[function(_dereq_,module,exports){
+'use strict'
+
+var nextTick = _dereq_('./lib/next-tick')
+
+module.exports = Promise
+function Promise(fn) {
+ if (!(this instanceof Promise)) return new Promise(fn)
+ if (typeof fn !== 'function') throw new TypeError('not a function')
+ var state = null
+ var delegating = false
+ var value = null
+ var deferreds = []
+ var self = this
+
+ this.then = function(onFulfilled, onRejected) {
+ return new Promise(function(resolve, reject) {
+ handle(new Handler(onFulfilled, onRejected, resolve, reject))
+ })
+ }
+
+ function handle(deferred) {
+ if (state === null) {
+ deferreds.push(deferred)
+ return
+ }
+ nextTick(function() {
+ var cb = state ? deferred.onFulfilled : deferred.onRejected
+ if (cb === null) {
+ (state ? deferred.resolve : deferred.reject)(value)
+ return
+ }
+ var ret
+ try {
+ ret = cb(value)
+ }
+ catch (e) {
+ deferred.reject(e)
+ return
+ }
+ deferred.resolve(ret)
+ })
+ }
+
+ function resolve(newValue) {
+ if (delegating)
+ return
+ resolve_(newValue)
+ }
+
+ function resolve_(newValue) {
+ if (state !== null)
+ return
+ try { //Promise Resolution Procedure: https://github.com/promises-aplus/promises-spec#the-promise-resolution-procedure
+ if (newValue === self) throw new TypeError('A promise cannot be resolved with itself.')
+ if (newValue && (typeof newValue === 'object' || typeof newValue === 'function')) {
+ var then = newValue.then
+ if (typeof then === 'function') {
+ delegating = true
+ then.call(newValue, resolve_, reject_)
+ return
+ }
+ }
+ state = true
+ value = newValue
+ finale()
+ } catch (e) { reject_(e) }
+ }
+
+ function reject(newValue) {
+ if (delegating)
+ return
+ reject_(newValue)
+ }
+
+ function reject_(newValue) {
+ if (state !== null)
+ return
+ state = false
+ value = newValue
+ finale()
+ }
+
+ function finale() {
+ for (var i = 0, len = deferreds.length; i < len; i++)
+ handle(deferreds[i])
+ deferreds = null
+ }
+
+ try { fn(resolve, reject) }
+ catch(e) { reject(e) }
+}
+
+
+function Handler(onFulfilled, onRejected, resolve, reject){
+ this.onFulfilled = typeof onFulfilled === 'function' ? onFulfilled : null
+ this.onRejected = typeof onRejected === 'function' ? onRejected : null
+ this.resolve = resolve
+ this.reject = reject
+}
+
+},{"./lib/next-tick":4}],3:[function(_dereq_,module,exports){
+'use strict'
+
+//This file contains then/promise specific extensions to the core promise API
+
+var Promise = _dereq_('./core.js')
+var nextTick = _dereq_('./lib/next-tick')
+
+module.exports = Promise
+
+/* Static Functions */
+
+Promise.from = function (value) {
+ if (value instanceof Promise) return value
+ return new Promise(function (resolve) { resolve(value) })
+}
+Promise.denodeify = function (fn) {
+ return function () {
+ var self = this
+ var args = Array.prototype.slice.call(arguments)
+ return new Promise(function (resolve, reject) {
+ args.push(function (err, res) {
+ if (err) reject(err)
+ else resolve(res)
+ })
+ fn.apply(self, args)
+ })
+ }
+}
+Promise.nodeify = function (fn) {
+ return function () {
+ var args = Array.prototype.slice.call(arguments)
+ var callback = typeof args[args.length - 1] === 'function' ? args.pop() : null
+ try {
+ return fn.apply(this, arguments).nodeify(callback)
+ } catch (ex) {
+ if (callback == null) {
+ return new Promise(function (resolve, reject) { reject(ex) })
+ } else {
+ nextTick(function () {
+ callback(ex)
+ })
+ }
+ }
+ }
+}
+
+Promise.all = function () {
+ var args = Array.prototype.slice.call(arguments.length === 1 && Array.isArray(arguments[0]) ? arguments[0] : arguments)
+
+ return new Promise(function (resolve, reject) {
+ if (args.length === 0) return resolve([])
+ var remaining = args.length
+ function res(i, val) {
+ try {
+ if (val && (typeof val === 'object' || typeof val === 'function')) {
+ var then = val.then
+ if (typeof then === 'function') {
+ then.call(val, function (val) { res(i, val) }, reject)
+ return
+ }
+ }
+ args[i] = val
+ if (--remaining === 0) {
+ resolve(args);
+ }
+ } catch (ex) {
+ reject(ex)
+ }
+ }
+ for (var i = 0; i < args.length; i++) {
+ res(i, args[i])
+ }
+ })
+}
+
+/* Prototype Methods */
+
+Promise.prototype.done = function (onFulfilled, onRejected) {
+ var self = arguments.length ? this.then.apply(this, arguments) : this
+ self.then(null, function (err) {
+ nextTick(function () {
+ throw err
+ })
+ })
+}
+Promise.prototype.nodeify = function (callback) {
+ if (callback == null) return this
+
+ this.then(function (value) {
+ nextTick(function () {
+ callback(null, value)
+ })
+ }, function (err) {
+ nextTick(function () {
+ callback(err)
+ })
+ })
+}
+},{"./core.js":2,"./lib/next-tick":4}],4:[function(_dereq_,module,exports){
+(function (process){
+'use strict'
+
+if (typeof setImmediate === 'function') { // IE >= 10 & node.js >= 0.10
+ module.exports = function(fn){ setImmediate(fn) }
+} else if (typeof process !== 'undefined' && process && typeof process.nextTick === 'function') { // node.js before 0.10
+ module.exports = function(fn){ process.nextTick(fn) }
+} else {
+ module.exports = function(fn){ setTimeout(fn, 0) }
+}
+
+}).call(this,_dereq_("/usr/local/lib/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js"))
+},{"/usr/local/lib/node_modules/browserify/node_modules/insert-module-globals/node_modules/process/browser.js":5}],5:[function(_dereq_,module,exports){
+// shim for using process in browser
+
+var process = module.exports = {};
+
+process.nextTick = (function () {
+ var canSetImmediate = typeof window !== 'undefined'
+ && window.setImmediate;
+ var canPost = typeof window !== 'undefined'
+ && window.postMessage && window.addEventListener
+ ;
+
+ if (canSetImmediate) {
+ return function (f) { return window.setImmediate(f) };
+ }
+
+ if (canPost) {
+ var queue = [];
+ window.addEventListener('message', function (ev) {
+ var source = ev.source;
+ if ((source === window || source === null) && ev.data === 'process-tick') {
+ ev.stopPropagation();
+ if (queue.length > 0) {
+ var fn = queue.shift();
+ fn();
+ }
+ }
+ }, true);
+
+ return function nextTick(fn) {
+ queue.push(fn);
+ window.postMessage('process-tick', '*');
+ };
+ }
+
+ return function nextTick(fn) {
+ setTimeout(fn, 0);
+ };
+})();
+
+process.title = 'browser';
+process.browser = true;
+process.env = {};
+process.argv = [];
+
+process.binding = function (name) {
+ throw new Error('process.binding is not supported');
+}
+
+// TODO(shtylman)
+process.cwd = function () { return '/' };
+process.chdir = function (dir) {
+ throw new Error('process.chdir is not supported');
+};
+
+},{}]},{},[1])
+(1)
+});

0 comments on commit 309a931

Please sign in to comment.