From 45454a06d0708195b21a06f76b73f2456f38f895 Mon Sep 17 00:00:00 2001 From: noymaxx Date: Mon, 23 Mar 2026 23:31:23 +0100 Subject: [PATCH] cache and fallbacks in rpc --- backend/src/config/chains.ts | 21 +++++- backend/src/index.ts | 2 + backend/src/middleware/errorHandler.ts | 38 ++++++++++- backend/src/middleware/serialize-by-user.ts | 66 +++++++++++++++++++ backend/src/providers/chain.provider.ts | 48 +++++++++++--- backend/src/shared/errorCodes.ts | 3 +- .../src/shared/services/aerodrome.service.ts | 42 +++++++++++- 7 files changed, 205 insertions(+), 15 deletions(-) create mode 100644 backend/src/middleware/serialize-by-user.ts diff --git a/backend/src/config/chains.ts b/backend/src/config/chains.ts index 0cf3def..fd14f3f 100644 --- a/backend/src/config/chains.ts +++ b/backend/src/config/chains.ts @@ -2,6 +2,7 @@ export interface ChainConfig { chainId: number; name: string; rpcUrl: string; + rpcUrls: string[]; nativeCurrency: { name: string; symbol: string; decimals: number }; blockExplorer: string; contracts: { @@ -11,12 +12,26 @@ export interface ChainConfig { }; } +/** + * Parse comma-separated RPC URLs from env var, falling back to single URL. + * Example: BASE_RPC_URLS="https://mainnet.base.org,https://base.llamarpc.com" + */ +function parseRpcUrls(listEnv: string | undefined, singleEnv: string | undefined, defaultUrl: string): string[] { + if (listEnv) { + const urls = listEnv.split(",").map(u => u.trim()).filter(Boolean); + if (urls.length > 0) return urls; + } + return [singleEnv || defaultUrl]; +} + export function getChainConfig(chain: string): ChainConfig { if (chain === "base") { + const rpcUrls = parseRpcUrls(process.env.BASE_RPC_URLS, process.env.BASE_RPC_URL, "https://mainnet.base.org"); return { chainId: 8453, name: "Base", - rpcUrl: process.env.BASE_RPC_URL || "https://mainnet.base.org", + rpcUrl: rpcUrls[0], + rpcUrls, nativeCurrency: { name: "Ether", symbol: "ETH", decimals: 18 }, blockExplorer: "https://basescan.org", contracts: { @@ -28,10 +43,12 @@ export function getChainConfig(chain: string): ChainConfig { } if (chain === "avalanche") { + const rpcUrls = parseRpcUrls(process.env.AVAX_RPC_URLS, process.env.AVAX_RPC_URL, "https://api.avax.network/ext/bc/C/rpc"); return { chainId: 43114, name: "Avalanche C-Chain", - rpcUrl: process.env.AVAX_RPC_URL || "https://api.avax.network/ext/bc/C/rpc", + rpcUrl: rpcUrls[0], + rpcUrls, nativeCurrency: { name: "Avalanche", symbol: "AVAX", decimals: 18 }, blockExplorer: "https://snowtrace.io", contracts: { diff --git a/backend/src/index.ts b/backend/src/index.ts index 23d22c0..4b58ece 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -15,6 +15,7 @@ import { avaxLendingRoutes } from "./modules/avax-lending/routes/avax-le import { avaxLiquidStakingRoutes } from "./modules/avax-liquid-staking/routes/avax-liquid-staking.routes"; import { errorHandler } from "./middleware/errorHandler"; import { rateLimiter } from "./middleware/rateLimiter"; +import { serializeByUser } from "./middleware/serialize-by-user"; const app = express(); const PORT = process.env.PORT || 3010; @@ -42,6 +43,7 @@ app.use(cors({ })); app.use(express.json({ limit: "1mb" })); app.use(rateLimiter); +app.use(serializeByUser); // Swagger only in non-production environments if (process.env.NODE_ENV !== "production") { diff --git a/backend/src/middleware/errorHandler.ts b/backend/src/middleware/errorHandler.ts index 3d13eb4..30d898b 100644 --- a/backend/src/middleware/errorHandler.ts +++ b/backend/src/middleware/errorHandler.ts @@ -1,12 +1,48 @@ import { Request, Response, NextFunction, RequestHandler } from "express"; import { AppError } from "../shared/errorCodes"; +const RPC_ERROR_PATTERNS = [ + /timeout/i, + /ECONNREFUSED/i, + /ENOTFOUND/i, + /missing response/i, + /could not detect network/i, + /bad response/i, + /server error/i, + /rate.?limit/i, + /too many requests/i, + /circuit breaker/i, + /NETWORK_ERROR/i, + /SERVER_ERROR/i, + /TIMEOUT/i, +]; + +function isRpcError(err: Error): boolean { + const msg = err.message || ""; + return RPC_ERROR_PATTERNS.some((pattern) => pattern.test(msg)); +} + export function errorHandler(err: Error, _req: Request, res: Response, _next: NextFunction) { if (err instanceof AppError) { - return res.status(err.status).json({ + const headers: Record = {}; + if (err.status === 503) headers["Retry-After"] = "5"; + return res.status(err.status).set(headers).json({ error: { code: err.code, message: err.details ?? err.message, + ...(err.status === 503 ? { retryAfter: 5 } : {}), + }, + }); + } + + // Detect RPC/provider failures and return 503 instead of 500 + if (isRpcError(err)) { + console.error("[RPC_UNAVAILABLE]", err.message); + return res.status(503).set({ "Retry-After": "5" }).json({ + error: { + code: "RPC_UNAVAILABLE", + message: "Blockchain node temporarily unavailable. Please retry.", + retryAfter: 5, }, }); } diff --git a/backend/src/middleware/serialize-by-user.ts b/backend/src/middleware/serialize-by-user.ts new file mode 100644 index 0000000..723ac2f --- /dev/null +++ b/backend/src/middleware/serialize-by-user.ts @@ -0,0 +1,66 @@ +import { Request, Response, NextFunction } from "express"; + +const QUEUE_TIMEOUT_MS = 30_000; +const MAX_QUEUE_SIZE = 10; + +/** + * Per-user request serialization middleware. + * + * Ensures that concurrent requests from the same wallet address are processed + * sequentially, preventing RPC call bursts from a single user. + * + * User is identified by: req.verifiedAddress (set by auth middleware), + * req.body.userAddress, req.query.userAddress, or req.params.userAddress. + */ +const locks = new Map>(); + +function getUserKey(req: Request): string | null { + const addr = + (req as any).verifiedAddress || + req.body?.userAddress || + (req.query?.userAddress as string) || + req.params?.userAddress; + + return addr ? String(addr).toLowerCase() : null; +} + +export function serializeByUser(req: Request, res: Response, next: NextFunction): void { + const userKey = getUserKey(req); + + // No user identified — pass through (health checks, public routes) + if (!userKey) { + next(); + return; + } + + const prev = locks.get(userKey) ?? Promise.resolve(); + + // Create a deferred so we can control when this request's slot is released + let releaseLock: () => void; + const currentLock = new Promise((resolve) => { + releaseLock = resolve; + }); + + locks.set(userKey, currentLock); + + // Timeout to prevent deadlocks from stuck requests + const timeout = setTimeout(() => { + releaseLock!(); + }, QUEUE_TIMEOUT_MS); + + // Release the lock when the response finishes (or closes prematurely) + const release = () => { + clearTimeout(timeout); + releaseLock!(); + // Clean up if this is the last pending request for the user + if (locks.get(userKey) === currentLock) { + locks.delete(userKey); + } + }; + + res.once("finish", release); + res.once("close", release); + + // Wait for previous request from same user to complete + prev.then(() => next()); +} diff --git a/backend/src/providers/chain.provider.ts b/backend/src/providers/chain.provider.ts index 8eeba94..9d7a8e0 100644 --- a/backend/src/providers/chain.provider.ts +++ b/backend/src/providers/chain.provider.ts @@ -1,17 +1,47 @@ import { ethers } from "ethers"; import { getChainConfig } from "../config/chains"; -const baseNetwork = ethers.Network.from(8453); -const providers: Record = {}; +const networks: Record = { + base: ethers.Network.from(8453), + avalanche: ethers.Network.from(43114), +}; -export function getProvider(chain: string): ethers.JsonRpcProvider { +const providers: Record = {}; + +/** + * Creates a provider with automatic fallback across multiple RPC endpoints. + * If only one RPC URL is configured, returns a simple JsonRpcProvider. + * If multiple are configured, returns a FallbackProvider that tries them in priority order. + */ +function createProvider(chain: string): ethers.JsonRpcProvider | ethers.FallbackProvider { + const config = getChainConfig(chain); + const network = networks[chain]; + + if (config.rpcUrls.length === 1) { + const opts = network ? { staticNetwork: network } : {}; + return new ethers.JsonRpcProvider(config.rpcUrls[0], network, opts); + } + + // Multiple RPCs: create FallbackProvider with priority ordering + const rpcProviders = config.rpcUrls.map((url, index) => { + const opts = network ? { staticNetwork: network } : {}; + const provider = new ethers.JsonRpcProvider(url, network, opts); + return { + provider, + priority: index + 1, // lower = preferred (first URL is primary) + stallTimeout: 2000, // wait 2s before trying next provider + weight: 1, + }; + }); + + console.log(`[ChainProvider] ${chain}: ${config.rpcUrls.length} RPC endpoints configured (fallback enabled)`); + + return new ethers.FallbackProvider(rpcProviders, network); +} + +export function getProvider(chain: string): ethers.JsonRpcProvider | ethers.FallbackProvider { if (!providers[chain]) { - const config = getChainConfig(chain); - if (chain === "base") { - providers[chain] = new ethers.JsonRpcProvider(config.rpcUrl, baseNetwork, { staticNetwork: baseNetwork }); - } else { - providers[chain] = new ethers.JsonRpcProvider(config.rpcUrl); - } + providers[chain] = createProvider(chain); } return providers[chain]; } diff --git a/backend/src/shared/errorCodes.ts b/backend/src/shared/errorCodes.ts index 67148e8..a2bfe59 100644 --- a/backend/src/shared/errorCodes.ts +++ b/backend/src/shared/errorCodes.ts @@ -25,7 +25,8 @@ export const ErrorCodes = { // Server errors (500) INTERNAL_ERROR: { code: "INTERNAL_ERROR", status: 500, message: "Internal server error" }, - RPC_ERROR: { code: "RPC_ERROR", status: 502, message: "Blockchain RPC call failed" }, + RPC_ERROR: { code: "RPC_ERROR", status: 503, message: "Blockchain RPC call failed" }, + RPC_UNAVAILABLE: { code: "RPC_UNAVAILABLE", status: 503, message: "Blockchain node temporarily unavailable. Please retry." }, PROVIDER_ERROR: { code: "PROVIDER_ERROR", status: 502, message: "External provider error" }, } as const; diff --git a/backend/src/shared/services/aerodrome.service.ts b/backend/src/shared/services/aerodrome.service.ts index fc6f4b1..7d6542d 100644 --- a/backend/src/shared/services/aerodrome.service.ts +++ b/backend/src/shared/services/aerodrome.service.ts @@ -24,6 +24,32 @@ interface Route { const BALANCE_CACHE_TTL_MS = 90_000; const walletBalanceCache = new Map(); +// Cache for non-critical data (pool info, gauge mappings) — reduces RPC calls +const POOL_INFO_CACHE_TTL_MS = 60_000; // 60s — reserves change slowly +const GAUGE_CACHE_TTL_MS = 300_000; // 5min — gauge address is effectively static +const poolInfoCache = new Map(); +const gaugeCache = new Map(); + +function getCached(cache: Map, key: string): T | null { + const entry = cache.get(key); + if (!entry || Date.now() >= entry.expiresAt) { + cache.delete(key); + return null; + } + return entry.value; +} + +function setCache(cache: Map, key: string, value: T, ttl: number): void { + cache.set(key, { value, expiresAt: Date.now() + ttl }); + // Prune if cache grows too large + if (cache.size > 500) { + const now = Date.now(); + for (const [k, v] of cache) { + if (now >= v.expiresAt) cache.delete(k); + } + } +} + function resolveTokenAddress(address: string): string { return address === ETH_ADDRESS ? WETH : address; } @@ -142,6 +168,10 @@ export class AerodromeService { reserve0: string; reserve1: string; }> { + const cacheKey = poolAddress.toLowerCase(); + const cached = getCached(poolInfoCache, cacheKey); + if (cached) return cached; + const pool = getContract(poolAddress, POOL_ABI, CHAIN); const [token0, token1, stable, reserves] = await Promise.all([ pool.token0() as Promise, @@ -155,16 +185,24 @@ export class AerodromeService { t0.symbol() as Promise, t1.symbol() as Promise, ]); - return { + const result = { address: poolAddress, token0, token1, token0Symbol, token1Symbol, stable, reserve0: reserves[0].toString(), reserve1: reserves[1].toString(), }; + setCache(poolInfoCache, cacheKey, result, POOL_INFO_CACHE_TTL_MS); + return result; } async getGaugeForPool(poolAddress: string): Promise { + const cacheKey = poolAddress.toLowerCase(); + const cached = getCached(gaugeCache, cacheKey); + if (cached) return cached; + const config = getProtocolConfig("aerodrome"); const voter = getContract(config.contracts.voter, VOTER_ABI, CHAIN); - return voter.gauges(poolAddress); + const gaugeAddress: string = await voter.gauges(poolAddress); + setCache(gaugeCache, cacheKey, gaugeAddress, GAUGE_CACHE_TTL_MS); + return gaugeAddress; } async getStakedBalance(gaugeAddress: string, adapterAddress: string): Promise {