Skip to content

Commit

Permalink
feat: Add redis streams support.
Browse files Browse the repository at this point in the history
Closes #340.
  • Loading branch information
boehlkers committed Dec 22, 2022
1 parent 1775b7c commit 524e702
Show file tree
Hide file tree
Showing 6 changed files with 798 additions and 105 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"lint": "eslint src --ext ts",
"watch": "tsc -w",
"testonly": "mocha --reporter spec src/test/tests.ts",
"integration": "mocha --reporter spec src/test/integration-tests.ts",
"integration": "mocha --reporter spec src/test/integration-tests.ts src/test/stream-tests.ts",
"coverage": "nyc --reporter=html --reporter=text mocha src/test/**/*.ts",
"prepublish": "tsc",
"prepublishOnly": "npm run test"
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export { RedisPubSub } from './redis-pubsub';
export { RedisStreamPubSub } from './redis-stream-pubsub';
123 changes: 123 additions & 0 deletions src/redis-pubsub-base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { Cluster, Redis, RedisOptions } from 'ioredis';
import { PubSubEngine } from 'graphql-subscriptions';
import {PubSubAsyncIterator} from './pubsub-async-iterator';

type DeserializerContext = { channel: string, pattern?: string };

export type RedisClient = Redis | Cluster;
export type OnMessage<T> = (message: T) => void;

export interface PubSubRedisBaseOptions {
connection?: RedisOptions | string;
triggerTransform?: TriggerTransform;
connectionListener?: (err: Error) => void;
publisher?: RedisClient;
subscriber?: RedisClient;
reviver?: Reviver;
serializer?: Serializer;
deserializer?: Deserializer;
}

export abstract class RedisPubSubBase implements PubSubEngine {

constructor(options: PubSubRedisBaseOptions = {}) {
const {
triggerTransform,
connection,
connectionListener,
subscriber,
publisher,
reviver,
serializer,
deserializer,
} = options;

this.triggerTransform = triggerTransform || (trigger => trigger as string);

if (reviver && deserializer) {
throw new Error("Reviver and deserializer can't be used together");
}

this.reviver = reviver;
this.serializer = serializer;
this.deserializer = deserializer;

if (subscriber && publisher) {
this.redisPublisher = publisher;
this.redisSubscriber = subscriber;
} else {
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const IORedis = require('ioredis');
this.redisPublisher = new IORedis(connection);
this.redisSubscriber = new IORedis(connection);

if (connectionListener) {
this.redisPublisher
.on('connect', connectionListener)
.on('error', connectionListener);
this.redisSubscriber
.on('connect', connectionListener)
.on('error', connectionListener);
} else {
this.redisPublisher.on('error', console.error);
this.redisSubscriber.on('error', console.error);
}
} catch (error) {
console.error(
`No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`,
);
}
}

this.currentSubscriptionId = 0;
}

public abstract publish<T>(trigger: string, payload: T): Promise<void>;

public abstract subscribe<T = any>(
trigger: string,
onMessage: OnMessage<T>,
options: unknown,
): Promise<number>;

public abstract unsubscribe(subId: number): void;

public asyncIterator<T>(triggers: string | string[], options?: unknown): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers, options);
}

public getSubscriber(): RedisClient {
return this.redisSubscriber;
}

public getPublisher(): RedisClient {
return this.redisPublisher;
}

public close(): Promise<'OK'[]> {
return Promise.all([
this.redisPublisher.quit(),
this.redisSubscriber.quit(),
]);
}

protected readonly serializer?: Serializer;
protected readonly deserializer?: Deserializer;
protected readonly triggerTransform: TriggerTransform;
protected readonly redisSubscriber: RedisClient;
protected readonly redisPublisher: RedisClient;
protected readonly reviver: Reviver;

protected currentSubscriptionId: number;
}

export type Path = Array<string | number>;
export type Trigger = string | Path;
export type TriggerTransform = (
trigger: Trigger,
channelOptions?: unknown,
) => string;
export type Reviver = (key: any, value: any) => any;
export type Serializer = (source: any) => string;
export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any;
116 changes: 12 additions & 104 deletions src/redis-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,32 @@
import {Cluster, Redis, RedisOptions} from 'ioredis';
import {PubSubEngine} from 'graphql-subscriptions';
import {PubSubAsyncIterator} from './pubsub-async-iterator';

type RedisClient = Redis | Cluster;
type OnMessage<T> = (message: T) => void;
type DeserializerContext = { channel: string, pattern?: string };

