Skip to content

Commit

Permalink
address worker cleanup when an error occurs (#1315)
Browse files Browse the repository at this point in the history
This improves the cleanup of a worker if an error occurs which would
cause the worker to terminate the round.

closes #1312

Signed-off-by: D <d_kelsey@uk.ibm.com>

Co-authored-by: D <d_kelsey@uk.ibm.com>
  • Loading branch information
davidkel and davidkel committed Apr 15, 2022
1 parent 57d8d46 commit af6440f
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 18 deletions.
61 changes: 43 additions & 18 deletions packages/caliper-core/lib/worker/caliper-worker.js
Expand Up @@ -150,10 +150,11 @@ class CaliperWorker {
Logger.debug(`Worker #${this.workerIndex} creating workload module`);
const workloadModuleFactory = CaliperUtils.loadModuleFunction(new Map(), prepareTestMessage.getWorkloadSpec().module, 'createWorkloadModule');
this.workloadModule = workloadModuleFactory();
let context;

try {
// Retrieve context for this round
const context = await this.connector.getContext(roundIndex, prepareTestMessage.getWorkerArguments());
context = await this.connector.getContext(roundIndex, prepareTestMessage.getWorkerArguments());

// Run init phase of callback
Logger.info(`Info: worker ${this.workerIndex} prepare test phase for round ${roundIndex} is starting...`);
Expand All @@ -163,6 +164,7 @@ class CaliperWorker {
Logger.info(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`);
throw err;
} finally {
await this.connector.releaseContext(context);
Logger.info(`Info: worker ${this.workerIndex} prepare test phase for round ${roundIndex} is completed`);
}
}
Expand All @@ -179,21 +181,24 @@ class CaliperWorker {
const roundIndex = testMessage.getRoundIndex();
const roundLabel = testMessage.getRoundLabel();
Logger.debug(`Worker #${this.workerIndex} starting round #${roundIndex}`);
let context, rateController, observerActivated;

try {
Logger.debug(`Worker #${this.workerIndex} initializing adapter context`);
let context = await this.connector.getContext(roundIndex, workerArguments);
context = await this.connector.getContext(roundIndex, workerArguments);

// Activate dispatcher
Logger.debug(`Worker #${this.workerIndex} activating TX observer dispatch`);
await this.txObserverDispatch.activate(roundIndex, roundLabel);
observerActivated = true;

// Configure
Logger.debug(`Worker #${this.workerIndex} creating rate controller`);
let rateController = new RateControl(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex);
rateController = new RateControl(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex);

// Run the test loop
Logger.info(`Worker #${this.workerIndex} starting workload loop`);

if (testMessage.getRoundDuration()) {
const duration = testMessage.getRoundDuration(); // duration in seconds
await this.runDuration(this.workloadModule, duration, rateController);
Expand All @@ -202,25 +207,45 @@ class CaliperWorker {
await this.runFixedNumber(this.workloadModule, number, rateController);
}

// Deactivate dispatcher
Logger.debug(`Worker #${this.workerIndex} deactivating TX observer dispatch`);
await this.txObserverDispatch.deactivate();

// Clean up
Logger.debug(`Worker #${this.workerIndex} cleaning up rate controller`);
await rateController.end();

Logger.debug(`Worker #${this.workerIndex} cleaning up user test module`);
await this.workloadModule.cleanupWorkloadModule();

Logger.debug(`Worker #${this.workerIndex} cleaning up adapter context`);
await this.connector.releaseContext(context);

Logger.debug(`Worker #${this.workerIndex} finished round #${roundIndex}`, this.internalTxObserver.getCurrentStatistics().getCumulativeTxStatistics());
return this.internalTxObserver.getCurrentStatistics();
} catch (err) {
Logger.error(`Unexpected error in worker #${this.workerIndex}: ${(err.stack || err)}`);
Logger.error(`Unexpected error in worker #${this.workerIndex} caused worker to finish round #${roundIndex}: ${(err.stack || err)}`);
throw err;
} finally {
if (observerActivated) {
Logger.debug(`Worker #${this.workerIndex} deactivating TX observer dispatch`);
try {
await this.txObserverDispatch.deactivate();
} catch(err) {
Logger.warn(`Worker #${this.workerIndex} failed to deactivate TX observer dispatch: ${(err.stack || err)}`);
}
}

if (rateController) {
Logger.debug(`Worker #${this.workerIndex} cleaning up rate controller`);
try {
await rateController.end();
} catch(err) {
Logger.warn(`Worker #${this.workerIndex} failed to clean up rate controller: ${(err.stack || err)}`);
}
}

Logger.debug(`Worker #${this.workerIndex} cleaning up workload module`);
try {
await this.workloadModule.cleanupWorkloadModule();
} catch(err) {
Logger.warn(`Worker #${this.workerIndex} failed to clean up workload module: ${(err.stack || err)}`);
}

if (context) {
Logger.debug(`Worker #${this.workerIndex} cleaning up connector context`);
try {
await this.connector.releaseContext(context);
} catch(err) {
Logger.warn(`Worker #${this.workerIndex} failed to release connector context: ${(err.stack || err)}`);
}
}
}
}
}
Expand Down
165 changes: 165 additions & 0 deletions packages/caliper-core/test/worker/caliper-worker.js
@@ -0,0 +1,165 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const chai = require('chai');
chai.should();
const chaiAsPromised = require('chai-as-promised');
chai.use(chaiAsPromised);
const sinon = require('sinon');
const mockery = require('mockery');

const MessengerInterface = require('../../lib/common/messengers/messenger-interface');
const ConnectorInterface = require('../../lib/common/core/connector-interface');
const TestMessage = require('../../lib/common/messages/testMessage');
const RateInterface = require('../../lib/worker/rate-control/rateInterface');
const WorkloadInterface = require('../../lib/worker/workload/workloadModuleInterface');
const TransactionStatisticsCollector = require('../../lib/common/core/transaction-statistics-collector');

const mockRate = sinon.createStubInstance(RateInterface);
const mockWorkload = sinon.createStubInstance(WorkloadInterface);
const mockStats = sinon.createStubInstance(TransactionStatisticsCollector);
mockStats.getTotalSubmittedTx.onFirstCall().returns(0);
mockStats.getTotalSubmittedTx.onSecondCall().returns(1);
const deactivateMethod = sinon.stub();
let logwarningMethod = sinon.stub();

class MockCaliperUtils {
static resolvePath(path) {
return 'fake/path';
}

static loadModuleFunction(map, a,b,c,d) {
let mock = mockWorkload;
if (map.size > 0) {
mock = mockRate;
}
return () => {
return mock;
};
}

static getLogger() {
return {
debug: sinon.stub(),
error: sinon.stub(),
warn: logwarningMethod,
info: sinon.stub()
};
}

static sleep() {}
}

class MockInternalTxObserver {
getCurrentStatistics() {
return mockStats;
}
}

class MockTxObserverDispatch {
activate() {}
}
MockTxObserverDispatch.prototype.deactivate = deactivateMethod;

mockery.enable({
warnOnReplace: false,
warnOnUnregistered: false,
useCleanCache: true
});
mockery.registerMock('./tx-observers/internal-tx-observer', MockInternalTxObserver);
mockery.registerMock('./tx-observers/tx-observer-dispatch', MockTxObserverDispatch);

const loggerSandbox = sinon.createSandbox();
const CaliperUtils = require('../../lib/common/utils/caliper-utils');
loggerSandbox.replace(CaliperUtils, "getLogger", MockCaliperUtils.getLogger);

const CaliperWorker = require('../../lib/worker/caliper-worker');

describe('Caliper worker', () => {
after(() => {
loggerSandbox.restore();
});

describe('When executing a round', () => {
let mockConnector, mockMessenger, mockTestMessage;
const sandbox = sinon.createSandbox();

beforeEach(() => {
logwarningMethod.reset();
mockRate.end.reset();
mockWorkload.cleanupWorkloadModule.reset();
mockWorkload.submitTransaction.reset();
mockStats.getTotalSubmittedTx.resetHistory();
deactivateMethod.reset();

mockConnector = sinon.createStubInstance(ConnectorInterface);
mockConnector.getContext.resolves(1);
mockMessenger = sinon.createStubInstance(MessengerInterface);
mockTestMessage = sinon.createStubInstance(TestMessage);
mockTestMessage.getRateControlSpec.returns({type: '1zero-rate'});
mockTestMessage.getWorkloadSpec.returns({module: 'test/workload'});
mockTestMessage.getNumberOfTxs.returns(1);
sandbox.replace(CaliperUtils, 'resolvePath', MockCaliperUtils.resolvePath);
sandbox.replace(CaliperUtils, 'loadModuleFunction', MockCaliperUtils.loadModuleFunction);
sandbox.replace(CaliperUtils, 'sleep', MockCaliperUtils.sleep);
});

afterEach(() => {
sandbox.restore();
})

const validateCallsAndWarnings = (warnings) => {
sinon.assert.calledOnce(mockWorkload.submitTransaction);
sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockRate.end);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.calledTwice(mockConnector.releaseContext);
sinon.assert.callCount(logwarningMethod, warnings);
};

it('should clean up all resources if a connector does not throw an error', async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.resolves();

await worker.executeRound(mockTestMessage);
validateCallsAndWarnings(0);
});


it('should clean up all resources if a connector throws an error', async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.rejects(new Error('failure'));

await worker.executeRound(mockTestMessage).should.be.rejectedWith(/failure/);
validateCallsAndWarnings(0);
});

it('should warn if any of the cleanup tasks fail', async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.resolves();
deactivateMethod.rejects(new Error('deactivate error'));
mockRate.end.rejects(new Error('rate end error'));
mockWorkload.cleanupWorkloadModule.rejects(new Error('cleanup error'));
mockConnector.releaseContext.rejects(new Error('release error'));

await worker.executeRound(mockTestMessage);
validateCallsAndWarnings(4);
});
});
});

0 comments on commit af6440f

Please sign in to comment.