diff --git a/.changeset/forty-cars-clap.md b/.changeset/forty-cars-clap.md new file mode 100644 index 00000000..e35d9cf1 --- /dev/null +++ b/.changeset/forty-cars-clap.md @@ -0,0 +1,5 @@ +--- +'@3loop/transaction-decoder': minor +--- + +Implement circuit breaker and request pool for abi and meta strategies diff --git a/apps/web/src/lib/decode.ts b/apps/web/src/lib/decode.ts index 2ae7ea14..d62be905 100644 --- a/apps/web/src/lib/decode.ts +++ b/apps/web/src/lib/decode.ts @@ -1,5 +1,5 @@ import { getProvider, RPCProviderLive } from './rpc-provider' -import { Config, Effect, Layer, ManagedRuntime, Request } from 'effect' +import { Config, Effect, Layer, Logger, LogLevel, ManagedRuntime, Request } from 'effect' import { DecodedTransaction, DecodeResult, @@ -19,6 +19,13 @@ import { SqlAbiStore, SqlContractMetaStore } from '@3loop/transaction-decoder/sq import { Hex } from 'viem' import { DatabaseLive } from './database' +const LogLevelLive = Layer.unwrapEffect( + Effect.gen(function* () { + const level = LogLevel.Warning + return Logger.minimumLogLevel(level) + }), +) + const AbiStoreLive = Layer.unwrapEffect( Effect.gen(function* () { const service = yield* PublicClient @@ -50,7 +57,7 @@ const CacheLayer = Layer.setRequestCache(Request.makeCache({ capacity: 100, time const DataLayer = Layer.mergeAll(RPCProviderLive, DatabaseLive) const LoadersLayer = Layer.mergeAll(AbiStoreLive, MetaStoreLive) -const MainLayer = Layer.provideMerge(LoadersLayer, DataLayer) +const MainLayer = Layer.provideMerge(LoadersLayer, DataLayer).pipe(Layer.provide(LogLevelLive)) const runtime = ManagedRuntime.make(Layer.provide(MainLayer, CacheLayer)) diff --git a/packages/eslint-config-custom/library.js b/packages/eslint-config-custom/library.js index 5995e993..11062bf3 100644 --- a/packages/eslint-config-custom/library.js +++ b/packages/eslint-config-custom/library.js @@ -24,6 +24,7 @@ module.exports = { rules: { "@typescript-eslint/no-empty-interface": "off", "@typescript-eslint/no-explicit-any": "off", + '@typescript-eslint/no-namespace': 'off', '@typescript-eslint/strict-boolean-expressions': [ 'error', { diff --git a/packages/transaction-decoder/src/abi-loader.ts b/packages/transaction-decoder/src/abi-loader.ts index 7be5892a..d30db654 100644 --- a/packages/transaction-decoder/src/abi-loader.ts +++ b/packages/transaction-decoder/src/abi-loader.ts @@ -2,8 +2,8 @@ import { Effect, Either, RequestResolver, Request, Array, pipe, Data, PrimaryKey import { ContractABI } from './abi-strategy/request-model.js' import { Abi } from 'viem' import * as AbiStore from './abi-store.js' -import { AA_ABIS, SAFE_MULTISEND_ABI } from './decoding/constants.js' -import { SAFE_MULTISEND_SIGNATURE } from './decoding/constants.js' +import * as StrategyExecutorModule from './abi-strategy/strategy-executor.js' +import { SAFE_MULTISEND_SIGNATURE, SAFE_MULTISEND_ABI, AA_ABIS } from './decoding/constants.js' interface LoadParameters { readonly chainID: number @@ -61,7 +61,7 @@ const getMany = (requests: Array) => return yield* Effect.all( requests.map(({ chainID, address, event, signature }) => get({ chainID, address, event, signature })), { - concurrency: 'inherit', + concurrency: 'unbounded', batching: 'inherit', }, ) @@ -126,16 +126,22 @@ const getBestMatch = (abi: ContractABI | null) => { * for each request and group them by that key. We then load the ABI for the unique * requests and resolve the pending requests in a group with the same result. * + * **Circuit Breaking and Resilience** + * + * The AbiLoader now includes circuit breaking and intelligent concurrency management: + * - Strategy-level circuit breakers prevent cascading failures + * - Adaptive concurrency based on success rates and chain health + * - Timeout protection for all external strategy calls + * - Progressive degradation when strategies become unhealthy + * - Request pooling with back-pressure handling */ -const AbiLoaderRequestResolver: Effect.Effect< - RequestResolver.RequestResolver, - never, - AbiStore.AbiStore -> = RequestResolver.makeBatched((requests: Array) => + +export const AbiLoaderRequestResolver = RequestResolver.makeBatched((requests: Array) => Effect.gen(function* () { if (requests.length === 0) return - const { strategies } = yield* AbiStore.AbiStore + const { strategies, circuitBreaker, requestPool } = yield* AbiStore.AbiStore + const strategyExecutor = StrategyExecutorModule.make(circuitBreaker, requestPool) const requestGroups = Array.groupBy(requests, makeRequestKey) const uniqueRequests = Object.values(requestGroups).map((group) => group[0]) @@ -164,9 +170,23 @@ const AbiLoaderRequestResolver: Effect.Effect< }, { discard: true, + concurrency: 'unbounded', }, ) + // Get optimal concurrency for each chain + const concurrencyMap = new Map() + for (const req of remaining) { + if (!concurrencyMap.has(req.chainID)) { + const optimalConcurrency = yield* requestPool.getOptimalConcurrency(req.chainID) + concurrencyMap.set(req.chainID, optimalConcurrency) + } + } + + const concurrency = Math.min(...[...concurrencyMap.values(), 50]) // Use minimum concurrency across all chains, capped at 25 + + yield* Effect.logDebug(`Executing ${remaining.length} remaining requests with concurrency ${concurrency}`) + // NOTE: Firstly we batch strategies by address because in a transaction most of events and traces are from the same abi const response = yield* Effect.forEach( remaining, @@ -174,24 +194,29 @@ const AbiLoaderRequestResolver: Effect.Effect< const allAvailableStrategies = Array.prependAll(strategies.default, strategies[req.chainID] ?? []).filter( (strategy) => strategy.type === 'address', ) - return Effect.validateFirst(allAvailableStrategies, (strategy) => { - return strategy.resolver({ + + return strategyExecutor + .executeStrategiesSequentially(allAvailableStrategies, { address: req.address, chainId: req.chainID, - strategyId: strategy.id, + strategyId: 'address-batch', }) - }).pipe( - Effect.map(Either.left), - Effect.orElseSucceed(() => Either.right(req)), - ) + .pipe( + Effect.tapError(Effect.logWarning), + Effect.orElseSucceed(() => null), + Effect.map((result) => (result ? Either.left(result) : Either.right(req))), + ) }, { - concurrency: 'unbounded', + concurrency, }, ) const [addressStrategyResults, notFound] = Array.partitionMap(response, (res) => res) + yield* Effect.logDebug( + `Address strategies resolved ${addressStrategyResults.length} ABIs, ${notFound.length} not found`, + ) // NOTE: Secondly we request strategies to fetch fragments const fragmentStrategyResults = yield* Effect.forEach( notFound, @@ -200,19 +225,21 @@ const AbiLoaderRequestResolver: Effect.Effect< (strategy) => strategy.type === 'fragment', ) - // TODO: Distinct the errors and missing data, so we can retry on errors - return Effect.validateFirst(allAvailableStrategies, (strategy) => - strategy.resolver({ + return strategyExecutor + .executeStrategiesSequentially(allAvailableStrategies, { address, chainId: chainID, event, signature, - strategyId: strategy.id, - }), - ).pipe(Effect.orElseSucceed(() => null)) + strategyId: 'fragment-batch', + }) + .pipe( + Effect.tapError(Effect.logWarning), + Effect.orElseSucceed(() => null), + ) // If no strategies found, return null }, { - concurrency: 'unbounded', + concurrency, batching: true, }, ) @@ -247,8 +274,9 @@ const AbiLoaderRequestResolver: Effect.Effect< // We can decode with Effect.validateFirst(abis, (abi) => decodeMethod(input as Hex, abi)) and to find the first ABIs // that decodes successfully. We might enforce a sorted array to prioritize the address match. We will have to think // how to handle the strategy resolver in this case. Currently, we stop at first successful strategy, which might result -// in a missing Fragment. We treat this issue as a minor one for now, as we epect it to occur rarely on contracts that +// in a missing Fragment. We treat this issue as a minor one for now, as we expect it to occur rarely on contracts that // are not verified and with a non standard events structure. + export const getAndCacheAbi = (params: AbiStore.AbiParams) => Effect.gen(function* () { if (params.event === '0x' || params.signature === '0x') { diff --git a/packages/transaction-decoder/src/abi-store.ts b/packages/transaction-decoder/src/abi-store.ts index e6f30d27..d3bb45d8 100644 --- a/packages/transaction-decoder/src/abi-store.ts +++ b/packages/transaction-decoder/src/abi-store.ts @@ -1,6 +1,7 @@ -import { Context, Effect, RateLimiter, Function, Layer } from 'effect' +import { Context, Effect, RateLimiter, Function, Layer, MetricLabel } from 'effect' import { ContractABI, ContractAbiResolverStrategy, GetContractABIStrategyParams } from './abi-strategy/request-model.js' - +import * as CircuitBreaker from './circuit-breaker/circuit-breaker.js' +import * as RequestPool from './circuit-breaker/request-pool.js' export interface AbiParams { chainID: number address: string @@ -32,11 +33,16 @@ export interface AbiStore { readonly set: (key: AbiParams, value: ContractAbiResult) => Effect.Effect readonly get: (arg: AbiParams) => Effect.Effect readonly getMany?: (arg: Array) => Effect.Effect, never> + readonly circuitBreaker: CircuitBreaker.CircuitBreaker + readonly requestPool: RequestPool.RequestPool } export const AbiStore = Context.GenericTag('@3loop-decoder/AbiStore') -export const make = ({ strategies: strategiesWithoutRateLimit, ...rest }: AbiStore) => +export const make = ({ + strategies: strategiesWithoutRateLimit, + ...rest +}: Omit) => Effect.gen(function* () { const strategies = yield* Effect.reduce( Object.entries(strategiesWithoutRateLimit), @@ -60,10 +66,19 @@ export const make = ({ strategies: strategiesWithoutRateLimit, ...rest }: AbiSto } }), ) + + const circuitBreaker = yield* CircuitBreaker.make({ + metricLabels: [MetricLabel.make('service', 'abi-loader')], + }) + + const requestPool = yield* RequestPool.make({ metricLabels: [MetricLabel.make('service', 'abi-loader')] }) + return { strategies, + circuitBreaker, + requestPool, ...rest, } }) -export const layer = (args: AbiStore) => Layer.scoped(AbiStore, make(args)) +export const layer = (args: Omit) => Layer.scoped(AbiStore, make(args)) diff --git a/packages/transaction-decoder/src/abi-strategy/blockscout-abi.ts b/packages/transaction-decoder/src/abi-strategy/blockscout-abi.ts index 2614ff04..7f300b54 100644 --- a/packages/transaction-decoder/src/abi-strategy/blockscout-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/blockscout-abi.ts @@ -1,39 +1,68 @@ import { Effect } from 'effect' import * as RequestModel from './request-model.js' +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } + async function fetchContractABI( { address, chainId }: RequestModel.GetContractABIStrategyParams, config: { apikey?: string; endpoint: string }, -): Promise { - const endpoint = config.endpoint +): Promise { + try { + const endpoint = config.endpoint - const params: Record = { - module: 'contract', - action: 'getabi', - address, - } + const params: Record = { + module: 'contract', + action: 'getabi', + address, + } - if (config?.apikey) { - params['apikey'] = config.apikey - } + if (config?.apikey) { + params['apikey'] = config.apikey + } - const searchParams = new URLSearchParams(params) + const searchParams = new URLSearchParams(params) - const response = await fetch(`${endpoint}?${searchParams.toString()}`) - const json = (await response.json()) as { status: string; result: string; message: string } + const response = await fetch(`${endpoint}?${searchParams.toString()}`) + const json = (await response.json()) as { status: string; result: string; message: string } - if (json.status === '1') { - return [ - { - chainID: chainId, - address, - abi: json.result, - type: 'address', - }, - ] - } + if (json.status === '1') { + return { + type: 'success', + data: [ + { + chainID: chainId, + address, + abi: json.result, + type: 'address', + }, + ], + } + } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`) + // If the API request was successful but no ABI was found + if ( + json.status === '0' && + (json.message?.includes('not verified') || json.result === 'Contract source code not verified') + ) { + return { + type: 'missing', + reason: `No verified ABI found: ${json.message || json.result}`, + } + } + + return { + type: 'error', + cause: json, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const BlockscoutStrategyResolver = (config: { @@ -45,9 +74,25 @@ export const BlockscoutStrategyResolver = (config: { type: 'address', resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchContractABI(req, config), - catch: () => new RequestModel.ResolveStrategyABIError('Blockscout', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchContractABI(req, config)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'blockscout-strategy', + undefined, + undefined, + result.reason, + ), + ) + } else { + return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('Blockscout', req.address, req.chainId)) + } }), 'AbiStrategy.BlockscoutStrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/etherscan-abi.ts b/packages/transaction-decoder/src/abi-strategy/etherscan-abi.ts index 3ef53d24..e1118975 100644 --- a/packages/transaction-decoder/src/abi-strategy/etherscan-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/etherscan-abi.ts @@ -1,6 +1,11 @@ import { Effect } from 'effect' import * as RequestModel from './request-model.js' +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } + const endpoints: { [k: number]: string } = { // all mainnet 1: 'https://api.etherscan.io/api', @@ -50,35 +55,59 @@ const endpoints: { [k: number]: string } = { async function fetchContractABI( { address, chainId }: RequestModel.GetContractABIStrategyParams, config?: { apikey?: string; endpoint?: string }, -): Promise { - const endpoint = config?.endpoint ?? endpoints[chainId] - const params: Record = { - module: 'contract', - action: 'getabi', - address, - } +): Promise { + try { + const endpoint = config?.endpoint ?? endpoints[chainId] + const params: Record = { + module: 'contract', + action: 'getabi', + address, + } - if (config?.apikey) { - params['apikey'] = config.apikey - } + if (config?.apikey) { + params['apikey'] = config.apikey + } - const searchParams = new URLSearchParams(params) + const searchParams = new URLSearchParams(params) - const response = await fetch(`${endpoint}?${searchParams.toString()}`) - const json = (await response.json()) as { status: string; result: string } + const response = await fetch(`${endpoint}?${searchParams.toString()}`) + const json = (await response.json()) as { status: string; result: string } - if (json.status === '1') { - return [ - { - type: 'address', - address, - chainID: chainId, - abi: json.result, - }, - ] - } + if (json.status === '1') { + return { + type: 'success', + data: [ + { + type: 'address', + address, + chainID: chainId, + abi: json.result, + }, + ], + } + } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`) + // If the API request was successful but no ABI was found + if ( + json.status === '0' && + (json.result === 'Contract source code not verified' || json.result.includes('not verified')) + ) { + return { + type: 'missing', + reason: `No verified ABI found: ${json.result}`, + } + } + + return { + type: 'error', + cause: json, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const EtherscanStrategyResolver = (config?: { @@ -90,9 +119,25 @@ export const EtherscanStrategyResolver = (config?: { type: 'address', resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchContractABI(req, config), - catch: () => new RequestModel.ResolveStrategyABIError('etherscan', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchContractABI(req, config)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'etherscan-strategy', + undefined, + undefined, + result.reason, + ), + ) + } else { + return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('etherscan', req.address, req.chainId)) + } }), 'AbiStrategy.EtherscanStrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/etherscanv2-abi.ts b/packages/transaction-decoder/src/abi-strategy/etherscanv2-abi.ts index 6fcf5631..335c4df7 100644 --- a/packages/transaction-decoder/src/abi-strategy/etherscanv2-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/etherscanv2-abi.ts @@ -3,40 +3,67 @@ import * as RequestModel from './request-model.js' const endpoint = 'https://api.etherscan.io/v2/api' +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } + async function fetchContractABI( { address, chainId }: RequestModel.GetContractABIStrategyParams, config?: { apikey?: string }, -): Promise { - const params: Record = { - module: 'contract', - action: 'getabi', - chainId: chainId.toString(), - address, - } +): Promise { + try { + const params: Record = { + module: 'contract', + action: 'getabi', + chainId: chainId.toString(), + address, + } - if (config?.apikey) { - params['apikey'] = config.apikey - } + if (config?.apikey) { + params['apikey'] = config.apikey + } - const searchParams = new URLSearchParams(params) + const searchParams = new URLSearchParams(params) - const response = await fetch(`${endpoint}?${searchParams.toString()}`) - const json = (await response.json()) as { status: string; result: string } + const response = await fetch(`${endpoint}?${searchParams.toString()}`) + const json = (await response.json()) as { status: string; result: string } - if (json.status === '1') { - return [ - { - type: 'address', - address, - chainID: chainId, - abi: json.result, - }, - ] - } + if (json.status === '1') { + return { + type: 'success', + data: [ + { + type: 'address', + address, + chainID: chainId, + abi: json.result, + }, + ], + } + } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`, { - cause: json, - }) + // If the API request was successful but no ABI was found + if ( + json.status === '0' && + (json.result === 'Contract source code not verified' || json.result.includes('not verified')) + ) { + return { + type: 'missing', + reason: `No verified ABI found: ${json.result}`, + } + } + + return { + type: 'error', + cause: json, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const EtherscanV2StrategyResolver = (config?: { @@ -55,9 +82,25 @@ export const EtherscanV2StrategyResolver = (config?: { }, resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchContractABI(req, config), - catch: () => new RequestModel.ResolveStrategyABIError('etherscanV2', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchContractABI(req, config)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'etherscanV2-strategy', + undefined, + undefined, + result.reason, + ), + ) + } else { + return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('etherscanV2', req.address, req.chainId)) + } }), 'AbiStrategy.EtherscanV2StrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/experimental-erc20.ts b/packages/transaction-decoder/src/abi-strategy/experimental-erc20.ts index 2a29f437..b8c94c83 100644 --- a/packages/transaction-decoder/src/abi-strategy/experimental-erc20.ts +++ b/packages/transaction-decoder/src/abi-strategy/experimental-erc20.ts @@ -44,7 +44,17 @@ const getLocalFragments = (service: PublicClient, { address, chainId }: RequestM ] as RequestModel.ContractABI[] } - return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('local-strategy', address, chainId)) + // Contract exists but is not an ERC20 token - this is a "no data found" case + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + address, + chainId, + 'experimental-erc20-strategy', + undefined, + undefined, + 'Contract is not an ERC20 token', + ), + ) }) export const ExperimentalErc20AbiStrategyResolver = ( diff --git a/packages/transaction-decoder/src/abi-strategy/fourbyte-abi.ts b/packages/transaction-decoder/src/abi-strategy/fourbyte-abi.ts index 3388e5a7..6039ed76 100644 --- a/packages/transaction-decoder/src/abi-strategy/fourbyte-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/fourbyte-abi.ts @@ -19,43 +19,80 @@ function parseEventSignature(signature: string): string { return JSON.stringify(parseAbiItem('event ' + signature)) } +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } + // TODO: instead of getting the first match, we should detect the best match async function fetchABI({ address, event, signature, chainId, -}: RequestModel.GetContractABIStrategyParams): Promise { - if (signature != null) { - const full_match = await fetch(`${endpoint}/signatures/?hex_signature=${signature}`) - if (full_match.status === 200) { - const json = (await full_match.json()) as FourBytesResponse +}: RequestModel.GetContractABIStrategyParams): Promise { + try { + if (signature != null) { + const full_match = await fetch(`${endpoint}/signatures/?hex_signature=${signature}`) + if (full_match.status === 200) { + const json = (await full_match.json()) as FourBytesResponse - return json.results.map((result) => ({ - type: 'func', - address, - chainID: chainId, - abi: parseFunctionSignature(result.text_signature), - signature, - })) + if (json.count > 0) { + return { + type: 'success', + data: json.results.map((result) => ({ + type: 'func', + address, + chainID: chainId, + abi: parseFunctionSignature(result.text_signature), + signature, + })), + } + } else { + // Successful request but no signatures found + return { + type: 'missing', + reason: `No function signature found for ${signature}`, + } + } + } } - } - if (event != null) { - const partial_match = await fetch(`${endpoint}/event-signatures/?hex_signature=${event}`) - if (partial_match.status === 200) { - const json = (await partial_match.json()) as FourBytesResponse - return json.results.map((result) => ({ - type: 'event', - address, - chainID: chainId, - abi: parseEventSignature(result.text_signature), - event, - })) + if (event != null) { + const partial_match = await fetch(`${endpoint}/event-signatures/?hex_signature=${event}`) + if (partial_match.status === 200) { + const json = (await partial_match.json()) as FourBytesResponse + if (json.count > 0) { + return { + type: 'success', + data: json.results.map((result) => ({ + type: 'event', + address, + chainID: chainId, + abi: parseEventSignature(result.text_signature), + event, + })), + } + } else { + // Successful request but no events found + return { + type: 'missing', + reason: `No event signature found for ${event}`, + } + } + } } - } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`) + return { + type: 'error', + cause: `Failed to fetch ABI for ${address} on chain ${chainId}`, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const FourByteStrategyResolver = (): RequestModel.ContractAbiResolverStrategy => { @@ -64,9 +101,27 @@ export const FourByteStrategyResolver = (): RequestModel.ContractAbiResolverStra type: 'fragment', resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchABI(req), - catch: () => new RequestModel.ResolveStrategyABIError('4byte.directory', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchABI(req)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'fourbyte-strategy', + req.event, + req.signature, + result.reason, + ), + ) + } else { + return yield* Effect.fail( + new RequestModel.ResolveStrategyABIError('4byte.directory', req.address, req.chainId), + ) + } }), 'AbiStrategy.FourByteStrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/openchain-abi.ts b/packages/transaction-decoder/src/abi-strategy/openchain-abi.ts index 04205268..950dcc7b 100644 --- a/packages/transaction-decoder/src/abi-strategy/openchain-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/openchain-abi.ts @@ -38,42 +38,79 @@ function parseEventSignature(signature: string): string { return JSON.stringify(parseAbiItem('event ' + signature)) } +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } + async function fetchABI({ address, chainId, signature, event, -}: RequestModel.GetContractABIStrategyParams): Promise { - if (signature != null) { - const response = await fetch(`${endpoint}?function=${signature}`, options) - if (response.status === 200) { - const json = (await response.json()) as OpenchainResponse +}: RequestModel.GetContractABIStrategyParams): Promise { + try { + if (signature != null) { + const response = await fetch(`${endpoint}?function=${signature}`, options) + if (response.status === 200) { + const json = (await response.json()) as OpenchainResponse - return json.result.function[signature].map((f) => ({ - type: 'func', - address, - chainID: chainId, - abi: parseFunctionSignature(f.name), - signature, - })) + if (json.ok && json.result.function[signature] && json.result.function[signature].length > 0) { + return { + type: 'success', + data: json.result.function[signature].map((f) => ({ + type: 'func', + address, + chainID: chainId, + abi: parseFunctionSignature(f.name), + signature, + })), + } + } else { + // Successful request but no function signatures found + return { + type: 'missing', + reason: `No function signature found for ${signature}`, + } + } + } } - } - if (event != null) { - const response = await fetch(`${endpoint}?event=${event}`, options) - if (response.status === 200) { - const json = (await response.json()) as OpenchainResponse + if (event != null) { + const response = await fetch(`${endpoint}?event=${event}`, options) + if (response.status === 200) { + const json = (await response.json()) as OpenchainResponse - return json.result.event[event].map((e) => ({ - type: 'event', - address, - chainID: chainId, - abi: parseEventSignature(e.name), - event, - })) + if (json.ok && json.result.event[event] && json.result.event[event].length > 0) { + return { + type: 'success', + data: json.result.event[event].map((e) => ({ + type: 'event', + address, + chainID: chainId, + abi: parseEventSignature(e.name), + event, + })), + } + } else { + // Successful request but no event signatures found + return { + type: 'missing', + reason: `No event signature found for ${event}`, + } + } + } } - } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`) + return { + type: 'error', + cause: `Failed to fetch ABI for ${address} on chain ${chainId}`, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const OpenchainStrategyResolver = (): RequestModel.ContractAbiResolverStrategy => { @@ -82,9 +119,25 @@ export const OpenchainStrategyResolver = (): RequestModel.ContractAbiResolverStr type: 'fragment', resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchABI(req), - catch: () => new RequestModel.ResolveStrategyABIError('openchain', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchABI(req)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'openchain-strategy', + req.event, + req.signature, + result.reason, + ), + ) + } else { + return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('openchain', req.address, req.chainId)) + } }), 'AbiStrategy.OpenchainStrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/request-model.ts b/packages/transaction-decoder/src/abi-strategy/request-model.ts index da1bdd50..1b602fb9 100644 --- a/packages/transaction-decoder/src/abi-strategy/request-model.ts +++ b/packages/transaction-decoder/src/abi-strategy/request-model.ts @@ -16,6 +16,18 @@ export class ResolveStrategyABIError { ) {} } +export class MissingABIStrategyError { + readonly _tag = 'MissingABIStrategyError' + constructor( + readonly address: string, + readonly chainId: number, + readonly strategyId: string, + readonly event?: string, + readonly signature?: string, + readonly message: string = 'Missing contract ABI', + ) {} +} + interface FunctionFragmentABI { type: 'func' abi: string @@ -47,7 +59,9 @@ export interface ContractAbiResolverStrategy { type: 'address' | 'fragment' id: string rateLimit?: RateLimiterOptions - resolver: (_: GetContractABIStrategyParams) => Effect.Effect + resolver: ( + _: GetContractABIStrategyParams, + ) => Effect.Effect } export interface GetContractABIStrategyParams { diff --git a/packages/transaction-decoder/src/abi-strategy/sourcify-abi.ts b/packages/transaction-decoder/src/abi-strategy/sourcify-abi.ts index f3842d24..799a44da 100644 --- a/packages/transaction-decoder/src/abi-strategy/sourcify-abi.ts +++ b/packages/transaction-decoder/src/abi-strategy/sourcify-abi.ts @@ -10,41 +10,67 @@ interface SourcifyResponse { const endpoint = 'https://repo.sourcify.dev/contracts/' -async function fetchContractABI({ - address, - chainId, -}: RequestModel.GetContractABIStrategyParams): Promise { - const normalisedAddress = getAddress(address) +type FetchResult = + | { type: 'success'; data: RequestModel.ContractABI[] } + | { type: 'missing'; reason: string } + | { type: 'error'; cause: unknown } - const full_match = await fetch(`${endpoint}/full_match/${chainId}/${normalisedAddress}/metadata.json`) +async function fetchContractABI({ address, chainId }: RequestModel.GetContractABIStrategyParams): Promise { + try { + const normalisedAddress = getAddress(address) - if (full_match.status === 200) { - const json = (await full_match.json()) as SourcifyResponse + const full_match = await fetch(`${endpoint}/full_match/${chainId}/${normalisedAddress}/metadata.json`) - return [ - { - type: 'address', - address, - chainID: chainId, - abi: JSON.stringify(json.output.abi), - }, - ] - } + if (full_match.status === 200) { + const json = (await full_match.json()) as SourcifyResponse - const partial_match = await fetch(`${endpoint}/partial_match/${chainId}/${normalisedAddress}/metadata.json`) - if (partial_match.status === 200) { - const json = (await partial_match.json()) as SourcifyResponse - return [ - { - type: 'address', - address, - chainID: chainId, - abi: JSON.stringify(json.output.abi), - }, - ] - } + return { + type: 'success', + data: [ + { + type: 'address', + address, + chainID: chainId, + abi: JSON.stringify(json.output.abi), + }, + ], + } + } + + const partial_match = await fetch(`${endpoint}/partial_match/${chainId}/${normalisedAddress}/metadata.json`) + if (partial_match.status === 200) { + const json = (await partial_match.json()) as SourcifyResponse + return { + type: 'success', + data: [ + { + type: 'address', + address, + chainID: chainId, + abi: JSON.stringify(json.output.abi), + }, + ], + } + } - throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`) + // Check if it's a 404 (not found) which means the contract is not verified on Sourcify + if (full_match.status === 404 && partial_match.status === 404) { + return { + type: 'missing', + reason: 'Contract not found on Sourcify', + } + } + + return { + type: 'error', + cause: `Failed to fetch ABI for ${address} on chain ${chainId}`, + } + } catch (error) { + return { + type: 'error', + cause: error, + } + } } export const SourcifyStrategyResolver = (): RequestModel.ContractAbiResolverStrategy => { @@ -53,9 +79,25 @@ export const SourcifyStrategyResolver = (): RequestModel.ContractAbiResolverStra type: 'address', resolver: (req: RequestModel.GetContractABIStrategyParams) => Effect.withSpan( - Effect.tryPromise({ - try: () => fetchContractABI(req), - catch: () => new RequestModel.ResolveStrategyABIError('sourcify', req.address, req.chainId), + Effect.gen(function* () { + const result = yield* Effect.promise(() => fetchContractABI(req)) + + if (result.type === 'success') { + return result.data + } else if (result.type === 'missing') { + return yield* Effect.fail( + new RequestModel.MissingABIStrategyError( + req.address, + req.chainId, + 'sourcify-strategy', + undefined, + undefined, + result.reason, + ), + ) + } else { + return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('sourcify', req.address, req.chainId)) + } }), 'AbiStrategy.SourcifyStrategyResolver', { attributes: { chainId: req.chainId, address: req.address } }, diff --git a/packages/transaction-decoder/src/abi-strategy/strategy-executor.ts b/packages/transaction-decoder/src/abi-strategy/strategy-executor.ts new file mode 100644 index 00000000..34b481aa --- /dev/null +++ b/packages/transaction-decoder/src/abi-strategy/strategy-executor.ts @@ -0,0 +1,72 @@ +import { Effect, Schedule, Duration, pipe, Data } from 'effect' +import { GetContractABIStrategyParams, ContractAbiResolverStrategy, MissingABIStrategyError } from './request-model.js' +import type { CircuitBreaker } from '../circuit-breaker/circuit-breaker.js' +import { RequestPool } from '../circuit-breaker/request-pool.js' +import * as Constants from '../circuit-breaker/constants.js' + +export class MissingHealthyStrategy extends Data.TaggedError('MissingHealthyStrategy')<{ + chainId: number + strategies: string[] +}> { + constructor(params: { chainId: number; strategies: string[] }) { + super(params) + } +} + +export const make = (circuitBreaker: CircuitBreaker, requestPool: RequestPool) => { + const executeStrategy = (strategy: ContractAbiResolverStrategy, params: GetContractABIStrategyParams) => { + return pipe( + strategy.resolver(params), + Effect.timeout(Duration.decode(Constants.STRATEGY_TIMEOUT)), + Effect.catchTag('MissingABIStrategyError', (error) => { + // Log error but don't fail the entire operation + return Effect.gen(function* () { + yield* Effect.logWarning(`Strategy ${strategy.id} found no ABI: ${error.message}`) + return yield* Effect.succeed(error) + }) + }), + Effect.retry( + Schedule.exponential(Duration.decode(Constants.INITIAL_RETRY_DELAY)).pipe( + Schedule.compose(Schedule.recurs(Constants.DEFAULT_RETRY_TIMES)), + ), + ), + (effect) => circuitBreaker.withCircuitBreaker(strategy.id, effect), + (effect) => requestPool.withPoolManagement(params.chainId, effect), + Effect.flatMap((data) => (data instanceof MissingABIStrategyError ? Effect.fail(data) : Effect.succeed(data))), + ) + } + + const executeStrategiesSequentially = ( + strategies: ContractAbiResolverStrategy[], + params: GetContractABIStrategyParams, + ) => + Effect.gen(function* () { + // Filter out unhealthy strategies first + const healthyStrategies: ContractAbiResolverStrategy[] = [] + + for (const strategy of strategies) { + const isHealthy = yield* circuitBreaker.isHealthy(strategy.id) + if (isHealthy) { + healthyStrategies.push(strategy) + } else { + yield* Effect.logDebug(`Skipping unhealthy strategy: ${strategy.id}`) + } + } + + if (healthyStrategies.length === 0) { + return yield* Effect.fail( + new MissingHealthyStrategy({ + chainId: params.chainId, + strategies: strategies.map((s) => s.id), + }), + ) + } + + // Try strategies one by one until one succeeds + return yield* Effect.validateFirst(healthyStrategies, (strategy) => executeStrategy(strategy, params)) + }) + + return { + executeStrategiesSequentially, + } +} diff --git a/packages/transaction-decoder/src/circuit-breaker/README.md b/packages/transaction-decoder/src/circuit-breaker/README.md new file mode 100644 index 00000000..e93d3483 --- /dev/null +++ b/packages/transaction-decoder/src/circuit-breaker/README.md @@ -0,0 +1,440 @@ +# Circuit Breaker Module + +A comprehensive circuit breaker implementation for Effect-based applications with advanced observability, metrics collection, and request pool management. + +## Overview + +The circuit breaker module provides fault tolerance and resilience patterns for handling failures in distributed systems. It includes: + +- **Circuit Breaker**: Prevents cascading failures by monitoring and controlling request flow +- **Request Pool**: Manages concurrent requests with adaptive concurrency control +- **Comprehensive Metrics**: Detailed observability and monitoring capabilities +- **Real-time Notifications**: State change events and streaming observability + +## Quick Start + +### Basic Circuit Breaker + +```typescript +import { Effect, Duration } from 'effect' +import * as CircuitBreaker from './circuit-breaker.js' + +const program = Effect.gen(function* () { + // Create a basic circuit breaker + const circuitBreaker = yield* CircuitBreaker.make({ + maxFailures: 5, + resetTimeout: Duration.seconds(60), + halfOpenMaxCalls: 3, + }) + + // Use the circuit breaker to protect an operation + const result = yield* circuitBreaker.withCircuitBreaker( + 'external-api', + Effect.tryPromise(() => fetch('https://api.example.com/data').then((r) => r.json())), + ) + + return result +}) +``` + +### With Metrics and Observability + +```typescript +import { Effect, Duration, MetricLabel, Console } from 'effect' +import * as CircuitBreaker from './circuit-breaker.js' + +const program = Effect.gen(function* () { + // Create enhanced circuit breaker with full observability + const circuitBreaker = yield* CircuitBreaker.make({ + maxFailures: 5, + resetTimeout: Duration.seconds(60), + + // Enable metrics collection + metricLabels: [ + MetricLabel.make('service', 'user-api'), + MetricLabel.make('environment', 'production'), + MetricLabel.make('version', '1.0'), + ], + + // Get notified of state changes + onStateChange: (change) => Console.log(`Circuit breaker state: ${change.from} → ${change.to}`), + }) + + // Monitor state changes in real-time + const monitoring = circuitBreaker.stateChanges.pipe( + Stream.tap((change) => Console.log(`Stream: ${change.from} → ${change.to}`)), + Stream.runDrain, + Effect.fork, + ) + + // Use the circuit breaker + const result = yield* circuitBreaker.withCircuitBreaker('external-api', someExternalApiCall) + + return result +}) +``` + +## Core Features + +### 1. Circuit Breaker States + +The circuit breaker operates in three states: + +- **Closed** (0): Normal operation, requests pass through +- **Open** (2): Circuit is open, requests are rejected immediately +- **Half-Open** (1): Testing state, limited requests allowed to check if service recovered + +### 2. Failure Strategies + +#### Failure Count Strategy (Default) + +Trips the circuit after a fixed number of consecutive failures: + +```typescript +const circuitBreaker = + yield * + CircuitBreaker.make({ + maxFailures: 5, // Trip after 5 failures + resetTimeout: Duration.seconds(60), + }) +``` + +#### Failure Rate Strategy + +Trips based on failure percentage over a sliding window: + +```typescript +const failureRateStrategy = + yield * + CircuitBreaker.failureRate({ + threshold: 0.5, // Trip at 50% failure rate + minCalls: 10, // Need at least 10 calls to evaluate + windowSize: 20, // Track last 20 calls + }) + +const circuitBreaker = + yield * + CircuitBreaker.make({ + strategy: failureRateStrategy, + resetTimeout: Duration.seconds(30), + }) +``` + +### 3. Error Classification + +Control which errors count as failures using custom predicates: + +```typescript +class NetworkError extends Error { + readonly _tag = 'NetworkError' +} + +const circuitBreaker = + yield * + CircuitBreaker.make({ + maxFailures: 3, + isFailure: (error) => error instanceof NetworkError, // Only network errors count + }) +``` + +### 4. State Inspection + +Check circuit breaker status without side effects: + +```typescript +const currentState = yield * circuitBreaker.currentState('my-service') +const isHealthy = yield * circuitBreaker.isHealthy('my-service') + +console.log(`State: ${currentState}, Healthy: ${isHealthy}`) +``` + +## Metrics and Observability + +### Collected Metrics + +When `metricLabels` is provided, the following metrics are automatically collected: + +| Metric | Type | Description | +| ----------------------------------------- | ------- | -------------------------------------------- | +| `effect_circuit_breaker_state` | Gauge | Current state (0=Closed, 1=HalfOpen, 2=Open) | +| `effect_circuit_breaker_state_changes` | Counter | Number of state transitions | +| `effect_circuit_breaker_successful_calls` | Counter | Successful call count | +| `effect_circuit_breaker_failed_calls` | Counter | Failed call count | +| `effect_circuit_breaker_rejected_calls` | Counter | Rejected call count | + +### State Change Events + +Subscribe to state changes for real-time monitoring: + +```typescript +// Immediate notifications via callback +const circuitBreaker = + yield * + CircuitBreaker.make({ + onStateChange: (change) => + Effect.gen(function* () { + yield* Console.log(`State changed: ${change.from} → ${change.to}`) + yield* alertingService.notify(`Circuit breaker state change at ${change.atNanos}`) + }), + }) + +// Continuous stream for dashboard updates +circuitBreaker.stateChanges.pipe( + Stream.map((change) => ({ + service: 'user-api', + timestamp: change.atNanos, + state: change.to, + transition: `${change.from}_to_${change.to}`, + })), + Stream.tap(dashboardApi.sendUpdate), + Stream.runDrain, + Effect.fork, +) +``` + +## Request Pool Management + +The module also includes a request pool for managing concurrent requests with adaptive concurrency: + +```typescript +import { make as makeRequestPool } from './request-pool.js' + +const requestPool = + yield * + makeRequestPool({ + maxConcurrentRequests: 50, + adaptiveConcurrency: true, + healthThreshold: 0.85, + concurrencyStep: 5, + metricLabels: [MetricLabel.make('service', 'rpc-pool'), MetricLabel.make('chain', 'ethereum')], + }) + +// Use the pool to manage requests +const result = + yield * + requestPool.withPoolManagement( + 1, // chainId + Effect.tryPromise(() => makeRpcCall()), + ) +``` + +### Request Pool Metrics + +| Metric | Type | Description | +| --------------------------------------------- | ------- | ------------------------------------------------ | +| `effect_request_pool_active_requests` | Gauge | Current active requests | +| `effect_request_pool_max_concurrency` | Gauge | Current max concurrency setting | +| `effect_request_pool_success_rate` | Gauge | Success rate (0.0 to 1.0) | +| `effect_request_pool_state` | Gauge | Pool state (0=healthy, 1=degraded, 2=overloaded) | +| `effect_request_pool_successful_requests` | Counter | Total successful requests | +| `effect_request_pool_failed_requests` | Counter | Total failed requests | +| `effect_request_pool_concurrency_adjustments` | Counter | Number of concurrency changes | + +## Configuration Options + +### Circuit Breaker Config + +```typescript +interface Config { + // Core settings + readonly maxFailures?: number // Default: 5 + readonly resetTimeout?: Duration.Duration // Default: 60 seconds + readonly halfOpenMaxCalls?: number // Default: 3 + + // Advanced strategies + readonly strategy?: Effect.Effect + readonly resetPolicy?: Schedule.Schedule + + // Error handling + readonly isFailure?: Predicate.Predicate + + // Observability + readonly metricLabels?: Iterable + readonly onStateChange?: (change: StateChange) => Effect.Effect +} +``` + +### Request Pool Config + +```typescript +interface RequestPoolConfig { + readonly maxConcurrentRequests: number // Default: 50 + readonly adaptiveConcurrency: boolean // Default: true + readonly healthThreshold: number // Default: 0.8 + readonly concurrencyStep: number // Default: 5 + readonly metricLabels?: Iterable +} +``` + +## Use Cases + +### 1. Microservices Protection + +```typescript +const serviceCircuitBreaker = + yield * + CircuitBreaker.make({ + maxFailures: 5, + resetTimeout: Duration.seconds(30), + metricLabels: [ + MetricLabel.make('service', 'user-service'), + MetricLabel.make('version', 'v2.1'), + MetricLabel.make('datacenter', 'us-east-1'), + ], + }) +``` + +### 2. Database Connection Protection + +```typescript +const dbStrategy = + yield * + CircuitBreaker.failureRate({ + threshold: 0.3, // Trip at 30% failure rate + minCalls: 5, + windowSize: 15, + }) + +const dbCircuitBreaker = + yield * + CircuitBreaker.make({ + strategy: dbStrategy, + isFailure: (error) => error instanceof DatabaseError, + onStateChange: (change) => alerting.notify(`DB Circuit Breaker: ${change.from} → ${change.to}`), + }) +``` + +### 3. External API Rate Limiting + +```typescript +const apiPool = + yield * + makeRequestPool({ + maxConcurrentRequests: 20, + adaptiveConcurrency: true, + healthThreshold: 0.9, + metricLabels: [MetricLabel.make('api', 'external-provider'), MetricLabel.make('tier', 'premium')], + }) + +const apiCircuitBreaker = + yield * + CircuitBreaker.make({ + maxFailures: 3, + resetTimeout: Duration.seconds(120), + isFailure: (error) => error.status >= 500, // Only server errors + }) + +// Combine both patterns +const protectedApiCall = (data: any) => + apiPool.withPoolManagement( + 1, + apiCircuitBreaker.withCircuitBreaker( + 'external-api', + Effect.tryPromise(() => externalApi.call(data)), + ), + ) +``` + +## Performance Considerations + +- **Zero Overhead**: Metrics only created when `metricLabels` is provided +- **Lazy Evaluation**: State streams only active when subscribed +- **Memory Efficient**: Failure rate strategy uses bounded sliding windows +- **Thread Safe**: All operations use Effect's concurrent primitives +- **Minimal Allocations**: Optimized data structures for high-throughput scenarios + +## Testing + +The module is designed to work seamlessly with Effect's testing utilities: + +```typescript +import { TestClock, TestMetrics } from 'effect/Test' + +const testProgram = Effect.gen(function* () { + const circuitBreaker = yield* CircuitBreaker.make({ + maxFailures: 2, + resetTimeout: Duration.seconds(10), + metricLabels: [MetricLabel.make('test', 'suite')], + }) + + // Simulate failures + yield* circuitBreaker.withCircuitBreaker('test', Effect.fail('error')) + yield* circuitBreaker.withCircuitBreaker('test', Effect.fail('error')) + + // Advance time + yield* TestClock.adjust(Duration.seconds(10)) + + // Check metrics + const metrics = yield* TestMetrics.get() + expect(metrics.counters['effect_circuit_breaker_failed_calls']).toBe(2) +}) +``` + +## Migration Guide + +The enhanced version is fully backward compatible: + +```typescript +// Old basic usage (still works) +const basic = + yield * + CircuitBreaker.make({ + maxFailures: 5, + resetTimeout: Duration.seconds(60), + }) + +// Enhanced usage (add features gradually) +const enhanced = + yield * + CircuitBreaker.make({ + maxFailures: 5, + resetTimeout: Duration.seconds(60), + metricLabels: [MetricLabel.make('service', 'api')], // Add metrics + onStateChange: logStateChange, // Add notifications + isFailure: isRetryableError, // Add error classification + }) +``` + +## API Reference + +### Circuit Breaker + +- `make(config?: Config): Effect>` - Create a new circuit breaker +- `withCircuitBreaker(strategyId: string, effect: Effect): Effect` - Execute effect with circuit breaker protection +- `currentState(strategyId: string): Effect` - Get current state +- `isHealthy(strategyId: string): Effect` - Check if circuit is healthy +- `stateChanges: Stream` - Stream of state change events + +### Strategies + +- `failureCount(maxFailures: number): Effect` - Fixed failure count strategy +- `failureRate(options): Effect` - Percentage-based failure strategy + +### Request Pool + +- `make(config?: RequestPoolConfig): Effect` - Create a new request pool +- `withPoolManagement(chainId: number, effect: Effect): Effect` - Execute with pool management +- `getOptimalConcurrency(chainId: number): Effect` - Get optimal concurrency for chain +- `updateMetrics(chainId: number, success: boolean): Effect` - Update pool metrics + +### Types + +- `State = 'Open' | 'Closed' | 'HalfOpen'` - Circuit breaker states +- `OpenError` - Error returned when circuit is open +- `StateChange` - State transition event with timestamp +- `TrippingStrategy` - Strategy for determining when to trip circuit + +## Constants + +Default configuration values are available in `constants.ts`: + +```typescript +export const STRATEGY_TIMEOUT = '30 seconds' +export const DEFAULT_RETRY_TIMES = 2 +export const INITIAL_RETRY_DELAY = '1 seconds' +export const MAX_CONCURRENT_REQUESTS = 50 +export const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5 +export const CIRCUIT_BREAKER_RESET_TIMEOUT = '60 seconds' +export const REQUEST_POOL_HEALTH_THRESHOLD = 0.8 +export const CONCURRENCY_ADJUSTMENT_STEP = 5 +``` diff --git a/packages/transaction-decoder/src/circuit-breaker/circuit-breaker.ts b/packages/transaction-decoder/src/circuit-breaker/circuit-breaker.ts new file mode 100644 index 00000000..efdddb35 --- /dev/null +++ b/packages/transaction-decoder/src/circuit-breaker/circuit-breaker.ts @@ -0,0 +1,510 @@ +import { + Effect, + Ref, + Duration, + Clock, + Predicate, + Schedule, + Metric, + MetricLabel, + Option, + PubSub, + Stream, + Either, +} from 'effect' + +/** + * @since 1.0.0 + * @category symbols + */ +export const CircuitBreakerTypeId: unique symbol = Symbol.for('CircuitBreaker') + +/** + * @since 1.0.0 + * @category symbols + */ +export type CircuitBreakerTypeId = typeof CircuitBreakerTypeId + +class CircuitBreakerMetrics { + constructor( + readonly state: Metric.Metric.Gauge, + readonly stateChanges: Metric.Metric.Counter, + readonly successfulCalls: Metric.Metric.Counter, + readonly failedCalls: Metric.Metric.Counter, + readonly rejectedCalls: Metric.Metric.Counter, + ) {} +} + +/** State change event with timestamp for observability */ +class StateChange { + constructor( + readonly from: CircuitBreaker.State, + readonly to: CircuitBreaker.State, + readonly atNanos: bigint, + ) {} +} + +/** + * @since 1.0.0 + */ +export declare namespace CircuitBreaker { + /** + * @since 1.0.0 + * @category models + */ + export interface OpenError { + readonly _tag: 'OpenError' + readonly strategyId: string + } + + /** + * @since 1.0.0 + * @category models + */ + export type State = 'Open' | 'Closed' | 'HalfOpen' + + /** + * @since 1.0.0 + * @category models + */ + export interface CircuitBreakerState { + readonly failures: number + readonly lastFailureTime: number + readonly state: State + readonly halfOpenCalls: number + } + + /** + * @since 1.0.0 + * @category models + */ + export interface TrippingStrategy { + readonly shouldTrip: (successful: boolean) => Effect.Effect + readonly onReset: Effect.Effect + } + + /** + * @since 1.0.0 + * @category models + */ + export interface FailureRateOptions { + /** + * Minimum number of calls before failure rate calculation kicks in. + */ + readonly minThroughput?: number + /** + * Number of buckets to track for failure rate calculation. + */ + readonly sampleBuckets?: number + /** + * Duration of each sample bucket. + */ + readonly sampleDuration?: Duration.DurationInput + /** + * Failure rate threshold (0.0 to 1.0). + */ + readonly threshold?: number + } + + /** + * @since 1.0.0 + * @category models + */ + export interface Config { + /** + * The maximum number of failures that can occur before the CircuitBreaker trips. + */ + readonly maxFailures?: number + /** + * The reset timeout duration. + */ + readonly resetTimeout?: Duration.Duration + /** + * Maximum calls allowed in half-open state. + */ + readonly halfOpenMaxCalls?: number + /** + * The tripping strategy to use. + */ + readonly strategy?: Effect.Effect + /** + * The reset policy schedule. + */ + readonly resetPolicy?: Schedule.Schedule + /** + * Only failures that match according to isFailure are treated as failures. + */ + readonly isFailure?: Predicate.Predicate | undefined + /** + * Labels for metrics collection. + */ + readonly metricLabels?: Iterable + /** + * Callback for state change notifications. + */ + readonly onStateChange?: (change: StateChange) => Effect.Effect + } +} + +/** + * @since 1.0.0 + * @category models + */ +export interface CircuitBreaker { + readonly [CircuitBreakerTypeId]: CircuitBreakerTypeId + /** + * Executes the specified effect with the circuit breaker. + */ + readonly withCircuitBreaker: ( + strategyId: string, + effect: Effect.Effect, + ) => Effect.Effect + /** + * Returns the current state of the CircuitBreaker for a strategy. + */ + readonly currentState: (strategyId: string) => Effect.Effect + /** + * Checks if a strategy is healthy. + */ + readonly isHealthy: (strategyId: string) => Effect.Effect + /** + * Returns a stream of state changes for observability. + */ + readonly stateChanges: Stream.Stream +} + +/** + * Represents the error that will be returned by calls to a CircuitBreaker in the Open state. + * + * @since 1.0.0 + * @category constructors + */ +export const OpenError = (strategyId: string): CircuitBreaker.OpenError => ({ + _tag: 'OpenError', + strategyId, +}) + +/** + * @since 1.0.0 + * @category constructors + */ +export const isCircuitBreakerOpenError = (u: unknown): u is CircuitBreaker.OpenError => + typeof u === 'object' && u != null && '_tag' in u && u._tag === 'OpenError' + +const defaultState: CircuitBreaker.CircuitBreakerState = { + failures: 0, + lastFailureTime: 0, + state: 'Closed', + halfOpenCalls: 0, +} + +const defaultConfig: CircuitBreaker.Config = { + maxFailures: 5, + resetTimeout: Duration.seconds(60), + halfOpenMaxCalls: 3, +} + +/** + * Creates a failure count tripping strategy. + * + * @since 1.0.0 + * @category constructors + */ +export const failureCount = (maxFailures: number): Effect.Effect => + Ref.make(0).pipe( + Effect.map((ref) => ({ + shouldTrip: (successful: boolean) => + successful ? Ref.set(ref, 0).pipe(Effect.as(false)) : Ref.modify(ref, (n) => [n + 1 === maxFailures, n + 1]), + onReset: Ref.set(ref, 0), + })), + ) + +/** + * Creates a simple failure rate tripping strategy. + * Tracks success/failure ratio over a sliding window. + * + * @since 1.0.0 + * @category constructors + */ +export const failureRate = ({ + threshold = 0.5, + minCalls = 10, + windowSize = 20, +}: { + /** + * Failure rate threshold (0.0 to 1.0). Circuit trips when failure rate exceeds this. + * @default 0.5 + */ + readonly threshold?: number + /** + * Minimum number of calls before failure rate is evaluated. + * @default 10 + */ + readonly minCalls?: number + /** + * Size of the sliding window for tracking calls. + * @default 20 + */ + readonly windowSize?: number +} = {}): Effect.Effect => + Effect.gen(function* () { + // Validate inputs + if (threshold < 0 || threshold > 1) { + return yield* Effect.die('threshold must be between 0 and 1') + } + if (minCalls < 1) { + return yield* Effect.die('minCalls must be at least 1') + } + if (windowSize < minCalls) { + return yield* Effect.die('windowSize must be at least minCalls') + } + + // Track calls in a sliding window (true = success, false = failure) + const window = yield* Ref.make>([]) + + return { + shouldTrip: (successful: boolean) => + Ref.modify(window, (calls) => { + // Add new call to window + const newCalls = [...calls, successful] + + // Keep only the most recent windowSize calls + const trimmedCalls = newCalls.length > windowSize ? newCalls.slice(-windowSize) : newCalls + + // Only evaluate failure rate if we have enough calls + if (trimmedCalls.length < minCalls) { + return [false, trimmedCalls] + } + + // Calculate failure rate + const failures = trimmedCalls.filter((call) => !call).length + const failureRate = failures / trimmedCalls.length + + return [failureRate >= threshold, trimmedCalls] + }), + onReset: Ref.set(window, []), + } + }) + +/** + * Creates a new CircuitBreaker. + * + * @since 1.0.0 + * @category constructors + */ +export const make = ( + config: Partial> = {}, +): Effect.Effect, never, never> => + Effect.gen(function* () { + const finalConfig = { ...defaultConfig, ...config } + const states = yield* Ref.make(new Map()) + + // Create a PubSub for state change notifications + const stateChangesPubSub = yield* PubSub.unbounded() + + // Setup metrics if provided + const metrics = Option.fromNullable(finalConfig.metricLabels).pipe(Option.map(makeMetrics)) + + function withMetrics(f: (metrics: CircuitBreakerMetrics) => Effect.Effect): Effect.Effect { + return Effect.ignore(Effect.flatMap(metrics, f)) + } + + const notifyStateChange = (from: CircuitBreaker.State, to: CircuitBreaker.State): Effect.Effect => + Effect.gen(function* () { + const now = yield* Clock.currentTimeNanos + const stateChange = new StateChange(from, to, now) + + // Publish to the stream + yield* PubSub.publish(stateChangesPubSub, stateChange) + + // Call the user-provided callback if present + if (finalConfig.onStateChange) { + yield* finalConfig.onStateChange(stateChange) + } + }) + + const getState = (strategyId: string): Effect.Effect => + Ref.get(states).pipe(Effect.map((map) => map.get(strategyId) ?? defaultState)) + + const updateState = (strategyId: string, newState: CircuitBreaker.CircuitBreakerState) => + Ref.update(states, (map) => new Map(map).set(strategyId, newState)) + + const shouldAllowRequest = (state: CircuitBreaker.CircuitBreakerState): Effect.Effect => + Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis + + switch (state.state) { + case 'Closed': + return true + case 'Open': + return now - state.lastFailureTime >= Duration.toMillis(finalConfig.resetTimeout ?? Duration.seconds(60)) + case 'HalfOpen': + return state.halfOpenCalls < (finalConfig.halfOpenMaxCalls ?? 3) + } + }) + + // Create tripping strategy if provided, otherwise use default failure count + const trippingStrategy = finalConfig.strategy + ? yield* finalConfig.strategy + : yield* failureCount(finalConfig.maxFailures ?? 5) + + const onSuccess = (strategyId: string, state: CircuitBreaker.CircuitBreakerState) => + Effect.gen(function* () { + if (state.state === 'HalfOpen') { + // Reset to closed state after successful half-open calls + yield* trippingStrategy.onReset + yield* notifyStateChange(state.state, 'Closed') + yield* updateState(strategyId, { + ...defaultState, + state: 'Closed', + }) + yield* withMetrics((metrics) => + Metric.increment(metrics.stateChanges).pipe( + Effect.zipRight(Metric.set(metrics.state, stateToCode('Closed'))), + ), + ) + } else if (state.failures > 0) { + // Reset failures on success + yield* updateState(strategyId, { + ...state, + failures: 0, + }) + } + }) + + const onFailure = (strategyId: string, state: CircuitBreaker.CircuitBreakerState) => + Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis + const shouldTrip = yield* trippingStrategy.shouldTrip(false) + + if (state.state === 'HalfOpen') { + // Failed during half-open, go back to open + yield* notifyStateChange(state.state, 'Open') + yield* updateState(strategyId, { + ...state, + state: 'Open', + failures: state.failures + 1, + lastFailureTime: now, + halfOpenCalls: 0, + }) + yield* withMetrics((metrics) => + Metric.increment(metrics.stateChanges).pipe( + Effect.zipRight(Metric.set(metrics.state, stateToCode('Open'))), + ), + ) + } else if (shouldTrip && state.state === 'Closed') { + // Threshold reached, open the circuit + yield* notifyStateChange(state.state, 'Open') + yield* updateState(strategyId, { + ...state, + state: 'Open', + failures: state.failures + 1, + lastFailureTime: now, + }) + yield* withMetrics((metrics) => + Metric.increment(metrics.stateChanges).pipe( + Effect.zipRight(Metric.set(metrics.state, stateToCode('Open'))), + ), + ) + } else { + // Increment failures but keep closed + yield* updateState(strategyId, { + ...state, + failures: state.failures + 1, + lastFailureTime: now, + }) + } + }) + + const withCircuitBreaker = ( + strategyId: string, + effect: Effect.Effect, + ): Effect.Effect => + Effect.gen(function* () { + const state = yield* getState(strategyId) + const shouldAllow = yield* shouldAllowRequest(state) + + if (!shouldAllow) { + yield* withMetrics((metrics) => Metric.increment(metrics.rejectedCalls)) + return yield* Effect.fail(OpenError(strategyId)) + } + + // Transition to half-open if we're allowing a request from open state + if (state.state === 'Open') { + yield* notifyStateChange(state.state, 'HalfOpen') + yield* updateState(strategyId, { + ...state, + state: 'HalfOpen', + halfOpenCalls: 1, + }) + yield* withMetrics((metrics) => + Metric.increment(metrics.stateChanges).pipe( + Effect.zipRight(Metric.set(metrics.state, stateToCode('HalfOpen'))), + ), + ) + } else if (state.state === 'HalfOpen') { + yield* updateState(strategyId, { + ...state, + halfOpenCalls: state.halfOpenCalls + 1, + }) + } + + const result = yield* Effect.either(effect) + + if (Either.isRight(result)) { + yield* onSuccess(strategyId, yield* getState(strategyId)) + yield* withMetrics((metrics) => Metric.increment(metrics.successfulCalls)) + return result.right + } else { + // Check if this failure should be counted based on the isFailure predicate + const shouldCountFailure = finalConfig.isFailure ? finalConfig.isFailure(result.left) : true + + if (shouldCountFailure) { + yield* onFailure(strategyId, yield* getState(strategyId)) + yield* withMetrics((metrics) => Metric.increment(metrics.failedCalls)) + } + + return yield* Effect.fail(result.left) + } + }) + + const currentState = (strategyId: string): Effect.Effect => + getState(strategyId).pipe(Effect.map((state) => state.state)) + + const isHealthy = (strategyId: string): Effect.Effect => + getState(strategyId).pipe(Effect.map((state) => state.state === 'Closed' || state.state === 'HalfOpen')) + + return { + [CircuitBreakerTypeId]: CircuitBreakerTypeId, + withCircuitBreaker, + currentState, + isHealthy, + stateChanges: Stream.fromPubSub(stateChangesPubSub), + } as const + }) + +// ============================================================================= +// Metrics +// ============================================================================= + +const stateToCode = (state: CircuitBreaker.State): number => { + switch (state) { + case 'Closed': + return 0 + case 'HalfOpen': + return 1 + case 'Open': + return 2 + } +} + +const makeMetrics = (labels: Iterable) => { + const state = Metric.gauge('effect_circuit_breaker_state').pipe(Metric.taggedWithLabels(labels)) + const stateChanges = Metric.counter('effect_circuit_breaker_state_changes').pipe(Metric.taggedWithLabels(labels)) + const successfulCalls = Metric.counter('effect_circuit_breaker_successful_calls').pipe( + Metric.taggedWithLabels(labels), + ) + const failedCalls = Metric.counter('effect_circuit_breaker_failed_calls').pipe(Metric.taggedWithLabels(labels)) + const rejectedCalls = Metric.counter('effect_circuit_breaker_rejected_calls').pipe(Metric.taggedWithLabels(labels)) + return new CircuitBreakerMetrics(state, stateChanges, successfulCalls, failedCalls, rejectedCalls) +} diff --git a/packages/transaction-decoder/src/circuit-breaker/constants.ts b/packages/transaction-decoder/src/circuit-breaker/constants.ts new file mode 100644 index 00000000..892ad8fa --- /dev/null +++ b/packages/transaction-decoder/src/circuit-breaker/constants.ts @@ -0,0 +1,8 @@ +export const STRATEGY_TIMEOUT = '10 seconds' as const +export const DEFAULT_RETRY_TIMES = 2 as const +export const INITIAL_RETRY_DELAY = '500 millis' as const +export const MAX_CONCURRENT_REQUESTS = 50 as const +export const CIRCUIT_BREAKER_FAILURE_THRESHOLD = 5 as const +export const CIRCUIT_BREAKER_RESET_TIMEOUT = '30 seconds' as const +export const REQUEST_POOL_HEALTH_THRESHOLD = 0.8 as const +export const CONCURRENCY_ADJUSTMENT_STEP = 5 as const diff --git a/packages/transaction-decoder/src/circuit-breaker/request-pool.ts b/packages/transaction-decoder/src/circuit-breaker/request-pool.ts new file mode 100644 index 00000000..626a6b25 --- /dev/null +++ b/packages/transaction-decoder/src/circuit-breaker/request-pool.ts @@ -0,0 +1,270 @@ +import { Effect, Ref, Option, MetricLabel, Metric } from 'effect' + +export interface RequestPoolConfig { + readonly maxConcurrentRequests: number + readonly adaptiveConcurrency: boolean + readonly healthThreshold: number + readonly concurrencyStep: number + readonly metricLabels?: Iterable +} + +export interface RequestPoolState { + readonly activeRequests: number + readonly maxConcurrency: number + readonly successRate: number + readonly totalRequests: number + readonly successfulRequests: number +} + +export interface RequestPool { + getOptimalConcurrency: (chainID: number) => Effect.Effect + withPoolManagement: (chainID: number, effect: Effect.Effect) => Effect.Effect + updateMetrics: (chainID: number, success: boolean) => Effect.Effect +} + +const defaultConfig: RequestPoolConfig = { + maxConcurrentRequests: 50, + adaptiveConcurrency: true, + healthThreshold: 0.8, // 80% success rate + concurrencyStep: 5, +} + +const defaultState: RequestPoolState = { + activeRequests: 0, + maxConcurrency: 10, // Start conservative + successRate: 1.0, + totalRequests: 0, + successfulRequests: 0, +} + +export const make = (configParam: Partial = {}): Effect.Effect => + Effect.gen(function* () { + const poolStates = yield* Ref.make(new Map()) + const activeCounters = yield* Ref.make(new Map()) + const config: RequestPoolConfig = { + ...defaultConfig, + ...configParam, + } + + // Setup metrics if provided + const metrics = Option.fromNullable(config.metricLabels).pipe(Option.map(makeRequestPoolMetrics)) + + const getState = (chainID: number): Effect.Effect => + Ref.get(poolStates).pipe(Effect.map((map) => map.get(chainID) ?? defaultState)) + + const updateState = (chainID: number, newState: RequestPoolState) => + Ref.update(poolStates, (map) => new Map(map).set(chainID, newState)) + + const incrementActive = (chainID: number) => + Effect.gen(function* () { + yield* Ref.update(activeCounters, (map) => { + const current = map.get(chainID) ?? 0 + return new Map(map).set(chainID, current + 1) + }) + + // Update active requests metric + const activeCount = yield* Ref.get(activeCounters).pipe(Effect.map((map) => map.get(chainID) ?? 0)) + yield* withRequestPoolMetrics(metrics, (m) => Metric.set(m.activeRequests, activeCount)) + }) + + const decrementActive = (chainID: number) => + Effect.gen(function* () { + yield* Ref.update(activeCounters, (map) => { + const current = map.get(chainID) ?? 0 + return new Map(map).set(chainID, Math.max(0, current - 1)) + }) + + // Update active requests metric + const activeCount = yield* Ref.get(activeCounters).pipe(Effect.map((map) => map.get(chainID) ?? 0)) + yield* withRequestPoolMetrics(metrics, (m) => Metric.set(m.activeRequests, activeCount)) + }) + + const calculateOptimalConcurrency = (state: RequestPoolState): number => { + if (!config.adaptiveConcurrency) { + return Math.min(config.maxConcurrentRequests, state.maxConcurrency) + } + + const { successRate, maxConcurrency } = state + + if (successRate >= config.healthThreshold) { + // High success rate: can increase concurrency + const newConcurrency = Math.min(config.maxConcurrentRequests, maxConcurrency + config.concurrencyStep) + return newConcurrency + } else if (successRate < config.healthThreshold * 0.7) { + // Low success rate: decrease concurrency + const newConcurrency = Math.max(1, maxConcurrency - config.concurrencyStep) + return newConcurrency + } + + // Moderate success rate: maintain current concurrency + return maxConcurrency + } + + const determinePoolState = ( + successRate: number, + activeRequests: number, + maxConcurrency: number, + ): 'healthy' | 'degraded' | 'overloaded' => { + if (activeRequests >= maxConcurrency * 0.9) { + return 'overloaded' + } else if (successRate < config.healthThreshold * 0.7) { + return 'degraded' + } else { + return 'healthy' + } + } + + const getOptimalConcurrency = (chainID: number): Effect.Effect => + Effect.gen(function* () { + const state = yield* getState(chainID) + const optimalConcurrency = calculateOptimalConcurrency(state) + + // Update the max concurrency in state if it changed + if (optimalConcurrency !== state.maxConcurrency) { + yield* updateState(chainID, { + ...state, + maxConcurrency: optimalConcurrency, + }) + + // Track concurrency adjustment in metrics + yield* withRequestPoolMetrics(metrics, (m) => + Effect.all([ + Metric.increment(m.concurrencyAdjustments), + Metric.set(m.maxConcurrency, optimalConcurrency), + ]).pipe(Effect.asVoid), + ) + } + + return optimalConcurrency + }) + + const withPoolManagement = ( + chainID: number, + effect: Effect.Effect, + ): Effect.Effect => + Effect.gen(function* () { + yield* incrementActive(chainID) + + const result = yield* effect.pipe( + Effect.tap(() => updateMetrics(chainID, true)), + Effect.catchAll((error) => + Effect.gen(function* () { + yield* updateMetrics(chainID, false) + return yield* Effect.fail(error) + }), + ), + Effect.ensuring(decrementActive(chainID)), + ) + + return result + }) + + const updateMetrics = (chainID: number, success: boolean): Effect.Effect => + Effect.gen(function* () { + const state = yield* getState(chainID) + const newTotalRequests = state.totalRequests + 1 + const newSuccessfulRequests = state.successfulRequests + (success ? 1 : 0) + + // Use sliding window to prevent metrics from becoming stale + const windowSize = 100 + const effectiveTotalRequests = Math.min(newTotalRequests, windowSize) + const effectiveSuccessfulRequests = Math.min(newSuccessfulRequests, windowSize) + const effectiveSuccessRate = effectiveSuccessfulRequests / effectiveTotalRequests + + yield* updateState(chainID, { + ...state, + totalRequests: effectiveTotalRequests, + successfulRequests: effectiveSuccessfulRequests, + successRate: effectiveSuccessRate, + }) + + // Update metrics + yield* withRequestPoolMetrics(metrics, (m: RequestPoolMetrics) => + Effect.gen(function* () { + if (success) { + yield* Metric.increment(m.successfulRequests) + } else { + yield* Metric.increment(m.failedRequests) + } + yield* Metric.set(m.successRate, effectiveSuccessRate) + + // Determine and update pool state + const activeCount = yield* Ref.get(activeCounters).pipe(Effect.map((map) => map.get(chainID) ?? 0)) + const poolState = determinePoolState(effectiveSuccessRate, activeCount, state.maxConcurrency) + yield* Metric.set(m.poolState, poolStateToCode(poolState)) + }), + ) + }) + + return { + getOptimalConcurrency, + withPoolManagement, + updateMetrics, + } + }) + +/** + * Metrics tracking for request pool operations + */ +export class RequestPoolMetrics { + constructor( + readonly activeRequests: Metric.Metric.Gauge, + readonly maxConcurrency: Metric.Metric.Gauge, + readonly successfulRequests: Metric.Metric.Counter, + readonly failedRequests: Metric.Metric.Counter, + readonly concurrencyAdjustments: Metric.Metric.Counter, + readonly successRate: Metric.Metric.Gauge, + readonly poolState: Metric.Metric.Gauge, + ) {} +} + +/** + * Pool state codes for metrics + */ +export const poolStateToCode = (state: 'healthy' | 'degraded' | 'overloaded'): number => { + switch (state) { + case 'healthy': + return 0 + case 'degraded': + return 1 + case 'overloaded': + return 2 + } +} + +/** + * Creates metrics for request pool with the given labels + */ +export const makeRequestPoolMetrics = (labels: Iterable) => { + const activeRequests = Metric.gauge('effect_request_pool_active_requests').pipe(Metric.taggedWithLabels(labels)) + const maxConcurrency = Metric.gauge('effect_request_pool_max_concurrency').pipe(Metric.taggedWithLabels(labels)) + const successfulRequests = Metric.counter('effect_request_pool_successful_requests').pipe( + Metric.taggedWithLabels(labels), + ) + const failedRequests = Metric.counter('effect_request_pool_failed_requests').pipe(Metric.taggedWithLabels(labels)) + const concurrencyAdjustments = Metric.counter('effect_request_pool_concurrency_adjustments').pipe( + Metric.taggedWithLabels(labels), + ) + const successRate = Metric.gauge('effect_request_pool_success_rate').pipe(Metric.taggedWithLabels(labels)) + const poolState = Metric.gauge('effect_request_pool_state').pipe(Metric.taggedWithLabels(labels)) + + return new RequestPoolMetrics( + activeRequests, + maxConcurrency, + successfulRequests, + failedRequests, + concurrencyAdjustments, + successRate, + poolState, + ) +} + +/** + * Helper function to conditionally update metrics + */ +export const withRequestPoolMetrics = ( + metrics: Option.Option, + f: (metrics: RequestPoolMetrics) => Effect.Effect, +): Effect.Effect => { + return Effect.ignore(Effect.flatMap(metrics, f)) +} diff --git a/packages/transaction-decoder/src/contract-meta-loader.ts b/packages/transaction-decoder/src/contract-meta-loader.ts index 7cfbcd12..7432e59a 100644 --- a/packages/transaction-decoder/src/contract-meta-loader.ts +++ b/packages/transaction-decoder/src/contract-meta-loader.ts @@ -1,6 +1,6 @@ import { Effect, RequestResolver, Request, Array, Either, pipe, Schema, PrimaryKey, SchemaAST } from 'effect' import { ContractData } from './types.js' -import { GetContractMetaStrategy } from './meta-strategy/request-model.js' +import * as MetaStrategyExecutor from './meta-strategy/strategy-executor.js' import { Address } from 'viem' import { ZERO_ADDRESS } from './decoding/constants.js' import * as ContractMetaStore from './contract-meta-store.js' @@ -79,10 +79,22 @@ const setValue = ({ chainID, address }: ContractMetaLoader, result: ContractData * inside the resolver's body. We use the `makeKey` function to generate a unique key * for each request and group them by that key. We then load the metadata for the unique * requests and resolve the pending requests in a group with the same result + * + * **Circuit Breaking and Resilience** + * + * The ContractMetaLoader now includes circuit breaking and intelligent concurrency management + * by reusing the same resilience infrastructure as the ABI loader: + * - Strategy-level circuit breakers prevent cascading failures + * - Adaptive concurrency based on success rates and chain health + * - Timeout protection for all external strategy calls + * - Progressive degradation when strategies become unhealthy + * - Request pooling with back-pressure handling + * - Shared circuit breaker and request pool instances with ABI loader for consistent behavior */ const ContractMetaLoaderRequestResolver = RequestResolver.makeBatched((requests: Array) => Effect.gen(function* () { - const { strategies } = yield* ContractMetaStore.ContractMetaStore + const { strategies, circuitBreaker, requestPool } = yield* ContractMetaStore.ContractMetaStore + const metaStrategyExecutor = MetaStrategyExecutor.make(circuitBreaker, requestPool) const groups = Array.groupBy(requests, makeKey) const uniqueRequests = Object.values(groups).map((group) => group[0]) @@ -108,31 +120,44 @@ const ContractMetaLoaderRequestResolver = RequestResolver.makeBatched((requests: }, { discard: true, + concurrency: 'unbounded', }, ) - // Fetch ContractMeta from the strategies + // Get optimal concurrency for each chain + const concurrencyMap = new Map() + for (const req of remaining) { + if (!concurrencyMap.has(req.chainID)) { + const optimalConcurrency = yield* requestPool.getOptimalConcurrency(req.chainID) + concurrencyMap.set(req.chainID, optimalConcurrency) + } + } + + const concurrency = Math.min(...[...concurrencyMap.values()], 25) + + yield* Effect.logDebug(`Executing ${remaining.length} remaining meta strategies with concurrency ${concurrency}`) + + // Fetch ContractMeta from the strategies using circuit breaker and request pool const strategyResults = yield* Effect.forEach( remaining, ({ chainID, address }) => { const allAvailableStrategies = Array.prependAll(strategies.default, strategies[chainID] ?? []) - // TODO: Distinct the errors and missing data, so we can retry on errors - return Effect.validateFirst(allAvailableStrategies, (strategy) => - pipe( - Effect.request( - new GetContractMetaStrategy({ - address, - chainId: chainID, - strategyId: strategy.id, - }), - strategy.resolver, - ), - Effect.withRequestCaching(true), - ), - ).pipe(Effect.orElseSucceed(() => null)) + return metaStrategyExecutor + .executeStrategiesSequentially(allAvailableStrategies, { + address, + chainId: chainID, + strategyId: 'meta-batch', + }) + .pipe( + Effect.tapError(Effect.logWarning), + Effect.orElseSucceed(() => null), + ) + }, + { + concurrency, + batching: true, }, - { concurrency: 'unbounded', batching: true }, ) // Store results and resolve pending requests @@ -146,7 +171,7 @@ const ContractMetaLoaderRequestResolver = RequestResolver.makeBatched((requests: Effect.forEach(group, (req) => Request.succeed(req, result), { discard: true }), ) }, - { discard: true }, + { discard: true, concurrency: 'unbounded' }, ) }), ).pipe(RequestResolver.contextFromServices(ContractMetaStore.ContractMetaStore), Effect.withRequestCaching(true)) diff --git a/packages/transaction-decoder/src/contract-meta-store.ts b/packages/transaction-decoder/src/contract-meta-store.ts index c2eddeba..89aa17c7 100644 --- a/packages/transaction-decoder/src/contract-meta-store.ts +++ b/packages/transaction-decoder/src/contract-meta-store.ts @@ -1,6 +1,8 @@ -import { Context, Effect, Layer } from 'effect' +import { Context, Effect, Layer, MetricLabel } from 'effect' import { ContractData } from './types.js' import { ContractMetaResolverStrategy } from './meta-strategy/request-model.js' +import * as CircuitBreaker from './circuit-breaker/circuit-breaker.js' +import * as RequestPool from './circuit-breaker/request-pool.js' type ChainOrDefault = number | 'default' @@ -43,9 +45,25 @@ export interface ContractMetaStore { */ readonly get: (arg: ContractMetaParams) => Effect.Effect readonly getMany?: (arg: Array) => Effect.Effect, never> + readonly circuitBreaker: CircuitBreaker.CircuitBreaker + readonly requestPool: RequestPool.RequestPool } export const ContractMetaStore = Context.GenericTag('@3loop-decoder/ContractMetaStore') -export const make = (args: ContractMetaStore) => Effect.succeed(ContractMetaStore.of(args)) -export const layer = (args: ContractMetaStore) => Layer.effect(ContractMetaStore, make(args)) +export const make = (args: Omit) => + Effect.gen(function* () { + const circuitBreaker = yield* CircuitBreaker.make({ + metricLabels: [MetricLabel.make('service', 'contract-meta-loader')], + }) + + const requestPool = yield* RequestPool.make({ metricLabels: [MetricLabel.make('service', 'contract-meta-loader')] }) + + return { + ...args, + circuitBreaker, + requestPool, + } + }) +export const layer = (args: Omit) => + Layer.effect(ContractMetaStore, make(args)) diff --git a/packages/transaction-decoder/src/meta-strategy/erc20-rpc-strategy.ts b/packages/transaction-decoder/src/meta-strategy/erc20-rpc-strategy.ts index aa99235e..bf5a8060 100644 --- a/packages/transaction-decoder/src/meta-strategy/erc20-rpc-strategy.ts +++ b/packages/transaction-decoder/src/meta-strategy/erc20-rpc-strategy.ts @@ -44,7 +44,10 @@ export const ERC20RPCStrategyResolver = ( }) if (decimalsResponse.status !== 'success') { - return yield* Effect.fail(fail) + // Contract exists but doesn't have ERC20 decimals - this is a "no data found" case + return yield* Effect.fail( + new RequestModel.MissingMetaError(address, chainId, 'erc20-rpc-strategy', 'Contract is not an ERC20 token'), + ) } const meta: ContractData = { diff --git a/packages/transaction-decoder/src/meta-strategy/index.ts b/packages/transaction-decoder/src/meta-strategy/index.ts index edd39669..3fc27a64 100644 --- a/packages/transaction-decoder/src/meta-strategy/index.ts +++ b/packages/transaction-decoder/src/meta-strategy/index.ts @@ -2,3 +2,4 @@ export * from './erc20-rpc-strategy.js' export * from './nft-rpc-strategy.js' export * from './proxy-rpc-strategy.js' export * from './request-model.js' +export * as MetaStrategyExecutor from './strategy-executor.js' diff --git a/packages/transaction-decoder/src/meta-strategy/nft-rpc-strategy.ts b/packages/transaction-decoder/src/meta-strategy/nft-rpc-strategy.ts index 0defad1f..4b8ac78d 100644 --- a/packages/transaction-decoder/src/meta-strategy/nft-rpc-strategy.ts +++ b/packages/transaction-decoder/src/meta-strategy/nft-rpc-strategy.ts @@ -41,7 +41,17 @@ export const NFTRPCStrategyResolver = (publicClientLive: PublicClient): RequestM }, ) - if (!isERC721 && !isERC1155) return yield* Effect.fail(fail) + if (!isERC721 && !isERC1155) { + // Contract exists but doesn't support NFT interfaces - this is a "no data found" case + return yield* Effect.fail( + new RequestModel.MissingMetaError( + address, + chainId, + 'nft-rpc-strategy', + 'Contract is not an NFT (ERC721/ERC1155)', + ), + ) + } const erc721inst = getContract({ abi: erc721Abi, diff --git a/packages/transaction-decoder/src/meta-strategy/proxy-rpc-strategy.ts b/packages/transaction-decoder/src/meta-strategy/proxy-rpc-strategy.ts index 4f5a193b..6001fafc 100644 --- a/packages/transaction-decoder/src/meta-strategy/proxy-rpc-strategy.ts +++ b/packages/transaction-decoder/src/meta-strategy/proxy-rpc-strategy.ts @@ -11,10 +11,16 @@ export const ProxyRPCStrategyResolver = (publicClientLive: PublicClient) => ({ const proxyResult = yield* getProxyImplementation({ address, chainID: chainId }) const { address: implementationAddress, type: proxyType } = proxyResult ?? {} - const fail = new RequestModel.ResolveStrategyMetaError('ProxyRPCStrategy', address, chainId) - if (!implementationAddress || !proxyType) { - return yield* Effect.fail(fail) + // Contract exists but is not a recognized proxy - this is a "no data found" case + return yield* Effect.fail( + new RequestModel.MissingMetaError( + address, + chainId, + 'proxy-rpc-strategy', + 'Contract is not a recognized proxy', + ), + ) } let meta: ContractData | undefined @@ -39,7 +45,10 @@ export const ProxyRPCStrategyResolver = (publicClientLive: PublicClient) => ({ // } if (!meta) { - return yield* Effect.fail(fail) + // Proxy detected but not supported type - this is a "no data found" case + return yield* Effect.fail( + new RequestModel.MissingMetaError(address, chainId, 'proxy-rpc-strategy', 'Proxy type not supported'), + ) } return meta diff --git a/packages/transaction-decoder/src/meta-strategy/request-model.ts b/packages/transaction-decoder/src/meta-strategy/request-model.ts index e2c2795a..320470f9 100644 --- a/packages/transaction-decoder/src/meta-strategy/request-model.ts +++ b/packages/transaction-decoder/src/meta-strategy/request-model.ts @@ -18,6 +18,16 @@ export class ResolveStrategyMetaError { ) {} } +export class MissingMetaError { + readonly _tag = 'MissingMetaError' + constructor( + readonly address: Address, + readonly chainId: number, + readonly strategyId: string, + readonly message: string = 'Missing contract metadata', + ) {} +} + export interface ContractMetaResolverStrategy { id: string resolver: RequestResolver.RequestResolver @@ -32,6 +42,7 @@ export class GetContractMetaStrategy extends Schema.TaggedRequest { + constructor(params: { chainId: number; strategies: string[] }) { + super(params) + } +} + +export const make = (circuitBreaker: CircuitBreaker, requestPool: RequestPool) => { + const executeStrategy = (strategy: ContractMetaResolverStrategy, params: FetchMetaParams) => { + return pipe( + Effect.request( + new GetContractMetaStrategy({ + address: params.address, + chainId: params.chainId, + strategyId: strategy.id, + }), + strategy.resolver, + ), + Effect.withRequestCaching(true), + Effect.timeout(Duration.decode(Constants.STRATEGY_TIMEOUT)), + // Treate MissingMetaError as a success for circuit breaker + Effect.catchTag('MissingMetaError', (error) => { + return Effect.gen(function* () { + yield* Effect.logWarning(`Meta strategy ${strategy.id} found no metadata: ${error.message}`) + return yield* Effect.succeed(error) + }) + }), + Effect.retry( + Schedule.exponential(Duration.decode(Constants.INITIAL_RETRY_DELAY)).pipe( + Schedule.compose(Schedule.recurs(Constants.DEFAULT_RETRY_TIMES)), + ), + ), + (effect) => circuitBreaker.withCircuitBreaker(strategy.id, effect), + (effect) => requestPool.withPoolManagement(params.chainId, effect), + Effect.flatMap((data) => (data instanceof MissingMetaError ? Effect.fail(data) : Effect.succeed(data))), + ) + } + + const executeStrategiesSequentially = (strategies: ContractMetaResolverStrategy[], params: FetchMetaParams) => + Effect.gen(function* () { + // Filter out unhealthy strategies first + const healthyStrategies: ContractMetaResolverStrategy[] = [] + + for (const strategy of strategies) { + const isHealthy = yield* circuitBreaker.isHealthy(strategy.id) + if (isHealthy) { + healthyStrategies.push(strategy) + } else { + yield* Effect.logDebug(`Skipping unhealthy meta strategy: ${strategy.id}`) + } + } + + if (healthyStrategies.length === 0) { + return yield* Effect.fail( + new MissingHealthyStrategy({ + chainId: params.chainId, + strategies: strategies.map((s) => s.id), + }), + ) + } + + // Try strategies one by one until one succeeds + return yield* Effect.validateFirst(healthyStrategies, (strategy) => executeStrategy(strategy, params)) + }) + + return { + executeStrategiesSequentially, + } +} diff --git a/packages/transaction-decoder/test/circuit-breaker-with-metrics.test.ts b/packages/transaction-decoder/test/circuit-breaker-with-metrics.test.ts new file mode 100644 index 00000000..4fc184af --- /dev/null +++ b/packages/transaction-decoder/test/circuit-breaker-with-metrics.test.ts @@ -0,0 +1,183 @@ +import { describe, it, expect } from 'vitest' +import { Effect, MetricLabel, Duration, Either, TestContext } from 'effect' +import * as CircuitBreaker from '../src/circuit-breaker/circuit-breaker.js' + +describe('CircuitBreaker with Metrics', () => { + const createCircuitBreakerWithMetrics = () => { + const metricLabels = [ + MetricLabel.make('service', 'test-api'), + MetricLabel.make('version', '1.0'), + MetricLabel.make('environment', 'test'), + ] + + return CircuitBreaker.make({ + maxFailures: 3, + resetTimeout: Duration.seconds(2), // Shorter timeout for tests + halfOpenMaxCalls: 2, + metricLabels, + isFailure: (error) => error.name !== 'TimeoutError', + }) + } + + const mockApiCall = (shouldFail: boolean): Effect.Effect => + shouldFail ? Effect.fail(new Error('API call failed')) : Effect.succeed('API response') + + it('should create circuit breaker with metrics configuration', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + // Verify circuit breaker is created and functional + const state = yield* circuitBreaker.currentState('test-strategy') + expect(state).toBe('Closed') // Initial state should be closed + + const isHealthy = yield* circuitBreaker.isHealthy('test-strategy') + expect(isHealthy).toBe(true) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should track successful calls and maintain closed state', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + // Make successful calls + for (let i = 0; i < 5; i++) { + const result = yield* circuitBreaker.withCircuitBreaker('success-test', mockApiCall(false)) + expect(result).toBe('API response') + } + + // Circuit should remain closed and healthy + const state = yield* circuitBreaker.currentState('success-test') + expect(state).toBe('Closed') + + const isHealthy = yield* circuitBreaker.isHealthy('success-test') + expect(isHealthy).toBe(true) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should open circuit after max failures and track metrics', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + // Make calls that fail to trip the circuit (maxFailures = 3) + for (let i = 0; i < 4; i++) { + const result = yield* Effect.either(circuitBreaker.withCircuitBreaker('failure-test', mockApiCall(true))) + expect(Either.isLeft(result)).toBe(true) + } + + // Circuit should now be open + const state = yield* circuitBreaker.currentState('failure-test') + expect(state).toBe('Open') + + const isHealthy = yield* circuitBreaker.isHealthy('failure-test') + expect(isHealthy).toBe(false) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should reject calls when circuit is open', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + // Trip the circuit by making it fail + for (let i = 0; i < 4; i++) { + yield* Effect.either(circuitBreaker.withCircuitBreaker('rejection-test', mockApiCall(true))) + } + + // Now try to make a call when circuit is open - should be rejected + const result = yield* Effect.either( + circuitBreaker.withCircuitBreaker( + 'rejection-test', + mockApiCall(false), // This would succeed, but circuit is open + ), + ) + + expect(Either.isLeft(result)).toBe(true) + if (Either.isLeft(result)) { + expect(CircuitBreaker.isCircuitBreakerOpenError(result.left)).toBe(true) + } + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should handle TimeoutError differently based on isFailure configuration', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + const timeoutError = new Error('Request timeout') + timeoutError.name = 'TimeoutError' + + // TimeoutError should not count as failure due to isFailure config + for (let i = 0; i < 5; i++) { + const result = yield* Effect.either( + circuitBreaker.withCircuitBreaker('timeout-test', Effect.fail(timeoutError)), + ) + expect(Either.isLeft(result)).toBe(true) + } + + // Circuit should still be closed since TimeoutErrors don't count as failures + const state = yield* circuitBreaker.currentState('timeout-test') + expect(state).toBe('Closed') + + const isHealthy = yield* circuitBreaker.isHealthy('timeout-test') + expect(isHealthy).toBe(true) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should work with multiple strategy IDs independently', async () => { + const testEffect = Effect.gen(function* () { + const circuitBreaker = yield* createCircuitBreakerWithMetrics() + + // Trip circuit for strategy1 but not strategy2 + for (let i = 0; i < 4; i++) { + yield* Effect.either(circuitBreaker.withCircuitBreaker('strategy1', mockApiCall(true))) + } + + // Make successful calls to strategy2 + for (let i = 0; i < 3; i++) { + const result = yield* circuitBreaker.withCircuitBreaker('strategy2', mockApiCall(false)) + expect(result).toBe('API response') + } + + // Check states - strategy1 should be open, strategy2 should be closed + const state1 = yield* circuitBreaker.currentState('strategy1') + const state2 = yield* circuitBreaker.currentState('strategy2') + + expect(state1).toBe('Open') + expect(state2).toBe('Closed') + + const isHealthy1 = yield* circuitBreaker.isHealthy('strategy1') + const isHealthy2 = yield* circuitBreaker.isHealthy('strategy2') + + expect(isHealthy1).toBe(false) + expect(isHealthy2).toBe(true) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) +}) diff --git a/packages/transaction-decoder/test/request-pool.test.ts b/packages/transaction-decoder/test/request-pool.test.ts new file mode 100644 index 00000000..0ad9959a --- /dev/null +++ b/packages/transaction-decoder/test/request-pool.test.ts @@ -0,0 +1,298 @@ +import { describe, it, expect } from 'vitest' +import { Effect, Either, MetricLabel, TestContext, TestClock } from 'effect' +import { make as makeRequestPool } from '../src/circuit-breaker/request-pool.js' + +describe('RequestPool with Metrics', () => { + it('should create a request pool with metrics configuration', async () => { + const testEffect = Effect.gen(function* () { + const labels = [MetricLabel.make('service', 'test'), MetricLabel.make('pool_type', 'test_pool')] + + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 10, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 2, + metricLabels: labels, + }) + + // Test that the pool functions are available + expect(typeof pool.getOptimalConcurrency).toBe('function') + expect(typeof pool.withPoolManagement).toBe('function') + expect(typeof pool.updateMetrics).toBe('function') + + // Test getting optimal concurrency for a chain + const chainId = 1 + const concurrency = yield* pool.getOptimalConcurrency(chainId) + expect(concurrency).toBeGreaterThan(0) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should track successful requests and update metrics', async () => { + const testEffect = Effect.gen(function* () { + const labels = [MetricLabel.make('test', 'success_tracking')] + + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 5, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 1, + metricLabels: labels, + }) + + const chainId = 1 + + // Simulate a successful request + const mockSuccessfulRequest = Effect.succeed('success') + + const result = yield* pool.withPoolManagement(chainId, mockSuccessfulRequest) + expect(result).toBe('success') + + // The request should have been tracked (we can't directly assert metrics values + // in this test, but we can verify the pool management completed successfully) + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should track failed requests and update metrics', async () => { + const testEffect = Effect.gen(function* () { + const labels = [MetricLabel.make('test', 'failure_tracking')] + + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 5, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 1, + metricLabels: labels, + }) + + const chainId = 1 + + // Simulate a failed request + const mockFailedRequest = Effect.fail(new Error('test error')) + + const result = yield* Effect.either(pool.withPoolManagement(chainId, mockFailedRequest)) + + expect(Either.isLeft(result)).toBe(true) + if (Either.isLeft(result)) { + expect(result.left).toBeInstanceOf(Error) + } + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should adapt concurrency based on success rate', async () => { + const testEffect = Effect.gen(function* () { + const labels = [MetricLabel.make('test', 'concurrency_adaptation')] + + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 20, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 2, + metricLabels: labels, + }) + + const chainId = 1 + + // Get initial concurrency + const initialConcurrency = yield* pool.getOptimalConcurrency(chainId) + + // Simulate multiple successful requests to increase concurrency + for (let i = 0; i < 10; i++) { + yield* pool.updateMetrics(chainId, true) + } + + const concurrencyAfterSuccess = yield* pool.getOptimalConcurrency(chainId) + + // Should increase or stay the same + expect(concurrencyAfterSuccess).toBeGreaterThanOrEqual(initialConcurrency) + + // Simulate multiple failed requests to decrease concurrency + for (let i = 0; i < 15; i++) { + yield* pool.updateMetrics(chainId, false) + } + + const concurrencyAfterFailures = yield* pool.getOptimalConcurrency(chainId) + + // Should decrease from the previous value + expect(concurrencyAfterFailures).toBeLessThan(concurrencyAfterSuccess) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should work without metrics when metricLabels is not provided', async () => { + const testEffect = Effect.gen(function* () { + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 10, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 2, + // No metricLabels provided + }) + + const chainId = 1 + const mockRequest = Effect.succeed('success without metrics') + + const result = yield* pool.withPoolManagement(chainId, mockRequest) + expect(result).toBe('success without metrics') + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should handle comprehensive request pool metrics example scenario', async () => { + const testEffect = Effect.gen(function* () { + // Create metric labels for identifying this request pool instance + const labels = [ + MetricLabel.make('service', 'transaction-decoder'), + MetricLabel.make('pool_type', 'rpc_pool'), + MetricLabel.make('environment', 'test'), + ] + + // Create request pool with metrics enabled + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 20, + adaptiveConcurrency: true, + healthThreshold: 0.85, + concurrencyStep: 2, + metricLabels: labels, + }) + + // Example chain IDs for different blockchain networks + const ethereumChainId = 1 + const polygonChainId = 137 + const arbitrumChainId = 42161 + + // Simulate RPC request + const simulateRpcRequest = (chainId: number, shouldSucceed = true): Effect.Effect => + Effect.gen(function* () { + // Simulate network delay using test clock + yield* TestClock.adjust('50 millis') + + if (shouldSucceed) { + return `RPC response for chain ${chainId}` + } else { + return yield* Effect.fail(new Error(`RPC request failed for chain ${chainId}`)) + } + }) + + // Function to make requests through the pool + const makePooledRequest = (chainId: number, shouldSucceed = true) => + pool.withPoolManagement(chainId, simulateRpcRequest(chainId, shouldSucceed)) + + // Check initial optimal concurrency for each chain + const ethereumConcurrency = yield* pool.getOptimalConcurrency(ethereumChainId) + const polygonConcurrency = yield* pool.getOptimalConcurrency(polygonChainId) + const arbitrumConcurrency = yield* pool.getOptimalConcurrency(arbitrumChainId) + + expect(ethereumConcurrency).toBeGreaterThan(0) + expect(polygonConcurrency).toBeGreaterThan(0) + expect(arbitrumConcurrency).toBeGreaterThan(0) + + // Simulate successful requests to Ethereum + for (let i = 0; i < 5; i++) { + const result = yield* makePooledRequest(ethereumChainId, true) + expect(result).toBe(`RPC response for chain ${ethereumChainId}`) + } + + // Check concurrency after successful requests + const ethereumConcurrencyAfterSuccess = yield* pool.getOptimalConcurrency(ethereumChainId) + expect(ethereumConcurrencyAfterSuccess).toBeGreaterThanOrEqual(ethereumConcurrency) + + // Simulate failed requests to Polygon + for (let i = 0; i < 5; i++) { + const result = yield* Effect.either(makePooledRequest(polygonChainId, false)) + expect(Either.isLeft(result)).toBe(true) + } + + // Check concurrency after failed requests + const polygonConcurrencyAfterFailures = yield* pool.getOptimalConcurrency(polygonChainId) + // Should be less than or equal to initial (adaptive concurrency should reduce it) + expect(polygonConcurrencyAfterFailures).toBeLessThanOrEqual(polygonConcurrency + 5) // Allow some variance + + // Simulate mixed success/failure for Arbitrum + for (let i = 0; i < 6; i++) { + const shouldSucceed = i % 2 === 0 // Alternate success/failure + yield* Effect.either(makePooledRequest(arbitrumChainId, shouldSucceed)) + } + + const arbitrumConcurrencyAfterMixed = yield* pool.getOptimalConcurrency(arbitrumChainId) + expect(arbitrumConcurrencyAfterMixed).toBeGreaterThan(0) + + // Test concurrent requests + const concurrentRequests = Effect.all( + [ + Effect.either(makePooledRequest(ethereumChainId)), + Effect.either(makePooledRequest(polygonChainId)), + Effect.either(makePooledRequest(arbitrumChainId)), + ], + { concurrency: 3 }, + ) + + const results = yield* concurrentRequests + expect(results).toHaveLength(3) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) + + it('should handle burst of requests and track metrics correctly', async () => { + const testEffect = Effect.gen(function* () { + const labels = [MetricLabel.make('test', 'burst_tracking'), MetricLabel.make('scenario', 'high_load')] + + const pool = yield* makeRequestPool({ + maxConcurrentRequests: 15, + adaptiveConcurrency: true, + healthThreshold: 0.8, + concurrencyStep: 1, + metricLabels: labels, + }) + + const chainId = 1 + const initialConcurrency = yield* pool.getOptimalConcurrency(chainId) + + // Simulate burst of successful requests + const burstRequests = [] + for (let i = 0; i < 10; i++) { + burstRequests.push(pool.withPoolManagement(chainId, Effect.succeed(`Success ${i}`))) + } + + const results = yield* Effect.all(burstRequests, { concurrency: 5 }) + expect(results).toHaveLength(10) + results.forEach((result, index) => { + expect(result).toBe(`Success ${index}`) + }) + + // Check that concurrency potentially increased due to successful requests + const finalConcurrency = yield* pool.getOptimalConcurrency(chainId) + expect(finalConcurrency).toBeGreaterThanOrEqual(initialConcurrency) + + return true + }) + + const result = await Effect.runPromise(testEffect.pipe(Effect.provide(TestContext.TestContext))) + expect(result).toBe(true) + }) +})