Skip to content

Commit

Permalink
fix: make sure chagnes stream destroy (#982)
Browse files Browse the repository at this point in the history
closes #981
  • Loading branch information
fengmk2 committed Jul 1, 2016
1 parent abcb81d commit 6d921a9
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions sync/changes_stream_syncer.js
Expand Up @@ -10,22 +10,31 @@ const config = require('../config');

const db = 'https://replicate.npmjs.com';
const lastSeqFile = path.join(config.dataDir, '.cnpmjs.org.last_seq.txt');
let _STREAM_ID = 0;

module.exports = function* sync() {
const since = yield getLastSequence();
logger.syncInfo('start changes stream, since: %s', since);
const streamId = _STREAM_ID++;
logger.syncInfo('start changes stream#%d, since: %s', streamId, since);
const changes = new ChangesStream({
db,
since,
include_docs: false,
});
changes.await = streamAwait;
changes.on('data', change => {
logger.syncInfo('Get change: %j', change);
logger.syncInfo('stream#%d get change: %j', streamId, change);
syncPackage(change);
});

yield changes.await('error');
try {
yield changes.await('error');
} catch (err) {
// make sure changes steam is destroy
changes.destroy();
err.message += `, stream#${streamId}`;
throw err;
}
};

function syncPackage(change) {
Expand Down Expand Up @@ -56,7 +65,7 @@ function* getLastSequence() {
lastSeq = Number(lastSeq);
}
if (!lastSeq) {
lastSeq = 2614765;
lastSeq = 2649694;
}
// const r = yield urllib.request(db, {
// dataType: 'json',
Expand Down

0 comments on commit 6d921a9

Please sign in to comment.