Skip to content

Commit 0a1f678

Browse files
committed
feat: add redis-sentinel URI parser
1 parent 27c9bdd commit 0a1f678

File tree

4 files changed

+269
-10
lines changed

4 files changed

+269
-10
lines changed

example/src/config.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
//import 'dotenv/config'
22

3-
const BULL_REDIS_URI = {
4-
port: parseInt(process.env.REDIS_PORT || '6379'),
5-
host: process.env.REDIS_HOST || '127.0.0.1',
6-
password: process.env.REDIS_PASSWORD || '',
7-
};
3+
// const BULL_REDIS_URI = {
4+
// port: parseInt(process.env.REDIS_PORT || '6379'),
5+
// host: process.env.REDIS_HOST || '127.0.0.1',
6+
// password: process.env.REDIS_PASSWORD || '',
7+
// };
8+
9+
const BULL_REDIS_URI =
10+
'redis://:uut2tiew5waeli1aefup0Toecaikoque5eepahch5AowaiJ2@10.216.129.127:6379';
811

912
const BULL_HOST_ID = 'maybe_uuid_and_mac';
1013

example/src/connectRedis.ts

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
import Redis, { RedisOptions } from 'ioredis';
2+
import { BULL_REDIS_URI } from './config';
3+
4+
const redisInstances = new Map<string, Redis.Redis>();
5+
6+
export function createBullConnection(type: 'queue' | 'worker' | 'scheduler' | 'custom') {
7+
const existedClient = redisInstances.get(type);
8+
if (existedClient) {
9+
return existedClient;
10+
}
11+
12+
const client = connectRedis(BULL_REDIS_URI);
13+
if (!client) {
14+
throw new Error(`No redisio connection provided to BULL (${BULL_REDIS_URI})!`);
15+
}
16+
17+
redisInstances.set(type, client);
18+
return client;
19+
}
20+
21+
/**
22+
* Connect to Redis by URI.
23+
*
24+
* Returns tuple of 2 elements:
25+
* [0] redisio instance
26+
* [1] addBeforeShutdown function which will be called before redis shutdown
27+
* this function is usefull when you want to await current Worker's jobs before disconnection redis.
28+
*
29+
* See https://github.com/lettuce-io/lettuce-core/wiki/Redis-URI-and-connection-details for URI formats
30+
* redis://localhost/0
31+
* rediss://localhost/0
32+
* redis-sentinel://:pass@localhost:26379,otherhost:26479/0?name=mymaster
33+
*/
34+
export function connectRedis(uri: string, opts?: RedisOptions): Redis.Redis {
35+
// TODO: UnhandledPromiseRejectionWarning: MaxRetriesPerRequestError: Reached the max retries per request limit (which is 20). Refer to "maxRetriesPerRequest" option for details.
36+
let cfg = {
37+
retryStrategy: (times: number) => Math.min(times * 500, 10000),
38+
reconnectOnError: (err: Error) => {
39+
const targetError = 'READONLY';
40+
if (err.message.slice(0, targetError.length) === targetError) {
41+
// Only reconnect when the error starts with "READONLY"
42+
// and resend the failed command after reconnecting
43+
return 2;
44+
}
45+
return false;
46+
},
47+
} as RedisOptions;
48+
49+
const cs = connectionStringParse(uri);
50+
if (cs.scheme === 'redis' || cs.scheme === 'rediss') {
51+
cfg.host = cs.hosts?.[0]?.host || 'localhost';
52+
cfg.port = cs.hosts?.[0]?.port || 6379;
53+
if (cs.scheme === 'rediss') {
54+
cfg.tls = {};
55+
}
56+
if (cs.password) {
57+
cfg.password = cs.password;
58+
}
59+
} else if (
60+
cs.scheme === 'redis-sentinel' ||
61+
cs.scheme === 'redis+sentinel' ||
62+
cs.scheme === 'redis+santinel'
63+
) {
64+
cfg.sentinels = cs.hosts as any;
65+
if (cs.password) {
66+
cfg.sentinelPassword = cs.password;
67+
}
68+
cfg.sentinelRetryStrategy = (times: number) => Math.min(times * 500, 10000);
69+
} else {
70+
throw new Error(`Unsupported connection string provided to connectRedis() method: ${uri}`);
71+
}
72+
73+
if (cs?.path?.[0]) {
74+
cfg.db = parseInt(cs?.path?.[0]) || 0;
75+
}
76+
if (cs?.options?.db) {
77+
// convert '0' -> 0
78+
cs.options.db = parseInt(cs.options.db) || 0;
79+
}
80+
81+
cfg = { ...cfg, ...cs.options, ...opts };
82+
83+
const redis = new Redis(cfg);
84+
return redis;
85+
}
86+
87+
export interface ConnectionStringHost {
88+
host: string;
89+
port?: number;
90+
}
91+
92+
export interface ConnectionStringParameters {
93+
scheme: string;
94+
username?: string;
95+
password?: string;
96+
hosts: ConnectionStringHost[];
97+
path: string[];
98+
options?: any;
99+
}
100+
101+
/**
102+
* Takes a connection string object and returns a URI string of the form:
103+
*
104+
* scheme://[username[:password]@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[endpoint]][?options]
105+
* @param connectionStringObject The object that describes connection string parameters
106+
*/
107+
export function connectionStringSerialize(connectionStringObject: ConnectionStringParameters) {
108+
if (!connectionStringObject.scheme) {
109+
throw new Error(`Scheme not provided`);
110+
}
111+
112+
let uri = connectionStringObject.scheme + '://';
113+
114+
if (connectionStringObject.username) {
115+
uri += encodeURIComponent(connectionStringObject.username);
116+
// Allow empty passwords
117+
if (connectionStringObject.password) {
118+
uri += ':' + encodeURIComponent(connectionStringObject.password);
119+
}
120+
uri += '@';
121+
}
122+
uri += _formatAddress(connectionStringObject);
123+
// Only put a slash when there is an endpoint
124+
if (Array.isArray(connectionStringObject.path)) {
125+
const path = connectionStringObject.path
126+
.filter((o) => o === null || o === undefined || o === '')
127+
.map((o) => encodeURIComponent(o))
128+
.join('/');
129+
if (path) {
130+
uri += '/' + path;
131+
}
132+
}
133+
if (connectionStringObject.options && Object.keys(connectionStringObject.options).length > 0) {
134+
uri +=
135+
'?' +
136+
Object.keys(connectionStringObject.options)
137+
.map(
138+
(option) =>
139+
encodeURIComponent(option) +
140+
'=' +
141+
encodeURIComponent(connectionStringObject.options[option])
142+
)
143+
.join('&');
144+
}
145+
return uri;
146+
}
147+
148+
/**
149+
* Takes a connection string URI of form:
150+
*
151+
* scheme://[username[:password]@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[path]][?options]
152+
*
153+
* and returns an object of form:
154+
*
155+
* {
156+
* scheme: string,
157+
* username?: string,
158+
* password?: string,
159+
* hosts: [ { host: string, port?: number }, ... ],
160+
* path?: string[],
161+
* options?: object
162+
* }
163+
*
164+
* Where scheme and hosts will always be present. Other fields will only be present in the result if they were
165+
* present in the input.
166+
* @param uri The connection string URI
167+
*/
168+
export function connectionStringParse(uri: string): ConnectionStringParameters {
169+
const connectionStringParser = new RegExp(
170+
'^\\s*' + // Optional whitespace padding at the beginning of the line
171+
'([^:]+):\\/\\/' + // Scheme (Group 1)
172+
'(?:([^:@,/?=&]*)' + // User (Group 2)
173+
'(?::([^:@,/?=&]*))?@)?' + // Password (Group 3)
174+
'([^@/?=&]+)' + // Host address(es) (Group 4)
175+
'(?:\\/([^:@,?=&]+)?)?' + // Endpoint (Group 5)
176+
'(?:\\?([^:@,/?]+)?)?' + // Options (Group 6)
177+
'\\s*$', // Optional whitespace padding at the end of the line
178+
'gi'
179+
);
180+
const connectionStringObject = {} as ConnectionStringParameters;
181+
182+
if (!uri.includes('://')) {
183+
throw new Error(`No scheme found in URI ${uri}`);
184+
}
185+
186+
const tokens = connectionStringParser.exec(uri);
187+
188+
if (Array.isArray(tokens)) {
189+
connectionStringObject.scheme = tokens[1];
190+
connectionStringObject.username = tokens[2] ? decodeURIComponent(tokens[2]) : tokens[2];
191+
connectionStringObject.password = tokens[3] ? decodeURIComponent(tokens[3]) : tokens[3];
192+
connectionStringObject.hosts = _parseAddress(tokens[4]);
193+
connectionStringObject.path = tokens[5]
194+
? tokens[5].split('/').map((o) => decodeURIComponent(o))
195+
: [];
196+
connectionStringObject.options = tokens[6] ? _parseOptions(tokens[6]) : tokens[6];
197+
}
198+
return connectionStringObject;
199+
}
200+
201+
/**
202+
* Formats the address portion of a connection string
203+
* @param connectionStringObject The object that describes connection string parameters
204+
*/
205+
function _formatAddress(connectionStringObject: ConnectionStringParameters): string {
206+
return connectionStringObject.hosts
207+
.map(
208+
(address) =>
209+
encodeURIComponent(address.host) +
210+
(address.port ? ':' + encodeURIComponent(address.port.toString(10)) : '')
211+
)
212+
.join(',');
213+
}
214+
215+
/**
216+
* Parses an address
217+
* @param addresses The address(es) to process
218+
*/
219+
function _parseAddress(addresses: string): ConnectionStringHost[] {
220+
return addresses.split(',').map((address) => {
221+
const i = address.indexOf(':');
222+
223+
return (i >= 0
224+
? { host: decodeURIComponent(address.substring(0, i)), port: +address.substring(i + 1) }
225+
: { host: decodeURIComponent(address) }) as ConnectionStringHost;
226+
});
227+
}
228+
229+
/**
230+
* Parses options
231+
* @param options The options to process
232+
*/
233+
function _parseOptions(options: string): { [key: string]: string } {
234+
const result: { [key: string]: string } = {};
235+
236+
options.split('&').forEach((option) => {
237+
const i = option.indexOf('=');
238+
239+
if (i >= 0) {
240+
result[decodeURIComponent(option.substring(0, i))] = decodeURIComponent(
241+
option.substring(i + 1)
242+
);
243+
}
244+
});
245+
return result;
246+
}

