Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions lib/commands/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,18 +420,25 @@ module.exports = function *(argv) {
let sourceCursor
let targetCursor

if (table === "files") {
console.log(`Synchronizing files (filtering out "automatic" docs)`)
sourceCursor = yield getFilesWithFilter(sr, sourceDB, table)
targetCursor = yield getFilesWithFilter(tr, targetDB, targetTable)
} else if (table === "messages" && allMessages === false) {
if (table === "messages" && allMessages === false) {
console.log(`Synchronizing last 3 months of Messages`)
sourceCursor = yield getMessages(sr, sourceDB, table, companyIds)
targetCursor = yield getMessages(tr, targetDB, table, companyIds)
} else if (companyIds) {
console.log(`Synchronizing by Companies: ${companyIds}`)
sourceCursor = yield getByCompany(sr, sourceDB, table, companyIds)
targetCursor = yield getByCompany(tr, targetDB, table, companyIds)
} else if (COPY_TABLE_MODE) {
if (table === "files") {
// Filter out unneeded docs
console.log(`Synchronizing files (filtering out "automatic" docs)`)
sourceCursor = yield getFilesWithFilter(sr, sourceDB, table)
targetCursor = yield getFilesWithFilter(tr, targetDB, targetTable)
}
// Copy `table` to `targetTable` on same server
console.log(`Copying ${table} to ${targetTable} on same server`)
sourceCursor = yield getInOrder(sr, sourceDB, table)
targetCursor = yield getInOrder(tr, targetDB, targetTable)
} else {
console.log(`Synchronizing everything`)
sourceCursor = yield getInOrder(sr, sourceDB, table)
Expand Down
9 changes: 4 additions & 5 deletions scripts/changefeeds/changefeed.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ const { setup } = require("./queue_manager");
const r = require("rethinkdbdash")({
db: "gather"
});
const RETHINKDB_TABLE = "files";
const RETHINKDB_TABLE = "messages";
const PUBLISH_QUEUE = "files_changes";

setup(PUBLISH_QUEUE)
.then(() => {
setup(PUBLISH_QUEUE).then(() => {
r.table(RETHINKDB_TABLE)
.changes({ includeTypes: true })
.then(changefeed => {
changefeed.eachAsync((change) => {
changefeed.eachAsync(change => {
publish(PUBLISH_QUEUE, change);
})
});
});
});
46 changes: 29 additions & 17 deletions scripts/changefeeds/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,63 @@ const r = require("rethinkdbdash")({
const { getConn } = require("./queue_manager");

const QUEUE = "changes.files_changes";
const RETHINKDB_TABLE = "messages";

async function consumer(msg) {
const conn = await getConn();
const data = JSON.parse(msg.content);
logToDisk(JSON.stringify(data))

logToDisk(JSON.stringify(data));
if (data.type === "add" && data.new_val) {
try {
await r.table("files").insert(data.new_val).run();
await r
.table(RETHINKDB_TABLE)
.insert(data.new_val)
.run();
conn.ack(msg);
} catch(e) {
conn.nack(msg, undefined, false) // nack({ requeue: false })
} catch (e) {
conn.nack(msg, undefined, false); // nack({ requeue: false })
}
} else if (data.type === "change" && data.new_val) {
try {
await r.table("files").get(data.new_val.id).replace(data.new_val).run();
await r
.table(RETHINKDB_TABLE)
.get(data.new_val.id)
.replace(data.new_val)
.run();
conn.ack(msg);
} catch(e) {
conn.nack(msg, undefined, false) // nack({ requeue: false })
} catch (e) {
conn.nack(msg, undefined, false); // nack({ requeue: false })
}
} else if (data.type === "remove" && !data.new_val) {
try {
await r.table("files").get(data.old_val.id).delete().run();
await r
.table(RETHINKDB_TABLE)
.get(data.old_val.id)
.delete()
.run();
conn.ack(msg);
} catch(e) {
conn.nack(msg, undefined, false) // nack({ requeue: false })
} catch (e) {
conn.nack(msg, undefined, false); // nack({ requeue: false })
}
}
}

function logToDisk(str) {
try {
fs.appendFileSync("./consumer.log", str + "\n")
} catch(e) {
console.log(`Error write ID to disk:`)
console.log(str)
fs.appendFileSync("./consumer.log", str + "\n");
} catch (e) {
console.log(`Error write ID to disk:`);
console.log(str);
}
}

(async () => {
try {
const conn = await getConn();
await conn.consume(QUEUE, consumer)
await conn.consume(QUEUE, consumer);
} catch (e) {
console.log("Consumer fail, logging...");
console.error(e)
console.error(e);
}
})();
7 changes: 4 additions & 3 deletions scripts/changefeeds/publish.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ module.exports = {
try {
const conn = await getConn();
conn.publish("changes", queue, Buffer.from(JSON.stringify(msg)), {
contentType: "application/json"
contentType: "application/json",
persistent: true
});
} catch (e) {
console.log("Change not publish, logging...");
fs.appendFile("./error.log", JSON.stringify(msg) + "\n", (err) => {
fs.appendFile("./error.log", JSON.stringify(msg) + "\n", err => {
if (err) console.log(err);
console.error(e)
console.error(e);
});
}
}
Expand Down