Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
use sync module woker to handle sync process. fixed #19
  • Loading branch information
fengmk2 committed Dec 8, 2013
1 parent e0493ec commit b6214f3
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 149 deletions.
29 changes: 29 additions & 0 deletions controllers/registry/common.js
@@ -0,0 +1,29 @@
/**!
* cnpmjs.org - controllers/registry/common.js
*
* Copyright(c) cnpmjs.org and other contributors.
* MIT Licensed
*
* Authors:
* fengmk2 <fengmk2@gmail.com> (http://fengmk2.github.com)
*/

'use strict';

/**
* Module dependencies.
*/

var crypto = require('crypto');
var path = require('path');
var config = require('../../config');

exports.getTarballFilepath = function (filename) {
// ensure download file path unique
var name = filename.replace(/\.tgz$/, '.' + crypto.randomBytes(16).toString('hex') + '.tgz');
return path.join(config.uploadDir, name);
};

exports.getCDNKey = function (name, filename) {
return '/' + name + '/-/' + filename;
};
150 changes: 21 additions & 129 deletions controllers/registry/module.js
Expand Up @@ -29,10 +29,9 @@ var Module = require('../../proxy/module');
var Total = require('../../proxy/total');
var nfs = require('../../common/nfs');
var npm = require('../../proxy/npm');

function getCDNKey(name, filename) {
return '/' + name + '/-/' + filename;
}
var common = require('./common');
var Log = require('../../proxy/module_log');
var SyncModuleWorker = require('./sync_module_worker');

exports.show = function (req, res, next) {
var name = req.params.name;
Expand Down Expand Up @@ -141,7 +140,7 @@ exports.upload = function (req, res, next) {
});
}

var filepath = path.join(config.uploadDir, filename);
var filepath = common.getTarballFilepath(filename);
var ws = fs.createWriteStream(filepath);
var shasum = crypto.createHash('sha1');
req.pipe(ws);
Expand All @@ -158,7 +157,7 @@ exports.upload = function (req, res, next) {
});
}
shasum = shasum.digest('hex');
var key = getCDNKey(name, filename);
var key = common.getCDNKey(name, filename);
nfs.upload(filepath, {key: key, size: length}, function (err, result) {
// remove tmp file whatever
fs.unlink(filepath, utility.noop);
Expand Down Expand Up @@ -384,7 +383,7 @@ exports.removeTar = function (req, res, next) {
});
}

var key = getCDNKey(mod.name, filename);
var key = common.getCDNKey(mod.name, filename);
nfs.remove(key, ep.done(function () {
res.json(200, {ok: true});
}));
Expand Down Expand Up @@ -439,138 +438,31 @@ exports.removeAll = function (req, res, next) {
});
};

exports._syncModule = function (username, sourcePackage, callback) {
var downurl = sourcePackage.dist.tarball;
var filename = path.basename(downurl);
var filepath = path.join(config.uploadDir, filename);
var ws = fs.createWriteStream(filepath);
var options = {
writeStream: ws,
};
var ep = eventproxy.create();
ep.fail(callback);

var shasum = crypto.createHash('sha1');
var dataSize = 0;
urllib.request(downurl, options, ep.done(function (_, response) {
var statusCode = response && response.statusCode || -1;
if (statusCode !== 200) {
var err = new Error('Download ' + downurl + ' fail, status: ' + statusCode);
err.name = 'DownloadTarballError';
err.data = sourcePackage;
return ep.emit('error', err);
}

var rs = fs.createReadStream(filepath);
rs.once('error', ep.fail.bind(ep));
rs.on('data', function (data) {
shasum.update(data);
dataSize += data.length;
});
rs.on('end', function () {
shasum = shasum.digest('hex');
if (shasum !== sourcePackage.dist.shasum) {
var err = new Error('Download ' + downurl + ' shasum:' + shasum + ' not match ' + sourcePackage.dist.shasum);
err.name = 'DownloadTarballShasumError';
err.data = sourcePackage;
return ep.emit('error', err);
}

var key = getCDNKey(sourcePackage.name, filename);
nfs.upload(filepath, {key: key, size: dataSize}, ep.done('uploadResult'));
});
}));

ep.on('uploadResult', function (result) {
// remove tmp file whatever
fs.unlink(filepath, utility.noop);
var mod = {
version: sourcePackage.version,
name: sourcePackage.name,
package: sourcePackage,
author: username,
};
var dist = {
tarball: result.url,
shasum: shasum,
size: dataSize
};
mod.package.dist = dist;

debug('sync %s, size: %d, sha1: %s, dist: %j, version: %s',
downurl, dataSize, shasum, dist, mod.version);
Module.add(mod, ep.done(function (result) {
callback(null, result);
}));
});
};

exports.sync = function (req, res, next) {
var username = req.session.name;
var name = req.params.name;
var ep = eventproxy.create();
ep.fail(next);

npm.get(name, ep.done(function (pkg, response) {
if (!pkg._rev) {
return res.json(response.statusCode, pkg);
npm.get(name, function (err, pkg, response) {
if (err) {
return next(err);
}
ep.emit('sourcePackage', pkg);
}));

Module.listByName(name, ep.done(function (rows) {
var map = {};
for (var i = 0; i < rows.length; i++) {
var r = rows[i];
if (r.version === 'next') {
continue;
}
map[r.version] = r;
if (!pkg._rev) {
return res.json(response.statusCode, pkg);
}
ep.emit('existsMap', map);
}));

var missingVersions = [];
ep.all('sourcePackage', 'existsMap', function (pkg, map) {
var times = pkg.time || {};
var versions = [];
for (var v in times) {
var exists = map[v];
var version = pkg.versions[v];
if (!version || !version.dist) {
continue;
}
if (exists && exists.package.dist.shasum === version.shasum) {
continue;
Log.create({name: name, username: username}, function (err, result) {
if (err) {
return next(err);
}
version.gmt_modified = Date.parse(times[v]);
versions.push(version);
}

if (versions.length === 0) {
return ep.emit('done');
}

versions.sort(function (a, b) {
return a.gmt_modified - b.gmt_modified;
var worker = new SyncModuleWorker({
logId: result.id,
name: name,
username: username,
});
worker.start();
res.json(201, {ok: true, logId: result.id});
});
missingVersions = versions;
ep.emit('syncVersion', missingVersions.shift());
});

ep.on('syncVersion', function (version) {
exports._syncModule(username, version, ep.done(function (result) {
var nextVersion = missingVersions.shift();
if (!nextVersion) {
return ep.emit('done', result);
}
ep.emit('syncVersion', nextVersion);
}));
});

ep.on('done', function () {
// TODO: set latest version
res.json(201, {ok: true});
});
};

Expand Down

0 comments on commit b6214f3

Please sign in to comment.