Skip to content

Commit

Permalink
feat(webhooks): add webhook workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed May 20, 2024
1 parent 2bbe697 commit fc310b9
Show file tree
Hide file tree
Showing 11 changed files with 738 additions and 25 deletions.
2 changes: 1 addition & 1 deletion examples/simple-worker.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe('e2e', () => {
await sleep(2000);
});

it('should pass a simple workflow', async () => {
xit('should pass a simple workflow', async () => {
let invoked = 0;
const start = new Date();

Expand Down
86 changes: 86 additions & 0 deletions examples/webhooks.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { createServer } from 'node:http';
import { Workflow, Worker } from '../src';
import sleep from '../src/util/sleep';
import Hatchet from '../src/sdk';

const port = 8369;

describe('e2e', () => {
let hatchet: Hatchet;
let worker: Worker;

beforeEach(async () => {
hatchet = Hatchet.init();
worker = await hatchet.worker('simple-webhook-workflow');
});

afterEach(async () => {
await worker.stop();
await sleep(2000);
});

it('should pass a simple workflow', async () => {
let invoked = 0;

const workflow: Workflow = {
id: 'simple-webhook-workflow',
webhook: `http://localhost:${port}/webhook`,
description: 'test',
on: {
event: 'user:create-webhook',
},
steps: [
{
name: 'step1',
run: async (ctx) => {
console.log('invoked!! ✅✅✅✅✅✅✅');
console.log('invoked!! ✅✅✅✅✅✅✅');
console.log('invoked!! ✅✅✅✅✅✅✅');
console.log('invoked!! ✅✅✅✅✅✅✅');
console.log('invoked!! ✅✅✅✅✅✅✅');
invoked += 1;
return { message: `${ctx.workflowName()} results!` };
},
},
{
name: 'step2',
parents: ['step1'],
run: (ctx) => {
invoked += 1;
return { message: `${ctx.workflowName()} results!` };
},
},
],
};

console.log('registering workflow...');
await worker.registerWorkflow(workflow);

const secret = 'secret';
const server = createServer(await worker.handler(secret));

await new Promise((resolve) => {
server.listen(port, () => {
resolve('');
});
});

console.log('worker started.');

await sleep(5000);

console.log('pushing event...');

await hatchet.event.push('user:create-webhook', {
test: 'test',
});

await sleep(10000);

console.log('invoked', invoked);

expect(invoked).toEqual(2);

await worker.stop();
}, 60000);
});
2 changes: 1 addition & 1 deletion hatchet
Submodule hatchet updated 65 files
+7 −0 .github/workflows/auto-merge.yml
+1 −0 .github/workflows/test.yml
+3 −0 api-contracts/dispatcher/dispatcher.proto
+1 −0 api-contracts/openapi/components/schemas/workflow_run.yaml
+1 −0 api-contracts/workflows/workflows.proto
+23 −18 api/v1/server/handlers/step-runs/rerun.go
+101 −100 api/v1/server/oas/gen/openapi.gen.go
+11 −3 cmd/hatchet-admin/cli/seed.go
+0 −2 examples/timeout/run.go
+48 −0 examples/webhook/main.go
+91 −0 examples/webhook/main_e2e_test.go
+136 −0 examples/webhook/run.go
+1 −0 frontend/app/src/lib/api/generated/data-contracts.ts
+1 −0 frontend/app/src/pages/main/workflow-runs/$run/components/step-run-events.tsx
+1 −0 frontend/docs/.npmrc
+1 −1 frontend/docs/pages/home/basics/workflows.mdx
+4 −4 go.mod
+8 −8 go.sum
+35 −0 internal/randstr/randstr.go
+4 −0 internal/repository/prisma/dbsqlc/models.go
+4 −1 internal/repository/prisma/dbsqlc/schema.sql
+144 −6 internal/repository/prisma/dbsqlc/step_runs.sql
+358 −6 internal/repository/prisma/dbsqlc/step_runs.sql.go
+4 −2 internal/repository/prisma/dbsqlc/tenants.sql.go
+4 −2 internal/repository/prisma/dbsqlc/workers.sql
+14 −6 internal/repository/prisma/dbsqlc/workers.sql.go
+4 −2 internal/repository/prisma/dbsqlc/workflow_runs.sql.go
+4 −2 internal/repository/prisma/dbsqlc/workflows.sql
+13 −6 internal/repository/prisma/dbsqlc/workflows.sql.go
+16 −0 internal/repository/prisma/sqlchelpers/bool.go
+140 −13 internal/repository/prisma/step_run.go
+1 −0 internal/repository/prisma/tenant.go
+1 −0 internal/repository/prisma/worker.go
+8 −0 internal/repository/prisma/workflow.go
+2 −2 internal/repository/prisma/workflow_run.go
+8 −0 internal/repository/step_run.go
+3 −0 internal/repository/tenant.go
+3 −0 internal/repository/worker.go
+3 −0 internal/repository/workflow.go
+169 −158 internal/services/admin/contracts/workflows.pb.go
+1 −0 internal/services/admin/server.go
+80 −5 internal/services/controllers/jobs/controller.go
+295 −283 internal/services/dispatcher/contracts/dispatcher.pb.go
+27 −3 internal/services/dispatcher/dispatcher.go
+73 −3 internal/services/dispatcher/server.go
+111 −0 internal/services/dispatcher/webhooks.go
+3 −0 internal/services/grpc/middleware/auth.go
+3 −0 internal/services/ingestor/ingestor.go
+12 −2 internal/services/ingestor/server.go
+35 −0 internal/services/shared/tasktypes/step.go
+18 −0 internal/signature/sign.go
+33 −0 internal/signature/sign_test.go
+1 −0 pkg/client/admin.go
+1 −1 pkg/client/client.go
+31 −5 pkg/client/dispatcher.go
+1 −0 pkg/client/rest/gen.go
+2 −0 pkg/client/types/file.go
+22 −0 pkg/worker/service.go
+138 −0 pkg/worker/webhooks_handler.go
+7 −4 pkg/worker/worker.go
+45 −28 pkg/worker/workflow.go
+5 −0 prisma/migrations/20240430154805_webhooks/migration.sql
+2 −0 prisma/migrations/20240516215142_webhooks/migration.sql
+2 −0 prisma/migrations/20240517204453_v0_28_1/migration.sql
+8 −0 prisma/schema.prisma
31 changes: 17 additions & 14 deletions src/clients/dispatcher/action-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import sleep from '@util/sleep';
import HatchetError from '@util/errors/hatchet-error';
import { Logger } from '@hatchet/util/logger';

import { z } from 'zod';
import { DispatcherClient } from './dispatcher-client';
import { Heartbeat } from './heartbeat/heartbeat-controller';

Expand All @@ -18,20 +19,22 @@ enum ListenStrategy {
LISTEN_STRATEGY_V2 = 2,
}

export interface Action {
tenantId: string;
jobId: string;
jobName: string;
jobRunId: string;
stepId: string;
stepRunId: string;
actionId: string;
actionType: number;
actionPayload: string;
workflowRunId: string;
getGroupKeyRunId: string;
stepName: string;
}
export const ActionObject = z.object({
tenantId: z.string(),
jobId: z.string(),
jobName: z.string(),
jobRunId: z.string(),
stepId: z.string(),
stepRunId: z.string(),
actionId: z.string(),
actionType: z.number().optional(),
actionPayload: z.string(),
workflowRunId: z.string(),
getGroupKeyRunId: z.string().optional(),
stepName: z.string(),
});

export type Action = z.infer<typeof ActionObject>;

export class ActionListener {
config: ClientConfig;
Expand Down
Loading

0 comments on commit fc310b9

Please sign in to comment.