example/src/queues/fetchMetrics.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BULL_REDIS_URI, BULL_HOST_ID } from '../config';
22
import { Queue, Worker, QueueScheduler } from 'bullmq';
3+
import { createBullConnection } from '../connectRedis';
34

45
if (!BULL_REDIS_URI) {
56
throw new Error(`Env var BULL_REDIS_URI is empty. Cannot init task ${__filename}.`);
@@ -15,12 +16,12 @@ const prefix = queueSettings.prefix;
1516

1617
const metricsScheduler = new QueueScheduler(queueSettings.name, {
1718
prefix,
18-
connection: BULL_REDIS_URI,
19+
connection: createBullConnection('scheduler'), // BULL_REDIS_URI,
1920
});
2021

2122
export const metricsQueue = new Queue(queueSettings.name, {
2223
prefix,
23-
connection: BULL_REDIS_URI,
24+
connection: createBullConnection('queue'), // BULL_REDIS_URI,
2425
});
2526

2627
metricsQueue.add(
@@ -47,7 +48,7 @@ const metricsWorker = new Worker(
4748
},
4849
{
4950
prefix,
50-
connection: BULL_REDIS_URI,
51+
connection: createBullConnection('worker'), // BULL_REDIS_URI,
5152
}
5253
);
5354

example/src/schema/query/queue.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
import { SchemaComposer } from 'graphql-compose';
22
import { getQueueTC } from '../types/queue';
3+
import { Queue } from 'bullmq';
4+
import { createBullConnection } from '../../connectRedis';
35

46
export function createQueueFC(schemaComposer: SchemaComposer<any>) {
57
return {
68
type: getQueueTC(schemaComposer),
79
args: {
810
queueName: 'String!',
11+
prefix: {
12+
type: 'String',
13+
defaultValue: 'bull',
14+
},
915
},
10-
resolve: async (_, { queueName }, { Queues }) => {
11-
return Queues.get(queueName);
16+
resolve: async (_, { queueName, prefix }) => {
17+
return new Queue(queueName, {
18+
prefix,
19+
connection: createBullConnection('queue'),
20+
});
1221
},
1322
};
1423
}

0 commit comments

Comments
 (0)