Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core-p2p): disconnect all sockets on peer disconnect #3909

Merged
merged 4 commits into from
Jul 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions __tests__/unit/core-p2p/listeners.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe("DisconnectInvalidPeers", () => {
it("should emit 'internal.p2p.disconnectPeer' for invalid version peers", async () => {
await disconnectInvalidPeers.handle();

expect(emitter.dispatch).toBeCalledTimes(2 * 3); // 2 invalid peers version * 3 sockets (ports) per peer
expect(emitter.dispatch).toBeCalledTimes(2); // 2 invalid peers version
});
});
});
Expand Down Expand Up @@ -80,8 +80,10 @@ describe("DisconnectPeer", () => {

expect(storage.forgetPeer).toBeCalledTimes(1);
expect(storage.forgetPeer).toBeCalledWith(peer);
expect(connector.disconnect).toBeCalledTimes(1);
expect(connector.disconnect).toBeCalledTimes(3);
expect(connector.disconnect).toBeCalledWith(peer, 4000);
expect(connector.disconnect).toBeCalledWith(peer, 4010);
expect(connector.disconnect).toBeCalledWith(peer, 4020);
});
});
});
15 changes: 4 additions & 11 deletions __tests__/unit/core-p2p/network-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { Blocks } from "@arkecosystem/crypto";
import delay from "delay";
import { cloneDeep } from "lodash";
import path from "path";
import { PortsOffset } from "@arkecosystem/core-p2p/src/enums";

describe("NetworkMonitor", () => {
let networkMonitor: NetworkMonitor;
Expand Down Expand Up @@ -152,13 +151,9 @@ describe("NetworkMonitor", () => {
it("should populate peers only once if same peer is in list and sources", async () => {
appConfigPeers.sources = ["http://peers.someurl.com"];

const peers = [
{ ip: "187.177.54.44", port: 4000 },
];
const peers = [{ ip: "187.177.54.44", port: 4000 }];

appConfigPeers.list = [
{ ip: "187.177.54.44", port: 4000 },
];
appConfigPeers.list = [{ ip: "187.177.54.44", port: 4000 }];

jest.spyOn(Utils.http, "get").mockResolvedValueOnce({ data: peers } as Utils.HttpResponse);

Expand Down Expand Up @@ -395,10 +390,8 @@ describe("NetworkMonitor", () => {
await networkMonitor.cleansePeers({ peerCount: 5 });

expect(communicator.ping).toBeCalledTimes(peers.length);
expect(emitter.dispatch).toBeCalledTimes(4); // 3 for disconnecting each peer port + 1 for peer removed event
for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) {
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers), port });
}
expect(emitter.dispatch).toBeCalledTimes(2); // 1 for disconnecting peer + 1 for peer removed event
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Disconnect, { peer: expect.toBeOneOf(peers) });
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Removed, expect.toBeOneOf(peers));
});

Expand Down
8 changes: 3 additions & 5 deletions __tests__/unit/core-p2p/peer-communicator.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "jest-extended";

import { Container, Utils as KernelUtils } from "@arkecosystem/core-kernel";
import {Container, Enums, Utils as KernelUtils} from "@arkecosystem/core-kernel";
import { constants } from "@arkecosystem/core-p2p/src/constants";
import {
PeerPingTimeoutError,
Expand Down Expand Up @@ -337,10 +337,8 @@ describe("PeerCommunicator", () => {
"network.nethash",
)}, his=${wrongNethash}.`,
);
expect(emitter.dispatch).toBeCalledTimes(3);
for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) {
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer, port });
}
expect(emitter.dispatch).toBeCalledTimes(1);
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Disconnect, { peer });
});

it("should set peer ports = -1 when pinging the port fails", async () => {
Expand Down
12 changes: 6 additions & 6 deletions packages/core-p2p/src/listeners.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Container, Contracts } from "@arkecosystem/core-kernel";
import { Container, Contracts, Enums } from "@arkecosystem/core-kernel";

import { PeerConnector } from "./peer-connector";
import { isValidVersion } from "./utils";
import { getAllPeerPorts } from "./socket-server/utils/get-peer-port";
import { isValidVersion } from "./utils";

/**
* @class DisconnectInvalidPeers
Expand Down Expand Up @@ -43,9 +43,7 @@ export class DisconnectInvalidPeers implements Contracts.Kernel.EventListener {

for (const peer of peers) {
if (!isValidVersion(this.app, peer)) {
for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}
await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });
}
}
}
Expand Down Expand Up @@ -79,7 +77,9 @@ export class DisconnectPeer implements Contracts.Kernel.EventListener {
* @memberof DisconnectPeer
*/
public async handle({ data }): Promise<void> {
this.connector.disconnect(data.peer, data.port);
for (const port of getAllPeerPorts(data.peer)) {
this.connector.disconnect(data.peer, port);
}

this.storage.forgetPeer(data.peer);
}
Expand Down
5 changes: 1 addition & 4 deletions packages/core-p2p/src/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { NetworkState } from "./network-state";
import { Peer } from "./peer";
import { PeerCommunicator } from "./peer-communicator";
import { checkDNS, checkNTP } from "./utils";
import { getAllPeerPorts } from "./socket-server/utils/get-peer-port";

// todo: review the implementation
@Container.injectable()
Expand Down Expand Up @@ -157,9 +156,7 @@ export class NetworkMonitor implements Contracts.P2P.NetworkMonitor {
peerErrors[error] = peerErrors[error] || [];
peerErrors[error].push(peer);

for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}
await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });

this.events.dispatch(Enums.PeerEvent.Removed, peer);
}
Expand Down
7 changes: 3 additions & 4 deletions packages/core-p2p/src/peer-communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { SocketErrors } from "./enums";
import { PeerPingTimeoutError, PeerStatusResponseError, PeerVerificationFailedError } from "./errors";
import { PeerVerifier } from "./peer-verifier";
import { replySchemas } from "./schemas";
import { getAllPeerPorts, getPeerPortForEvent } from "./socket-server/utils/get-peer-port";
import { getPeerPortForEvent } from "./socket-server/utils/get-peer-port";
import { isValidVersion } from "./utils";

// todo: review the implementation
Expand Down Expand Up @@ -121,9 +121,8 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator {
`Disconnecting from ${peerHostPort}: ` +
`nethash mismatch: our=${ourNethash}, his=${hisNethash}.`,
);
for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}

await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });
}
}
} else {
Expand Down