diff --git a/lib/commands/sync.js b/lib/commands/sync.js index 1c17219..ea6b839 100644 --- a/lib/commands/sync.js +++ b/lib/commands/sync.js @@ -420,11 +420,7 @@ module.exports = function *(argv) { let sourceCursor let targetCursor - if (table === "files") { - console.log(`Synchronizing files (filtering out "automatic" docs)`) - sourceCursor = yield getFilesWithFilter(sr, sourceDB, table) - targetCursor = yield getFilesWithFilter(tr, targetDB, targetTable) - } else if (table === "messages" && allMessages === false) { + if (table === "messages" && allMessages === false) { console.log(`Synchronizing last 3 months of Messages`) sourceCursor = yield getMessages(sr, sourceDB, table, companyIds) targetCursor = yield getMessages(tr, targetDB, table, companyIds) @@ -432,6 +428,17 @@ module.exports = function *(argv) { console.log(`Synchronizing by Companies: ${companyIds}`) sourceCursor = yield getByCompany(sr, sourceDB, table, companyIds) targetCursor = yield getByCompany(tr, targetDB, table, companyIds) + } else if (COPY_TABLE_MODE) { + if (table === "files") { + // Filter out unneeded docs + console.log(`Synchronizing files (filtering out "automatic" docs)`) + sourceCursor = yield getFilesWithFilter(sr, sourceDB, table) + targetCursor = yield getFilesWithFilter(tr, targetDB, targetTable) + } + // Copy `table` to `targetTable` on same server + console.log(`Copying ${table} to ${targetTable} on same server`) + sourceCursor = yield getInOrder(sr, sourceDB, table) + targetCursor = yield getInOrder(tr, targetDB, targetTable) } else { console.log(`Synchronizing everything`) sourceCursor = yield getInOrder(sr, sourceDB, table) diff --git a/scripts/changefeeds/changefeed.js b/scripts/changefeeds/changefeed.js index 58f0776..cba7a68 100644 --- a/scripts/changefeeds/changefeed.js +++ b/scripts/changefeeds/changefeed.js @@ -3,16 +3,15 @@ const { setup } = require("./queue_manager"); const r = require("rethinkdbdash")({ db: "gather" }); -const RETHINKDB_TABLE = "files"; +const RETHINKDB_TABLE = "messages"; const PUBLISH_QUEUE = "files_changes"; -setup(PUBLISH_QUEUE) -.then(() => { +setup(PUBLISH_QUEUE).then(() => { r.table(RETHINKDB_TABLE) .changes({ includeTypes: true }) .then(changefeed => { - changefeed.eachAsync((change) => { + changefeed.eachAsync(change => { publish(PUBLISH_QUEUE, change); - }) + }); }); }); diff --git a/scripts/changefeeds/consume.js b/scripts/changefeeds/consume.js index 57af874..c2b9e49 100644 --- a/scripts/changefeeds/consume.js +++ b/scripts/changefeeds/consume.js @@ -5,51 +5,63 @@ const r = require("rethinkdbdash")({ const { getConn } = require("./queue_manager"); const QUEUE = "changes.files_changes"; +const RETHINKDB_TABLE = "messages"; async function consumer(msg) { const conn = await getConn(); const data = JSON.parse(msg.content); - - logToDisk(JSON.stringify(data)) + + logToDisk(JSON.stringify(data)); if (data.type === "add" && data.new_val) { try { - await r.table("files").insert(data.new_val).run(); + await r + .table(RETHINKDB_TABLE) + .insert(data.new_val) + .run(); conn.ack(msg); - } catch(e) { - conn.nack(msg, undefined, false) // nack({ requeue: false }) + } catch (e) { + conn.nack(msg, undefined, false); // nack({ requeue: false }) } } else if (data.type === "change" && data.new_val) { try { - await r.table("files").get(data.new_val.id).replace(data.new_val).run(); + await r + .table(RETHINKDB_TABLE) + .get(data.new_val.id) + .replace(data.new_val) + .run(); conn.ack(msg); - } catch(e) { - conn.nack(msg, undefined, false) // nack({ requeue: false }) + } catch (e) { + conn.nack(msg, undefined, false); // nack({ requeue: false }) } } else if (data.type === "remove" && !data.new_val) { try { - await r.table("files").get(data.old_val.id).delete().run(); + await r + .table(RETHINKDB_TABLE) + .get(data.old_val.id) + .delete() + .run(); conn.ack(msg); - } catch(e) { - conn.nack(msg, undefined, false) // nack({ requeue: false }) + } catch (e) { + conn.nack(msg, undefined, false); // nack({ requeue: false }) } } } function logToDisk(str) { try { - fs.appendFileSync("./consumer.log", str + "\n") - } catch(e) { - console.log(`Error write ID to disk:`) - console.log(str) + fs.appendFileSync("./consumer.log", str + "\n"); + } catch (e) { + console.log(`Error write ID to disk:`); + console.log(str); } } (async () => { try { const conn = await getConn(); - await conn.consume(QUEUE, consumer) + await conn.consume(QUEUE, consumer); } catch (e) { console.log("Consumer fail, logging..."); - console.error(e) + console.error(e); } })(); diff --git a/scripts/changefeeds/publish.js b/scripts/changefeeds/publish.js index b6b63e3..909888b 100644 --- a/scripts/changefeeds/publish.js +++ b/scripts/changefeeds/publish.js @@ -6,13 +6,14 @@ module.exports = { try { const conn = await getConn(); conn.publish("changes", queue, Buffer.from(JSON.stringify(msg)), { - contentType: "application/json" + contentType: "application/json", + persistent: true }); } catch (e) { console.log("Change not publish, logging..."); - fs.appendFile("./error.log", JSON.stringify(msg) + "\n", (err) => { + fs.appendFile("./error.log", JSON.stringify(msg) + "\n", err => { if (err) console.log(err); - console.error(e) + console.error(e); }); } }