Skip to content

Commit

Permalink
remove timeout fix concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
mavdi committed Dec 8, 2015
1 parent 8e11209 commit 452cb6d
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 65 deletions.
29 changes: 12 additions & 17 deletions lib/sse.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,25 +80,25 @@ SSE.prototype.init = function(config) {
};

SSE.prototype.requestValidationMiddleware = function (req, res, next) {
routeNames = req.query.resources ? req.query.resources.split(',') : [];
this.routeNames = req.query.resources ? req.query.resources.split(',') : [];

if (this.singleResourceName) {
routeNames = this.singleResourceName;
this.routeNames = this.singleResourceName;
}

if (routeNames.length === 0) {
if (this.routeNames.length === 0) {
throw new JSONAPI_Error({
status: 400,
title: 'Requested changes on missing resource',
detail: 'You have not specified any resources, please do so by providing "resource?foo,bar" as query'
});
}

if (!this.allResourcesExist(routeNames)) {
if (!this.allResourcesExist(this.routeNames)) {
throw new JSONAPI_Error({
status: 400,
title: 'Requested changes on missing resource',
detail: 'The follow resources don\'t exist ' + this.getMissingResources(routeNames).join(',')
detail: 'The follow resources don\'t exist ' + this.getMissingResources(this.routeNames).join(',')
});
}

Expand All @@ -107,7 +107,7 @@ SSE.prototype.requestValidationMiddleware = function (req, res, next) {
return parseInt(item, 10);
});

isValidTS = _.all(tsSplit, function(ts) {
var isValidTS = _.all(tsSplit, function(ts) {
return !isNaN(ts);
});

Expand All @@ -133,13 +133,13 @@ SSE.prototype.handler = function (req, res, next) {
numberOfRetries: Number.MAX_VALUE
};

routeNames = req.query.resources ? req.query.resources.split(',') : [];
this.routeNames = req.query.resources ? req.query.resources.split(',') : [];

if (this.singleResourceName) {
routeNames = this.singleResourceName;
this.routeNames = this.singleResourceName;
}

var regex = new RegExp('.*\\.(' + routeNames.join('|') + ')', 'i');
var regex = new RegExp('.*\\.(' + this.routeNames.join('|') + ')', 'i');
var docStream = hl();

this.getQuery(req, regex)
Expand All @@ -151,7 +151,7 @@ SSE.prototype.handler = function (req, res, next) {
docStream.resume();

var consume = hl().consume(function(err, chunk, push, next) {
var resourceNames = _.map(routeNames, function(routeName) {
var resourceNames = _.map(that.routeNames, function(routeName) {
var pluralName = (that.options.inflect) ? inflect.pluralize(routeName) : routeName;
return new RegExp(pluralName, 'i');
});
Expand All @@ -162,8 +162,8 @@ SSE.prototype.handler = function (req, res, next) {

if (matchesEitherResource) {
var id = chunk.ts.getHighBits() + '_' + chunk.ts.getLowBits();
var eventName = that.getEventName(routeNames, chunk);
var data = that.getData(routeNames[0], chunk);
var eventName = that.getEventName(that.routeNames, chunk);
var data = that.getData(that.routeNames[0], chunk);

var filters = that.getFilters(req);

Expand All @@ -189,11 +189,6 @@ SSE.prototype.handler = function (req, res, next) {
console.log('HARVESTER SSE ERROR>>> ' + err.stack)
that.handleError(err, res, docStream);
});

setTimeout(function() {
res.end();
docStream.destroy();
}, this.options.sseExpiry || 240000);
};

SSE.prototype.handleError = function(err, res, docStream) {
Expand Down
48 changes: 0 additions & 48 deletions test/singleRouteSSE.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -193,52 +193,4 @@ describe('EventSource implementation for resource changes', function () {
});
});
});

describe('Server Sent Events Expiry', function () {
this.timeout(20000);
var lastEventId;

before(function () {
var options = {
adapter: 'mongodb',
connectionString: config.harvester.options.connectionString,
db: 'test',
inflect: true,
oplogConnectionString: config.harvester.options.oplogConnectionString,
sseExpiry: 1000
};

/**
* dvd resource should be declared after book, to test if it does not overwrite book sse config
*/
harvesterApp = harvester(options).resource('book', {
title: Joi.string(),
author: Joi.string()
}).resource('superHero', {
timestamp: Joi.number()
}).resource('dvd', {
title: Joi.string()
});

harvesterApp.listen(8006);

return seeder(harvesterApp, baseUrl).dropCollections('books', 'dvds', 'superHeros');
});

describe('When I post to the newly created resource', function () {
it('Then I should receive a change event with data but not the one before it', function (done) {
var that = this;

var eventSource = ess('http://localhost:8006/books/changes/stream', {retry : false})
.on('data', function(data) {
expect(data).to.be.undefined;
});

setTimeout(function() {
eventSource.destroy();
done();
}, 1500);
});
});
});
});

0 comments on commit 452cb6d

Please sign in to comment.