Skip to content

Commit

Permalink
Correctly propagate properties from task declaration to task definition
Browse files Browse the repository at this point in the history
  • Loading branch information
ilijaNL committed May 26, 2023
1 parent 9e415d0 commit 25a6055
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 15 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# pg-tbus

## 0.1.8

### Patch Changes

- Correctly propagate properties from task declaration to task definition

## 0.1.7

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "pg-tbus",
"author": "IlijaNL",
"version": "0.1.7",
"version": "0.1.8",
"types": "dist/index.d.ts",
"module": "dist/index.mjs",
"main": "dist/index.js",
Expand Down
25 changes: 11 additions & 14 deletions src/definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,31 +171,30 @@ export type TaskConfig = {
singletonKey: string | null;
};

export const declareTask = <T extends TSchema>(props: DeclareTaskProps<T>): TaskDeclaration<T> => {
const validateFn = createValidateFn(props.schema);
export const declareTask = <T extends TSchema>(decl: DeclareTaskProps<T>): TaskDeclaration<T> => {
const validateFn = createValidateFn(decl.schema);

const from: TaskDeclaration<T>['from'] = function from(input, config) {
if (!validateFn(input)) {
throw new Error(
`invalid input for task ${props.task_name}: ${validateFn.errors?.map((e) => e.message).join(' \n')}`
`invalid input for task ${decl.task_name}: ${validateFn.errors?.map((e) => e.message).join(' \n')}`
);
}

return {
queue: props.queue,
task_name: props.task_name,
queue: decl.queue,
task_name: decl.task_name,
data: input,
config: { ...props.config, ...config },
config: { ...decl.config, ...config },
};
};

return {
schema: props.schema,
task_name: props.task_name,
...decl,
validate: validateFn,
from,
// specifiy some defaults
config: props.config ?? {},
config: decl.config ?? {},
};
};

Expand All @@ -207,6 +206,7 @@ export const defineTask = <T extends TSchema>(
handler: TaskHandler<Static<T>>;
}
): TaskDefinition<T> => {
// is declarated
if ('from' in props) {
return {
...props,
Expand All @@ -218,11 +218,8 @@ export const defineTask = <T extends TSchema>(
const decl = declareTask(props);

return {
schema: props.schema,
task_name: props.task_name,
validate: decl.validate,
...decl,
handler: props.handler,
from: decl.from,
// specifiy some defaults
config: props.config ?? {},
};
Expand All @@ -233,7 +230,7 @@ export const defineTask = <T extends TSchema>(
*/
export const createEventHandler = <TName extends string, T extends TSchema>(props: {
/**
* Task queue name. Should be unique per pg-tbus instance
* Task name. Should be unique per pg-tbus instance
*/
task_name: string;
/**
Expand Down
44 changes: 44 additions & 0 deletions tests/bus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,50 @@ tap.test('bus', async (tap) => {
await waitProm;
});

tap.test('emit task from declaration', async ({ teardown, equal, same }) => {
const ee = new EventEmitter();
const queue = 'emit_queue';
const bus = createTBus(queue, { db: sqlPool, schema: schema });
const task_name = 'emit_task';
const declaredTask = declareTask({
task_name: task_name,
schema: Type.Object({ works: Type.String() }),
queue: queue,
config: {
keepInSeconds: 120,
},
});

equal(declaredTask.queue, queue);
equal(declaredTask.task_name, task_name);

const taskDef = defineTask({
...declaredTask,
handler: async (props) => {
equal(props.input.works, 'abcd');
equal(props.trigger.type, 'direct');
ee.emit('handled');
},
});

equal(taskDef.queue, declaredTask.queue);
equal(taskDef.task_name, declaredTask.task_name);
equal(taskDef.config, declaredTask.config);
equal(taskDef.config.keepInSeconds, 120);

bus.registerTask(taskDef);

await bus.start();

teardown(() => bus.stop());

const waitProm = once(ee, 'handled');

await bus.send(taskDef.from({ works: 'abcd' }));

await waitProm;
});

tap.test('emit task to different queue', async ({ teardown, equal }) => {
const ee = new EventEmitter();
const bus = createTBus('emit_queue', { db: sqlPool, schema: schema });
Expand Down

0 comments on commit 25a6055

Please sign in to comment.