diff --git a/packages/caliper-core/lib/common/core/connector-base.js b/packages/caliper-core/lib/common/core/connector-base.js index a83abc86d..f104ce65d 100644 --- a/packages/caliper-core/lib/common/core/connector-base.js +++ b/packages/caliper-core/lib/common/core/connector-base.js @@ -13,9 +13,11 @@ */ 'use strict'; - +const CaliperUtils = require('../../common/utils/caliper-utils.js'); +const TxStatus = require('../core/transaction-status'); const ConnectorInterface = require('./connector-interface'); const Events = require('./../utils/constants').Events.Connector; +const Logger = CaliperUtils.getLogger('connector-base'); /** * Optional base class for connectors. @@ -75,20 +77,64 @@ class ConnectorBase extends ConnectorInterface { async sendRequests(requests) { if (!Array.isArray(requests)) { this._onTxsSubmitted(1); - const result = await this._sendSingleRequest(requests); - this._onTxsFinished(result); + let result = new TxStatus(); + + try { + result = await this._sendSingleRequest(requests); + } catch(error) { + Logger.error(`Unexpected error while sending request: ${error.stack || error}`); + result.SetStatusFail(); + result.SetVerification(true); + result.SetResult(''); + + // re-throwing an error allows for the worker to exit doing further work + // and move into waiting for tx's to finish. If any further errors occur + // then they will be ignored but the tx's will be marked as finished still + throw error; + } finally { + this._onTxsFinished(result); + } + return result; } const promises = []; + const creationTime = Date.now(); for (let i = 0; i < requests.length; ++i) { this._onTxsSubmitted(1); promises.push(this._sendSingleRequest(requests[i])); } - const results = await Promise.all(promises); - this._onTxsFinished(results); - return results; + const results = await Promise.allSettled(promises); + let firstError; + const actualResults = results.map((result) => { + if (result.status === 'rejected') { + if (!firstError) { + firstError = result.reason; + } + const failureResult = new TxStatus(); + failureResult.SetStatusFail(); + failureResult.SetVerification(true); + failureResult.SetResult(''); + failureResult.Set('time_create', creationTime); + return failureResult; + } + + return result.value; + }); + + this._onTxsFinished(actualResults); + + if (firstError) { + Logger.error(`Unexpected error while sending multiple requests, first error was: ${firstError.stack || firstError}`); + + // re-throwing an error allows for the worker to exit doing further work + // and move into waiting for tx's to finish. If any further errors occur + // then they will be ignored but the tx's will be marked as finished still + throw firstError; + } + + return actualResults; } /** diff --git a/packages/caliper-core/lib/worker/caliper-worker.js b/packages/caliper-core/lib/worker/caliper-worker.js index c03944e96..c3392393e 100644 --- a/packages/caliper-core/lib/worker/caliper-worker.js +++ b/packages/caliper-core/lib/worker/caliper-worker.js @@ -131,7 +131,7 @@ class CaliperWorker { } if (error) { - Logger.error(`Unhandled error while executing TX: ${error.stack || error}`); + // Already logged, no need to log again throw error; } diff --git a/packages/caliper-core/test/core/connector-base.js b/packages/caliper-core/test/core/connector-base.js new file mode 100644 index 000000000..31bd44d91 --- /dev/null +++ b/packages/caliper-core/test/core/connector-base.js @@ -0,0 +1,108 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const chai = require('chai'); +chai.should(); +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); +const sinon = require('sinon'); + +const loggerSandbox = sinon.createSandbox(); +const CaliperUtils = require('../../lib/common/utils/caliper-utils'); +loggerSandbox.replace(CaliperUtils, "getLogger", () => { + return { + debug: sinon.stub(), + error: sinon.stub(), + warn: sinon.stub(), + info: sinon.stub() + }; +}); + +const { ConnectorBase, TxStatus } = require('../..'); +const Events = require('../../lib/common/utils/constants').Events.Connector; + +class MockConnector extends ConnectorBase { + async _sendSingleRequest(request) { + if (request instanceof Error) { + throw request; + } + return request; + } +} + +describe('the base connector implementation', () => { + after(() => { + loggerSandbox.restore(); + }); + + describe('on sending requests', () => { + let mockConnector; + let emitSpy; + const txStatus = new TxStatus(); + const txStatus2 = new TxStatus(); + const txStatus3 = new TxStatus(); + beforeEach(() => { + mockConnector = new MockConnector(1, 'mock'); + emitSpy = sinon.spy(mockConnector, 'emit'); + }) + + it('should process a single request that returns a transaction status result', async() => { + const result = await mockConnector.sendRequests(txStatus); + + result.should.equal(txStatus); + sinon.assert.calledTwice(emitSpy); + sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.secondCall, Events.TxsFinished, txStatus); + }); + + it('should process a single request that throws an error', async() => { + await mockConnector.sendRequests(new Error('some failure')).should.be.rejectedWith(/some failure/); + + sinon.assert.calledTwice(emitSpy); + sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.secondCall, Events.TxsFinished, sinon.match.instanceOf(TxStatus)); + }); + + it('should process multiple requests that where they all return a transaction status result', async() => { + const result = await mockConnector.sendRequests([txStatus, txStatus2, txStatus3]); + result.should.deep.equal([txStatus, txStatus2, txStatus3]); + sinon.assert.callCount(emitSpy, 4); + sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsFinished, [txStatus, txStatus2, txStatus3]); + }); + + it('should process multiple requests where some return an error', async() => { + await mockConnector.sendRequests([new Error('error 1'), txStatus2, new Error('error 4'), txStatus3]).should.be.rejectedWith(/error 1/); + sinon.assert.callCount(emitSpy, 5); + sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.getCall(4), Events.TxsFinished, [sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus)]); + }); + + it('should process multiple requests where all return an error', async() => { + await mockConnector.sendRequests([new Error('error 1'), new Error('error 2'), new Error('error 3')]).should.be.rejectedWith(/error 1/); + sinon.assert.callCount(emitSpy, 4); + sinon.assert.calledWith(emitSpy.firstCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.secondCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.thirdCall, Events.TxsSubmitted, 1); + sinon.assert.calledWith(emitSpy.getCall(3), Events.TxsFinished, [sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus), sinon.match.instanceOf(TxStatus)]); + }); + }); +});