Skip to content
12 changes: 12 additions & 0 deletions apps/webservice/src/app/api/v1/jobs/[jobId]/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ export const openapi: Swagger.SwaggerV3 = {
"external_run_not_found",
],
},
externalId: {
type: "string",
nullable: true,
description:
"External job identifier (e.g. GitHub workflow run ID)",
},
release: {
type: "object",
properties: {
Expand Down Expand Up @@ -205,13 +211,19 @@ export const openapi: Swagger.SwaggerV3 = {
type: "string",
format: "date-time",
},
jobAgentConfig: {
type: "object",
description: "Configuration for the Job Agent",
additionalProperties: true,
},
},
required: [
"id",
"status",
"createdAt",
"updatedAt",
"variables",
"jobAgentConfig",
],
},
},
Expand Down
7 changes: 7 additions & 0 deletions github/get-job-inputs/index.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions integrations/kubernetes-job-agent/src/agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { JobAgent } from "@ctrlplane/node-sdk";

import { env } from "./config.js";
import { api } from "./sdk.js";

export const agent = new JobAgent(
{
name: env.CTRLPLANE_AGENT_NAME,
workspaceId: env.CTRLPLANE_WORKSPACE_ID,
type: "kubernetes-job",
},
api,
);
114 changes: 58 additions & 56 deletions integrations/kubernetes-job-agent/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import type { Job } from "@ctrlplane/node-sdk";
import { CronJob } from "cron";
import handlebars from "handlebars";
import yaml from "js-yaml";
import { z } from "zod";

import { logger } from "@ctrlplane/logger";

import { agent } from "./agent.js";
import { env } from "./config.js";
import { getBatchClient, getJobStatus } from "./k8s.js";
import { api } from "./sdk.js";

