Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions apps/jobs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@ import { logger } from "@ctrlplane/logger";

import { run as expiredEnvChecker } from "./expired-env-checker/index.js";
import { run as jobPolicyChecker } from "./policy-checker/index.js";
import { run as timeoutChecker } from "./timeout-checker/index.js";

const jobs: Record<string, { run: () => Promise<void>; schedule: string }> = {
"policy-checker": {
run: jobPolicyChecker,
schedule: "* * * * *", // Default: Every minute
},
"expired-env-checker": {
run: expiredEnvChecker,
schedule: "* * * * *", // Default: Every minute
},
"policy-checker": { run: jobPolicyChecker, schedule: "* * * * *" },
"expired-env-checker": { run: expiredEnvChecker, schedule: "* * * * *" },
"timeout-checker": { run: timeoutChecker, schedule: "* * * * *" },
};

const jobSchema = z.object({
Expand All @@ -37,15 +33,8 @@ const jobSchema = z.object({
const parseJobArgs = () => {
const { values } = parseArgs({
options: {
job: {
type: "string",
short: "j",
multiple: true,
},
runOnce: {
type: "boolean",
short: "r",
},
job: { type: "string", short: "j", multiple: true },
runOnce: { type: "boolean", short: "r" },
},
});
return jobSchema.parse(values);
Expand Down
34 changes: 34 additions & 0 deletions apps/jobs/src/timeout-checker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { and, eq, isNotNull, lt, sql } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { updateJob } from "@ctrlplane/job-dispatch";
import { JobStatus } from "@ctrlplane/validators/jobs";

/**
 * Marks every in-progress job that has outlived its deployment's timeout as
 * failed.
 *
 * Jobs are reached from deployments by joining through releases and release
 * job triggers. A job is selected when its deployment has a non-null timeout,
 * its status is `InProgress`, and its `createdAt` is earlier than
 * `now()` minus `timeout` seconds (computed by the database).
 *
 * @returns A promise that resolves once every selected job has been updated.
 * It rejects if the query or any individual update rejects.
 */
export const run = async () => {
  // Cutoff is computed in SQL so the database clock is the only clock involved.
  const cutoff = sql`now() - ${SCHEMA.deployment.timeout} * interval '1 second'`;

  const hasTimedOut = and(
    isNotNull(SCHEMA.deployment.timeout),
    eq(SCHEMA.job.status, JobStatus.InProgress),
    lt(SCHEMA.job.createdAt, cutoff),
  );

  const releaseOfDeployment = eq(
    SCHEMA.release.deploymentId,
    SCHEMA.deployment.id,
  );
  const triggerOfRelease = eq(
    SCHEMA.releaseJobTrigger.releaseId,
    SCHEMA.release.id,
  );
  const jobOfTrigger = eq(SCHEMA.releaseJobTrigger.jobId, SCHEMA.job.id);

  const timedOutJobs = await db
    .select({ id: SCHEMA.job.id })
    .from(SCHEMA.deployment)
    .innerJoin(SCHEMA.release, releaseOfDeployment)
    .innerJoin(SCHEMA.releaseJobTrigger, triggerOfRelease)
    .innerJoin(SCHEMA.job, jobOfTrigger)
    .where(hasTimedOut);

  // Start all updates at once and wait for the whole set.
  const pendingUpdates = timedOutJobs.map(({ id }) =>
    updateJob(id, { status: JobStatus.Failure }),
  );
  await Promise.all(pendingUpdates);
};
Comment on lines +7 to +34
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling and improve observability.

The timeout checker implementation needs better error handling and logging for production reliability.

Consider these improvements:

 export const run = async () =>
-  db
+  try {
+    const timedOutJobs = await db
     .select({ id: SCHEMA.job.id })
     .from(SCHEMA.deployment)
     .innerJoin(
       SCHEMA.release,
       eq(SCHEMA.release.deploymentId, SCHEMA.deployment.id),
     )
     .innerJoin(
       SCHEMA.releaseJobTrigger,
       eq(SCHEMA.releaseJobTrigger.releaseId, SCHEMA.release.id),
     )
     .innerJoin(SCHEMA.job, eq(SCHEMA.releaseJobTrigger.jobId, SCHEMA.job.id))
     .where(
       and(
         isNotNull(SCHEMA.deployment.timeout),
         eq(SCHEMA.job.status, JobStatus.InProgress),
         lt(
           SCHEMA.job.createdAt,
           sql`now() - ${SCHEMA.deployment.timeout} * interval '1 second'`,
         ),
       ),
-    )
-    .then(async (jobs) => {
-      await Promise.all(
-        jobs.map((job) => updateJob(job.id, { status: JobStatus.Failure })),
-      );
-    });
+    );
+
+    if (timedOutJobs.length > 0) {
+      logger.info(`Found ${timedOutJobs.length} timed-out jobs`);
+      // Process in batches of 50 to avoid overwhelming the system
+      for (let i = 0; i < timedOutJobs.length; i += 50) {
+        const batch = timedOutJobs.slice(i, i + 50);
+        await Promise.all(
+          batch.map(async (job) => {
+            try {
+              await updateJob(job.id, { status: JobStatus.Failure });
+              logger.info(`Marked job ${job.id} as failed due to timeout`);
+            } catch (error) {
+              logger.error(`Failed to update job ${job.id}:`, error);
+            }
+          }),
+        );
+      }
+    }
+  } catch (error) {
+    logger.error("Error in timeout checker:", error);
+    throw error;
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export const run = async () =>
db
.select({ id: SCHEMA.job.id })
.from(SCHEMA.deployment)
.innerJoin(
SCHEMA.release,
eq(SCHEMA.release.deploymentId, SCHEMA.deployment.id),
)
.innerJoin(
SCHEMA.releaseJobTrigger,
eq(SCHEMA.releaseJobTrigger.releaseId, SCHEMA.release.id),
)
.innerJoin(SCHEMA.job, eq(SCHEMA.releaseJobTrigger.jobId, SCHEMA.job.id))
.where(
and(
isNotNull(SCHEMA.deployment.timeout),
eq(SCHEMA.job.status, JobStatus.InProgress),
lt(
SCHEMA.job.createdAt,
sql`now() - ${SCHEMA.deployment.timeout} * interval '1 second'`,
),
),
)
.then(async (jobs) => {
await Promise.all(
jobs.map((job) => updateJob(job.id, { status: JobStatus.Failure })),
);
});
/**
 * Fails every in-progress job whose deployment timeout has elapsed, logging
 * how many were found and the outcome of each update.
 *
 * A job is selected when its deployment has a non-null timeout, its status is
 * `InProgress`, and its `createdAt` is earlier than `now()` minus `timeout`
 * seconds. Updates run in fixed-size batches; each batch is awaited before the
 * next one starts.
 *
 * NOTE(review): `logger` is used here but the import list shown for this
 * module does not include it; `import { logger } from "@ctrlplane/logger";`
 * must be added for this to compile.
 *
 * @returns A promise that resolves once all batches have been processed.
 * @throws Rethrows, after logging, any error raised outside the per-job
 * handler (for example by the query). A failure to update an individual job
 * is logged and not rethrown, so one bad job does not stop the others.
 */
export const run = async (): Promise<void> => {
  // Upper bound on the number of updateJob calls in flight at once.
  const batchSize = 50;

  try {
    const timedOutJobs = await db
      .select({ id: SCHEMA.job.id })
      .from(SCHEMA.deployment)
      .innerJoin(
        SCHEMA.release,
        eq(SCHEMA.release.deploymentId, SCHEMA.deployment.id),
      )
      .innerJoin(
        SCHEMA.releaseJobTrigger,
        eq(SCHEMA.releaseJobTrigger.releaseId, SCHEMA.release.id),
      )
      .innerJoin(SCHEMA.job, eq(SCHEMA.releaseJobTrigger.jobId, SCHEMA.job.id))
      .where(
        and(
          isNotNull(SCHEMA.deployment.timeout),
          eq(SCHEMA.job.status, JobStatus.InProgress),
          lt(
            SCHEMA.job.createdAt,
            sql`now() - ${SCHEMA.deployment.timeout} * interval '1 second'`,
          ),
        ),
      );

    if (timedOutJobs.length > 0) {
      logger.info(`Found ${timedOutJobs.length} timed-out jobs`);
      // Process in batches to avoid overwhelming the system
      for (let i = 0; i < timedOutJobs.length; i += batchSize) {
        const batch = timedOutJobs.slice(i, i + batchSize);
        await Promise.all(
          batch.map(async (job) => {
            try {
              await updateJob(job.id, { status: JobStatus.Failure });
              logger.info(`Marked job ${job.id} as failed due to timeout`);
            } catch (error) {
              logger.error(`Failed to update job ${job.id}:`, error);
            }
          }),
        );
      }
    }
  } catch (error) {
    logger.error("Error in timeout checker:", error);
    throw error;
  }
};

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import type { RouterOutputs } from "@ctrlplane/api";
import { useParams, useRouter } from "next/navigation";
import { IconX } from "@tabler/icons-react";
import { IconInfoCircle, IconX } from "@tabler/icons-react";
import ms from "ms";
import prettyMilliseconds from "pretty-ms";
import { z } from "zod";

import * as SCHEMA from "@ctrlplane/db/schema";
Expand All @@ -26,6 +28,12 @@ import {
SelectValue,
} from "@ctrlplane/ui/select";
import { Textarea } from "@ctrlplane/ui/textarea";
import {
Tooltip,
TooltipContent,
TooltipProvider,
TooltipTrigger,
} from "@ctrlplane/ui/tooltip";
import {
defaultCondition,
isEmptyCondition,
Expand All @@ -35,7 +43,29 @@ import { ResourceConditionRender } from "~/app/[workspaceSlug]/(app)/_components
import { api } from "~/trpc/react";
import { DeploymentResourcesDialog } from "./DeploymentResourcesDialog";

const schema = z.object(SCHEMA.deploymentSchema.shape);
/**
 * Parses a duration string with `ms`.
 *
 * `ms` does not throw for a non-empty string it cannot interpret; it hands
 * back a non-number instead. A try/catch alone therefore lets inputs such as
 * "abc" through, so the result is also checked for being a finite number.
 *
 * @param val Non-empty duration string entered by the user.
 * @returns The duration in milliseconds, or `null` when `val` is not a
 * recognisable duration.
 */
const parseTimeoutMs = (val: string): number | null => {
  try {
    const parsed: unknown = ms(val);
    return typeof parsed === "number" && Number.isFinite(parsed)
      ? parsed
      : null;
  } catch {
    return null;
  }
};

/**
 * Form-level schema for the deployment timeout field.
 *
 * The field is optional: `undefined` and the empty string both mean "no
 * timeout" and pass validation. Any other value must be a duration string
 * that `ms` can parse and must amount to at least one second.
 */
const timeoutSchema = z
  .string()
  .optional()
  .refine(
    (val) => val == null || val === "" || parseTimeoutMs(val) != null,
    "Invalid timeout, must be a valid duration string",
  )
  .refine((val) => {
    if (val == null || val === "") return true;
    const timeout = parseTimeoutMs(val);
    // Unparseable input is already reported by the previous refinement; do
    // not attach a second, misleading message to it.
    return timeout == null || timeout >= 1000;
  }, "Timeout must be at least 1 second");

const schema = z
.object(SCHEMA.deploymentSchema.shape)
.omit({ timeout: true })
.extend({ timeout: timeoutSchema });

type System = RouterOutputs["system"]["list"]["items"][number];

Expand All @@ -58,7 +88,11 @@ export const EditDeploymentSection: React.FC<EditDeploymentSectionProps> = ({
.map((e) => ({ ...e, resourceFilter: e.resourceFilter! })) ?? [];

const resourceFilter = deployment.resourceFilter ?? undefined;
const defaultValues = { ...deployment, resourceFilter };
// `deployment.timeout` is stored in seconds (the submit handler divides the
// parsed milliseconds by 1000), whereas pretty-ms expects milliseconds, so
// scale it back up before formatting. Without this a 60-second timeout is
// shown as "60ms" and then fails the one-second minimum when resubmitted.
// NOTE(review): pretty-ms can produce compound output such as "1m 30s";
// confirm that `ms` accepts such strings when the form is saved unchanged.
const timeout =
  deployment.timeout != null
    ? prettyMilliseconds(deployment.timeout * 1000)
    : undefined;
const defaultValues = { ...deployment, resourceFilter, timeout };
const form = useForm({ schema, defaultValues, mode: "onSubmit" });
const { handleSubmit, setError } = form;

Expand All @@ -70,7 +104,12 @@ export const EditDeploymentSection: React.FC<EditDeploymentSectionProps> = ({
data.resourceFilter == null || isEmptyCondition(data.resourceFilter)
? null
: data.resourceFilter;
const updates = { ...data, resourceFilter: filter };
const timeout =
data.timeout != null && data.timeout !== ""
? ms(data.timeout) / 1000
: null;
const updates = { ...data, resourceFilter: filter, timeout };

updateDeployment
.mutateAsync({ id: deployment.id, data: updates })
.then((updatedDeployment) => {
Expand Down Expand Up @@ -194,6 +233,32 @@ export const EditDeploymentSection: React.FC<EditDeploymentSectionProps> = ({
</FormItem>
)}
/>
<FormField
control={form.control}
name="timeout"
render={({ field }) => (
<FormItem>
<FormLabel className="flex items-center gap-2">
Timeout
<TooltipProvider>
<Tooltip>
<TooltipTrigger>
<IconInfoCircle className="h-3 w-3 text-muted-foreground" />
</TooltipTrigger>
<TooltipContent className="p-2 text-xs text-muted-foreground">
If a job for this deployment takes longer than the
timeout, it will be marked as failed.
</TooltipContent>
</Tooltip>
</TooltipProvider>
</FormLabel>
<FormControl>
<Input {...field} className="w-16" />
</FormControl>
<FormMessage />
</FormItem>
)}
/>
<FormField
control={form.control}
name="resourceFilter"
Expand Down
1 change: 1 addition & 0 deletions packages/db/drizzle/0051_brown_gambit.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE "deployment" ADD COLUMN "timeout" integer DEFAULT NULL;
Loading
Loading