Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fix tingo too

  • Loading branch information...
commit 30605d53ad3a88a4d96bdae63cce95a465e47d42 1 parent 07fc0ec
@adrai authored
View
2  lib/eventStore.js
@@ -365,7 +365,7 @@ Store.prototype = {
event = eventstream.uncommittedEvents[i];
event.commitId = id;
event.commitSequence = i;
- event.restInStream = len - 1 - i;
+ event.restInCommitStream = len - 1 - i;
event.commitStamp = new Date();
currentRevision++;
event.streamRevision = currentRevision;
View
4 storage/mongoDb/storage.js
@@ -175,7 +175,7 @@ Storage.prototype = {
var lastEvt = res[res.length - 1];
- if (lastEvt.restInStream === 0 || !lastEvt.restInStream) {
+ if (lastEvt.restInCommitStream === 0 || !lastEvt.restInCommitStream) {
callback(err, res);
self.transactions.remove({ _id: lastEvt.commitId }, function(err) {});
return;
@@ -186,7 +186,7 @@ Storage.prototype = {
return callback(err, res);
}
- var missingEvts = tx.events.slice(tx.events.length - lastEvt.restInStream);
+ var missingEvts = tx.events.slice(tx.events.length - lastEvt.restInCommitStream);
self.events.insert(missingEvts, {keepGoing: true}, function(err) {
self.transactions.remove({ _id: tx._id }, function(err) {});
View
20 storage/mongoDb/test/mongoDbStorageSpec.js
@@ -171,15 +171,15 @@ var expect = require('expect.js')
before(function(done) {
storage.addEvents([
- {streamId: '2', streamRevision: 0, commitId: '0', commitSequence: 0, restInStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
- {streamId: '2', streamRevision: 1, commitId: '0', commitSequence: 1, restInStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
- {streamId: '2', streamRevision: 2, commitId: '0', commitSequence: 2, restInStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}},
- {streamId: '2', streamRevision: 3, commitId: '0', commitSequence: 3, restInStream: 0, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}}
+ {streamId: '2', streamRevision: 0, commitId: '0', commitSequence: 0, restInCommitStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 1, commitId: '0', commitSequence: 1, restInCommitStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 2, commitId: '0', commitSequence: 2, restInCommitStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 3, commitId: '0', commitSequence: 3, restInCommitStream: 0, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}}
],
function (err) {
storage.addEvents([
- {streamId: '3', streamRevision: 0, commitId: '1', commitSequence: 0, restInStream: 1, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}},
- {streamId: '3', streamRevision: 1, commitId: '1', commitSequence: 1, restInStream: 0, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}}
+ {streamId: '3', streamRevision: 0, commitId: '1', commitSequence: 0, restInCommitStream: 1, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}},
+ {streamId: '3', streamRevision: 1, commitId: '1', commitSequence: 1, restInCommitStream: 0, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}}
],
function (err) {
storage.addSnapshot({snapshotId: '1', streamId: '3', revision: 1, data: 'data'}, function() {
@@ -349,13 +349,13 @@ var expect = require('expect.js')
before(function(done) {
var evts = [
- {streamId: '1818', streamRevision: 0, commitId: '18180', commitSequence: 0, restInStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
- {streamId: '1818', streamRevision: 1, commitId: '18180', commitSequence: 1, restInStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
- {streamId: '1818', streamRevision: 2, commitId: '18180', commitSequence: 2, restInStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}}
+ {streamId: '1818', streamRevision: 0, commitId: '18180', commitSequence: 0, restInCommitStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
+ {streamId: '1818', streamRevision: 1, commitId: '18180', commitSequence: 1, restInCommitStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
+ {streamId: '1818', streamRevision: 2, commitId: '18180', commitSequence: 2, restInCommitStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}}
];
storage.addEvents(evts,
function (err) {
- evts.push({_id: '181803', streamId: '1818', streamRevision: 3, commitId: '18180', commitSequence: 3, restInStream: 0, commitStamp: new Date(2012, 3, 14, 11, 0, 0), payload: {id: '4', event:'missed'}});
+ evts.push({_id: '181803', streamId: '1818', streamRevision: 3, commitId: '18180', commitSequence: 3, restInCommitStream: 0, commitStamp: new Date(2012, 3, 14, 11, 0, 0), payload: {id: '4', event:'missed'}});
var tx = {
_id: '18180',
events: evts
View
64 storage/tingoDb/storage.js
@@ -42,7 +42,8 @@ Storage = function(options, callback) {
var defaults = {
dbPath: __dirname + '/',
eventsCollectionName: 'events',
- snapshotsCollectionName: 'snapshots'
+ snapshotsCollectionName: 'snapshots',
+ transactionsCollectionName: 'transactions'
};
this.options = mergeOptions(options, defaults);
@@ -66,6 +67,7 @@ Storage.prototype = {
this.events = this.db.collection(this.options.eventsCollectionName + '.tingo');
this.snapshots = this.db.collection(this.options.snapshotsCollectionName + '.tingo');
+ this.transactions = this.db.collection(this.options.transactionsCollectionName + '.tingo');
this.isConnected = true;
@@ -79,10 +81,27 @@ Storage.prototype = {
// - __events:__ the events array
// - __callback:__ `function(err){}`
addEvents: function(events, callback) {
- for(var i in events) {
+ for (var i in events) {
events[i]._id = events[i].commitId + events[i].commitSequence;
+ events[i].dispatched = false;
+ }
+
+ var self = this;
+
+ if (events.length > 1) {
+ var tx = {
+ _id: events[0].commitId,
+ events: events
+ };
+ this.transactions.insert(tx, {keepGoing: true}, function(err) {
+ self.events.insert(events, {keepGoing: true}, function(err) {
+ self.transactions.remove({ _id: tx._id }, function(err) {});
+ if (callback) { callback(err); }
+ });
+ });
+ } else {
+ this.events.insert(events, {keepGoing: true}, callback);
}
- this.events.insert(events, {keepGoing: true}, callback);
},
// __addSnapshot:__ stores the snapshot
@@ -105,21 +124,48 @@ Storage.prototype = {
// - __maxRev:__ revision endpoint (hint: -1 = to end) [optional]
// - __callback:__ `function(err, events){}`
getEvents: function(streamId, minRev, maxRev, callback) {
-
+
if (typeof maxRev === 'function') {
callback = maxRev;
maxRev = -1;
}
-
+
var options = {'$gte':minRev, '$lt':maxRev};
if (maxRev == -1) options = {'$gte':minRev};
-
+
var findStatement = {
'streamId' : streamId,
'streamRevision': options
};
- this.events.find(findStatement, {sort:[['streamRevision','asc']]}).toArray(callback);
+ var self = this;
+
+ this.events.find(findStatement, {sort:[['streamRevision','asc']]}).toArray(function(err, res) {
+ if (maxRev != -1 || !res || res.length === 0) {
+ return callback(err, res);
+ }
+
+ var lastEvt = res[res.length - 1];
+
+ if (lastEvt.restInCommitStream === 0 || !lastEvt.restInCommitStream) {
+ callback(err, res);
+ self.transactions.remove({ _id: lastEvt.commitId }, function(err) {});
+ return;
+ }
+
+ self.transactions.findOne({ _id: lastEvt.commitId }, function(err, tx) {
+ if (!tx) {
+ return callback(err, res);
+ }
+
+ var missingEvts = tx.events.slice(tx.events.length - lastEvt.restInCommitStream);
+
+ self.events.insert(missingEvts, {keepGoing: true}, function(err) {
+ self.transactions.remove({ _id: tx._id }, function(err) {});
+ self.getEvents(streamId, minRev, maxRev, callback);
+ });
+ });
+ });
},
// __getEventRange:__ loads the range of events from given storage.
@@ -200,7 +246,7 @@ Storage.prototype = {
var findStatement = {'streamId' : streamId};
if (maxRev > -1) findStatement = { 'streamId' : streamId, 'revision': {'$lte':maxRev} };
-
+
this.snapshots.find(findStatement, {sort:[['revision','desc']], limit: 1}).toArray(function(err, snapshots) {
if (err) {
callback(err);
@@ -247,7 +293,7 @@ var mergeOptions = function(options, defaultOptions) {
if (!options || typeof options === 'function') {
return defaultOptions;
}
-
+
var merged = {};
for (var attrname in defaultOptions) { merged[attrname] = defaultOptions[attrname]; }
for (attrname in options) { if (options[attrname]) merged[attrname] = options[attrname]; }
View
79 storage/tingoDb/test/tingoDbStorageSpec.js
@@ -12,7 +12,7 @@ var expect = require('expect.js')
describe('without a callback', function() {
before(function() {
- storage = storageModule.createStorage({ dbPath: __dirname });
+ storage = storageModule.createStorage({ dbName: 'testeventstore' });
});
describe('calling connect', function() {
@@ -34,7 +34,7 @@ var expect = require('expect.js')
it('it should connect successfully', function(done) {
- storageModule.createStorage({ dbPath: __dirname }, function(err, str) {
+ storageModule.createStorage({ dbName: 'testeventstore' }, function(err, str) {
storage = str;
expect(err).not.to.be.ok();
expect(str).to.not.eql(null);
@@ -53,7 +53,9 @@ var expect = require('expect.js')
before(function(done) {
storage.events.remove({}, function() {
- storage.snapshots.remove({}, done);
+ storage.snapshots.remove({}, function() {
+ storage.transactions.remove({}, done);
+ });
});
});
@@ -169,16 +171,16 @@ var expect = require('expect.js')
before(function(done) {
storage.addEvents([
- {streamId: '2', streamRevision: 0, commitId: 0, commitSequence: 0, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}, dispatched: false},
- {streamId: '2', streamRevision: 1, commitId: 1, commitSequence: 1, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}, dispatched: false},
- {streamId: '2', streamRevision: 2, commitId: 2, commitSequence: 2, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}, dispatched: false},
- {streamId: '2', streamRevision: 3, commitId: 3, commitSequence: 3, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}, dispatched: false}
+ {streamId: '2', streamRevision: 0, commitId: '0', commitSequence: 0, restInCommitStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 1, commitId: '0', commitSequence: 1, restInCommitStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 2, commitId: '0', commitSequence: 2, restInCommitStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}},
+ {streamId: '2', streamRevision: 3, commitId: '0', commitSequence: 3, restInCommitStream: 0, commitStamp: new Date(2012, 3, 15, 8, 0, 0), payload: {id: '4', event:'blaaaaaaaaaaa'}}
],
function (err) {
storage.addEvents([
- {streamId: '3', streamRevision: 0, commitId: 4, commitSequence: 4, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}, dispatched: false},
- {streamId: '3', streamRevision: 1, commitId: 5, commitSequence: 5, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}, dispatched: false}
- ],
+ {streamId: '3', streamRevision: 0, commitId: '1', commitSequence: 0, restInCommitStream: 1, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}},
+ {streamId: '3', streamRevision: 1, commitId: '1', commitSequence: 1, restInCommitStream: 0, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}}
+ ],
function (err) {
storage.addSnapshot({snapshotId: '1', streamId: '3', revision: 1, data: 'data'}, function() {
storage.addSnapshot({snapshotId: '2', streamId: '3', revision: 2, data: 'dataPlus'}, done);
@@ -195,8 +197,8 @@ var expect = require('expect.js')
expect(err).not.to.be.ok();
expect(events).to.have.length(4);
expect(events[0].commitId).to.eql('0');
- expect(events[1].commitId).to.eql('1');
- expect(events[3].commitId).to.eql('3');
+ expect(events[1].commitId).to.eql('0');
+ expect(events[3].commitId).to.eql('0');
done();
});
@@ -237,8 +239,8 @@ var expect = require('expect.js')
expect(err).not.to.be.ok();
expect(events).to.have.length(6);
expect(events[0].commitId).to.eql('0');
- expect(events[2].commitId).to.eql('2');
- expect(events[5].commitId).to.eql('5');
+ expect(events[2].commitId).to.eql('0');
+ expect(events[5].commitId).to.eql('1');
done();
});
@@ -252,8 +254,8 @@ var expect = require('expect.js')
storage.getEventRange({id: '2'}, 2, function(err, events) {
expect(err).not.to.be.ok();
expect(events).to.have.length(2);
- expect(events[0].commitId).to.eql('2');
- expect(events[1].commitId).to.eql('3');
+ expect(events[0].commitId).to.eql('1');
+ expect(events[1].commitId).to.eql('1');
done();
});
@@ -343,6 +345,51 @@ var expect = require('expect.js')
});
+ describe('having a pending transaction', function() {
+
+ before(function(done) {
+ var evts = [
+ {streamId: '1818', streamRevision: 0, commitId: '18180', commitSequence: 0, restInCommitStream: 3, commitStamp: new Date(2012, 3, 14, 8, 0, 0), payload: {id: '1', event:'blaaaaaaaaaaa'}},
+ {streamId: '1818', streamRevision: 1, commitId: '18180', commitSequence: 1, restInCommitStream: 2, commitStamp: new Date(2012, 3, 14, 9, 0, 0), payload: {id: '2', event:'blaaaaaaaaaaa'}},
+ {streamId: '1818', streamRevision: 2, commitId: '18180', commitSequence: 2, restInCommitStream: 1, commitStamp: new Date(2012, 3, 14, 10, 0, 0), payload: {id: '3', event:'blaaaaaaaaaaa'}}
+ ];
+ storage.addEvents(evts,
+ function (err) {
+ evts.push({_id: '181803', streamId: '1818', streamRevision: 3, commitId: '18180', commitSequence: 3, restInCommitStream: 0, commitStamp: new Date(2012, 3, 14, 11, 0, 0), payload: {id: '4', event:'missed'}});
+ var tx = {
+ _id: '18180',
+ events: evts
+ };
+ setTimeout(function() {
+ storage.transactions.save(tx, done);
+ }, 100);
+ });
+ });
+
+ describe('calling getEvents without maxRev', function() {
+
+ it('it should try to fix the pending transaction', function(done) {
+
+ storage.getEvents('1818', 0, function(err, events) {
+ expect(err).not.to.be.ok();
+ expect(events).to.have.length(4);
+ expect(events[0].commitId).to.eql('18180');
+ expect(events[1].commitId).to.eql('18180');
+ expect(events[3].commitId).to.eql('18180');
+ expect(events[3].payload.event).to.eql('missed');
+
+ storage.transactions.findOne({ id: '18180'}, function(err, res) {
+ expect(res).not.to.be.ok();
+ done();
+ });
+ });
+
+ });
+
+ });
+
+ });
+
});
});
View
4 test/eventStoreSpec.js
@@ -265,9 +265,9 @@ var expect = require('expect.js'),
eventstore.getEventStream('e1', 0, -1, function(err, es) {
expect(es.events).to.have.length(2);
expect(es.events[0].commitSequence).to.eql(0);
- expect(es.events[0].restInStream).to.eql(1);
+ expect(es.events[0].restInCommitStream).to.eql(1);
expect(es.events[1].commitSequence).to.eql(1);
- expect(es.events[1].restInStream).to.eql(0);
+ expect(es.events[1].restInCommitStream).to.eql(0);
expect(es.uncommittedEvents).to.have.length(0);
done();
});
Please sign in to comment.
Something went wrong with that request. Please try again.