Skip to content

Commit f1c79af

Browse files
authored
feat: support multiple handlers for a single subscription (#66)
1 parent 5c1d91b commit f1c79af

File tree

3 files changed

+108
-39
lines changed

3 files changed

+108
-39
lines changed

src/connections/abstract_connection.ts

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import Emittery from 'emittery'
1111
import type { Redis, Cluster } from 'ioredis'
12-
import * as errors from '../errors.js'
1312
import type {
1413
PubSubOptions,
1514
ConnectionEvents,
@@ -38,8 +37,8 @@ export abstract class AbstractConnection<
3837
/**
3938
* A list of active subscriptions and pattern subscription
4039
*/
41-
protected subscriptions: Map<string, PubSubChannelHandler> = new Map()
42-
protected psubscriptions: Map<string, PubSubPatternHandler> = new Map()
40+
protected subscriptions: Map<string, Set<PubSubChannelHandler>> = new Map()
41+
protected psubscriptions: Map<string, Set<PubSubPatternHandler>> = new Map()
4342

4443
/**
4544
* The last error emitted by the `error` event. We set it to `null` after
@@ -230,19 +229,23 @@ export abstract class AbstractConnection<
230229
* Listen for messages
231230
*/
232231
this.ioSubscriberConnection!.on('message', (channel, message) => {
233-
const handler = this.subscriptions.get(channel)
234-
if (handler) {
235-
handler(message)
232+
const handlers = this.subscriptions.get(channel)
233+
if (handlers) {
234+
for (const handler of handlers) {
235+
handler(message)
236+
}
236237
}
237238
})
238239

239240
/**
240241
* Listen for pattern messages
241242
*/
242243
this.ioSubscriberConnection!.on('pmessage', (pattern, channel, message) => {
243-
const handler = this.psubscriptions.get(pattern)
244-
if (handler) {
245-
handler(channel, message)
244+
const handlers = this.psubscriptions.get(pattern)
245+
if (handlers) {
246+
for (const handler of handlers) {
247+
handler(channel, message)
248+
}
246249
}
247250
})
248251
}
@@ -278,13 +281,6 @@ export abstract class AbstractConnection<
278281
*/
279282
this.setupSubscriberConnection()
280283

281-
/**
282-
* Disallow multiple subscriptions to a single channel
283-
*/
284-
if (this.subscriptions.has(channel)) {
285-
throw new errors.E_MULTIPLE_REDIS_SUBSCRIPTIONS([channel])
286-
}
287-
288284
/**
289285
* If the subscriptions map is empty, it means we have no active subscriptions
290286
* on the given channel, hence we should make one subscription and also set
@@ -296,7 +292,12 @@ export abstract class AbstractConnection<
296292
options?.onSubscription(count as number)
297293
}
298294
this.emit('subscription:ready', { count: count as number, connection: this })
299-
this.subscriptions.set(channel, handler)
295+
const subscriptions = this.subscriptions.get(channel)
296+
if (subscriptions) {
297+
subscriptions.add(handler)
298+
} else {
299+
this.subscriptions.set(channel, new Set([handler]))
300+
}
300301
})
301302
.catch((error) => {
302303
if (options?.onError) {
@@ -309,8 +310,19 @@ export abstract class AbstractConnection<
309310
/**
310311
* Unsubscribe from a channel
311312
*/
312-
unsubscribe(channel: string) {
313-
this.subscriptions.delete(channel)
313+
unsubscribe(channel: string, handler?: PubSubChannelHandler) {
314+
if (handler) {
315+
const subscriptions = this.subscriptions.get(channel)
316+
if (subscriptions) {
317+
subscriptions.delete(handler)
318+
}
319+
320+
if (subscriptions && subscriptions.size !== 0) {
321+
return Promise.resolve()
322+
}
323+
} else {
324+
this.subscriptions.delete(channel)
325+
}
314326
return this.ioSubscriberConnection!.unsubscribe(channel)
315327
}
316328

@@ -324,13 +336,6 @@ export abstract class AbstractConnection<
324336
*/
325337
this.setupSubscriberConnection()
326338

327-
/**
328-
* Disallow multiple subscriptions to a single channel
329-
*/
330-
if (this.psubscriptions.has(pattern)) {
331-
throw new errors.E_MULTIPLE_REDIS_PSUBSCRIPTIONS([pattern])
332-
}
333-
334339
/**
335340
* If the subscriptions map is empty, it means we have no active subscriptions
336341
* on the given channel, hence we should make one subscription and also set
@@ -342,7 +347,12 @@ export abstract class AbstractConnection<
342347
options?.onSubscription(count as number)
343348
}
344349
this.emit('psubscription:ready', { count: count as number, connection: this })
345-
this.psubscriptions.set(pattern, handler)
350+
const psubscriptions = this.psubscriptions.get(pattern)
351+
if (psubscriptions) {
352+
psubscriptions.add(handler)
353+
} else {
354+
this.psubscriptions.set(pattern, new Set([handler]))
355+
}
346356
})
347357
.catch((error) => {
348358
if (options?.onError) {
@@ -355,8 +365,20 @@ export abstract class AbstractConnection<
355365
/**
356366
* Unsubscribe from a given pattern
357367
*/
358-
punsubscribe(pattern: string) {
359-
this.psubscriptions.delete(pattern)
368+
punsubscribe(pattern: string, handler?: PubSubPatternHandler) {
369+
if (handler) {
370+
const psubscriptions = this.psubscriptions.get(pattern)
371+
if (psubscriptions) {
372+
psubscriptions.delete(handler)
373+
}
374+
375+
if (psubscriptions && psubscriptions.size !== 0) {
376+
return Promise.resolve()
377+
}
378+
} else {
379+
this.psubscriptions.delete(pattern)
380+
}
381+
360382
return this.ioSubscriberConnection!.punsubscribe(pattern)
361383
}
362384

src/errors.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@
99

1010
import { createError } from '@poppinss/utils'
1111

12+
/** @deprecated */
1213
export const E_MULTIPLE_REDIS_SUBSCRIPTIONS = createError<[string]>(
1314
'Cannot subscribe to "%s" channel. Channel already has an active subscription',
1415
'E_MULTIPLE_REDIS_SUBSCRIPTIONS',
1516
500
1617
)
1718

19+
/** @deprecated */
1820
export const E_MULTIPLE_REDIS_PSUBSCRIPTIONS = createError<[string]>(
1921
'Cannot subscribe to "%s" pattern. Pattern already has an active subscription',
2022
'E_MULTIPLE_REDIS_PSUBSCRIPTIONS',

tests/redis_connection.spec.ts

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ test.group('Redis connection', () => {
138138
assert.equal(message, JSON.stringify({ username: 'virk' }))
139139
})
140140

141-
test('throw error when subscribing to a channel twice', async ({ cleanup }) => {
141+
test('call all handlers subscribed to a channel', async ({ assert, cleanup }) => {
142142
const connection = new RedisConnection('main', {
143143
host: process.env.REDIS_HOST,
144144
port: Number(process.env.REDIS_PORT),
@@ -147,10 +147,32 @@ test.group('Redis connection', () => {
147147

148148
await pEvent(connection, 'ready')
149149

150-
connection.subscribe('new:user', () => {})
151-
await pEvent(connection, 'subscription:ready')
152-
connection.subscribe('new:user', () => {})
153-
}).throws('Cannot subscribe to "new:user" channel. Channel already has an active subscription')
150+
const [message1] = await Promise.all([
151+
new Promise<any>((resolve) => {
152+
connection.subscribe('new:user', (message) => {
153+
resolve(message)
154+
})
155+
}),
156+
pEvent(connection, 'subscription:ready').then(() => {
157+
connection.publish('new:user', JSON.stringify({ username: 'virk' }))
158+
}),
159+
])
160+
161+
assert.equal(message1, JSON.stringify({ username: 'virk' }))
162+
163+
const [message2] = await Promise.all([
164+
new Promise<any>((resolve) => {
165+
connection.subscribe('new:user', (message) => {
166+
resolve(message)
167+
})
168+
}),
169+
pEvent(connection, 'subscription:ready').then(() => {
170+
connection.publish('new:user', JSON.stringify({ username: 'virk' }))
171+
}),
172+
])
173+
174+
assert.equal(message2, JSON.stringify({ username: 'virk' }))
175+
})
154176

155177
test('subscribe to a pattern and listen for messages', async ({ assert, cleanup }) => {
156178
const connection = new RedisConnection('main', {
@@ -174,7 +196,7 @@ test.group('Redis connection', () => {
174196
assert.equal(message.data, JSON.stringify({ username: 'virk' }))
175197
})
176198

177-
test('throw error when subscribing to a pattern twice', async ({ cleanup }) => {
199+
test('call all handlers subscribed to a pattern', async ({ assert, cleanup }) => {
178200
const connection = new RedisConnection('main', {
179201
host: process.env.REDIS_HOST,
180202
port: Number(process.env.REDIS_PORT),
@@ -183,11 +205,34 @@ test.group('Redis connection', () => {
183205

184206
await pEvent(connection, 'ready')
185207

186-
connection.psubscribe('user:*', () => {})
187-
await pEvent(connection, 'psubscription:ready')
208+
const [message1] = await Promise.all([
209+
new Promise<any>((resolve) => {
210+
connection.psubscribe('user:*', (channel, message) => {
211+
resolve({ channel, message })
212+
})
213+
}),
214+
pEvent(connection, 'psubscription:ready').then(() => {
215+
connection.publish('user:add', JSON.stringify({ username: 'virk' }))
216+
}),
217+
])
218+
219+
assert.equal(message1.channel, 'user:add')
220+
assert.equal(message1.message, JSON.stringify({ username: 'virk' }))
188221

189-
connection.psubscribe('user:*', () => {})
190-
}).throws('Cannot subscribe to "user:*" pattern. Pattern already has an active subscription')
222+
const [message2] = await Promise.all([
223+
new Promise<any>((resolve) => {
224+
connection.psubscribe('user:*', (channel, message) => {
225+
resolve({ channel, message })
226+
})
227+
}),
228+
pEvent(connection, 'psubscription:ready').then(() => {
229+
connection.publish('user:add', JSON.stringify({ username: 'virk' }))
230+
}),
231+
])
232+
233+
assert.equal(message2.channel, 'user:add')
234+
assert.equal(message2.message, JSON.stringify({ username: 'virk' }))
235+
})
191236

192237
test('unsubscribe from a channel', async ({ cleanup }) => {
193238
const connection = new RedisConnection('main', {

0 commit comments

Comments
 (0)