-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async tasks #739
base: master
Are you sure you want to change the base?
Add async tasks #739
Conversation
daecf95
to
48d91f4
Compare
0863e32
to
0152c12
Compare
11d7ea0
to
24eda29
Compare
import { sbvrUtils } from '../../../src/server-glue/module'; | ||
|
||
// Define JSON schema for accepted parameters | ||
const createDeviceParamsSchema = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: I also investigated using zod to define parameter object shapes, but decided to stick with JSON Schema as its a commonly used standard. Consumers can still define their shapes with zod and then convert to JSON Schema using something like zod-to-json-schema when defining task handlers.
24eda29
to
82a48d2
Compare
0280f3d
to
636a893
Compare
9e1fe55
to
a83c83b
Compare
33d3587
to
56c035f
Compare
4a165ae
to
f9c7bcd
Compare
src/tasks/worker.ts
Outdated
|
||
// Start listening and polling for tasks | ||
public start(): void { | ||
sbvrUtils.db.on?.( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the ?.
needed? I feel like if it's ever undefined then the entire task system would break and that's not something we'd want to silently ignore
if (this.canExecute()) { | ||
await sbvrUtils.db.transaction(async (tx) => { | ||
const result = await tx.executeSql( | ||
`SELECT ${selectColumns} FROM task AS t WHERE id = $1 FOR UPDATE SKIP LOCKED`, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this compliant with postgres+mysql+sqlite? If not we should have a warning if you're trying to use with an unsupported db engine
if (!this.canExecute()) { | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be outside of the transaction imo to avoid the unnecessary db interaction. Presumably you've put it inside to be in a promise chain but I think it'd be better to do something like
(async () => {
try {
let executed = false;
const handlerNames = Object.keys(this.handlers);
const binds = handlerNames.map((_, index) => `$${index + 1}`).join(', ');
if (!this.canExecute()) {
return;
}
await sbvrUtils.db.transaction(async (tx) => {
...
});
} catch (err) {
...
} finally {
...
}
})();
to be able to use async/await as usual and put the try/catch/finally around everything
src/tasks/worker.ts
Outdated
|
||
// Calculate next attempt time using exponential backoff | ||
private getNextAttemptTime(attempt: number): Date | null { | ||
const delay = Math.ceil(Math.exp(Math.min(10, attempt))) * 1000; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure this is not doing what you expect as by my calculations this would be setting the minimum delay to a bit over 6 hours (e^10)? I think it probably makes sense to use logic more similar to https://github.com/balena-io-modules/pinejs-client-js/blob/v6.14.6/src/index.ts#L1123 for the next delay time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Page- My mistake here was * 1000
, not sure how I missed it. I didn't come up with the formula myself, I pulled it from graphile/worker: https://worker.graphile.org/docs/exponential-backoff
}, | ||
body: { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}, | |
body: { | |
}, | |
options: { | |
returnResource: false, | |
}, | |
body: { |
src/tasks/index.ts
Outdated
// Create trigger function if it doesn't exist | ||
await createTrigger(tx); | ||
|
||
// Create indexes if they don't exist | ||
await createIndexes(tx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be able to be done by defining and initSql
in the model definition the same way as user models would
src/tasks/index.ts
Outdated
if (worker == null) { | ||
return; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this is because we only support tasks on postgres, which is fine but should throw an error here rather than silently/surprisingly doing nothing
src/tasks/index.ts
Outdated
modelText, | ||
customServerCode: exports, | ||
}, | ||
] as sbvrUtils.ExecutableModel[], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
satisfies
is better because it doesn't cast in the same way
] as sbvrUtils.ExecutableModel[], | |
] satisfies sbvrUtils.ExecutableModel[], |
although I think it'd be better to use
export const config ConfigLoader.Config = {
so that the whole thing is typed and matches the expectation
src/tasks/common.ts
Outdated
export const apiRoot = 'tasks'; | ||
|
||
// Channel name for task insert notifications | ||
export const channel = 'task_insert'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be something like pinejs$task_insert
to try to namespace it more?
src/database-layer/db.ts
Outdated
client.on(name, (msg) => { | ||
fn(msg).catch((error) => { | ||
console.error('Error handling message:', error); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer try/catch with await so we can handle both sync/async exceptions, just as additional safety
client.on(name, (msg) => { | |
fn(msg).catch((error) => { | |
console.error('Error handling message:', error); | |
}); | |
}); | |
client.on(name, async (msg) => { | |
try { | |
await fn(msg); | |
} catch (error) { | |
console.error('Error handling message:', error); | |
} | |
}); |
Change-type: minor
8b3bb22
to
5073fad
Compare
Pull request was converted to draft
6d57027
to
1c3c1de
Compare
1c3c1de
to
149b21c
Compare
Change-type: minor
Spec: https://balena.fibery.io/Inputs/Research/PineJS-Async-Tasks-483
Testing locally: