Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 59 additions & 18 deletions __tests__/integration/core-p2p/socket-server/peer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,23 @@ describe("Peer socket endpoint", () => {
await delay(2000); // give time to workers to respawn
});

it("should disconnect the client if it sends multiple handshakes", async () => {
connect(); // this automatically sends the first handshake
await delay(1000);

expect(socket.state).toBe("open");

// this is the second handshake
send('{"event": "#handshake", "data": {}, "cid": 1}');
await delay(500);

expect(socket.state).toBe("closed");

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
});

it("should accept the request when below rate limit", async () => {
connect();
await delay(1000);
Expand Down Expand Up @@ -496,8 +513,6 @@ describe("Peer socket endpoint", () => {
connect();
await delay(1000);

expect(socket.state).toBe("open");

send('{"event":"#disconnect","data":{"code":4000}}');
await expect(
emit("p2p.peer.getStatus", {
Expand Down Expand Up @@ -540,30 +555,56 @@ describe("Peer socket endpoint", () => {
await delay(2000); // give time to workers to respawn
});

it("should close the connection when the HTTP url is not valid", async () => {
it("should close the connection when the HTTP url is not valid", async done => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1", function() {
socket.connect(4007, "127.0.0.1", async () => {
socket.write("GET /invalid/ HTTP/1.0\r\n\r\n");
});
await delay(500);
expect(socket.destroyed).toBe(true);
await delay(500);
expect(socket.destroyed).toBe(true);

socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(true);
socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(true);

// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
// kill workers to reset ipLastError (or we won't pass handshake for 1 minute)
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
done();
});
});

it("should close the connection if the initial HTTP request is not processed within 2 seconds", async () => {
it("should close the connection if the initial HTTP request is not processed within 2 seconds", async done => {
const socket = new net.Socket();
socket.connect(4007, "127.0.0.1");
await delay(500);
expect(socket.destroyed).toBe(false);
await delay(2000);
expect(socket.destroyed).toBe(true);
socket.connect(4007, "127.0.0.1", async () => {
await delay(500);
expect(socket.destroyed).toBe(false);
await delay(2000);
expect(socket.destroyed).toBe(true);
server.killWorkers({ immediate: true });
await delay(2000); // give time to workers to respawn
done();
});
});

it("should close the connection if is is not fully established from start to finish within 4 seconds", async done => {
const socket = new net.Socket();
await delay(2000);
socket.connect(4007, "127.0.0.1", async () => {
expect(socket.destroyed).toBe(false);
// @ts-ignore
socket.write(`GET /${server.options.path}/ HTTP/1.0\r\n`);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(false);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(false);
socket.write("Host: 127.0.0.1");
await delay(1500);
expect(socket.destroyed).toBe(true);
done();
});
});
});
});
51 changes: 28 additions & 23 deletions packages/core-p2p/src/socket-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { requestSchemas } from "../schemas";
import { codec } from "../utils/sc-codec";
import { validateTransactionLight } from "./utils/validate";

const SOCKET_TIMEOUT = 2000;
const MINUTE_IN_MILLISECONDS = 1000 * 60;
const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60;

Expand Down Expand Up @@ -41,7 +42,7 @@ export class Worker extends SCWorker {
await this.loadHandlers();

// @ts-ignore
this.scServer.wsServer._server.timeout = 2000;
this.scServer.wsServer._server.timeout = SOCKET_TIMEOUT;

// @ts-ignore
this.scServer.wsServer.on("connection", (ws, req) => {
Expand All @@ -59,8 +60,7 @@ export class Worker extends SCWorker {
this.httpServer.on("request", req => {
// @ts-ignore
if (req.method !== "GET" || req.url !== this.scServer.wsServer.options.path) {
this.setErrorForIpAndTerminate(req);
req.destroy();
this.setErrorForIpAndDestroy(req.socket);
}
});
// @ts-ignore
Expand Down Expand Up @@ -99,43 +99,44 @@ export class Worker extends SCWorker {
ws.removeAllListeners("ping");
ws.removeAllListeners("pong");
ws.prependListener("ping", () => {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
});
ws.prependListener("pong", () => {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
});

ws.prependListener("error", error => {
if (error instanceof RangeError) {
this.setErrorForIpAndTerminate(req, ws);
this.setErrorForIpAndDestroy(req.socket);
}
});

const messageListeners = ws.listeners("message");
ws.removeAllListeners("message");
ws.prependListener("message", message => {
if (ws._disconnected) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._disconnected) {
return this.setErrorForIpAndDestroy(req.socket);
} else if (message === "#2") {
const timeNow: number = new Date().getTime() / 1000;
if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._lastPingTime && timeNow - req.socket._lastPingTime < 1) {
return this.setErrorForIpAndDestroy(req.socket);
}
ws._lastPingTime = timeNow;
req.socket._lastPingTime = timeNow;
} else if (message.length < 10) {
// except for #2 message, we should have JSON with some required properties
// (see below) which implies that message length should be longer than 10 chars
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
} else {
try {
const parsed = JSON.parse(message);
if (parsed.event === "#disconnect") {
ws._disconnected = true;
req.socket._disconnected = true;
} else if (parsed.event === "#handshake") {
if (ws._handshake) {
return this.setErrorForIpAndTerminate(req, ws);
if (req.socket._handshake) {
return this.setErrorForIpAndDestroy(req.socket);
}
ws._handshake = true;
req.socket._handshake = true;
clearTimeout(req.socket._connectionTimer);
} else if (
typeof parsed.event !== "string" ||
typeof parsed.data !== "object" ||
Expand All @@ -144,10 +145,10 @@ export class Worker extends SCWorker {
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) ||
!this.handlers.includes(parsed.event)
) {
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
}
} catch (error) {
return this.setErrorForIpAndTerminate(req, ws);
return this.setErrorForIpAndDestroy(req.socket);
}
}

Expand Down Expand Up @@ -219,11 +220,9 @@ export class Worker extends SCWorker {
return false;
}

private setErrorForIpAndTerminate(req, ws?): void {
this.ipLastError[req.socket.remoteAddress] = Date.now();
if (ws) {
ws.terminate();
}
private setErrorForIpAndDestroy(socket): void {
this.ipLastError[socket.remoteAddress] = Date.now();
socket.destroy();
}

private async handleConnection(socket): Promise<void> {
Expand All @@ -246,6 +245,12 @@ export class Worker extends SCWorker {
return;
}

socket._connectionTimer = setTimeout(() => {
if (!socket._handshake) {
this.setErrorForIpAndDestroy(socket);
}
}, SOCKET_TIMEOUT * 2);

const { data }: { data: { blocked: boolean } } = await this.sendToMasterAsync(
"p2p.internal.isBlockedByRateLimit",
{
Expand Down