From 524e70222c7605a29d716a424f2b61c6bf3bbc64 Mon Sep 17 00:00:00 2001 From: Ben Oehlkers Date: Thu, 22 Dec 2022 14:08:43 -0500 Subject: [PATCH] feat: Add redis streams support. Closes #340. --- package.json | 2 +- src/index.ts | 1 + src/redis-pubsub-base.ts | 123 +++++++++++++ src/redis-pubsub.ts | 116 ++---------- src/redis-stream-pubsub.ts | 304 +++++++++++++++++++++++++++++++ src/test/stream-tests.ts | 357 +++++++++++++++++++++++++++++++++++++ 6 files changed, 798 insertions(+), 105 deletions(-) create mode 100644 src/redis-pubsub-base.ts create mode 100644 src/redis-stream-pubsub.ts create mode 100644 src/test/stream-tests.ts diff --git a/package.json b/package.json index 1db8f954..bebd47f0 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "lint": "eslint src --ext ts", "watch": "tsc -w", "testonly": "mocha --reporter spec src/test/tests.ts", - "integration": "mocha --reporter spec src/test/integration-tests.ts", + "integration": "mocha --reporter spec src/test/integration-tests.ts src/test/stream-tests.ts", "coverage": "nyc --reporter=html --reporter=text mocha src/test/**/*.ts", "prepublish": "tsc", "prepublishOnly": "npm run test" diff --git a/src/index.ts b/src/index.ts index c581adc1..d7566d5a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1 +1,2 @@ export { RedisPubSub } from './redis-pubsub'; +export { RedisStreamPubSub } from './redis-stream-pubsub'; \ No newline at end of file diff --git a/src/redis-pubsub-base.ts b/src/redis-pubsub-base.ts new file mode 100644 index 00000000..9e64c4d0 --- /dev/null +++ b/src/redis-pubsub-base.ts @@ -0,0 +1,123 @@ +import { Cluster, Redis, RedisOptions } from 'ioredis'; +import { PubSubEngine } from 'graphql-subscriptions'; +import {PubSubAsyncIterator} from './pubsub-async-iterator'; + +type DeserializerContext = { channel: string, pattern?: string }; + +export type RedisClient = Redis | Cluster; +export type OnMessage = (message: T) => void; + +export interface PubSubRedisBaseOptions { + connection?: RedisOptions | string; + triggerTransform?: TriggerTransform; + connectionListener?: (err: Error) => void; + publisher?: RedisClient; + subscriber?: RedisClient; + reviver?: Reviver; + serializer?: Serializer; + deserializer?: Deserializer; +} + +export abstract class RedisPubSubBase implements PubSubEngine { + + constructor(options: PubSubRedisBaseOptions = {}) { + const { + triggerTransform, + connection, + connectionListener, + subscriber, + publisher, + reviver, + serializer, + deserializer, + } = options; + + this.triggerTransform = triggerTransform || (trigger => trigger as string); + + if (reviver && deserializer) { + throw new Error("Reviver and deserializer can't be used together"); + } + + this.reviver = reviver; + this.serializer = serializer; + this.deserializer = deserializer; + + if (subscriber && publisher) { + this.redisPublisher = publisher; + this.redisSubscriber = subscriber; + } else { + try { + // eslint-disable-next-line @typescript-eslint/no-var-requires + const IORedis = require('ioredis'); + this.redisPublisher = new IORedis(connection); + this.redisSubscriber = new IORedis(connection); + + if (connectionListener) { + this.redisPublisher + .on('connect', connectionListener) + .on('error', connectionListener); + this.redisSubscriber + .on('connect', connectionListener) + .on('error', connectionListener); + } else { + this.redisPublisher.on('error', console.error); + this.redisSubscriber.on('error', console.error); + } + } catch (error) { + console.error( + `No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`, + ); + } + } + + this.currentSubscriptionId = 0; + } + + public abstract publish(trigger: string, payload: T): Promise; + + public abstract subscribe( + trigger: string, + onMessage: OnMessage, + options: unknown, + ): Promise; + + public abstract unsubscribe(subId: number): void; + + public asyncIterator(triggers: string | string[], options?: unknown): AsyncIterator { + return new PubSubAsyncIterator(this, triggers, options); + } + + public getSubscriber(): RedisClient { + return this.redisSubscriber; + } + + public getPublisher(): RedisClient { + return this.redisPublisher; + } + + public close(): Promise<'OK'[]> { + return Promise.all([ + this.redisPublisher.quit(), + this.redisSubscriber.quit(), + ]); + } + + protected readonly serializer?: Serializer; + protected readonly deserializer?: Deserializer; + protected readonly triggerTransform: TriggerTransform; + protected readonly redisSubscriber: RedisClient; + protected readonly redisPublisher: RedisClient; + protected readonly reviver: Reviver; + + protected currentSubscriptionId: number; +} + +export type Path = Array; +export type Trigger = string | Path; +export type TriggerTransform = ( + trigger: Trigger, + channelOptions?: unknown, +) => string; +export type Reviver = (key: any, value: any) => any; +export type Serializer = (source: any) => string; +export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any; diff --git a/src/redis-pubsub.ts b/src/redis-pubsub.ts index de375b4d..c436205e 100644 --- a/src/redis-pubsub.ts +++ b/src/redis-pubsub.ts @@ -1,78 +1,25 @@ -import {Cluster, Redis, RedisOptions} from 'ioredis'; -import {PubSubEngine} from 'graphql-subscriptions'; -import {PubSubAsyncIterator} from './pubsub-async-iterator'; - -type RedisClient = Redis | Cluster; -type OnMessage = (message: T) => void; -type DeserializerContext = { channel: string, pattern?: string }; - -export interface PubSubRedisOptions { - connection?: RedisOptions | string; - triggerTransform?: TriggerTransform; - connectionListener?: (err: Error) => void; - publisher?: RedisClient; - subscriber?: RedisClient; - reviver?: Reviver; - serializer?: Serializer; - deserializer?: Deserializer; +import { OnMessage, PubSubRedisBaseOptions, RedisPubSubBase } from "./redis-pubsub-base"; + +interface PubSubRedisSubscribeOptions { messageEventName?: string; pmessageEventName?: string; } -export class RedisPubSub implements PubSubEngine { +export type PubSubRedisOptions = PubSubRedisBaseOptions & PubSubRedisSubscribeOptions; +/** + * Redis PubSub implementation that uses `subscribe` or `psubscribe` redis commands + * as the communication method. + */ +export class RedisPubSub extends RedisPubSubBase { constructor(options: PubSubRedisOptions = {}) { + super(options); + const { - triggerTransform, - connection, - connectionListener, - subscriber, - publisher, - reviver, - serializer, - deserializer, messageEventName = 'message', pmessageEventName = 'pmessage', } = options; - this.triggerTransform = triggerTransform || (trigger => trigger as string); - - if (reviver && deserializer) { - throw new Error("Reviver and deserializer can't be used together"); - } - - this.reviver = reviver; - this.serializer = serializer; - this.deserializer = deserializer; - - if (subscriber && publisher) { - this.redisPublisher = publisher; - this.redisSubscriber = subscriber; - } else { - try { - // eslint-disable-next-line @typescript-eslint/no-var-requires - const IORedis = require('ioredis'); - this.redisPublisher = new IORedis(connection); - this.redisSubscriber = new IORedis(connection); - - if (connectionListener) { - this.redisPublisher - .on('connect', connectionListener) - .on('error', connectionListener); - this.redisSubscriber - .on('connect', connectionListener) - .on('error', connectionListener); - } else { - this.redisPublisher.on('error', console.error); - this.redisSubscriber.on('error', console.error); - } - } catch (error) { - console.error( - `No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`, - ); - } - } - // handle messages received via psubscribe and subscribe this.redisSubscriber.on(pmessageEventName, this.onMessage.bind(this)); // partially applied function passes undefined for pattern arg since 'message' event won't provide it: @@ -80,7 +27,6 @@ export class RedisPubSub implements PubSubEngine { this.subscriptionMap = {}; this.subsRefsMap = new Map>(); - this.currentSubscriptionId = 0; } public async publish(trigger: string, payload: T): Promise { @@ -138,36 +84,8 @@ export class RedisPubSub implements PubSubEngine { } delete this.subscriptionMap[subId]; } - - public asyncIterator(triggers: string | string[], options?: unknown): AsyncIterator { - return new PubSubAsyncIterator(this, triggers, options); - } - - public getSubscriber(): RedisClient { - return this.redisSubscriber; - } - - public getPublisher(): RedisClient { - return this.redisPublisher; - } - - public close(): Promise<'OK'[]> { - return Promise.all([ - this.redisPublisher.quit(), - this.redisSubscriber.quit(), - ]); - } - - private readonly serializer?: Serializer; - private readonly deserializer?: Deserializer; - private readonly triggerTransform: TriggerTransform; - private readonly redisSubscriber: RedisClient; - private readonly redisPublisher: RedisClient; - private readonly reviver: Reviver; - private readonly subscriptionMap: { [subId: number]: [string, OnMessage] }; private readonly subsRefsMap: Map>; - private currentSubscriptionId: number; private onMessage(pattern: string, channel: string, message: string) { const subscribers = this.subsRefsMap.get(pattern || channel); @@ -189,14 +107,4 @@ export class RedisPubSub implements PubSubEngine { listener(parsedMessage); }); } -} - -export type Path = Array; -export type Trigger = string | Path; -export type TriggerTransform = ( - trigger: Trigger, - channelOptions?: unknown, -) => string; -export type Reviver = (key: any, value: any) => any; -export type Serializer = (source: any) => string; -export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any; +} \ No newline at end of file diff --git a/src/redis-stream-pubsub.ts b/src/redis-stream-pubsub.ts new file mode 100644 index 00000000..d817243e --- /dev/null +++ b/src/redis-stream-pubsub.ts @@ -0,0 +1,304 @@ +import { OnMessage, PubSubRedisBaseOptions, RedisPubSubBase } from "./redis-pubsub-base"; + +interface RedisStreamSubscribeOptions { + /** ID of the last message received on the specified stream. */ + lastMessageId?: string; +} + +interface PubSubRedisStreamAdditionalOptions { + blockTimeout?: number; +} + +export type PubSubRedisStreamOptions = PubSubRedisStreamAdditionalOptions & PubSubRedisBaseOptions; + +type SubscriberDetails = [streamName: string, lastMessageId: string, onMessage: OnMessage]; + +/** Return value from the XREAD command. */ +type XReadResponse = [streamName: string, messages: [messageId: string, messageData: string[]][]][]; + +/** + * 1 if client was unblocked successfully, 0 if not. + * Defined here because IORedis typings for the `client` function are incomplete. + */ +type ClientUnblockResult = 1 | 0; + +/** + * Messages returned from a streams client have two pieces of data - + * the data itself, and the id of the message. This can be used + * by the consumer to retrieve previous messages after a disconnect. + */ +export type StreamsMessage = { id: string; data: T }; + +const DEFAULT_BLOCK_TIMEOUT = 5000; + +/** +* Redis PubSub implementation that uses redis streams (xread/xadd) as the +* communication method. As with many things, this provides better consistency +* (clients can disconnect and still receive missed messages), at the tradeoff of slightly +* worse performance. +*/ +export class RedisStreamPubSub extends RedisPubSubBase { + + constructor(options: PubSubRedisStreamOptions = {}) { + super(options); + + this.blockTimeout = options.blockTimeout || DEFAULT_BLOCK_TIMEOUT; + + this.streamsMap = {}; + this.subscriptionMap = {}; + } + + // Run the event loop. + private async runEventLoop(): Promise { + while (this.hasSubscribers() && !this.closed) { + + await this.waitAndProcessMessages(); + } + + delete this.eventLoop; + } + + // Begins the event loop which blocks on the XREAD command, + // or returns a reference to the promise for the currently + // running event loop if it's currently running. + private startEventLoop(): Promise { + if (this.eventLoop) { + return this.eventLoop; + } + else { + this.eventLoop = this.runEventLoop().catch(error => { + console.error('Error in event loop:'); + console.error(error); + }); + return this.eventLoop; + } + } + + private async waitAndProcessMessages(): Promise { + // Determine which streams and last messages we need to provide to the XREAD command. + // This data structure becomes the source of truth as to which streams and last message ids + // we're interested in on this iteration of the event loop, meaning subscribers can + // be added or removed at will without introducing race conditions. + const streamsAndOrderedLastMessageIds: [string, string[]][] = Object.entries(this.streamsMap) + .map(([streamName, lastMessageIds]) => [streamName, Object.keys(lastMessageIds).sort(sortLastMessageIds)]); + + const streamAndLastMessageIdPairs = streamsAndOrderedLastMessageIds + .flatMap(([streamName, lastMessageIds]) => lastMessageIds.map(lastMessageId => [streamName, lastMessageId])); + + const streams = streamAndLastMessageIdPairs.map(pair => pair[0]); + const lastMessageIds = streamAndLastMessageIdPairs.map(pair => pair[1]); + + // Before we block, record the subscriber client id so we can unblock when new data comes in. + this.subscriberClientId = await this.redisSubscriber.client('ID'); + + // Just make sure by the time we're about to block, we still want to (in case we've shut down or run out of subscribers + // in between the while loop check and now). + if (!this.closed && this.hasSubscribers()) { + const messages: XReadResponse = await this.redisSubscriber.xread('COUNT', 1, 'BLOCK', this.blockTimeout, 'STREAMS', ...streams, ...lastMessageIds); + if (messages) { + this.processMessages(messages, streamsAndOrderedLastMessageIds); + } + } + + return; + } + + private processMessages(messages: XReadResponse, streamsAndOrderedLastMessageIds: [streamName: string, lastMessageIds: string[]][]) { + // Group incoming messages by the stream they were requested on. + const messagesByStream: {[streamName: string]: [messageId: string, messageData: string[]][][]} = messages.reduce((current, [streamName, messages]) => { + current[streamName] ??= []; + current[streamName].push(messages); + return current; + }, {}); + + // Provide the received messages back to their individual handlers. + const lastMessagesLookupMap = Object.fromEntries(streamsAndOrderedLastMessageIds); + Object.entries(messagesByStream).forEach(([streamName, messageGroupsForStream]) => { + const updateMap: Record = {}; + messageGroupsForStream.forEach((messages, index) => { + // If there are n instances of the same stream requested, and fewer than + // n returned, any missing have no updates. Since we're sorting the lastMessageIds, + // the ones at the end of the list will always be the ones with no updates, + // so we can just happily skip those with no problem. + const lastMessageId = lastMessagesLookupMap[streamName][index]; + messages.forEach(([messageId, [, message]]) => this.onMessage(streamName, lastMessageId, messageId, message)); + const [ newLastMessageId ] = messages.at(-1); + updateMap[lastMessageId] = newLastMessageId; + }); + + // After processing all new messages for each stream, update the last message each subscriber has read + this.updateSubscriberLastMessages(streamName, updateMap); + }); + } + + /** + * + * @param streamName The stream we're listening on + * @param updateMap Keys are previously last seen message ids, values are newly last seen message ids + */ + private updateSubscriberLastMessages(streamName: string, updateMap: Record) { + Object.keys(updateMap).sort(sortLastMessageIds).forEach((oldLastMessageId) => { + const newLastMessageId = updateMap[oldLastMessageId]; + const subscribers = this.streamsMap[streamName]?.[oldLastMessageId]; + + if (!subscribers) { + return; + } + + if (this.streamsMap[streamName][newLastMessageId]) { + this.streamsMap[streamName][newLastMessageId] = this.streamsMap[streamName][newLastMessageId].concat(subscribers); + } + else { + this.streamsMap[streamName][newLastMessageId] = subscribers; + } + + delete this.streamsMap[streamName][oldLastMessageId]; + + subscribers.forEach(subId => { + this.subscriptionMap[subId][1] = newLastMessageId; + }); + }); + } + + private onMessage(streamName: string, lastMessageId: string, messageId: string, messageData: string) { + const subscribers = this.streamsMap[streamName]?.[lastMessageId]; + + // Don't work for nothing.. + if (!subscribers || !subscribers.length) return; + + let parsedMessage; + try { + parsedMessage = this.deserializer + ? this.deserializer(messageData, { channel: streamName }) + : JSON.parse(messageData, this.reviver); + } catch (e) { + parsedMessage = messageData; + } + + for (const subId of subscribers) { + const [, , listener] = this.subscriptionMap[subId]; + listener({id: messageId, data: parsedMessage}); + } + } + + private hasSubscribers(): boolean { + return Object.keys(this.subscriptionMap).length > 0; + } + + public async publish(trigger: string, payload: T): Promise { + await this.redisPublisher.xadd(trigger, '*', 'message', this.serializer ? this.serializer(payload) : JSON.stringify(payload)); + return; + } + + public async subscribe(streamName: string, onMessage: OnMessage, options: RedisStreamSubscribeOptions = {}): Promise { + const id = this.currentSubscriptionId++; + + // If the last message received is specified with this subscription request, use that. + // If there is already a last message id for this stream, group this new subscription with that one. + // Otherwise this is a new subscription for this stream and we should start from now with the current timestamp. + this.streamsMap[streamName] ||= {}; + const lastMessageId = options.lastMessageId + || Object.keys(this.streamsMap[streamName]).sort(sortLastMessageIds).at(-1) + || `${new Date().getTime()}-0`; + + const subscriberDetails: SubscriberDetails = [streamName, lastMessageId, onMessage]; + this.subscriptionMap[id] = subscriberDetails; + + const requiresNewCommand = !(this.streamsMap[streamName][lastMessageId]); + + this.streamsMap[streamName][lastMessageId] ||= []; + this.streamsMap[streamName][lastMessageId].push(id); + + if (requiresNewCommand && this.subscriberClientId) { + this.unblockClient(this.subscriberClientId); + } + + // Finally, kick off the XREAD event loop if it hasn't been started yet. + if (!this.eventLoop) { + this.startEventLoop(); + } + + return Promise.resolve(id); + } + + private async unblockClient(clientId: number): Promise { + return this.redisPublisher.client('UNBLOCK', clientId, 'TIMEOUT') as Promise; + } + + public unsubscribe(subId: number): void { + const subscriptionDetails = this.subscriptionMap[subId]; + + if (!subscriptionDetails) throw new Error(`There is no subscription of id "${subId}"`); + const [ streamName, lastMessageId ] = subscriptionDetails; + + const listenerIndex = this.streamsMap[streamName][lastMessageId].indexOf(subId); + this.streamsMap[streamName][lastMessageId].splice(listenerIndex, 1); + + if (this.streamsMap[streamName][lastMessageId].length === 0) { + delete this.streamsMap[streamName][lastMessageId]; + if (Object.keys(this.streamsMap[streamName]).length === 0) { + delete this.streamsMap[streamName]; + } + + // Flush the event loop since we've changed the subscription set. + // This isn't strictly necessary, but it helps us clean up + // more consistently. + this.unblockClient(this.subscriberClientId); + } + + delete this.subscriptionMap[subId]; + } + + public async close(): Promise<'OK'[]> { + this.closed = true; + + let unblockPromise: Promise; + if (this.subscriberClientId) { + unblockPromise = this.unblockClient(this.subscriberClientId); + } + else { + unblockPromise = Promise.resolve(); + } + + return unblockPromise.then(() => Promise.all([ + this.redisPublisher.quit(), + this.redisSubscriber.quit(), + ])); + } + + private blockTimeout: number; + private closed = false; + private eventLoop: Promise; + private streamsMap: { + [streamName: string]: { + [lastMessageId: string]: number[], // subscription ids registered to that streamName and lastMessageId + } + }; + private subscriptionMap: { + [subId: number]: SubscriberDetails, + } + // Used to unblock the subscriber client when the set of subscriptions changes + private subscriberClientId: number; +} + +/** + * Sorts the Stream message ids. + * Redis Stream message ids are of the format [timestamp]-[sequence], + * so we can't do a simple numeric or alphabetic sort. + **/ +function sortLastMessageIds(a: string, b: string) { + if (a === b) { + return 0; + } + else { + const aSplit = a.split('-').map(num => parseInt(num)); + const bSplit = b.split('-').map(num => parseInt(num)); + + if (aSplit[0] === bSplit[0]) { + return aSplit[1] - bSplit[1]; + } + else { + return aSplit[0] - bSplit[0]; + } + } +} \ No newline at end of file diff --git a/src/test/stream-tests.ts b/src/test/stream-tests.ts new file mode 100644 index 00000000..1faadf13 --- /dev/null +++ b/src/test/stream-tests.ts @@ -0,0 +1,357 @@ +import * as chai from 'chai'; +import * as chaiAsPromised from 'chai-as-promised'; +import { RedisStreamPubSub, StreamsMessage } from '../redis-stream-pubsub'; +const expect = chai.expect; +chai.use(chaiAsPromised); + +const redisHost = process.env.TEST_REDIS_HOST || 'localhost'; + +let pubSub: RedisStreamPubSub; +describe('redis stream pubsub', () => { + beforeEach(() => { + pubSub = new RedisStreamPubSub({ + connection: { + host: redisHost, + port: 6379, + } + }); + }); + + afterEach(async () => { + return await pubSub.close(); + }); + + it('should subscribe on a channel and receive messages on that channel', () => { + const testStream = 'teststream'; + const expectedMessage = { foo: 'bar' }; + + const subscribePromise = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => res(message.data)); + }); + + setTimeout(() => { + pubSub.publish(testStream, expectedMessage); + }, 5); + + return subscribePromise.then(message => { + expect(message).to.deep.equal(expectedMessage); + }); + }); + + it('should subscribe on multiple channels', () => { + const testStream1 = 'teststream1'; + const testStream2 = 'teststream2'; + const expectedMessage1 = { foo: 'bar' }; + const expectedMessage2 = { bar: 'baz' }; + + const subscribePromise1 = new Promise((res, rej) => { + pubSub.subscribe>(testStream1, message => res(message.data)); + }); + + const subscribePromise2 = new Promise((res, rej) => { + pubSub.subscribe>(testStream2, message => res(message.data)); + }); + + setTimeout(() => { + pubSub.publish(testStream1, expectedMessage1); + pubSub.publish(testStream2, expectedMessage2); + }, 5); + + return Promise.all([ + subscribePromise1, + subscribePromise2 + ]).then(([message1, message2]) => { + expect(message1).to.deep.equal(expectedMessage1); + expect(message2).to.deep.equal(expectedMessage2); + }); + }); + + it('should receive multiple messages in order on a single stream', () => { + const testStream = 'teststream'; + const totalMessages = 3; + + const messagesReceived: any[] = []; + const subscribePromise: Promise = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => { + messagesReceived.push(message.data); + if (messagesReceived.length === totalMessages) { + res(messagesReceived); + } + }); + }); + + for (let i = 0; i < totalMessages; i++ ) { + setTimeout(() => pubSub.publish(testStream, { messageNumber: i }), i * 100 + 5); + } + + return subscribePromise.then(messages => { + expect(messages).to.have.length(totalMessages); + for (let i = 0; i < totalMessages; i++) { + expect(messages[i]).to.include({ messageNumber: i }); + } + }); + }); + + it('should receive multiple messages in order on multiple streams', () => { + const testStream1 = 'teststream1'; + const testStream2 = 'teststream2'; + const totalMessages = 3; + + const messagesReceived1: any[] = []; + const subscribePromise1: Promise = new Promise((res, rej) => { + pubSub.subscribe>(testStream1, message => { + messagesReceived1.push(message.data); + if (messagesReceived1.length === totalMessages) { + res(messagesReceived1); + } + }); + }); + + const messagesReceived2: any[] = []; + const subscribePromise2: Promise = new Promise((res, rej) => { + pubSub.subscribe>(testStream2, message => { + messagesReceived2.push(message.data); + if (messagesReceived2.length === totalMessages) { + res(messagesReceived2); + } + }); + }); + + for (let i = 0; i < totalMessages; i++ ) { + setTimeout(() => pubSub.publish(testStream1, { messageNumber: i, stream: testStream1 }), i * 100 + 5); + setTimeout(() => pubSub.publish(testStream2, { messageNumber: i, stream: testStream2 }), i * 100 + 5); + } + + return Promise.all([subscribePromise1, subscribePromise2]).then(([messages1, messages2]) => { + expect(messages1).to.have.length(totalMessages); + for (let i = 0; i < totalMessages; i++) { + expect(messages1[i]).to.include({ messageNumber: i }); + expect(messages1[i]).to.include({ stream: testStream1 }); + } + + expect(messages2).to.have.length(totalMessages); + for (let i = 0; i < totalMessages; i++) { + expect(messages2[i]).to.include({ messageNumber: i }); + expect(messages2[i]).to.include({ stream: testStream2 }); + } + }); + }); + + it('should fan out messages to multiple subscribers on the same stream', () => { + const testStream = 'teststream'; + const expectedMessage = { foo: 'bar' }; + + const subscribePromise1 = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => res(message.data)); + }); + + const subscribePromise2 = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => res(message.data)); + }); + + setTimeout(() => { + pubSub.publish(testStream, expectedMessage); + }, 5); + + return Promise.all([subscribePromise1, subscribePromise2]).then(([message1, message2]) => { + expect(message1).to.deep.equal(message2); + expect(message1).to.deep.equal(expectedMessage); + }); + }); + + it('should allow a subscriber to specify a message id with which to start listening', async () => { + const testStream = 'teststreamblah'; + + // Push a message and record its id. + let initialMessageId; + const subscriber1Messages: any[] = []; + const initialSubscriberPromise = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => { + if (!initialMessageId) { + initialMessageId = message.id; + res(); + } + + subscriber1Messages.push(message.data); + }); + }); + + setTimeout(() => { + pubSub.publish(testStream, { messageNumber: 0 }); + pubSub.publish(testStream, { messageNumber: 1 }); + }, 5); + + await initialSubscriberPromise; + + // Start subscriber 2 listening starting with the initial message id; it should receive + // the initial message in addition. + const subscriber2Messages: any[] = []; + const nextSubscriberPromise = new Promise((res, rej) => { + pubSub.subscribe>(testStream, message => { + subscriber2Messages.push(message.data); + if (subscriber2Messages.length === 2) { + res(subscriber2Messages); + } + }, { lastMessageId: initialMessageId }); + }); + + setTimeout(() => { + pubSub.publish(testStream, { messageNumber: 2 }); + }, 5); + + // The originally listening stream and the one that joined in + // late should have the same set of messages now + return nextSubscriberPromise.then(() => { + // Subscriber 1 should have all three messages, + // subscriber 2 should have the second two. + expect(subscriber1Messages).to.have.length(3); + for (let i = 0; i < 3; i++) { + expect(subscriber1Messages[i]).to.include({ messageNumber: i }); + } + + expect(subscriber2Messages).to.have.length(2); + for (let i = 0; i < 2; i++) { + expect(subscriber2Messages[i]).to.include({ messageNumber: i + 1 }); + } + }); + }); + + it('should allow subscribers to unsubscribe', async () => { + const testStream = 'teststream'; + const message1 = { messageNumber: 1 }; + const message2 = { messageNumber: 2 }; + + const subscriberMessages: any[] = []; + let subId; + const subscribePromise = new Promise(async (res, rej) => { + subId = await pubSub.subscribe>(testStream, message => { + subscriberMessages.push(message.data); + if (subscriberMessages.length === 1) { + res(message.data); + } + }); + + }); + + setTimeout(() => { + pubSub.publish(testStream, message1); + }, 5); + + await subscribePromise; + + pubSub.unsubscribe(subId); + + await pubSub.publish(testStream, message2); + + return new Promise((res, rej) => { + setTimeout(() => { + expect(subscriberMessages).to.have.length(1); + + expect((pubSub as any).eventLoop).to.be.undefined; + res(); + }, 5); + }); + }); +}); + +describe('redis stream pubsub with asyncIterator', () => { + beforeEach(() => { + pubSub = new RedisStreamPubSub({ + connection: { + host: redisHost, + port: 6379, + } + }); + }); + + afterEach(async () => { + return await pubSub.close(); + }); + + it('should return an async iterator that fetches values for the stream', async () => { + const streamName = 'teststream'; + type TestStreamData = StreamsMessage<{messageNumber: number}>; + + const asyncIterator = pubSub.asyncIterator(streamName); + + // Publish messages every 100ms + for (let i = 0; i < 3; i++) { + setTimeout(() => { + pubSub.publish(streamName, { messageNumber: i }); + }, 100 * i); + } + + const messages: IteratorResult[] = [ await asyncIterator.next(), await asyncIterator.next(), await asyncIterator.next() ]; + + expect(messages).to.have.length(3); + for (let i = 0; i < 3; i++ ) { + expect(messages[i].value?.data).to.include({ messageNumber: i }); + } + }); + + it('should support multiple async iterators at once', async () => { + const stream1Name = 'teststream1'; + const stream2Name = 'teststream2'; + type TestStreamData = StreamsMessage<{messageNumber: number, stream: number}>; + + const asyncIterator1 = pubSub.asyncIterator(stream1Name); + const asyncIterator2 = pubSub.asyncIterator(stream1Name); + const asyncIterator3 = pubSub.asyncIterator(stream2Name); + + const messages1: TestStreamData[] = []; + const messages2: TestStreamData[] = []; + const messages3: TestStreamData[] = []; + const doneReceivingPromise = new Promise((res, rej) => { + const resolveIfDone = () => { + if (messages1.length === 3 && messages2.length === 3 && messages3.length ===3) { + res(); + } + } + + for (let i = 0; i < 3; i++) { + asyncIterator1.next().then(item => { messages1.push(item.value); resolveIfDone() }); + asyncIterator2.next().then(item => { messages2.push(item.value); resolveIfDone() }); + asyncIterator3.next().then(item => { messages3.push(item.value); resolveIfDone() }); + } + }); + + // Publish messages every 100ms + for (let i = 0; i < 3; i++) { + setTimeout(() => { + pubSub.publish(stream1Name, { messageNumber: i, stream: 1 }); + pubSub.publish(stream2Name, { messageNumber: i, stream: 2 }); + }, 100 * i); + } + + await doneReceivingPromise; + + expect(messages1).to.have.length(3); + expect(messages2).to.have.length(3); + expect(messages3).to.have.length(3); + }); + + it('should close a subscription when return is called on an async iterator', async () => { + const streamName = 'teststream'; + type TestStreamData = StreamsMessage<{messageNumber: number}>; + + const asyncIterator = pubSub.asyncIterator(streamName); + + // Publish messages every 100ms + for (let i = 0; i < 3; i++) { + setTimeout(() => { + pubSub.publish(streamName, { messageNumber: i }); + }, 100 * i); + } + + const messages: IteratorResult[] = [ await asyncIterator.next(), await asyncIterator.next(), await asyncIterator.next() ]; + + expect(messages).to.have.length(3); + asyncIterator.return!(); + + // Subsequent next calls should return the return value; + const next = await asyncIterator.next(); + expect(next).to.include({ value: undefined, done: true }); + + return expect((pubSub as any).eventLoop).to.eventually.be.undefined; + }); +}); \ No newline at end of file