const renderManifest = (manifestTemplate: string, variables: object) => {
try {
Expand All @@ -20,37 +22,39 @@ const renderManifest = (manifestTemplate: string, variables: object) => {
};

const deployManifest = async (
job: Job,
jobId: string,
namespace: string,
manifest: any,
) => {
try {
const name = manifest?.metadata?.name;
logger.info(`Deploying manifest: ${namespace}/${name}`);

if (name == null) {
logger.error("Job name not found in manifest", {
jobId,
namespace,
manifest,
});
await api.updateJob({
jobId,
updateJobRequest: {
status: "invalid_job_agent",
message: "Job name not found in manifest.",
await job.update({
externalId: "",
status: "invalid_job_agent",
message: "Job name not found in manifest.",
});
return;
}

logger.info(`Creating job - ${namespace}/${name}`);

await getBatchClient().createNamespacedJob(namespace, manifest);
await api.updateJob({
jobId,
updateJobRequest: {
status: "in_progress",
externalId: `${namespace}/${name}`,
message: "Job created successfully.",

await job.update({
status: "in_progress",
externalId: `${namespace}/${name}`,
message: "Job created successfully.",
});

logger.info(`Job created successfully`, {
jobId,
namespace,
Expand All @@ -62,71 +66,76 @@ const deployManifest = async (
namespace,
error,
});
await api.updateJob({
jobId,
updateJobRequest: {
status: "invalid_job_agent",
message: error.body?.message || error.message,
},

await job.update({
status: "invalid_job_agent" as const,
message: error.body?.message || error.message,
});
}
};

const spinUpNewJobs = async (agentId: string) => {
const jobAgentConfigSchema = z.object({
manifest: z.string(),
});

const spinUpNewJobs = async () => {
try {
const { jobs = [] } = await api.getNextJobs({ agentId });
const jobs = await agent.next();
logger.info(`Found ${jobs.length} job(s) to run.`);

await Promise.allSettled(
jobs.map(async (job) => {
logger.info(`Running job ${job.id}`);
logger.debug(`Job details:`, { job });
try {
const je = await api.getJob({ jobId: job.id });
const manifest = renderManifest(job.jobAgentConfig.manifest, je);
jobs.map(async (job: Job) => {
const jobDetails = await job.get();
logger.info(`Running job ${jobDetails.id}`);
logger.debug(`Job details:`, { job: jobDetails });

const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE;
await api.acknowledgeJob({ jobId: job.id });
await deployManifest(job.id, namespace, manifest);
} catch (error: any) {
logger.error(`Error processing job ${job.id}`, {
error: error.message,
stack: error.stack,
const parseResult = jobAgentConfigSchema.safeParse(
jobDetails.jobAgentConfig,
);
if (!parseResult.success) {
await job.update({
status: "failed",
message:
"Invalid job agent configuration: " + parseResult.error.message,
});
throw error;
return;
}

const manifest = renderManifest(parseResult.data.manifest, jobDetails);
const namespace = manifest?.metadata?.namespace ?? env.KUBE_NAMESPACE;

await job.acknowledge();
await deployManifest(job, jobDetails.id, namespace, manifest);
}),
);
} catch (error: any) {
logger.error("Error spinning up new jobs", {
agentId,
error: error.message,
});
throw error;
}
};

const updateExecutionStatus = async (agentId: string) => {
const updateExecutionStatus = async () => {
try {
const jobs = await api.getAgentRunningJob({ agentId });
logger.info(`Found ${jobs.length} running execution(s)`);
const jobs = await agent.running();
logger.info(`Found ${jobs.length} running job(s)`);
await Promise.allSettled(
jobs.map(async (job) => {
const [namespace, name] = job.externalId?.split("/") ?? [];
jobs.map(async (job: Job) => {
const jobDetails = await job.get();
const [namespace, name] = jobDetails.externalId?.split("/") ?? [];
if (namespace == null || name == null) {
logger.error("Invalid external run ID", {
jobId: job.id,
externalId: job.externalId,
jobId: jobDetails.id,
externalId: jobDetails.externalId,
});
return;
}

logger.debug(`Checking status of ${namespace}/${name}`);
try {
const { status, message } = await getJobStatus(namespace, name);
await api.updateJob({
jobId: job.id,
updateJobRequest: { status, message },
});
await job.update({ status, message });
logger.info(`Updated status for ${namespace}/${name}`, {
status,
message,
Expand All @@ -140,25 +149,18 @@ const updateExecutionStatus = async (agentId: string) => {
);
} catch (error: any) {
logger.error("Error updating execution statuses", {
agentId,
error: error.message,
});
}
};

const scan = async () => {
try {
const { id } = await api.updateJobAgent({
updateJobAgentRequest: {
name: env.CTRLPLANE_AGENT_NAME,
workspaceId: env.CTRLPLANE_WORKSPACE_ID,
type: "kubernetes-job",
},
});
const { id } = await agent.get();

logger.info(`Agent ID: ${id}`);
await spinUpNewJobs(id);
await updateExecutionStatus(id);
await spinUpNewJobs();
await updateExecutionStatus();
} catch (error: any) {
logger.error("Error during scan operation", { error: error.message });
throw error;
Expand Down
8 changes: 3 additions & 5 deletions integrations/kubernetes-job-agent/src/sdk.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { Configuration, DefaultApi } from "@ctrlplane/node-sdk";
import { createClient } from "@ctrlplane/node-sdk";

import { env } from "./config.js";

const config = new Configuration({
basePath: `${env.CTRLPLANE_API_URL}/api`,
export const api = createClient({
baseUrl: env.CTRLPLANE_API_URL,
apiKey: env.CTRLPLANE_API_KEY,
});

export const api = new DefaultApi(config);
4 changes: 2 additions & 2 deletions integrations/kubernetes-job-agent/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { SetTargetProvidersTargetsRequest } from "@ctrlplane/node-sdk";
import type { Operations } from "@ctrlplane/node-sdk";

export function omitNullUndefined(obj: object) {
return Object.entries(obj).reduce<Record<string, string>>(
Expand All @@ -11,5 +11,5 @@ export function omitNullUndefined(obj: object) {
}

export type ScannerFunc = () => Promise<
SetTargetProvidersTargetsRequest["targets"]
Operations["setTargetProvidersTargets"]["requestBody"]["content"]["application/json"]["targets"]
>;
Loading
Loading