Permalink
Browse files

added getAllEvents function with pagination

  • Loading branch information...
1 parent 5d0680e commit 4dab355b8fcdf8a516cbcc3aa954a4ae29ceabbb @adrai committed Jan 15, 2014
View
@@ -3,4 +3,5 @@ Jakefile.js
bench/
storage/
build/
-npmlink.js
+npmlink.js
+test
View
@@ -128,7 +128,20 @@ create a snapshot point
### Replaying events
-If you want to replay all events from the store you can do it with the function getEventRange:
+If you want to replay all events from the store you can do it with the function getAllEvents:
+
+ var handle = function(err, events) {
+ // events is the eventstream
+ if (events.length === amount) {
+ events.next(handle);
+ } else {
+ // finished to replay
+ }
+ };
+
+ es.getAllEvents(0, 100, handle);
+
+or with the function getEventRange:
var match = {} // match query in inner event (payload), for example: { id: eventId }
// if {} all events will return
@@ -145,7 +158,6 @@ If you want to replay all events from the store you can do it with the function
es.getEventRange(match, amount, handle);
-
If you want to replay all events of a particular aggregate or stream you can do it with the function getEvents:
var streamId = '1234'
@@ -156,6 +168,7 @@ If you want to replay all events of a particular aggregate or stream you can do
// events is the eventstream
});
+
### own event dispatching
es.getUndispatchedEvents(function(err, evts) {
View
@@ -435,7 +435,7 @@ Store.prototype = {
// __getEventRange:__ loads the range of events from given storage.
//
- // `storage.getEventRange(match, amount, callback)`
+ // `eventStore.getEventRange(match, amount, callback)`
//
// - __match:__ match query in inner event (payload)
// - __amount:__ amount of events
@@ -470,6 +470,47 @@ Store.prototype = {
},
+ // __getEvents:__ loads the events from _minRev_ to _maxRev_.
+ //
+ // `eventStore.getAllEvents(from, amount, callback)`
+ //
+ // - __from:__ from entry index [optional, default 0]
+ // - __amount:__ amount of results (hint: -1 = to end) [optional]
+ // - __callback:__ `function(err, events){}`
+ getAllEvents: function(from, amount, callback) {
+
+ if (this.hasConfigurationErrors(callback)) return;
+
+ if (typeof amount === 'function') {
+ callback = amount;
+ amount = -1;
+ }
+
+ if (typeof from === 'function') {
+ callback = from;
+ from = 0;
+ amount = -1;
+ }
+
+ var self = this;
+
+ this.storage.getAllEvents(from, amount, function(err, events) {
+
+ events.next = function(callback) {
+
+ from += amount;
+
+ self.getAllEvents(from, amount, callback);
+
+ };
+
+ if (typeof callback === 'function') {
+ callback(err, events);
+ }
+ });
+
+ },
+
// __getUndispatchedEvents:__ loads all undispatched events.
//
// `eventStore.getUndispatchedEvents(callback)`
@@ -168,6 +168,43 @@ Storage.prototype = {
callback(null, events);
},
+ // __getEvents:__ loads the events from _minRev_ to _maxRev_.
+ //
+ // `storage.getAllEvents(from, amount, callback)`
+ //
+ // - __from:__ from entry index [optional, default 0]
+ // - __amount:__ amount of results (hint: -1 = to end) [optional]
+ // - __callback:__ `function(err, events){}`
+ getAllEvents: function(from, amount, callback) {
+
+ if (typeof amount === 'function') {
+ callback = amount;
+ amount = -1;
+ }
+
+ if (typeof from === 'function') {
+ callback = from;
+ from = 0;
+ amount = -1;
+ }
+
+ if (!this.store) {
+ callback(null, []);
+ }
+ else {
+ var res = [];
+ for (var s in this.store) {
+ res = res.concat(this.store[s]);
+ }
+ if (amount === -1) {
+ callback(null, res.slice(from));
+ }
+ else {
+ callback(null, res.slice(from, from + amount));
+ }
+ }
+ },
+
// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
View
@@ -1,7 +1,7 @@
{
"author": "Jan Muehlemann, Adriano Raiano"
, "name": "eventstore"
- , "version": "0.7.1"
+ , "version": "0.7.2"
, "contributors": [
{ "name": "Jan Muehlemann", "email": "jan.muehlemann@gmail.com" },
{ "name": "Adriano Raiano", "email": "adriano@raiano.ch" }
@@ -1 +1,2 @@
-npmlink.js
+npmlink.js
+test
@@ -1,7 +1,7 @@
{
"author": "Jan Muehlemann, Adriano Raiano",
"name": "eventstore.mongoDb",
- "version": "0.7.0",
+ "version": "0.7.1",
"contributors": [
{
"name": "Jan Muehlemann",
View
@@ -184,6 +184,33 @@ Storage.prototype = {
});
},
+ // __getEvents:__ loads the events from _minRev_ to _maxRev_.
+ //
+ // `storage.getAllEvents(from, amount, callback)`
+ //
+ // - __from:__ from entry index [optional, default 0]
+ // - __amount:__ amount of results (hint: -1 = to end) [optional]
+ // - __callback:__ `function(err, events){}`
+ getAllEvents: function(from, amount, callback) {
+
+ if (typeof amount === 'function') {
+ callback = amount;
+ amount = -1;
+ }
+
+ if (typeof from === 'function') {
+ callback = from;
+ from = 0;
+ amount = -1;
+ }
+
+ if (amount === -1) {
+ return this.events.find({}, {sort:[['commitStamp','asc']]}).skip(from).toArray(callback);
+ }
+
+ this.events.find({}, {sort:[['commitStamp','asc']]}).skip(from).limit(amount).toArray(callback);
+ },
+
// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
@@ -178,7 +178,7 @@ var expect = require('expect.js')
storage.addEvents([
{streamId: '3', streamRevision: 0, commitId: 4, commitSequence: 4, commitStamp: new Date(2012, 3, 16, 8, 0, 0), payload: {id: '5', event:'blaaaaaaaaaaa'}, dispatched: false},
{streamId: '3', streamRevision: 1, commitId: 5, commitSequence: 5, commitStamp: new Date(2012, 3, 17, 8, 0, 0), payload: {id: '6', event:'blaaaaaaaaaaa'}, dispatched: false}
- ],
+ ],
function (err) {
storage.addSnapshot({snapshotId: '1', streamId: '3', revision: 1, data: 'data'}, function() {
storage.addSnapshot({snapshotId: '2', streamId: '3', revision: 2, data: 'dataPlus'}, done);
@@ -293,6 +293,54 @@ var expect = require('expect.js')
});
+ describe('calling getAllEvents without a from and a amount argument', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(function(err, evts) {
+ expect(evts).to.have.length(7);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from but without an amount argument', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, function(err, evts) {
+ expect(evts).to.have.length(5);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from and an amount argument that does not exceed the limit', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, 2, function(err, evts) {
+ expect(evts).to.have.length(2);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from and an amount argument that does exceed the limit', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, 10, function(err, evts) {
+ expect(evts).to.have.length(5);
+
+ done();
+ });
+ });
+
+ });
+
});
});
View
@@ -1 +1,2 @@
-npmlink.js
+npmlink.js
+test
@@ -1,7 +1,7 @@
{
"author": "Jan Muehlemann, Adriano Raiano",
"name": "eventstore.redis",
- "version": "0.7.0",
+ "version": "0.7.1",
"contributors": [
{
"name": "Jan Muehlemann",
View
@@ -71,7 +71,7 @@ Storage.prototype = {
var ansered = false;
self.client.on('error', function(msg) {
- if (msg.indexOf('connect') >= 0) {
+ if (msg.indexOf && msg.indexOf('connect') >= 0) {
if (!ansered && callback) callback(msg);
}
});
@@ -122,7 +122,13 @@ Storage.prototype = {
if (err) {
callback(err);
} else {
- self.client.rpush('undispatched:' + self.options.eventsCollectionName, JSON.stringify(event), callback);
+ self.client.rpush('all:' + self.options.eventsCollectionName, JSON.stringify(event), function(err, res) {
+ if (err) {
+ callback(err);
+ } else {
+ self.client.rpush('undispatched:' + self.options.eventsCollectionName, JSON.stringify(event), callback);
+ }
+ });
}
});
}, callback);
@@ -238,6 +244,34 @@ Storage.prototype = {
});
},
+ // __getEvents:__ loads the events from _minRev_ to _maxRev_.
+ //
+ // `storage.getAllEvents(from, amount, callback)`
+ //
+ // - __from:__ from entry index [optional, default 0]
+ // - __amount:__ amount of results (hint: -1 = to end) [optional]
+ // - __callback:__ `function(err, events){}`
+ getAllEvents: function(from, amount, callback) {
+
+ if (typeof amount === 'function') {
+ callback = amount;
+ amount = -1;
+ }
+
+ if (typeof from === 'function') {
+ callback = from;
+ from = 0;
+ amount = -1;
+ }
+
+ var minRev = from,
+ maxRev = amount === -1 ? -1 : from + amount - 1;
+
+ this.client.lrange('all:' + this.options.eventsCollectionName, minRev, maxRev, function (err, res) {
+ handleResultSet(err, res, callback);
+ });
+ },
+
// __getSnapshot:__ loads the next snapshot back from given max revision or the latest if you
// don't pass in a _maxRev_.
//
@@ -290,6 +290,54 @@ var expect = require('expect.js')
});
+ describe('calling getAllEvents without a from and a amount argument', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(function(err, evts) {
+ expect(evts).to.have.length(7);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from but without an amount argument', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, function(err, evts) {
+ expect(evts).to.have.length(5);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from and an amount argument that does not exceed the limit', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, 2, function(err, evts) {
+ expect(evts).to.have.length(2);
+
+ done();
+ });
+ });
+
+ });
+
+ describe('calling getAllEvents with a from and an amount argument that does exceed the limit', function() {
+
+ it('it should callback with the correct values', function(done) {
+ storage.getAllEvents(2, 10, function(err, evts) {
+ expect(evts).to.have.length(5);
+
+ done();
+ });
+ });
+
+ });
+
});
});
Oops, something went wrong.

0 comments on commit 4dab355

Please sign in to comment.