Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions api/src/routes/tests/personaController/mergePersona-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ describe('PersonaController.mergePersona', () => {
expectedCode
}) => new Promise((resolve, reject) => {
apiApp
.post(routes.MERGE_PERSONA)
.set('Authorization', `Bearer ${token}`)
.query({ mergePersonaFromId, mergePersonaToId })
.expect(expectedCode)
.end((err, res) => {
if (err) return reject(err);
resolve(res);
});
.post(routes.MERGE_PERSONA)
.set('Authorization', `Bearer ${token}`)
.query({ mergePersonaFromId, mergePersonaToId })
.expect(expectedCode)
.end((err, res) => {
if (err) return reject(err);
resolve(res);
});
});

const assertFailedMerge = async ({
Expand All @@ -70,7 +70,7 @@ describe('PersonaController.mergePersona', () => {
};

it('should merge two people', async () => {
const ifi1 = { key: 'mbox', value: 'mailto:A@test.com' };
const ifi1 = { key: 'mbox', value: 'mailto:A1@test.com' };
const ifi2 = { key: 'mbox', value: 'mailto:A2@test.com' };
const organisation = testId;
const { personaId: personaAId } = await personaService.createUpdateIdentifierPersona({
Expand Down
44 changes: 26 additions & 18 deletions lib/connections/redis.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,39 @@
import redis from 'redis';
import Redis from 'ioredis';
import logger from 'lib/logger';
import Promise from 'bluebird';
import defaultTo from 'lodash/defaultTo';

Promise.promisifyAll(redis.RedisClient.prototype);
Promise.promisifyAll(redis.Multi.prototype);
const DEFAULT_REDIS_PORT = 6379;

export const getOptions = () => {
const options = {};

// these options should only be set in the config if present (even defaults shouldnt be provided)
if (process.env.REDIS_PASSWORD) options.auth_pass = process.env.REDIS_PASSWORD;
if (process.env.REDIS_DB) options.db = process.env.REDIS_DB;

return options;
const eventsRepo = defaultTo(process.env.EVENTS_REPO, 'redis');
switch (eventsRepo) {
case 'sentinel': {
const db = defaultTo(Number(process.env.SENTINEL_DB), 0);
const password = process.env.SENTINEL_PASSWORD;
const name = defaultTo(process.env.SENTINEL_NAME, 'mymaster');
const connections = defaultTo(process.env.SENTINEL_CONNECTIONS, '127.0.0.1:6379');
const sentinels = connections.split(' ').map((conn) => {
const [host, port] = conn.split(':');
return { host, port: defaultTo(Number(port), DEFAULT_REDIS_PORT) };
});
return { db, password, name, sentinels };
}
default: case 'redis': {
const db = defaultTo(Number(process.env.REDIS_DB), 0);
const password = process.env.REDIS_PASSWORD;
const host = process.env.REDIS_HOST;
const port = defaultTo(Number(process.env.REDIS_PORT), DEFAULT_REDIS_PORT);
return { db, password, host, port };
}
}
};

export const createClient = () => {
let client;
try {
const host = process.env.REDIS_HOST;
const port = process.env.REDIS_PORT || 6379;
const url = `redis://${host}:${port}`;
const options = getOptions();
client = redis.createClient(url, options);
logger.info('Creating Redis client');
return new Redis(options);
} catch (e) {
logger.error("Couldn't connect to redis", e);
}

return client;
};
2 changes: 1 addition & 1 deletion lib/services/importPersonas/importPersonas-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ describe('import personas service', () => {
data =>
data
).collect()
.toPromise(Promise);
.toPromise(Promise);

await downloadToStream(
csvHandle
Expand Down
34 changes: 15 additions & 19 deletions lib/services/queue/bull/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import { isString, map, forEach } from 'lodash';
import * as redis from 'lib/connections/redis';
import { Promise } from 'bluebird';

const redisOpts = redis.getOptions();

const Bull = {
queues: {}
};
Expand All @@ -15,29 +13,27 @@ const logError = queueName => (error) => {
};

const removeJob = (job) => {
logger.debug(`REMOVING JOB ${job.jobId}`);
logger.debug(`REMOVING JOB ${job.id}`);
job.remove();
};

export const getQueue = (queueName, done) => {
if (!Bull.queues[queueName]) {
Bull.queues[queueName] = new Queue(
queueName,
process.env.REDIS_PORT,
process.env.REDIS_HOST,
redisOpts
{ createClient: redis.createClient }
)
.on('error', logError(queueName))
.on('completed', (job) => {
logger.debug(`COMPLETED JOB ${job.jobId}`);
removeJob(job);
})
.on('failed', (job, err) => {
const queue = err.queue || {};
const failedQueueName = queue.name || 'No queue';
logger.debug(`JOB ${job.jobId} FAILED`, failedQueueName);
removeJob(job);
});
.on('error', logError(queueName))
.on('completed', (job) => {
logger.debug(`COMPLETED JOB ${job.id}`);
removeJob(job);
})
.on('failed', (job, err) => {
const queue = err.queue || {};
const failedQueueName = queue.name || 'No queue';
logger.debug(`JOB ${job.id} FAILED`, failedQueueName);
removeJob(job);
});
}

return done(null, Bull.queues[queueName]);
Expand Down Expand Up @@ -81,9 +77,9 @@ const sendDeadLetter = ({ queueName, deadLetter }) => async (data) => {
export const subscribe = ({
queueName,
handler,
onProccessed = () => {},
onProccessed = () => { },
deadLetter
}, done) => {
}, done) => {
getQueue(queueName, (err, queue) => {
if (err) return done(err);

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
"body-parser": "^1.14.1",
"boolean": "^0.1.0",
"btoa": "^1.1.2",
"bull": "2.*",
"bull": "^3.3.10",
"chai-fs": "1.0.*",
"chai-immutable": "^1.6.0",
"clamscan": "^0.8.4",
Expand Down Expand Up @@ -110,6 +110,7 @@
"immutability-helper": "^2.0.0",
"immutable": "^3.8.1",
"invariant": "^2.2.0",
"ioredis": "^3.2.2",
"isomorphic-style-loader": "^2.0.0",
"js-cookie": "^2.1.3",
"jsonwebtoken": "^7.3.0",
Expand Down
Loading