Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core-p2p): share rate limiter between workers #2912

Merged
merged 3 commits into from Sep 6, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

refactor: shared rate limiter

  • Loading branch information...
supaiku0 committed Sep 6, 2019
commit a9e53b555ab06a8a523cff4b09d1cb0fab71f8fe
@@ -7,6 +7,11 @@ export interface INetworkStatus {
blocksToRollback?: number;
}

export interface IRateLimitStatus {
blocked: boolean;
exceededLimitOnEndpoint: boolean;
}

export interface INetworkMonitor {
start(): Promise<void>;
updateNetworkStatus(initialRun?: boolean): Promise<void>;
@@ -22,6 +27,8 @@ export interface INetworkMonitor {
discoverPeers(initialRun?: boolean): Promise<boolean>;
getNetworkHeight(): number;
getNetworkState(): Promise<INetworkState>;
getRateLimitStatus(ip: string, endpoint?: string): Promise<IRateLimitStatus>;
isBlockedByRateLimit(ip: string): Promise<boolean>;
refreshPeersAfterFork(): Promise<void>;
checkNetworkHealth(): Promise<INetworkStatus>;
syncWithNetwork(fromBlockHeight: number, maxParallelDownloads?: number): Promise<Interfaces.IBlockData[]>;
@@ -14,7 +14,9 @@ import prettyMs from "pretty-ms";
import SocketCluster from "socketcluster";
import { IPeerData } from "./interfaces";
import { NetworkState } from "./network-state";
import { RateLimiter } from "./rate-limiter";
import { checkDNS, checkNTP } from "./utils";
import { buildRateLimiter } from "./utils/build-rate-limiter";

export class NetworkMonitor implements P2P.INetworkMonitor {
public server: SocketCluster;
@@ -29,6 +31,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
private readonly communicator: P2P.IPeerCommunicator;
private readonly processor: P2P.IPeerProcessor;
private readonly storage: P2P.IPeerStorage;
private readonly rateLimiter: RateLimiter;

public constructor({
communicator,
@@ -45,6 +48,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
this.communicator = communicator;
this.processor = processor;
this.storage = storage;
this.rateLimiter = buildRateLimiter(options);
}

public getServer(): SocketCluster {
@@ -209,6 +213,17 @@ export class NetworkMonitor implements P2P.INetworkMonitor {
return false;
}

public async getRateLimitStatus(ip: string, endpoint?: string): Promise<P2P.IRateLimitStatus> {
return {
blocked: await this.rateLimiter.isBlocked(ip),
exceededLimitOnEndpoint: await this.rateLimiter.hasExceededRateLimit(ip, endpoint),
};
}

public async isBlockedByRateLimit(ip: string): Promise<boolean> {
return this.rateLimiter.isBlocked(ip);
}

public isColdStart(): boolean {
return this.coldStart;
}
File renamed without changes.
@@ -60,6 +60,28 @@ export const getNetworkState = async ({ service }: { service: P2P.IPeerService }
return service.getMonitor().getNetworkState();
};

export const getRateLimitStatus = async ({
service,
req,
}: {
service: P2P.IPeerService;
req: { data: { ip: string; endpoint?: string } };
}): Promise<P2P.IRateLimitStatus> => {
return service.getMonitor().getRateLimitStatus(req.data.ip, req.data.endpoint);
};

export const isBlockedByRateLimit = async ({
service,
req,
}: {
service: P2P.IPeerService;
req: { data: { ip: string } };
}): Promise<{ blocked: boolean }> => {
return {
blocked: await service.getMonitor().isBlockedByRateLimit(req.data.ip),
};
};

export const syncBlockchain = (): void => {
app.resolvePlugin<Logger.ILogger>("logger").debug("Blockchain sync check WAKEUP requested by forger");

@@ -1,10 +1,9 @@
import { P2P } from "@arkecosystem/core-interfaces";
import SCWorker from "socketcluster/scworker";
import { SocketErrors } from "../enums";
import { RateLimiter } from "./rate-limiter";

export class Worker extends SCWorker {
private config: Record<string, any>;
private rateLimiter: RateLimiter;

public async run() {
this.log(`Socket worker started, PID: ${process.pid}`);
@@ -22,39 +21,7 @@ export class Worker extends SCWorker {

private async loadConfiguration(): Promise<void> {
const { data } = await this.sendToMasterAsync("p2p.utils.getConfig");

this.config = data;
this.rateLimiter = new RateLimiter({
whitelist: [...this.config.whitelist, ...this.config.remoteAccess],
configurations: {
global: {
rateLimit: this.config.rateLimit,
blockDuration: 60 * 1, // 1 minute ban for now
},
endpoints: [
{
rateLimit: 1,
endpoint: "p2p.peer.postBlock",
},
{
rateLimit: 1,
endpoint: "p2p.peer.getBlocks",
},
{
rateLimit: 1,
endpoint: "p2p.peer.getPeers",
},
{
rateLimit: 2,
endpoint: "p2p.peer.getStatus",
},
{
rateLimit: 5,
endpoint: "p2p.peer.getCommonBlocks",
},
],
},
});
}

private handlePayload(ws, req) {
@@ -105,9 +72,12 @@ export class Worker extends SCWorker {
}

private async handleHandshake(req, next): Promise<void> {
const isBlocked = await this.rateLimiter.isBlocked(req.socket.remoteAddress);
const isBlacklisted = (this.config.blacklist || []).includes(req.socket.remoteAddress);
if (isBlocked || isBlacklisted) {
const { data } = await this.sendToMasterAsync("p2p.internal.isBlockedByRateLimit", {
data: { ip: req.socket.remoteAddress },
});

const isBlacklisted: boolean = (this.config.blacklist || []).includes(req.socket.remoteAddress);
if (data.isBlocked || isBlacklisted) {
next(this.createError(SocketErrors.Forbidden, "Blocked due to rate limit or blacklisted."));
return;
}
@@ -116,8 +86,24 @@ export class Worker extends SCWorker {
}

private async handleEmit(req, next): Promise<void> {
if (await this.rateLimiter.hasExceededRateLimit(req.socket.remoteAddress, req.event)) {
if (await this.rateLimiter.isBlocked(req.socket.remoteAddress)) {
if (req.event.length > 128) {
req.socket.terminate();
return;
}

const { data }: { data: P2P.IRateLimitStatus } = await this.sendToMasterAsync(
"p2p.internal.getRateLimitStatus",
{
data: {
ip: req.socket.remoteAddress,
endpoint: req.event,
},
},
);

if (data.exceededLimitOnEndpoint) {
if (data.blocked) {
// Global ban
req.socket.terminate();
return;
}
@@ -131,11 +117,6 @@ export class Worker extends SCWorker {
}

try {
if (req.event.length > 128) {
req.socket.disconnect(4413, "Payload Too Large");
return;
}

const [prefix, version] = req.event.split(".");

if (prefix !== "p2p") {
@@ -0,0 +1,40 @@
import { app } from "@arkecosystem/core-container";
import { RateLimiter } from "../rate-limiter";

export const buildRateLimiter = options => {
if (!options || Object.keys(options).length === 0) {
options = app.resolveOptions("p2p");
}

return new RateLimiter({
whitelist: [...options.whitelist, ...options.remoteAccess],
configurations: {
global: {
rateLimit: options.rateLimit,
blockDuration: 60 * 1, // 1 minute ban for now
},
endpoints: [
{
rateLimit: 1,
endpoint: "p2p.peer.postBlock",
},
{
rateLimit: 1,
endpoint: "p2p.peer.getBlocks",
},
{
rateLimit: 1,
endpoint: "p2p.peer.getPeers",
},
{
rateLimit: 2,
endpoint: "p2p.peer.getStatus",
},
{
rateLimit: 5,
endpoint: "p2p.peer.getCommonBlocks",
},
],
},
});
};
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.