Skip to content

Commit

Permalink
feat(webhooks): add support for workers (#637)
Browse files Browse the repository at this point in the history
* Emit events

* Boot only on main thread

* Move interfaces

* Move identifiers

* Use database contract

* Add webhooks to transaction pool

* Fix tests

* Add handler

* Listen to events

* Fix tests
  • Loading branch information
sebastijankuzner committed Jun 24, 2024
1 parent d387364 commit 116cd96
Show file tree
Hide file tree
Showing 20 changed files with 165 additions and 106 deletions.
1 change: 1 addition & 0 deletions packages/contracts/source/contracts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ export * as Transactions from "./transactions.js";
export * as Types from "./types/index.js";
export * as Validator from "./validator.js";
export * as ValidatorSet from "./validator-set.js";
export * as Webhooks from "./webhooks.js";
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { CommitHandler, Transaction } from "../crypto/index.js";
import { EventListener } from "../kernel/index.js";
import { EventCallback, Subprocess } from "../kernel/ipc.js";
import { StoreChange } from "../state/index.js";
import { KeyValuePair } from "../types/index.js";
Expand All @@ -13,6 +14,7 @@ export interface WorkerScriptHandler {
setPeer(ip: string): Promise<void>;
forgetPeer(ip: string): Promise<void>;
start(): Promise<void>;
reloadWebhooks(): Promise<void>;
}

export type WorkerFactory = () => Worker;
Expand All @@ -21,7 +23,7 @@ export type WorkerSubprocess = Subprocess<WorkerScriptHandler>;

export type WorkerSubprocessFactory = () => WorkerSubprocess;

export interface Worker extends Omit<WorkerScriptHandler, "commit" | "getTransactions">, CommitHandler {
export interface Worker extends Omit<WorkerScriptHandler, "commit" | "getTransactions">, CommitHandler, EventListener {
getQueueSize(): number;
kill(): Promise<number>;
setFailedTransactions(transactions: Transaction[]): void;
Expand Down
25 changes: 25 additions & 0 deletions packages/contracts/source/contracts/webhooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export interface Webhook {
id?: string;
token?: string;

event: string;
target: string;
enabled: boolean;
conditions: Array<{
key: string;
value: any;
condition: string;
}>;
}

export interface Database {
boot(): void;
restore(): void;
all(): Webhook[];
hasById(id: string): boolean;
findById(id: string): Webhook | undefined;
findByEvent(event: string): Webhook[];
create(data: Webhook): Webhook;
update(id: string, data: Webhook): Webhook | undefined;
destroy(id: string): Webhook | undefined;
}
6 changes: 6 additions & 0 deletions packages/contracts/source/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,10 @@ export const Identifiers = {
ValidatorSet: {
Service: Symbol("ValidatorSet<Service>"),
},
Webhooks: {
Broadcaster: Symbol.for("Webhooks<Broadcaster>"),
Database: Symbol.for("Webhooks<Database>"),
Listener: Symbol.for("Webhooks<Listener>"),
Server: Symbol.for("Webhooks<Server>"),
},
};
3 changes: 3 additions & 0 deletions packages/core/bin/config/testnet/core/app.json
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@
},
{
"package": "@mainsail/api-transaction-pool"
},
{
"package": "@mainsail/webhooks"
}
]
}
1 change: 1 addition & 0 deletions packages/transaction-pool-worker/source/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ export * from "./commit.js";
export * from "./forget-peer.js";
export * from "./get-transactions.js";
export * from "./import-snapshot.js";
export * from "./reload-webhooks.js";
export * from "./set-peer.js";
export * from "./start.js";
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { inject, injectable } from "@mainsail/container";
import { Contracts, Identifiers } from "@mainsail/contracts";

@injectable()
export class ReloadWebhooksHandler {
@inject(Identifiers.Webhooks.Database)
protected readonly database!: Contracts.Webhooks.Database;

public async handle(): Promise<void> {
this.database.restore();
}
}
5 changes: 5 additions & 0 deletions packages/transaction-pool-worker/source/worker-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
ForgetPeerHandler,
GetTransactionsHandler,
ImportSnapshotHandler,
ReloadWebhooksHandler,
SetPeerHandler,
StartHandler,
} from "./handlers/index.js";
Expand Down Expand Up @@ -54,4 +55,8 @@ export class WorkerScriptHandler implements Contracts.TransactionPool.WorkerScri
public async forgetPeer(ip: string): Promise<void> {
return await this.#app.resolve(ForgetPeerHandler).handle(ip);
}

