From e22328753844d4388bd7fdd4663cae5d83e9a3c1 Mon Sep 17 00:00:00 2001 From: Air1 Date: Wed, 22 Jul 2020 10:50:50 +0200 Subject: [PATCH] fix(core-p2p): reduce download block size when getting no block --- .../unit/core-p2p/network-monitor.test.ts | 56 +++++++++++++++++++ packages/core-p2p/src/network-monitor.ts | 24 +++++--- packages/core-p2p/src/peer-communicator.ts | 18 ++++-- 3 files changed, 87 insertions(+), 11 deletions(-) diff --git a/__tests__/unit/core-p2p/network-monitor.test.ts b/__tests__/unit/core-p2p/network-monitor.test.ts index 6d0f1e2a35..748ea50e70 100644 --- a/__tests__/unit/core-p2p/network-monitor.test.ts +++ b/__tests__/unit/core-p2p/network-monitor.test.ts @@ -398,6 +398,62 @@ describe("NetworkMonitor", () => { expect(await monitor.downloadBlocksFromHeight(20, maxParallelDownloads)).toEqual([mockBlock]); }); + + it("should reduce download block chunk size after receiving no block", async () => { + communicator.getPeerBlocks = jest.fn().mockReturnValue([]); + + const numPeers = maxParallelDownloads; + for (let i = 0; i < maxParallelDownloads; i++) { + storage.setPeer( + createStubPeer({ + ip: `1.1.1.${i}`, + port: 4000, + state: { + height: 12500, + currentSlot: 2, + forgingAllowed: true, + }, + verificationResult: { forked: false }, + }), + ); + } + + const fromHeight = 1; + + // first step, peers won't return any block: chunk size should be reduced by factor 10 for next download + for (const expectedBlockLimit of [400, 40, 4, 1, 1, 1]) { + // @ts-ignore + communicator.getPeerBlocks.mockReset(); + const downloadedBlocks = await monitor.downloadBlocksFromHeight(fromHeight, maxParallelDownloads); + + expect(downloadedBlocks).toEqual([]); + expect(communicator.getPeerBlocks).toBeCalledTimes(maxParallelDownloads); + expect(communicator.getPeerBlocks).toBeCalledWith(expect.anything(), { + fromBlockHeight: expect.any(Number), + blockLimit: expectedBlockLimit, + }); + } + + // second step, peers return blocks: chunk size should be reset to default value (400) for next download + const mockGetPeerBlocks1Block = (_, { fromBlockHeight }) => [expectedBlocksFromHeight(fromBlockHeight)[0]]; + for (const expectedBlockLimit of [1, 400]) { + communicator.getPeerBlocks = jest + .fn() + .mockImplementation(expectedBlockLimit === 1 ? mockGetPeerBlocks1Block : mockedGetPeerBlocks); + + const downloadedBlocks = await monitor.downloadBlocksFromHeight(fromHeight, maxParallelDownloads); + + const expectedBlocks = expectedBlocksFromHeight(fromHeight).slice(0, numPeers * expectedBlockLimit); + + expect(downloadedBlocks).toEqual(expectedBlocks); + + expect(communicator.getPeerBlocks).toBeCalledTimes(maxParallelDownloads); + expect(communicator.getPeerBlocks).toBeCalledWith(expect.anything(), { + fromBlockHeight: expect.any(Number), + blockLimit: expectedBlockLimit, + }); + } + }); }); describe("broadcastBlock", () => { diff --git a/packages/core-p2p/src/network-monitor.ts b/packages/core-p2p/src/network-monitor.ts index cfa30073e9..ca600c8eec 100644 --- a/packages/core-p2p/src/network-monitor.ts +++ b/packages/core-p2p/src/network-monitor.ts @@ -16,6 +16,8 @@ import { NetworkState } from "./network-state"; import { RateLimiter } from "./rate-limiter"; import { buildRateLimiter, checkDNS, checkNTP } from "./utils"; +const defaultDownloadChunkSize = 400; + export class NetworkMonitor implements P2P.INetworkMonitor { public server: SocketCluster; public config: any; @@ -35,6 +37,8 @@ export class NetworkMonitor implements P2P.INetworkMonitor { */ private downloadedChunksCacheMax: number = 100; + private downloadChunkSize: number = defaultDownloadChunkSize; + private readonly logger: Logger.ILogger = app.resolvePlugin("logger"); private readonly emitter: EventEmitter.EventEmitter = app.resolvePlugin("event-emitter"); @@ -344,12 +348,11 @@ export class NetworkMonitor implements P2P.INetworkMonitor { } const networkHeight: number = this.getNetworkHeight(); - const chunkSize: number = 400; let chunksMissingToSync: number; if (!networkHeight || networkHeight <= fromBlockHeight) { chunksMissingToSync = 1; } else { - chunksMissingToSync = Math.ceil((networkHeight - fromBlockHeight) / chunkSize); + chunksMissingToSync = Math.ceil((networkHeight - fromBlockHeight) / this.downloadChunkSize); } const chunksToDownload: number = Math.min(chunksMissingToSync, peersNotForked.length, maxParallelDownloads); @@ -362,9 +365,9 @@ export class NetworkMonitor implements P2P.INetworkMonitor { let chunksHumanReadable: string = ""; for (let i = 0; i < chunksToDownload; i++) { - const height: number = fromBlockHeight + chunkSize * i; + const height: number = fromBlockHeight + this.downloadChunkSize * i; const isLastChunk: boolean = i === chunksToDownload - 1; - const blocksRange: string = `[${height + 1}, ${isLastChunk ? ".." : height + chunkSize}]`; + const blocksRange: string = `[${height + 1}, ${isLastChunk ? ".." : height + this.downloadChunkSize}]`; downloadJobs.push(async () => { if (this.downloadedChunksCache[height] !== undefined) { @@ -388,9 +391,12 @@ export class NetworkMonitor implements P2P.INetworkMonitor { for (peer of peersToTry) { peerPrint = `${peer.ip}:${peer.port}`; try { - blocks = await this.communicator.getPeerBlocks(peer, { fromBlockHeight: height }); + blocks = await this.communicator.getPeerBlocks(peer, { + fromBlockHeight: height, + blockLimit: this.downloadChunkSize, + }); - if (blocks.length === chunkSize || (isLastChunk && blocks.length > 0)) { + if (blocks.length === this.downloadChunkSize || (isLastChunk && blocks.length > 0)) { this.logger.debug( `Downloaded blocks ${blocksRange} (${blocks.length}) ` + `from ${peerPrint}`, ); @@ -457,10 +463,14 @@ export class NetworkMonitor implements P2P.INetworkMonitor { downloadResults[i] !== undefined && Object.keys(this.downloadedChunksCache).length <= this.downloadedChunksCacheMax ) { - this.downloadedChunksCache[fromBlockHeight + chunkSize * i] = downloadResults[i]; + this.downloadedChunksCache[fromBlockHeight + this.downloadChunkSize * i] = downloadResults[i]; } } + // if we did not manage to download any block, reduce chunk size for next time + this.downloadChunkSize = + downloadedBlocks.length === 0 ? Math.ceil(this.downloadChunkSize / 10) : defaultDownloadChunkSize; + return downloadedBlocks; } diff --git a/packages/core-p2p/src/peer-communicator.ts b/packages/core-p2p/src/peer-communicator.ts index 99236058b8..ad134f673c 100644 --- a/packages/core-p2p/src/peer-communicator.ts +++ b/packages/core-p2p/src/peer-communicator.ts @@ -198,6 +198,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator { }, app.resolveOptions("p2p").getBlocksTimeout, maxPayload, + false, ); if (!peerBlocks) { @@ -247,7 +248,14 @@ export class PeerCommunicator implements P2P.IPeerCommunicator { return true; } - private async emit(peer: P2P.IPeer, event: string, data?: any, timeout?: number, maxPayload?: number) { + private async emit( + peer: P2P.IPeer, + event: string, + data?: any, + timeout?: number, + maxPayload?: number, + disconnectOnError: boolean = true, + ) { await this.throttle(peer, event); let response; @@ -276,7 +284,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator { throw new Error(`Response validation failed from peer ${peer.ip} : ${JSON.stringify(response.data)}`); } } catch (e) { - this.handleSocketError(peer, event, e); + this.handleSocketError(peer, event, e, disconnectOnError); return undefined; } @@ -293,7 +301,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator { } } - private handleSocketError(peer: P2P.IPeer, event: string, error: Error): void { + private handleSocketError(peer: P2P.IPeer, event: string, error: Error, disconnect: boolean = true): void { if (!error.name) { return; } @@ -313,7 +321,9 @@ export class PeerCommunicator implements P2P.IPeerCommunicator { if (process.env.CORE_P2P_PEER_VERIFIER_DEBUG_EXTRA) { this.logger.debug(`Socket error (peer ${peer.ip}) : ${error.message}`); } - this.emitter.emit("internal.p2p.disconnectPeer", { peer }); + if (disconnect) { + this.emitter.emit("internal.p2p.disconnectPeer", { peer }); + } } } }