redis-pubsub

Fully type-safe Redis PubSub system with async iterators

This is a fork of https://github.com/soundxyz/redis-pubsub with updated dependencies

Features

Install

npm install @13bytes/redis-pubsub

Peer dependencies

pnpm add zod ioredis
npm install zod ioredis
yarn add zod ioredis

Usage

Create a Redis PubSub instance:

import Redis from "ioredis";
import { z } from "zod";

import { RedisPubSub } from "@13bytes/redis-pubsub";

const { createChannel } = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

Create a channel from any Zod schema and a unique "name", which is used as the main trigger.

const schema = z.object({
  id: z.string(),
  name: z.string(),
});

const userChannel = createChannel({
  name: "User",
  schema,
});

const nonLazyUserChannel = createChannel({
  name: "User",
  schema,
  // By default, channels connect to Redis lazily
  isLazy: false,
});

Subscribe and publish to the channel

// Using async iterators / async generators to subscribe
(async () => {
  for await (const user of userChannel.subscribe()) {
    console.log("User", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can explicitly wait until the channel is successfully connected to Redis
await userChannel.isReady();

// Publish data into the channel
await userChannel.publish(
  {
    value: {
      id: "1",
      name: "John",
    },
  },
  // You can also publish more than a single value
  {
    value: {
      id: "2",
      name: "Peter",
    },
  }
);
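
To see how a "for await" loop on one side pairs with publish on the other, here is a minimal in-memory sketch. It involves no Redis and is not the library's implementation; MemoryChannel and demo are hypothetical names used only for illustration.

```typescript
// In-memory stand-in for a channel (hypothetical, no Redis involved).
class MemoryChannel<T> {
  private listeners = new Set<(value: T) => void>();

  // Deliver every value to every active subscriber.
  publish(...values: T[]): void {
    for (const value of values) {
      for (const listener of this.listeners) listener(value);
    }
  }

  async *subscribe(): AsyncGenerator<T> {
    const queue: T[] = [];
    let wake: (() => void) | undefined;
    const listener = (value: T) => {
      queue.push(value);
      wake?.();
    };
    this.listeners.add(listener);
    try {
      while (true) {
        // Drain buffered values before waiting for new ones.
        while (queue.length > 0) yield queue.shift() as T;
        await new Promise<void>((resolve) => (wake = resolve));
      }
    } finally {
      // Runs when the consumer breaks or returns from its for-await loop.
      this.listeners.delete(listener);
    }
  }
}

async function demo(): Promise<string[]> {
  const channel = new MemoryChannel<{ id: string; name: string }>();
  const received = (async () => {
    const names: string[] = [];
    for await (const user of channel.subscribe()) {
      names.push(user.name);
      if (names.length === 2) break; // stop after two values
    }
    return names;
  })();
  channel.publish({ id: "1", name: "John" }, { id: "2", name: "Peter" });
  return received;
}

demo().then((names) => console.log(names));
```

The try/finally block is the part worth noticing: leaving the loop with break or return triggers the generator's cleanup, which is where a real implementation would release its Redis subscription.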

Filter based on the data

(async () => {
  for await (const user of userChannel.subscribe({
    filter(value) {
      return value.id === "1";
    },
  })) {
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can also use type predicates / type guards
(async () => {
  for await (const user of userChannel.subscribe({
    filter(value): value is { id: "1"; name: string } {
      return value.id === "1";
    },
  })) {
    // user.id is narrowed to the literal type "1"
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();
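
The narrowing that a type-guard filter provides can be reproduced on any async iterable. The sketch below is a standalone illustration, not the library's code; fakeSubscription, filterAsync, isUserOne and collectUserOne are hypothetical names.

```typescript
type User = { id: string; name: string };

// Hypothetical stand-in for a subscription: yields a fixed list of users.
async function* fakeSubscription(): AsyncGenerator<User> {
  yield { id: "1", name: "John" };
  yield { id: "2", name: "Peter" };
}

// Generic filter over an async iterable. Because the predicate is a type
// guard, the yielded values are narrowed from T to S.
async function* filterAsync<T, S extends T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => value is S
): AsyncGenerator<S> {
  for await (const value of source) {
    if (predicate(value)) yield value;
  }
}

const isUserOne = (value: User): value is User & { id: "1" } =>
  value.id === "1";

async function collectUserOne(): Promise<Array<User & { id: "1" }>> {
  const matches: Array<User & { id: "1" }> = [];
  for await (const user of filterAsync(fakeSubscription(), isUserOne)) {
    matches.push(user); // user.id has the literal type "1" here
  }
  return matches;
}

collectUserOne().then((matches) => console.log(matches));
```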

Use custom identifiers

Each identifier gets its own Redis channel, named by concatenating "name" and "identifier". For example, with "name" = "User" and "identifier" = 1, the channel trigger name is "User1".

(async () => {
  for await (const user of userChannel.subscribe({
    // number or string
    identifier: 1,
  })) {
    console.log("User with identifier=1", {
      id: user.id,
      name: user.name,
    });
  }
})();

await userChannel.isReady({
  // number or string
  identifier: 1,
});

await userChannel.publish({
  value: {
    id: "1",
    name: "John",
  },
  identifier: 1,
});
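
The naming rule described in this section can be written as a one-line function. triggerName is a hypothetical helper for illustration and is not exported by the library; the behaviour without an identifier (the plain name is used) is an assumption.

```typescript
// Hypothetical helper mirroring the concatenation rule described above.
function triggerName(name: string, identifier?: string | number): string {
  // Assumption: without an identifier, the channel name is used as-is.
  return identifier === undefined ? name : `${name}${identifier}`;
}

console.log(triggerName("User", 1)); // "User1"
console.log(triggerName("User", "1")); // "User1"
console.log(triggerName("User")); // "User"
```

If the rule is plain concatenation as described, the number 1 and the string "1" map to the same trigger, and a channel named "User1" without an identifier would share a trigger with "User" plus identifier 1, so channel names are best chosen so they cannot collide this way.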

Separate input from output

You can leverage Zod transforms to separate input types from output types, so that subscriptions receive a custom class or any other transformed output.

class CustomClass {
  constructor(public name: string) {}
}

const inputSchema = z.string();
const outputSchema = z.string().transform((input) => new CustomClass(input));

// "pubSub" is the instance returned by RedisPubSub({ publisher, subscriber })
const channel = pubSub.createChannel({
  name: "separate-type",
  inputSchema,
  outputSchema,
});

const subscription = (async () => {
  for await (const data of channel.subscribe()) {
    return data;
  }
})();

await channel.isReady();

await channel.publish({
  value: "test",
});

const result = await subscription;

// true
console.log(result instanceof CustomClass);

// true
console.log(result.name === "test");
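
The split between input and output can be pictured as two validation steps around a serialized message. The sketch below uses plain functions instead of Zod and assumes a JSON wire format; parseInput, parseOutput, encode and decode are hypothetical names, and the library's internals may differ.

```typescript
class CustomClass {
  name: string;
  constructor(name: string) {
    this.name = name;
  }
}

// Input side: validates what a publisher is allowed to send.
function parseInput(raw: unknown): string {
  if (typeof raw !== "string") throw new TypeError("expected a string");
  return raw;
}

// Output side: validates and transforms what a subscriber receives.
function parseOutput(raw: unknown): CustomClass {
  return new CustomClass(parseInput(raw));
}

// Publish path: validate, then serialize (JSON is an assumption here).
function encode(value: unknown): string {
  return JSON.stringify(parseInput(value));
}

// Subscribe path: deserialize, then run the output transform.
function decode(message: string): CustomClass {
  return parseOutput(JSON.parse(message));
}

const decoded = decode(encode("test"));
console.log(decoded instanceof CustomClass); // true
console.log(decoded.name === "test"); // true
```

The class instance never crosses the wire in this model: only the validated input is serialized, and the transform runs again for every subscriber that receives the message.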

Use AbortController / AbortSignal

If isLazy is not disabled, a channel is automatically unsubscribed from Redis once its last subscription ends.

const abortController = new AbortController();
const abortedSubscription = (async () => {
  for await (const data of userChannel.subscribe({
    abortSignal: abortController.signal,
  })) {
    console.log({ data });
  }
})();

// ...

abortController.abort();

await abortedSubscription;
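
One way to picture the lazy behaviour described in this section is a reference count per channel: the first consumer triggers a subscribe, and the last one to leave triggers an unsubscribe. This is a sketch under that assumption, not the library's actual internals; LazySubscriptions and its methods are hypothetical.

```typescript
// Hypothetical reference counter; records the commands it would send.
class LazySubscriptions {
  private counts = new Map<string, number>();
  readonly commands: string[] = [];

  // Called when a consumer starts iterating a channel.
  acquire(channel: string): void {
    const count = this.counts.get(channel) ?? 0;
    if (count === 0) this.commands.push(`SUBSCRIBE ${channel}`);
    this.counts.set(channel, count + 1);
  }

  // Called when a consumer finishes, for example after its signal aborts.
  release(channel: string): void {
    const count = this.counts.get(channel) ?? 0;
    if (count === 0) return; // nothing to release
    if (count === 1) {
      this.counts.delete(channel);
      this.commands.push(`UNSUBSCRIBE ${channel}`);
    } else {
      this.counts.set(channel, count - 1);
    }
  }
}

const registry = new LazySubscriptions();
registry.acquire("User");
registry.acquire("User");
registry.release("User"); // one consumer left, still subscribed
registry.release("User"); // last consumer gone, unsubscribe is recorded
console.log(registry.commands);
```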

Unsubscribe specific identifiers

await userChannel.unsubscribe(
  {
    identifier: 1,
  },
  // You can specify more than a single identifier at once
  {
    identifier: 2,
  }
);

Unsubscribe an entire channel

await userChannel.unsubscribeAll();

Close the PubSub instance

const pubSub = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

// ...
await pubSub.close();
