diff --git a/tcp/connection.js b/tcp/connection.js index b2a14da..76a74bc 100644 --- a/tcp/connection.js +++ b/tcp/connection.js @@ -10,6 +10,7 @@ var util = require('util') , systemEventTypes = require('./systemEventTypes') , streamMetadata = require('./streamMetadata') , streamMetadataResult = require('./streamMetadataResult') + , transaction = require('./transaction') module.exports = createConnection @@ -71,6 +72,40 @@ EsTcpConnection.prototype.appendToStream = function(stream, appendData, cb) { }) } +EsTcpConnection.prototype.startTransaction = function(stream, transactionData, cb) { + var auth = transactionData.auth + , me = this + this.enqueueOperation({ + name: 'StartTransaction' + , stream: stream + , auth: auth + , data: transactionData + , cb: function(err, result) { + if(err) return cb(err) + + cb(null, transaction(result.TransactionId, auth, me)) + } + }) +} + +EsTcpConnection.prototype.transactionalWrite = function(writeData, cb) { + this.enqueueOperation({ + name: 'TransactionalWrite' + , auth: writeData.auth + , data: writeData + , cb: cb + }) +} + +EsTcpConnection.prototype.commitTransaction = function(commitData, cb) { + this.enqueueOperation({ + name: 'CommitTransaction' + , auth: commitData.auth + , data: commitData + , cb: cb + }) +} + EsTcpConnection.prototype.deleteStream = function(stream, deleteData, cb) { this.enqueueOperation({ name: 'DeleteStream' diff --git a/tcp/operations.js b/tcp/operations.js index 855163f..13c45a5 100644 --- a/tcp/operations.js +++ b/tcp/operations.js @@ -22,6 +22,10 @@ function OperationItem(operation) { this.finish = function(message) { var cb = operation.cb , payload + + if(message.messageName === 'BadRequest') { + return cb(new Error('Bad Request - ' + message.payload.toString())) + } try { payload = messageParser.parse(operation.responseType, message.payload) @@ -35,7 +39,7 @@ function OperationItem(operation) { } //TODO: Investigate further if this is needed - var errorIfDeleted = ['WriteEvents', 'DeleteStream'] + var errorIfDeleted = ['WriteEvents', 'DeleteStream', 'TransactionCommit'] if(payload.result === 'StreamDeleted' && errorIfDeleted.indexOf(operation.requestType) !== -1) { return cb(new Error(payload.message)) } @@ -250,4 +254,78 @@ var operations = { } } } +, StartTransaction: function(operationData) { + return { + auth: operationData.auth + , cb: operationData.cb + , requestType: 'TransactionStart' + , toRequestPayload: function(payload) { + var payload = operationData.data + + return messageParser.serialize('TransactionStart', { + eventStreamId: operationData.stream + , expectedVersion: payload.expectedVersion + , requireMaster: !!payload.requireMaster + }) + } + , responseType: 'TransactionStartCompleted' + , toResponseObject: function(payload) { + return { + Result: payload.result + , TransactionId: payload.transactionId + , Message: payload.message + } + } + } + } +, TransactionalWrite: function(operationData) { + return { + auth: operationData.auth + , cb: operationData.cb + , requestType: 'TransactionWrite' + , toRequestPayload: function(payload) { + var payload = operationData.data + , events = !payload.events ? [] : Array.isArray(payload.events) ? payload.events : [ payload.events ] + return messageParser.serialize('TransactionWrite', { + transactionId: payload.transactionId + , events: events.map(eventPayloads.toEventStoreEvent) + , requireMaster: !!payload.requireMaster + }) + } + , responseType: 'TransactionWriteCompleted' + , toResponseObject: function(payload) { + return { + Result: payload.result + , TransactionId: payload.transactionId + , Message: payload.message + } + } + } + } +, CommitTransaction: function(operationData) { + return { + auth: operationData.auth + , cb: operationData.cb + , requestType: 'TransactionCommit' + , toRequestPayload: function(payload) { + var payload = operationData.data + + return messageParser.serialize('TransactionCommit', { + transactionId: payload.transactionId + , requireMaster: !!payload.requireMaster + }) + } + , responseType: 'TransactionCommitCompleted' + , toResponseObject: function(payload) { + return { + Result: payload.result + , TransactionId: payload.transactionId + , Message: payload.message + , FirstEventNumber: payload.firstEventNumber + , NextExpectedVersion: payload.lastEventNumber + , LogPosition: position(payload) + } + } + } + } } diff --git a/tcp/transaction.js b/tcp/transaction.js new file mode 100644 index 0000000..5f16a95 --- /dev/null +++ b/tcp/transaction.js @@ -0,0 +1,37 @@ + + +module.exports = EsTransaction + + +function EsTransaction(transactionId, userCredentials, connection) { + if(!(this instanceof EsTransaction)) { + return new EsTransaction(transactionId, userCredentials, connection) + } + + this._transactionId = transactionId + this._connection = connection + this._userCredentials = userCredentials + + this._isCommitted = false + this._isRolledBack = false +} + +EsTransaction.prototype.commit = function(cb) { + this._isCommitted = true + this._connection.commitTransaction({ + transactionId: this._transactionId + , auth: this._userCredentials + }, cb) +} + +EsTransaction.prototype.write = function(events, cb) { + if(!cb) { + cb = events + events = [] + } + this._connection.transactionalWrite({ + transactionId: this._transactionId + , events: events + , auth: this._userCredentials + }, cb) +} diff --git a/test/tcp/ges/appending_to_implicitly_created_stream_using_transaction.js b/test/tcp/ges/appending_to_implicitly_created_stream_using_transaction.js index 9e167b1..1c36a0b 100644 --- a/test/tcp/ges/appending_to_implicitly_created_stream_using_transaction.js +++ b/test/tcp/ges/appending_to_implicitly_created_stream_using_transaction.js @@ -21,26 +21,386 @@ describe('appending_to_implicitly_created_stream_using_transaction', function() }) }) - it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent' - it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent' - it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent' - it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev' - it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev' - it('sequence_0em1_0e0_non_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0e0_non_idempotent' - it('sequence_0em1_0any_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0any_idempotent' - it('sequence_0em1_0em1_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0em1_idempotent' - it('sequence_0em1_1e0_2e1_1any_1any_idempotent') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_1any_1any_idempotent' - it('sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail') - //var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail' + it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0em1_idempotent' + , allEvents = createTestEvent(range(0, 6)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(5) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(allEvents.length) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0any_idempotent' + , allEvents = createTestEvent(range(0, 6)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(5) + + connection.startTransaction(stream, { expectedVersion: client.expectedVersion.any }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(allEvents.length) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e5_non_idempotent' + , allEvents = createTestEvent(range(0, 6)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(5) + + connection.startTransaction(stream, { expectedVersion: 5 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(6) + + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(allEvents.length + 1) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e6_wev' + , allEvents = createTestEvent(range(0, 6)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(5) + + connection.startTransaction(stream, { expectedVersion: 6 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + done() + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_3e2_4e3_5e4_0e4_wev' + , allEvents = createTestEvent(range(0, 6)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(5) + + connection.startTransaction(stream, { expectedVersion: 4 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + done() + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_0e0_non_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0e0_non_idempotent' + , allEvents = createTestEvent(range(0, 1)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + + connection.startTransaction(stream, { expectedVersion: 0 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(1) + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(allEvents.length + 1) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_0any_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0any_idempotent' + , allEvents = createTestEvent(range(0, 1)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + + connection.startTransaction(stream, { expectedVersion: client.expectedVersion.any }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(1) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_0em1_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_0em1_idempotent' + , allEvents = createTestEvent(range(0, 1)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[0], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(1) + done() + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_0em1_1e0_2e1_1any_1any_idempotent', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_0em1_1e0_2e1_1any_1any_idempotent' + , allEvents = createTestEvent(range(0, 3)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(2) + + connection.startTransaction(stream, { expectedVersion: client.expectedVersion.any }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents[1], function(err, writeResult) { + if(err) return done(err) + + transaction.write(allEvents[1], function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(1) + eventStreamCounter(connection, stream, function(err, count) { + if(err) return done(err) + + count.should.equal(allEvents.length) + done() + }) + }) + }) + }) + }) + }) + }) + }) + }) + + it('sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail', function(done) { + var stream = 'appending_to_implicitly_created_stream_using_transaction_sequence_S_0em1_1em1_E_S_0em1_1em1_2em1_E_idempotancy_fail' + , allEvents = createTestEvent(range(0, 2)) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents, function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(1) + + connection.startTransaction(stream, { expectedVersion: -1 }, function(err, transaction) { + if(err) return done(err) + + transaction.write(allEvents.concat(createTestEvent()), function(err, writeResult) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + + done() + }) + }) + }) + }) + }) + }) + }) + after(function(done) { diff --git a/test/tcp/ges/isjson_flag_on_event.js b/test/tcp/ges/isjson_flag_on_event.js index aebdd21..43ac85a 100644 --- a/test/tcp/ges/isjson_flag_on_event.js +++ b/test/tcp/ges/isjson_flag_on_event.js @@ -21,8 +21,55 @@ describe('isjson_flag_on_event', function() { }) }) - it('should_be_preserved_with_all_possible_write_and_read_methods') - //var stream = 'should_be_preserved_with_all_possible_write_methods' + it('should_be_preserved_with_all_possible_write_and_read_methods', function(done) { + var stream = 'should_be_preserved_with_all_possible_write_methods' + , appendOptions = { + expectedVersion: client.expectedVersion.any + , events: [ + client.createEventData(uuid.v4(), 'some-type', true, { some: 'json' }, null) + , client.createEventData(uuid.v4(), 'some-type', true, null, { some: 'json' }) + , client.createEventData(uuid.v4(), 'some-type', true, { some: 'json' }, { some: 'json' }) + ] + } + + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + var transactionOptions = { + expectedVersion: client.expectedVersion.any + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + var events = [ + client.createEventData(uuid.v4(), 'some-type', true, { some: 'json' }, null) + , client.createEventData(uuid.v4(), 'some-type', true, null, { some: 'json' }) + , client.createEventData(uuid.v4(), 'some-type', true, { some: 'json' }, { some: 'json' }) + ] + transaction.write(events, function(err) { + if(err) return done(err) + + transaction.commit(function(err) { + if(err) return done(err) + var readOptions = { + start: 0 + , count: 100 + } + + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Status.should.equal('Success') + readResult.Events.length.should.equal(6) + readResult.Events.forEach(function(evt) { + evt.OriginalEvent.IsJson.should.be.true + }) + done() + }) + }) + }) + }) + }) + }) after(function(done) { diff --git a/test/tcp/ges/transaction.js b/test/tcp/ges/transaction.js index 74bbd11..c32a39e 100644 --- a/test/tcp/ges/transaction.js +++ b/test/tcp/ges/transaction.js @@ -1,10 +1,12 @@ var client = require('../../../') , ges = require('ges-test-helper') , uuid = require('node-uuid') + , async = require('async') , createTestEvent = require('../../createTestEvent') , range = require('../../range') , streamWriter = require('../../streamWriter') , eventStreamCounter = require('../../eventStreamCounter') + , should = require('../../shouldExtensions') describe('transaction', function() { var es @@ -19,28 +21,289 @@ describe('transaction', function() { }) }) - it('should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit') - //var stream = 'should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit' - it('should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit') - //var stream = 'should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit' - it('should_fail_to_commit_non_existing_stream_with_wrong_exp_ver') - //var stream = 'should_fail_to_commit_non_existing_stream_with_wrong_exp_ver' - it('should_do_nothing_if_commits_no_events_to_empty_stream') - //var stream = 'should_do_nothing_if_commits_no_events_to_empty_stream' - it('should_do_nothing_if_transactionally_writing_no_events_to_empty_stream') - //var stream = 'should_do_nothing_if_transactionally_writing_no_events_to_empty_stream' - it('should_validate_expectations_on_commit') - //var stream = 'should_validate_expectations_on_commit' + it('should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit', function(done) { + var stream = 'should_start_on_non_existing_stream_with_correct_exp_ver_and_create_stream_on_commit' + , transactionOptions = { + expectedVersion: client.expectedVersion.noStream + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + done() + }) + }) + }) + }) + + it('should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit', function(done) { + var stream = 'should_start_on_non_existing_stream_with_exp_ver_any_and_create_stream_on_commit' + , transactionOptions = { + expectedVersion: client.expectedVersion.any + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(0) + done() + }) + }) + }) + }) + + it('should_fail_to_commit_non_existing_stream_with_wrong_exp_ver', function(done) { + var stream = 'should_fail_to_commit_non_existing_stream_with_wrong_exp_ver' + , transactionOptions = { + expectedVersion: 1 + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + done() + }) + }) + }) + }) + + it('should_do_nothing_if_commits_no_events_to_empty_stream', function(done) { + var stream = 'should_do_nothing_if_commits_no_events_to_empty_stream' + , transactionOptions = { + expectedVersion: client.expectedVersion.noStream + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(-1) + + var readOptions = { + start: 0 + , count: 1 + } + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Events.length.should.equal(0) + done() + }) + }) + }) + }) + + it('should_do_nothing_if_transactionally_writing_no_events_to_empty_stream', function(done) { + var stream = 'should_do_nothing_if_transactionally_writing_no_events_to_empty_stream' + , transactionOptions = { + expectedVersion: client.expectedVersion.noStream + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(-1) + + var readOptions = { + start: 0 + , count: 1 + } + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Events.length.should.equal(0) + done() + }) + }) + }) + }) + }) + + it('should_validate_expectations_on_commit', function(done) { + var stream = 'should_validate_expectations_on_commit' + , transactionOptions = { + expectedVersion: 100500 + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + done() + }) + }) + }) + }) + it('should_commit_when_writing_with_exp_ver_any_even_while_somene_is_writing_in_parallel') //var stream = 'should_commit_when_writing_with_exp_ver_any_even_while_somene_is_writing_in_parallel' - it('should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad') - //var stream = 'should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad' - it('should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver') - //var stream = 'should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver' - it('should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted') - //var stream = 'should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted' - it('idempotency_is_correct_for_explicit_transactions_with_expected_version_any') - //var streamId = 'idempotency_is_correct_for_explicit_transactions_with_expected_version_any' + + it('should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad', function(done) { + var stream = 'should_fail_to_commit_if_started_with_correct_ver_but_committing_with_bad' + , transactionOptions = { + expectedVersion: client.expectedVersion.emptyStream + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + var appendOptions = { + expectedVersion: client.expectedVersion.emptyStream + , events: createTestEvent() + } + + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + done() + }) + }) + }) + }) + }) + + it('should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver', function(done) { + var stream = 'should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_with_correct_ver' + , transactionOptions = { + expectedVersion: 0 + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + var appendOptions = { + expectedVersion: client.expectedVersion.emptyStream + , events: createTestEvent() + } + + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(1) + done() + }) + }) + }) + }) + }) + + it('should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted', function(done) { + var stream = 'should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stream_was_deleted' + , transactionOptions = { + expectedVersion: client.expectedVersion.emptyStream + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + if(err) return done(err) + + transaction.write(createTestEvent(), function(err) { + if(err) return done(err) + var deleteOptions = { + expectedVersion: client.expectedVersion.emptyStream + , hardDelete: true + } + + connection.deleteStream(stream, deleteOptions, function(err) { + if(err) return done(err) + + transaction.commit(function(err, commitResult) { + should.not.be.null(err) + err.message.should.equal('Stream is deleted.') + done() + }) + }) + }) + }) + }) + + it('idempotency_is_correct_for_explicit_transactions_with_expected_version_any', function(done) { + var stream = 'idempotency_is_correct_for_explicit_transactions_with_expected_version_any' + , transactionOptions = { + expectedVersion: client.expectedVersion.any + } + , evt = client.createEventData(uuid.v4(), 'SomethingHappened', true, { Value: 42}, null) + + connection.startTransaction(stream, transactionOptions, function(err, transaction1) { + if(err) return done(err) + + transaction1.write(evt, function(err) { + if(err) return done(err) + + transaction1.commit(function(err, commitResult1) { + if(err) return done(err) + + commitResult1.NextExpectedVersion.should.equal(0) + + connection.startTransaction(stream, transactionOptions, function(err, transaction2) { + if(err) return done(err) + + transaction2.write(evt, function(err) { + if(err) return done(err) + + transaction2.commit(function(err, commitResult2) { + if(err) return done(err) + + commitResult2.NextExpectedVersion.should.equal(0) + var readOptions = { + start: 0 + , count: 100 + } + + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Events.length.should.equal(1) + readResult.Events[0].Event.EventId.should.equal(evt.EventId) + done() + }) + }) + }) + }) + }) + }) + }) + }) after(function(done) { connection.close(function() { diff --git a/test/tcp/ges/when_committing_empty_transaction.js b/test/tcp/ges/when_committing_empty_transaction.js index fa81b9c..64bf02d 100644 --- a/test/tcp/ges/when_committing_empty_transaction.js +++ b/test/tcp/ges/when_committing_empty_transaction.js @@ -6,29 +6,146 @@ var client = require('../../../') , streamWriter = require('../../streamWriter') , eventStreamCounter = require('../../eventStreamCounter') , should = require('../../shouldExtensions') + , port = 8200 describe('when_committing_empty_transaction', function() { var es , connection + , firstEvent = createTestEvent() + , stream = 'test-stream' - before(function(done) { - ges({ tcpPort: 5024 }, function(err, memory) { + beforeEach(function(done) { + ges({ tcpPort: port }, function(err, memory) { if(err) return done(err) es = memory - connection = client({ port: 5024 }, done) + client({ port: port }, function(err, con) { + if(err) return done(err) + connection = con + var appendOptions = { + expectedVersion: client.expectedVersion.noStream + , events: [ firstEvent, createTestEvent(), createTestEvent() ] + } + + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + appendResult.NextExpectedVersion.should.equal(2) + var transactionOptions = { + expectedVersion: 2 + } + + connection.startTransaction(stream, transactionOptions, function(err, transaction) { + transaction.commit(function(err, commitResult) { + if(err) return done(err) + + commitResult.NextExpectedVersion.should.equal(2) + done() + }) + }) + }) + }) }) }) - it('following_append_with_correct_expected_version_are_commited_correctly') - it('following_append_with_expected_version_any_are_commited_correctly') - it('committing_first_event_with_expected_version_no_stream_is_idempotent') - it('trying_to_append_new_events_with_expected_version_no_stream_fails') + it('following_append_with_correct_expected_version_are_commited_correctly', function(done) { + var appendOptions = { + expectedVersion: 2 + , events: [ createTestEvent(), createTestEvent() ] + } + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + appendResult.NextExpectedVersion.should.equal(4) + var readOptions = { + start: 0 + , count: 100 + } + + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Status.should.equal('Success') + readResult.Events.length.should.equal(5) + for(var i = 0; i < 5; i += 1) { + readResult.Events[i].OriginalEventNumber.should.equal(i) + } + done() + }) + }) + }) + + it('following_append_with_expected_version_any_are_commited_correctly', function(done) { + var appendOptions = { + expectedVersion: client.expectedVersion.any + , events: [ createTestEvent(), createTestEvent() ] + } + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + appendResult.NextExpectedVersion.should.equal(4) + var readOptions = { + start: 0 + , count: 100 + } + + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Status.should.equal('Success') + readResult.Events.length.should.equal(5) + for(var i = 0; i < 5; i += 1) { + readResult.Events[i].OriginalEventNumber.should.equal(i) + } + done() + }) + }) + }) + + it('committing_first_event_with_expected_version_no_stream_is_idempotent', function(done) { + var appendOptions = { + expectedVersion: client.expectedVersion.noStream + , events: firstEvent + } + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + if(err) return done(err) + + appendResult.NextExpectedVersion.should.equal(0) + var readOptions = { + start: 0 + , count: 100 + } + + connection.readStreamEventsForward(stream, readOptions, function(err, readResult) { + if(err) return done(err) + + readResult.Status.should.equal('Success') + readResult.Events.length.should.equal(3) + for(var i = 0; i < 3; i += 1) { + readResult.Events[i].OriginalEventNumber.should.equal(i) + } + done() + }) + }) + }) + + it('trying_to_append_new_events_with_expected_version_no_stream_fails', function(done) { + var appendOptions = { + expectedVersion: client.expectedVersion.noStream + , events: createTestEvent() + } + connection.appendToStream(stream, appendOptions, function(err, appendResult) { + should.not.be.null(err) + err.message.should.equal('Wrong expected version.') + done() + }) + }) - after(function(done) { + afterEach(function(done) { connection.close(function() { es.on('exit', function(code, signal) { + port += 1 done() }) es.on('error', done)