Skip to content

Commit

Permalink
feat: Tasker (#14180)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex van Andel <me@alexvanandel.com>
Co-authored-by: Joe Au-Yeung <65426560+joeauyeung@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 18, 2024
1 parent eeb0322 commit 5695ba7
Show file tree
Hide file tree
Showing 31 changed files with 548 additions and 17 deletions.
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ CSP_POLICY=
# Vercel Edge Config
EDGE_CONFIG=

NEXT_PUBLIC_MINUTES_TO_BOOK=5 # Minutes
NEXT_PUBLIC_MINUTES_TO_BOOK=5 # Minutes
NEXT_PUBLIC_BOOKER_NUMBER_OF_DAYS_TO_LOAD=0 # Override the booker to only load X number of days worth of data

# Control time intervals on a user's Schedule availability
Expand Down Expand Up @@ -347,6 +347,10 @@ SENTRY_DISABLE_SERVER_WEBPACK_PLUGIN=1
# api v2
NEXT_PUBLIC_API_V2_URL="http://localhost:5555/api/v2"

# Tasker features
TASKER_ENABLE_WEBHOOKS=0
TASKER_ENABLE_EMAILS=0

# Ratelimiting via unkey
UNKEY_ROOT_KEY=

Expand Down
2 changes: 1 addition & 1 deletion apps/api/v1/test/lib/bookings/_post.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createMocks } from "node-mocks-http";
import { describe, expect, test, vi } from "vitest";

import dayjs from "@calcom/dayjs";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import { ErrorCode } from "@calcom/lib/errorCodes";
import { buildBooking, buildEventType, buildWebhook } from "@calcom/lib/test/builder";
import prisma from "@calcom/prisma";
Expand Down
1 change: 1 addition & 0 deletions apps/web/app/api/tasks/cleanup/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { GET } from "@calcom/features/tasker/api/cleanup";
1 change: 1 addition & 0 deletions apps/web/app/api/tasks/cron/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { GET } from "@calcom/features/tasker/api/cron";
2 changes: 1 addition & 1 deletion apps/web/pages/api/recorded-daily-video.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { z } from "zod";
import { getDownloadLinkOfCalVideoByRecordingId } from "@calcom/core/videoClient";
import { sendDailyVideoRecordingEmails } from "@calcom/emails";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";
import logger from "@calcom/lib/logger";
import { safeStringify } from "@calcom/lib/safeStringify";
Expand Down
16 changes: 8 additions & 8 deletions apps/web/playwright/webhook.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,20 @@ import { BookingStatus } from "@calcom/prisma/client";
import { test } from "./lib/fixtures";
import {
bookOptinEvent,
bookTimeSlot,
createHttpServer,
selectFirstAvailableTimeSlotNextMonth,
gotoRoutingLink,
createUserWithSeatedEventAndAttendees,
gotoRoutingLink,
selectFirstAvailableTimeSlotNextMonth,
} from "./lib/testUtils";

// remove dynamic properties that differs depending on where you run the tests
const dynamic = "[redacted/dynamic]";

test.afterEach(({ users }) => users.deleteAll());
test.afterEach(async ({ users }) => {
// This also delete forms on cascade
await users.deleteAll();
});

