New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement unit tests for Block Synchronization Mechanism - Closes #4375 #4476
Changes from all commits
56c98d3
c8dbbf9
1631483
8608827
c92c5aa
1b155f1
930fcc6
dbcf703
d2e946a
209ef8b
e8ddcda
b26dcd1
4598552
490ba93
b79eed1
cccbf02
92956f6
9501143
4a8d679
9ac6ceb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,12 +89,11 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
/** | ||
* Check if this sync mechanism is valid for the received block | ||
* | ||
* @param {Object} receivedBlock - The blocked received from the network | ||
* @return {Promise.<Boolean|undefined>} - If the mechanism applied to received block | ||
* @throws {Error} - In case want to abort the sync pipeline | ||
*/ | ||
// eslint-disable-next-line no-unused-vars | ||
async isValidFor(receivedBlock) { | ||
async isValidFor() { | ||
// 2. Step: Check whether current chain justifies triggering the block synchronization mechanism | ||
const finalizedBlock = await this.storage.entities.Block.getOne({ | ||
height_eql: this.bft.finalizedHeight, | ||
|
@@ -172,12 +171,98 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
} | ||
} | ||
|
||
/** | ||
* When there is a failure applying blocks received from the peer, | ||
* it's needede to check whether the tip of the temp block chain has | ||
* preference over the current tip. If so, the temporary chain is restored | ||
* on top of the current chain and the blocks temp table is cleaned up | ||
* @param {ExtendedBlock} lastCommonBlock | ||
* @return {Promise<void>} | ||
* @private | ||
*/ | ||
async _handleBlockProcessingError(lastCommonBlock, peerId) { | ||
// If the list of blocks has not been fully applied | ||
this.logger.debug('Failed to apply obtained blocks from peer'); | ||
const [tipBeforeApplying] = await this.storage.entities.TempBlock.get( | ||
{}, | ||
{ sort: 'height:desc', limit: 1, extended: true }, | ||
); | ||
|
||
if (!tipBeforeApplying) { | ||
this.logger.error('Blocks temp table should not be empty'); | ||
throw new RestartError('Blocks temp table should not be empty'); | ||
2snEM6 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
const tipBeforeApplyingInstance = await this.processorModule.deserialize( | ||
tipBeforeApplying.fullBlock, | ||
); | ||
// Check if the new tip has priority over the last tip we had before applying | ||
const forkStatus = await this.processorModule.forkStatus( | ||
this.blocks.lastBlock, // New tip of the chain | ||
tipBeforeApplyingInstance, // Previous tip of the chain | ||
); | ||
|
||
const newTipHasPreference = forkStatus === FORK_STATUS_DIFFERENT_CHAIN; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason not to use directly in if condition? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it is easier to read and to understand what the condition means, especially because the FORK_STATUS_DIFFERENT_CHAIN is not the best name to represent that behavior and I didn't want to change it in this PR |
||
|
||
if (!newTipHasPreference) { | ||
this.logger.debug( | ||
{ | ||
currentTip: this.blocks.lastBlock.id, | ||
previousTip: tipBeforeApplyingInstance.id, | ||
}, | ||
'Previous tip of the chain has preference over current tip. Restoring chain from temp table', | ||
); | ||
try { | ||
this.logger.debug( | ||
{ height: lastCommonBlock.height }, | ||
'Deleting blocks after height', | ||
); | ||
await deleteBlocksAfterHeight( | ||
this.processorModule, | ||
this.blocks, | ||
lastCommonBlock.height, | ||
); | ||
this.logger.debug('Restoring blocks from temporary table'); | ||
await restoreBlocks(this.blocks, this.processorModule); | ||
|
||
this.logger.debug('Cleaning blocks temp table'); | ||
await clearBlocksTempTable(this.storage); | ||
} catch (error) { | ||
this.logger.error( | ||
{ err: error }, | ||
'Failed to restore blocks from blocks temp table', | ||
); | ||
} | ||
throw new ApplyPenaltyAndRestartError( | ||
peerId, | ||
'New tip of the chain has no preference over the previous tip before synchronizing', | ||
); | ||
} | ||
|
||
this.logger.debug( | ||
{ | ||
currentTip: this.blocks.lastBlock.id, | ||
previousTip: tipBeforeApplyingInstance.id, | ||
}, | ||
'Current tip of the chain has preference over previous tip', | ||
); | ||
|
||
this.logger.debug('Cleaning blocks temporary table'); | ||
await clearBlocksTempTable(this.storage); | ||
|
||
this.logger.info('Restarting block synchronization'); | ||
|
||
throw new RestartError( | ||
'The list of blocks has not been fully applied. Trying again', | ||
); | ||
} | ||
|
||
/** | ||
* Requests blocks from startingBlockID to an specific peer until endingBlockID | ||
* is met and applies them on top of the current chain. | ||
* | ||
* @param {Object} receivedBlock | ||
* @param {Object} lastCommonBlock | ||
* @param {ExtendedBlock} receivedBlock | ||
* @param {ExtendedBlock} lastCommonBlock | ||
* @param {string} peerId - The ID of the peer to target | ||
* @return {Promise<void | boolean>} | ||
* @throws {ApplyPenaltyAndRestartError} - In case peer didn't return any blocks after a number of retries | ||
|
@@ -209,71 +294,7 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
if (!(err instanceof BlockProcessingError)) { | ||
throw err; | ||
} | ||
|
||
this.logger.debug('Failed to apply obtained blocks from peer'); | ||
const [tipBeforeApplying] = await this.storage.entities.TempBlock.get( | ||
{}, | ||
{ | ||
sort: 'height:desc', | ||
limit: 1, | ||
extended: true, | ||
}, | ||
); | ||
|
||
const tipBeforeApplyingInstance = await this.processorModule.deserialize( | ||
tipBeforeApplying.fullBlock, | ||
); | ||
// Check if the new tip has priority over the last tip we had before applying | ||
const forkStatus = await this.processorModule.forkStatus( | ||
this.blocks.lastBlock, // New tip of the chain | ||
tipBeforeApplyingInstance, // Previous tip of the chain | ||
); | ||
|
||
const isDifferentChain = forkStatus === FORK_STATUS_DIFFERENT_CHAIN; | ||
|
||
if (!isDifferentChain) { | ||
this.logger.debug( | ||
{ | ||
currentTip: this.blocks.lastBlock.id, | ||
previousTip: tipBeforeApplyingInstance.id, | ||
}, | ||
'Previous tip of the chain has preference over current tip. Restoring chain from temp table', | ||
); | ||
try { | ||
this.logger.debug( | ||
{ height: lastCommonBlock.height }, | ||
'Deleting blocks after height', | ||
); | ||
await deleteBlocksAfterHeight( | ||
this.processorModule, | ||
this.blocks, | ||
lastCommonBlock.height, | ||
); | ||
this.logger.debug('Restoring blocks from temporary table'); | ||
await restoreBlocks(this.blocks, this.processorModule); | ||
|
||
this.logger.debug('Cleaning blocks temp table'); | ||
await clearBlocksTempTable(this.storage); | ||
} catch (error) { | ||
this.logger.error( | ||
{ err: error }, | ||
'Failed to restore blocks from blocks temp table', | ||
); | ||
} | ||
throw new ApplyPenaltyAndRestartError( | ||
peerId, | ||
'New tip of the chain has no preference over the previous tip before synchronizing', | ||
); | ||
} | ||
|
||
this.logger.debug('Cleaning blocks temporary table'); | ||
await clearBlocksTempTable(this.storage); | ||
|
||
this.logger.info('Restarting block synchronization'); | ||
|
||
throw new RestartError( | ||
'The list of blocks has not been fully applied. Trying again', | ||
); | ||
await this._handleBlockProcessingError(lastCommonBlock, peerId); | ||
} | ||
|
||
this.logger.debug('Cleaning up blocks temporary table'); | ||
|
@@ -292,7 +313,7 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
* last common block. | ||
* | ||
* @param {string} peerId - The ID of the selected peer to target. | ||
* @return {Promise<object>} - Returns the last common block | ||
* @return {Promise<ExtendedBlock>} - Returns the last common block | ||
* @throws {ApplyPenaltyAndRestartError} - In case no common block has been found | ||
* @throws {ApplyPenaltyAndRestartError} - In case the common block height is lower than the finalized height | ||
* @private | ||
|
@@ -348,12 +369,12 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
* corresponding to the first block of descendent consecutive rounds (starting from the last one). | ||
* | ||
* @param {string} peerId - The ID of the peer to target. | ||
* @return {Promise<Object | undefined>} | ||
* @return {Promise<ExtendedBlock | undefined>} | ||
* @private | ||
*/ | ||
async _requestLastCommonBlock(peerId) { | ||
const blocksPerRequestLimit = 10; // Maximum number of block IDs to be included in a single request | ||
const requestLimit = 10; // Maximum number of requests to be made to the remote peer | ||
const requestLimit = 3; // Maximum number of requests to be made to the remote peer | ||
|
||
let numberOfRequests = 1; // Keeps track of the number of requests made to the remote peer | ||
let highestCommonBlock; // Holds the common block returned by the peer if found. | ||
|
@@ -379,21 +400,34 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
}, | ||
)).map(block => block.id); | ||
|
||
// Request the highest common block with the previously computed list | ||
// to the given peer | ||
const { data } = await this.channel.invoke('network:requestFromPeer', { | ||
procedure: 'getHighestCommonBlock', | ||
peerId, | ||
data: { | ||
ids: blockIds, | ||
}, | ||
}); | ||
let data; | ||
|
||
try { | ||
// Request the highest common block with the previously computed list | ||
// to the given peer | ||
data = (await this.channel.invoke('network:requestFromPeer', { | ||
procedure: 'getHighestCommonBlock', | ||
peerId, | ||
data: { | ||
ids: blockIds, | ||
}, | ||
})).data; | ||
} catch (e) { | ||
numberOfRequests += 1; | ||
// eslint-disable-next-line no-continue | ||
continue; | ||
} | ||
|
||
if (!data) { | ||
numberOfRequests += 1; | ||
// eslint-disable-next-line no-continue | ||
continue; | ||
} | ||
|
||
highestCommonBlock = data; // If no common block, data is undefined. | ||
|
||
currentRound -= blocksPerRequestLimit; | ||
currentHeight = currentRound * this.constants.activeDelegates; | ||
numberOfRequests += 1; | ||
} | ||
|
||
return highestCommonBlock; | ||
|
@@ -409,7 +443,7 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
* @link https://github.com/LiskHQ/lips/blob/master/proposals/lip-0014.md#block-synchronization-mechanism | ||
* @param {string} peerId - Peer ID, used to target an specific peer | ||
* the peer specifically to request its last block of its chain. | ||
* @return {Promise<Object>} | ||
* @return {Promise<void>} | ||
* @throws {ApplyPenaltyAndRestartError} - in case the tip of the chain of the peer is not valid or is not a different chain | ||
* @private | ||
*/ | ||
|
@@ -421,6 +455,13 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
peerId, | ||
}); | ||
|
||
if (!data) { | ||
throw new ApplyPenaltyAndRestartError( | ||
peerId, | ||
"Peer didn't provide its last block", | ||
); | ||
} | ||
|
||
const networkLastBlock = await this.processorModule.deserialize(data); | ||
|
||
this.logger.debug( | ||
|
@@ -453,7 +494,7 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
* of the Pipeline but not in other cases | ||
* that's why we wrap it here. | ||
* | ||
* @param {Object} networkLastBlock | ||
* @param {ExtendedBlock} networkLastBlock | ||
* @return {Promise<{valid: boolean, err: null}|{valid: boolean, err: *}>} | ||
* @private | ||
*/ | ||
|
@@ -479,6 +520,10 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
state: PEER_STATE_CONNECTED, | ||
}); | ||
|
||
if (!peers || peers.length === 0) { | ||
throw new Error('List of connected peers is empty'); | ||
} | ||
|
||
this.logger.trace( | ||
{ peers: peers.map(peer => `${peer.ip}:${peer.wsPort}`) }, | ||
'List of connected peers', | ||
|
@@ -524,7 +569,7 @@ class BlockSynchronizationMechanism extends BaseSynchronizer { | |
let maxNumberOfPeersInSet = 0; | ||
let selectedPeers = []; | ||
let selectedBlockId = blockIds[0]; | ||
// Find the largest subset | ||
// Find the largest subset with same block ID | ||
// eslint-disable-next-line no-restricted-syntax | ||
for (const blockId of blockIds) { | ||
const peersByBlockId = peersGroupedByBlockId[blockId]; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tipBeforeApplying can be
undefined
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, in that case I guess we should restart mechanism only? and cleanup temp table just in case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think if it fails to
get
then it's already cleaned up?but in that case, i think we can just restart
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe temp table is full and there is an error accessing database, hmmm