Skip to content

Commit

Permalink
fix(deletes): handle npm deletions
Browse files Browse the repository at this point in the history
We had to change the way replicate is done, it was actually completely
broken since we "fixed" the memory leak.

The reasoning is to always use db.changes as an event emitter to allow:
- getting docs from couchdb
- avoiding memory leaks (when complete event is triggered, docs are
already gone from memory)

Co-authored-by: Haroen Viaene <haroen.viaene@algolia.com>
  • Loading branch information
vvo and Haroenv committed Feb 12, 2018
1 parent f620235 commit 1ad5025
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 22 deletions.
5 changes: 5 additions & 0 deletions README.md
Expand Up @@ -236,6 +236,11 @@ When the process starts with `seq=0`:
* replicate registry changes since the current sequence
* watch for registry changes continuously and replicate them

Replicate and watch are separated because:
1. In replicate we want to replicate a batch of documents in a fast way
2. In watch we want new changes as fast as possible, one by one. If watch was
asking for batches of 100, new packages would be added too late to the index

## Tests

```sh
Expand Down
3 changes: 3 additions & 0 deletions package.json
Expand Up @@ -75,5 +75,8 @@
},
"engines": {
"node": "9.4.0"
},
"jest": {
"testPathIgnorePatterns": ["node_modules", "lib"]
}
}
72 changes: 50 additions & 22 deletions src/index.js
Expand Up @@ -6,6 +6,7 @@ import PouchDB from 'pouchdb-http';
import * as npm from './npm.js';
import log from './log.js';
import ms from 'ms';
import cargo from 'async/cargo';
import queue from 'async/queue';

log.info('🗿 npm ↔️ Algolia replication starts ⛷ 🐌 🛰');
Expand All @@ -15,7 +16,6 @@ const defaultOptions = {
include_docs: true, // eslint-disable-line camelcase
conflicts: false,
attachments: false,
return_docs: false, // eslint-disable-line camelcase
};

let loopStart = Date.now();
Expand Down Expand Up @@ -105,6 +105,7 @@ async function bootstrap(state) {
.allDocs({
...defaultOptions,
...options,
return_docs: false, // eslint-disable-line camelcase
limit: c.bootstrapConcurrency,
})
.then(res => {
Expand Down Expand Up @@ -146,38 +147,57 @@ async function moveToProduction() {
await client.deleteIndex(c.bootstrapIndexName);
}

function replicate({ seq }) {
async function replicate({ seq }) {
log.info(
'🐌 Replicate: Asking for %d changes since sequence %d',
c.replicateConcurrency,
seq
);

return db
.changes({
const { seq: npmSeqToReach } = await npm.info();

return new Promise((resolve, reject) => {
const changes = db.changes({
...defaultOptions,
since: seq,
limit: c.replicateConcurrency,
})
.then(res =>
saveDocs({ docs: res.results, index: mainIndex })
batch_size: c.replicateConcurrency, // eslint-disable-line camelcase
live: true,
return_docs: true, // eslint-disable-line camelcase
});

const q = cargo((docs, done) => {
saveDocs({ docs, index: mainIndex })
.then(() => infoChange(docs[docs.length - 1].seq, 1, '🐌'))
.then(() =>
stateManager.save({
seq: res.last_seq,
seq: docs[docs.length - 1].seq,
})
)
.then(() => infoChange(res.last_seq, res.results.length, '🐌'))
.then(() => {
if (res.results.length < c.replicateConcurrency) {
log.info('🐌 Replicate: done');
return true;
.then(({ seq: lastDocSeq }) => {
if (lastDocSeq >= npmSeqToReach) {
log.info('🐌 We reached the npm current sequence');
changes.cancel();
}

return replicate({
seq: res.last_seq,
});
})
);
.then(done)
.catch(done);
}, c.replicateConcurrency);

changes.on('change', async change => {
if (change.deleted === true) {
await mainIndex.deleteObject(change.id);
log.info(`🐌 Deleted ${change.id}`);
}

q.push(change, err => {
if (err) {
reject(err);
}
});
});
changes.on('complete', resolve);
changes.on('error', reject);
});
}

function watch({ seq }) {
Expand All @@ -191,7 +211,7 @@ function watch({ seq }) {
since: seq,
live: true,
limit: undefined,
return_docs: false, // eslint-disable-line camelcase
return_docs: true, // eslint-disable-line camelcase
});

const q = queue((change, done) => {
Expand Down Expand Up @@ -226,8 +246,16 @@ function watch({ seq }) {
.catch(done);
}, 1);

changes.on('change', change => {
q.push(change);
changes.on('change', async change => {
if (change.deleted === true) {
await mainIndex.deleteObject(change.id);
log.info(`🛰 Deleted ${change.id}`);
}
q.push(change, err => {
if (err) {
reject(err);
}
});
});
changes.on('error', reject);
});
Expand Down

0 comments on commit 1ad5025

Please sign in to comment.