Skip to content

Commit

Permalink
fix(core-p2p): reduce download block size when getting no block
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one committed Jul 22, 2020
1 parent 20d66cf commit e223287
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 11 deletions.
56 changes: 56 additions & 0 deletions __tests__/unit/core-p2p/network-monitor.test.ts
Expand Up @@ -398,6 +398,62 @@ describe("NetworkMonitor", () => {

expect(await monitor.downloadBlocksFromHeight(20, maxParallelDownloads)).toEqual([mockBlock]);
});

it("should reduce download block chunk size after receiving no block", async () => {
communicator.getPeerBlocks = jest.fn().mockReturnValue([]);

const numPeers = maxParallelDownloads;
for (let i = 0; i < maxParallelDownloads; i++) {
storage.setPeer(
createStubPeer({
ip: `1.1.1.${i}`,
port: 4000,
state: {
height: 12500,
currentSlot: 2,
forgingAllowed: true,
},
verificationResult: { forked: false },
}),
);
}

const fromHeight = 1;

// first step, peers won't return any block: chunk size should be reduced by factor 10 for next download
for (const expectedBlockLimit of [400, 40, 4, 1, 1, 1]) {
// @ts-ignore
communicator.getPeerBlocks.mockReset();
const downloadedBlocks = await monitor.downloadBlocksFromHeight(fromHeight, maxParallelDownloads);

expect(downloadedBlocks).toEqual([]);
expect(communicator.getPeerBlocks).toBeCalledTimes(maxParallelDownloads);
expect(communicator.getPeerBlocks).toBeCalledWith(expect.anything(), {
fromBlockHeight: expect.any(Number),
blockLimit: expectedBlockLimit,
});
}

// second step, peers return blocks: chunk size should be reset to default value (400) for next download
const mockGetPeerBlocks1Block = (_, { fromBlockHeight }) => [expectedBlocksFromHeight(fromBlockHeight)[0]];
for (const expectedBlockLimit of [1, 400]) {
communicator.getPeerBlocks = jest
.fn()
.mockImplementation(expectedBlockLimit === 1 ? mockGetPeerBlocks1Block : mockedGetPeerBlocks);

const downloadedBlocks = await monitor.downloadBlocksFromHeight(fromHeight, maxParallelDownloads);

const expectedBlocks = expectedBlocksFromHeight(fromHeight).slice(0, numPeers * expectedBlockLimit);

expect(downloadedBlocks).toEqual(expectedBlocks);

expect(communicator.getPeerBlocks).toBeCalledTimes(maxParallelDownloads);
expect(communicator.getPeerBlocks).toBeCalledWith(expect.anything(), {
fromBlockHeight: expect.any(Number),
blockLimit: expectedBlockLimit,
});
}
});
});

