Skip to content
This repository has been archived by the owner on Apr 12, 2022. It is now read-only.

Commit

Permalink
Newest Export Lookup is Batched (#2943)
Browse files Browse the repository at this point in the history
* The magic of sql

* Evan code review

* Added test to account for more up to date record outside of the batch

* whoops

* fix CI?

* Confused why CI is failing

* SORT!

* Evan CR
  • Loading branch information
Krishna Glick committed Feb 8, 2022
1 parent c0aac66 commit 33c4585
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 16 deletions.
113 changes: 111 additions & 2 deletions core/__tests__/models/destination/plugins/exportRecords.ts
Original file line number Diff line number Diff line change
Expand Up @@ -738,8 +738,8 @@ describe("models/destination - with custom exportRecords plugin", () => {
errorLevel: null,
});

helper.changeTimestamps([exportOne], true, new Date());
helper.changeTimestamps(
await helper.changeTimestamps([exportOne], true, new Date());
await helper.changeTimestamps(
[exportTwo],
true,
new Date(new Date().valueOf() - 100000)
Expand Down Expand Up @@ -770,6 +770,115 @@ describe("models/destination - with custom exportRecords plugin", () => {
expect(exportTwo.state).toEqual("canceled");
});

test("If multiple exports exist for a single record/destination pair, the newest is sent and all older ones are canceled", async () => {
const record = await helper.factories.record();
const group = await helper.factories.group();
await GroupMember.create({ recordId: record.id, groupId: group.id });
await destination.updateTracking("group", group.id);

const exportOne = await Export.create({
retryCount: 0,
destinationId: destination.id,
recordId: record.id,
startedAt: null,
oldRecordProperties: "{}",
newRecordProperties: "{}",
oldGroups: "[]",
newGroups: "[]",
state: "pending",
sendAt: new Date(),
hasChanges: true,
toDelete: false,
force: true,
exportProcessorId: null,
completedAt: null,
errorMessage: null,
errorLevel: null,
});

const exportTwo = await Export.create({
retryCount: 0,
destinationId: destination.id,
recordId: record.id,
startedAt: null,
oldRecordProperties: "{}",
newRecordProperties: "{}",
oldGroups: "[]",
newGroups: "[]",
state: "pending",
sendAt: new Date(),
hasChanges: true,
toDelete: false,
force: true,
exportProcessorId: null,
completedAt: null,
errorMessage: null,
errorLevel: null,
});

await helper.changeTimestamps([exportOne], true, new Date());
await helper.changeTimestamps(
[exportTwo],
true,
new Date(new Date().valueOf() - 100000)
);

expect(exportOne.toDelete).toBe(false);
expect(exportOne.hasChanges).toBe(true);
expect(exportOne.force).toBe(true);
expect(exportTwo.toDelete).toBe(false);
expect(exportTwo.hasChanges).toBe(true);
expect(exportTwo.force).toBe(true);

await specHelper.runTask("export:enqueue", {});

const exportThree = await Export.create({
retryCount: 0,
destinationId: destination.id,
recordId: record.id,
startedAt: null,
oldRecordProperties: "{}",
newRecordProperties: "{}",
oldGroups: "[]",
newGroups: "[]",
state: "pending",
sendAt: new Date(),
hasChanges: true,
toDelete: false,
force: true,
exportProcessorId: null,
completedAt: null,
errorMessage: null,
errorLevel: null,
});
await helper.changeTimestamps(
[exportThree],
true,
new Date(new Date().valueOf() + 100000)
);

const foundTasks = await specHelper.findEnqueuedTasks("export:sendBatch");
expect(foundTasks.length).toBe(1);
expect(foundTasks[0].args[0].exportIds.sort()).toEqual(
[exportOne.id, exportTwo.id].sort()
);

await specHelper.runTask("export:sendBatch", foundTasks[0].args[0]);

expect(exportArgs.exports.length).toBe(0); // plugin#exportRecord was not called
await exportOne.reload();
await exportTwo.reload();
await exportThree.reload();

expect(exportOne.createdAt.valueOf()).toBeGreaterThan(
exportTwo.createdAt.valueOf()
);
expect(exportOne.state).toEqual("canceled");
expect(exportTwo.state).toEqual("canceled");
expect(exportThree.state).toEqual("pending");
await exportThree.update({ state: "complete" }); // Cleanup
});

test("if there is no previous export, it will be sent to the destination and all data will be new", async () => {
const record = await helper.factories.record();
await record.addOrUpdateProperties({
Expand Down
48 changes: 34 additions & 14 deletions core/src/modules/ops/destination.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import { MappingHelper } from "../mappingHelper";
import { RecordPropertyOps } from "./recordProperty";
import { Option } from "../../models/Option";
import { getLock } from "../locks";
import { RecordPropertyType } from "./record";

function deepStrictEqualBoolean(a: any, b: any): boolean {
try {
Expand Down Expand Up @@ -903,15 +902,16 @@ export namespace DestinationOps {
return { success, error, retryexportIds, retryDelay };
}

async function cancelOldExport(oldExport: Export, newExport: Export) {
async function cancelOldExport(oldExport: Export) {
await oldExport.update({
state: "canceled",
sendAt: null,
errorMessage: `Replaced by more recent export ${newExport.id}`,
errorMessage: `Replaced by more recent export`,
errorLevel: null,
completedAt: new Date(),
});
}

export async function sendExports(
destination: Destination,
givenExports: Export[],
Expand All @@ -926,23 +926,43 @@ export namespace DestinationOps {
const locks: Awaited<ReturnType<typeof getLock>>[] = [];

try {
const mostRecentExportIds = await Export.sequelize
.query(
{
query: `
SELECT
"id"
FROM (
SELECT
"id",
ROW_NUMBER() OVER (PARTITION BY "exports"."recordId" ORDER BY "exports"."createdAt" DESC) AS __rownum
FROM
"exports"
WHERE
"exports"."destinationId" = ?
AND "exports"."recordId" IN (?)) AS __ranked
WHERE
"__ranked"."__rownum" = 1;`,
values: [
destination.id,
givenExports.map(({ recordId }) => recordId),
],
},
{
type: "SELECT",
model: Export,
}
)
.then((exports) => exports.map((e) => e.id));

for (const givenExport of givenExports) {
if (!givenExport.hasChanges) {
await givenExport.complete();
continue;
}

const mostRecentExport = await Export.findOne({
where: {
recordId: givenExport.recordId,
destinationId: destination.id,
},
order: [["createdAt", "DESC"]],
});

const isNewest = mostRecentExport.id === givenExport.id;
if (!isNewest) {
await cancelOldExport(givenExport, mostRecentExport);
if (!mostRecentExportIds.includes(givenExport.id)) {
await cancelOldExport(givenExport);
continue;
}

Expand Down

0 comments on commit 33c4585

Please sign in to comment.