Skip to content

Commit fb9abe5

Browse files
authored
fix: Incorrect/missing logs on new schedule engine runs (#2185)
* Fix incorrect logs on tasks triggered by the new schedule engine. Also added the ability to recover schedules in the new schedule engine via an Admin API endpoint. * Fixed the failing schedule recovery test.
1 parent f2db1b8 commit fb9abe5

File tree

12 files changed

+715
-11
lines changed

12 files changed

+715
-11
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { ActionFunctionArgs, json, LoaderFunctionArgs } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { scheduleEngine } from "~/v3/scheduleEngine.server";
6+
7+
// Schema for the route params: a single required string `environmentId`.
const ParamsSchema = z.object({
8+
environmentId: z.string(),
9+
});
10+
11+
/**
 * Admin-only action that asks the schedule engine to recover the schedules of
 * one runtime environment.
 *
 * Responses produced here:
 * - 401 when the personal access token is missing/invalid or its user no longer exists
 * - 403 when the authenticated user is not an admin
 * - 404 when no runtime environment matches `params.environmentId`
 * - otherwise `{ success: true, results }`, where `results` is whatever
 *   `scheduleEngine.recoverSchedulesInEnvironment` returns
 */
export async function action({ request, params }: ActionFunctionArgs) {
12+
// Authenticate the request with a personal access token (this is the first step)
13+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
14+
15+
if (!authenticationResult) {
16+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
17+
}
18+
19+
// Load the user behind the token so the admin flag can be checked
const user = await prisma.user.findUnique({
20+
where: {
21+
id: authenticationResult.userId,
22+
},
23+
});
24+
25+
// Same 401 body as a bad token, so a deleted user is indistinguishable from an invalid key
if (!user) {
26+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
27+
}
28+
29+
if (!user.admin) {
30+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
31+
}
32+
33+
// NOTE(review): `.parse` throws on invalid params instead of returning a 4xx
// response from this handler — confirm that is the intended behaviour.
const parsedParams = ParamsSchema.parse(params);
34+
35+
// NOTE(review): `organization` and `project` are included but only
// `environment.id` and `environment.projectId` are read below.
const environment = await prisma.runtimeEnvironment.findFirst({
36+
where: {
37+
id: parsedParams.environmentId,
38+
},
39+
include: {
40+
organization: true,
41+
project: true,
42+
},
43+
});
44+
45+
if (!environment) {
46+
return json({ error: "Environment not found" }, { status: 404 });
47+
}
48+
49+
// Delegate the actual recovery to the schedule engine
const results = await scheduleEngine.recoverSchedulesInEnvironment(
50+
environment.projectId,
51+
environment.id
52+
);
53+
54+
return json({
55+
success: true,
56+
results,
57+
});
58+
}

apps/webapp/app/runEngine/concerns/traceEvents.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ export class DefaultTraceEventsConcern implements TraceEventConcern {
3939
},
4040
incomplete: true,
4141
immediate: true,
42+
startTime: request.options?.overrideCreatedAt
43+
? BigInt(request.options.overrideCreatedAt.getTime()) * BigInt(1000000)
44+
: undefined,
4245
},
4346
async (event, traceContext, traceparent) => {
4447
return await callback({

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export type TriggerTaskServiceOptions = {
2121
runFriendlyId?: string;
2222
skipChecks?: boolean;
2323
oneTimeUseToken?: string;
24+
overrideCreatedAt?: Date;
2425
};
2526

2627
// domain/triggerTask.ts

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ export class EventRepository {
970970
const propagatedContext = extractContextFromCarrier(options.context ?? {});
971971

972972
const start = process.hrtime.bigint();
973-
const startTime = getNowInNanoseconds();
973+
const startTime = options.startTime ?? getNowInNanoseconds();
974974

975975
const traceId = options.spanParentAsLink
976976
? this.generateTraceId()

apps/webapp/app/v3/services/triggerTaskV1.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ export class TriggerTaskServiceV1 extends BaseService {
312312
},
313313
incomplete: true,
314314
immediate: true,
315+
startTime: options.overrideCreatedAt
316+
? BigInt(options.overrideCreatedAt.getTime()) * BigInt(1000000)
317+
: undefined,
315318
},
316319
async (event, traceContext, traceparent) => {
317320
const run = await autoIncrementCounter.incrementInTransaction(

apps/webapp/app/v3/taskEventStore.server.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,20 @@ export class TaskEventStore {
8282
let finalWhere: Prisma.TaskEventWhereInput = where;
8383

8484
if (table === "taskEventPartitioned") {
85-
// Add 1 minute to endCreatedAt to make sure we include all events in the range.
85+
// Add buffer to start and end of the range to make sure we include all events in the range.
8686
const end = endCreatedAt
8787
? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000)
8888
: new Date();
89+
const startCreatedAtWithBuffer = new Date(
90+
startCreatedAt.getTime() - env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000
91+
);
8992

9093
finalWhere = {
9194
AND: [
9295
where,
9396
{
9497
createdAt: {
95-
gte: startCreatedAt,
98+
gte: startCreatedAtWithBuffer,
9699
lt: end,
97100
},
98101
},
@@ -138,6 +141,11 @@ export class TaskEventStore {
138141
options?.includeDebugLogs === false || options?.includeDebugLogs === undefined;
139142

140143
if (table === "taskEventPartitioned") {
144+
const createdAtBufferInMillis = env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000;
145+
const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - createdAtBufferInMillis);
146+
const $endCreatedAt = endCreatedAt ?? new Date();
147+
const endCreatedAtWithBuffer = new Date($endCreatedAt.getTime() + createdAtBufferInMillis);
148+
141149
return await this.readReplica.$queryRaw<TraceEvent[]>`
142150
SELECT
143151
"spanId",
@@ -158,11 +166,8 @@ export class TaskEventStore {
158166
FROM "TaskEventPartitioned"
159167
WHERE
160168
"traceId" = ${traceId}
161-
AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp
162-
AND "createdAt" < ${(endCreatedAt
163-
? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000)
164-
: new Date()
165-
).toISOString()}::timestamp
169+
AND "createdAt" >= ${startCreatedAtWithBuffer.toISOString()}::timestamp
170+
AND "createdAt" < ${endCreatedAtWithBuffer.toISOString()}::timestamp
166171
${
167172
filterDebug
168173
? Prisma.sql`AND \"kind\" <> CAST('LOG'::text AS "public"."TaskEventKind")`

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
Tracer,
99
} from "@internal/tracing";
1010
import { Logger } from "@trigger.dev/core/logger";
11-
import { PrismaClient } from "@trigger.dev/database";
11+
import { PrismaClient, TaskSchedule, TaskScheduleInstance } from "@trigger.dev/database";
1212
import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker";
1313
import { calculateDistributedExecutionTime } from "./distributedScheduling.js";
1414
import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js";
@@ -645,6 +645,140 @@ export class ScheduleEngine {
645645
});
646646
}
647647

648+
/**
 * Attempts to recover every schedule instance belonging to one environment.
 *
 * Runs inside a "recoverSchedulesInEnvironment" span. Schedules of `projectId`
 * that have at least one instance in `environmentId` are loaded, and each
 * matching instance is passed to `#recoverTaskScheduleInstance`.
 *
 * @param projectId - project whose task schedules are queried
 * @param environmentId - environment whose schedule instances are recovered
 * @returns the result of `startSpan`; the callback resolves to
 *   `{ recovered, skipped }`, two lists of instance ids
 */
public recoverSchedulesInEnvironment(projectId: string, environmentId: string) {
649+
return startSpan(this.tracer, "recoverSchedulesInEnvironment", async (span) => {
650+
this.logger.info("Recovering schedules in environment", {
651+
environmentId,
652+
projectId,
653+
});
654+
655+
span.setAttribute("environmentId", environmentId);
656+
657+
// Schedules in the project with at least one instance in this environment.
// The nested `instances` select is not filtered, so it returns the instances
// of every environment; they are narrowed down below.
const schedules = await this.prisma.taskSchedule.findMany({
658+
where: {
659+
projectId,
660+
instances: {
661+
some: {
662+
environmentId,
663+
},
664+
},
665+
},
666+
select: {
667+
id: true,
668+
generatorExpression: true,
669+
instances: {
670+
select: {
671+
id: true,
672+
environmentId: true,
673+
lastScheduledTimestamp: true,
674+
nextScheduledTimestamp: true,
675+
},
676+
},
677+
},
678+
});
679+
680+
// Pair each schedule with the first of its instances in this environment and
// drop schedules without one. The cast tells the compiler that `instance` is
// defined after the filter.
const instancesWithSchedule = schedules
681+
.map((schedule) => ({
682+
schedule,
683+
instance: schedule.instances.find((instance) => instance.environmentId === environmentId),
684+
}))
685+
.filter((instance) => instance.instance) as Array<{
686+
schedule: Omit<(typeof schedules)[number], "instances">;
687+
instance: NonNullable<(typeof schedules)[number]["instances"][number]>;
688+
}>;
689+
690+
// Nothing to recover: return empty result lists
if (instancesWithSchedule.length === 0) {
691+
this.logger.info("No instances found for environment", {
692+
environmentId,
693+
projectId,
694+
});
695+
696+
return {
697+
recovered: [],
698+
skipped: [],
699+
};
700+
}
701+
702+
const results = {
703+
recovered: [],
704+
skipped: [],
705+
} as { recovered: string[]; skipped: string[] };
706+
707+
// Instances are processed one at a time; each recovery is awaited before the next starts
for (const { instance, schedule } of instancesWithSchedule) {
708+
this.logger.info("Recovering schedule", {
709+
schedule,
710+
instance,
711+
});
712+
713+
// `tryCatch` is defined outside this view; presumably it resolves to an
// [error, result] tuple instead of throwing — TODO confirm.
const [recoverError, result] = await tryCatch(
714+
this.#recoverTaskScheduleInstance({ instance, schedule })
715+
);
716+
717+
// NOTE(review): a failed instance is logged (without its id) and recorded on
// the span, but is added to neither `recovered` nor `skipped`, so it is absent
// from the returned results.
if (recoverError) {
718+
this.logger.error("Error recovering schedule", {
719+
error: recoverError instanceof Error ? recoverError.message : String(recoverError),
720+
});
721+
722+
// NOTE(review): the same attribute keys are set on every iteration — confirm
// whether later values replace earlier ones on this span.
span.setAttribute("recover_error", true);
723+
span.setAttribute(
724+
"recover_error_message",
725+
recoverError instanceof Error ? recoverError.message : String(recoverError)
726+
);
727+
} else {
728+
span.setAttribute("recover_success", true);
729+
730+
// Any result other than "recovered" is counted as skipped
if (result === "recovered") {
731+
results.recovered.push(instance.id);
732+
} else {
733+
results.skipped.push(instance.id);
734+
}
735+
}
736+
}
737+
738+
return results;
739+
});
740+
}
741+
742+
/**
 * Recovers a single schedule instance if the worker has no job for it.
 *
 * @param instance - the schedule instance; only `instance.id` is read here
 * @param schedule - the owning schedule; only used as log context here
 * @returns "skipped" when the worker already has a job with id
 *   `scheduled-task-instance:<instance.id>`, otherwise "recovered" after
 *   `registerNextTaskScheduleInstance` has been awaited
 */
async #recoverTaskScheduleInstance({
743+
instance,
744+
schedule,
745+
}: {
746+
instance: {
747+
id: string;
748+
environmentId: string;
749+
lastScheduledTimestamp: Date | null;
750+
nextScheduledTimestamp: Date | null;
751+
};
752+
schedule: { id: string; generatorExpression: string };
753+
}) {
754+
// Inspect the schedule worker to see if there is a job for this instance
755+
const job = await this.worker.getJob(`scheduled-task-instance:${instance.id}`);
756+
757+
// An existing job means nothing needs to be re-registered for this instance
if (job) {
758+
this.logger.info("Job already exists for instance", {
759+
instanceId: instance.id,
760+
job,
761+
schedule,
762+
});
763+
764+
return "skipped";
765+
}
766+
767+
this.logger.info("No job found for instance, registering next run", {
768+
instanceId: instance.id,
769+
schedule,
770+
});
771+
772+
// If the job does not exist, register the next run
773+
await this.registerNextTaskScheduleInstance({ instanceId: instance.id });
774+
775+
return "recovered";
776+
}
777+
778+
/**
 * Looks up a job on the underlying worker by id.
 *
 * @param id - job id passed straight through to `this.worker.getJob`
 * @returns whatever `this.worker.getJob(id)` resolves to
 */
async getJob(id: string) {
779+
return this.worker.getJob(id);
780+
}
781+
648782
async quit() {
649783
this.logger.info("Shutting down schedule engine");
650784

internal-packages/schedule-engine/test/scheduleEngine.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { containerTest } from "@internal/testcontainers";
22
import { trace } from "@internal/tracing";
3-
import { describe, expect, vi } from "vitest";
4-
import { ScheduleEngine } from "../src/index.js";
53
import { setTimeout } from "timers/promises";
4+
import { describe, expect, vi } from "vitest";
65
import { TriggerScheduledTaskParams } from "../src/engine/types.js";
6+
import { ScheduleEngine } from "../src/index.js";
77

88
describe("ScheduleEngine Integration", () => {
99
containerTest(

0 commit comments

Comments
 (0)