Skip to content

Commit

Permalink
Singleton key for singleton tasks (#15)
Browse files Browse the repository at this point in the history
* Singleton key for singleton tasks

* Changeset

* Fix tests and pool reuse across different schemas

* Add nodejs 20 for testing

* Woker test concurrency
  • Loading branch information
ilijaNL committed May 24, 2023
1 parent 133f25c commit a6d83f6
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 152 deletions.
5 changes: 5 additions & 0 deletions .changeset/lucky-dogs-press.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'pg-tbus': patch
---

Singleton task per queue
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:

strategy:
matrix:
node-version: [16.x, 18.x]
node-version: [16.x, 18.x, 20.x]

# Service containers to run with `container-job`
services:
Expand Down
3 changes: 3 additions & 0 deletions migrations/3-singletontask.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE {{schema}}."tasks" ADD COLUMN "singleton_key" text default null;
-- 0: create, 1: retry, 2: active, 3 >= all completed/failed
CREATE UNIQUE INDEX idx_unique_queue_task ON {{schema}}."tasks" ("queue", "singleton_key") WHERE state < 3;
20 changes: 12 additions & 8 deletions src/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration

let pool: Pool;

const remotePool = 'query' in db;
let cleanupDB = async () => {
// noop
};

if ('query' in db) {
pool = db;
Expand All @@ -139,6 +141,9 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration
max: 3,
...db,
});
cleanupDB = async () => {
await pool.end();
};
}

const tWorker = createTaskWorker({
Expand All @@ -153,7 +158,8 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration

// log
if (!taskHandler) {
return;
console.error('task handler ' + tn + 'not registered for service ' + serviceName);
throw new Error('task handler ' + tn + 'not registered for service ' + serviceName);
}

await taskHandler({ name: tn, input: data, trigger: trace });
Expand All @@ -174,6 +180,9 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration

const maintainceWorker = createMaintainceWorker({ client: pool, retentionInDays: 30, schema: schema });

const notifyFanout = debounce(() => fanoutWorker.notify(), { ms: 75, maxMs: 300 });
const notifyWorker = debounce(() => tWorker.notify(), { maxMs: 300, ms: 75 });

/**
* Register one or more event handlers
*/
Expand Down Expand Up @@ -221,9 +230,6 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration
maintainceWorker.start();
}

const notifyFanout = debounce(() => fanoutWorker.notify(), { ms: 75, maxMs: 300 });
const notifyWorker = debounce(() => tWorker.notify(), { maxMs: 300, ms: 75 });

/**
* Returnes a query command which can be used to do manual submitting
*/
Expand Down Expand Up @@ -287,9 +293,7 @@ export const createTBus = (serviceName: string, configuration: TBusConfiguration

await Promise.all([fanoutWorker.stop(), maintainceWorker.stop(), tWorker.stop()]);

if (!remotePool) {
await pool.end();
}
await cleanupDB();
},
};
};
Expand Down
13 changes: 9 additions & 4 deletions src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export const defaultTaskConfig: TaskConfig = {
startAfterSeconds: 0,
expireInSeconds: 60 * 5, // 5 minutes
keepInSeconds: 7 * 24 * 60 * 60,
singletonKey: null,
};

