Skip to content

Commit

Permalink
fix(core-blockchain): discard blocks containing forged tx and stop pr…
Browse files Browse the repository at this point in the history
…ocess queue on fork (#1732)

* refactor: remove misleading log output

* refactor: move block exception check

* fix: reset queue at the beginning of fork recovery

* refactor: drop process queue after disregarding unchained block

* fix: update blocks from current round after recovery

* fix: check if received block contains forged transactions

* refactor: only fork when downloaded block cannot be chained with last accepted block

* refactor: only check forged transactions if it is a chained block

* test: fix

* fix(core-p2p): call updateNetworkStatusIfNotEnoughPeers in the correct context (#1737)

* refactor(core-p2p): replace the network update timeout with a period check (#1738)

* refactor(core-p2p): replace the network update timeout with a period check

* refactor(core-p2p): only sett the last network timeout if it is not set or expired

* chore(core-p2p): remove pTimeout dependency

* refactor: change name and make sure the next network update is scheduled

* chore(package): update better-sqlite3 to version 5.2.0 (#1739)

* test(core-utils): increase coverage (#1742)

* test(core-debugger-cli): increase coverage (#1743)

* chore: update @babel/core and @babel/preset-env (#1740)

* chore(package): update @babel/core to version 7.2.2

* chore(package): update @babel/preset-env to version 7.2.0

* chore(package): update lockfile yarn.lock

* chore: update yarn.lock

* test(crypto): increase identity coverage (#1744)

* test(crypto): increase handler coverage (#1745)

* test(crypto): increase handler coverage (#1746)

* test(crypto): increase multi signature/payment coverage (#1747)

* docs: changelog and readme formatting (#1824)

* refactor: rename resetQueue

* fix: only care about missing blocks when process queue is empty

* refactor: reword log message

* refactor: move fork into function

* fix: accept internal blocks even if in fork state

* fix: dont show peer tracker after init

* refactor: remove useless forked flag
  • Loading branch information
spkjp committed Dec 17, 2018
1 parent db6590e commit 35dbb99
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 58 deletions.
4 changes: 2 additions & 2 deletions packages/core-blockchain/__tests__/blockchain.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ describe("Blockchain", () => {
const debugMessage = `Blockchain not ready to accept new block at height ${lastBlock.data.height.toLocaleString()}. Last block: ${(
lastBlock.data.height - 2
).toLocaleString()} :warning:`;
expect(mockLoggerDebug).toHaveBeenLastCalledWith(debugMessage);
expect(mockLoggerDebug).toHaveBeenCalledWith(debugMessage);

expect(blockchain.getLastBlock().data.height).toBe(lastBlock.data.height - 2);
});
Expand Down Expand Up @@ -403,7 +403,7 @@ async function __start() {
}

async function __resetBlocksInCurrentRound() {
blockchain.database.blocksInCurrentRound = await blockchain.database.__getBlocksForRound();
await blockchain.database.loadBlocksFromCurrentRound();
}

async function __resetToHeight1() {
Expand Down
96 changes: 77 additions & 19 deletions packages/core-blockchain/src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ const { Block } = models;
export class Blockchain {
public isStopped: boolean;
public options: any;
public processQueue: ProcessQueue;
public rebuildQueue: RebuildQueue;
private actions: any;
private queue: Queue;
private processQueue: ProcessQueue;
private rebuildQueue: RebuildQueue;

/**
* Create a new blockchain manager instance.
Expand Down Expand Up @@ -136,10 +136,17 @@ export class Blockchain {
* @return {void}
*/
public resetState() {
this.clearAndStopQueue();
this.state.reset();
}

/**
* Clear and stop the queue.
* @return {void}
*/
public clearAndStopQueue() {
this.queue.pause();
this.queue.clear();

this.state.reset();
}

/**
Expand Down Expand Up @@ -167,15 +174,13 @@ export class Blockchain {
)} from ${block.ip}`,
);

if (this.state.started && this.state.blockchain.value === "idle" && !this.state.forked) {
if (this.state.started && this.state.blockchain.value === "idle") {
this.dispatch("NEWBLOCK");

this.processQueue.push(block);
this.state.lastDownloadedBlock = new Block(block);
} else {
logger.info(
`Block disregarded because blockchain is ${this.state.forked ? "forked" : "not ready"} :exclamation:`,
);
logger.info(`Block disregarded because blockchain is not ready :exclamation:`);
}
}

Expand Down Expand Up @@ -231,6 +236,8 @@ export class Blockchain {
* @return {void}
*/
public async removeBlocks(nblocks) {
this.clearAndStopQueue();

const blocksToRemove = await this.database.getBlocks(
this.state.getLastBlock().data.height - nblocks,
nblocks - 1,
Expand Down Expand Up @@ -274,9 +281,6 @@ export class Blockchain {
const resetHeight = lastBlock.data.height - nblocks;
logger.info(`Removing ${pluralize("block", nblocks, true)}. Reset to height ${resetHeight.toLocaleString()}`);

this.queue.pause();
this.queue.clear();

this.state.lastDownloadedBlock = lastBlock;

await __removeBlocks(nblocks);
Expand Down Expand Up @@ -308,6 +312,7 @@ export class Blockchain {
}

await this.database.commitQueuedQueries();
await this.database.loadBlocksFromCurrentRound();
}

/**
Expand Down Expand Up @@ -362,7 +367,7 @@ export class Blockchain {
* @return {(Function|void)}
*/
public async processBlock(block, callback) {
if (!block.verification.verified) {
if (!block.verification.verified && !this.database.__isException(block.data)) {
logger.warn(`Block ${block.data.height.toLocaleString()} disregarded because verification failed :scroll:`);
logger.warn(JSON.stringify(block.verification, null, 4));

Expand All @@ -374,7 +379,6 @@ export class Blockchain {
try {
if (this.__isChained(this.state.getLastBlock(), block)) {
await this.acceptChainedBlock(block);
this.state.setLastBlock(block);
} else {
await this.manageUnchainedBlock(block);
}
Expand All @@ -383,8 +387,8 @@ export class Blockchain {
logger.debug(error.stack);

this.transactionPool.purgeBlock(block);
this.forkBlock(block);

this.dispatch("FORK");
return callback();
}

Expand All @@ -409,13 +413,18 @@ export class Blockchain {
* @return {void}
*/
public async acceptChainedBlock(block) {
const containsForgedTransactions = await this.checkBlockContainsForgedTransactions(block);
if (containsForgedTransactions) {
this.state.lastDownloadedBlock = this.state.getLastBlock();
return;
}

await this.database.applyBlock(block);
await this.database.saveBlock(block);

// Check if we recovered from a fork
if (this.state.forked && this.state.forkedBlock.height === block.data.height) {
if (this.state.forkedBlock && this.state.forkedBlock.height === block.data.height) {
logger.info("Successfully recovered from fork :star2:");
this.state.forked = false;
this.state.forkedBlock = null;
}

Expand All @@ -427,6 +436,13 @@ export class Blockchain {
logger.debug(error.stack);
}
}

this.state.setLastBlock(block);

// Ensure the lastDownloadedBlock is not behind the last accepted block.
if (this.state.lastDownloadedBlock && this.state.lastDownloadedBlock.data.height < block.data.height) {
this.state.lastDownloadedBlock = block;
}
}

/**
Expand All @@ -442,6 +458,16 @@ export class Blockchain {
logger.debug(
`Blockchain not ready to accept new block at height ${block.data.height.toLocaleString()}. Last block: ${lastBlock.data.height.toLocaleString()} :warning:`,
);

// Also remove all remaining queued blocks. Since blocks are downloaded in batches,
// it is very likely that all blocks will be disregarded at this point anyway.
// NOTE: This isn't really elegant, but still better than spamming the log with
// useless `not ready to accept` messages.
if (this.processQueue.length() > 0) {
logger.debug(`Discarded ${this.processQueue.length()} downloaded blocks.`);
}
this.queue.clear();

this.state.lastDownloadedBlock = lastBlock;
} else if (block.data.height < lastBlock.data.height) {
logger.debug(
Expand All @@ -453,7 +479,7 @@ export class Blockchain {
const isValid = await this.database.validateForkedBlock(block);

if (isValid) {
this.dispatch("FORK");
this.forkBlock(block);
} else {
logger.info(
`Forked block disregarded because it is not allowed to forge. Caused by delegate: ${
Expand All @@ -464,6 +490,27 @@ export class Blockchain {
}
}

/**
* Checks if the given block contains already forged transactions.
* @param {Block} block
* @returns {Boolean}
*/
public async checkBlockContainsForgedTransactions(block) {
// Discard block if it contains already forged transactions
if (block.transactions.length > 0) {
const forgedIds = await this.database.getForgedTransactionsIds(block.transactions.map(tx => tx.id));
if (forgedIds.length > 0) {
logger.warn(
`Block ${block.data.height.toLocaleString()} disregarded, because it contains already forged transactions :scroll:`,
);
logger.debug(`${JSON.stringify(forgedIds, null, 4)}`);
return true;
}
}

return false;
}

/**
* Called by forger to wake up and sync with the network.
* It clears the checkLaterTimeout if set.
Expand All @@ -476,6 +523,17 @@ export class Blockchain {
this.dispatch("WAKEUP");
}

/**
* Fork the chain at the given block.
* @param {Block} block
* @returns {void}
*/
public forkBlock(block) {
this.state.forkedBlock = block;

this.dispatch("FORK");
}

/**
* Get unconfirmed transactions for the specified block size.
* @param {Number} blockSize
Expand All @@ -497,7 +555,7 @@ export class Blockchain {
* @param {Block} [block=getLastBlock()] block
* @return {Boolean}
*/
public isSynced(block) {
public isSynced(block?) {
if (!this.p2p.hasPeers()) {
return true;
}
Expand All @@ -512,7 +570,7 @@ export class Blockchain {
* @param {Block} block
* @return {Boolean}
*/
public isRebuildSynced(block) {
public isRebuildSynced(block?) {
if (!this.p2p.hasPeers()) {
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core-blockchain/src/queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export class Queue {
}

/**
* Resue all queues.
* Resume all queues.
* @return {void}
*/
public resume() {
Expand Down
42 changes: 26 additions & 16 deletions packages/core-blockchain/src/state-machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { blockchainMachine } from "./machines/blockchain";
import { stateStorage } from "./state-storage";
import { tickSyncTracker } from "./utils/tick-sync-tracker";

import { Blockchain } from "./blockchain";

const { Block } = models;
const config = app.resolvePlugin("config");
const emitter = app.resolvePlugin("event-emitter");
Expand All @@ -26,7 +28,7 @@ blockchainMachine.state = stateStorage;
* @param {Blockchain} blockchain
* @return {Object}
*/
blockchainMachine.actionMap = blockchain => ({
blockchainMachine.actionMap = (blockchain: Blockchain) => ({
blockchainReady: () => {
if (!stateStorage.started) {
stateStorage.started = true;
Expand Down Expand Up @@ -62,7 +64,7 @@ blockchainMachine.actionMap = blockchain => ({
}

// tried to download but no luck after 5 tries (looks like network missing blocks)
if (stateStorage.noBlockCounter > 5) {
if (stateStorage.noBlockCounter > 5 && blockchain.processQueue.length() === 0) {
// TODO: make this dynamic in 2.1
logger.info(
"Tried to sync 5 times to different nodes, looks like the network is missing blocks :umbrella:",
Expand Down Expand Up @@ -175,7 +177,7 @@ blockchainMachine.actionMap = blockchain => ({
await blockchain.database.saveBlock(block);
}

if (!blockchain.restoredDatabaseIntegrity) {
if (!blockchain.database.restoredDatabaseIntegrity) {
logger.info("Verifying database integrity :hourglass_flowing_sand:");

const blockchainAudit = await blockchain.database.verifyBlockchain();
Expand Down Expand Up @@ -312,8 +314,8 @@ blockchainMachine.actionMap = blockchain => ({
},

async downloadBlocks() {
const lastBlock = stateStorage.lastDownloadedBlock || stateStorage.getLastBlock();
const blocks = await blockchain.p2p.downloadBlocks(lastBlock.data.height);
const lastDownloadedBlock = stateStorage.lastDownloadedBlock || stateStorage.getLastBlock();
const blocks = await blockchain.p2p.downloadBlocks(lastDownloadedBlock.data.height);

if (blockchain.isStopped) {
return;
Expand All @@ -337,7 +339,7 @@ blockchainMachine.actionMap = blockchain => ({
)}`,
);

if (blocks.length && blocks[0].previousBlock === lastBlock.data.id) {
if (blockchain.__isChained(lastDownloadedBlock, { data: blocks[0] })) {
stateStorage.noBlockCounter = 0;
stateStorage.p2pUpdateCounter = 0;
stateStorage.lastDownloadedBlock = { data: blocks.slice(-1)[0] };
Expand All @@ -346,16 +348,23 @@ blockchainMachine.actionMap = blockchain => ({

blockchain.dispatch("DOWNLOADED");
} else {
stateStorage.lastDownloadedBlock = lastBlock;

logger.warn(`Downloaded block not accepted: ${JSON.stringify(blocks[0])}`);
logger.warn(`Last block: ${JSON.stringify(lastBlock.data)}`);

stateStorage.forked = true;
stateStorage.forkedBlock = blocks[0];

// disregard the whole block list
blockchain.dispatch("FORK");
logger.warn(`Last downloaded block: ${JSON.stringify(lastDownloadedBlock.data)}`);

// Reset lastDownloadedBlock to last accepted block
const lastAcceptedBlock = stateStorage.getLastBlock();
stateStorage.lastDownloadedBlock = lastAcceptedBlock;

// Fork only if the downloaded block could not be chained with the last accepted block.
// Otherwise simply discard the downloaded blocks by resetting the queue.
const shouldFork = blocks[0].height === lastAcceptedBlock.data.height + 1;
if (shouldFork) {
blockchain.forkBlock(blocks[0]);
} else {
// TODO: only remove blocks from last downloaded block height
blockchain.clearAndStopQueue();
blockchain.dispatch("DOWNLOADED");
}
}
}
},
Expand All @@ -366,6 +375,7 @@ blockchainMachine.actionMap = blockchain => ({

async startForkRecovery() {
logger.info("Starting fork recovery :fork_and_knife:");
blockchain.clearAndStopQueue();

await blockchain.database.commitQueuedQueries();

Expand Down Expand Up @@ -408,7 +418,7 @@ blockchainMachine.actionMap = blockchain => ({
return;
}

blockchain.restoredDatabaseIntegrity = true;
blockchain.database.restoredDatabaseIntegrity = true;

const lastBlock = await blockchain.database.getLastBlock();
logger.info(
Expand Down
2 changes: 0 additions & 2 deletions packages/core-blockchain/src/state-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class StateStorage {
public lastDownloadedBlock: any;
public blockPing: any;
public started: boolean;
public forked: boolean;
public forkedBlock: any;
public rebuild: boolean;
public fastRebuild: boolean;
Expand All @@ -51,7 +50,6 @@ class StateStorage {
this.lastDownloadedBlock = null;
this.blockPing = null;
this.started = false;
this.forked = false;
this.forkedBlock = null;
this.rebuild = true;
this.fastRebuild = false;
Expand Down
2 changes: 1 addition & 1 deletion packages/core-database-postgres/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class PostgresConnection extends ConnectionInterface {
await super._registerRepositories();
await super._registerWalletManager();

this.blocksInCurrentRound = await this.__getBlocksForRound();
await this.loadBlocksFromCurrentRound();

return this;
} catch (error) {
Expand Down
Loading

0 comments on commit 35dbb99

Please sign in to comment.