Skip to content

Commit

Permalink
migrate redis client to utils
Browse files Browse the repository at this point in the history
  • Loading branch information
rjawesome committed Feb 6, 2024
1 parent 4f69547 commit 061360f
Show file tree
Hide file tree
Showing 4 changed files with 370 additions and 1 deletion.
57 changes: 57 additions & 0 deletions __test__/unittest/redisClient.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { redisClient as redisClientType } from '../../src/redis-client';
import RedisMock from 'ioredis-mock';

describe('Test redis client', () => {
const OLD_ENV = process.env;

beforeEach(() => {
jest.resetModules(); // Most important - it clears the cache
jest.clearAllMocks();
jest.mock('ioredis', () => RedisMock);
process.env = { ...OLD_ENV }; // Make a copy
});

afterAll(() => {
process.env = OLD_ENV; // Restore old environment
});
test('Client reports not enabled when REDIS_HOST is not set', () => {
const { redisClient } = require('../../src/redis-client');
expect(redisClient.clientEnabled).toEqual(false);
});

test('will receive process.env variables', () => {
// Set the variables
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
expect(redisClient).not.toEqual({});
});

test('Test if record is correctly stored', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello');
const res = await redisClient.client.getTimeout('record1');
expect(res).toEqual('hello');
});

test('Test if record is correctly stored', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello');
const res = await redisClient.client.getTimeout('record1');
expect(res).toEqual('hello');
});

test('Test key should be removed after ttl', async () => {
process.env.REDIS_HOST = 'localhost';
process.env.REDIS_PORT = '3367';
const { redisClient } = require('../../src/redis-client');
await redisClient.client.setTimeout('record1', 'hello', 'EX', 2);
await new Promise((r) => setTimeout(r, 3000));
const res = await redisClient.client.getTimeout('record1');
expect(res).not.toBeNull;
});
});
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
"@opentelemetry/api": "^1.7.0",
"@sentry/node": "^7.74.1",
"debug": "^4.3.4",
"lodash": "^4.17.21"
"lodash": "^4.17.21",
"ioredis": "^5.3.2",
"redlock": "5.0.0-beta.2"
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from "./types";
export * from "./log_entry";
export * from "./telemetry";
export * from "./misc";
export * from './redis-client';
309 changes: 309 additions & 0 deletions src/redis-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
import Redis, { Callback, Cluster, RedisKey, ScanStream } from 'ioredis';
import Debug from 'debug';
const debug = Debug('bte:biothings-explorer-trapi:redis-client');
import Redlock, { RedlockAbortSignal } from 'redlock';

const prefix = `{BTEHashSlotPrefix}:`;

type AsyncFunction = (...args: unknown[]) => Promise<unknown>;
type DropFirst<T extends unknown[]> = T extends [unknown, ...infer U] ? U : never;
type Awaited<T> = T extends PromiseLike<infer U> ? U : T;

function timeoutFunc<F extends AsyncFunction>(func: F, timeoutms = 0) {
return (...args: Parameters<F>): ReturnType<F> => {
return new Promise((resolve, reject) => {
const timeout = timeoutms ? timeoutms : parseInt(process.env.REDIS_TIMEOUT || '60000');
let done = false;
setTimeout(() => {
if (!done) {
reject(new Error(`redis call timed out, args: ${JSON.stringify([...args])}`));
}
}, timeout);
func(...args).then((returnValue: ReturnType<F>) => {
done = true;
resolve(returnValue);
});
}) as ReturnType<F>;
};
}

/**
* Decorate a function such that the first argument is given the module-defined prefix
*/
function addPrefix<F extends AsyncFunction>(func: F) {
return (arg0: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): ReturnType<F> => {
if (arg0 && (arg0 as string).length > 0) {
arg0 = `${prefix}${arg0 as string}`;
}
return func(arg0, ...args) as ReturnType<F>;
};
}

/**
* Decorate a function such that each argument is given the module-defined prefix
*/
function addPrefixToAll<F extends AsyncFunction>(func: F) {
return (...args: Parameters<F>): ReturnType<F> => {
return func(...args.map((arg) => `${prefix}${arg}`)) as ReturnType<F>;
};
}

/**
* Decorate a Redlock function such that the locks are given the module-defined prefix
*/
function lockPrefix<F extends AsyncFunction>(func: F) {
return async (locks: Parameters<F>[0], ...args: DropFirst<Parameters<F>>): Promise<Awaited<ReturnType<F>>> => {
return (await func(
(locks as string[]).map((lockName: string) => `${prefix}${lockName}`),
...args,
)) as Awaited<ReturnType<F>>;
};
}

interface RedisClientInterface {
clearEdgeCache: () => void;
getTimeout: (key: RedisKey) => Promise<string>;
setTimeout: (key: RedisKey, value: string | number | Buffer) => Promise<'OK'>;
hsetTimeout: (...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) => Promise<number>;
hgetallTimeout: (key: RedisKey) => Promise<Record<string, string>>;
expireTimeout: (key: RedisKey, seconds: string | number) => Promise<number>;
delTimeout: (key: RedisKey | RedisKey[]) => Promise<number>;
usingLock: (
resources: string[],
duration: number,
routine?: (signal: RedlockAbortSignal) => Promise<unknown>,
) => Promise<unknown>;
incrTimeout: (key: string) => Promise<number>;
decrTimeout: (key: string) => Promise<number>;
existsTimeout: (...args: RedisKey[]) => Promise<number>;
pingTimeout: () => Promise<'PONG'>;
}