async function createWebhookReceiver(page: Page) {
const webhookReceiver = createHttpServer();
Expand Down Expand Up @@ -71,11 +75,7 @@ test.describe("BOOKING_CREATED", async () => {
// --- Book the first available day next month in the pro user's "30min"-event
await page.goto(`/${user.username}/${eventType.slug}`);
await selectFirstAvailableTimeSlotNextMonth(page);

// --- fill form
await page.fill('[name="name"]', "Test Testson");
await page.fill('[name="email"]', "test@example.com");
await page.press('[name="email"]', "Enter");
await bookTimeSlot(page);

await webhookReceiver.waitForRequestCount(1);

Expand Down
8 changes: 8 additions & 0 deletions apps/web/vercel.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@
{
"path": "/api/cron/calendar-cache-cleanup",
"schedule": "0 5 * * *"
},
{
"path": "/api/tasks/cron",
"schedule": "* * * * *"
},
{
"path": "/api/tasks/cleanup",
"schedule": "0 0 * * *"
}
],
"functions": {
Expand Down
2 changes: 1 addition & 1 deletion packages/features/bookings/lib/handleBookingRequested.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { sendAttendeeRequestEmail, sendOrganizerRequestEmail } from "@calcom/emails";
import { getWebhookPayloadForBooking } from "@calcom/features/bookings/lib/getWebhookPayloadForBooking";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import logger from "@calcom/lib/logger";
import { safeStringify } from "@calcom/lib/safeStringify";
import { WebhookTriggerEvents } from "@calcom/prisma/enums";
Expand Down
2 changes: 1 addition & 1 deletion packages/features/bookings/lib/handleCancelBooking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { deleteScheduledSMSReminder } from "@calcom/features/ee/workflows/lib/re
import { deleteScheduledWhatsappReminder } from "@calcom/features/ee/workflows/lib/reminders/whatsappReminderManager";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { cancelScheduledJobs } from "@calcom/features/webhooks/lib/scheduleTrigger";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { isPrismaObjOrUndefined, parseRecurringEvent } from "@calcom/lib";
import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";
import { HttpError } from "@calcom/lib/http-error";
Expand Down
2 changes: 1 addition & 1 deletion packages/features/bookings/lib/handleConfirmation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import { sendScheduledEmails } from "@calcom/emails";
import { scheduleWorkflowReminders } from "@calcom/features/ee/workflows/lib/reminders/reminderScheduler";
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import { scheduleTrigger } from "@calcom/features/webhooks/lib/scheduleTrigger";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { getVideoCallUrlFromCalEvent } from "@calcom/lib/CalEventParser";
import { getTeamIdFromEventType } from "@calcom/lib/getTeamIdFromEventType";
import logger from "@calcom/lib/logger";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { sendCancelledSeatEmails } from "@calcom/emails";
import { deleteScheduledEmailReminder } from "@calcom/features/ee/workflows/lib/reminders/emailReminderManager";
import { deleteScheduledSMSReminder } from "@calcom/features/ee/workflows/lib/reminders/smsReminderManager";
import { deleteScheduledWhatsappReminder } from "@calcom/features/ee/workflows/lib/reminders/whatsappReminderManager";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import type { EventTypeInfo } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import { HttpError } from "@calcom/lib/http-error";
import { getTranslation } from "@calcom/lib/server/i18n";
import prisma from "@calcom/prisma";
Expand Down
2 changes: 1 addition & 1 deletion packages/features/bookings/lib/handleWebhookTrigger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import getWebhooks from "@calcom/features/webhooks/lib/getWebhooks";
import type { GetSubscriberOptions } from "@calcom/features/webhooks/lib/getWebhooks";
import sendPayload from "@calcom/features/webhooks/lib/sendOrSchedulePayload";
import type { WebhookDataType } from "@calcom/features/webhooks/lib/sendPayload";
import sendPayload from "@calcom/features/webhooks/lib/sendPayload";
import logger from "@calcom/lib/logger";

export async function handleWebhookTrigger(args: {
Expand Down
91 changes: 91 additions & 0 deletions packages/features/tasker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Tasker

Tasker: "One who performs a task, as a day-laborer."

Task: "A function to be performed; an objective."

## What is it?

Introduces a new pattern called Tasker which may be switched out in the future for other third party services.

Also introduces a base `InternalTasker` which doesn't require third party dependencies and should work out of the box (by configuring a proper cron).

## Why is this needed?

The Tasker pattern is needed to streamline the execution of non-critical tasks in an application, providing a structured approach to task scheduling, execution, retrying, and cancellation. Here's why it's necessary:

1. **Offloading non-critical tasks**: There are tasks that don't need to be executed immediately on the main thread, such as sending emails, generating reports, or performing periodic maintenance tasks. Offloading these tasks to a separate queue or thread improves the responsiveness and efficiency of the main application.

2. **Retry mechanism**: Not all tasks succeed on the first attempt due to errors or external dependencies. This pattern incorporates a retry mechanism, which allows failed tasks to be retried automatically for a specified number of attempts. This improves the robustness of the system by handling temporary failures gracefully.

3. **Scheduled task execution**: Some tasks need to be executed at a specific time or after a certain delay. The Tasker pattern facilitates scheduling tasks for future execution, ensuring they are performed at the designated time without manual intervention.

4. **Task cancellation**: Occasionally, it's necessary to cancel a scheduled task due to changing requirements or user actions. The Tasker pattern supports task cancellation, enabling previously scheduled tasks to be revoked or removed from the queue before execution.

5. **Flexible implementation**: The Tasker pattern allows for flexibility in implementation by providing a base structure (`InternalTasker`) that can be extended or replaced with third-party services (`TriggerDevTasker`, `AwsSqsTasker`, etc.). This modularity ensures that the task execution mechanism can be adapted to suit different application requirements or environments.

Overall, the Tasker pattern enhances the reliability, performance, and maintainability by managing non-critical tasks in a systematic and efficient manner. It abstracts away the complexities of task execution, allowing developers to focus on core application logic while ensuring timely and reliable execution of background tasks.

## How does it work?

Since the Tasker is a pattern on itself, it will depend on the actual implementation. For example, a `TriggerDevTasker` will work very differently from an `AwsSqsTasker`.

For simplicity sake will explain how the `InternalTasker` works:

- Instead of running a non-critical task you schedule using the tasker:

```diff
const examplePayload = { example: "payload" };
- await sendWebhook(examplePayload);
+ await tasker.create("sendWebhook", JSON.stringify(examplePayload));
```

- This will create a new task to be run on the next processing of the task queue.
- Then on the next cron run it will be picked up and executed:

```ts
// /app/api/tasks/cron/route.ts
import tasker from "@calcom/features/tasker";

export async function GET() {
// authenticate the call...
await tasker.processQueue();
return Response.json({ success: true });
}
```

- By default, the cron will run each minute and will pick the next 100 tasks to be executed.
- If the tasks succeeds, it will be marked as `suceededAt: new Date()`. If if fails, the `attempts` prop will increase by 1 and will be retried on the next cron run.
- If `attempts` reaches `maxAttemps`, it will be considered a failed and won't be retried again.
- By default, tasks will be attempted up to 3 times. This can be overridden when creating a task.
- From here we can either keep a record of executed tasks, or we can setup another cron to cleanup all successful and failed tasks:

```ts
// /app/api/tasks/cleanup/route.ts
import tasker from "@calcom/features/tasker";

export async function GET() {
// authenticate the call...
await tasker.cleanup();
return Response.json({ success: true });
}
```

- This will delete all failed and successful tasks.
- A task is just a simple function receives a payload:

```ts
type TaskHandler = (payload: string) => Promise<void>;
```

## How to contribute?

You can contribute by either expanding the `InternalTasker` or creating new Taskers. To see how to add new Taskers, see the `tasker-factory.ts` file.

You can also take some inspiration by looking into previous attempts to add various Message Queue pull requests:

- [feat: Messaging Bus Implementation using AWS SQS OSSHack Challenge](https://github.com/calcom/cal.com/pull/12663)
- [feat: add opt-in ready-to-deploy message queue (QStash+Next.js functions)](https://github.com/calcom/cal.com/pull/12658)
- [feat: Implement A Message Queuing System](https://github.com/calcom/cal.com/pull/12655)
- [Message Queuing System](https://github.com/calcom/cal.com/pull/12654)
- [feat: Message Queuing System using Trigger.dev](https://github.com/calcom/cal.com/pull/12641)
13 changes: 13 additions & 0 deletions packages/features/tasker/api/cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { NextRequest } from "next/server";
import { NextResponse } from "next/server";

import tasker from "..";

export async function GET(request: NextRequest) {
const authHeader = request.headers.get("authorization");
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response("Unauthorized", { status: 401 });
}
await tasker.cleanup();
return NextResponse.json({ success: true });
}
13 changes: 13 additions & 0 deletions packages/features/tasker/api/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { NextRequest } from "next/server";
import { NextResponse } from "next/server";

import tasker from "..";

export async function GET(request: NextRequest) {
const authHeader = request.headers.get("authorization");
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return new Response("Unauthorized", { status: 401 });
}
await tasker.processQueue();
return NextResponse.json({ success: true });
}
14 changes: 14 additions & 0 deletions packages/features/tasker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { Tasker } from "./tasker";
import { getTasker } from "./tasker-factory";

const globalForTasker = global as unknown as {
tasker: Tasker;
};

export const tasker = globalForTasker.tasker || getTasker();

if (process.env.NODE_ENV !== "production") {
globalForTasker.tasker = tasker;
}

export default tasker;
38 changes: 38 additions & 0 deletions packages/features/tasker/internal-tasker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Task } from "./repository";
import { type Tasker, type TaskTypes } from "./tasker";
import tasksMap from "./tasks";

/**
* This is the default internal Tasker that uses the Task repository to create tasks.
* It doens't have any external dependencies and is suitable for most use cases.
* To use a different Tasker, you can create a new class that implements the Tasker interface.
* Then, you can use the TaskerFactory to select the new Tasker.
*/
export class InternalTasker implements Tasker {
async create(type: TaskTypes, payload: string): Promise<string> {
return Task.create(type, payload);
}
async processQueue(): Promise<void> {
const tasks = await Task.getNextBatch();
const tasksPromises = tasks.map(async (task) => {
const taskHandlerGetter = tasksMap[task.type as keyof typeof tasksMap];
if (!taskHandlerGetter) throw new Error(`Task handler not found for type ${task.type}`);
const taskHandler = await taskHandlerGetter();
return taskHandler(task.payload)
.then(async () => {
await Task.succeed(task.id);
})
.catch(async (error) => {
await Task.retry(task.id, error instanceof Error ? error.message : "Unknown error");
});
});
const settled = await Promise.allSettled(tasksPromises);
const failed = settled.filter((result) => result.status === "rejected");
const succeded = settled.filter((result) => result.status === "fulfilled");
console.info({ failed, succeded });
}
async cleanup(): Promise<void> {
const count = await Task.cleanup();
console.info(`Cleaned up ${count} tasks`);
}
}
19 changes: 19 additions & 0 deletions packages/features/tasker/redis-tasker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { type Tasker, type TaskTypes } from "./tasker";

/**
* RedisTasker is a tasker that uses Redis as a backend.
* WIP: This is a work in progress and is not fully implemented yet.
**/
export class RedisTasker implements Tasker {
async create(type: TaskTypes, payload: string): Promise<string> {
throw new Error("Method not implemented.");
}

processQueue(): Promise<void> {
throw new Error("Method not implemented.");
}

cleanup(): Promise<void> {
throw new Error("Method not implemented.");
}
}
Loading

0 comments on commit 5695ba7

Please sign in to comment.