Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

Commit

Permalink
chore: apply rate limit to ws
Browse files Browse the repository at this point in the history
  • Loading branch information
RetricSu committed Jul 22, 2022
1 parent 1bb4148 commit fd21eb4
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 10 deletions.
42 changes: 42 additions & 0 deletions packages/api-server/src/rate-limit.ts
Expand Up @@ -2,10 +2,23 @@ import { AccessGuard } from "./cache/guard";
import { LIMIT_EXCEEDED } from "./methods/error-code";
import { Request, Response, NextFunction } from "express";
import { logger } from "./base/logger";
import { JSONRPCError } from "jayson";

export const accessGuard = new AccessGuard();
accessGuard.connect();

export async function wsApplyRateLimitByIp(req: Request, method: string) {
const ip = getIp(req);
const methods = Object.keys(accessGuard.rpcMethods);
if (methods.includes(method) && ip != null) {
const res = await _rateLimit(method, ip);
if (res != null) {
return res.error;
}
}
return undefined;
}

export async function applyRateLimitByIp(
req: Request,
res: Response,
Expand Down Expand Up @@ -88,6 +101,35 @@ export async function rateLimit(
return isBan;
}

export async function _rateLimit(
rpcMethod: string,
reqId: string
): Promise<{ error: JSONRPCError; remainSecs: number } | undefined> {
const isExist = await accessGuard.isExist(rpcMethod, reqId);
if (!isExist) {
await accessGuard.add(rpcMethod, reqId);
}

const isOverRate = await accessGuard.isOverRate(rpcMethod, reqId);
if (isOverRate) {
const remainSecs = await accessGuard.getKeyTTL(rpcMethod, reqId);

const message = `Too Many Requests, IP: ${reqId}, please wait ${remainSecs}s and retry. RPC method: ${rpcMethod}.`;
const error: JSONRPCError = {
code: LIMIT_EXCEEDED,
message: message,
};

logger.debug(
`Rate Limit Exceed, ip: ${reqId}, method: ${rpcMethod}, ttl: ${remainSecs}s`
);
return { error, remainSecs };
} else {
await accessGuard.updateCount(rpcMethod, reqId);
}
return undefined;
}

export function hasMethod(body: any, name: string) {
if (Array.isArray(body)) {
return body.map((b) => b.method).includes(name);
Expand Down
38 changes: 28 additions & 10 deletions packages/api-server/src/ws/methods.ts
Expand Up @@ -13,6 +13,7 @@ import {
CACHE_EXPIRED_TIME_MILSECS,
TX_HASH_MAPPING_PREFIX_KEY,
} from "../cache/constant";
import { wsApplyRateLimitByIp } from "../rate-limit";

const query = new Query();
const cacheStore = new Store(
Expand All @@ -30,21 +31,32 @@ if (envConfig.newRelicLicenseKey) {
const blockEmitter = new BlockEmitter();
blockEmitter.startWorker();

export function wrapper(ws: any, _req: any) {
export function wrapper(ws: any, req: any) {
// this function gets called on each connection

wsrpc(ws);

for (const [key, value] of Object.entries(methods)) {
ws.on(key, function (...args: any[]) {
for (const [method, methodFunc] of Object.entries(methods)) {
ws.on(method, async function (...args: any[]) {
const execMethod = async () => {
const params = args.slice(0, args.length - 1);
const cb = args[args.length - 1];

// check rate limit
const err = await wsApplyRateLimitByIp(req, method);
if (err != null) {
return cb(err);
}

(methodFunc as any)(params, cb);
};

// add web transaction for websocket request
if (envConfig.newRelicLicenseKey) {
return newrelic.startWebTransaction(`/ws#${key}`, async () => {
return newrelic.startWebTransaction(`/ws#${method}`, async () => {
newrelic.getTransaction();
try {
const params = args.slice(0, args.length - 1);
const cb = args[args.length - 1];
(value as any)(params, cb);
execMethod();
} catch (error) {
throw error;
} finally {
Expand All @@ -53,9 +65,7 @@ export function wrapper(ws: any, _req: any) {
});
}

const params = args.slice(0, args.length - 1);
const cb = args[args.length - 1];
(value as any)(params, cb);
execMethod();
});
}

Expand Down Expand Up @@ -235,6 +245,14 @@ export function wrapper(ws: any, _req: any) {
};
const info = await Promise.all(
objs.map(async (obj) => {
// check rate limit
const err = await wsApplyRateLimitByIp(req, obj.method);
if (err != null) {
return {
err,
};
}

if (obj.method === "eth_subscribe") {
const r = ethSubscribe(obj.params, callback);
return r;
Expand Down

0 comments on commit fd21eb4

Please sign in to comment.