From 5d21a34475363e2c32f7ba4070b12083e0623d4e Mon Sep 17 00:00:00 2001 From: Air1 Date: Fri, 29 Nov 2019 16:35:54 +0400 Subject: [PATCH 1/3] feat: stricter p2p msg check + ip blocking --- packages/core-p2p/src/socket-server/worker.ts | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/packages/core-p2p/src/socket-server/worker.ts b/packages/core-p2p/src/socket-server/worker.ts index eb8629ba95..466389810b 100644 --- a/packages/core-p2p/src/socket-server/worker.ts +++ b/packages/core-p2p/src/socket-server/worker.ts @@ -9,6 +9,7 @@ const ajv = new Ajv(); export class Worker extends SCWorker { private config: Record; + private ipLastError: Record = {}; private rateLimiter: RateLimiter; public async run() { @@ -64,13 +65,13 @@ export class Worker extends SCWorker { } private handlePayload(ws, req) { - ws.on("ping", () => { - ws.terminate(); + ws.prependListener("ping", () => { + this.setErrorForIpAndTerminate(ws, req); }); - ws.on("pong", () => { - ws.terminate(); + ws.prependListener("pong", () => { + this.setErrorForIpAndTerminate(ws, req); }); - ws.on("message", message => { + ws.prependListener("message", message => { try { const InvalidMessagePayloadError: Error = this.createError( SocketErrors.InvalidMessagePayload, @@ -82,6 +83,10 @@ export class Worker extends SCWorker { throw InvalidMessagePayloadError; } ws._lastPingTime = timeNow; + } else if (message.length < 10) { + // except for #2 message, we should have JSON with some required properties + // (see below) which implies that message length should be longer than 10 chars + this.setErrorForIpAndTerminate(ws, req); } else { const parsed = JSON.parse(message); if ( @@ -90,15 +95,20 @@ export class Worker extends SCWorker { (typeof parsed.cid !== "number" && (parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) ) { - throw InvalidMessagePayloadError; + this.setErrorForIpAndTerminate(ws, req); } } } catch (error) { - ws.terminate(); + this.setErrorForIpAndTerminate(ws, req); } }); } + private setErrorForIpAndTerminate(ws, req): void { + this.ipLastError[req.socket.remoteAddress] = Date.now(); + ws.terminate(); + } + private async handleConnection(socket): Promise { const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers"); @@ -117,14 +127,20 @@ export class Worker extends SCWorker { } private async handleHandshake(req, next): Promise { - const isBlocked = await this.rateLimiter.isBlocked(req.socket.remoteAddress); - const isBlacklisted = (this.config.blacklist || []).includes(req.socket.remoteAddress); + const ip = req.socket.remoteAddress; + if (this.ipLastError[ip] && this.ipLastError[ip] > Date.now() - 60 * 1000) { + req.socket.destroy(); + return; + } + + const isBlocked = await this.rateLimiter.isBlocked(ip); + const isBlacklisted = (this.config.blacklist || []).includes(ip); if (isBlocked || isBlacklisted) { next(this.createError(SocketErrors.Forbidden, "Blocked due to rate limit or blacklisted.")); return; } - const cidrRemoteAddress = cidr(`${req.socket.remoteAddress}/24`); + const cidrRemoteAddress = cidr(`${ip}/24`); const sameSubnetSockets = Object.values({ ...this.scServer.clients, ...this.scServer.pendingClients }).filter( client => cidr(`${client.remoteAddress}/24`) === cidrRemoteAddress, ); From 6ab6d69384bdaaf109549ff281489aaa372a8609 Mon Sep 17 00:00:00 2001 From: Air1 Date: Mon, 2 Dec 2019 12:13:57 +0400 Subject: [PATCH 2/3] chore: purge ipLastError every hour --- packages/core-p2p/src/socket-server/worker.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/core-p2p/src/socket-server/worker.ts b/packages/core-p2p/src/socket-server/worker.ts index 466389810b..7a03c0d3d3 100644 --- a/packages/core-p2p/src/socket-server/worker.ts +++ b/packages/core-p2p/src/socket-server/worker.ts @@ -5,6 +5,9 @@ import { SocketErrors } from "../enums"; import { requestSchemas } from "../schemas"; import { RateLimiter } from "./rate-limiter"; +const MINUTE_IN_MILLISECONDS = 1000 * 60; +const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60; + const ajv = new Ajv(); export class Worker extends SCWorker { @@ -17,6 +20,9 @@ export class Worker extends SCWorker { await this.loadConfiguration(); + // purge ipLastError every hour to free up memory + setInterval(() => (this.ipLastError = {}), HOUR_IN_MILLISECONDS); + // @ts-ignore this.scServer.wsServer.on("connection", (ws, req) => this.handlePayload(ws, req)); this.scServer.on("connection", socket => this.handleConnection(socket)); @@ -128,7 +134,7 @@ export class Worker extends SCWorker { private async handleHandshake(req, next): Promise { const ip = req.socket.remoteAddress; - if (this.ipLastError[ip] && this.ipLastError[ip] > Date.now() - 60 * 1000) { + if (this.ipLastError[ip] && this.ipLastError[ip] > Date.now() - MINUTE_IN_MILLISECONDS) { req.socket.destroy(); return; } From 9a10896da1c1cc4ab5f92b94710e03f363f436cc Mon Sep 17 00:00:00 2001 From: Air1 Date: Mon, 2 Dec 2019 12:15:47 +0400 Subject: [PATCH 3/3] test: fix socket server tests --- .../core-p2p/socket-server/peer.test.ts | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/__tests__/integration/core-p2p/socket-server/peer.test.ts b/__tests__/integration/core-p2p/socket-server/peer.test.ts index 0172e48f0a..0875b7afe7 100644 --- a/__tests__/integration/core-p2p/socket-server/peer.test.ts +++ b/__tests__/integration/core-p2p/socket-server/peer.test.ts @@ -151,6 +151,10 @@ describe("Peer socket endpoint", () => { await delay(1000); expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should disconnect the client if it sends too many pongs too quickly", async () => { @@ -172,6 +176,10 @@ describe("Peer socket endpoint", () => { await delay(1000); expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should disconnect the client if it sends a ping frame", async () => { @@ -183,6 +191,10 @@ describe("Peer socket endpoint", () => { ping(); await delay(500); expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should disconnect the client if it sends a pong frame", async () => { @@ -194,6 +206,10 @@ describe("Peer socket endpoint", () => { pong(); await delay(500); expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); }); });