From af6440fe8b579b7305cb6a8b441f75783076f813 Mon Sep 17 00:00:00 2001 From: Dave Kelsey <25582377+davidkel@users.noreply.github.com> Date: Fri, 15 Apr 2022 14:19:39 +0100 Subject: [PATCH] address worker cleanup when an error occurs (#1315) This improves the cleanup of a worker if an error occurs which would cause the worker to terminate the round. closes #1312 Signed-off-by: D Co-authored-by: D --- .../caliper-core/lib/worker/caliper-worker.js | 61 +++++-- .../test/worker/caliper-worker.js | 165 ++++++++++++++++++ 2 files changed, 208 insertions(+), 18 deletions(-) create mode 100644 packages/caliper-core/test/worker/caliper-worker.js diff --git a/packages/caliper-core/lib/worker/caliper-worker.js b/packages/caliper-core/lib/worker/caliper-worker.js index bb158895e..c03944e96 100644 --- a/packages/caliper-core/lib/worker/caliper-worker.js +++ b/packages/caliper-core/lib/worker/caliper-worker.js @@ -150,10 +150,11 @@ class CaliperWorker { Logger.debug(`Worker #${this.workerIndex} creating workload module`); const workloadModuleFactory = CaliperUtils.loadModuleFunction(new Map(), prepareTestMessage.getWorkloadSpec().module, 'createWorkloadModule'); this.workloadModule = workloadModuleFactory(); + let context; try { // Retrieve context for this round - const context = await this.connector.getContext(roundIndex, prepareTestMessage.getWorkerArguments()); + context = await this.connector.getContext(roundIndex, prepareTestMessage.getWorkerArguments()); // Run init phase of callback Logger.info(`Info: worker ${this.workerIndex} prepare test phase for round ${roundIndex} is starting...`); @@ -163,6 +164,7 @@ class CaliperWorker { Logger.info(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`); throw err; } finally { + await this.connector.releaseContext(context); Logger.info(`Info: worker ${this.workerIndex} prepare test phase for round ${roundIndex} is completed`); } } @@ -179,21 +181,24 @@ class CaliperWorker { const roundIndex = testMessage.getRoundIndex(); const roundLabel = testMessage.getRoundLabel(); Logger.debug(`Worker #${this.workerIndex} starting round #${roundIndex}`); + let context, rateController, observerActivated; try { Logger.debug(`Worker #${this.workerIndex} initializing adapter context`); - let context = await this.connector.getContext(roundIndex, workerArguments); + context = await this.connector.getContext(roundIndex, workerArguments); // Activate dispatcher Logger.debug(`Worker #${this.workerIndex} activating TX observer dispatch`); await this.txObserverDispatch.activate(roundIndex, roundLabel); + observerActivated = true; // Configure Logger.debug(`Worker #${this.workerIndex} creating rate controller`); - let rateController = new RateControl(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex); + rateController = new RateControl(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex); // Run the test loop Logger.info(`Worker #${this.workerIndex} starting workload loop`); + if (testMessage.getRoundDuration()) { const duration = testMessage.getRoundDuration(); // duration in seconds await this.runDuration(this.workloadModule, duration, rateController); @@ -202,25 +207,45 @@ class CaliperWorker { await this.runFixedNumber(this.workloadModule, number, rateController); } - // Deactivate dispatcher - Logger.debug(`Worker #${this.workerIndex} deactivating TX observer dispatch`); - await this.txObserverDispatch.deactivate(); - - // Clean up - Logger.debug(`Worker #${this.workerIndex} cleaning up rate controller`); - await rateController.end(); - - Logger.debug(`Worker #${this.workerIndex} cleaning up user test module`); - await this.workloadModule.cleanupWorkloadModule(); - - Logger.debug(`Worker #${this.workerIndex} cleaning up adapter context`); - await this.connector.releaseContext(context); - Logger.debug(`Worker #${this.workerIndex} finished round #${roundIndex}`, this.internalTxObserver.getCurrentStatistics().getCumulativeTxStatistics()); return this.internalTxObserver.getCurrentStatistics(); } catch (err) { - Logger.error(`Unexpected error in worker #${this.workerIndex}: ${(err.stack || err)}`); + Logger.error(`Unexpected error in worker #${this.workerIndex} caused worker to finish round #${roundIndex}: ${(err.stack || err)}`); throw err; + } finally { + if (observerActivated) { + Logger.debug(`Worker #${this.workerIndex} deactivating TX observer dispatch`); + try { + await this.txObserverDispatch.deactivate(); + } catch(err) { + Logger.warn(`Worker #${this.workerIndex} failed to deactivate TX observer dispatch: ${(err.stack || err)}`); + } + } + + if (rateController) { + Logger.debug(`Worker #${this.workerIndex} cleaning up rate controller`); + try { + await rateController.end(); + } catch(err) { + Logger.warn(`Worker #${this.workerIndex} failed to clean up rate controller: ${(err.stack || err)}`); + } + } + + Logger.debug(`Worker #${this.workerIndex} cleaning up workload module`); + try { + await this.workloadModule.cleanupWorkloadModule(); + } catch(err) { + Logger.warn(`Worker #${this.workerIndex} failed to clean up workload module: ${(err.stack || err)}`); + } + + if (context) { + Logger.debug(`Worker #${this.workerIndex} cleaning up connector context`); + try { + await this.connector.releaseContext(context); + } catch(err) { + Logger.warn(`Worker #${this.workerIndex} failed to release connector context: ${(err.stack || err)}`); + } + } } } } diff --git a/packages/caliper-core/test/worker/caliper-worker.js b/packages/caliper-core/test/worker/caliper-worker.js new file mode 100644 index 000000000..ab989b847 --- /dev/null +++ b/packages/caliper-core/test/worker/caliper-worker.js @@ -0,0 +1,165 @@ +/* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +'use strict'; + +const chai = require('chai'); +chai.should(); +const chaiAsPromised = require('chai-as-promised'); +chai.use(chaiAsPromised); +const sinon = require('sinon'); +const mockery = require('mockery'); + +const MessengerInterface = require('../../lib/common/messengers/messenger-interface'); +const ConnectorInterface = require('../../lib/common/core/connector-interface'); +const TestMessage = require('../../lib/common/messages/testMessage'); +const RateInterface = require('../../lib/worker/rate-control/rateInterface'); +const WorkloadInterface = require('../../lib/worker/workload/workloadModuleInterface'); +const TransactionStatisticsCollector = require('../../lib/common/core/transaction-statistics-collector'); + +const mockRate = sinon.createStubInstance(RateInterface); +const mockWorkload = sinon.createStubInstance(WorkloadInterface); +const mockStats = sinon.createStubInstance(TransactionStatisticsCollector); +mockStats.getTotalSubmittedTx.onFirstCall().returns(0); +mockStats.getTotalSubmittedTx.onSecondCall().returns(1); +const deactivateMethod = sinon.stub(); +let logwarningMethod = sinon.stub(); + +class MockCaliperUtils { + static resolvePath(path) { + return 'fake/path'; + } + + static loadModuleFunction(map, a,b,c,d) { + let mock = mockWorkload; + if (map.size > 0) { + mock = mockRate; + } + return () => { + return mock; + }; + } + + static getLogger() { + return { + debug: sinon.stub(), + error: sinon.stub(), + warn: logwarningMethod, + info: sinon.stub() + }; + } + + static sleep() {} +} + +class MockInternalTxObserver { + getCurrentStatistics() { + return mockStats; + } +} + +class MockTxObserverDispatch { + activate() {} +} +MockTxObserverDispatch.prototype.deactivate = deactivateMethod; + +mockery.enable({ + warnOnReplace: false, + warnOnUnregistered: false, + useCleanCache: true +}); +mockery.registerMock('./tx-observers/internal-tx-observer', MockInternalTxObserver); +mockery.registerMock('./tx-observers/tx-observer-dispatch', MockTxObserverDispatch); + +const loggerSandbox = sinon.createSandbox(); +const CaliperUtils = require('../../lib/common/utils/caliper-utils'); +loggerSandbox.replace(CaliperUtils, "getLogger", MockCaliperUtils.getLogger); + +const CaliperWorker = require('../../lib/worker/caliper-worker'); + +describe('Caliper worker', () => { + after(() => { + loggerSandbox.restore(); + }); + + describe('When executing a round', () => { + let mockConnector, mockMessenger, mockTestMessage; + const sandbox = sinon.createSandbox(); + + beforeEach(() => { + logwarningMethod.reset(); + mockRate.end.reset(); + mockWorkload.cleanupWorkloadModule.reset(); + mockWorkload.submitTransaction.reset(); + mockStats.getTotalSubmittedTx.resetHistory(); + deactivateMethod.reset(); + + mockConnector = sinon.createStubInstance(ConnectorInterface); + mockConnector.getContext.resolves(1); + mockMessenger = sinon.createStubInstance(MessengerInterface); + mockTestMessage = sinon.createStubInstance(TestMessage); + mockTestMessage.getRateControlSpec.returns({type: '1zero-rate'}); + mockTestMessage.getWorkloadSpec.returns({module: 'test/workload'}); + mockTestMessage.getNumberOfTxs.returns(1); + sandbox.replace(CaliperUtils, 'resolvePath', MockCaliperUtils.resolvePath); + sandbox.replace(CaliperUtils, 'loadModuleFunction', MockCaliperUtils.loadModuleFunction); + sandbox.replace(CaliperUtils, 'sleep', MockCaliperUtils.sleep); + }); + + afterEach(() => { + sandbox.restore(); + }) + + const validateCallsAndWarnings = (warnings) => { + sinon.assert.calledOnce(mockWorkload.submitTransaction); + sinon.assert.calledOnce(deactivateMethod); + sinon.assert.calledOnce(mockRate.end); + sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule); + sinon.assert.calledTwice(mockConnector.releaseContext); + sinon.assert.callCount(logwarningMethod, warnings); + }; + + it('should clean up all resources if a connector does not throw an error', async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + mockWorkload.submitTransaction.resolves(); + + await worker.executeRound(mockTestMessage); + validateCallsAndWarnings(0); + }); + + + it('should clean up all resources if a connector throws an error', async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + mockWorkload.submitTransaction.rejects(new Error('failure')); + + await worker.executeRound(mockTestMessage).should.be.rejectedWith(/failure/); + validateCallsAndWarnings(0); + }); + + it('should warn if any of the cleanup tasks fail', async () => { + const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid'); + await worker.prepareTest(mockTestMessage); + mockWorkload.submitTransaction.resolves(); + deactivateMethod.rejects(new Error('deactivate error')); + mockRate.end.rejects(new Error('rate end error')); + mockWorkload.cleanupWorkloadModule.rejects(new Error('cleanup error')); + mockConnector.releaseContext.rejects(new Error('release error')); + + await worker.executeRound(mockTestMessage); + validateCallsAndWarnings(4); + }); + }); +});