diff --git a/core/api/src/modules/ops/export.ts b/core/api/src/modules/ops/export.ts index ef982dc717..c27d935005 100644 --- a/core/api/src/modules/ops/export.ts +++ b/core/api/src/modules/ops/export.ts @@ -3,11 +3,10 @@ import { ExportProfilePropertiesWithType, } from "../../models/Export"; import { ProfilePropertyOps } from "../../modules/ops/profileProperty"; -import { plugin } from "../../modules/plugin"; import { Export } from "../../models/Export"; import { Destination } from "../../models/Destination"; import { api, task } from "actionhero"; -import { Op } from "sequelize"; +import { Op, Transaction } from "sequelize"; export namespace ExportOps { /** Count up the exports in each state, optionally filtered for a profile or destination */ @@ -91,40 +90,51 @@ export namespace ExportOps { export async function processPendingExportsForDestination( destination: Destination, - limit?: number + limit = 100 ) { - if (!limit) { - limit = parseInt( - (await plugin.readSetting("core", "exports-profile-batch-size")).value - ); - } const app = await destination.$get("app"); const { pluginConnection } = await destination.getPlugin(); - // We use Export#startedAt to denote that this export needs to be worked. We can update and claim them in one go. - // This requires a custom query. - const query = ` -UPDATE "exports" -SET "startedAt" = NOW() -WHERE guid IN ( - SELECT guid FROM "exports" - WHERE "startedAt" IS NULL - AND "destinationGuid" = '${destination.guid}' - ORDER BY "createdAt" DESC - LIMIT ${limit} -) -AND "startedAt" IS NULL -AND "destinationGuid" = '${destination.guid}' -RETURNING * -; - `; - - const _exports: Export[] = await api.sequelize.query(query, { - model: Export, - mapToModel: true, + let _exports: Export[]; + + const transaction: Transaction = await api.sequelize.transaction({ + lock: Transaction.LOCK.UPDATE, + type: Transaction.TYPES.EXCLUSIVE, }); - if (_exports.length > 0) { + try { + _exports = await Export.findAll({ + where: { + startedAt: null, + destinationGuid: destination.guid, + }, + order: [["createdAt", "asc"]], + limit, + transaction, + }); + + const updateResponse = await Export.update( + { startedAt: new Date() }, + { + where: { + guid: { [Op.in]: _exports.map((e) => e.guid) }, + startedAt: null, + }, + transaction, + } + ); + + transaction.commit(); + + _exports = updateResponse[1]; + } catch (error) { + await transaction.rollback(); + throw error; + } + + if (!_exports) return _exports?.length || 0; + + if (_exports.length > 0) if (pluginConnection.methods.exportProfiles) { // the plugin has a batch exportProfiles method await task.enqueue( @@ -150,7 +160,6 @@ RETURNING * ) ); } - } return _exports.length; } diff --git a/core/api/src/modules/ops/profile.ts b/core/api/src/modules/ops/profile.ts index 047c8aefeb..99500d65f5 100644 --- a/core/api/src/modules/ops/profile.ts +++ b/core/api/src/modules/ops/profile.ts @@ -5,7 +5,7 @@ import { Source } from "../../models/Source"; import { Group } from "../../models/Group"; import { Destination } from "../../models/Destination"; import { Event } from "../../models/Event"; -import { log } from "actionhero"; +import { log, api, task } from "actionhero"; import { Op, Transaction } from "sequelize"; import { waitForLock } from "../locks"; import { ProfilePropertyOps } from "./profileProperty"; @@ -552,6 +552,71 @@ export namespace ProfileOps { } } + /** + * Find profiles that are not ready but whose properties are and make them ready. + * Task `profile:completeImport` will be enqueued for each Profile. + */ + export async function makeReady(limit = 100) { + let profiles: Profile[]; + + const transaction: Transaction = await api.sequelize.transaction({ + lock: Transaction.LOCK.UPDATE, + type: Transaction.TYPES.EXCLUSIVE, + }); + + try { + const notInQuery = api.sequelize.dialect.QueryGenerator.selectQuery( + "profileProperties", + { + attributes: [ + api.sequelize.fn("DISTINCT", api.sequelize.col("profileGuid")), + ], + where: { state: "pending" }, + } + ).slice(0, -1); + + profiles = await Profile.findAll({ + where: { + state: "pending", + guid: { [Op.notIn]: api.sequelize.literal(`(${notInQuery})`) }, + }, + limit, + order: [["guid", "asc"]], + transaction, + }); + + const updateResponse = await Profile.update( + { state: "ready" }, + { + where: { + guid: { [Op.in]: profiles.map((p) => p.guid) }, + state: "pending", + }, + transaction, + } + ); + + await transaction.commit(); + + profiles = updateResponse[1]; + } catch (error) { + await transaction.rollback(); + throw error; + } + + if (!profiles) return []; + + await Promise.all( + profiles.map((profile) => + task.enqueue("profile:completeImport", { + profileGuid: profile.guid, + }) + ) + ); + + return profiles; + } + function arraysAreEqual(a: Array, b: Array) { return ( a.length === b.length && a.every((value, index) => value === b[index]) diff --git a/core/api/src/tasks/export/enqueue.ts b/core/api/src/tasks/export/enqueue.ts index bfae9fb9f4..cd0673c23d 100644 --- a/core/api/src/tasks/export/enqueue.ts +++ b/core/api/src/tasks/export/enqueue.ts @@ -2,6 +2,7 @@ import { Destination } from "../../models/Destination"; import { log } from "actionhero"; import { ExportOps } from "../../modules/ops/export"; import { RetryableTask } from "../../classes/retryableTask"; +import { plugin } from "../../modules/plugin"; export class EnqueueExports extends RetryableTask { constructor() { @@ -15,6 +16,10 @@ export class EnqueueExports extends RetryableTask { } async run() { + const limit = parseInt( + (await plugin.readSetting("core", "exports-profile-batch-size")).value + ); + const destinations = await Destination.findAll({ where: { state: "ready" }, }); @@ -22,18 +27,16 @@ export class EnqueueExports extends RetryableTask { let totalEnqueued = 0; for (const i in destinations) { - let enqueuedExportsCount = await ExportOps.processPendingExportsForDestination( - destinations[i] - ); - totalEnqueued += enqueuedExportsCount; + let enqueuedExportsCount = 1; while (enqueuedExportsCount > 0) { + enqueuedExportsCount = await ExportOps.processPendingExportsForDestination( + destinations[i], + limit + ); log( `enqueued ${enqueuedExportsCount} exports to send to ${destinations[i].name} (${destinations[i].guid})` ); - enqueuedExportsCount = await ExportOps.processPendingExportsForDestination( - destinations[i] - ); totalEnqueued += enqueuedExportsCount; } } diff --git a/core/api/src/tasks/profile/checkReady.ts b/core/api/src/tasks/profile/checkReady.ts index 7734368b32..7cb3ef7954 100644 --- a/core/api/src/tasks/profile/checkReady.ts +++ b/core/api/src/tasks/profile/checkReady.ts @@ -1,5 +1,5 @@ -import { api, Task, task } from "actionhero"; -import { Profile } from "../../models/Profile"; +import { Task, task } from "actionhero"; +import { ProfileOps } from "../../modules/ops/profile"; import { plugin } from "../../modules/plugin"; export class ProfilesCheckReady extends Task { @@ -18,35 +18,6 @@ export class ProfilesCheckReady extends Task { (await plugin.readSetting("core", "runs-profile-batch-size")).value ); - const query = ` -UPDATE "profiles" -SET "state" = 'ready' -WHERE guid IN ( - SELECT "profiles"."guid" - FROM "profiles" - WHERE - "profiles"."state" = 'pending' - AND "profiles"."guid" NOT IN ( - SELECT DISTINCT("profileGuid") - FROM "profileProperties" - WHERE "state" = 'pending' - ) - ORDER BY "profiles"."guid" ASC - LIMIT ${limit} -) -RETURNING * -; - `; - - const profiles: Profile[] = await api.sequelize.query(query, { - model: Profile, - mapToModel: true, - }); - - for (const i in profiles) { - await task.enqueue("profile:completeImport", { - profileGuid: profiles[i].guid, - }); - } + await ProfileOps.makeReady(limit); } }