Skip to content

Commit

Permalink
feat(@cubejs-backend/query-orchestrator): Introduce AsyncRedisClient …
Browse files Browse the repository at this point in the history
…type
  • Loading branch information
ovr committed Dec 14, 2020
1 parent 78e8422 commit 728110e
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 29 deletions.
71 changes: 48 additions & 23 deletions packages/cubejs-query-orchestrator/src/orchestrator/RedisFactory.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import redis, { ClientOpts } from 'redis';
import redis, { ClientOpts, RedisClient, Commands } from 'redis';
import { promisify } from 'util';

export function createRedisClient(url: string) {
redis.Multi.prototype.execAsync = function execAsync() {
return new Promise((resolve, reject) => this.exec((err, res) => (
err ? reject(err) : resolve(res)
)));
};

const options: ClientOpts = {
url,
};

if (process.env.REDIS_TLS === 'true') {
options.tls = {};
}

if (process.env.REDIS_PASSWORD) {
options.password = process.env.REDIS_PASSWORD;
}

const client = redis.createClient(options);
export interface AsyncRedisClient extends RedisClient {
brpopAsync: Commands<Promise<any>>['brpop'],
delAsync: Commands<Promise<any>>['del'],
getAsync: Commands<Promise<any>>['get'],
hgetAsync: Commands<Promise<any>>['hget'],
rpopAsync: Commands<Promise<any>>['rpop'],
setAsync: Commands<Promise<any>>['set'],
zaddAsync: Commands<Promise<any>>['zadd'],
zrangeAsync: Commands<Promise<any>>['zrange'],
zrangebyscoreAsync: Commands<Promise<any>>['zrangebyscore'],
keysAsync: Commands<Promise<any>>['keys'],
watchAsync: Commands<Promise<any>>['watch'],
unwatchAsync: Commands<Promise<any>>['unwatch'],
incrAsync: Commands<Promise<any>>['incr'],
decrAsync: Commands<Promise<any>>['decr'],
lpushAsync: Commands<Promise<any>>['lpush'],
}

function decorateRedisClient(client: RedisClient): AsyncRedisClient {
[
'brpop',
'del',
Expand All @@ -37,12 +35,39 @@ export function createRedisClient(url: string) {
'unwatch',
'incr',
'decr',
'lpush'
'lpush',
].forEach(
k => {
client[`${k}Async`] = promisify(client[k]);
}
);

return client;
return <any>client;
}

export function createRedisClient(url: string, opts: ClientOpts = {}) {
redis.Multi.prototype.execAsync = function execAsync() {
return new Promise((resolve, reject) => this.exec((err, res) => (
err ? reject(err) : resolve(res)
)));
};

const options: ClientOpts = {
url,
};

if (process.env.REDIS_TLS === 'true') {
options.tls = {};
}

if (process.env.REDIS_PASSWORD) {
options.password = process.env.REDIS_PASSWORD;
}

return decorateRedisClient(
redis.createClient({
...options,
...opts,
})
);
}
11 changes: 5 additions & 6 deletions packages/cubejs-query-orchestrator/src/orchestrator/RedisPool.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import genericPool, { Pool, Options as PoolOptions } from 'generic-pool';
import type { RedisClient } from 'redis';

import { createRedisClient } from './RedisFactory';
import { createRedisClient, AsyncRedisClient } from './RedisFactory';

export type CreateRedisClientFn = () => PromiseLike<RedisClient>;
export type CreateRedisClientFn = () => PromiseLike<AsyncRedisClient>;

export interface RedisPoolOptions {
poolMin?: number;
poolMax?: number;
createClient?: CreateRedisClientFn;
destroyClient?: (client: RedisClient) => PromiseLike<void>;
destroyClient?: (client: AsyncRedisClient) => PromiseLike<void>;
}

export class RedisPool {
protected readonly pool: Pool<RedisClient>|null = null;
protected readonly pool: Pool<AsyncRedisClient>|null = null;

protected readonly create: CreateRedisClientFn|null = null;

Expand All @@ -36,7 +35,7 @@ export class RedisPool {
if (max > 0) {
const destroy = options.destroyClient || (async (client) => client.end(true));

this.pool = genericPool.createPool<RedisClient>({ create, destroy }, opts);
this.pool = genericPool.createPool<AsyncRedisClient>({ create, destroy }, opts);
} else {
// fallback to un-pooled behavior if pool max is 0
this.create = create;
Expand Down

0 comments on commit 728110e

Please sign in to comment.