describe("broadcastBlock", () => {
Expand Down
24 changes: 17 additions & 7 deletions packages/core-p2p/src/network-monitor.ts
Expand Up @@ -16,6 +16,8 @@ import { NetworkState } from "./network-state";
import { RateLimiter } from "./rate-limiter";
import { buildRateLimiter, checkDNS, checkNTP } from "./utils";

const defaultDownloadChunkSize = 400;

export class NetworkMonitor implements P2P.INetworkMonitor {
public server: SocketCluster;
public config: any;
Expand All @@ -35,6 +37,8 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
*/
private downloadedChunksCacheMax: number = 100;

private downloadChunkSize: number = defaultDownloadChunkSize;

private readonly logger: Logger.ILogger = app.resolvePlugin<Logger.ILogger>("logger");
private readonly emitter: EventEmitter.EventEmitter = app.resolvePlugin<EventEmitter.EventEmitter>("event-emitter");

Expand Down Expand Up @@ -344,12 +348,11 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
}

const networkHeight: number = this.getNetworkHeight();
const chunkSize: number = 400;
let chunksMissingToSync: number;
if (!networkHeight || networkHeight <= fromBlockHeight) {
chunksMissingToSync = 1;
} else {
chunksMissingToSync = Math.ceil((networkHeight - fromBlockHeight) / chunkSize);
chunksMissingToSync = Math.ceil((networkHeight - fromBlockHeight) / this.downloadChunkSize);
}
const chunksToDownload: number = Math.min(chunksMissingToSync, peersNotForked.length, maxParallelDownloads);

Expand All @@ -362,9 +365,9 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
let chunksHumanReadable: string = "";

for (let i = 0; i < chunksToDownload; i++) {
const height: number = fromBlockHeight + chunkSize * i;
const height: number = fromBlockHeight + this.downloadChunkSize * i;
const isLastChunk: boolean = i === chunksToDownload - 1;
const blocksRange: string = `[${height + 1}, ${isLastChunk ? ".." : height + chunkSize}]`;
const blocksRange: string = `[${height + 1}, ${isLastChunk ? ".." : height + this.downloadChunkSize}]`;

downloadJobs.push(async () => {
if (this.downloadedChunksCache[height] !== undefined) {
Expand All @@ -388,9 +391,12 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
for (peer of peersToTry) {
peerPrint = `${peer.ip}:${peer.port}`;
try {
blocks = await this.communicator.getPeerBlocks(peer, { fromBlockHeight: height });
blocks = await this.communicator.getPeerBlocks(peer, {
fromBlockHeight: height,
blockLimit: this.downloadChunkSize,
});

if (blocks.length === chunkSize || (isLastChunk && blocks.length > 0)) {
if (blocks.length === this.downloadChunkSize || (isLastChunk && blocks.length > 0)) {
this.logger.debug(
`Downloaded blocks ${blocksRange} (${blocks.length}) ` + `from ${peerPrint}`,
);
Expand Down Expand Up @@ -457,10 +463,14 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
downloadResults[i] !== undefined &&
Object.keys(this.downloadedChunksCache).length <= this.downloadedChunksCacheMax
) {
this.downloadedChunksCache[fromBlockHeight + chunkSize * i] = downloadResults[i];
this.downloadedChunksCache[fromBlockHeight + this.downloadChunkSize * i] = downloadResults[i];
}
}

// if we did not manage to download any block, reduce chunk size for next time
this.downloadChunkSize =
downloadedBlocks.length === 0 ? Math.ceil(this.downloadChunkSize / 10) : defaultDownloadChunkSize;

return downloadedBlocks;
}

Expand Down
18 changes: 14 additions & 4 deletions packages/core-p2p/src/peer-communicator.ts
Expand Up @@ -198,6 +198,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
},
app.resolveOptions("p2p").getBlocksTimeout,
maxPayload,
false,
);

if (!peerBlocks) {
Expand Down Expand Up @@ -247,7 +248,14 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
return true;
}

private async emit(peer: P2P.IPeer, event: string, data?: any, timeout?: number, maxPayload?: number) {
private async emit(
peer: P2P.IPeer,
event: string,
data?: any,
timeout?: number,
maxPayload?: number,
disconnectOnError: boolean = true,
) {
await this.throttle(peer, event);

let response;
Expand Down Expand Up @@ -276,7 +284,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
throw new Error(`Response validation failed from peer ${peer.ip} : ${JSON.stringify(response.data)}`);
}
} catch (e) {
this.handleSocketError(peer, event, e);
this.handleSocketError(peer, event, e, disconnectOnError);
return undefined;
}

Expand All @@ -293,7 +301,7 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
}
}

private handleSocketError(peer: P2P.IPeer, event: string, error: Error): void {
private handleSocketError(peer: P2P.IPeer, event: string, error: Error, disconnect: boolean = true): void {
if (!error.name) {
return;
}
Expand All @@ -313,7 +321,9 @@ export class PeerCommunicator implements P2P.IPeerCommunicator {
if (process.env.CORE_P2P_PEER_VERIFIER_DEBUG_EXTRA) {
this.logger.debug(`Socket error (peer ${peer.ip}) : ${error.message}`);
}
this.emitter.emit("internal.p2p.disconnectPeer", { peer });
if (disconnect) {
this.emitter.emit("internal.p2p.disconnectPeer", { peer });
}
}
}
}

0 comments on commit e223287

Please sign in to comment.