public async reloadWebhooks(): Promise<void> {
await this.#app.resolve(ReloadWebhooksHandler).handle();
}
}
17 changes: 16 additions & 1 deletion packages/transaction-pool-worker/source/worker.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { inject, injectable, postConstruct } from "@mainsail/container";
import { Contracts, Identifiers } from "@mainsail/contracts";
import { Contracts, Events, Identifiers } from "@mainsail/contracts";

@injectable()
export class Worker implements Contracts.TransactionPool.Worker {
@inject(Identifiers.TransactionPool.WorkerSubprocess.Factory)
private readonly createWorkerSubprocess!: Contracts.Crypto.WorkerSubprocessFactory;

@inject(Identifiers.Services.EventDispatcher.Service)
private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher;

private ipcSubprocess!: Contracts.TransactionPool.WorkerSubprocess;

#booted = false;
Expand All @@ -14,6 +17,14 @@ export class Worker implements Contracts.TransactionPool.Worker {
@postConstruct()
public initialize(): void {
this.ipcSubprocess = this.createWorkerSubprocess();

this.eventDispatcher.listen(Events.WebhookEvent.Created, this);
this.eventDispatcher.listen(Events.WebhookEvent.Updated, this);
this.eventDispatcher.listen(Events.WebhookEvent.Removed, this);
}

public handle(payload: { name: string; data: any }): void {
void this.reloadWebhooks();
}

public registerEventHandler(event: string, callback: Contracts.Kernel.IPC.EventCallback<any>): void {
Expand Down Expand Up @@ -69,4 +80,8 @@ export class Worker implements Contracts.TransactionPool.Worker {
public async forgetPeer(ip: string): Promise<void> {
await this.ipcSubprocess.sendRequest("forgetPeer", ip);
}

public async reloadWebhooks(): Promise<void> {
await this.ipcSubprocess.sendRequest("reloadWebhooks");
}
}
17 changes: 8 additions & 9 deletions packages/webhooks/source/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import { dirSync, setGracefulCleanup } from "tmp";
import { describe } from "../../test-framework/source";
import { dummyWebhook } from "../test/fixtures/assets";
import { Database } from "./database";
import { InternalIdentifiers } from "./identifiers";
import { Webhook } from "./interfaces";

describe<{
database: Database;
Expand All @@ -16,10 +14,11 @@ describe<{
const app = new Application(new Container());
app.bind("path.cache").toConstantValue(dirSync().name);

app.bind<Database>(InternalIdentifiers.Database).to(Database).inSingletonScope();
app.bind<Database>(Identifiers.Webhooks.Database).to(Database).inSingletonScope();
app.bind(Identifiers.Services.Filesystem.Service).toConstantValue({ existsSync: () => true });
app.bind(Identifiers.Services.EventDispatcher.Service).toConstantValue({ dispatch: () => {} });

const database = app.get<Database>(InternalIdentifiers.Database);
const database = app.get<Database>(Identifiers.Webhooks.Database);
database.boot();

context.database = database;
Expand Down Expand Up @@ -56,7 +55,7 @@ describe<{
});

it("should find webhooks by their event", ({ database }) => {
const webhook: Webhook = database.create(dummyWebhook);
const webhook: Contracts.Webhooks.Webhook = database.create(dummyWebhook);

const rows = database.findByEvent("event");

Expand All @@ -69,20 +68,20 @@ describe<{
});

it("should create a new webhook", ({ database }) => {
const webhook: Webhook = database.create(dummyWebhook);
const webhook: Contracts.Webhooks.Webhook = database.create(dummyWebhook);

assert.equal(database.create(webhook), webhook);
});

it("should update an existing webhook", ({ database }) => {
const webhook: Webhook = database.create(dummyWebhook);
const updated: Webhook = database.update(webhook.id, dummyWebhook);
const webhook: Contracts.Webhooks.Webhook = database.create(dummyWebhook);
const updated: Contracts.Webhooks.Webhook = database.update(webhook.id, dummyWebhook);

assert.equal(database.findById(webhook.id), updated);
});

it("should delete an existing webhook", ({ database }) => {
const webhook: Webhook = database.create(dummyWebhook);
const webhook: Contracts.Webhooks.Webhook = database.create(dummyWebhook);

assert.equal(database.findById(webhook.id), webhook);

Expand Down
57 changes: 36 additions & 21 deletions packages/webhooks/source/database.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
import { inject, injectable } from "@mainsail/container";
import { Contracts, Identifiers } from "@mainsail/contracts";
import { Contracts, Events, Identifiers } from "@mainsail/contracts";
import { existsSync } from "fs";
import { ensureFileSync } from "fs-extra/esm";
import { LowSync } from "lowdb";
import { JSONFileSync } from "lowdb/node";
import { v4 as uuidv4 } from "uuid";

import { Webhook } from "./interfaces.js";

@injectable()
export class Database {
export class Database implements Contracts.Webhooks.Database {
@inject(Identifiers.Application.Instance)
private readonly app!: Contracts.Kernel.Application;

#database!: LowSync<{ webhooks: Webhook[] }>;
@inject(Identifiers.Services.EventDispatcher.Service)
private readonly eventDispatcher!: Contracts.Kernel.EventDispatcher;

#database!: LowSync<{ webhooks: Contracts.Webhooks.Webhook[] }>;

public boot() {
const adapterFile: string = this.app.cachePath("webhooks.json");
Expand All @@ -22,53 +23,67 @@ export class Database {
ensureFileSync(adapterFile);
}

this.#database = new LowSync<{ webhooks: Webhook[] }>(new JSONFileSync(adapterFile), { webhooks: [] });
this.#restore();
this.#database = new LowSync<{ webhooks: Contracts.Webhooks.Webhook[] }>(new JSONFileSync(adapterFile), {
webhooks: [],
});
this.restore();
}

public all(): Webhook[] {
public restore(): void {
try {
this.#database.read();
} catch {}
}

public all(): Contracts.Webhooks.Webhook[] {
return this.#database.data.webhooks;
}

public hasById(id: string): boolean {
return !!this.findById(id);
}

public findById(id: string): Webhook | undefined {
public findById(id: string): Contracts.Webhooks.Webhook | undefined {
return this.#database.data.webhooks.find((webhook) => webhook.id === id);
}

public findByEvent(event: string): Webhook[] {
public findByEvent(event: string): Contracts.Webhooks.Webhook[] {
return this.#database.data.webhooks.filter((webhook) => webhook.event === event);
}

public create(data: Webhook): Webhook | undefined {
public create(data: Contracts.Webhooks.Webhook): Contracts.Webhooks.Webhook {
data.id = uuidv4();

this.#database.data.webhooks.push(data);
this.#database.write();

return this.findById(data.id);
void this.eventDispatcher.dispatch(Events.WebhookEvent.Created, { webhook: data });

return this.findById(data.id)!;
}

public update(id: string, data: Webhook): Webhook | undefined {
public update(id: string, data: Contracts.Webhooks.Webhook): Contracts.Webhooks.Webhook | undefined {
const webhook = this.#database.data.webhooks.find((webhook) => webhook.id === id);
if (webhook) {
Object.assign(webhook, data);
this.#database.write();

void this.eventDispatcher.dispatch(Events.WebhookEvent.Updated, { webhook: data });
}

return webhook;
}

public destroy(id: string): void {
this.#database.data.webhooks = this.#database.data.webhooks.filter((webhook) => webhook.id !== id);
this.#database.write();
}
public destroy(id: string): Contracts.Webhooks.Webhook | undefined {
const webhook = this.#database.data.webhooks.find((webhook) => webhook.id === id);

#restore(): void {
try {
this.#database.read();
} catch {}
if (webhook) {
this.#database.data.webhooks = this.#database.data.webhooks.filter((webhook) => webhook.id !== id);
this.#database.write();

void this.eventDispatcher.dispatch(Events.WebhookEvent.Removed, { webhook });
}

return webhook;
}
}
6 changes: 0 additions & 6 deletions packages/webhooks/source/identifiers.ts

This file was deleted.

13 changes: 0 additions & 13 deletions packages/webhooks/source/interfaces.ts

This file was deleted.

Loading

0 comments on commit 116cd96

Please sign in to comment.