-
Notifications
You must be signed in to change notification settings - Fork 437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(orchestrator): implement task processor #2221
Conversation
853da7a
to
8a1e376
Compare
payload: { taskId } | ||
}); | ||
} else { | ||
return Ok(undefined); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be null
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null
is not assignable to type void
. Only undefined
is. 🤷
8a1e376
to
d4d3bd4
Compare
processor.stop(); | ||
} | ||
}, 1000); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using setInterval
because of the flag so it can be enabled/disabled without restarting jobs. Once stable it will be a simple processor.start()
// TODO: Implement action processing | ||
// Returning an error for now | ||
return Err(`Not implemented: ${JSON.stringify({ taskId: task.id })}`); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dry-run: not doing real processing yet, just marking action as successful and webhook as failed for testing purpse
import { validateTask } from './validate.js'; | ||
import type { JsonValue } from 'type-fest'; | ||
|
||
export class OrchestratorClient { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
everything until dequeue
was already there. I have moved the file in a clients
subdir
@@ -10,12 +10,9 @@ type Health = Endpoint<{ | |||
const path = '/health'; | |||
const method = 'GET'; | |||
|
|||
export const handler: RouteHandler<Health> = { | |||
export const getHandler: RouteHandler<Health> = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since each route file corresponds to a path it is possible to have multiple Endpoint/handler per file, one for each HTTP method. So I renamed the Endpoint and handlers to make it clearer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to keep one per file by prefixing the filename, i.e: getHealth.ts
would make sense to me and keep files small
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sensible. I will do the same then
import type { ApiError, Endpoint } from '@nangohq/types'; | ||
import type { EndpointRequest, EndpointResponse, RouteHandler, Route } from '@nangohq/utils'; | ||
import { validateRequest } from '@nangohq/utils'; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the purpose of search in the context of the processor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the search task endpoint. The processor must be able to check if pending tasks have been cancelled so it need to be able to search for specific tasks.
This endpoint could also be used in internal tooling to display list of tasks per queue/state/etc...
d3591ab
to
e57716e
Compare
I'm late in my reviews, checking this in a couple of hours 👍🏻 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice progress! That's a lot of code and complexity to review, hopefully we'll get used to it quickly 💯
payload: webhookArgsSchema | ||
}); | ||
|
||
export function validateTask(task: Task): Result<TaskAction | TaskWebhook> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel like Task{ type: string }
would have been more scalable, because we'll need to add syncs and then maybe more stuff along the way
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
they don't have the same fields though. Having some type guard functionality is nice though even though I agree that the definitions are more complex
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean you can still validate the payload but it's just a waste of time to not use a discriminator 👍🏻
@@ -10,12 +10,9 @@ type Health = Endpoint<{ | |||
const path = '/health'; | |||
const method = 'GET'; | |||
|
|||
export const handler: RouteHandler<Health> = { | |||
export const getHandler: RouteHandler<Health> = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to keep one per file by prefixing the filename, i.e: getHealth.ts
would make sense to me and keep files small
cleanupAndRespond((res) => res.status(200).send([])); | ||
}, waitForCompletionTimeoutMs); | ||
|
||
eventEmitter.once(eventId, onTaskStarted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure I get the wait for completion in that case, because if don't receive an answer you'll just wait 60s for nothing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are waiting for a task to be created/dequeuable, so you don't have to keep making request. Trying hard not to have a push based system that would requires handling states but in the scenario of a processor it could be an acceptable solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay took me a while to understand the executeAction -> execute -> schedule -> immediate -> event
that works 👍🏻
if (this.queue.available() > 0) { | ||
const tasks = await this.orchestratorClient.dequeue({ groupKey: this.groupKey, limit: this.queue.available() * 2, waitForCompletion: true }); // fetch more than available to keep the queue full | ||
if (tasks.isErr()) { | ||
logger.error(`failed to dequeue tasks: ${stringifyError(tasks.error)}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a heads up, happened to me once when the API stop responding all fetch are failing fast ultimately using the whole cpu. Might be an edge case but fyi
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. introducing a little delay in case of error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛫
0a8e96d
to
8ac4bb9
Compare
8ac4bb9
to
3503468
Compare
Describe your changes
This PR is implementing the orchestrator processor which, for now, is being run by jobs.
The processor is behind a flag
flag:orchestrator:dryrun:process:global
. It is still in dryrun mode since no real processing is happening, tasks are either immediately succeeded or failed. It is the last step before actually processing webhook/actionsNotes
process
functions passed as argument.Tested in staging
Checklist before requesting a review (skip if just adding/editing APIs & templates)