export interface PubSubRedisOptions {
connection?: RedisOptions | string;
triggerTransform?: TriggerTransform;
connectionListener?: (err: Error) => void;
publisher?: RedisClient;
subscriber?: RedisClient;
reviver?: Reviver;
serializer?: Serializer;
deserializer?: Deserializer;
import { OnMessage, PubSubRedisBaseOptions, RedisPubSubBase } from "./redis-pubsub-base";

interface PubSubRedisSubscribeOptions {
messageEventName?: string;
pmessageEventName?: string;
}

export class RedisPubSub implements PubSubEngine {
export type PubSubRedisOptions = PubSubRedisBaseOptions & PubSubRedisSubscribeOptions;

/**
* Redis PubSub implementation that uses `subscribe` or `psubscribe` redis commands
* as the communication method.
*/
export class RedisPubSub extends RedisPubSubBase {
constructor(options: PubSubRedisOptions = {}) {
super(options);

const {
triggerTransform,
connection,
connectionListener,
subscriber,
publisher,
reviver,
serializer,
deserializer,
messageEventName = 'message',
pmessageEventName = 'pmessage',
} = options;

this.triggerTransform = triggerTransform || (trigger => trigger as string);

if (reviver && deserializer) {
throw new Error("Reviver and deserializer can't be used together");
}

this.reviver = reviver;
this.serializer = serializer;
this.deserializer = deserializer;

if (subscriber && publisher) {
this.redisPublisher = publisher;
this.redisSubscriber = subscriber;
} else {
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const IORedis = require('ioredis');
this.redisPublisher = new IORedis(connection);
this.redisSubscriber = new IORedis(connection);

if (connectionListener) {
this.redisPublisher
.on('connect', connectionListener)
.on('error', connectionListener);
this.redisSubscriber
.on('connect', connectionListener)
.on('error', connectionListener);
} else {
this.redisPublisher.on('error', console.error);
this.redisSubscriber.on('error', console.error);
}
} catch (error) {
console.error(
`No publisher or subscriber instances were provided and the package 'ioredis' wasn't found. Couldn't create Redis clients.`,
);
}
}

// handle messages received via psubscribe and subscribe
this.redisSubscriber.on(pmessageEventName, this.onMessage.bind(this));
// partially applied function passes undefined for pattern arg since 'message' event won't provide it:
this.redisSubscriber.on(messageEventName, this.onMessage.bind(this, undefined));

this.subscriptionMap = {};
this.subsRefsMap = new Map<string, Set<number>>();
this.currentSubscriptionId = 0;
}

public async publish<T>(trigger: string, payload: T): Promise<void> {
Expand Down Expand Up @@ -138,36 +84,8 @@ export class RedisPubSub implements PubSubEngine {
}
delete this.subscriptionMap[subId];
}

public asyncIterator<T>(triggers: string | string[], options?: unknown): AsyncIterator<T> {
return new PubSubAsyncIterator<T>(this, triggers, options);
}

public getSubscriber(): RedisClient {
return this.redisSubscriber;
}

public getPublisher(): RedisClient {
return this.redisPublisher;
}

public close(): Promise<'OK'[]> {
return Promise.all([
this.redisPublisher.quit(),
this.redisSubscriber.quit(),
]);
}

private readonly serializer?: Serializer;
private readonly deserializer?: Deserializer;
private readonly triggerTransform: TriggerTransform;
private readonly redisSubscriber: RedisClient;
private readonly redisPublisher: RedisClient;
private readonly reviver: Reviver;

private readonly subscriptionMap: { [subId: number]: [string, OnMessage<unknown>] };
private readonly subsRefsMap: Map<string, Set<number>>;
private currentSubscriptionId: number;

private onMessage(pattern: string, channel: string, message: string) {
const subscribers = this.subsRefsMap.get(pattern || channel);
Expand All @@ -189,14 +107,4 @@ export class RedisPubSub implements PubSubEngine {
listener(parsedMessage);
});
}
}

export type Path = Array<string | number>;
export type Trigger = string | Path;
export type TriggerTransform = (
trigger: Trigger,
channelOptions?: unknown,
) => string;
export type Reviver = (key: any, value: any) => any;
export type Serializer = (source: any) => string;
export type Deserializer = (source: string | Buffer, context: DeserializerContext) => any;
}
Loading

0 comments on commit 524e702

Please sign in to comment.