Skip to content

Commit

Permalink
Update thanks to Matthieu comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Quentin01 committed Jul 7, 2014
1 parent 140e212 commit dc624de
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 70 deletions.
11 changes: 0 additions & 11 deletions lib/handlers/init/callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,10 @@ module.exports = function(retrieveTokens, config) {

// Retrieve data from provider
retrieveTokens(req.params, tempToken.data, rarity.carry([tempToken], cb));
/*function(err, accountName, providerData) {
cb(err, accountName, providerData, tempToken);
});*/
},
function createAccessToken(tempToken, accountName, providerData, cb) {
// Create a new anyfetch access-token with our grant
Anyfetch.getAccessToken(config.appId, config.appSecret, tempToken.anyfetchCode, rarity.carry([accountName, providerData, tempToken], cb));

/*function(err, anyfetchAccessToken) {
cb(err, accountName, providerData, tempToken, anyfetchAccessToken);
});*/
},
function createNewToken(accountName, providerData, tempToken, anyfetchAccessToken, cb) {
// Create new token
Expand All @@ -58,10 +51,6 @@ module.exports = function(retrieveTokens, config) {
});

token.save(rarity.carryAndSlice([tempToken, tempToken.returnTo], 3, cb));

/*function(err) {
cb(err, tempToken, tempToken.returnTo);
});*/
},
function removeTempToken(tempToken, returnTo, cb) {
tempToken.remove(rarity.carry([returnTo], cb));
Expand Down
26 changes: 16 additions & 10 deletions lib/handlers/update.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

var async = require('async');
var restify = require('restify');
var rarity = require('rarity');

var Token = require('../models/token.js');

Expand Down Expand Up @@ -47,25 +48,20 @@ module.exports = function(updateAccount, queue, workers) {
token.lastUpdate = new Date();
token.isUpdating = true;

token.save(cb);
token.save(rarity.slice(1, cb));
},
function callUpdateAccount(cb) {
var queues = {};

Object.keys(workers).forEach(function(name) {
queues[name] = {
push: function(task) {
task.anyfetchToken = token.anyfetchToken;
queue.create(name, task).save();
}
};
queues[name] = [];
});

updateAccount(token.data, token.cursor, queues, function(err, newCursor, providerData) {
cb(err, newCursor, providerData);
cb(err, queues, newCursor, providerData);
});
},
function updateToken(newCursor, providerData, cb) {
function updateToken(queues, newCursor, providerData, cb) {
if(providerData) {
token.data = providerData;
token.markModified('data');
Expand All @@ -75,7 +71,17 @@ module.exports = function(updateAccount, queue, workers) {
token.isUpdating = false;
token.markModified('isUpdating');

token.save(cb);
token.save(rarity.carry([queues], cb));
},
function executeQueue(queues, cb) {
Object.keys(workers).forEach(function(name) {
queues[name].forEach(function(task) {
task.anyfetchToken = token.anyfetchToken;

queue.create(name, task).save();
});
});
cb(null);
}
], function(err) {
if(err) {
Expand Down
66 changes: 18 additions & 48 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,6 @@ var connectMongo = function(mongoUrl) {
}
};


/**
* Simple wrapper around the token model.
* Only one token should match.
*
* retrieveData({accessToken: ...}) => data for this access token
* retrieveData({'data.grant': ...}) => data with this value of grant.
*/
/*module.exports.retrieveData = function retrieveData(hash, cb) {
Token.findOne(hash, function(err, token) {
if(!token) {
return cb(new Error("no data matches"));
}
cb(err, token.data);
});
};*/


/**
* Create a new provider server.
* This server will use `config.task_generator` as its main function, to turn a file into metadata.
Expand Down Expand Up @@ -77,36 +59,24 @@ module.exports.createServer = function createServer(connectFunctions, updateAcco
});

Object.keys(workers).forEach(function createQueue(name) {
server.queue.process(name, (workers[name].concurrency) ? workers[name].concurrency : 5, (function createQueueProcess(name) {
return function(job, done) {
job.task = job.data;

job.anyfetchClient = new Anyfetch(job.task.anyfetchToken);

Token.findOne({anyfetchToken: job.task.anyfetchToken}, function(err, token) {
if(err) {
done(err);
}

job.serviceData = token.data;

delete job.task.anyfetchToken;

// Standard tasks
// Run in domain to avoid failures
var d = domain.create();
d.once('error', function(err) {
server.queue.job_counter -= 1;
done(err);
});
d.run(function() {
workers[name](job, done);
server.queue.job_counter -= 1;
done();
});
});
};
})(name));
server.queue.process(name, (workers[name].concurrency) ? workers[name].concurrency : 1, function(job, done) {
job.task = job.data;

job.anyfetchClient = new Anyfetch(job.task.anyfetchToken);

Token.findOne({anyfetchToken: job.task.anyfetchToken}, function(err, token) {
if(err) {
done(err);
}

job.serviceData = token.data;

delete job.data;
delete job.task.anyfetchToken;

workers[name](job, done);
});
});
});

// Load routes and generate endpoints using closures
Expand Down
2 changes: 1 addition & 1 deletion lib/models/token.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ var mongoose = require('mongoose');

var TokenSchema = new mongoose.Schema({
// Access token to communicate with AnyFetch
anyfetchToken: '',
anyfetchToken: {type: String, default: '', unique: true},

// Account identifier
accountName: '',
Expand Down
18 changes: 18 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,24 @@ describe("AnyFetchProvider.createServer()", function() {
var server = AnyFetchProvider.createServer(connectFunctions, updateAccount, {test: queueWorker}, config);
updateServer(server);
});

it("should allow to update token data", function(done) {
// We need to use test2 queue instead of test because this event worker can't override the last worker in the last test
var updateAccount = function(serviceData, cursor, queues, cb) {
serviceData.newKey = 'newValue';
queues.test2.push({});
cb(null, new Date(), serviceData);
};

var queueWorker = function(job, cb) {
job.serviceData.should.have.property('newKey', 'newValue');
done();
cb();
};

var server = AnyFetchProvider.createServer(connectFunctions, updateAccount, {test2: queueWorker}, config);
updateServer(server);
});
});

describe("/reset endpoint", function() {
Expand Down

0 comments on commit dc624de

Please sign in to comment.