Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions apps/event-worker/src/workers/delete-resource.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import type { Tx } from "@ctrlplane/db";
import _ from "lodash";

import { eq, inArray } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as SCHEMA from "@ctrlplane/db/schema";
import { Channel, createWorker } from "@ctrlplane/events";
import { handleEvent } from "@ctrlplane/job-dispatch";
import { HookAction } from "@ctrlplane/validators/events";

const softDeleteResource = async (tx: Tx, resource: SCHEMA.Resource) =>
tx
.update(SCHEMA.resource)
.set({ deletedAt: new Date() })
.where(eq(SCHEMA.resource.id, resource.id));

const deleteReleaseTargets = async (tx: Tx, resource: SCHEMA.Resource) =>
tx
.delete(SCHEMA.releaseTarget)
.where(eq(SCHEMA.releaseTarget.resourceId, resource.id))
.returning();

const deleteComputedResources = async (tx: Tx, resource: SCHEMA.Resource) =>
Promise.all([
tx
.delete(SCHEMA.computedDeploymentResource)
.where(eq(SCHEMA.computedDeploymentResource.resourceId, resource.id)),
tx
.delete(SCHEMA.computedEnvironmentResource)
.where(eq(SCHEMA.computedEnvironmentResource.resourceId, resource.id)),
]);

const deleteComputedReleaseTargets = async (
tx: Tx,
releaseTargets: SCHEMA.ReleaseTarget[],
) =>
tx.delete(SCHEMA.computedPolicyTargetReleaseTarget).where(
inArray(
SCHEMA.computedPolicyTargetReleaseTarget.releaseTargetId,
releaseTargets.map((rt) => rt.id),
),
);

const dispatchExitHooks = async (
tx: Tx,
resource: SCHEMA.Resource,
deletedReleaseTargets: SCHEMA.ReleaseTarget[],
) => {
const deploymentIds = _.uniq(
deletedReleaseTargets.map((rt) => rt.deploymentId),
);
const deployments = await tx.query.deployment.findMany({
where: inArray(SCHEMA.deployment.id, deploymentIds),
});
const events = deployments.map((deployment) => ({
action: HookAction.DeploymentResourceRemoved,
payload: { deployment, resource },
}));
const handleEventPromises = events.map(handleEvent);
await Promise.allSettled(handleEventPromises);
};

export const deleteResourceWorker = createWorker(
Channel.DeleteResource,
async ({ data: resource }) => {
await db.transaction(async (tx) => {
await softDeleteResource(tx, resource);
await deleteComputedResources(tx, resource);
const rts = await deleteReleaseTargets(tx, resource);
await deleteComputedReleaseTargets(tx, rts);
await dispatchExitHooks(tx, resource, rts);
});
},
);
2 changes: 2 additions & 0 deletions apps/event-worker/src/workers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Worker } from "bullmq";

import { Channel } from "@ctrlplane/events";

import { deleteResourceWorker } from "./delete-resource.js";
import { evaluateReleaseTargetWorker } from "./evaluate-release-target.js";
import { dispatchJobWorker } from "./job-dispatch/index.js";
import { newDeploymentVersionWorker } from "./new-deployment-version.js";
Expand Down Expand Up @@ -34,4 +35,5 @@ export const workers: Workers<keyof ChannelMap> = {
[Channel.UpdatedResource]: updatedResourceWorker,
[Channel.NewResource]: newResourceWorker,
[Channel.NewPolicy]: newPolicyWorker,
[Channel.DeleteResource]: deleteResourceWorker,
};
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { z } from "zod";
import { and, eq, isNull, selector, upsertResources } from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
import { deleteResources } from "@ctrlplane/job-dispatch";
import { logger } from "@ctrlplane/logger";
import { variablesAES256 } from "@ctrlplane/secrets";
import { Permission } from "@ctrlplane/validators/auth";
Expand Down Expand Up @@ -139,6 +138,6 @@ export const DELETE = request()
{ status: 404 },
);

await deleteResources(db, [resource]);
await getQueue(Channel.DeleteResource).add(resource.id, resource);
return NextResponse.json({ success: true });
});
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { NextResponse } from "next/server";
import { and, eq, isNull } from "@ctrlplane/db";
import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { deleteResources } from "@ctrlplane/job-dispatch";
import { Channel, getQueue } from "@ctrlplane/events";
import { Permission } from "@ctrlplane/validators/auth";

import { authn, authz } from "~/app/api/v1/auth";
Expand Down Expand Up @@ -96,7 +96,7 @@ export const DELETE = request()
);
}

await deleteResources(db, [resource]);
await getQueue(Channel.DeleteResource).add(resource.id, resource);

return NextResponse.json({ success: true });
},
Expand Down
18 changes: 10 additions & 8 deletions packages/api/src/router/resources.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import {
takeFirstOrNull,
} from "@ctrlplane/db";
import * as schema from "@ctrlplane/db/schema";
import { Channel, getQueue } from "@ctrlplane/events";
import {
cancelOldReleaseJobTriggersOnJobDispatch,
createJobApprovals,
createReleaseJobTriggers,
deleteResources,
dispatchReleaseJobTriggers,
isPassingAllPoliciesExceptNewerThanLastActive,
isPassingChannelSelectorPolicy,
Expand Down Expand Up @@ -695,13 +695,15 @@ export const resourceRouter = createTRPCRouter({
),
})
.input(z.array(z.string().uuid()))
.mutation(async ({ ctx, input }) =>
ctx.db.query.resource
.findMany({
where: and(inArray(schema.resource.id, input), isNotDeleted),
})
.then((resources) => deleteResources(ctx.db, resources)),
),
.mutation(async ({ ctx, input }) => {
const resources = await ctx.db.query.resource.findMany({
where: and(inArray(schema.resource.id, input), isNotDeleted),
});
await getQueue(Channel.DeleteResource).addBulk(
resources.map((r) => ({ name: r.id, data: r })),
);
return resources;
}),

