Skip to content
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [2.5.32] - 2020-01-21

### Fixed

- Discard blocks containing too many transactions ([#3404])
- Disconnect when multiple sockets are opened from same IP ([#3404])
- Handle invalid WS opcodes ([#3404])
- Disconnect for p2p SocketCluster events that do not have a handler ([#3404])
- Handle payload with additional properties ([#3404])

## [2.5.31] - 2019-12-19

### Fixed
Expand Down
8 changes: 8 additions & 0 deletions packages/core-p2p/src/socket-server/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export class InvalidTransactionsError extends ServerError {
}
}

export class TooManyTransactionsError extends ServerError {
constructor(block: Interfaces.IBlockData) {
super(
`Received block ${block.id} at height ${block.height} contained too many transactions (${block.transactions.length}).`,
);
}
}

export class UnchainedBlockError extends ServerError {
constructor(lastHeight: number, nextHeight: number) {
super(`Last received block ${nextHeight} cannot be chained to ${lastHeight}.`);
Expand Down
63 changes: 63 additions & 0 deletions packages/core-p2p/src/socket-server/utils/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,66 @@ export const validate = (schema, data) => {
throw error;
}
};

// Specific light validation for transaction, to be used in socket workers
// to perform quick validation on transaction objects received in postTransactions
// TODO rework with v3 when refactoring p2p layer
export const validateTransactionLight = (transaction: any): boolean => {
if (!transaction || typeof transaction !== "object") {
return false;
}

// except for multipayment transactions that are capped to 128 payments currently,
// a transaction should not have more than 100 properties total
const maxMainProperties = 50;
const maxAssetProperties = 100; // arbitrary, see below
const maxMultiPayments = 128; // hardcoded as will be refactored before increasing max multipayments
if (Object.keys(transaction).length > maxMainProperties) {
return false;
}

if (transaction.asset && typeof transaction.asset === "object") {
if (transaction.asset.payments && Array.isArray(transaction.asset.payments)) {
if (transaction.asset.payments.length > maxMultiPayments) {
return false;
}
for (const p of transaction.asset.payments) {
if (!p || typeof p !== "object" || Object.keys(p).length !== 2 || !p.recipientId || !p.amount) {
return false;
}
}
} else {
// no "payments" asset, default to counting properties and checking vs maxProperties.
// totally arbitrary as we could have transactions with more properties in asset,
// but this is temporary and will be removed in v3 when p2p layer is refactored
if (objectHasMorePropertiesThan(transaction.asset, maxAssetProperties)) {
return false;
}
}
}

const shallowClone = { ...transaction };
delete shallowClone.asset; // to count main properties now
if (objectHasMorePropertiesThan(shallowClone, maxMainProperties)) {
return false;
}

return true;
};

const objectHasMorePropertiesThan = (obj: object, maxProperties: number) => {
let propertiesCount = 0;
try {
JSON.stringify(obj, (key, value) => {
propertiesCount++;
if (propertiesCount > maxProperties) {
throw new Error("exceeded maxProperties");
}
return value;
});
} catch (e) {
return true;
}

return false;
};
6 changes: 5 additions & 1 deletion packages/core-p2p/src/socket-server/versions/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import pluralize from "pluralize";
import { MissingCommonBlockError } from "../../errors";
import { IPeerPingResponse } from "../../interfaces";
import { isWhitelisted } from "../../utils";
import { InvalidTransactionsError, UnchainedBlockError } from "../errors";
import { InvalidTransactionsError, TooManyTransactionsError, UnchainedBlockError } from "../errors";
import { getPeerConfig } from "../utils/get-peer-config";
import { mapAddr } from "../utils/map-addr";

Expand Down Expand Up @@ -69,6 +69,10 @@ export const postBlock = async ({ req }): Promise<void> => {
}
}

if (block.transactions.length > app.getConfig().getMilestone().block.maxTransactions) {
throw new TooManyTransactionsError(block);
}

app.resolvePlugin<Logger.ILogger>("logger").info(
`Received new block at height ${block.height.toLocaleString()} with ${pluralize(
"transaction",
Expand Down
9 changes: 8 additions & 1 deletion packages/core-p2p/src/socket-server/versions/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,12 @@ export const isForgerAuthorized = ({ req }): { authorized: boolean } => {
};

export const getConfig = (): Record<string, any> => {
return app.resolveOptions("p2p");
const config = app.resolveOptions("p2p");

// add maxTransactionsPerRequest config from transaction pool
config.maxTransactionsPerRequest = app.has("transaction-pool")
? app.resolveOptions("transaction-pool").maxTransactionsPerRequest || 40
: 40;

return config;
};
151 changes: 128 additions & 23 deletions packages/core-p2p/src/socket-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import SCWorker from "socketcluster/scworker";
import { SocketErrors } from "../enums";
import { requestSchemas } from "../schemas";
import { RateLimiter } from "./rate-limiter";
import { validateTransactionLight } from "./utils/validate";

const MINUTE_IN_MILLISECONDS = 1000 * 60;
const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60;

const ajv = new Ajv();
const ajv = new Ajv({ extendRefs: true });

export class Worker extends SCWorker {
private config: Record<string, any>;
private handlers: string[] = [];
private ipLastError: Record<string, number> = {};
private rateLimiter: RateLimiter;

Expand All @@ -23,15 +25,40 @@ export class Worker extends SCWorker {
// purge ipLastError every hour to free up memory
setInterval(() => (this.ipLastError = {}), HOUR_IN_MILLISECONDS);

await this.loadHandlers();

// @ts-ignore
this.scServer.wsServer.on("connection", (ws, req) => this.handlePayload(ws, req));
this.scServer.wsServer.on("connection", (ws, req) => {
const clients = [...Object.values(this.scServer.clients), ...Object.values(this.scServer.pendingClients)];
const existingSockets = clients.filter(
client =>
client.remoteAddress === req.socket.remoteAddress && client.remotePort !== req.socket.remotePort,
);
if (existingSockets.length > 0) {
for (const socket of existingSockets) {
socket.terminate();
}
this.setErrorForIpAndTerminate(ws, req);
return;
}
this.handlePayload(ws, req);
});
this.scServer.on("connection", socket => this.handleConnection(socket));
this.scServer.addMiddleware(this.scServer.MIDDLEWARE_HANDSHAKE_WS, (req, next) =>
this.handleHandshake(req, next),
);
this.scServer.addMiddleware(this.scServer.MIDDLEWARE_EMIT, (req, next) => this.handleEmit(req, next));
}

private async loadHandlers(): Promise<void> {
const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers");
for (const [version, handlers] of Object.entries(data)) {
for (const handler of Object.values(handlers)) {
this.handlers.push(`p2p.${version}.${handler}`);
}
}
}

private async loadConfiguration(): Promise<void> {
const { data } = await this.sendToMasterAsync("p2p.utils.getConfig");

Expand Down Expand Up @@ -71,65 +98,143 @@ export class Worker extends SCWorker {
}

private handlePayload(ws, req) {
ws.removeAllListeners("ping");
ws.removeAllListeners("pong");
ws.prependListener("ping", () => {
this.setErrorForIpAndTerminate(ws, req);
});
ws.prependListener("pong", () => {
this.setErrorForIpAndTerminate(ws, req);
});

ws.prependListener("error", error => {
if (error instanceof RangeError) {
this.setErrorForIpAndTerminate(ws, req);
}
});

const messageListeners = ws.listeners("message");
ws.removeAllListeners("message");
ws.prependListener("message", message => {
if (ws._disconnected) {
this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(ws, req);
} else if (message === "#2") {
const timeNow: number = new Date().getTime() / 1000;
if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) {
this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(ws, req);
}
ws._lastPingTime = timeNow;
} else if (message.length < 10) {
// except for #2 message, we should have JSON with some required properties
// (see below) which implies that message length should be longer than 10 chars
this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(ws, req);
} else {
try {
const parsed = JSON.parse(message);
if (parsed.event === "#disconnect") {
ws._disconnected = true;
}
if (
} else if (parsed.event === "#handshake") {
if (ws._handshake) {
return this.setErrorForIpAndTerminate(ws, req);
}
ws._handshake = true;
} else if (
typeof parsed.event !== "string" ||
typeof parsed.data !== "object" ||
this.hasAdditionalProperties(parsed) ||
(typeof parsed.cid !== "number" &&
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined"))
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) ||
!this.handlers.includes(parsed.event)
) {
this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(ws, req);
}
} catch (error) {
this.setErrorForIpAndTerminate(ws, req);
return this.setErrorForIpAndTerminate(ws, req);
}
}

for (const listener of messageListeners) {
listener(message);
}
});
}

private hasAdditionalProperties(object): boolean {
if (Object.keys(object).filter(key => key !== "event" && key !== "data" && key !== "cid").length) {
return true;
}
const event = object.event.split(".");
if (object.event !== "#handshake" && object.event !== "#disconnect") {
if (event.length !== 3) {
return true;
}
if (Object.keys(object.data).filter(key => key !== "data" && key !== "headers").length) {
return true;
}
}
if (object.data.data) {
// @ts-ignore
const [_, version, handler] = event;
const schema = requestSchemas[version][handler];
try {
if (object.event === "p2p.peer.postTransactions") {
if (
typeof object.data.data === "object" &&
object.data.data.transactions &&
Array.isArray(object.data.data.transactions) &&
object.data.data.transactions.length <= this.config.maxTransactionsPerRequest
) {
for (const transaction of object.data.data.transactions) {
if (!validateTransactionLight(transaction)) {
return true;
}
}
} else {
return true;
}
} else if (schema && !ajv.validate(schema, object.data.data)) {
return true;
}
} catch {
//
}
}
if (object.data.headers) {
if (
Object.keys(object.data.headers).filter(
key => key !== "version" && key !== "port" && key !== "height" && key !== "Content-Type",
).length
) {
return true;
}
if (
(object.data.headers.version && typeof object.data.headers.version !== "string") ||
(object.data.headers.port && typeof object.data.headers.port !== "number") ||
(object.data.headers["Content-Type"] && typeof object.data.headers["Content-Type"] !== "string") ||
(object.data.headers.height && typeof object.data.headers.height !== "number")
) {
// this prevents the nesting of other objects inside these properties
return true;
}
}
return false;
}

private setErrorForIpAndTerminate(ws, req): void {
this.ipLastError[req.socket.remoteAddress] = Date.now();
ws.terminate();
}

private async handleConnection(socket): Promise<void> {
const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers");

for (const [version, handlers] of Object.entries(data)) {
for (const handler of Object.values(handlers)) {
// @ts-ignore
socket.on(`p2p.${version}.${handler}`, async (data, res) => {
try {
return res(undefined, await this.sendToMasterAsync(`p2p.${version}.${handler}`, data));
} catch (e) {
return res(e);
}
});
}
for (const handler of this.handlers) {
// @ts-ignore
socket.on(handler, async (data, res) => {
try {
return res(undefined, await this.sendToMasterAsync(handler, data));
} catch (e) {
return res(e);
}
});
}
}

Expand Down