diff --git a/apps/webservice/src/app/[workspaceSlug]/(appv2)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/DeploymentCTA.tsx b/apps/webservice/src/app/[workspaceSlug]/(appv2)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/DeploymentCTA.tsx index ea177c0b3..272b9b083 100644 --- a/apps/webservice/src/app/[workspaceSlug]/(appv2)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/DeploymentCTA.tsx +++ b/apps/webservice/src/app/[workspaceSlug]/(appv2)/systems/[systemSlug]/(raw)/deployments/[deploymentSlug]/(sidebar)/_components/DeploymentCTA.tsx @@ -25,15 +25,6 @@ export const DeploymentCTA: React.FC<{ ); - if (tab === "releases") - return ( - - - - ); - if (tab === "channels") return ( @@ -42,5 +33,12 @@ export const DeploymentCTA: React.FC<{ ); - return null; + + return ( + + + + ); }; diff --git a/packages/job-dispatch/src/job-creation.ts b/packages/job-dispatch/src/job-creation.ts index a6c0c1c64..1d6d96c74 100644 --- a/packages/job-dispatch/src/job-creation.ts +++ b/packages/job-dispatch/src/job-creation.ts @@ -85,6 +85,10 @@ export const onJobCompletion = async (je: schema.Job) => { schema.deployment, eq(schema.release.deploymentId, schema.deployment.id), ) + .innerJoin( + schema.environment, + eq(schema.releaseJobTrigger.environmentId, schema.environment.id), + ) .where(eq(schema.releaseJobTrigger.jobId, je.id)) .then(takeFirst); @@ -98,8 +102,8 @@ export const onJobCompletion = async (je: schema.Job) => { const isWaitingOnConcurrencyRequirementInSameRelease = and( isNotNull(schema.environmentPolicy.concurrencyLimit), - eq(schema.environment.id, triggers.release_job_trigger.environmentId), - eq(schema.releaseJobTrigger.releaseId, triggers.release.id), + eq(schema.environmentPolicy.id, triggers.environment.policyId), + eq(schema.release.deploymentId, triggers.release.deploymentId), eq(schema.job.status, JobStatus.Pending), ); diff --git a/packages/job-dispatch/src/policies/concurrency-policy.ts b/packages/job-dispatch/src/policies/concurrency-policy.ts index 80db98db0..2a1a826de 100644 --- a/packages/job-dispatch/src/policies/concurrency-policy.ts +++ b/packages/job-dispatch/src/policies/concurrency-policy.ts @@ -1,6 +1,6 @@ import _ from "lodash"; -import { and, eq, inArray, isNull, ne, notInArray, sql } from "@ctrlplane/db"; +import { and, count, eq, inArray, isNull, ne, notInArray } from "@ctrlplane/db"; import * as schema from "@ctrlplane/db/schema"; import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs"; @@ -20,17 +20,47 @@ export const isPassingConcurrencyPolicy: ReleaseIdPolicyChecker = async ( ) => { if (releaseJobTriggers.length === 0) return []; - const isActiveJob = and( - notInArray(schema.job.status, exitedStatus), - ne(schema.job.status, JobStatus.Pending), - isNull(schema.resource.deletedAt), - ); + const triggersGroupedByDeploymentAndPolicy = await db + .select() + .from(schema.releaseJobTrigger) + .innerJoin( + schema.release, + eq(schema.releaseJobTrigger.releaseId, schema.release.id), + ) + .innerJoin( + schema.environment, + eq(schema.releaseJobTrigger.environmentId, schema.environment.id), + ) + .innerJoin( + schema.environmentPolicy, + eq(schema.environment.policyId, schema.environmentPolicy.id), + ) + .where( + inArray( + schema.releaseJobTrigger.id, + releaseJobTriggers.map((t) => t.id), + ), + ) + .then((rows) => + _.chain(rows) + .groupBy((r) => [r.release.deploymentId, r.environment.policyId]) + .map((groupedTriggers) => ({ + deploymentId: groupedTriggers[0]!.release.deploymentId, + policyId: groupedTriggers[0]!.environment.policyId, + concurrencyLimit: + groupedTriggers[0]!.environment_policy.concurrencyLimit, + triggers: groupedTriggers + .map((t) => t.release_job_trigger) + .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()), + })) + .value(), + ); - const activeJobSubquery = db - .selectDistinct({ - count: sql`count(*)`.as("count"), - releaseId: schema.releaseJobTrigger.releaseId, - environmentId: schema.releaseJobTrigger.environmentId, + const activeJobsPerDeploymentAndPolicy = await db + .select({ + count: count(), + deploymentId: schema.release.deploymentId, + policyId: schema.environment.policyId, }) .from(schema.job) .innerJoin( @@ -41,63 +71,49 @@ export const isPassingConcurrencyPolicy: ReleaseIdPolicyChecker = async ( schema.resource, eq(schema.releaseJobTrigger.resourceId, schema.resource.id), ) - .where(isActiveJob) - .groupBy( - schema.releaseJobTrigger.releaseId, - schema.releaseJobTrigger.environmentId, - ) - .as("active_job_subquery"); - - return db - .select() - .from(schema.releaseJobTrigger) - .leftJoin( - activeJobSubquery, - and( - eq(schema.releaseJobTrigger.releaseId, activeJobSubquery.releaseId), - eq( - schema.releaseJobTrigger.environmentId, - activeJobSubquery.environmentId, - ), - ), + .innerJoin( + schema.release, + eq(schema.releaseJobTrigger.releaseId, schema.release.id), ) .innerJoin( schema.environment, eq(schema.releaseJobTrigger.environmentId, schema.environment.id), ) - .leftJoin( - schema.environmentPolicy, - eq(schema.environment.policyId, schema.environmentPolicy.id), - ) .where( - inArray( - schema.releaseJobTrigger.id, - releaseJobTriggers.map((t) => t.id), + and( + notInArray(schema.job.status, exitedStatus), + ne(schema.job.status, JobStatus.Pending), + isNull(schema.resource.deletedAt), + inArray( + schema.release.deploymentId, + triggersGroupedByDeploymentAndPolicy + .filter((t) => t.concurrencyLimit != null) + .map((t) => t.deploymentId), + ), + inArray( + schema.environment.policyId, + triggersGroupedByDeploymentAndPolicy + .filter((t) => t.concurrencyLimit != null) + .map((t) => t.policyId), + ), ), ) - .then((data) => - _.chain(data) - .groupBy((j) => [ - j.release_job_trigger.releaseId, - j.release_job_trigger.environmentId, - ]) - .map((jcs) => - // Check if the policy has a concurrency limit - jcs[0]!.environment_policy?.concurrencyLimit != null - ? // If so, limit the number of release job triggers based on the concurrency limit - jcs.slice( - 0, - Math.max( - 0, - jcs[0]!.environment_policy.concurrencyLimit - - (jcs[0]!.active_job_subquery?.count ?? 0), - ), - ) - : // If not, return all release job triggers in the group - jcs, - ) - .flatten() - .map((jc) => jc.release_job_trigger) - .value(), - ); + .groupBy(schema.release.deploymentId, schema.environment.policyId); + + return triggersGroupedByDeploymentAndPolicy + .map((info) => { + const { concurrencyLimit, deploymentId, policyId, triggers } = info; + if (concurrencyLimit == null) return triggers; + + const activeJobs = activeJobsPerDeploymentAndPolicy.find( + (j) => j.deploymentId === deploymentId && j.policyId === policyId, + ); + + const count = activeJobs?.count ?? 0; + + const allowedJobs = Math.max(0, concurrencyLimit - count); + + return triggers.slice(0, allowedJobs); + }) + .flat(); }; diff --git a/packages/job-dispatch/src/policies/release-sequencing.ts b/packages/job-dispatch/src/policies/release-sequencing.ts index f83858a5f..fadfb80d1 100644 --- a/packages/job-dispatch/src/policies/release-sequencing.ts +++ b/packages/job-dispatch/src/policies/release-sequencing.ts @@ -99,12 +99,19 @@ const latestActiveReleaseSubQuery = (db: Tx) => eq(schema.releaseJobTrigger.releaseId, schema.release.id), ) .innerJoin(schema.job, eq(schema.releaseJobTrigger.jobId, schema.job.id)) + .innerJoin( + schema.resource, + eq(schema.releaseJobTrigger.resourceId, schema.resource.id), + ) .where( - notInArray(schema.job.status, [ - JobStatus.Pending, - JobStatus.Skipped, - JobStatus.Cancelled, - ]), + and( + notInArray(schema.job.status, [ + JobStatus.Pending, + JobStatus.Skipped, + JobStatus.Cancelled, + ]), + isNull(schema.resource.deletedAt), + ), ) .as("active_releases");