export type TaskConfig = {
Expand All @@ -116,7 +117,7 @@ export type TaskConfig = {
*/
retryLimit: number;
/**
* Delay between retries of failed jobs, in seconds. Default 5
* Delay between retries of failed tasks, in seconds. Default 5
*/
retryDelay: number;
/**
Expand All @@ -128,13 +129,17 @@ export type TaskConfig = {
*/
startAfterSeconds: number;
/**
* How many seconds a job may be in active state before it is failed because of expiration. Default 60 * 5 (5minutes)
* How many seconds a task may be in active state before it is failed because of expiration. Default 60 * 5 (5minutes)
*/
expireInSeconds: number;
/**
* How long job is hold in the jobs table before it is archieved. Default 7 * 24 * 60 * 60 (7 days)
* How long task is hold in the tasks table before it is archieved. Default 7 * 24 * 60 * 60 (7 days)
*/
keepInSeconds: number;
/**
* A singleton key which can be used to have an unique active task in a queue.
*/
singletonKey: string | null;
};

/**
Expand Down Expand Up @@ -169,7 +174,7 @@ export const defineTask = <T extends TSchema>(props: {
return {
data: input,
task_name: props.task_name,
config: { ...config, ...props.config },
config: { ...props.config, ...config },
};
};

Expand Down
11 changes: 7 additions & 4 deletions src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const createMessagePlans = (schema: string) => {
retryLimit,
retryDelay,
retryBackoff,
singleton_key,
startAfter,
expireIn,
keepUntil
Expand All @@ -50,9 +51,10 @@ export const createMessagePlans = (schema: string) => {
"retryLimit",
"retryDelay",
"retryBackoff",
(now() + ("startAfterSeconds" * interval '1s'))::timestamp with time zone as startAfter,
"singletonKey" as singleton_key,
(now() + ("startAfterSeconds" * interval '1s'))::timestamptz as startAfter,
"expireInSeconds" * interval '1s' as expireIn,
(now() + ("startAfterSeconds" * interval '1s') + ("keepInSeconds" * interval '1s'))::timestamp with time zone as keepUntil
(now() + ("startAfterSeconds" * interval '1s') + ("keepInSeconds" * interval '1s'))::timestamptz as keepUntil
FROM json_to_recordset(${JSON.stringify(tasks)}) as x(
queue text,
data jsonb,
Expand All @@ -61,7 +63,8 @@ export const createMessagePlans = (schema: string) => {
"retryBackoff" boolean,
"startAfterSeconds" integer,
"expireInSeconds" integer,
"keepInSeconds" integer
"keepInSeconds" integer,
"singletonKey" text
)
ON CONFLICT DO NOTHING
`;
Expand Down Expand Up @@ -155,7 +158,7 @@ export const createMessagePlans = (schema: string) => {
};
};

export type taskFactory = ReturnType<typeof createTaskFactory>;
export type TaskFactory = ReturnType<typeof createTaskFactory>;

export const createTaskFactory =
(props: { taskConfig: Partial<TaskConfig>; queue: string }) =>
Expand Down
7 changes: 1 addition & 6 deletions src/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,11 @@ export function combineSQL(f: ReadonlyArray<string>, ...parameters: QueryCommand
};
}

export async function query<Result extends QueryResultRow>(
client: PGClient,
command: QueryCommand<Result>,
opts?: Partial<{ name: string }>
) {
export async function query<Result extends QueryResultRow>(client: PGClient, command: QueryCommand<Result>) {
return client
.query<Result>({
text: command.text,
values: command.values,
...opts,
})
.then((d) => d.rows);
}
Expand Down
99 changes: 42 additions & 57 deletions src/workers/fanout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import { Pool } from 'pg';
import { combineSQL, createSql, query, withTransaction } from '../sql';
import { createBaseWorker } from './base';
import { EventHandler } from '../definitions';
import { createMessagePlans, InsertTask, taskFactory } from '../messages';
import { createMessagePlans, InsertTask, TaskFactory } from '../messages';

type _Event = { id: string; event_name: string; event_data: any; position: string };

const createPlans = (schema: string) => {
const sql = createSql(schema);
const taskSql = createMessagePlans(schema);

function createJobsAndUpdateCursor(props: { tasks: InsertTask[]; service_name: string; last_position: number }) {
function createTasksAndUpdateSvcCursor(props: { tasks: InsertTask[]; service_name: string; last_position: number }) {
const res = combineSQL`
WITH insert as (
${taskSql.createTasks(props.tasks)}
Expand Down Expand Up @@ -51,63 +51,50 @@ const createPlans = (schema: string) => {
return events;
}

// function getEvents(offset: number, options: { limit: number }) {
// const events = sql<_Event>`
// SELECT
// id,
// event_name,
// event_data,
// pos as position
// FROM {{schema}}.events
// WHERE pos > ${offset}
// ORDER BY pos ASC
// LIMIT ${options.limit}`;

// return events;
// }

return {
getCursorLockEvents,
// getEvents,
createJobsAndUpdateCursor,
createTasksAndUpdateSvcCursor,
};
};

const createEventToTasks = (eventHandlers: EventHandler<string, any>[], jobFactory: taskFactory) => (event: _Event) => {
return eventHandlers.reduce((agg, curr) => {
if (curr.def.event_name !== event.event_name) {
return agg;
}

const config = typeof curr.config === 'function' ? curr.config(event.event_data) : curr.config;

agg.push(
jobFactory(
{
data: event.event_data,
task_name: curr.task_name,
config: config,
},
{
type: 'event',
e: { id: event.id, name: event.event_name, p: +event.position },
}
)
);
const createEventToTasks =
(eventHandlers: EventHandler<string, any>[], taskFactory: TaskFactory) => (event: _Event) => {
return eventHandlers.reduce((agg, curr) => {
if (curr.def.event_name !== event.event_name) {
return agg;
}

return agg;
}, [] as InsertTask[]);
};
const config = typeof curr.config === 'function' ? curr.config(event.event_data) : curr.config;

agg.push(
taskFactory(
{
data: event.event_data,
task_name: curr.task_name,
config: config,
},
{
type: 'event',
e: { id: event.id, name: event.event_name, p: +event.position },
}
)
);

return agg;
}, [] as InsertTask[]);
};

export const createFanoutWorker = (props: {
pool: Pool;
serviceName: string;
schema: string;
getEventHandlers: () => EventHandler<string, any>[];
onNewTasks: () => void;
taskFactory: taskFactory;
taskFactory: TaskFactory;
}) => {
const plans = createPlans(props.schema);
const fetchSize = 100;
// worker which responsible for creating tasks from incoming integrations events
const fanoutWorker = createBaseWorker(
async () => {
Expand All @@ -118,40 +105,38 @@ export const createFanoutWorker = (props: {

const eventToTasks = createEventToTasks(handlers, props.taskFactory);
// start transaction
const newTasks = await withTransaction<boolean>(props.pool, async (client) => {
const events = await query(client, plans.getCursorLockEvents(props.serviceName, { limit: 100 }), {
name: 'getLockAndEvents',
});
const trxResult = await withTransaction<{ hasMore: boolean; hasChanged: boolean }>(props.pool, async (client) => {
const events = await query(client, plans.getCursorLockEvents(props.serviceName, { limit: fetchSize }));

if (events.length === 0) {
return false;
return { hasChanged: false, hasMore: false };
}

const newCursor = +events[events.length - 1]!.position;
const tasks = events.map(eventToTasks).flat();

await query(
client,
plans.createJobsAndUpdateCursor({
plans.createTasksAndUpdateSvcCursor({
tasks: tasks,
last_position: newCursor,
service_name: props.serviceName,
}),
{
name: 'createJobsAndUpdateCursor',
}
})
);

return tasks.length > 0;
return {
hasChanged: tasks.length > 0,
hasMore: events.length === fetchSize,
};
});

if (newTasks) {
if (trxResult.hasChanged) {
props.onNewTasks();
}

return false;
return trxResult.hasMore;
},
{ loopInterval: 1000 }
{ loopInterval: 1500 }
);

return fanoutWorker;
Expand Down

0 comments on commit a6d83f6

Please sign in to comment.