Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cross-instance messenger pubsub setup #13651

Merged
merged 5 commits into from Jun 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions api/package.json
Expand Up @@ -185,6 +185,7 @@
"@types/flat": "5.0.2",
"@types/fs-extra": "9.0.13",
"@types/inquirer": "8.1.3",
"@types/ioredis": "^4.28.10",
"@types/jest": "27.4.1",
"@types/js-yaml": "4.0.5",
"@types/json2csv": "5.0.3",
Expand Down
4 changes: 2 additions & 2 deletions api/src/app.ts
Expand Up @@ -50,7 +50,7 @@ import schema from './middleware/schema';
import { track } from './utils/track';
import { validateEnv } from './utils/validate-env';
import { validateStorage } from './utils/validate-storage';
import { register as registerWebhooks } from './webhooks';
import { init as initWebhooks } from './webhooks';
import { flushCaches } from './cache';
import { registerAuthProviders } from './auth';
import { Url } from './utils/url';
Expand Down Expand Up @@ -242,7 +242,7 @@ export default async function createApp(): Promise<express.Application> {
await emitter.emitInit('routes.after', { app });

// Register all webhooks
await registerWebhooks();
await initWebhooks();

track('serverStarted');

Expand Down
80 changes: 80 additions & 0 deletions api/src/messenger.ts
@@ -0,0 +1,80 @@
import IORedis from 'ioredis';
import env from './env';
import { getConfigFromEnv } from './utils/get-config-from-env';
import { parseJSON } from './utils/parse-json';

export type MessengerSubscriptionCallback = (payload: Record<string, any>) => void;

export interface Messenger {
publish: (channel: string, payload: Record<string, any>) => void;
subscribe: (channel: string, callback: MessengerSubscriptionCallback) => void;
unsubscribe: (channel: string) => void;
}

export class MessengerMemory implements Messenger {
handlers: Record<string, MessengerSubscriptionCallback>;

constructor() {
this.handlers = {};
}

publish(channel: string, payload: Record<string, any>) {
this.handlers[channel]?.(payload);
}

subscribe(channel: string, callback: MessengerSubscriptionCallback) {
this.handlers[channel] = callback;
}

unsubscribe(channel: string) {
delete this.handlers[channel];
}
}

export class MessengerRedis implements Messenger {
namespace: string;
pub: IORedis.Redis;
sub: IORedis.Redis;

constructor() {
const config = getConfigFromEnv('MESSENGER_REDIS');

this.pub = new IORedis(env.MESSENGER_REDIS ?? config);
this.sub = new IORedis(env.MESSENGER_REDIS ?? config);
this.namespace = env.MESSENGER_NAMESPACE ?? 'directus';
}

publish(channel: string, payload: Record<string, any>) {
this.pub.publish(`${this.namespace}:${channel}`, JSON.stringify(payload));
}

subscribe(channel: string, callback: MessengerSubscriptionCallback) {
this.sub.subscribe(`${this.namespace}:${channel}`);

this.sub.on('message', (messageChannel, payloadString) => {
const payload = parseJSON(payloadString);

if (messageChannel === `${this.namespace}:${channel}`) {
callback(payload);
}
});
}

unsubscribe(channel: string) {
this.sub.unsubscribe(`${this.namespace}:${channel}`);
}
}

let messenger: Messenger;

export function getMessenger() {
if (messenger) return messenger;

if (env.MESSENGER_STORE === 'redis') {
messenger = new MessengerRedis();
} else {
messenger = new MessengerMemory();
}

return messenger;
}
17 changes: 10 additions & 7 deletions api/src/services/webhooks.ts
@@ -1,45 +1,48 @@
import { AbstractServiceOptions, Item, PrimaryKey, Webhook, MutationOptions } from '../types';
import { register } from '../webhooks';
import { ItemsService } from './items';
import { getMessenger, Messenger } from '../messenger';

export class WebhooksService extends ItemsService<Webhook> {
messenger: Messenger;

constructor(options: AbstractServiceOptions) {
super('directus_webhooks', options);
this.messenger = getMessenger();
}

async createOne(data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.createOne(data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
rijkvanzanten marked this conversation as resolved.
Show resolved Hide resolved
return result;
}

async createMany(data: Partial<Item>[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.createMany(data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}

async updateOne(key: PrimaryKey, data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.updateOne(key, data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}

async updateMany(keys: PrimaryKey[], data: Partial<Item>, opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.updateMany(keys, data, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}

async deleteOne(key: PrimaryKey, opts?: MutationOptions): Promise<PrimaryKey> {
const result = await super.deleteOne(key, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}

async deleteMany(keys: PrimaryKey[], opts?: MutationOptions): Promise<PrimaryKey[]> {
const result = await super.deleteMany(keys, opts);
await register();
this.messenger.publish('webhooks', { type: 'reload' });
return result;
}
}
18 changes: 17 additions & 1 deletion api/src/webhooks.ts
Expand Up @@ -6,15 +6,31 @@ import { Webhook, WebhookHeader } from './types';
import { WebhooksService } from './services';
import { getSchema } from './utils/get-schema';
import { ActionHandler } from '@directus/shared/types';
import { getMessenger } from './messenger';

let registered: { event: string; handler: ActionHandler }[] = [];

export async function register(): Promise<void> {
export async function init(): Promise<void> {
await register();
const messenger = getMessenger();

messenger.subscribe('webhooks', (event) => {
if (event.type === 'reload') {
reload();
}
});
}
rijkvanzanten marked this conversation as resolved.
Show resolved Hide resolved

export async function reload(): Promise<void> {
unregister();
await register();
}

export async function register(): Promise<void> {
const webhookService = new WebhooksService({ knex: getDatabase(), schema: await getSchema() });

const webhooks = await webhookService.readByQuery({ filter: { status: { _eq: 'active' } } });

for (const webhook of webhooks) {
for (const action of webhook.actions) {
const event = `items.${action}`;
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-options.md
Expand Up @@ -779,6 +779,16 @@ AUTH_FACEBOOK_ICON="facebook"
| `EXTENSIONS_PATH` | Path to your local extensions folder. | `./extensions` |
| `EXTENSIONS_AUTO_RELOAD` | Automatically reload extensions when they have changed. | `false` |

## Messenger

| Variable | Description | Default Value |
| --------------------- | ------------------------------------------------- | ------------- |
| `MESSENGER_STORE` | One of `memory`, `redis`<sup>[1]</sup> | `memory` |
| `MESSENGER_NAMESPACE` | How to scope the channels in Redis | `directus` |
| `MESSENGER_REDIS_*` | The Redis configuration for the pubsub connection | -- |

<sup>[1]</sup> `redis` should be used in load-balanced installations of Directus

## Email

| Variable | Description | Default Value |
Expand Down
20 changes: 20 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.