Skip to content

Commit

Permalink
feat: async/await redis connection
Browse files Browse the repository at this point in the history
  • Loading branch information
barisusakli committed Jan 23, 2021
1 parent 33bf1b0 commit fdfbc90
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 137 deletions.
138 changes: 51 additions & 87 deletions src/database/redis.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict';

const async = require('async');
const winston = require('winston');
const nconf = require('nconf');
const semver = require('semver');
const util = require('util');
const session = require('express-session');

const connection = require('./redis/connection');
Expand Down Expand Up @@ -36,112 +35,77 @@ redisModule.questions = [
];


redisModule.init = function (callback) {
callback = callback || function () { };
redisModule.client = connection.connect(nconf.get('redis'), function (err) {
if (err) {
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error\n' + err.stack);
return callback(err);
}
require('./redis/promisify')(redisModule.client);

callback();
});
redisModule.init = async function () {
redisModule.client = await connection.connect(nconf.get('redis'));
require('./redis/promisify')(redisModule.client);
};

redisModule.createSessionStore = function (options, callback) {
redisModule.createSessionStore = async function (options) {
const meta = require('../meta');
const sessionStore = require('connect-redis')(session);
const client = connection.connect(options);
const client = await connection.connect(options);
const store = new sessionStore({
client: client,
ttl: meta.getSessionTTLSeconds(),
});

if (typeof callback === 'function') {
callback(null, store);
}
};

redisModule.createIndices = function (callback) {
setImmediate(callback);
return store;
};

redisModule.checkCompatibility = function (callback) {
async.waterfall([
function (next) {
redisModule.info(redisModule.client, next);
},
function (info, next) {
redisModule.checkCompatibilityVersion(info.redis_version, next);
},
], callback);
redisModule.checkCompatibility = async function () {
const info = await redisModule.info(redisModule.client);
redisModule.checkCompatibilityVersion(info.redis_version);
};

redisModule.checkCompatibilityVersion = function (version, callback) {
redisModule.checkCompatibilityVersion = function (version) {
if (semver.lt(version, '2.8.9')) {
return callback(new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.'));
throw new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.');
}
callback();
};

redisModule.close = function (callback) {
callback = callback || function () {};
redisModule.client.quit(function (err) {
callback(err);
});
redisModule.close = async function () {
await redisModule.client.async.quit();
};

redisModule.info = function (cxn, callback) {
async.waterfall([
function (next) {
if (cxn) {
return setImmediate(next, null, cxn);
}
connection.connect(nconf.get('redis'), next);
},
function (cxn, next) {
redisModule.client = redisModule.client || cxn;

cxn.info(next);
},
function (data, next) {
var lines = data.toString().split('\r\n').sort();
var redisData = {};
lines.forEach(function (line) {
var parts = line.split(':');
if (parts[1]) {
redisData[parts[0]] = parts[1];
}
});

const keyInfo = redisData['db' + nconf.get('redis:database')];
if (keyInfo) {
const split = keyInfo.split(',');
redisData.keys = (split[0] || '').replace('keys=', '');
redisData.expires = (split[1] || '').replace('expires=', '');
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
}

redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);

redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);

redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
redisData.raw = JSON.stringify(redisData, null, 4);
redisData.redis = true;

next(null, redisData);
},
], callback);
redisModule.info = async function (cxn) {
if (!cxn) {
cxn = await connection.connect(nconf.get('redis'));
}
redisModule.client = redisModule.client || cxn;
const infoAsync = util.promisify(cb => cxn.info(cb));
const data = await infoAsync();
const lines = data.toString().split('\r\n').sort();
const redisData = {};
lines.forEach(function (line) {
const parts = line.split(':');
if (parts[1]) {
redisData[parts[0]] = parts[1];
}
});

const keyInfo = redisData['db' + nconf.get('redis:database')];
if (keyInfo) {
const split = keyInfo.split(',');
redisData.keys = (split[0] || '').replace('keys=', '');
redisData.expires = (split[1] || '').replace('expires=', '');
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
}

redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);

redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);

redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
redisData.raw = JSON.stringify(redisData, null, 4);
redisData.redis = true;
return redisData;
};

redisModule.socketAdapter = function () {
var redisAdapter = require('socket.io-redis');
var pub = connection.connect(nconf.get('redis'));
var sub = connection.connect(nconf.get('redis'));
const redisAdapter = require('socket.io-redis');
const pub = connection.connect(nconf.get('redis'));
const sub = connection.connect(nconf.get('redis'));
return redisAdapter({
key: 'db:' + nconf.get('redis:database') + ':adapter_key',
pubClient: pub,
Expand Down
81 changes: 33 additions & 48 deletions src/database/redis/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,48 @@ const connection = module.exports;

connection.getConnectionOptions = function (redis) {
redis = redis || nconf.get('redis');
let connOptions = {};
const connOptions = {};
if (redis.password) {
connOptions.auth_pass = redis.password;
}

connOptions = _.merge(connOptions, redis.options || {});
return connOptions;
};

connection.connect = function (options, callback) {
callback = callback || function () {};
options = options || nconf.get('redis');
var redis_socket_or_host = options.host;
var cxn;
var callbackCalled = false;

const connOptions = connection.getConnectionOptions(options);

if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
if (redis.hasOwnProperty('database')) {
connOptions.db = redis.database;
}
return _.merge(connOptions, redis.options || {});
};

cxn.on('error', function (err) {
winston.error(err.stack);
if (!callbackCalled) {
callbackCalled = true;
callback(err);
connection.connect = async function (options) {
return new Promise(function (resolve, reject) {
options = options || nconf.get('redis');
const redis_socket_or_host = options.host;
const connOptions = connection.getConnectionOptions(options);

let cxn;
if (redis_socket_or_host && String(redis_socket_or_host).indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
}
});

cxn.on('ready', function () {
if (!callbackCalled) {
callbackCalled = true;
callback(null, cxn);
const dbIdx = parseInt(options.database, 10);
if (!(dbIdx >= 0)) {
throw new Error('[[error:no-database-selected]]');
}
});

if (options.password) {
cxn.auth(options.password);
}

var dbIdx = parseInt(options.database, 10);
if (dbIdx >= 0) {
cxn.select(dbIdx, function (err) {
if (err) {
winston.error('NodeBB could not select Redis database. Redis returned the following error\n' + err.stack);
throw err;
}
cxn.on('error', function (err) {
winston.error(err.stack);
reject(err);
});
cxn.on('ready', function () {
resolve(cxn);
});
} else {
callbackCalled = true;
return callback(new Error('[[error:no-database-selected]]'));
}

return cxn;
if (options.password) {
cxn.auth(options.password);
}
});
};

require('../../promisify')(connection);
1 change: 1 addition & 0 deletions src/database/redis/promisify.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const util = require('util');

module.exports = function (redisClient) {
redisClient.async = {
quit: util.promisify(redisClient.quit).bind(redisClient),
send_command: util.promisify(redisClient.send_command).bind(redisClient),

exists: util.promisify(redisClient.exists).bind(redisClient),
Expand Down
4 changes: 3 additions & 1 deletion src/install.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ async function completeConfigSetup(config) {
nconf.overrides(config);
const db = require('./database');
await db.init();
await db.createIndices();
if (db.hasOwnProperty('createIndices')) {
await db.createIndices();
}

// Sanity-check/fix url/port
if (!/^http(?:s)?:\/\//.test(config.url)) {
Expand Down
4 changes: 3 additions & 1 deletion test/mocks/databasemock.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ before(async function () {


await db.init();
await db.createIndices();
if (db.hasOwnProperty('createIndices')) {
await db.createIndices();
}
await setupMockDefaults();
await db.initSessionStore();

Expand Down

0 comments on commit fdfbc90

Please sign in to comment.