metadataKeys: protectedProcedure
.meta({
Expand Down
10 changes: 9 additions & 1 deletion packages/db/src/selectors/compute/deployment-builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { and, eq, inArray, isNotNull } from "drizzle-orm/pg-core/expressions";
import {
and,
eq,
inArray,
isNotNull,
isNull,
} from "drizzle-orm/pg-core/expressions";

import type { Tx } from "../../common.js";
import * as SCHEMA from "../../schema/index.js";
Expand Down Expand Up @@ -40,6 +46,7 @@ export class DeploymentBuilder {
where: and(
eq(SCHEMA.resource.workspaceId, workspaceId),
qb.resources().where(d.resourceSelector).sql(),
isNull(SCHEMA.resource.deletedAt),
),
});

Expand Down Expand Up @@ -114,6 +121,7 @@ export class WorkspaceDeploymentBuilder {
where: and(
eq(SCHEMA.resource.workspaceId, this.workspaceId),
qb.resources().where(d.resourceSelector).sql(),
isNull(SCHEMA.resource.deletedAt),
),
});

Expand Down
10 changes: 9 additions & 1 deletion packages/db/src/selectors/compute/environment-builder.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { and, eq, inArray, isNotNull } from "drizzle-orm/pg-core/expressions";
import {
and,
eq,
inArray,
isNotNull,
isNull,
} from "drizzle-orm/pg-core/expressions";

import type { Tx } from "../../common.js";
import * as SCHEMA from "../../schema/index.js";
Expand Down Expand Up @@ -44,6 +50,7 @@ export class EnvironmentBuilder {
where: and(
eq(SCHEMA.resource.workspaceId, workspaceId),
qb.resources().where(env.resourceSelector).sql(),
isNull(SCHEMA.resource.deletedAt),
),
});
return resources.map((r) => ({ environmentId, resourceId: r.id }));
Expand Down Expand Up @@ -113,6 +120,7 @@ export class WorkspaceEnvironmentBuilder {
where: and(
eq(SCHEMA.resource.workspaceId, this.workspaceId),
qb.resources().where(env.resourceSelector).sql(),
isNull(SCHEMA.resource.deletedAt),
),
});

Expand Down
2 changes: 2 additions & 0 deletions packages/db/src/selectors/compute/resource-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class ResourceBuilder {
isResourceMatchingEnvironment,
isResourceMatchingDeployment,
inArray(SCHEMA.resource.id, this.resourceIds),
isNull(SCHEMA.resource.deletedAt),
),
);
}
Expand Down Expand Up @@ -216,6 +217,7 @@ export class WorkspaceResourceBuilder {
isResourceMatchingEnvironment,
isResourceMatchingDeployment,
inArray(SCHEMA.resource.id, resourceIds),
isNull(SCHEMA.resource.deletedAt),
),
);
}
Expand Down
12 changes: 8 additions & 4 deletions packages/events/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,23 @@ export type EvaluateReleaseTargetJob = {
export type ChannelMap = {
[Channel.NewDeployment]: schema.Deployment;
[Channel.NewDeploymentVersion]: schema.DeploymentVersion;
[Channel.NewEnvironment]: typeof schema.environment.$inferSelect;
[Channel.NewResource]: schema.Resource;
[Channel.NewPolicy]: schema.Policy;

[Channel.UpdateDeploymentVariable]: schema.DeploymentVariable;
[Channel.UpdateResourceVariable]: schema.ResourceVariable;
[Channel.NewEnvironment]: typeof schema.environment.$inferSelect;
[Channel.UpdateEnvironment]: schema.Environment & {
oldSelector: ResourceCondition | null;
};
[Channel.UpdateDeployment]: schema.Deployment & {
oldSelector: ResourceCondition | null;
};
[Channel.UpdatedResource]: schema.Resource;

[Channel.DeleteResource]: schema.Resource;

[Channel.EvaluateReleaseTarget]: EvaluateReleaseTargetJob;
[Channel.DispatchJob]: { jobId: string };
[Channel.ResourceScan]: { resourceProviderId: string };
[Channel.UpdatedResource]: schema.Resource;
[Channel.NewResource]: schema.Resource;
[Channel.NewPolicy]: schema.Policy;
};
10 changes: 7 additions & 3 deletions packages/job-dispatch/src/resource/handle-provider-scan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { selector, upsertResources } from "@ctrlplane/db";
import { Channel, getQueue } from "@ctrlplane/events";
import { logger } from "@ctrlplane/logger";

import { deleteResources } from "./delete.js";
import { groupResourcesByHook } from "./group-resources-by-hook.js";

const log = logger.child({ label: "upsert-resources" });
Expand Down Expand Up @@ -43,14 +42,19 @@ export const handleResourceProviderScan = async (

const insertJobs = insertedResources.map((r) => ({ name: r.id, data: r }));
const updateJobs = updatedResources.map((r) => ({ name: r.id, data: r }));
const deleted = await deleteResources(tx, toDelete);
await getQueue(Channel.DeleteResource).addBulk(
toDelete.map((r) => ({ name: r.id, data: r })),
);

await selector().compute().allResourceSelectors(workspaceId);
await getQueue(Channel.NewResource).addBulk(insertJobs);
await getQueue(Channel.UpdatedResource).addBulk(updateJobs);

log.info("completed handling resource provider scan");
return { all: [...insertedResources, ...updatedResources], deleted };
return {
all: [...insertedResources, ...updatedResources],
deleted: toDelete,
};
} catch (error) {
log.error("Error upserting resources", { error });
throw error;
Expand Down
Loading