Skip to content

Commit

Permalink
fix(core-p2p): disconnect all sockets on peer disconnect (#3909)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastijankuzner committed Jul 23, 2020
1 parent dc431fa commit 95d6b2c
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 32 deletions.
6 changes: 4 additions & 2 deletions __tests__/unit/core-p2p/listeners.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe("DisconnectInvalidPeers", () => {
it("should emit 'internal.p2p.disconnectPeer' for invalid version peers", async () => {
await disconnectInvalidPeers.handle();

expect(emitter.dispatch).toBeCalledTimes(2 * 3); // 2 invalid peers version * 3 sockets (ports) per peer
expect(emitter.dispatch).toBeCalledTimes(2); // 2 invalid peers version
});
});
});
Expand Down Expand Up @@ -80,8 +80,10 @@ describe("DisconnectPeer", () => {

expect(storage.forgetPeer).toBeCalledTimes(1);
expect(storage.forgetPeer).toBeCalledWith(peer);
expect(connector.disconnect).toBeCalledTimes(1);
expect(connector.disconnect).toBeCalledTimes(3);
expect(connector.disconnect).toBeCalledWith(peer, 4000);
expect(connector.disconnect).toBeCalledWith(peer, 4010);
expect(connector.disconnect).toBeCalledWith(peer, 4020);
});
});
});
15 changes: 4 additions & 11 deletions __tests__/unit/core-p2p/network-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { Blocks } from "@arkecosystem/crypto";
import delay from "delay";
import { cloneDeep } from "lodash";
import path from "path";
import { PortsOffset } from "@arkecosystem/core-p2p/src/enums";

describe("NetworkMonitor", () => {
let networkMonitor: NetworkMonitor;
Expand Down Expand Up @@ -152,13 +151,9 @@ describe("NetworkMonitor", () => {
it("should populate peers only once if same peer is in list and sources", async () => {
appConfigPeers.sources = ["http://peers.someurl.com"];

const peers = [
{ ip: "187.177.54.44", port: 4000 },
];
const peers = [{ ip: "187.177.54.44", port: 4000 }];

appConfigPeers.list = [
{ ip: "187.177.54.44", port: 4000 },
];
appConfigPeers.list = [{ ip: "187.177.54.44", port: 4000 }];

jest.spyOn(Utils.http, "get").mockResolvedValueOnce({ data: peers } as Utils.HttpResponse);

Expand Down Expand Up @@ -395,10 +390,8 @@ describe("NetworkMonitor", () => {
await networkMonitor.cleansePeers({ peerCount: 5 });

expect(communicator.ping).toBeCalledTimes(peers.length);
expect(emitter.dispatch).toBeCalledTimes(4); // 3 for disconnecting each peer port + 1 for peer removed event
for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) {
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers), port });
}
expect(emitter.dispatch).toBeCalledTimes(2); // 1 for disconnecting peer + 1 for peer removed event
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Disconnect, { peer: expect.toBeOneOf(peers) });
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Removed, expect.toBeOneOf(peers));
});

Expand Down
8 changes: 3 additions & 5 deletions __tests__/unit/core-p2p/peer-communicator.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import "jest-extended";

import { Container, Utils as KernelUtils } from "@arkecosystem/core-kernel";
import {Container, Enums, Utils as KernelUtils} from "@arkecosystem/core-kernel";
import { constants } from "@arkecosystem/core-p2p/src/constants";
import {
PeerPingTimeoutError,
Expand Down Expand Up @@ -337,10 +337,8 @@ describe("PeerCommunicator", () => {
"network.nethash",
)}, his=${wrongNethash}.`,
);
expect(emitter.dispatch).toBeCalledTimes(3);
for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) {
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer, port });
}
expect(emitter.dispatch).toBeCalledTimes(1);
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Disconnect, { peer });
});

it("should set peer ports = -1 when pinging the port fails", async () => {
Expand Down
12 changes: 6 additions & 6 deletions packages/core-p2p/src/listeners.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Container, Contracts } from "@arkecosystem/core-kernel";
import { Container, Contracts, Enums } from "@arkecosystem/core-kernel";

import { PeerConnector } from "./peer-connector";
import { isValidVersion } from "./utils";
import { getAllPeerPorts } from "./socket-server/utils/get-peer-port";
import { isValidVersion } from "./utils";

/**
* @class DisconnectInvalidPeers
Expand Down Expand Up @@ -43,9 +43,7 @@ export class DisconnectInvalidPeers implements Contracts.Kernel.EventListener {

for (const peer of peers) {
if (!isValidVersion(this.app, peer)) {
for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}
await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });
}
}
}
Expand Down Expand Up @@ -79,7 +77,9 @@ export class DisconnectPeer implements Contracts.Kernel.EventListener {
* @memberof DisconnectPeer
*/
public async handle({ data }): Promise<void> {
this.connector.disconnect(data.peer, data.port);
for (const port of getAllPeerPorts(data.peer)) {
this.connector.disconnect(data.peer, port);
}

this.storage.forgetPeer(data.peer);
}
Expand Down
5 changes: 1 addition & 4 deletions packages/core-p2p/src/network-monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { NetworkState } from "./network-state";
import { Peer } from "./peer";
import { PeerCommunicator } from "./peer-communicator";
import { checkDNS, checkNTP } from "./utils";
import { getAllPeerPorts } from "./socket-server/utils/get-peer-port";

// todo: review the implementation
@Container.injectable()
Expand Down Expand Up @@ -157,9 +156,7 @@ export class NetworkMonitor implements Contracts.P2P.NetworkMonitor {
peerErrors[error] = peerErrors[error] || [];
peerErrors[error].push(peer);

for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}
await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });

this.events.dispatch(Enums.PeerEvent.Removed, peer);
}
Expand Down
7 changes: 3 additions & 4 deletions packages/core-p2p/src/peer-communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { SocketErrors } from "./enums";
import { PeerPingTimeoutError, PeerStatusResponseError, PeerVerificationFailedError } from "./errors";
import { PeerVerifier } from "./peer-verifier";
import { replySchemas } from "./schemas";
import { getAllPeerPorts, getPeerPortForEvent } from "./socket-server/utils/get-peer-port";
import { getPeerPortForEvent } from "./socket-server/utils/get-peer-port";
import { isValidVersion } from "./utils";

// todo: review the implementation
Expand Down Expand Up @@ -121,9 +121,8 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator {
`Disconnecting from ${peerHostPort}: ` +
`nethash mismatch: our=${ourNethash}, his=${hisNethash}.`,
);
for (const port of getAllPeerPorts(peer)) {
this.events.dispatch("internal.p2p.disconnectPeer", { peer, port });
}

await this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });
}
}
} else {
Expand Down

0 comments on commit 95d6b2c

Please sign in to comment.