From d00d4710e52d4a9167beee97741ca0bf7bfed709 Mon Sep 17 00:00:00 2001 From: Brian Faust Date: Fri, 8 Mar 2019 04:26:35 +0200 Subject: [PATCH] refactor(core-blockchain): remove old fast rebuild code (#2210) * refactor(core-blockchain): remove old fast rebuild cold * refactor: remove dead rebuild code * style: apply prettier formatting * refactor(core-blockchain): remove queue wrapper * test(core-blockchain): remove queue wrapper tests * fix(core-blockchain): use async queue methods * fix(core-blockchain): use async queue methods * test(core-blockchain): remove dead test * fix(core-blockchain): add method that disappeared * fix(core-blockchain): add drain call * refactor: drop more unused code * fix: dispatch missing event after block download finished * refactor: remove obsolete config workaround * refactor: drop even more obsolete rebuild code * refactor: cleanup leftovers * revert "refactor: remove obsolete config workaround" --- __tests__/unit/core-api/__support__/setup.ts | 2 +- .../unit/core-blockchain/blockchain.test.ts | 169 +----------------- .../machines/actions/fork.test.ts | 8 - .../actions/rebuild-from-network.test.ts | 155 ---------------- .../machines/blockchain.test.ts | 28 +-- .../handlers/unchained-handler.test.ts | 6 +- .../core-blockchain/queue/interface.test.ts | 72 -------- .../core-blockchain/queue/process.test.ts | 74 -------- .../core-blockchain/queue/rebuild.test.ts | 90 ---------- .../core-blockchain/state-machine.test.ts | 127 +------------ __tests__/utils/config/testnet/plugins.js | 4 +- __tests__/utils/config/unitnet/plugins.js | 4 +- .../src/versions/2/delegates/schema.ts | 44 ++--- packages/core-blockchain/src/blockchain.ts | 117 +++--------- packages/core-blockchain/src/defaults.ts | 1 - .../src/machines/actions/fork.ts | 43 ----- .../machines/actions/rebuild-from-network.ts | 52 ------ .../src/machines/blockchain.ts | 11 -- .../processor/handlers/unchained-handler.ts | 6 +- packages/core-blockchain/src/queue/index.ts | 53 ------ .../core-blockchain/src/queue/interface.ts | 71 -------- packages/core-blockchain/src/queue/process.ts | 29 --- packages/core-blockchain/src/queue/rebuild.ts | 31 ---- packages/core-blockchain/src/state-machine.ts | 92 +--------- packages/core-blockchain/src/state-storage.ts | 4 - .../src/core-blockchain/blockchain.ts | 23 --- packages/core/bin/config/devnet/plugins.js | 4 +- packages/core/bin/config/mainnet/plugins.js | 4 +- packages/core/bin/config/testnet/plugins.js | 4 +- 29 files changed, 76 insertions(+), 1252 deletions(-) delete mode 100644 __tests__/unit/core-blockchain/machines/actions/rebuild-from-network.test.ts delete mode 100644 __tests__/unit/core-blockchain/queue/interface.test.ts delete mode 100644 __tests__/unit/core-blockchain/queue/process.test.ts delete mode 100644 __tests__/unit/core-blockchain/queue/rebuild.test.ts delete mode 100644 packages/core-blockchain/src/machines/actions/rebuild-from-network.ts delete mode 100644 packages/core-blockchain/src/queue/index.ts delete mode 100644 packages/core-blockchain/src/queue/interface.ts delete mode 100644 packages/core-blockchain/src/queue/process.ts delete mode 100644 packages/core-blockchain/src/queue/rebuild.ts diff --git a/__tests__/unit/core-api/__support__/setup.ts b/__tests__/unit/core-api/__support__/setup.ts index 1eeb0cd907..ea67cf8c05 100644 --- a/__tests__/unit/core-api/__support__/setup.ts +++ b/__tests__/unit/core-api/__support__/setup.ts @@ -1,8 +1,8 @@ import { app } from "@arkecosystem/core-container"; import { Database } from "@arkecosystem/core-interfaces"; import delay from "delay"; -import { registerWithContainer, setUpContainer } from "../../../utils/helpers/container"; import { plugin } from "../../../../packages/core-api/src/plugin"; +import { registerWithContainer, setUpContainer } from "../../../utils/helpers/container"; import { delegates } from "../../../utils/fixtures"; import { generateRound } from "./utils/generate-round"; diff --git a/__tests__/unit/core-blockchain/blockchain.test.ts b/__tests__/unit/core-blockchain/blockchain.test.ts index df3a1c2f64..d8fe6e449a 100644 --- a/__tests__/unit/core-blockchain/blockchain.test.ts +++ b/__tests__/unit/core-blockchain/blockchain.test.ts @@ -1,12 +1,12 @@ /* tslint:disable:max-line-length */ -import "../../utils"; -import { blocks101to155 } from "../../utils/fixtures/testnet/blocks101to155"; -import { blocks2to100 } from "../../utils/fixtures/testnet/blocks2to100"; import { crypto, models, slots } from "@arkecosystem/crypto"; import { asValue } from "awilix"; import delay from "delay"; import { Blockchain } from "../../../packages/core-blockchain/src/blockchain"; import { defaults } from "../../../packages/core-blockchain/src/defaults"; +import "../../utils"; +import { blocks101to155 } from "../../utils/fixtures/testnet/blocks101to155"; +import { blocks2to100 } from "../../utils/fixtures/testnet/blocks2to100"; import { setUp, tearDown } from "./__support__/setup"; const { Block, Wallet } = models; @@ -94,22 +94,16 @@ describe("Blockchain", () => { }); }); - describe("rebuild", () => { - it("should throw an exception", () => { - expect(() => blockchain.rebuild()).toThrow("Method [rebuild] not implemented!"); - }); - }); - - describe("enQueueBlocks", () => { + describe("enqueueBlocks", () => { it("should just return if blocks provided are an empty array", async () => { - const processQueuePush = jest.spyOn(blockchain.processQueue, "push"); + const processQueuePush = jest.spyOn(blockchain.queue, "push"); blockchain.enqueueBlocks([]); expect(processQueuePush).not.toHaveBeenCalled(); }); it("should enqueue the blocks provided", async () => { - const processQueuePush = jest.spyOn(blockchain.processQueue, "push"); + const processQueuePush = jest.spyOn(blockchain.queue, "push"); const blocksToEnqueue = [blocks101to155[54]]; blockchain.enqueueBlocks(blocksToEnqueue); @@ -117,110 +111,6 @@ describe("Blockchain", () => { }); }); - describe("rebuildBlock", () => { - it("should rebuild with a known block", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - - await blockchain.rebuildBlock(lastBlock, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - }); - - it("should rebuild with a new chained block", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - - await blockchain.removeBlocks(1); // remove 1 block so that we can add it then as a chained block - - expect(blockchain.getLastBlock()).not.toEqual(lastBlock); - - await blockchain.rebuildBlock(lastBlock, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - expect(blockchain.getLastBlock()).toEqual(lastBlock); - }); - - it("should disregard block with height == last height but different id", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - const lastBlockCopy = new Block(lastBlock.data); - lastBlockCopy.data.id = "123456"; - - const loggerInfo = jest.spyOn(logger, "info"); - - await blockchain.rebuildBlock(lastBlockCopy, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - expect(loggerInfo).toHaveBeenCalledWith( - `Block ${lastBlockCopy.data.height.toLocaleString()} disregarded because on a fork`, - ); - expect(blockchain.getLastBlock().data.id).toBe(lastBlock.data.id); - }); - - it("should disregard block with height > last height + 1", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - const lastBlockCopy = new Block(lastBlock.data); - lastBlockCopy.data.height += 2; - - await blockchain.rebuildBlock(lastBlockCopy, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - expect(blockchain.getLastBlock().data.id).toBe(lastBlock.data.id); - expect(blockchain.state.lastDownloadedBlock).toBe(lastBlock); - }); - - it("should disregard block not verified", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - const lastBlockCopy = new Block(lastBlock.data); - lastBlockCopy.verification.verified = false; - - const loggerWarn = jest.spyOn(logger, "warn"); - - await blockchain.rebuildBlock(lastBlockCopy, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - expect(loggerWarn).toHaveBeenCalledWith( - `Block ${lastBlockCopy.data.height.toLocaleString()} disregarded because verification failed`, - ); - expect(blockchain.getLastBlock().data.id).toBe(lastBlock.data.id); - }); - - it("should commitQueuedQueries if block height % 20 000 == 0", async () => { - const mockCallback = jest.fn(() => true); - const lastBlock = blockchain.getLastBlock(); - const lastBlockHeight = lastBlock.data.height; - const nextBlock = new Block(blocks2to100[lastBlock.data.height - 1]); - lastBlock.data.height = 19999; - nextBlock.data.height = 20000; - - const commitQueuedQueries = jest - .spyOn(blockchain.database, "commitQueuedQueries") - // @ts-ignore - .mockReturnValueOnce(true); - // @ts-ignore - jest.spyOn(blockchain.database, "enqueueSaveBlock").mockReturnValueOnce(true); - - await blockchain.rebuildBlock(nextBlock, mockCallback); - await delay(200); - - expect(mockCallback.mock.calls.length).toBe(1); - expect(commitQueuedQueries).toHaveBeenCalled(); - expect(blockchain.getLastBlock().data.id).toBe(nextBlock.data.id); - - // reset to "stable" state - lastBlock.data.height = lastBlockHeight; - blockchain.state.setLastBlock(lastBlock); - }); - }); - describe("processBlock", () => { it("should process a new chained block", async () => { const mockCallback = jest.fn(() => true); @@ -377,43 +267,6 @@ describe("Blockchain", () => { }); }); - describe("isRebuildSynced", () => { - describe("with a block param", () => { - it("should be ok", () => { - jest.spyOn(blockchain.p2p, "hasPeers").mockReturnValueOnce(true); - expect( - blockchain.isRebuildSynced({ - data: { - timestamp: slots.getTime() - 3600 * 24 * 6, - height: blocks101to155[52].height, - }, - } as models.IBlock), - ).toBeTrue(); - }); - }); - - describe("without a block param", () => { - it("should use the last block", () => { - jest.spyOn(blockchain.p2p, "hasPeers").mockReturnValueOnce(true); - const getLastBlock = jest.spyOn(blockchain, "getLastBlock").mockReturnValueOnce({ - // @ts-ignore - data: { - timestamp: slots.getTime(), - height: genesisBlock.height, - }, - }); - expect(blockchain.isRebuildSynced()).toBeTrue(); - expect(getLastBlock).toHaveBeenCalled(); - }); - }); - - it("should return true when there is no peer", () => { - jest.spyOn(blockchain.p2p, "hasPeers").mockReturnValueOnce(false); - - expect(blockchain.isRebuildSynced()).toBeTrue(); - }); - }); - describe("getBlockPing", () => { it("should return state.blockPing", () => { const blockPing = { @@ -471,16 +324,6 @@ describe("Blockchain", () => { ]); }); }); - - describe("__registerQueue", () => { - it("should be ok", () => { - blockchain.__registerQueue(); - - expect(blockchain).toHaveProperty("queue"); - expect(blockchain).toHaveProperty("processQueue"); - expect(blockchain).toHaveProperty("rebuildQueue"); - }); - }); }); async function __start(networkStart) { diff --git a/__tests__/unit/core-blockchain/machines/actions/fork.test.ts b/__tests__/unit/core-blockchain/machines/actions/fork.test.ts index c9051d8e1d..5174d2d66f 100644 --- a/__tests__/unit/core-blockchain/machines/actions/fork.test.ts +++ b/__tests__/unit/core-blockchain/machines/actions/fork.test.ts @@ -15,14 +15,6 @@ describe("Blockchain machine > Fork", () => { }); }); - it("should transition to `revertBlocks` on `REBUILD`", () => { - expect(blockchainMachine).toTransition({ - from: "fork.analysing", - on: "REBUILD", - to: "fork.revertBlocks", - }); - }); - it("should transition to `exit` on `NOFORK`", () => { expect(blockchainMachine).toTransition({ from: "fork.analysing", diff --git a/__tests__/unit/core-blockchain/machines/actions/rebuild-from-network.test.ts b/__tests__/unit/core-blockchain/machines/actions/rebuild-from-network.test.ts deleted file mode 100644 index 4934ed0b5d..0000000000 --- a/__tests__/unit/core-blockchain/machines/actions/rebuild-from-network.test.ts +++ /dev/null @@ -1,155 +0,0 @@ -import "../../../../utils"; - -import { blockchainMachine as machine } from "../../../../../packages/core-blockchain/src/machines/blockchain"; - -describe("Blockchain machine > Rebuilding", () => { - it("should start with the `rebuilding` state", () => { - expect(machine.states.rebuild).toHaveProperty("initial", "rebuilding"); - }); - - describe("state `rebuilding`", () => { - it("should execute the `checkLastDownloadedBlockSynced` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.rebuilding", - actions: ["checkLastDownloadedBlockSynced"], - }); - }); - - it("should transition to `waitingFinished` on `SYNCED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuilding", - on: "SYNCED", - to: "rebuild.waitingFinished", - }); - }); - - it("should transition to `revertBlocks` on `NOTSYNCED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuilding", - on: "NOTSYNCED", - to: "rebuild.rebuildBlocks", - }); - }); - - it("should transition to `rebuildPaused` on `PAUSED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuilding", - on: "PAUSED", - to: "rebuild.rebuildPaused", - }); - }); - }); - - describe("state `idle`", () => { - it("should transition to `rebuildBlocks` on `DOWNLOADED`", () => { - expect(machine).toTransition({ - from: "rebuild.idle", - on: "DOWNLOADED", - to: "rebuild.rebuildBlocks", - }); - }); - }); - - describe("state `rebuildBlocks`", () => { - it("should execute the `rebuildBlocks` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.rebuildBlocks", - actions: ["rebuildBlocks"], - }); - }); - - it("should transition to `rebuilding` on `DOWNLOADED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuildBlocks", - on: "DOWNLOADED", - to: "rebuild.rebuilding", - }); - }); - - it("should transition to `rebuilding` on `NOBLOCK`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuildBlocks", - on: "NOBLOCK", - to: "rebuild.rebuilding", - }); - }); - }); - - describe("state `waitingFinished`", () => { - it("should transition to `rebuildFinished` on `REBUILDFINISHED`", () => { - expect(machine).toTransition({ - from: "rebuild.waitingFinished", - on: "REBUILDFINISHED", - to: "rebuild.rebuildFinished", - }); - }); - }); - - describe("state `processFinished`", () => { - it("should execute the `checkRebuildBlockSynced` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.processFinished", - actions: ["checkRebuildBlockSynced"], - }); - }); - - it("should transition to `processFinished` on `SYNCED`", () => { - expect(machine).toTransition({ - from: "rebuild.processFinished", - on: "SYNCED", - to: "rebuild.end", - }); - }); - - it("should transition to `processFinished` on `NOTSYNCED`", () => { - expect(machine).toTransition({ - from: "rebuild.processFinished", - on: "NOTSYNCED", - to: "rebuild.rebuildBlocks", - }); - }); - }); - - describe("state `rebuildPaused`", () => { - it("should execute the `downloadPaused` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.rebuildPaused", - actions: ["downloadPaused"], - }); - }); - - it("should transition to `processFinished` on `REBUILDFINISHED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuildPaused", - on: "REBUILDFINISHED", - to: "rebuild.processFinished", - }); - }); - }); - - describe("state `rebuildFinished`", () => { - it("should execute the `rebuildFinished` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.rebuildFinished", - actions: ["rebuildFinished"], - }); - }); - - it("should transition to `processFinished` on `PROCESSFINISHED`", () => { - expect(machine).toTransition({ - from: "rebuild.rebuildFinished", - on: "PROCESSFINISHED", - to: "rebuild.processFinished", - }); - }); - }); - - describe("state `end`", () => { - it("should execute the `rebuildingComplete` action when is entered", () => { - expect(machine).toExecuteOnEntry({ - state: "rebuild.end", - actions: ["rebuildingComplete"], - }); - }); - }); -}); diff --git a/__tests__/unit/core-blockchain/machines/blockchain.test.ts b/__tests__/unit/core-blockchain/machines/blockchain.test.ts index 2b13ec7f38..e13ffbaee1 100644 --- a/__tests__/unit/core-blockchain/machines/blockchain.test.ts +++ b/__tests__/unit/core-blockchain/machines/blockchain.test.ts @@ -26,15 +26,7 @@ describe("Blockchain machine", () => { expect(blockchainMachine).toExecuteOnEntry({ state: "init", actions: ["init"] }); }); - it("should transition to `rebuild` on `REBUILD`", () => { - expect(blockchainMachine).toTransition({ - from: "init", - on: "REBUILD", - to: "rebuild", - }); - }); - - it("should transition to `rebuild` on `NETWORKSTART`", () => { + it("should transition to `idle` on `NETWORKSTART`", () => { expect(blockchainMachine).toTransition({ from: "init", on: "NETWORKSTART", @@ -42,7 +34,7 @@ describe("Blockchain machine", () => { }); }); - it("should transition to `rebuild` on `STARTED`", () => { + it("should transition to `syncWithNetwork` on `STARTED`", () => { expect(blockchainMachine).toTransition({ from: "init", on: "STARTED", @@ -50,25 +42,11 @@ describe("Blockchain machine", () => { }); }); - it("should transition to `rebuild` on `FAILURE`", () => { + it("should transition to `exit` on `FAILURE`", () => { expect(blockchainMachine).toTransition({ from: "init", on: "FAILURE", to: "exit" }); }); }); - describe("state `rebuild`", () => { - it("should transition to `syncWithNetwork` on `REBUILDCOMPLETE`", () => { - expect(blockchainMachine).toTransition({ - from: "rebuild", - on: "REBUILDCOMPLETE", - to: "syncWithNetwork", - }); - }); - - it("should transition to `fork` on `FORK`", () => { - expect(blockchainMachine).toTransition({ from: "rebuild", on: "FORK", to: "fork" }); - }); - }); - describe("state `syncWithNetwork`", () => { it("should transition to `idle` on `TEST`", () => { expect(blockchainMachine).toTransition({ diff --git a/__tests__/unit/core-blockchain/processor/handlers/unchained-handler.test.ts b/__tests__/unit/core-blockchain/processor/handlers/unchained-handler.test.ts index 8f479fc444..e8a3a2d439 100644 --- a/__tests__/unit/core-blockchain/processor/handlers/unchained-handler.test.ts +++ b/__tests__/unit/core-blockchain/processor/handlers/unchained-handler.test.ts @@ -1,10 +1,10 @@ -import "../../../../utils"; import { UnchainedHandler } from "../../../../../packages/core-blockchain/src/processor/handlers"; +import "../../../../utils"; import { models } from "@arkecosystem/crypto"; -import { blocks2to100 } from "../../../../utils/fixtures/testnet/blocks2to100"; import { Blockchain } from "../../../../../packages/core-blockchain/src/blockchain"; import { BlockProcessorResult } from "../../../../../packages/core-blockchain/src/processor"; +import { blocks2to100 } from "../../../../utils/fixtures/testnet/blocks2to100"; import { setUpFull, tearDownFull } from "../../__support__/setup"; const { Block } = models; @@ -44,7 +44,7 @@ describe("Exception handler", () => { it("should log that blocks are being discarded when discarding blocks with height > current + 1", async () => { jest.spyOn(blockchain, "getLastBlock").mockReturnValue(new Block(blocks2to100[0])); - blockchain.processQueue.length = () => 5; + blockchain.queue.length = () => 5; const loggerDebug = jest.spyOn(app.resolvePlugin("logger"), "debug"); diff --git a/__tests__/unit/core-blockchain/queue/interface.test.ts b/__tests__/unit/core-blockchain/queue/interface.test.ts deleted file mode 100644 index 66ff3652db..0000000000 --- a/__tests__/unit/core-blockchain/queue/interface.test.ts +++ /dev/null @@ -1,72 +0,0 @@ -import "../../../utils"; -import async from "async"; -import { asValue } from "awilix"; -import delay from "delay"; -import { Blockchain } from "../../../../packages/core-blockchain/src/blockchain"; -import { QueueInterface } from "../../../../packages/core-blockchain/src/queue/interface"; -import { setUp, tearDown } from "../__support__/setup"; - -let fakeQueue; -let container; -let blockchain: Blockchain; - -class FakeQueue extends QueueInterface { - /** - * Create an instance of the process queue. - */ - constructor(readonly blockchainInstance: Blockchain, readonly event: string) { - super(blockchainInstance, event); - - this.queue = async.queue(async (item: any, cb) => { - await delay(1000); - return cb(); - }, 1); - } -} - -beforeAll(async () => { - container = await setUp(); - - process.env.CORE_SKIP_BLOCKCHAIN = "true"; - - // Manually register the blockchain - const plugin = require("../../../../packages/core-blockchain/src").plugin; - - blockchain = await plugin.register(container, { - networkStart: false, - }); - - await container.register( - "blockchain", - asValue({ - name: "blockchain", - version: "0.1.0", - plugin: blockchain, - options: {}, - }), - ); -}); - -afterAll(async () => { - await tearDown(); -}); - -beforeEach(async () => { - process.env.CORE_SKIP_BLOCKCHAIN = "false"; - - fakeQueue = new FakeQueue(blockchain, "fake"); -}); - -describe("FakeQueue", () => { - // fails on circleci, TODO re-enable - it.skip("should remove successfully an item from the queue", async () => { - const cb = jest.fn(); - fakeQueue.push(cb); - - expect(fakeQueue.queue.length()).toBe(1); - - fakeQueue.remove(obj => true); // removes everything, see async queue doc - - expect(fakeQueue.queue.length()).toBe(0); - }); -}); diff --git a/__tests__/unit/core-blockchain/queue/process.test.ts b/__tests__/unit/core-blockchain/queue/process.test.ts deleted file mode 100644 index d679ce5f56..0000000000 --- a/__tests__/unit/core-blockchain/queue/process.test.ts +++ /dev/null @@ -1,74 +0,0 @@ -import "../../../utils"; -import { asValue } from "awilix"; -import delay from "delay"; -import { blocks2to100 } from "../../../utils/fixtures/testnet/blocks2to100"; -import { Blockchain } from "../../../../packages/core-blockchain/src/blockchain"; -import { setUp, tearDown } from "../__support__/setup"; - -let processQueue; -let container; -let blockchain: Blockchain; - -beforeAll(async () => { - container = await setUp(); - - process.env.CORE_SKIP_BLOCKCHAIN = "true"; - - // Manually register the blockchain - const plugin = require("../../../../packages/core-blockchain/src").plugin; - - blockchain = await plugin.register(container, { - networkStart: false, - }); - - await container.register( - "blockchain", - asValue({ - name: "blockchain", - version: "0.1.0", - plugin: blockchain, - options: {}, - }), - ); -}); - -afterAll(async () => { - jest.restoreAllMocks(); - await tearDown(); -}); - -beforeEach(async () => { - process.env.CORE_SKIP_BLOCKCHAIN = "false"; - jest.restoreAllMocks(); - - const ProcessQueue = require("../../../../packages/core-blockchain/src/queue").ProcessQueue; - processQueue = new ProcessQueue(blockchain, "processEvent"); -}); - -describe("ProcessQueue", () => { - it("should call blockchain processBlock when pushing a block to the queue", async () => { - // @ts-ignore - const processBlock = jest.spyOn(blockchain, "processBlock").mockReturnValue(true); - - const cb = jest.fn(); - processQueue.push(blocks2to100[3], cb); - - await delay(200); - expect(processBlock).toHaveBeenCalled(); - }); - - it("should log error and call callback when blockchain processBlock throws", async () => { - const processBlock = jest.spyOn(blockchain, "processBlock").mockImplementation(() => { - throw new Error("wooo"); - }); - - const loggerError = jest.spyOn(container.resolvePlugin("logger"), "error"); - - const cb = jest.fn(); - processQueue.push(blocks2to100[3], cb); - - await delay(200); - expect(processBlock).toHaveBeenCalled(); - expect(loggerError).toHaveBeenCalledWith(`Failed to process block in ProcessQueue: ${blocks2to100[3].height}`); - }); -}); diff --git a/__tests__/unit/core-blockchain/queue/rebuild.test.ts b/__tests__/unit/core-blockchain/queue/rebuild.test.ts deleted file mode 100644 index c38f929787..0000000000 --- a/__tests__/unit/core-blockchain/queue/rebuild.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import "../../../utils"; -import { asValue } from "awilix"; -import delay from "delay"; -import { blocks2to100 } from "../../../utils/fixtures/testnet/blocks2to100"; -import { Blockchain } from "../../../../packages/core-blockchain/src/blockchain"; -import { setUp, tearDown } from "../__support__/setup"; - -let rebuildQueue; -let container; -let blockchain: Blockchain; - -beforeAll(async () => { - container = await setUp(); - - process.env.CORE_SKIP_BLOCKCHAIN = "true"; - - // Manually register the blockchain - const plugin = require("../../../../packages/core-blockchain/src").plugin; - - blockchain = await plugin.register(container, { - networkStart: false, - }); - - await container.register( - "blockchain", - asValue({ - name: "blockchain", - version: "0.1.0", - plugin: blockchain, - options: {}, - }), - ); -}); - -afterAll(async () => { - jest.restoreAllMocks(); - await tearDown(); -}); - -beforeEach(async () => { - process.env.CORE_SKIP_BLOCKCHAIN = "false"; - jest.restoreAllMocks(); - - const RebuildQueue = require("../../../../packages/core-blockchain/src/queue").RebuildQueue; - rebuildQueue = new RebuildQueue(blockchain, "processEvent"); -}); - -describe("RebuildQueue", () => { - it("should call blockchain rebuildBlock when pushing a block to the queue", async () => { - // @ts-ignore - const rebuildBlock = jest.spyOn(blockchain, "rebuildBlock").mockReturnValue(true); - - const cb = jest.fn(); - rebuildQueue.push(blocks2to100[3], cb); - - await delay(200); - expect(rebuildBlock).toHaveBeenCalled(); - }); - - it.skip("should just call callback if queue is paused when pushing a block to the queue", async () => { - // should call callback, but doesn't seem so... TODO - // @ts-ignore - const rebuildBlock = jest.spyOn(blockchain, "rebuildBlock").mockReturnValue(true); - - const cb = jest.fn(() => { - throw new Error("uuuui"); - }); - rebuildQueue.queue.paused = true; - rebuildQueue.queue.push(blocks2to100[3], cb); - - await delay(200); - expect(rebuildBlock).not.toHaveBeenCalled(); - expect(cb).toHaveBeenCalled(); - }); - - it("should log error and call callback when blockchain rebuildBlock throws", async () => { - const rebuildBlock = jest.spyOn(blockchain, "rebuildBlock").mockImplementation(() => { - throw new Error("wooo"); - }); - - const loggerError = jest.spyOn(container.resolvePlugin("logger"), "error"); - - const cb = jest.fn(() => true); - rebuildQueue.push(blocks2to100[3], cb); - - await delay(200); - expect(rebuildBlock).toHaveBeenCalled(); - expect(loggerError).toHaveBeenCalledWith(`Failed to rebuild block in RebuildQueue: ${blocks2to100[3].height}`); - }); -}); diff --git a/__tests__/unit/core-blockchain/state-machine.test.ts b/__tests__/unit/core-blockchain/state-machine.test.ts index 91911afcf4..5cc73145ef 100644 --- a/__tests__/unit/core-blockchain/state-machine.test.ts +++ b/__tests__/unit/core-blockchain/state-machine.test.ts @@ -86,39 +86,22 @@ describe("State Machine", () => { }); }); - describe("checkRebuildBlockSynced", () => { - it('should dispatch the event "SYNCED" if the blockchain is synced after a rebuild', () => { - blockchain.isRebuildSynced = jest.fn(() => true); - expect(() => actionMap.checkRebuildBlockSynced()).toDispatch(blockchain, "SYNCED"); - }); - - it('should dispatch the event "NOTSYNCED" if the blockchain is not synced after a rebuild', () => { - blockchain.isRebuildSynced = jest.fn(() => false); - expect(() => actionMap.checkRebuildBlockSynced()).toDispatch(blockchain, "NOTSYNCED"); - }); - }); - describe("checkLastDownloadedBlockSynced", () => { it('should dispatch the event "NOTSYNCED" by default', async () => { blockchain.isSynced = jest.fn(() => false); - blockchain.processQueue.length = jest.fn(() => 1); + blockchain.queue.length = jest.fn(() => 1); await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "NOTSYNCED"); }); - it('should dispatch the event "PAUSED" if the blockchain rebuild / process queue is more than 10000 long', async () => { + it('should dispatch the event "PAUSED" if the blockchain process queue is more than 10000 long', async () => { blockchain.isSynced = jest.fn(() => false); - blockchain.rebuildQueue.length = jest.fn(() => 10001); - blockchain.processQueue.length = jest.fn(() => 1); - await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "PAUSED"); - - blockchain.rebuildQueue.length = jest.fn(() => 1); - blockchain.processQueue.length = jest.fn(() => 10001); + blockchain.queue.length = jest.fn(() => 10001); await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "PAUSED"); }); it('should dispatch the event "NETWORKHALTED" if stateStorage.noBlockCounter > 5 and process queue is empty', async () => { blockchain.isSynced = jest.fn(() => false); - blockchain.processQueue.length = jest.fn(() => 0); + blockchain.queue.length = jest.fn(() => 0); stateStorage.noBlockCounter = 6; await expect(actionMap.checkLastDownloadedBlockSynced).toDispatch(blockchain, "NETWORKHALTED"); }); @@ -128,7 +111,7 @@ describe("State Machine", () => { - stateStorage.p2pUpdateCounter + 1 > 3 (network keeps missing blocks) - blockchain.p2p.checkNetworkHealth() returns a forked network status`, async () => { blockchain.isSynced = jest.fn(() => false); - blockchain.processQueue.length = jest.fn(() => 0); + blockchain.queue.length = jest.fn(() => 0); stateStorage.noBlockCounter = 6; stateStorage.p2pUpdateCounter = 3; // @ts-ignore @@ -177,25 +160,6 @@ describe("State Machine", () => { }); }); - describe("rebuildFinished", () => { - it('should dispatch the event "PROCESSFINISHED"', async () => { - localConfig.set("state.maxLastBlocks", 50); - const config = container.getConfig(); - const genesisBlock = config.get("genesisBlock"); - - stateStorage.setLastBlock(new Block(genesisBlock)); - - await expect(actionMap.rebuildFinished).toDispatch(blockchain, "PROCESSFINISHED"); - }); - - it('should dispatch the event "FAILURE" when some called method threw an exception', async () => { - jest.spyOn(blockchain.database, "commitQueuedQueries").mockImplementationOnce(() => { - throw new Error("oops"); - }); - await expect(actionMap.rebuildFinished).toDispatch(blockchain, "FAILURE"); - }); - }); - describe("downloadPaused", () => { it('should log the info message "Blockchain download paused"', () => { const logger = container.resolvePlugin("logger"); @@ -211,12 +175,6 @@ describe("State Machine", () => { }); }); - describe("rebuildingComplete", () => { - it('should dispatch the event "REBUILDCOMPLETE"', () => { - expect(() => actionMap.rebuildingComplete()).toDispatch(blockchain, "REBUILDCOMPLETE"); - }); - }); - describe("stopped", () => { it('should log the info message "The blockchain has been stopped"', () => { const logger = container.resolvePlugin("logger"); @@ -351,26 +309,6 @@ describe("State Machine", () => { ); }); - it("should dispatch REBUILD if stateStorage.fastRebuild", async () => { - process.env.NODE_ENV = ""; - - // mock getLastBlock() timestamp and fastRebuild config to trigger stateStorage.fastRebuild = true - jest.spyOn(blockchain.database, "getLastBlock").mockReturnValue({ - // @ts-ignore - data: { - height: 1, - timestamp: 0, - }, - }); - const mockConfigGet = jest - .spyOn(localConfig, "get") - .mockImplementation(key => (key === "fastRebuild" ? true : "")); - - await expect(() => actionMap.init()).toDispatch(blockchain, "REBUILD"); - - mockConfigGet.mockRestore(); - }); - it("should rollbackCurrentRound and dispatch STARTED if couldnt get activeDelegates", async () => { process.env.NODE_ENV = ""; jest.spyOn(blockchain.database, "getActiveDelegates").mockReturnValue(undefined); @@ -422,61 +360,6 @@ describe("State Machine", () => { }); }); - describe("rebuildBlocks", () => { - let genesisBlock; - - beforeAll(() => { - const config = container.getConfig(); - genesisBlock = config.get("genesisBlock"); - }); - - it("should dispatch NOBLOCK if no new blocks were downloaded from peer", async () => { - stateStorage.lastDownloadedBlock = new Block(genesisBlock); - - const logger = container.resolvePlugin("logger"); - const loggerInfo = jest.spyOn(logger, "info"); - - jest.spyOn(blockchain.p2p, "downloadBlocks").mockReturnValue([]); - await expect(() => actionMap.rebuildBlocks()).toDispatch(blockchain, "NOBLOCK"); - expect(loggerInfo).toHaveBeenCalledWith("No new blocks found on this peer"); - }); - - it("should dispatch DOWNLOADED if new blocks were successfully downloaded from peer", async () => { - stateStorage.lastDownloadedBlock = new Block(genesisBlock); - - const logger = container.resolvePlugin("logger"); - const loggerInfo = jest.spyOn(logger, "info"); - - jest.spyOn(blockchain.p2p, "downloadBlocks").mockReturnValue([ - { - numberOfTransactions: 2, - previousBlock: genesisBlock.id, - }, - ]); - await expect(() => actionMap.rebuildBlocks()).toDispatch(blockchain, "DOWNLOADED"); - expect(loggerInfo).toHaveBeenCalledWith( - "Downloaded 1 new block accounting for a total of 2 transactions", - ); - }); - - it("should dispatch NOBLOCK if new blocks were downloaded from peer but didnt match last known block", async () => { - stateStorage.lastDownloadedBlock = new Block(genesisBlock); - - const logger = container.resolvePlugin("logger"); - const loggerWarn = jest.spyOn(logger, "warn"); - - const downloadedBlock = { - numberOfTransactions: 2, - previousBlock: "123456", - }; - jest.spyOn(blockchain.p2p, "downloadBlocks").mockReturnValue([downloadedBlock]); - await expect(() => actionMap.rebuildBlocks()).toDispatch(blockchain, "NOBLOCK"); - expect(loggerWarn).toHaveBeenCalledWith( - `Downloaded block not accepted: ${JSON.stringify(downloadedBlock)}`, - ); - }); - }); - describe("downloadBlocks", () => { let genesisBlock; let loggerInfo; diff --git a/__tests__/utils/config/testnet/plugins.js b/__tests__/utils/config/testnet/plugins.js index 9d8318c623..8c317ba3ee 100644 --- a/__tests__/utils/config/testnet/plugins.js +++ b/__tests__/utils/config/testnet/plugins.js @@ -29,9 +29,7 @@ module.exports = { minimumNetworkReach: 5, coldStart: 5, }, - "@arkecosystem/core-blockchain": { - fastRebuild: false, - }, + "@arkecosystem/core-blockchain": {}, "@arkecosystem/core-api": { enabled: !process.env.CORE_API_DISABLED, host: process.env.CORE_API_HOST || "0.0.0.0", diff --git a/__tests__/utils/config/unitnet/plugins.js b/__tests__/utils/config/unitnet/plugins.js index 9d8318c623..8c317ba3ee 100644 --- a/__tests__/utils/config/unitnet/plugins.js +++ b/__tests__/utils/config/unitnet/plugins.js @@ -29,9 +29,7 @@ module.exports = { minimumNetworkReach: 5, coldStart: 5, }, - "@arkecosystem/core-blockchain": { - fastRebuild: false, - }, + "@arkecosystem/core-blockchain": {}, "@arkecosystem/core-api": { enabled: !process.env.CORE_API_DISABLED, host: process.env.CORE_API_HOST || "0.0.0.0", diff --git a/packages/core-api/src/versions/2/delegates/schema.ts b/packages/core-api/src/versions/2/delegates/schema.ts index 111208b3b5..097ce6fe4f 100644 --- a/packages/core-api/src/versions/2/delegates/schema.ts +++ b/packages/core-api/src/versions/2/delegates/schema.ts @@ -14,31 +14,25 @@ const schemaUsername = Joi.string() .min(1) .max(20); -const schemaIntegerBetween = Joi.object() - .keys({ - from: Joi - .number() - .integer() - .min(0), - to: Joi - .number() - .integer() - .min(0), - }) +const schemaIntegerBetween = Joi.object().keys({ + from: Joi.number() + .integer() + .min(0), + to: Joi.number() + .integer() + .min(0), +}); -const schemaPercentage = Joi.object() - .keys({ - from: Joi - .number() - .precision(2) - .min(0) - .max(100), - to: Joi - .number() - .precision(2) - .min(0) - .max(100), - }) +const schemaPercentage = Joi.object().keys({ + from: Joi.number() + .precision(2) + .min(0) + .max(100), + to: Joi.number() + .precision(2) + .min(0) + .max(100), +}); export const index: object = { query: { @@ -79,7 +73,7 @@ export const active: object = { height: Joi.number() .integer() .min(1), - } + }, }; export const show: object = { diff --git a/packages/core-blockchain/src/blockchain.ts b/packages/core-blockchain/src/blockchain.ts index 7a1765ee68..23480f8b15 100644 --- a/packages/core-blockchain/src/blockchain.ts +++ b/packages/core-blockchain/src/blockchain.ts @@ -10,10 +10,10 @@ import { } from "@arkecosystem/core-interfaces"; import { models, slots, Transaction } from "@arkecosystem/crypto"; +import async from "async"; import delay from "delay"; import pluralize from "pluralize"; import { BlockProcessor, BlockProcessorResult } from "./processor"; -import { ProcessQueue, Queue, RebuildQueue } from "./queue"; import { stateMachine } from "./state-machine"; import { StateStorage } from "./state-storage"; import { isBlockChained } from "./utils"; @@ -58,10 +58,8 @@ export class Blockchain implements blockchain.IBlockchain { public isStopped: boolean; public options: any; - public processQueue: ProcessQueue; - public rebuildQueue: RebuildQueue; + public queue: async.AsyncQueue; private actions: any; - private queue: Queue; private blockProcessor: BlockProcessor; /** @@ -83,7 +81,17 @@ export class Blockchain implements blockchain.IBlockchain { this.actions = stateMachine.actionMap(this); this.blockProcessor = new BlockProcessor(this); - this.__registerQueue(); + this.queue = async.queue((block: models.IBlockData, cb) => { + try { + return this.processBlock(new models.Block(block), cb); + } catch (error) { + logger.error(`Failed to process block in queue: ${block.height.toLocaleString()}`); + logger.error(error.stack); + return cb(); + } + }, 1); + + this.queue.drain = () => this.dispatch("PROCESSFINISHED"); } /** @@ -151,7 +159,7 @@ export class Blockchain implements blockchain.IBlockchain { this.dispatch("STOP"); - this.queue.destroy(); + this.queue.kill(); } } @@ -185,15 +193,6 @@ export class Blockchain implements blockchain.IBlockchain { return this.p2p.updateNetworkStatus(); } - /** - * Rebuild N blocks in the blockchain. - * @param {Number} nblocks - * @return {void} - */ - public rebuild(nblocks?: number) { - throw new Error("Method [rebuild] not implemented!"); - } - /** * Reset the state of the blockchain. * @return {void} @@ -209,7 +208,15 @@ export class Blockchain implements blockchain.IBlockchain { */ public clearAndStopQueue() { this.queue.pause(); - this.queue.clear(); + this.clearQueue(); + } + + /** + * Clear the queue. + * @return {void} + */ + public clearQueue() { + this.queue.remove(() => true); } /** @@ -258,7 +265,7 @@ export class Blockchain implements blockchain.IBlockchain { return; } - this.processQueue.push(blocks); + this.queue.push(blocks); this.state.lastDownloadedBlock = new Block(blocks.slice(-1)[0]); } @@ -399,50 +406,6 @@ export class Blockchain implements blockchain.IBlockchain { await this.database.loadBlocksFromCurrentRound(); } - /** - * Hande a block during a rebuild. - * NOTE: We should be sure this is fail safe (ie callback() is being called only ONCE) - * @param {Block} block - * @param {Function} callback - * @return {Object} - */ - public async rebuildBlock(block, callback) { - const lastBlock = this.state.getLastBlock(); - - if (block.verification.verified) { - if (isBlockChained(lastBlock, block)) { - // save block on database - this.database.enqueueSaveBlock(block); - - // committing to db every 20,000 blocks - if (block.data.height % 20000 === 0) { - await this.database.commitQueuedQueries(); - } - - this.state.setLastBlock(block); - - return callback(); - } - if (block.data.height > lastBlock.data.height + 1) { - this.state.lastDownloadedBlock = lastBlock; - return callback(); - } - if ( - block.data.height < lastBlock.data.height || - (block.data.height === lastBlock.data.height && block.data.id === lastBlock.data.id) - ) { - this.state.lastDownloadedBlock = lastBlock; - return callback(); - } - this.state.lastDownloadedBlock = lastBlock; - logger.info(`Block ${block.data.height.toLocaleString()} disregarded because on a fork`); - return callback(); - } - logger.warn(`Block ${block.data.height.toLocaleString()} disregarded because verification failed`); - logger.warn(JSON.stringify(block.verification, null, 4)); - return callback(); - } - /** * Process the given block. */ @@ -518,24 +481,6 @@ export class Blockchain implements blockchain.IBlockchain { return slots.getTime() - block.data.timestamp < 3 * config.getMilestone(block.data.height).blocktime; } - /** - * Determine if the blockchain is synced after a rebuild. - */ - public isRebuildSynced(block?: models.IBlock): boolean { - if (!this.p2p.hasPeers()) { - return true; - } - - block = block || this.getLastBlock(); - - const remaining = slots.getTime() - block.data.timestamp; - logger.info(`Remaining block timestamp ${remaining}`); - - // stop fast rebuild 7 days before the last network block - return slots.getTime() - block.data.timestamp < 3600 * 24 * 7; - // return slots.getTime() - block.data.timestamp < 100 * config.getMilestone(block.data.height).blocktime - } - /** * Get the last block of the blockchain. */ @@ -604,18 +549,4 @@ export class Blockchain implements blockchain.IBlockchain { "wallet.created.cold", ]; } - - /** - * Register the block queue. - * @return {void} - */ - public __registerQueue() { - this.queue = new Queue(this, { - process: "PROCESSFINISHED", - rebuild: "REBUILDFINISHED", - }); - - this.processQueue = this.queue.process; - this.rebuildQueue = this.queue.rebuild; - } } diff --git a/packages/core-blockchain/src/defaults.ts b/packages/core-blockchain/src/defaults.ts index 2838157dd8..25d7dca523 100644 --- a/packages/core-blockchain/src/defaults.ts +++ b/packages/core-blockchain/src/defaults.ts @@ -1,5 +1,4 @@ export const defaults = { - fastRebuild: false, databaseRollback: { maxBlockRewind: 10000, steps: 1000, diff --git a/packages/core-blockchain/src/machines/actions/fork.ts b/packages/core-blockchain/src/machines/actions/fork.ts index 0b205c99e4..0f18698760 100644 --- a/packages/core-blockchain/src/machines/actions/fork.ts +++ b/packages/core-blockchain/src/machines/actions/fork.ts @@ -4,18 +4,11 @@ export const fork = { analysing: { onEntry: ["analyseFork"], on: { - REBUILD: "revertBlocks", NOFORK: "exit", }, }, network: { onEntry: ["checkNetwork"], - /* these transitions are not used yet (TODO?) - on: { - SUCCESS: 'blockchain', - FAILURE: 'reset' - } - */ }, revertBlocks: {}, exit: { @@ -23,39 +16,3 @@ export const fork = { }, }, }; - -// const fork = { -// initial: 'network', -// states: { -// network: { -// onEntry: ['checkNetwork'], -// on: { -// SUCCESS: 'blockchain', -// FAILURE: 'reset' -// } -// }, -// blockchain: { -// onEntry: ['removeBlocks'], -// on: { -// SUCCESS: 'wallets', -// FAILURE: 'reset' -// } -// }, -// wallets: { -// onEntry: ['rebuildWallets'], -// on: { -// SUCCESS: 'success', -// FAILURE: 'reset' -// } -// }, -// reset: { -// onEntry: ['resetNode'], -// on: { -// RESET: 'success', -// FAILURE: 'reset' -// } -// }, -// success: { -// } -// } -// } diff --git a/packages/core-blockchain/src/machines/actions/rebuild-from-network.ts b/packages/core-blockchain/src/machines/actions/rebuild-from-network.ts deleted file mode 100644 index 70ca9e2a7e..0000000000 --- a/packages/core-blockchain/src/machines/actions/rebuild-from-network.ts +++ /dev/null @@ -1,52 +0,0 @@ -export const rebuildFromNetwork = { - initial: "rebuilding", - states: { - rebuilding: { - onEntry: ["checkLastDownloadedBlockSynced"], - on: { - SYNCED: "waitingFinished", - NOTSYNCED: "rebuildBlocks", - PAUSED: "rebuildPaused", - }, - }, - idle: { - on: { - DOWNLOADED: "rebuildBlocks", - }, - }, - rebuildBlocks: { - onEntry: ["rebuildBlocks"], - on: { - DOWNLOADED: "rebuilding", - NOBLOCK: "rebuilding", - }, - }, - waitingFinished: { - on: { - REBUILDFINISHED: "rebuildFinished", - }, - }, - rebuildFinished: { - onEntry: ["rebuildFinished"], - on: { - PROCESSFINISHED: "processFinished", - }, - }, - rebuildPaused: { - onEntry: ["downloadPaused"], - on: { - REBUILDFINISHED: "processFinished", - }, - }, - processFinished: { - onEntry: ["checkRebuildBlockSynced"], - on: { - SYNCED: "end", - NOTSYNCED: "rebuildBlocks", - }, - }, - end: { - onEntry: ["rebuildingComplete"], - }, - }, -}; diff --git a/packages/core-blockchain/src/machines/blockchain.ts b/packages/core-blockchain/src/machines/blockchain.ts index 779bfb5b5d..c9b2ea737d 100644 --- a/packages/core-blockchain/src/machines/blockchain.ts +++ b/packages/core-blockchain/src/machines/blockchain.ts @@ -1,6 +1,5 @@ import { Machine } from "xstate"; import { fork } from "./actions/fork"; -import { rebuildFromNetwork } from "./actions/rebuild-from-network"; import { syncWithNetwork } from "./actions/sync-with-network"; export const blockchainMachine: any = Machine({ @@ -16,7 +15,6 @@ export const blockchainMachine: any = Machine({ init: { onEntry: ["init"], on: { - REBUILD: "rebuild", NETWORKSTART: "idle", STARTED: "syncWithNetwork", ROLLBACK: "rollback", @@ -24,15 +22,6 @@ export const blockchainMachine: any = Machine({ STOP: "stopped", }, }, - rebuild: { - on: { - REBUILDCOMPLETE: "syncWithNetwork", - FORK: "fork", - TEST: "syncWithNetwork", - STOP: "stopped", - }, - ...rebuildFromNetwork, - }, syncWithNetwork: { on: { TEST: "idle", diff --git a/packages/core-blockchain/src/processor/handlers/unchained-handler.ts b/packages/core-blockchain/src/processor/handlers/unchained-handler.ts index 8bfe994f7e..62f00d9539 100644 --- a/packages/core-blockchain/src/processor/handlers/unchained-handler.ts +++ b/packages/core-blockchain/src/processor/handlers/unchained-handler.ts @@ -61,7 +61,7 @@ export class UnchainedHandler extends BlockHandler { public async execute(): Promise { super.execute(); - this.blockchain.processQueue.clear(); + this.blockchain.clearQueue(); const status = this.checkUnchainedBlock(); switch (status) { @@ -102,8 +102,8 @@ export class UnchainedHandler extends BlockHandler { // it is very likely that all blocks will be disregarded at this point anyway. // NOTE: This isn't really elegant, but still better than spamming the log with // useless `not ready to accept` messages. - if (this.blockchain.processQueue.length() > 0) { - this.logger.debug(`Discarded ${this.blockchain.processQueue.length()} downloaded blocks.`); + if (this.blockchain.queue.length() > 0) { + this.logger.debug(`Discarded ${this.blockchain.queue.length()} downloaded blocks.`); } // If we consecutively fail to accept the same block, our chain is likely forked. In this diff --git a/packages/core-blockchain/src/queue/index.ts b/packages/core-blockchain/src/queue/index.ts deleted file mode 100644 index 031522c6c0..0000000000 --- a/packages/core-blockchain/src/queue/index.ts +++ /dev/null @@ -1,53 +0,0 @@ -import { ProcessQueue } from "./process"; -import { RebuildQueue } from "./rebuild"; - -export { ProcessQueue }; -export { RebuildQueue }; - -export class Queue { - public process: ProcessQueue; - public rebuild: RebuildQueue; - - /** - * Create an instance of the queue. - * @param {Blockchain} blockchain - * @param {Object} events - * @return {void} - */ - constructor(blockchain, events) { - this.process = new ProcessQueue(blockchain, events.process); - this.rebuild = new RebuildQueue(blockchain, events.rebuild); - } - - /** - * Pause all queues. - * @return {void} - */ - public pause() { - this.rebuild.pause(); - this.process.pause(); - } - - /** - * Flush all queues. - * @return {void} - */ - public clear() { - this.rebuild.clear(); - this.process.clear(); - } - - /** - * Resume all queues. - * @return {void} - */ - public resume() { - this.rebuild.resume(); - this.process.resume(); - } - - public destroy() { - this.rebuild.destroy(); - this.process.destroy(); - } -} diff --git a/packages/core-blockchain/src/queue/interface.ts b/packages/core-blockchain/src/queue/interface.ts deleted file mode 100644 index 45b863c9a4..0000000000 --- a/packages/core-blockchain/src/queue/interface.ts +++ /dev/null @@ -1,71 +0,0 @@ -import async from "async"; -import { Blockchain } from "../blockchain"; - -export abstract class QueueInterface { - protected queue: any; - - /** - * Create an instance of the process queue. - */ - constructor(readonly blockchain: Blockchain, readonly event: string) {} - - /** - * Drain the queue. - */ - public drain() { - this.queue.drain = () => this.blockchain.dispatch(this.event); - } - - /** - * Pause the queue. - * @return {void} - */ - public pause() { - return this.queue.pause(); - } - - /** - * Flush the queue. - * @return {void} - */ - public clear() { - return this.queue.remove(() => true); - } - - /** - * Resume the queue. - * @return {void} - */ - public resume() { - return this.queue.resume(); - } - - /** - * Remove the item from the queue. - * @return {void} - */ - public remove(item) { - return this.queue.remove(item); - } - - /** - * Push the item to the queue. - * @param {Function} callback - * @return {void} - */ - public push(callback) { - return this.queue.push(callback); - } - - /** - * Get the length of the queue. - * @return {void} - */ - public length() { - return this.queue.length(); - } - - public destroy() { - return this.queue.kill(); - } -} diff --git a/packages/core-blockchain/src/queue/process.ts b/packages/core-blockchain/src/queue/process.ts deleted file mode 100644 index bd4afc25ba..0000000000 --- a/packages/core-blockchain/src/queue/process.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { app } from "@arkecosystem/core-container"; -import { Logger } from "@arkecosystem/core-interfaces"; -import { models } from "@arkecosystem/crypto"; -import async from "async"; -import { Blockchain } from "../blockchain"; -import { QueueInterface } from "./interface"; - -const logger = app.resolvePlugin("logger"); - -export class ProcessQueue extends QueueInterface { - /** - * Create an instance of the process queue. - */ - constructor(readonly blockchain: Blockchain, readonly event: string) { - super(blockchain, event); - - this.queue = async.queue((block: models.IBlockData, cb) => { - try { - return blockchain.processBlock(new models.Block(block), cb); - } catch (error) { - logger.error(`Failed to process block in ProcessQueue: ${block.height.toLocaleString()}`); - logger.error(error.stack); - return cb(); - } - }, 1); - - this.drain(); - } -} diff --git a/packages/core-blockchain/src/queue/rebuild.ts b/packages/core-blockchain/src/queue/rebuild.ts deleted file mode 100644 index 786d3dc4bf..0000000000 --- a/packages/core-blockchain/src/queue/rebuild.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { app } from "@arkecosystem/core-container"; -import { Logger } from "@arkecosystem/core-interfaces"; -import { models } from "@arkecosystem/crypto"; -import async from "async"; -import { Blockchain } from "../blockchain"; -import { QueueInterface } from "./interface"; - -const logger = app.resolvePlugin("logger"); - -export class RebuildQueue extends QueueInterface { - /** - * Create an instance of the process queue. - */ - constructor(readonly blockchain: Blockchain, readonly event: string) { - super(blockchain, event); - - this.queue = async.queue((block: models.IBlockData, cb) => { - if (this.queue.paused) { - return cb(); - } - try { - return blockchain.rebuildBlock(new models.Block(block), cb); - } catch (error) { - logger.error(`Failed to rebuild block in RebuildQueue: ${block.height.toLocaleString()}`); - return cb(); - } - }, 1); - - this.drain(); - } -} diff --git a/packages/core-blockchain/src/state-machine.ts b/packages/core-blockchain/src/state-machine.ts index 77f68953d6..86fd0cf198 100644 --- a/packages/core-blockchain/src/state-machine.ts +++ b/packages/core-blockchain/src/state-machine.ts @@ -4,13 +4,13 @@ import { app } from "@arkecosystem/core-container"; import { EventEmitter, Logger } from "@arkecosystem/core-interfaces"; import { roundCalculator } from "@arkecosystem/core-utils"; -import { isException, models, slots } from "@arkecosystem/crypto"; +import { isException, models } from "@arkecosystem/crypto"; import pluralize from "pluralize"; import { config as localConfig } from "./config"; import { blockchainMachine } from "./machines/blockchain"; import { stateStorage } from "./state-storage"; -import { isBlockChained, tickSyncTracker } from "./utils"; +import { isBlockChained } from "./utils"; import { Blockchain } from "./blockchain"; @@ -47,22 +47,16 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ return blockchain.dispatch(blockchain.isSynced() ? "SYNCED" : "NOTSYNCED"); }, - checkRebuildBlockSynced() { - return blockchain.dispatch(blockchain.isRebuildSynced() ? "SYNCED" : "NOTSYNCED"); - }, - async checkLastDownloadedBlockSynced() { let event = "NOTSYNCED"; - logger.debug( - `Queued blocks (rebuild: ${blockchain.rebuildQueue.length()} process: ${blockchain.processQueue.length()})`, - ); + logger.debug(`Queued blocks (process: ${blockchain.queue.length()})`); - if (blockchain.rebuildQueue.length() > 10000 || blockchain.processQueue.length() > 10000) { + if (blockchain.queue.length() > 10000) { event = "PAUSED"; } // tried to download but no luck after 5 tries (looks like network missing blocks) - if (stateStorage.noBlockCounter > 5 && blockchain.processQueue.length() === 0) { + if (stateStorage.noBlockCounter > 5 && blockchain.queue.length() === 0) { logger.info("Tried to sync 5 times to different nodes, looks like the network is missing blocks"); stateStorage.noBlockCounter = 0; @@ -109,30 +103,11 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ stateStorage.networkStart = false; blockchain.dispatch("SYNCFINISHED"); - } else if (blockchain.rebuildQueue.length() === 0) { + } else { blockchain.dispatch("PROCESSFINISHED"); } }, - async rebuildFinished() { - try { - logger.info("Blockchain rebuild finished"); - - stateStorage.rebuild = false; - - await blockchain.database.commitQueuedQueries(); - await blockchain.rollbackCurrentRound(); - await blockchain.database.buildWallets(stateStorage.getLastBlock().data.height); - await blockchain.database.saveWallets(true); - await blockchain.transactionPool.buildWallets(); - - return blockchain.dispatch("PROCESSFINISHED"); - } catch (error) { - logger.error(error.stack); - return blockchain.dispatch("FAILURE"); - } - }, - downloadPaused: () => logger.info("Blockchain download paused"), syncingComplete() { @@ -140,11 +115,6 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ blockchain.dispatch("SYNCFINISHED"); }, - rebuildingComplete() { - logger.info("Blockchain rebuild complete"); - blockchain.dispatch("REBUILDCOMPLETE"); - }, - stopped() { logger.info("The blockchain has been stopped"); }, @@ -195,7 +165,6 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ /** ******************************* * state machine data init * ******************************* */ - const constants = config.getMilestone(block.data.height); stateStorage.setLastBlock(block); stateStorage.lastDownloadedBlock = block; @@ -208,12 +177,6 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ return blockchain.dispatch("STARTED"); } - stateStorage.rebuild = - slots.getTime() - block.data.timestamp > (constants.activeDelegates + 1) * constants.blocktime; - // no fast rebuild if in last week - stateStorage.fastRebuild = - slots.getTime() - block.data.timestamp > 3600 * 24 * 7 && !!localConfig.get("fastRebuild"); - if (process.env.NODE_ENV === "test") { logger.verbose("TEST SUITE DETECTED! SYNCING WALLETS AND STARTING IMMEDIATELY."); @@ -223,13 +186,8 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ return blockchain.dispatch("STARTED"); } - logger.info(`Fast rebuild: ${stateStorage.fastRebuild}`); logger.info(`Last block in database: ${block.data.height.toLocaleString()}`); - if (stateStorage.fastRebuild) { - return blockchain.dispatch("REBUILD"); - } - // removing blocks up to the last round to compute active delegate list later if needed const activeDelegates = await blockchain.database.getActiveDelegates(block.data.height); @@ -271,42 +229,6 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ } }, - async rebuildBlocks() { - const lastBlock = stateStorage.lastDownloadedBlock || stateStorage.getLastBlock(); - const blocks = await blockchain.p2p.downloadBlocks(lastBlock.data.height); - - tickSyncTracker(blocks.length, lastBlock.data.height); - - if (!blocks || blocks.length === 0) { - logger.info("No new blocks found on this peer"); - - blockchain.dispatch("NOBLOCK"); - } else { - logger.info( - `Downloaded ${blocks.length} new ${pluralize( - "block", - blocks.length, - )} accounting for a total of ${pluralize( - "transaction", - blocks.reduce((sum, b) => sum + b.numberOfTransactions, 0), - true, - )}`, - ); - - if (blocks.length && blocks[0].previousBlock === lastBlock.data.id) { - stateStorage.lastDownloadedBlock = { data: blocks.slice(-1)[0] }; - blockchain.rebuildQueue.push(blocks); - blockchain.dispatch("DOWNLOADED"); - } else { - logger.warn(`Downloaded block not accepted: ${JSON.stringify(blocks[0])}`); - logger.warn(`Last block: ${JSON.stringify(lastBlock.data)}`); - - // disregard the whole block list - blockchain.dispatch("NOBLOCK"); - } - } - }, - async downloadBlocks() { const lastDownloadedBlock = stateStorage.lastDownloadedBlock || stateStorage.getLastBlock(); const blocks = await blockchain.p2p.downloadBlocks(lastDownloadedBlock.data.height); @@ -347,7 +269,7 @@ blockchainMachine.actionMap = (blockchain: Blockchain) => ({ } else { logger.warn(`Downloaded block not accepted: ${JSON.stringify(blocks[0])}`); logger.warn(`Last downloaded block: ${JSON.stringify(lastDownloadedBlock.data)}`); - blockchain.processQueue.clear(); + blockchain.clearQueue(); } stateStorage.noBlockCounter++; diff --git a/packages/core-blockchain/src/state-storage.ts b/packages/core-blockchain/src/state-storage.ts index b270b3cc93..1a48b11d8e 100644 --- a/packages/core-blockchain/src/state-storage.ts +++ b/packages/core-blockchain/src/state-storage.ts @@ -31,8 +31,6 @@ export class StateStorage implements Blockchain.IStateStorage { public blockPing: any; public started: boolean; public forkedBlock: models.Block | null; - public rebuild: boolean; - public fastRebuild: boolean; public wakeUpTimeout: any; public noBlockCounter: number; public p2pUpdateCounter: number; @@ -52,8 +50,6 @@ export class StateStorage implements Blockchain.IStateStorage { this.blockPing = null; this.started = false; this.forkedBlock = null; - this.rebuild = true; - this.fastRebuild = false; this.wakeUpTimeout = null; this.noBlockCounter = 0; this.p2pUpdateCounter = 0; diff --git a/packages/core-interfaces/src/core-blockchain/blockchain.ts b/packages/core-interfaces/src/core-blockchain/blockchain.ts index dc47c3f784..6c7ac31f7c 100644 --- a/packages/core-interfaces/src/core-blockchain/blockchain.ts +++ b/packages/core-interfaces/src/core-blockchain/blockchain.ts @@ -45,13 +45,6 @@ export interface IBlockchain { */ updateNetworkStatus(): Promise; - /** - * Rebuild N blocks in the blockchain. - * @param {Number} nblocks - * @return {void} - */ - rebuild(nblocks?: number): void; - /** * Reset the state of the blockchain. * @return {void} @@ -99,15 +92,6 @@ export interface IBlockchain { */ removeTopBlocks(count: any): Promise; - /** - * Hande a block during a rebuild. - * NOTE: We should be sure this is fail safe (ie callback() is being called only ONCE) - * @param {Block} block - * @param {Function} callback - * @return {Object} - */ - rebuildBlock(block: models.Block, callback: any): Promise; - /** * Process the given block. * NOTE: We should be sure this is fail safe (ie callback() is being called only ONCE) @@ -154,13 +138,6 @@ export interface IBlockchain { */ isSynced(block?: models.Block): boolean; - /** - * Determine if the blockchain is synced after a rebuild. - * @param {Block} block - * @return {Boolean} - */ - isRebuildSynced(block?: models.Block): boolean; - /** * Get the last block of the blockchain. * @return {Object} diff --git a/packages/core/bin/config/devnet/plugins.js b/packages/core/bin/config/devnet/plugins.js index 5cece283af..ebe6958fee 100644 --- a/packages/core/bin/config/devnet/plugins.js +++ b/packages/core/bin/config/devnet/plugins.js @@ -37,9 +37,7 @@ module.exports = { minimumNetworkReach: 5, coldStart: 5, }, - "@arkecosystem/core-blockchain": { - fastRebuild: false, - }, + "@arkecosystem/core-blockchain": {}, "@arkecosystem/core-api": { enabled: !process.env.CORE_API_DISABLED, host: process.env.CORE_API_HOST || "0.0.0.0", diff --git a/packages/core/bin/config/mainnet/plugins.js b/packages/core/bin/config/mainnet/plugins.js index 62c78b69bb..207b77c8b1 100644 --- a/packages/core/bin/config/mainnet/plugins.js +++ b/packages/core/bin/config/mainnet/plugins.js @@ -35,9 +35,7 @@ module.exports = { host: process.env.CORE_P2P_HOST || "0.0.0.0", port: process.env.CORE_P2P_PORT || 4001, }, - "@arkecosystem/core-blockchain": { - fastRebuild: false, - }, + "@arkecosystem/core-blockchain": {}, "@arkecosystem/core-api": { enabled: !process.env.CORE_API_DISABLED, host: process.env.CORE_API_HOST || "0.0.0.0", diff --git a/packages/core/bin/config/testnet/plugins.js b/packages/core/bin/config/testnet/plugins.js index 312e91a11d..380f139124 100644 --- a/packages/core/bin/config/testnet/plugins.js +++ b/packages/core/bin/config/testnet/plugins.js @@ -37,9 +37,7 @@ module.exports = { minimumNetworkReach: 5, coldStart: 5, }, - "@arkecosystem/core-blockchain": { - fastRebuild: false, - }, + "@arkecosystem/core-blockchain": {}, "@arkecosystem/core-api": { enabled: !process.env.CORE_API_DISABLED, host: process.env.CORE_API_HOST || "0.0.0.0",