Skip to content
This repository has been archived by the owner on Apr 12, 2022. It is now read-only.

Commit

Permalink
move from custom queries to 2-phase updates
Browse files Browse the repository at this point in the history
  • Loading branch information
evantahler committed Nov 9, 2020
1 parent 30ed4ef commit ce91f36
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 71 deletions.
71 changes: 40 additions & 31 deletions core/api/src/modules/ops/export.ts
Expand Up @@ -3,11 +3,10 @@ import {
ExportProfilePropertiesWithType,
} from "../../models/Export";
import { ProfilePropertyOps } from "../../modules/ops/profileProperty";
import { plugin } from "../../modules/plugin";
import { Export } from "../../models/Export";
import { Destination } from "../../models/Destination";
import { api, task } from "actionhero";
import { Op } from "sequelize";
import { Op, Transaction } from "sequelize";

export namespace ExportOps {
/** Count up the exports in each state, optionally filtered for a profile or destination */
Expand Down Expand Up @@ -91,40 +90,51 @@ export namespace ExportOps {

export async function processPendingExportsForDestination(
destination: Destination,
limit?: number
limit = 100
) {
if (!limit) {
limit = parseInt(
(await plugin.readSetting("core", "exports-profile-batch-size")).value
);
}
const app = await destination.$get("app");
const { pluginConnection } = await destination.getPlugin();

// We use Export#startedAt to denote that this export needs to be worked. We can update and claim them in one go.
// This requires a custom query.
const query = `
UPDATE "exports"
SET "startedAt" = NOW()
WHERE guid IN (
SELECT guid FROM "exports"
WHERE "startedAt" IS NULL
AND "destinationGuid" = '${destination.guid}'
ORDER BY "createdAt" DESC
LIMIT ${limit}
)
AND "startedAt" IS NULL
AND "destinationGuid" = '${destination.guid}'
RETURNING *
;
`;

const _exports: Export[] = await api.sequelize.query(query, {
model: Export,
mapToModel: true,
let _exports: Export[];

const transaction: Transaction = await api.sequelize.transaction({
lock: Transaction.LOCK.UPDATE,
type: Transaction.TYPES.EXCLUSIVE,
});

if (_exports.length > 0) {
try {
_exports = await Export.findAll({
where: {
startedAt: null,
destinationGuid: destination.guid,
},
order: [["createdAt", "asc"]],
limit,
transaction,
});

const updateResponse = await Export.update(
{ startedAt: new Date() },
{
where: {
guid: { [Op.in]: _exports.map((e) => e.guid) },
startedAt: null,
},
transaction,
}
);

transaction.commit();

_exports = updateResponse[1];
} catch (error) {
await transaction.rollback();
throw error;
}

if (!_exports) return _exports?.length || 0;

if (_exports.length > 0)
if (pluginConnection.methods.exportProfiles) {
// the plugin has a batch exportProfiles method
await task.enqueue(
Expand All @@ -150,7 +160,6 @@ RETURNING *
)
);
}
}

return _exports.length;
}
Expand Down
67 changes: 66 additions & 1 deletion core/api/src/modules/ops/profile.ts
Expand Up @@ -5,7 +5,7 @@ import { Source } from "../../models/Source";
import { Group } from "../../models/Group";
import { Destination } from "../../models/Destination";
import { Event } from "../../models/Event";
import { log } from "actionhero";
import { log, api, task } from "actionhero";
import { Op, Transaction } from "sequelize";
import { waitForLock } from "../locks";
import { ProfilePropertyOps } from "./profileProperty";
Expand Down Expand Up @@ -552,6 +552,71 @@ export namespace ProfileOps {
}
}

/**
* Find profiles that are not ready but whose properties are and make them ready.
* Task `profile:completeImport` will be enqueued for each Profile.
*/
export async function makeReady(limit = 100) {
let profiles: Profile[];

const transaction: Transaction = await api.sequelize.transaction({
lock: Transaction.LOCK.UPDATE,
type: Transaction.TYPES.EXCLUSIVE,
});

try {
const notInQuery = api.sequelize.dialect.QueryGenerator.selectQuery(
"profileProperties",
{
attributes: [
api.sequelize.fn("DISTINCT", api.sequelize.col("profileGuid")),
],
where: { state: "pending" },
}
).slice(0, -1);

profiles = await Profile.findAll({
where: {
state: "pending",
guid: { [Op.notIn]: api.sequelize.literal(`(${notInQuery})`) },
},
limit,
order: [["guid", "asc"]],
transaction,
});

const updateResponse = await Profile.update(
{ state: "ready" },
{
where: {
guid: { [Op.in]: profiles.map((p) => p.guid) },
state: "pending",
},
transaction,
}
);

await transaction.commit();

profiles = updateResponse[1];
} catch (error) {
await transaction.rollback();
throw error;
}

if (!profiles) return [];

await Promise.all(
profiles.map((profile) =>
task.enqueue("profile:completeImport", {
profileGuid: profile.guid,
})
)
);

return profiles;
}

function arraysAreEqual(a: Array<any>, b: Array<any>) {
return (
a.length === b.length && a.every((value, index) => value === b[index])
Expand Down
17 changes: 10 additions & 7 deletions core/api/src/tasks/export/enqueue.ts
Expand Up @@ -2,6 +2,7 @@ import { Destination } from "../../models/Destination";
import { log } from "actionhero";
import { ExportOps } from "../../modules/ops/export";
import { RetryableTask } from "../../classes/retryableTask";
import { plugin } from "../../modules/plugin";

export class EnqueueExports extends RetryableTask {
constructor() {
Expand All @@ -15,25 +16,27 @@ export class EnqueueExports extends RetryableTask {
}

async run() {
const limit = parseInt(
(await plugin.readSetting("core", "exports-profile-batch-size")).value
);

const destinations = await Destination.findAll({
where: { state: "ready" },
});

let totalEnqueued = 0;

for (const i in destinations) {
let enqueuedExportsCount = await ExportOps.processPendingExportsForDestination(
destinations[i]
);
totalEnqueued += enqueuedExportsCount;
let enqueuedExportsCount = 1;

while (enqueuedExportsCount > 0) {
enqueuedExportsCount = await ExportOps.processPendingExportsForDestination(
destinations[i],
limit
);
log(
`enqueued ${enqueuedExportsCount} exports to send to ${destinations[i].name} (${destinations[i].guid})`
);
enqueuedExportsCount = await ExportOps.processPendingExportsForDestination(
destinations[i]
);
totalEnqueued += enqueuedExportsCount;
}
}
Expand Down
35 changes: 3 additions & 32 deletions core/api/src/tasks/profile/checkReady.ts
@@ -1,5 +1,5 @@
import { api, Task, task } from "actionhero";
import { Profile } from "../../models/Profile";
import { Task, task } from "actionhero";
import { ProfileOps } from "../../modules/ops/profile";
import { plugin } from "../../modules/plugin";

export class ProfilesCheckReady extends Task {
Expand All @@ -18,35 +18,6 @@ export class ProfilesCheckReady extends Task {
(await plugin.readSetting("core", "runs-profile-batch-size")).value
);

const query = `
UPDATE "profiles"
SET "state" = 'ready'
WHERE guid IN (
SELECT "profiles"."guid"
FROM "profiles"
WHERE
"profiles"."state" = 'pending'
AND "profiles"."guid" NOT IN (
SELECT DISTINCT("profileGuid")
FROM "profileProperties"
WHERE "state" = 'pending'
)
ORDER BY "profiles"."guid" ASC
LIMIT ${limit}
)
RETURNING *
;
`;

const profiles: Profile[] = await api.sequelize.query(query, {
model: Profile,
mapToModel: true,
});

for (const i in profiles) {
await task.enqueue("profile:completeImport", {
profileGuid: profiles[i].guid,
});
}
await ProfileOps.makeReady(limit);
}
}

0 comments on commit ce91f36

Please sign in to comment.