Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,6 @@ export const DeploymentCTA: React.FC<{
</CreateVariableDialog>
);

if (tab === "releases")
return (
<CreateReleaseDialog deploymentId={deploymentId} systemId={systemId}>
<Button variant="outline" className="flex items-center gap-2" size="sm">
New Release
</Button>
</CreateReleaseDialog>
);

if (tab === "channels")
return (
<CreateReleaseChannelDialog deploymentId={deploymentId}>
Expand All @@ -42,5 +33,12 @@ export const DeploymentCTA: React.FC<{
</Button>
</CreateReleaseChannelDialog>
);
return null;

return (
<CreateReleaseDialog deploymentId={deploymentId} systemId={systemId}>
<Button variant="outline" className="flex items-center gap-2" size="sm">
New Release
</Button>
</CreateReleaseDialog>
);
};
8 changes: 6 additions & 2 deletions packages/job-dispatch/src/job-creation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ export const onJobCompletion = async (je: schema.Job) => {
schema.deployment,
eq(schema.release.deploymentId, schema.deployment.id),
)
.innerJoin(
schema.environment,
eq(schema.releaseJobTrigger.environmentId, schema.environment.id),
)
.where(eq(schema.releaseJobTrigger.jobId, je.id))
.then(takeFirst);

Expand All @@ -98,8 +102,8 @@ export const onJobCompletion = async (je: schema.Job) => {

const isWaitingOnConcurrencyRequirementInSameRelease = and(
isNotNull(schema.environmentPolicy.concurrencyLimit),
eq(schema.environment.id, triggers.release_job_trigger.environmentId),
eq(schema.releaseJobTrigger.releaseId, triggers.release.id),
eq(schema.environmentPolicy.id, triggers.environment.policyId),
eq(schema.release.deploymentId, triggers.release.deploymentId),
eq(schema.job.status, JobStatus.Pending),
);

Expand Down
140 changes: 78 additions & 62 deletions packages/job-dispatch/src/policies/concurrency-policy.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import _ from "lodash";

import { and, eq, inArray, isNull, ne, notInArray, sql } from "@ctrlplane/db";
import { and, count, eq, inArray, isNull, ne, notInArray } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { exitedStatus, JobStatus } from "@ctrlplane/validators/jobs";

Expand All @@ -20,17 +20,47 @@ export const isPassingConcurrencyPolicy: ReleaseIdPolicyChecker = async (
) => {
if (releaseJobTriggers.length === 0) return [];

const isActiveJob = and(
notInArray(schema.job.status, exitedStatus),
ne(schema.job.status, JobStatus.Pending),
isNull(schema.resource.deletedAt),
);
const triggersGroupedByDeploymentAndPolicy = await db
.select()
.from(schema.releaseJobTrigger)
.innerJoin(
schema.release,
eq(schema.releaseJobTrigger.releaseId, schema.release.id),
)
.innerJoin(
schema.environment,
eq(schema.releaseJobTrigger.environmentId, schema.environment.id),
)
.innerJoin(
schema.environmentPolicy,
eq(schema.environment.policyId, schema.environmentPolicy.id),
)
.where(
inArray(
schema.releaseJobTrigger.id,
releaseJobTriggers.map((t) => t.id),
),
)
.then((rows) =>
_.chain(rows)
.groupBy((r) => [r.release.deploymentId, r.environment.policyId])
.map((groupedTriggers) => ({
deploymentId: groupedTriggers[0]!.release.deploymentId,
policyId: groupedTriggers[0]!.environment.policyId,
concurrencyLimit:
groupedTriggers[0]!.environment_policy.concurrencyLimit,
triggers: groupedTriggers
.map((t) => t.release_job_trigger)
.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()),
}))
.value(),
);

const activeJobSubquery = db
.selectDistinct({
count: sql<number>`count(*)`.as("count"),
releaseId: schema.releaseJobTrigger.releaseId,
environmentId: schema.releaseJobTrigger.environmentId,
const activeJobsPerDeploymentAndPolicy = await db
.select({
count: count(),
deploymentId: schema.release.deploymentId,
policyId: schema.environment.policyId,
})
.from(schema.job)
.innerJoin(
Expand All @@ -41,63 +71,49 @@ export const isPassingConcurrencyPolicy: ReleaseIdPolicyChecker = async (
schema.resource,
eq(schema.releaseJobTrigger.resourceId, schema.resource.id),
)
.where(isActiveJob)
.groupBy(
schema.releaseJobTrigger.releaseId,
schema.releaseJobTrigger.environmentId,
)
.as("active_job_subquery");

return db
.select()
.from(schema.releaseJobTrigger)
.leftJoin(
activeJobSubquery,
and(
eq(schema.releaseJobTrigger.releaseId, activeJobSubquery.releaseId),
eq(
schema.releaseJobTrigger.environmentId,
activeJobSubquery.environmentId,
),
),
.innerJoin(
schema.release,
eq(schema.releaseJobTrigger.releaseId, schema.release.id),
)
.innerJoin(
schema.environment,
eq(schema.releaseJobTrigger.environmentId, schema.environment.id),
)
.leftJoin(
schema.environmentPolicy,
eq(schema.environment.policyId, schema.environmentPolicy.id),
)
.where(
inArray(
schema.releaseJobTrigger.id,
releaseJobTriggers.map((t) => t.id),
and(
notInArray(schema.job.status, exitedStatus),
ne(schema.job.status, JobStatus.Pending),
isNull(schema.resource.deletedAt),
inArray(
schema.release.deploymentId,
triggersGroupedByDeploymentAndPolicy
.filter((t) => t.concurrencyLimit != null)
.map((t) => t.deploymentId),
),
inArray(
schema.environment.policyId,
triggersGroupedByDeploymentAndPolicy
.filter((t) => t.concurrencyLimit != null)
.map((t) => t.policyId),
),
),
)
.then((data) =>
_.chain(data)
.groupBy((j) => [
j.release_job_trigger.releaseId,
j.release_job_trigger.environmentId,
])
.map((jcs) =>
// Check if the policy has a concurrency limit
jcs[0]!.environment_policy?.concurrencyLimit != null
? // If so, limit the number of release job triggers based on the concurrency limit
jcs.slice(
0,
Math.max(
0,
jcs[0]!.environment_policy.concurrencyLimit -
(jcs[0]!.active_job_subquery?.count ?? 0),
),
)
: // If not, return all release job triggers in the group
jcs,
)
.flatten()
.map((jc) => jc.release_job_trigger)
.value(),
);
.groupBy(schema.release.deploymentId, schema.environment.policyId);

return triggersGroupedByDeploymentAndPolicy
.map((info) => {
const { concurrencyLimit, deploymentId, policyId, triggers } = info;
if (concurrencyLimit == null) return triggers;

const activeJobs = activeJobsPerDeploymentAndPolicy.find(
(j) => j.deploymentId === deploymentId && j.policyId === policyId,
);

const count = activeJobs?.count ?? 0;

const allowedJobs = Math.max(0, concurrencyLimit - count);

return triggers.slice(0, allowedJobs);
})
.flat();
};
17 changes: 12 additions & 5 deletions packages/job-dispatch/src/policies/release-sequencing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,19 @@ const latestActiveReleaseSubQuery = (db: Tx) =>
eq(schema.releaseJobTrigger.releaseId, schema.release.id),
)
.innerJoin(schema.job, eq(schema.releaseJobTrigger.jobId, schema.job.id))
.innerJoin(
schema.resource,
eq(schema.releaseJobTrigger.resourceId, schema.resource.id),
)
.where(
notInArray(schema.job.status, [
JobStatus.Pending,
JobStatus.Skipped,
JobStatus.Cancelled,
]),
and(
notInArray(schema.job.status, [
JobStatus.Pending,
JobStatus.Skipped,
JobStatus.Cancelled,
]),
isNull(schema.resource.deletedAt),
),
)
.as("active_releases");

Expand Down
Loading