This repository has been archived by the owner on Apr 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
secondary.js
62 lines (56 loc) · 1.66 KB
/
secondary.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import { User, getWatermark, setWatermark, batchSize } from "../lib/database";
import { Sequelize } from "sequelize";
// Shorthand for Sequelize's query-operator symbols, used as computed keys in `where` clauses.
const { Op } = Sequelize;
/**
 * Processes one batch of users in (updatedAt, id) order and persists a cursor
 * so that the next call resumes where this one stopped.
 *
 * The cursor is `{ watermark, id }`. `watermark` is the `updatedAt` of the last
 * processed row in epoch milliseconds. `id` is only set while paging through
 * rows that share that same millisecond; this relies on `id` being an
 * unchanging, auto-ascending primary key used as the secondary sort.
 *
 * Delivery is at-least-once: when no id cursor is stored, rows whose
 * `updatedAt` equals the watermark are selected (and processed) again.
 *
 * @param {function(object): (Promise<void>|void)} processRow - awaited once per
 *   row, in query order.
 * @returns {Promise<boolean>} `true` when the query returned fewer than
 *   `batchSize` rows (nothing more to fetch right now), `false` otherwise.
 */
export default async function sync(processRow) {
  // using node and sequelize
  const saved = await getWatermark();
  const watermark = saved ? saved.watermark : null;
  const oldId = saved ? saved.id : null;

  const sqlOptions = {
    limit: batchSize,
    order: [
      ["updatedAt", "ASC"],
      ["id", "ASC"],
    ],
  };

  if (watermark) {
    sqlOptions.where = {
      updatedAt: {
        [Op.gte]: watermark, // WHERE updatedAt >= {watermark}
      },
    };
    if (oldId) {
      // The id cursor only describes progress *within* the watermark
      // millisecond. Applying `id > oldId` to every row would permanently skip
      // rows that were updated later but have a smaller id, so rows past the
      // watermark millisecond are let through regardless of their id:
      //   AND (id > {oldId} OR updatedAt >= {watermark + 1})
      sqlOptions.where[Op.or] = [
        { id: { [Op.gt]: oldId } },
        { updatedAt: { [Op.gte]: watermark + 1 } },
      ];
    }
  }

  const rows = await User.findAll(sqlOptions);

  if (!rows || rows.length === 0) {
    // Nothing new: keep the cursor exactly where it was so the rows already
    // handled inside the watermark millisecond are not replayed next time.
    await setWatermark({ watermark, id: oldId || null });
    return true;
  }

  for (const row of rows) {
    await processRow(row);
  }

  const done = rows.length < batchSize; // is there more to be done?
  const lastRow = rows[rows.length - 1];
  const newWatermark = lastRow.updatedAt.getTime();

  // The batch ended in the same millisecond the cursor already pointed at, so
  // the timestamp alone cannot express progress; fall back to the secondary
  // sort key. This is needed when more rows are pending (`!done`), and is also
  // kept when we were already paging by id so that bucket is not restarted.
  const stalled = newWatermark === watermark;
  const newId = stalled && (!done || oldId) ? lastRow.id : null;

  await setWatermark({ watermark: newWatermark, id: newId });
  return done;
}