Permalink
Browse files

Merge pull request #582 from Singly/sync-parallel

Syncing refactor (not yet in parallel). This just refactors some code to make it easier to implement parallel syncing without having to opt all connectors in at once.
  • Loading branch information...
2 parents 7c81419 + 2ff8ac3 commit 677cdb3107855c17638f00ded0db3858d2d7d54c @smurthas smurthas committed Oct 12, 2012
Showing with 50 additions and 33 deletions.
  1. +50 −33 lib/taskman.js
View
@@ -343,12 +343,12 @@ exports.taskUpdate = function(auth, callback, force)
};
// perform the synclet and pipeline, saving state at the end
-function runTask(pi, task, callback)
+function runTask(pi, task, fnCallback)
{
if (!pi || !task) {
logger.warn("runtask invalid args", typeof pi, typeof task);
- return callback();
+ return fnCallback();
}
logger.debug("running task", task.idr);
@@ -400,6 +400,19 @@ function runTask(pi, task, callback)
});
}
+ // if config updated, sanitize it
+ if (response.config && response.config.nextRun) {
+ task.nextRun = response.config.nextRun;
+ delete response.config.nextRun;
+ }
+
+ // pass back the updated config, which will be saved by the parent function
+ // save a pointer here, because response obj get modified along the way
+ var cfg = response.config;
+ var callback = function(err) {
+ fnCallback(err, cfg);
+ }
+
STATS.total += task.count;
STATS.tasks++;
@@ -449,10 +462,11 @@ function runTask(pi, task, callback)
// if any auth updates, merge+flag it
if (typeof response.auth === 'object') {
- pi.newauth = true;
+ if (!cfg) cfg = {};
+ cfg._auth = {};
Object.keys(response.auth).forEach(function(key) {
- pi.auth[key] = response.auth[key];
+ cfg._auth[key] = response.auth[key];
});
}
@@ -463,8 +477,8 @@ function runTask(pi, task, callback)
task.tpipe = Date.now();
- // run it through the pipeline!
- pipeline.inject(response.data, pi.auth, function(err) {
+ // run it through the pipeline! if there is an updated auth object, us it
+ pipeline.inject(response.data, (cfg && cfg._auth) || pi.auth, function(err) {
// when we can't save, capture that state, but bail fast
if (err) {
task.err = err;
@@ -474,17 +488,6 @@ function runTask(pi, task, callback)
return saveTask(task, callback);
}
- // if config updated, sanitize it and save it in background
- if (response.config && response.config.nextRun) {
- task.nextRun = response.config.nextRun;
-
- delete response.config.nextRun;
- }
-
- if (typeof response.config === 'object') {
- profileManager.configSet(pi.auth.pid, response.config, function() {});
- }
-
task.tdone = Date.now();
logger.verbose("Pipeline finished", task.idr, "in", task.tdone - task.tstart, "ms");
@@ -688,28 +691,42 @@ function runWorker(pid, callback, force)
}
self.tasks = todo;
-
- // do each task serially due to config sharing!
- logger.debug("working doing tasks",todo.length);
- async.forEachSeries(todo, function(task, cbLoop) {
- if (self.killed) return cbLoop();
- runTask(pi, task, cbLoop);
- }, function() {
-
- // auth info is rarely updated, must save it if so, can happen async
- if (pi.newauth) profileManager.authSet(pid, pi.auth, false, function(err) {
- if (err) logger.warn(err);
- });
-
- // release locks and return
- cbDone(null, self);
- });
+ doTasksInSeries(todo, self, pi, cbDone);
});
});
});
});
}
+function doTasksInSeries(todo, self, pi, cbDone) {
+ // do each task serially due to config sharing!
+ logger.debug('working doing tasks in series', todo.length);
+ async.forEachSeries(todo, function(task, cbLoop) {
+ if (self.killed) return cbLoop();
+ runTask(pi, task, function(err, config) {
+ if (err) return cbLoop(err);
+ updateConfig(config, pi.auth.pid, cbLoop);
+ });
+ }, function() {
+ // release locks and return
+ cbDone(null, self);
+ });
+}
+
+function updateConfig(configUpdate, pid, cb) {
+ if (!configUpdate) return process.nextTick(cb);
+ // auth info is rarely updated, must save it if so, can happen async
+ if (configUpdate && configUpdate._auth) {
+ return profileManager.authSet(pid, configUpdate._auth, false, function(err) {
+ if (err) logger.warn(err);
+ delete configUpdate._auth;
+ profileManager.configSet(pid, configUpdate, cb);
+ });
+ }
+
+ profileManager.configSet(pid, configUpdate, cb);
+}
+
// optionally run a base here if needed
exports.fresh = function(base, callback) {
if(!base) return callback();

0 comments on commit 677cdb3

Please sign in to comment.