function addClientFuncs(client: Redis | Cluster, redlock: Redlock): RedisClientInterface {
function decorate<F extends AsyncFunction>(func: F, timeoutms?: number): (...args: Parameters<F>) => ReturnType<F> {
let wrapped = timeoutFunc(func, timeoutms);
if (client instanceof Cluster) {
// Dirty way to cast the function so that typescript doesn't complain
// But given the extremely limited use-case of this function, it's fine for now
wrapped = addPrefix(wrapped) as unknown as (...args: Parameters<F>) => ReturnType<F>;
}

return wrapped;
}
return {
clearEdgeCache: () => null,
getTimeout: decorate((key: RedisKey) => client.get(key)),
setTimeout: decorate((key: RedisKey, value: string | number | Buffer) => client.set(key, value)),
hsetTimeout: decorate((...args: [key: RedisKey, ...fieldValues: (string | Buffer | number)[]]) =>
client.hset(...args),
),
hgetallTimeout: decorate((key: RedisKey) => client.hgetall(key)),

expireTimeout: decorate((key: RedisKey, seconds: string | number) => client.expire(key, seconds)),

delTimeout:
client instanceof Cluster
? addPrefixToAll(timeoutFunc((...args: RedisKey[]) => client.del(...args)))
: timeoutFunc((...args: RedisKey[]) => client.del(...args)),
usingLock: lockPrefix(
(resources: string[], duration: number, routine?: (signal: RedlockAbortSignal) => Promise<unknown>) =>
redlock.using(resources, duration, routine),
),
incrTimeout: decorate((key: string) => client.incr(key)),
decrTimeout: decorate((key: string) => client.decr(key)),
existsTimeout: decorate((...args: RedisKey[]) => client.exists(...args)),
pingTimeout: decorate(() => client.ping(), 10000), // for testing
// hmsetTimeout: decorate((...args) => client.hmset(...args)),
// keysTimeout: decorate((...args) => client.keys(...args)),
};
}

class RedisClient {
client: Record<string, never> | ReturnType<typeof addClientFuncs>;
enableRedis: boolean;
clientEnabled: boolean;
internalClient: Redis | Cluster;
constructor() {
this.client;
this.enableRedis = !(process.env.REDIS_HOST === undefined) && !(process.env.REDIS_PORT === undefined);

if (!this.enableRedis) {
this.client = {};
this.clientEnabled = false;
return;
}

interface RedisClusterDetails {
redisOptions: {
connectTimeout: number;
password?: string;
tls?: {
checkServerIdentity: () => undefined | Error;
};
};
// How long to wait given how many failed tries
clusterRetryStrategy?: (times: number) => number;
}

if (process.env.REDIS_CLUSTER === 'true') {
const details = {
redisOptions: {
connectTimeout: 20000,
},
clusterRetryStrategy(times: number) {
return Math.min(times * 100, 5000);
},
} as RedisClusterDetails;

if (process.env.REDIS_PASSWORD) {
details.redisOptions.password = process.env.REDIS_PASSWORD;
}
if (process.env.REDIS_TLS_ENABLED) {
details.redisOptions.tls = { checkServerIdentity: () => undefined };
}

const cluster = new Redis.Cluster(
[
{
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT),
},
],
details,
);

// allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved)
const redlock = new Redlock([cluster], { retryDelay: 500, retryCount: 1200 });

this.internalClient = cluster;

this.client = addClientFuncs(cluster, redlock);

this.client.clearEdgeCache = () => {
let count = 0;
const nodes = (this.internalClient as Cluster).nodes('master');
let completedNodes = 0;
nodes.forEach((node, i) => {
const stream = node.scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
node.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug(`Cache clearing failure on node ${i}:`);
debug(error);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
})
.on('end', () => {
debug(`Cache clearing completes on cluster node ${i}`);
completedNodes += 1;
if (completedNodes >= nodes.length) {
debug(`Cache clearing completes, cleared ${count} keys.`);
}
});
});
};

debug('Initialized redis client (cluster-mode)');
} else {
interface RedisDetails {
host: string;
port: number;
connectTimeout: number;
retryStrategy: (times: number) => number;
password?: string;
tls?: {
checkServerIdentity: () => undefined | Error;
};
}

const details = {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT),
connectTimeout: 20000,
retryStrategy(times) {
return Math.min(times * 100, 5000);
},
} as RedisDetails;
if (process.env.REDIS_PASSWORD) {
details.password = process.env.REDIS_PASSWORD;
}
if (process.env.REDIS_TLS_ENABLED) {
details.tls = { checkServerIdentity: () => undefined };
}
const client = new Redis(details);

// allow up to 10 minutes to acquire lock (in case of large items being saved/retrieved)
const redlock = new Redlock([client], { retryDelay: 500, retryCount: 1200 });

this.internalClient = client;

this.client = addClientFuncs(client, redlock);

this.client.clearEdgeCache = () => {
const stream = (redisClient.internalClient as Redis).scanStream({
match: '*bte:edgeCache:*',
count: 50,
});

let count = 0;

stream
.on('data', (foundKeys: string[]) => {
if (!foundKeys.length) return;
count += foundKeys.length;
try {
redisClient.internalClient.del(...foundKeys.map((key) => key.replace(`${prefix} `, ''))).then(
() => null,
(error) => {
debug(`Cache clear: error deleting ${foundKeys.length} keys`);
debug(error);
},
);
} catch (error) {
debug('Cache clearing failure:');
debug(error);
}
})
.on('error', (error) => {
debug('Cache clearing failure:');
debug(error);
})
.on('end', () => {
debug(`Cache clearing completes, cleared ${count} keys.`);
});
};

debug('Initialized redis client (non-cluster-mode)');
}
this.clientEnabled = true;
}
}

const redisClient = new RedisClient();

function getNewRedisClient(): RedisClient {
return new RedisClient();
}

export { redisClient, getNewRedisClient };

0 comments on commit 061360f

Please sign in to comment.