Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BE-876 Stop unnecessary discovery request #255

Merged
merged 1 commit into from Jul 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/platform/fabric/sync/FabricEvent.ts
Expand Up @@ -66,7 +66,12 @@ export class FabricEvent {
async event => {
// Skip first block, it is process by peer event hub
if (!(event.blockNumber.low === 0 && event.blockNumber.high === 0)) {
await this.fabricServices.processBlockEvent(this.client, event.blockData);
const noDiscovery = false;
await this.fabricServices.processBlockEvent(
this.client,
event.blockData,
noDiscovery
);
}
},
{
Expand Down
13 changes: 7 additions & 6 deletions app/platform/fabric/sync/SyncPlatform.ts
Expand Up @@ -117,10 +117,11 @@ export class SyncPlatform {
* Setting interval for validating any missing block from the current client ledger
* Set blocksSyncTime property in platform config.json in minutes
*/
(function validateMissingBlocks(sync) {
sync.isChannelEventHubConnected();
setTimeout(validateMissingBlocks, sync.blocksSyncTime, sync);
})(this);
// During initial sync-up phase, disable discovery request
(function validateMissingBlocks(sync: SyncPlatform, noDiscovery: boolean) {
sync.isChannelEventHubConnected(noDiscovery);
setTimeout(validateMissingBlocks, sync.blocksSyncTime, sync, false);
})(this, true);

logger.debug(
'******* Initialization end for child client process %s ******',
Expand All @@ -133,12 +134,12 @@ export class SyncPlatform {
*
* @memberof SyncPlatform
*/
async isChannelEventHubConnected() {
async isChannelEventHubConnected(noDiscovery: boolean) {
for (const channel_name of this.client.getChannels()) {
// Validate channel event is connected
const status = this.eventHub.isChannelEventHubConnected(channel_name);
if (status) {
await this.syncService.syncBlocks(this.client, channel_name);
await this.syncService.syncBlocks(this.client, channel_name, noDiscovery);
} else {
// Channel client is not connected then it will reconnect
this.eventHub.connectChannelEventHub(channel_name);
Expand Down
30 changes: 16 additions & 14 deletions app/platform/fabric/sync/SyncService.ts
Expand Up @@ -357,18 +357,16 @@ export class SyncServices {
.saveChaincodPeerRef(network_id, chaincode_peer_row);
}

async syncBlocks(client, channel_name) {
async syncBlocks(client, channel_name, noDiscovery) {
const network_id = client.getNetworkId();

// Get channel information from ledger
const channelInfo = await client.fabricGateway.queryChainInfo(channel_name);
// Get channel information from ledger
const channelInfo = await client.fabricGateway.queryChainInfo(channel_name);

if (!channelInfo) {
logger.info(
`syncBlocks: Failed to retrieve channelInfo >> ${channel_name}`,
);
return;
}
if (!channelInfo) {
logger.info(`syncBlocks: Failed to retrieve channelInfo >> ${channel_name}`);
return;
}
const synch_key = `${network_id}_${channel_name}`;
logger.info(`syncBlocks: Start >> ${synch_key}`);
if (this.synchInProcess.includes(synch_key)) {
Expand All @@ -393,7 +391,7 @@ export class SyncServices {
result.missing_id
);
if (block) {
await this.processBlockEvent(client, block);
await this.processBlockEvent(client, block, noDiscovery);
}
} catch {
logger.error(`Failed to process Block # ${result.missing_id}`);
Expand All @@ -410,7 +408,6 @@ export class SyncServices {
async updateDiscoveredChannel(client, channel_name, channel_genesis_hash) {
const network_id = client.getNetworkId();
// get discovery and insert new peer, orders details to db
await client.initializeChannelFromDiscover(channel_name);
await this.insertFromDiscoveryResults(
client,
channel_name,
Expand Down Expand Up @@ -459,7 +456,7 @@ export class SyncServices {
* @returns
* @memberof SyncServices
*/
async processBlockEvent(client, block) {
async processBlockEvent(client, block, noDiscovery) {
const network_id = client.getNetworkId();
// Get the first transaction
const first_tx = block.data.data[0];
Expand Down Expand Up @@ -492,7 +489,10 @@ export class SyncServices {
}
this.blocksInProcess.push(blockPro_key);

if (header.channel_header.typeString === fabric_const.BLOCK_TYPE_CONFIG) {
if (
!noDiscovery &&
header.channel_header.typeString === fabric_const.BLOCK_TYPE_CONFIG
) {
setTimeout(
async (cli, chName, chGenHash) => {
await this.updateDiscoveredChannel(cli, chName, chGenHash);
Expand Down Expand Up @@ -629,9 +629,11 @@ export class SyncServices {
const chaincode_id = String.fromCharCode.apply(null, chaincodeID);
// checking new chaincode is deployed
if (
!noDiscovery &&
header.channel_header.typeString ===
fabric_const.BLOCK_TYPE_ENDORSER_TRANSACTION &&
chaincode === fabric_const.CHAINCODE_LSCC
(chaincode === fabric_const.CHAINCODE_LSCC ||
chaincode === fabric_const.CHAINCODE_LIFECYCLE)
) {
setTimeout(
async (cli, chName, chGenHash) => {
Expand Down
1 change: 1 addition & 0 deletions app/platform/fabric/utils/FabricConst.ts
Expand Up @@ -9,6 +9,7 @@ export const fabric = {
BLOCK_TYPE_CONFIG: 'CONFIG',
BLOCK_TYPE_ENDORSER_TRANSACTION: 'ENDORSER_TRANSACTION',
CHAINCODE_LSCC: 'lscc',
CHAINCODE_LIFECYCLE: '_lifecycle',
NOTITY_TYPE_NEWCHANNEL: '1',
NOTITY_TYPE_UPDATECHANNEL: '2',
NOTITY_TYPE_CHAINCODE: '3',
Expand Down
24 changes: 16 additions & 8 deletions app/test/SyncService.test.ts
Expand Up @@ -10,6 +10,7 @@ import sinon from 'sinon';
import { SyncServices } from '../platform/fabric/sync/SyncService';
import * as stubBlock from './block.json';
import * as stubConfigBlock from './block_config.json';
import * as stubLifecycleBlock from './block_lifecycle.json';

import { ExplorerError } from '../common/ExplorerError';
import * as FabricConst from '../platform/fabric/utils/FabricConst';
Expand Down Expand Up @@ -128,8 +129,8 @@ describe('processBlockEvent', () => {
it('should return without error', async () => {
const stubClient = setupClient();

await expect(sync.processBlockEvent(stubClient, stubBlock)).eventually.to.be
.true;
await expect(sync.processBlockEvent(stubClient, stubBlock, false)).eventually
.to.be.true;
sinon.assert.calledOnce(stubPlatform.send);
sinon.assert.calledWith(
stubPlatform.send,
Expand All @@ -142,7 +143,7 @@ describe('processBlockEvent', () => {
const stubClient = setupClient();
sync.blocksInProcess = ['mychannel_9'];

await expect(sync.processBlockEvent(stubClient, stubBlock))
await expect(sync.processBlockEvent(stubClient, stubBlock, false))
.to.eventually.be.rejectedWith('Block already in processing')
.and.be.an.instanceOf(ExplorerError);
sinon.assert.notCalled(stubPlatform.send);
Expand All @@ -158,7 +159,7 @@ describe('processBlockEvent', () => {
const spyInsertDiscoveredCH = sinon.spy(sync, 'insertDiscoveredChannel');

const clock = sinon.useFakeTimers();
await expect(sync.processBlockEvent(stubClient, stubBlock))
await expect(sync.processBlockEvent(stubClient, stubBlock, false))
.to.eventually.be.rejectedWith('mychannel has not been inserted yet')
.and.be.an.instanceOf(ExplorerError);
clock.tick(20000);
Expand All @@ -174,7 +175,7 @@ describe('processBlockEvent', () => {
const spyUpdateDiscoveredCH = sinon.spy(sync, 'updateDiscoveredChannel');

const clock = sinon.useFakeTimers();
await expect(sync.processBlockEvent(stubClient, stubConfigBlock)).to
await expect(sync.processBlockEvent(stubClient, stubConfigBlock, false)).to
.eventually.to.be.true;
clock.tick(20000);

Expand All @@ -199,7 +200,14 @@ describe('processBlockEvent', () => {
const stubClient = setupClient();

stubConfigBlock.data.data[0].payload.data.last_update.payload = null;
await expect(sync.processBlockEvent(stubClient, stubConfigBlock)).to
await expect(sync.processBlockEvent(stubClient, stubConfigBlock, false)).to
.eventually.to.be.true;
});

it('should be done without any errors when _lifecycle block is processed', async () => {
const stubClient = setupClient();

await expect(sync.processBlockEvent(stubClient, stubLifecycleBlock, false)).to
.eventually.to.be.true;
});
});
Expand All @@ -219,7 +227,7 @@ describe('syncBlocks', () => {
const stubClient = setupClient();
const stubProcessBlockEvent = sinon.stub(sync, 'processBlockEvent');

await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME);
await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME, false);
expect(stubProcessBlockEvent.calledTwice).to.be.true;
stubProcessBlockEvent.restore();
});
Expand All @@ -230,7 +238,7 @@ describe('syncBlocks', () => {
stubProcessBlockEvent.onFirstCall().throws('Block already in processing');
stubError.reset();

await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME);
await sync.syncBlocks(stubClient, VALID_CHANNEL_NAME, false);
expect(stubProcessBlockEvent.calledTwice).to.be.true;
expect(stubError.calledWith('Failed to process Block # 1')).to.be.true;
expect(stubError.calledWith('Failed to process Block # 2')).to.be.false;
Expand Down