Skip to content
This repository has been archived by the owner on Jun 2, 2024. It is now read-only.

refactor: Sync exists package from cnpmcore changes stream #1707

Merged
merged 3 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 41 additions & 1 deletion services/npm.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ exports.getAllToday = function* (timeout) {
};

exports.getShort = function* (timeout) {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://r.cnpmjs.org';
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
var r = yield request('/-/short', {
timeout: timeout || 300000,
// registry.npmjs.org/-/short is 404 now therefore have a fallback
Expand Down Expand Up @@ -198,3 +198,43 @@ exports.getPopular = function* (top, timeout) {
return r[0];
});
};

exports.getChangesUpdateSeq = function* () {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
const r = yield request('/', {
timeout: 30000,
registry: registry,
});
const data = r.data || {};
if (r.status !== 200) {
if (data.code && data.message) {
const url = registry + '/';
const err = new Error(data.message + ', url: ' + url);
err.name = data.code;
err.url = url;
throw err;
}
}
return data.update_seq || 0;
};

exports.listChanges = function* (updateSeq) {
const registry = config.sourceNpmRegistryIsCNpm ? config.sourceNpmRegistry : 'https://registry.npmmirror.com';
const changesUrl = `/_changes?since=${updateSeq}`;
const r = yield request(changesUrl, {
timeout: 30000,
registry: registry,
});
const data = r.data || {};
if (r.status !== 200) {
if (data.code && data.message) {
const url = registry + changesUrl;
const err = new Error(data.message + ', url: ' + url);
err.name = data.code;
err.url = url;
throw err;
}
}
// {"results":[{"seq":1988653,"type":"PACKAGE_VERSION_ADDED","id":"dsr-package-mercy-magot-thorp-sward","changes":[{"version":"1.0.1"}]},
return data.results || [];
};
51 changes: 27 additions & 24 deletions sync/sync_exist.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,38 @@ module.exports = function* sync() {
throw new Error('can not found total info');
}

var allPackages;
if (!info.last_exist_sync_time) {
var pkgs = yield npmService.getShort();
debug('First time sync all packages from official registry, got %d packages', pkgs.length);
if (info.last_sync_module) {
// start from last success
var lastIndex = pkgs.indexOf(info.last_sync_module);
if (lastIndex > 0) {
pkgs = pkgs.slice(lastIndex);
debug('recover from %d: %s', lastIndex, info.last_sync_module);
}
}
allPackages = pkgs;
} else {
debug('sync new module from last exist sync time: %s', info.last_exist_sync_time);
var result = yield npmService.fetchUpdatesSince(info.last_exist_sync_time);
allPackages = result.names;
syncTime = result.lastModified;
var lastSeq = info.last_exist_sync_time;
if (lastSeq && lastSeq > 132897820073) {
// ignore exists timestamp
lastSeq = null;
}

var packages = intersection(existPackages, allPackages);
if (!packages.length) {
if (!lastSeq) {
lastSeq = yield npmService.getChangesUpdateSeq();
}
if (!lastSeq) {
debug('no packages need be sync');
return {
successes: [],
fails: []
};
}

var updatesPackages = [];
var changes = yield npmService.listChanges(lastSeq);
changes.forEach(change => {
updatesPackages.push(change.id);
lastSeq = change.seq;
});
var packages = intersection(existPackages, updatesPackages);
debug('Total %d packages to sync, top 10: %j', packages.length, packages.slice(0, 10));
if (!packages.length) {
yield totalService.setLastExistSyncTime(lastSeq);
debug('no packages need be sync, lastSeq: %s, changes: %s', lastSeq, changes.length);
return {
successes: [],
fails: []
};
}

var worker = new SyncModuleWorker({
username: 'admin',
Expand All @@ -75,10 +78,10 @@ module.exports = function* sync() {
var end = thunkify.event(worker);
yield end();

debug('All packages sync done, successes %d, fails %d',
worker.successes.length, worker.fails.length);
debug('All packages sync done, successes %d, fails %d, lastSeq: %s, changes: %s',
worker.successes.length, worker.fails.length, lastSeq, changes.length);

yield totalService.setLastExistSyncTime(syncTime);
yield totalService.setLastExistSyncTime(lastSeq);
return {
successes: worker.successes,
fails: worker.fails
Expand Down
40 changes: 17 additions & 23 deletions test/sync/sync_exist.test.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,5 @@
/*!
* cnpmjs.org - test/sync/sync_exist.test.js
*
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* dead_horse <dead_horse@qq.com> (http://deadhorse.me)
*/

'use strict';

/**
* Module dependencies.
*/

var mm = require('mm');
var config = require('../../config');
var sync = require('../../sync/sync_exist');
Expand All @@ -34,37 +20,45 @@ describe('test/sync/sync_exist.test.js', function () {
});

it('should sync first time ok', function *() {
mm.data(npmService, 'getShort', ['byte']);
mm.data(npmService, 'listChanges', [
{
seq: 1,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: 0});
var data = yield sync();
data.successes[0].should.equal('byte');
});

it('should sync common ok', function *() {
mm.data(npmService, 'getAllSince', {
_updated: Date.now(),
byte: {},
});
mm.data(npmService, 'listChanges', [
{
seq: 2,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: Date.now()});
var data = yield sync();
data.successes[0].should.equal('byte');

mm.data(npmService, 'getAllSince', []);
mm.data(npmService, 'listChanges', []);
var data = yield sync();
data.successes.should.eql([]);
});

it('should sync with array format data', function *() {
mm.data(npmService, 'getAllSince', [
mm.data(npmService, 'listChanges', [
{
name: 'byte',
seq: 3,
id: 'byte',
}
]);
mm.data(totalService, 'getTotalInfo', {last_exist_sync_time: Date.now()});
var data = yield sync();
data.successes[0].should.equal('byte');

mm.data(npmService, 'getAllSince', []);
mm.data(npmService, 'listChanges', []);
var data = yield sync();
data.successes.should.eql([]);
});
Expand Down