diff --git a/apps/webservice/src/app/[workspaceSlug]/(app)/(deploy)/(raw)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/release-cell/DeploymentVersionEnvironmentCell.tsx b/apps/webservice/src/app/[workspaceSlug]/(app)/(deploy)/(raw)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/release-cell/DeploymentVersionEnvironmentCell.tsx index 292c2fdf7..a3ea76c4d 100644 --- a/apps/webservice/src/app/[workspaceSlug]/(app)/(deploy)/(raw)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/release-cell/DeploymentVersionEnvironmentCell.tsx +++ b/apps/webservice/src/app/[workspaceSlug]/(app)/(deploy)/(raw)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/release-cell/DeploymentVersionEnvironmentCell.tsx @@ -4,7 +4,7 @@ import type * as SCHEMA from "@ctrlplane/db/schema"; import React from "react"; import Link from "next/link"; import { useParams } from "next/navigation"; -import { IconBoltOff, IconCubeOff } from "@tabler/icons-react"; +import { IconBoltOff, IconClock, IconCubeOff } from "@tabler/icons-react"; import { useInView } from "react-intersection-observer"; import { Skeleton } from "@ctrlplane/ui/skeleton"; @@ -47,6 +47,41 @@ const NoReleaseTargetsCell: React.FC<{ ); +const BlockedByActiveJobsCell: React.FC<{ + deploymentVersion: { id: string; tag: string }; + deployment: { id: string; name: string; slug: string }; + system: { slug: string }; +}> = ({ deploymentVersion, deployment, system }) => { + const { workspaceSlug } = useParams<{ workspaceSlug: string }>(); + + const deploymentUrl = urls + .workspace(workspaceSlug) + .system(system.slug) + .deployment(deployment.slug) + .releases(); + + return ( +
+ +
+ +
+
+
+ {deploymentVersion.tag} +
+
+ Waiting on another release +
+
+ +
+ ); +}; + const NoJobAgentCell: React.FC<{ tag: string; system: { slug: string }; @@ -95,6 +130,14 @@ const DeploymentVersionEnvironmentCell: React.FC< deploymentId: deployment.id, }); + const { + data: targetsWithActiveJobs, + isLoading: isTargetsWithActiveJobsLoading, + } = api.releaseTarget.activeJobs.useQuery({ + environmentId: environment.id, + deploymentId: deployment.id, + }); + const { data: policyEvaluations, isLoading: isPolicyEvaluationsLoading } = api.policy.evaluate.useQuery({ environmentId: environment.id, @@ -108,7 +151,10 @@ const DeploymentVersionEnvironmentCell: React.FC< }); const isLoading = - isReleaseTargetsLoading || isPolicyEvaluationsLoading || isJobsLoading; + isReleaseTargetsLoading || + isPolicyEvaluationsLoading || + isJobsLoading || + isTargetsWithActiveJobsLoading; if (isLoading) return ; const hasJobs = jobs != null && jobs.length > 0; @@ -149,6 +195,12 @@ const DeploymentVersionEnvironmentCell: React.FC< /> ); + const allActiveJobs = (targetsWithActiveJobs ?? []).flatMap((t) => t.jobs); + const isWaitingOnActiveJobs = allActiveJobs.some( + ({ versionId }) => versionId !== deploymentVersion.id, + ); + if (isWaitingOnActiveJobs) return ; + const hasNoJobAgent = deployment.jobAgentId == null; if (hasNoJobAgent) return ; diff --git a/packages/api/src/router/release-target.ts b/packages/api/src/router/release-target.ts index 9d4250a80..efee8a088 100644 --- a/packages/api/src/router/release-target.ts +++ b/packages/api/src/router/release-target.ts @@ -1,9 +1,11 @@ +import _ from "lodash"; import { isPresent } from "ts-is-present"; import { z } from "zod"; -import { and, eq } from "@ctrlplane/db"; +import { and, eq, notInArray } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; import { Permission } from "@ctrlplane/validators/auth"; +import { exitedStatus } from "@ctrlplane/validators/jobs"; import { createTRPCRouter, protectedProcedure } from "../trpc"; @@ -77,4 +79,98 @@ export const releaseTargetRouter = createTRPCRouter({ limit: 500, }); }), + + activeJobs: protectedProcedure + .input( + z + .object({ + resourceId: z.string().uuid().optional(), + environmentId: z.string().uuid().optional(), + deploymentId: z.string().uuid().optional(), + }) + .refine( + (data) => + data.resourceId != null || + data.environmentId != null || + data.deploymentId != null, + ), + ) + .meta({ + authorizationCheck: async ({ canUser, input }) => { + const resourceResult = + input.resourceId != null + ? await canUser.perform(Permission.ResourceGet).on({ + type: "resource", + id: input.resourceId, + }) + : true; + + const environmentResult = + input.environmentId != null + ? await canUser.perform(Permission.EnvironmentGet).on({ + type: "environment", + id: input.environmentId, + }) + : true; + + const deploymentResult = + input.deploymentId != null + ? await canUser.perform(Permission.DeploymentGet).on({ + type: "deployment", + id: input.deploymentId, + }) + : true; + + return resourceResult && environmentResult && deploymentResult; + }, + }) + .query(async ({ ctx, input }) => { + const { resourceId, environmentId, deploymentId } = input; + + const activeJobs = await ctx.db + .select() + .from(schema.job) + .innerJoin( + schema.releaseJob, + eq(schema.releaseJob.jobId, schema.job.id), + ) + .innerJoin( + schema.release, + eq(schema.releaseJob.releaseId, schema.release.id), + ) + .innerJoin( + schema.versionRelease, + eq(schema.release.versionReleaseId, schema.versionRelease.id), + ) + .innerJoin( + schema.releaseTarget, + eq(schema.versionRelease.releaseTargetId, schema.releaseTarget.id), + ) + .where( + and( + notInArray(schema.job.status, exitedStatus), + resourceId != null + ? eq(schema.releaseTarget.resourceId, resourceId) + : undefined, + environmentId != null + ? eq(schema.releaseTarget.environmentId, environmentId) + : undefined, + deploymentId != null + ? eq(schema.releaseTarget.deploymentId, deploymentId) + : undefined, + ), + ); + + return _.chain(activeJobs) + .groupBy((job) => job.release_target.id) + .map((jobsByTarget) => { + const releaseTarget = jobsByTarget[0]!.release_target; + const jobs = jobsByTarget.map((j) => ({ + ...j.job, + versionId: j.version_release.versionId, + })); + return { ...releaseTarget, jobs }; + }) + .value(); + }), }); diff --git a/packages/job-dispatch/src/job-creation.ts b/packages/job-dispatch/src/job-creation.ts index cff585bdb..2b4b370b6 100644 --- a/packages/job-dispatch/src/job-creation.ts +++ b/packages/job-dispatch/src/job-creation.ts @@ -1,16 +1,11 @@ import type { Tx } from "@ctrlplane/db"; import _ from "lodash"; -import { and, eq, isNotNull, ne, or, takeFirst } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; +import { eq, takeFirst } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; import { logger } from "@ctrlplane/logger"; import { JobStatus } from "@ctrlplane/validators/jobs"; -import { dispatchReleaseJobTriggers } from "./job-dispatch.js"; -import { isPassingAllPolicies } from "./policy-checker.js"; -import { cancelOldReleaseJobTriggersOnJobDispatch } from "./release-sequencing.js"; - export const createTriggeredRunbookJob = async ( db: Tx, runbook: schema.Runbook, @@ -57,123 +52,3 @@ export const createTriggeredRunbookJob = async ( return job; }; - -/** - * When a job completes, there may be other jobs that should now be triggered - * because the completion of this job means that some policies are now passing. - * - * criteria requirement - "need n from QA to pass before deploying to staging" - * wait requirement - "in the same environment, need to wait for previous release to be deployed first" - * concurrency requirement - "only n releases in staging at a time" - * version dependency - "need to wait for deployment X version Y to be deployed first" - * - * - * This function looks at the job's release and deployment and finds all the - * other release that should be triggered and dispatches them. - * - * @param je - */ -export const onJobCompletion = async (je: schema.Job) => { - const triggers = await db - .select() - .from(schema.releaseJobTrigger) - .innerJoin( - schema.deploymentVersion, - eq(schema.releaseJobTrigger.versionId, schema.deploymentVersion.id), - ) - .innerJoin( - schema.deployment, - eq(schema.deploymentVersion.deploymentId, schema.deployment.id), - ) - .innerJoin( - schema.environment, - eq(schema.releaseJobTrigger.environmentId, schema.environment.id), - ) - .where(eq(schema.releaseJobTrigger.jobId, je.id)) - .then(takeFirst); - - const isDependentOnTriggerForCriteria = and( - eq(schema.releaseJobTrigger.versionId, triggers.deployment_version.id), - eq( - schema.environmentPolicyDeployment.environmentId, - triggers.release_job_trigger.environmentId, - ), - ); - - const isWaitingOnConcurrencyRequirementInSameRelease = and( - isNotNull(schema.environmentPolicy.concurrencyLimit), - eq(schema.environmentPolicy.id, triggers.environment.policyId), - eq( - schema.deploymentVersion.deploymentId, - triggers.deployment_version.deploymentId, - ), - eq(schema.job.status, JobStatus.Pending), - ); - - const isDependentOnVersionOfTriggerDeployment = isNotNull( - schema.versionDependency.id, - ); - - const isWaitingOnJobToFinish = and( - eq(schema.environment.id, triggers.release_job_trigger.environmentId), - eq(schema.deployment.id, triggers.deployment.id), - ne(schema.deploymentVersion.id, triggers.deployment_version.id), - ); - - const affectedReleaseJobTriggers = await db - .select() - .from(schema.releaseJobTrigger) - .innerJoin( - schema.deploymentVersion, - eq(schema.releaseJobTrigger.versionId, schema.deploymentVersion.id), - ) - .innerJoin( - schema.deployment, - eq(schema.deploymentVersion.deploymentId, schema.deployment.id), - ) - .innerJoin(schema.job, eq(schema.releaseJobTrigger.jobId, schema.job.id)) - .innerJoin( - schema.environment, - eq(schema.releaseJobTrigger.environmentId, schema.environment.id), - ) - .leftJoin( - schema.environmentPolicy, - eq(schema.environment.policyId, schema.environmentPolicy.id), - ) - .leftJoin( - schema.environmentPolicyDeployment, - eq( - schema.environmentPolicyDeployment.policyId, - schema.environmentPolicy.id, - ), - ) - .leftJoin( - schema.versionDependency, - and( - eq( - schema.versionDependency.versionId, - schema.releaseJobTrigger.versionId, - ), - eq(schema.versionDependency.deploymentId, triggers.deployment.id), - ), - ) - .where( - and( - eq(schema.job.status, JobStatus.Pending), - or( - isDependentOnTriggerForCriteria, - isWaitingOnJobToFinish, - isWaitingOnConcurrencyRequirementInSameRelease, - isDependentOnVersionOfTriggerDeployment, - ), - ), - ); - - await dispatchReleaseJobTriggers(db) - .releaseTriggers( - affectedReleaseJobTriggers.map((t) => t.release_job_trigger), - ) - .filter(isPassingAllPolicies) - .then(cancelOldReleaseJobTriggersOnJobDispatch) - .dispatch(); -}; diff --git a/packages/job-dispatch/src/job-failure.ts b/packages/job-dispatch/src/job-failure.ts deleted file mode 100644 index ff316db06..000000000 --- a/packages/job-dispatch/src/job-failure.ts +++ /dev/null @@ -1,71 +0,0 @@ -import { and, count, eq, ne, takeFirst, takeFirstOrNull } from "@ctrlplane/db"; -import { db } from "@ctrlplane/db/client"; -import * as schema from "@ctrlplane/db/schema"; -import { logger } from "@ctrlplane/logger"; - -import { dispatchReleaseJobTriggers } from "./job-dispatch.js"; -import { createReleaseJobTriggers } from "./release-job-trigger.js"; -import { cancelOldReleaseJobTriggersOnJobDispatch } from "./release-sequencing.js"; - -export const onJobFailure = async (job: schema.Job) => { - const jobInfo = await db - .select() - .from(schema.releaseJobTrigger) - .innerJoin( - schema.deploymentVersion, - eq(schema.releaseJobTrigger.versionId, schema.deploymentVersion.id), - ) - .innerJoin( - schema.deployment, - eq(schema.deploymentVersion.deploymentId, schema.deployment.id), - ) - .where(eq(schema.releaseJobTrigger.jobId, job.id)) - .then(takeFirstOrNull); - - if (jobInfo == null) return; - - const releaseJobTriggers = await db - .select({ count: count() }) - .from(schema.releaseJobTrigger) - .where( - and( - eq(schema.releaseJobTrigger.versionId, jobInfo.deployment_version.id), - eq( - schema.releaseJobTrigger.environmentId, - jobInfo.release_job_trigger.environmentId, - ), - eq( - schema.releaseJobTrigger.resourceId, - jobInfo.release_job_trigger.resourceId, - ), - ne(schema.releaseJobTrigger.id, jobInfo.release_job_trigger.id), - ), - ) - .then(takeFirst); - - const { count: releaseJobTriggerCount } = releaseJobTriggers; - - if (releaseJobTriggerCount >= jobInfo.deployment.retryCount) return; - - const createTrigger = createReleaseJobTriggers(db, "retry") - .versions([jobInfo.deployment_version.id]) - .resources([jobInfo.release_job_trigger.resourceId]) - .environments([jobInfo.release_job_trigger.environmentId]); - - const trigger = - jobInfo.release_job_trigger.causedById != null - ? await createTrigger - .causedById(jobInfo.release_job_trigger.causedById) - .insert() - : await createTrigger.insert(); - - await dispatchReleaseJobTriggers(db) - .releaseTriggers(trigger) - .then(cancelOldReleaseJobTriggersOnJobDispatch) - .dispatch() - .then(() => - logger.info( - `Retry job for deployment version ${jobInfo.deployment_version.id} and resource ${jobInfo.release_job_trigger.resourceId} created and dispatched.`, - ), - ); -}; diff --git a/packages/job-dispatch/src/job-update.ts b/packages/job-dispatch/src/job-update.ts index 9251a54f9..bb9b8d472 100644 --- a/packages/job-dispatch/src/job-update.ts +++ b/packages/job-dispatch/src/job-update.ts @@ -1,15 +1,13 @@ import type { Tx } from "@ctrlplane/db"; -import { eq, sql, takeFirst } from "@ctrlplane/db"; +import { eq, sql, takeFirst, takeFirstOrNull } from "@ctrlplane/db"; import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; +import { Channel, getQueue } from "@ctrlplane/events"; import { logger } from "@ctrlplane/logger"; import { ReservedMetadataKey } from "@ctrlplane/validators/conditions"; import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; -import { onJobCompletion } from "./job-creation.js"; -import { onJobFailure } from "./job-failure.js"; - const log = logger.child({ module: "job-update" }); const updateJobMetadata = async ( @@ -83,6 +81,38 @@ const getCompletedAt = ( return null; }; +const getIsJobJustCompleted = ( + previousStatus: JobStatus, + newStatus: JobStatus, +) => { + const isPreviousStatusExited = exitedStatus.includes(previousStatus); + const isNewStatusExited = exitedStatus.includes(newStatus); + return !isPreviousStatusExited && isNewStatusExited; +}; + +const getReleaseTarget = (db: Tx, jobId: string) => + db + .select({ + environmentId: schema.releaseTarget.environmentId, + resourceId: schema.releaseTarget.resourceId, + deploymentId: schema.releaseTarget.deploymentId, + }) + .from(schema.releaseJob) + .innerJoin( + schema.release, + eq(schema.releaseJob.releaseId, schema.release.id), + ) + .innerJoin( + schema.versionRelease, + eq(schema.release.versionReleaseId, schema.versionRelease.id), + ) + .innerJoin( + schema.releaseTarget, + eq(schema.versionRelease.releaseTargetId, schema.releaseTarget.id), + ) + .where(eq(schema.releaseJob.jobId, jobId)) + .then(takeFirstOrNull); + export const updateJob = async ( db: Tx, jobId: string, @@ -112,15 +142,18 @@ export const updateJob = async ( if (metadata != null) await updateJobMetadata(jobId, jobBeforeUpdate.metadata, metadata); - const isJobFailure = - data.status === JobStatus.Failure && - jobBeforeUpdate.status !== JobStatus.Failure; - if (isJobFailure) await onJobFailure(updatedJob); + const isJobJustCompleted = getIsJobJustCompleted( + jobBeforeUpdate.status as JobStatus, + updatedJob.status as JobStatus, + ); + if (!isJobJustCompleted) return updatedJob; - const isJobCompletion = - data.status === JobStatus.Successful && - jobBeforeUpdate.status !== JobStatus.Successful; - if (isJobCompletion) await onJobCompletion(updatedJob); + const releaseTarget = await getReleaseTarget(db, jobId); + if (releaseTarget == null) return updatedJob; + await getQueue(Channel.EvaluateReleaseTarget).add( + `${releaseTarget.resourceId}-${releaseTarget.environmentId}-${releaseTarget.deploymentId}`, + releaseTarget, + ); return updatedJob; }; diff --git a/packages/rule-engine/src/manager/version-manager-rules.ts b/packages/rule-engine/src/manager/version-manager-rules.ts index 3d45f0373..8a361f3a9 100644 --- a/packages/rule-engine/src/manager/version-manager-rules.ts +++ b/packages/rule-engine/src/manager/version-manager-rules.ts @@ -1,6 +1,7 @@ import type { FilterRule, Policy, PreValidationRule } from "../types"; import type { Version } from "./version-rule-engine"; import { DeploymentDenyRule } from "../rules/deployment-deny-rule.js"; +import { ReleaseTargetConcurrencyRule } from "../rules/release-target-concurrency-rule.js"; import { getAnyApprovalRecords, getRoleApprovalRecords, @@ -68,8 +69,12 @@ export const getVersionApprovalRules = ( export const getRules = ( policy: Policy | null, + releaseTargetId: string, ): Array | PreValidationRule> => { - return getVersionApprovalRules(policy); + return [ + new ReleaseTargetConcurrencyRule(releaseTargetId), + ...getVersionApprovalRules(policy), + ]; // The rrule package is being stupid and deny windows is not top priority // right now so I am commenting this out // https://github.com/jkbrzt/rrule/issues/478 diff --git a/packages/rule-engine/src/manager/version-manager.ts b/packages/rule-engine/src/manager/version-manager.ts index c1b3e1141..a63009663 100644 --- a/packages/rule-engine/src/manager/version-manager.ts +++ b/packages/rule-engine/src/manager/version-manager.ts @@ -165,7 +165,7 @@ export class VersionReleaseManager implements ReleaseManager { async evaluate(options?: VersionEvaluateOptions) { const policy = options?.policy ?? (await this.getPolicy()); - const rules = (options?.rules ?? getRules)(policy); + const rules = (options?.rules ?? getRules)(policy, this.releaseTarget.id); const engine = new VersionRuleEngine(rules); const versions = diff --git a/packages/rule-engine/src/manager/version-rule-engine.ts b/packages/rule-engine/src/manager/version-rule-engine.ts index ccf9de1d7..03b880f2b 100644 --- a/packages/rule-engine/src/manager/version-rule-engine.ts +++ b/packages/rule-engine/src/manager/version-rule-engine.ts @@ -62,7 +62,7 @@ export class VersionRuleEngine implements RuleEngine { async evaluate(candidates: Version[]): Promise> { const preValidationRules = this.rules.filter(isPreValidationRule); for (const rule of preValidationRules) { - const result = rule.passing(); + const result = await rule.passing(); if (!result.passing) { return { diff --git a/packages/rule-engine/src/rules/release-target-concurrency-rule.ts b/packages/rule-engine/src/rules/release-target-concurrency-rule.ts new file mode 100644 index 000000000..089e4b508 --- /dev/null +++ b/packages/rule-engine/src/rules/release-target-concurrency-rule.ts @@ -0,0 +1,42 @@ +import { and, eq, notInArray, takeFirstOrNull } from "@ctrlplane/db"; +import { db } from "@ctrlplane/db/client"; +import * as schema from "@ctrlplane/db/schema"; +import { exitedStatus } from "@ctrlplane/validators/jobs"; + +import type { PreValidationRule } from "../types"; + +export class ReleaseTargetConcurrencyRule implements PreValidationRule { + public readonly name = "ReleaseTargetConcurrencyRule"; + + constructor(private readonly releaseTargetId: string) {} + + async passing() { + const activeJob = await db + .select() + .from(schema.job) + .innerJoin(schema.releaseJob, eq(schema.releaseJob.jobId, schema.job.id)) + .innerJoin( + schema.release, + eq(schema.releaseJob.releaseId, schema.release.id), + ) + .innerJoin( + schema.versionRelease, + eq(schema.release.versionReleaseId, schema.versionRelease.id), + ) + .where( + and( + eq(schema.versionRelease.releaseTargetId, this.releaseTargetId), + notInArray(schema.job.status, exitedStatus), + ), + ) + .limit(1) + .then(takeFirstOrNull); + + if (activeJob == null) return { passing: true }; + + return { + passing: false, + rejectionReason: `Release target ${this.releaseTargetId} has an active job`, + }; + } +} diff --git a/packages/rule-engine/src/types.ts b/packages/rule-engine/src/types.ts index b834f190f..03cbb7060 100644 --- a/packages/rule-engine/src/types.ts +++ b/packages/rule-engine/src/types.ts @@ -42,7 +42,7 @@ export type PreValidationResult = { export interface PreValidationRule { name: string; - passing(): PreValidationResult; + passing(): PreValidationResult | Promise; } /**