Skip to content

Commit

Permalink
Issue x-cubed#3 - Catch-up subscription - first green test
Browse files Browse the repository at this point in the history
  • Loading branch information
Zach Blocker committed Jun 10, 2016
1 parent 878bef6 commit 6580369
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 12 deletions.
39 changes: 28 additions & 11 deletions lib/catchUpSubscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
function EventStoreCatchUpSubscription(connection, streamId, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings) {
ArgValidator.notNull(connection, 'connection');
ArgValidator.notNull(eventAppeared, 'eventAppeared');
if (!settings) settings = new CatchUpSubscriptionSettings();

this._connection = connection;
this._streamId = streamId || "";
Expand Down Expand Up @@ -116,7 +117,7 @@
EventStoreCatchUpSubscription.prototype.isSubscribedToAll = function () { return this._streamId.length == 0; };

EventStoreCatchUpSubscription.prototype.start = function () {
this._log("Catch-up Subscription to %s: starting...", this.isSubscribedToAll ? "<all>" : this._streamId);
this._log("Catch-up Subscription to %s: starting...", this.isSubscribedToAll() ? "<all>" : this._streamId);
this.runSubscription();
};

Expand All @@ -135,6 +136,7 @@

EventStoreCatchUpSubscription.prototype.loadHistoricalEvents = function(callback) {
this._log("Catch-up Subscription to %s: running...", this.isSubscribedToAll() ? "<all>" : this._streamId);
var _this = this;

this._allowProcessing = false;

Expand All @@ -145,7 +147,7 @@
if (err) {
if (callback) callback(err);
} else {
this.subscribeToStream(callback);
_this.subscribeToStream(callback);
}
});
} else {
Expand All @@ -155,6 +157,8 @@
};

EventStoreCatchUpSubscription.prototype.subscribeToStream = function(callback) {
var _this = this;

if (!this._shouldStop) {
this._log("Catch-up Subscription to %s: subscribing...", this.isSubscribedToAll() ? "<all>" : this._streamId);

Expand All @@ -166,12 +170,12 @@
this._streamId, this._resolveLinkTos,
this.enqueuePushedEvent.bind(this),
function(confirmation) {
this._subscription = {
_this._subscription = {
correlationId: subscriptionId,
lastCommitPosition: confirmation.last_commit_position,
lastEventNumber: confirmation.last_event_number
};
this.readMissedHistoricEvents(callback);
_this.readMissedHistoricEvents(callback);
},
this.serverSubscriptionDropped.bind(this),
this._userCredentials);
Expand All @@ -183,14 +187,16 @@
};

EventStoreCatchUpSubscription.prototype.readMissedHistoricEvents = function (callback) {
var _this = this;

if (!this._shouldStop) {
this._log("Catch-up Subscription to %s: pulling events (if left)...", this.isSubscribedToAll() ? "<all>" : this._streamId);

this.readEventsTill(this._subscription.lastCommitPosition, this._subscription.lastEventNumber, function (err) {
if (err) {
if (callback) callback(err);
} else {
this.startLiveProcessing(callback);
_this.startLiveProcessing(callback);
}
});
} else {
Expand Down Expand Up @@ -269,8 +275,9 @@
EventStoreCatchUpSubscription.prototype.dropSubscriptionEvent = function() { return { specialType: 'DropSubscription' }; };

EventStoreCatchUpSubscription.prototype.dropSubscription = function(reason, err) {
this._log("Catch-up Subscription to %: dropping subscription, reason: %s %s.",
this.isSubscribedToAll() ? "<all>" : this._streamId, reason, err == null ? "" : err.toString());
this._log("Catch-up Subscription to %s: dropping subscription, reason: %s, result: %s, error: %s.",
this.isSubscribedToAll() ? "<all>" : this._streamId, reason,
err == null ? "" : err.result, err == null ? "" : err.error);

if (this._subscription != null) {
this._connection.unsubscribeFromStream(this._subscription.correlationId, this._userCredentials, function(pkg) {
Expand Down Expand Up @@ -337,12 +344,22 @@
nextReadEventNumber = (event.link ? event.link.eventNumber : event.eventNumber) + 1;
},
this._userCredentials,
function (err) {
var effectiveErr = err || eventErr;
function (completed) {
// Check for error.
var effectiveErr = eventErr;
if (!effectiveErr) {
// Check for error reading events.
if (completed.result != 0) {
effectiveErr = new Error('Error reading stream events: (' + completed.result.toString() + ') ' + completed.error);
}
}

// Act on error.
if (effectiveErr) {
if (callback) callback(effectiveErr);
return;
}

var isEndOfStream = nextReadEventNumber == _this._nextReadEventNumber; // Didn't read any events.
var done = lastEventNumber == null ? isEndOfStream : nextReadEventNumber > lastEventNumber;
_this._nextReadEventNumber = nextReadEventNumber;
Expand Down Expand Up @@ -379,8 +396,8 @@
processed = true;
}

_this._log("Catch-up Subscription to %s: %s event (%s, %d, %s)",
_this.isSubscribedToAll() ? "<all>" : _this._streamId,
this._log("Catch-up Subscription to %s: %s event (%s, %d, %s)",
this.isSubscribedToAll() ? "<all>" : this._streamId,
processed ? "processed" : "skipping",
origEvent.streamId, origEvent.eventNumber, origEvent.eventType);

Expand Down
4 changes: 3 additions & 1 deletion lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,9 @@ Connection.prototype.unsubscribeFromStream = function(correlationId, credentials
*/
Connection.prototype.subscribeToStreamFrom = function (streamName, fromEventNumber, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings) {
if (!settings) settings = new CatchUpSubscription.Settings();
return CatchUpSubscription.Stream(this, streamName, fromEventNumber, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings);
var subscription = new CatchUpSubscription.Stream(this, streamName, fromEventNumber, userCredentials, eventAppeared, liveProcessingStarted, subscriptionDropped, settings);
subscription.start();
return subscription;
};

Connection.prototype.deleteStream = function(streamId, expectedVersion, requireMaster, hardDelete, credentials, callback) {
Expand Down
50 changes: 50 additions & 0 deletions test/catchUpSubscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
var assert = require("assert");

var EventStoreClient = require("../index.js");

var dbconn = require("./common/dbconn");
var testData = require("./common/testData");
var defaultHostName = dbconn.defaultHostName;
var credentials = dbconn.credentials;

describe('Catch-Up Subscription', function() {
var testStreams = [];

context('setting up basic subscription', function () {
it('should succeed', function (done) {
dbconn.open(done, function (connection) {
var actualEventNumbers = [];
var streamName = testData.randomStreamName();
testStreams.push(streamName);

var settings = new EventStoreClient.CatchUpSubscription.Settings();

testData.writeEvents(
connection, credentials, streamName, 10,
testData.fooEvent,
function () {
connection.subscribeToStreamFrom(
streamName, 6, credentials,
function (event) {
actualEventNumbers.push(event.eventNumber);
},
function () {
assert.deepEqual(actualEventNumbers, [7, 8, 9]);
done();
},
function () {
assert.fail(null, null, 'Subscription dropped!');
done;
},
settings);
});
});
});
});

after(function () {
dbconn.open(null, function (connection) {
testData.deleteTestStreams(connection, credentials, testStreams);
});
});
});

0 comments on commit 6580369

Please sign in to comment.