Skip to content
This repository has been archived by the owner on Jan 7, 2018. It is now read-only.

Commit

Permalink
Resolve mongodb connection drop issues (I think).
Browse files Browse the repository at this point in the history
By switching the mongo connections from the raw Mongo adapter to
Mongoose, this seems to fix NREL/api-umbrella#17.

Mongoose uses the same underlying mongo driver, so why this fixes it is
a bit confusing. My only theories are:

- It's related to how Mongoose connects and the keepalive setting.
  Mongoose doesn't use the newer MongoClient.connect approach, and on a
  quick glance, there are maybe some differences in how keepalive
  settings get passed with the new MongoClient.connect).
- Or it's Mongoose's query buffering that helps resolve this issue.
  While we don't need most of Mongoose's extra features, its ability to
  buffer queries when in a disconnected state might be what fixes this.

Assuming this ends up working, the retry logic in api_key_validator.js
should be revisited.
  • Loading branch information
GUI committed Dec 8, 2013
1 parent cd00ae6 commit 5c5c12b
Show file tree
Hide file tree
Showing 17 changed files with 394 additions and 117 deletions.
7 changes: 7 additions & 0 deletions config/default.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
apiUmbrella:
mongodb: "mongodb://127.0.0.1:27017/api_umbrella_development"
mongodb_options:
server:
socketOptions:
keepAlive: 500
replset:
socketOptions:
keepAlive: 500
redis:
host: 127.0.0.1
port: 6379
Expand Down
20 changes: 11 additions & 9 deletions lib/config_reloader.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ var _ = require('lodash'),
async = require('async'),
cloneDeep = require('clone'),
config = require('./config'),
ConfigVersion = require('./models/config_version'),
DnsResolver = require('./config_reloader/dns_resolver').DnsResolver,
events = require('events'),
exec = require('child_process').exec,
fs = require('fs'),
handlebars = require('handlebars'),
logger = require('./logger'),
MongoClient = require('mongodb').MongoClient,
mongoConnect = require('./mongo_connect'),
path = require('path'),
traverse = require('traverse'),
util = require('util');
Expand All @@ -37,12 +38,11 @@ _.extend(ConfigReloader.prototype, {
},

connectMongo: function(asyncReadyCallback) {
MongoClient.connect(config.get('mongodb'), this.handleConnectMongo.bind(this, asyncReadyCallback));
mongoConnect(this.handleConnectMongo.bind(this, asyncReadyCallback));
},

handleConnectMongo: function(asyncReadyCallback, error, db) {
handleConnectMongo: function(asyncReadyCallback, error) {
if(!error) {
this.mongo = db;
asyncReadyCallback(null);
} else {
asyncReadyCallback(error);
Expand All @@ -67,9 +67,11 @@ _.extend(ConfigReloader.prototype, {
},

reload: function(options) {
var collection = this.mongo.collection('config_versions');
var cursor = collection.find().sort({ version: -1 }).limit(1);
cursor.toArray(this.handleFetchConfigVersion.bind(this, options));
ConfigVersion
.find()
.sort({ version: -1 })
.limit(1)
.exec(this.handleFetchConfigVersion.bind(this, options));
},

handleFetchConfigVersion: function(options, error, configVersions) {
Expand Down Expand Up @@ -270,8 +272,8 @@ _.extend(ConfigReloader.prototype, {
clearTimeout(this.pollTimeout);
}

if(this.mongo) {
this.mongo.close();
if(mongoose.connection) {
mongoose.connection.close();
}

if(this.resolver) {
Expand Down
87 changes: 54 additions & 33 deletions lib/distributed_rate_limits_sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ var _ = require('lodash'),
config = require('./config'),
events = require('events'),
logger = require('./logger'),
MongoClient = require('mongodb').MongoClient,
mongoConnect = require('./mongo_connect'),
rateLimitModel = require('./models/rate_limit_model'),
redis = require('redis'),
util = require('util');

Expand Down Expand Up @@ -43,20 +44,23 @@ _.extend(DistributedRateLimitsSync.prototype, {
syncEvery: 500,
syncBuffer: 2000,

initialize: function() {
initialize: function(readyCallback) {
if(readyCallback) {
this.once('synced', readyCallback);
}

async.parallel([
this.connectMongo.bind(this),
this.connectRedis.bind(this),
], this.handleConnections.bind(this));
},

connectMongo: function(asyncReadyCallback) {
MongoClient.connect(config.get('mongodb'), this.handleConnectMongo.bind(this, asyncReadyCallback));
mongoConnect(this.handleConnectMongo.bind(this, asyncReadyCallback));
},

handleConnectMongo: function(asyncReadyCallback, error, db) {
handleConnectMongo: function(asyncReadyCallback, error) {
if(!error) {
this.mongo = db;
asyncReadyCallback(null);
} else {
asyncReadyCallback(error);
Expand Down Expand Up @@ -126,11 +130,10 @@ _.extend(DistributedRateLimitsSync.prototype, {
});

this.rateLimits = _.map(rateLimits, function(options) {
var prefix = options.limit_by + ':' + options.duration;
return {
expireAfter: options.duration + options.accuracy + 1000,
redisPrefix: prefix,
mongoCollection: this.mongo.collection('rate_limits_' + prefix.replace(/:/, '_')),
redisPrefix: options.limit_by + ':' + options.duration,
mongoModel: rateLimitModel(options),
};
}.bind(this));
},
Expand All @@ -149,47 +152,61 @@ _.extend(DistributedRateLimitsSync.prototype, {
since = new Date() - 60 * 60 * 1000;
}

rateLimit.mongoCollection.find({
var stream = rateLimit.mongoModel.find({
updated_at: { '$gte': new Date(since) },
}).each(function(error, mongoResult) {
}).stream();

var queue = async.queue(this.processSyncQueue.bind(this, rateLimit), 10);

stream.on('data', function(mongoResult) {
queue.push(mongoResult);
});

stream.on('error', function(error) {
logger.error('Distributed rate limits sync MongoDB result error: ', error);
asyncCallback(error);
});

stream.on('close', function() {
queue.drain = function() {
asyncCallback(null);
};
});
},

processSyncQueue: function(rateLimit, mongoResult, callback) {
this.redis.get(mongoResult._id, function(error, redisCount) {
if(error) {
logger.error('Distributed rate limits sync MongoDB result error: ', error);
asyncCallback(error);
logger.error('Distributed rate limits sync Redis result error: ', error);
callback(error);
return false;
}

if(mongoResult) {
this.redis.get(mongoResult._id, function(error, redisCount) {
if(error) {
logger.error('Distributed rate limits sync Redis result error: ', error);
return false;
}

redisCount = parseInt(redisCount, 10);
redisCount = parseInt(redisCount, 10);

if(!redisCount) {
this.redis.multi()
.set(mongoResult._id, mongoResult.count)
.pexpire(mongoResult._id, rateLimit.expireAfter)
.exec();
if(!redisCount) {
this.redis.multi()
.set(mongoResult._id, mongoResult.count)
.pexpire(mongoResult._id, rateLimit.expireAfter)
.exec(callback);

logger.info('Syncing distributed rate limit: ' + mongoResult._id + ' = ' + mongoResult.count);
} else if(mongoResult.count > redisCount) {
var difference = mongoResult.count - redisCount;
this.redis.incrby(mongoResult._id, difference);
logger.info('Syncing distributed rate limit: ' + mongoResult._id + ' = ' + mongoResult.count);
} else if(mongoResult.count > redisCount) {
var difference = mongoResult.count - redisCount;
this.redis.incrby(mongoResult._id, difference, callback);

logger.info('Syncing distributed rate limit: ' + mongoResult._id + ' += ' + difference);
}
}.bind(this));
logger.info('Syncing distributed rate limit: ' + mongoResult._id + ' += ' + difference);
} else {
asyncCallback(null);
callback(null);
}
}.bind(this));
},

finishedSyncRateLimits: function() {
this.lastSyncTime = new Date();

this.emit('synced');

var syncAgainIn = this.syncEvery;

// If the sync took longer than the syncEvery to complete, go ahead and
Expand All @@ -207,6 +224,10 @@ _.extend(DistributedRateLimitsSync.prototype, {
clearTimeout(this.syncRateLimitsTimeout);
}

if(mongoose.connection) {
mongoose.connection.close();
}

if(callback) {
callback(null);
}
Expand Down
10 changes: 5 additions & 5 deletions lib/gatekeeper/middleware/api_key_validator.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var _ = require('lodash'),
ApiUser = require('../../models/api_user'),
async = require('async'),
config = require('../../config'),
logger = require('../../logger'),
Expand Down Expand Up @@ -34,7 +35,7 @@ _.extend(ApiKeyValidatorRequest.prototype, {
var retriesUser;
async.doWhilst(
function(callback) {
this.validator.users.findOne({ api_key: request.apiUmbrellaGatekeeper.apiKey }, function(error, user) {
ApiUser.findOne({ api_key: request.apiUmbrellaGatekeeper.apiKey }, function(error, user) {
retriesError = error;
if(error) {
retriesCount++;
Expand Down Expand Up @@ -124,8 +125,7 @@ var ApiKeyValidator = function() {
};

_.extend(ApiKeyValidator.prototype, {
initialize: function(proxy) {
this.users = proxy.mongo.collection('api_users');
initialize: function() {
this.apiKeyMethods = config.get('proxy.apiKeyMethods');
},

Expand All @@ -134,8 +134,8 @@ _.extend(ApiKeyValidator.prototype, {
},
});

module.exports = function apiKeyValidator(proxy) {
var middleware = new ApiKeyValidator(proxy);
module.exports = function apiKeyValidator() {
var middleware = new ApiKeyValidator();

return function(request, response, next) {
middleware.handleRequest(request, response, next);
Expand Down
48 changes: 14 additions & 34 deletions lib/gatekeeper/middleware/rate_limit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var _ = require('lodash'),
async = require('async'),
logger = require('../../logger'),
rateLimitModel = require('../../models/rate_limit_model'),
utils = require('../utils');

var TimeWindow = function() {
Expand All @@ -18,30 +19,7 @@ _.extend(TimeWindow.prototype, {
this.prefix = options.limit_by + ':' + options.duration;

if(options.distributed) {
this.mongoCollection = proxy.mongo.collection('rate_limits_' + this.prefix.replace(/:/, '_'));

this.mongoCollection.ensureIndex({ updated_at: 1 },
{
background: true,
},
function(error) {
if(error) {
logger.error('MongoDB ensureIndex error: ', error);
return false;
}
});

this.mongoCollection.ensureIndex({ time: 1 },
{
background: true,
expireAfterSeconds: this.expireAfter / 1000
},
function(error) {
if(error) {
logger.error('MongoDB ensureIndex error: ', error);
return false;
}
});
this.mongoModel = rateLimitModel(options);
}
},
});
Expand Down Expand Up @@ -124,16 +102,18 @@ _.extend(RateLimitRequestTimeWindow.prototype, {
},

incrementMongo: function() {
this.timeWindow.mongoCollection.update({
_id: this.getCurrentBucketKey(),
time: this.getCurrentBucketDate()
},
{
'$inc': { count: 1 },
'$set': { updated_at: new Date() },
},
{ upsert: true },
this.handleIncrementMongo);
var conditions = {
_id: this.getCurrentBucketKey(),
time: this.getCurrentBucketDate()
};

var update = {
'$inc': { count: 1 },
'$set': { updated_at: new Date() },
};

var options = { upsert: true };
this.timeWindow.mongoModel.update(conditions, update, options, this.handleIncrementMongo);
},

handleIncrementMongo: function(error) {
Expand Down
11 changes: 5 additions & 6 deletions lib/gatekeeper/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ var _ = require('lodash'),
httpProxy = require('http-proxy'),
logger = require('../logger'),
middleware = require('./middleware'),
MongoClient = require('mongodb').MongoClient,
mongoConnect = require('../mongo_connect'),
ProxyLogger = require('./logger').Logger,
redis = require('redis'),
util = require('util');
Expand All @@ -32,12 +32,11 @@ _.extend(Worker.prototype, {
},

connectMongo: function(asyncReadyCallback) {
MongoClient.connect(config.get('mongodb'), this.handleConnectMongo.bind(this, asyncReadyCallback));
mongoConnect(this.handleConnectMongo.bind(this, asyncReadyCallback));
},

handleConnectMongo: function(asyncReadyCallback, error, db) {
handleConnectMongo: function(asyncReadyCallback, error) {
if(!error) {
this.mongo = db;
asyncReadyCallback(null);
} else {
asyncReadyCallback(error);
Expand Down Expand Up @@ -180,8 +179,8 @@ _.extend(Worker.prototype, {
this.redis.quit();
}

if(this.mongo) {
this.mongo.close();
if(mongoose.connection) {
mongoose.connection.close();
}

if(this.server) {
Expand Down
19 changes: 19 additions & 0 deletions lib/models/api_user.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';

var mongoose = require('mongoose');

module.exports = mongoose.model('ApiUser', new mongoose.Schema({
_id: mongoose.Schema.Types.Mixed,
api_key: {
type: String,
index: { unique: true },
},
first_name: String,
last_name: String,
email: String,
website: String,
throttle_by_ip: Boolean,
disabled_at: Date,
roles: [String],
settings: mongoose.Schema.Types.Mixed,
}, { collection: 'api_users' }));
11 changes: 11 additions & 0 deletions lib/models/config_version.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
'use strict';

var mongoose = require('mongoose');

module.exports = mongoose.model('ConfigVersion', new mongoose.Schema({
version: {
type: Date,
unique: true,
},
config: mongoose.Schema.Types.Mixed,
}, { collection: 'config_versions' }));
Loading

0 comments on commit 5c5c12b

Please sign in to comment.