Skip to content

Commit

Permalink
Merge branch 'main' into webhooks
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed Jun 18, 2024
2 parents 3ff5497 + 93a6c6e commit f759115
Show file tree
Hide file tree
Showing 15 changed files with 134 additions and 38 deletions.
8 changes: 4 additions & 4 deletions examples/api.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Hatchet, { Context, AdminClient } from '../src';
import Hatchet, { Context } from '../src';
import { CreateWorkflowVersionOpts } from '../src/protoc/workflows';

type CustomUserData = {
Expand Down Expand Up @@ -42,9 +42,9 @@ type StepOneInput = {
async function main() {
const hatchet = Hatchet.init();

const admin = hatchet.admin;
const { admin } = hatchet;

await admin.put_workflow(opts);
await admin.putWorkflow(opts);

const worker = await hatchet.worker('example-worker');

Expand All @@ -54,7 +54,7 @@ async function main() {
return { step1: 'step1' };
});

await hatchet.admin.run_workflow('api-workflow', {});
await hatchet.admin.runWorkflow('api-workflow', {});

worker.start();
}
Expand Down
2 changes: 1 addition & 1 deletion examples/example-event-with-results.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Hatchet from '../src/sdk';
async function main() {
const hatchet = Hatchet.init();

const workflowRunId = await hatchet.admin.run_workflow('simple-workflow', {
const workflowRunId = await hatchet.admin.runWorkflow('simple-workflow', {
test: 'test',
});

Expand Down
3 changes: 1 addition & 2 deletions examples/fanout-worker.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import sleep from '../src/util/sleep';
import Hatchet from '../src/sdk';

xdescribe('fanout-e2e', () => {

it('should pass a fanout workflow', async () => {
let invoked = 0;
const start = new Date();
Expand Down Expand Up @@ -63,7 +62,7 @@ xdescribe('fanout-e2e', () => {

console.log('pushing event...');

await hatchet.admin.run_workflow('parent-workflow', { input: 'parent-input' });
await hatchet.admin.runWorkflow('parent-workflow', { input: 'parent-input' });

await sleep(10000);

Expand Down
2 changes: 1 addition & 1 deletion examples/manual-trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Hatchet from '../src/sdk';
const hatchet = Hatchet.init();

async function main() {
const workflowRunId = await hatchet.admin.run_workflow('simple-workflow', {});
const workflowRunId = await hatchet.admin.runWorkflow('simple-workflow', {});
const stream = await hatchet.listener.stream(workflowRunId);

for await (const event of stream) {
Expand Down
2 changes: 1 addition & 1 deletion examples/rate-limit/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const workflow: Workflow = {
};

async function main() {
await hatchet.admin.put_rate_limit('test-limit', 1, RateLimitDuration.MINUTE);
await hatchet.admin.putRateLimit('test-limit', 1, RateLimitDuration.MINUTE);
const worker = await hatchet.worker('example-worker');
await worker.registerWorkflow(workflow);
worker.start();
Expand Down
2 changes: 1 addition & 1 deletion hatchet
8 changes: 4 additions & 4 deletions src/clients/admin/admin-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('AdminClient', () => {
);
});

describe('put_workflow', () => {
describe('putWorkflow', () => {
it('should throw an error if no version and not auto version', async () => {
const workflow: CreateWorkflowVersionOpts = {
name: 'workflow1',
Expand All @@ -64,7 +64,7 @@ describe('AdminClient', () => {
concurrency: undefined,
};

expect(() => client.put_workflow(workflow)).rejects.toThrow(
expect(() => client.putWorkflow(workflow)).rejects.toThrow(
'PutWorkflow error: workflow version is required, or use autoVersion'
);
});
Expand All @@ -90,7 +90,7 @@ describe('AdminClient', () => {
updatedAt: undefined,
});

await client.put_workflow(workflow);
await client.putWorkflow(workflow);

expect(putSpy).toHaveBeenCalled();
});
Expand All @@ -111,7 +111,7 @@ describe('AdminClient', () => {

const now = new Date();

client.schedule_workflow('workflowName', {
client.scheduleWorkflow('workflowName', {
schedules: [now],
});

Expand Down
117 changes: 106 additions & 11 deletions src/clients/admin/admin-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,34 @@ export class AdminClient {
this.logger = new Logger(`Admin`, config.log_level);
}

/**
* @deprecated use putWorkflow instead
*/
async put_workflow(opts: CreateWorkflowVersionOpts) {
return this.putWorkflow(opts);
}

/**
* Creates a new workflow or updates an existing workflow. If the workflow already exists, Hatchet will automatically
* determine if the workflow definition has changed and create a new version if necessary.
* @param workflow a workflow definition to create
*/
async put_workflow(workflow: CreateWorkflowVersionOpts) {
async putWorkflow(workflow: CreateWorkflowVersionOpts) {
try {
await retrier(async () => this.client.putWorkflow({ opts: workflow }), this.logger);
} catch (e: any) {
throw new HatchetError(e.message);
}
}

async put_rate_limit(
/**
* @deprecated use putRateLimit instead
*/
async put_rate_limit(key: string, limit: number, duration: RateLimitDuration) {
return this.putRateLimit(key, limit, duration);
}

async putRateLimit(
key: string,
limit: number,
duration: RateLimitDuration = RateLimitDuration.SECOND
Expand All @@ -94,14 +108,32 @@ export class AdminClient {
return this.api.webhookCreate(this.tenantId, data);
}

/**
* @deprecated use runWorkflow instead
*/
async run_workflow<T = object>(
workflowName: string,
input: T,
options?: {
parentId?: string | undefined;
parentStepRunId?: string | undefined;
childIndex?: number | undefined;
childKey?: string | undefined;
additionalMetadata?: Record<string, string> | undefined;
}
) {
return this.runWorkflow(workflowName, input, options);
}

/**
* Run a new instance of a workflow with the given input. This will create a new workflow run and return the ID of the
* new run.
* @param workflowName the name of the workflow to run
* @param input an object containing the input to the workflow
* @param options an object containing the options to run the workflow
* @returns the ID of the new workflow run
*/
async run_workflow<T = object>(
async runWorkflow<T = object>(
workflowName: string,
input: T,
options?: {
Expand Down Expand Up @@ -136,55 +168,99 @@ export class AdminClient {
}
}

/**
* @deprecated use listWorkflows instead
*/
async list_workflows() {
return this.listWorkflows();
}

/**
* List workflows in the tenant associated with the API token.
* @returns a list of all workflows in the tenant
*/
async list_workflows() {
async listWorkflows() {
const res = await this.api.workflowList(this.tenantId);
return res.data;
}

/**
* @deprecated use getWorkflow instead
*/
async get_workflow(workflowId: string) {
return this.getWorkflow(workflowId);
}

/**
* Get a workflow by its ID.
* @param workflowId the workflow ID (**note:** this is not the same as the workflow version id)
* @returns
*/
async get_workflow(workflowId: string) {
async getWorkflow(workflowId: string) {
const res = await this.api.workflowGet(workflowId);
return res.data;
}

/**
* @deprecated use getWorkflowVersion instead
*/
async get_workflow_version(workflowId: string, version?: string) {
return this.getWorkflowVersion(workflowId, version);
}

/**
* Get a workflow version.
* @param workflowId the workflow ID
* @param version the version of the workflow to get. If not provided, the latest version will be returned.
* @returns the workflow version
*/
async get_workflow_version(workflowId: string, version?: string) {
async getWorkflowVersion(workflowId: string, version?: string) {
const res = await this.api.workflowVersionGet(workflowId, {
version,
});

return res.data;
}

/**
* @deprecated use getWorkflowRun instead
*/
async get_workflow_run(workflowRunId: string) {
return this.getWorkflowRun(workflowRunId);
}

/**
* Get a workflow run.
* @param workflowRunId the id of the workflow run to get
* @returns the workflow run
*/
async get_workflow_run(workflowRunId: string) {
async getWorkflowRun(workflowRunId: string) {
const res = await this.api.workflowRunGet(this.tenantId, workflowRunId);
return res.data;
}

/**
* @deprecated use listWorkflowRuns instead
*/
async list_workflow_runs(query: {
offset?: number | undefined;
limit?: number | undefined;
eventId?: string | undefined;
workflowId?: string | undefined;
parentWorkflowRunId?: string | undefined;
parentStepRunId?: string | undefined;
statuses?: WorkflowRunStatusList | undefined;
additionalMetadata?: string[] | undefined;
}) {
return this.listWorkflowRuns(query);
}

/**
* List workflow runs in the tenant associated with the API token.
* @param query the query to filter the list of workflow runs
* @returns
*/
async list_workflow_runs(query: {
async listWorkflowRuns(query: {
offset?: number | undefined;
limit?: number | undefined;
eventId?: string | undefined;
Expand All @@ -198,12 +274,24 @@ export class AdminClient {
return res.data;
}

/**
* @deprecated use scheduleWorkflow instead
*/
async schedule_workflow(
name: string,
options?: {
schedules?: Date[];
}
) {
return this.scheduleWorkflow(name, options);
}

/**
* Schedule a workflow to run at a specific time or times.
* @param name the name of the workflow to schedule
* @param options an object containing the schedules to set
*/
schedule_workflow(name: string, options?: { schedules?: Date[] }) {
scheduleWorkflow(name: string, options?: { schedules?: Date[] }) {
try {
this.client.scheduleWorkflow({
name,
Expand All @@ -214,21 +302,28 @@ export class AdminClient {
}
}

/**
* @deprecated use getWorkflowMetrics instead
*/
async get_workflow_metrics(data: WorkflowMetricsQuery) {
return this.getWorkflowMetrics(data);
}

/**
* Get the metrics for a workflow.
*
* @param workflowId the ID of the workflow to get metrics for
* @param workflowName the name of the workflow to get metrics for
* @param query an object containing query parameters to filter the metrics
*/
get_workflow_metrics({ workflowId, workflowName, status, groupKey }: WorkflowMetricsQuery) {
getWorkflowMetrics({ workflowId, workflowName, status, groupKey }: WorkflowMetricsQuery) {
const params = {
status,
groupKey,
};

if (workflowName) {
this.list_workflows().then((res) => {
this.listWorkflows().then((res) => {
const workflow = res.rows?.find((row) => row.name === workflowName);

if (workflow) {
Expand Down
6 changes: 3 additions & 3 deletions src/clients/worker/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ describe('Worker', () => {
);
});

describe('register_workflow', () => {
describe('registerWorkflow', () => {
it('should update the registry', async () => {
const worker = new Worker(hatchet, { name: 'WORKER_NAME' });
const putWorkflowSpy = jest.spyOn(worker.client.admin, 'put_workflow').mockResolvedValue();
const putWorkflowSpy = jest.spyOn(worker.client.admin, 'putWorkflow').mockResolvedValue();

const workflow = {
id: 'workflow1',
Expand Down Expand Up @@ -87,7 +87,7 @@ describe('Worker', () => {
it('should start a step run', async () => {
const worker = new Worker(hatchet, { name: 'WORKER_NAME' });

const putWorkflowSpy = jest.spyOn(worker.client.admin, 'put_workflow').mockResolvedValue();
const putWorkflowSpy = jest.spyOn(worker.client.admin, 'putWorkflow').mockResolvedValue();

const getActionEventSpy = jest.spyOn(worker, 'getStepActionEvent');

Expand Down
12 changes: 7 additions & 5 deletions src/clients/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ export class Worker {
return this.client.admin.webhook_create({ ...webhook, workflows: this.registeredWorkflowIds });
}

// @deprecated
async registerWorkflow(initWorkflow: Workflow) {
return this.register_workflow(initWorkflow);
/**
* @deprecated use registerWorkflow instead
*/
async register_workflow(initWorkflow: Workflow) {
return this.registerWorkflow(initWorkflow);
}

async register_workflow(initWorkflow: Workflow) {
async registerWorkflow(initWorkflow: Workflow) {
const workflow: Workflow = {
...initWorkflow,
id: this.client.config.namespace + initWorkflow.id,
Expand Down Expand Up @@ -143,7 +145,7 @@ export class Worker {

this.registeredWorkflowIds.push(workflow.id);

const registeredWorkflow = this.client.admin.put_workflow({
const registeredWorkflow = this.client.admin.putWorkflow({
name: workflow.id,
description: workflow.description,
version: workflow.version || '',
Expand Down
Loading

0 comments on commit f759115

Please sign in to comment.