Skip to content

Commit

Permalink
Allow to emit tasks to different queues
Browse files Browse the repository at this point in the history
  • Loading branch information
ilijaNL committed May 26, 2023
1 parent 0a24f04 commit 9e415d0
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 4 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# pg-tbus

## 0.1.7

### Patch Changes

- Allow to emit tasks to different queues

## 0.1.6

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pg-tbus",
"author": "IlijaNL",
"version": "0.1.6",
"version": "0.1.7",
"types": "dist/index.d.ts",
"module": "dist/index.mjs",
"main": "dist/index.js",
Expand Down
5 changes: 4 additions & 1 deletion src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export interface TaskDefinition<T extends TSchema> extends TaskDeclaration<T> {

export interface Task<Data = {}> {
task_name: string;
queue?: string;
data: Data;
config: Partial<TaskConfig>;
}
Expand Down Expand Up @@ -179,9 +180,11 @@ export const declareTask = <T extends TSchema>(props: DeclareTaskProps<T>): Task
`invalid input for task ${props.task_name}: ${validateFn.errors?.map((e) => e.message).join(' \n')}`
);
}

return {
data: input,
queue: props.queue,
task_name: props.task_name,
data: input,
config: { ...props.config, ...config },
};
};
Expand Down
2 changes: 1 addition & 1 deletion src/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export const createTaskFactory =
tn: task.task_name,
trace: trigger,
},
queue: props.queue,
queue: task.queue ?? props.queue,
...config,
};
};
31 changes: 30 additions & 1 deletion tests/bus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Type } from '@sinclair/typebox';
import EventEmitter, { once } from 'events';
import { Pool } from 'pg';
import tap from 'tap';
import { createEventHandler, createTBus, defineEvent, defineTask } from '../src';
import { createEventHandler, createTBus, declareTask, defineEvent, defineTask } from '../src';
import { resolveWithinSeconds } from '../src/utils';
import { cleanupSchema, createRandomSchema } from './helpers';
import stringify from 'safe-stable-stringify';
Expand Down Expand Up @@ -62,6 +62,35 @@ tap.test('bus', async (tap) => {
await waitProm;
});

tap.test('emit task to different queue', async ({ teardown, equal }) => {
const ee = new EventEmitter();
const bus = createTBus('emit_queue', { db: sqlPool, schema: schema });
const task_name = 'emit_task';

const taskDef = defineTask({
task_name: task_name,
queue: 'abc',
schema: Type.Object({ works: Type.String() }),
handler: async (props) => {
equal(props.input.works, 'abcd');
equal(props.trigger.type, 'direct');
ee.emit('handled');
},
});

const waitProm = once(ee, 'handled');

const bus2 = createTBus('abc', { db: sqlPool, schema: schema });

bus2.registerTask(taskDef);
await bus2.start();

teardown(() => bus2.stop());

await bus.send(taskDef.from({ works: 'abcd' }));
await waitProm;
});

tap.test('emit event', async (t) => {
const ee = new EventEmitter();
const bus = createTBus('emit_event_queue', { db: sqlPool, schema: schema });
Expand Down

0 comments on commit 9e415d0

Please sign in to comment.