Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/forty-cars-clap.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@3loop/transaction-decoder': minor
---

Implement circuit breaker and request pool for abi and meta strategies
11 changes: 9 additions & 2 deletions apps/web/src/lib/decode.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { getProvider, RPCProviderLive } from './rpc-provider'
import { Config, Effect, Layer, ManagedRuntime, Request } from 'effect'
import { Config, Effect, Layer, Logger, LogLevel, ManagedRuntime, Request } from 'effect'
import {
DecodedTransaction,
DecodeResult,
Expand All @@ -19,6 +19,13 @@ import { SqlAbiStore, SqlContractMetaStore } from '@3loop/transaction-decoder/sq
import { Hex } from 'viem'
import { DatabaseLive } from './database'

const LogLevelLive = Layer.unwrapEffect(
Effect.gen(function* () {
const level = LogLevel.Warning
return Logger.minimumLogLevel(level)
}),
)

const AbiStoreLive = Layer.unwrapEffect(
Effect.gen(function* () {
const service = yield* PublicClient
Expand Down Expand Up @@ -50,7 +57,7 @@ const CacheLayer = Layer.setRequestCache(Request.makeCache({ capacity: 100, time
const DataLayer = Layer.mergeAll(RPCProviderLive, DatabaseLive)
const LoadersLayer = Layer.mergeAll(AbiStoreLive, MetaStoreLive)

const MainLayer = Layer.provideMerge(LoadersLayer, DataLayer)
const MainLayer = Layer.provideMerge(LoadersLayer, DataLayer).pipe(Layer.provide(LogLevelLive))

const runtime = ManagedRuntime.make(Layer.provide(MainLayer, CacheLayer))

Expand Down
1 change: 1 addition & 0 deletions packages/eslint-config-custom/library.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module.exports = {
rules: {
"@typescript-eslint/no-empty-interface": "off",
"@typescript-eslint/no-explicit-any": "off",
'@typescript-eslint/no-namespace': 'off',
'@typescript-eslint/strict-boolean-expressions': [
'error',
{
Expand Down
78 changes: 53 additions & 25 deletions packages/transaction-decoder/src/abi-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { Effect, Either, RequestResolver, Request, Array, pipe, Data, PrimaryKey
import { ContractABI } from './abi-strategy/request-model.js'
import { Abi } from 'viem'
import * as AbiStore from './abi-store.js'
import { AA_ABIS, SAFE_MULTISEND_ABI } from './decoding/constants.js'
import { SAFE_MULTISEND_SIGNATURE } from './decoding/constants.js'
import * as StrategyExecutorModule from './abi-strategy/strategy-executor.js'
import { SAFE_MULTISEND_SIGNATURE, SAFE_MULTISEND_ABI, AA_ABIS } from './decoding/constants.js'

interface LoadParameters {
readonly chainID: number
Expand Down Expand Up @@ -61,7 +61,7 @@ const getMany = (requests: Array<AbiStore.AbiParams>) =>
return yield* Effect.all(
requests.map(({ chainID, address, event, signature }) => get({ chainID, address, event, signature })),
{
concurrency: 'inherit',
concurrency: 'unbounded',
batching: 'inherit',
},
)
Expand Down Expand Up @@ -126,16 +126,22 @@ const getBestMatch = (abi: ContractABI | null) => {
* for each request and group them by that key. We then load the ABI for the unique
* requests and resolve the pending requests in a group with the same result.
*
* **Circuit Breaking and Resilience**
*
* The AbiLoader now includes circuit breaking and intelligent concurrency management:
* - Strategy-level circuit breakers prevent cascading failures
* - Adaptive concurrency based on success rates and chain health
* - Timeout protection for all external strategy calls
* - Progressive degradation when strategies become unhealthy
* - Request pooling with back-pressure handling
*/
const AbiLoaderRequestResolver: Effect.Effect<
RequestResolver.RequestResolver<AbiLoader, never>,
never,
AbiStore.AbiStore
> = RequestResolver.makeBatched((requests: Array<AbiLoader>) =>

export const AbiLoaderRequestResolver = RequestResolver.makeBatched((requests: Array<AbiLoader>) =>
Effect.gen(function* () {
if (requests.length === 0) return

const { strategies } = yield* AbiStore.AbiStore
const { strategies, circuitBreaker, requestPool } = yield* AbiStore.AbiStore
const strategyExecutor = StrategyExecutorModule.make(circuitBreaker, requestPool)

const requestGroups = Array.groupBy(requests, makeRequestKey)
const uniqueRequests = Object.values(requestGroups).map((group) => group[0])
Expand Down Expand Up @@ -164,34 +170,53 @@ const AbiLoaderRequestResolver: Effect.Effect<
},
{
discard: true,
concurrency: 'unbounded',
},
)

// Get optimal concurrency for each chain
const concurrencyMap = new Map<number, number>()
for (const req of remaining) {
if (!concurrencyMap.has(req.chainID)) {
const optimalConcurrency = yield* requestPool.getOptimalConcurrency(req.chainID)
concurrencyMap.set(req.chainID, optimalConcurrency)
}
}

const concurrency = Math.min(...[...concurrencyMap.values(), 50]) // Use minimum concurrency across all chains, capped at 25

yield* Effect.logDebug(`Executing ${remaining.length} remaining requests with concurrency ${concurrency}`)

// NOTE: Firstly we batch strategies by address because in a transaction most of events and traces are from the same abi
const response = yield* Effect.forEach(
remaining,
(req) => {
const allAvailableStrategies = Array.prependAll(strategies.default, strategies[req.chainID] ?? []).filter(
(strategy) => strategy.type === 'address',
)
return Effect.validateFirst(allAvailableStrategies, (strategy) => {
return strategy.resolver({

return strategyExecutor
.executeStrategiesSequentially(allAvailableStrategies, {
address: req.address,
chainId: req.chainID,
strategyId: strategy.id,
strategyId: 'address-batch',
})
}).pipe(
Effect.map(Either.left),
Effect.orElseSucceed(() => Either.right(req)),
)
.pipe(
Effect.tapError(Effect.logWarning),
Effect.orElseSucceed(() => null),
Effect.map((result) => (result ? Either.left(result) : Either.right(req))),
)
},
{
concurrency: 'unbounded',
concurrency,
},
)

const [addressStrategyResults, notFound] = Array.partitionMap(response, (res) => res)

yield* Effect.logDebug(
`Address strategies resolved ${addressStrategyResults.length} ABIs, ${notFound.length} not found`,
)
// NOTE: Secondly we request strategies to fetch fragments
const fragmentStrategyResults = yield* Effect.forEach(
notFound,
Expand All @@ -200,19 +225,21 @@ const AbiLoaderRequestResolver: Effect.Effect<
(strategy) => strategy.type === 'fragment',
)

// TODO: Distinct the errors and missing data, so we can retry on errors
return Effect.validateFirst(allAvailableStrategies, (strategy) =>
strategy.resolver({
return strategyExecutor
.executeStrategiesSequentially(allAvailableStrategies, {
address,
chainId: chainID,
event,
signature,
strategyId: strategy.id,
}),
).pipe(Effect.orElseSucceed(() => null))
strategyId: 'fragment-batch',
})
.pipe(
Effect.tapError(Effect.logWarning),
Effect.orElseSucceed(() => null),
) // If no strategies found, return null
},
{
concurrency: 'unbounded',
concurrency,
batching: true,
},
)
Expand Down Expand Up @@ -247,8 +274,9 @@ const AbiLoaderRequestResolver: Effect.Effect<
// We can decode with Effect.validateFirst(abis, (abi) => decodeMethod(input as Hex, abi)) and to find the first ABIs
// that decodes successfully. We might enforce a sorted array to prioritize the address match. We will have to think
// how to handle the strategy resolver in this case. Currently, we stop at first successful strategy, which might result
// in a missing Fragment. We treat this issue as a minor one for now, as we epect it to occur rarely on contracts that
// in a missing Fragment. We treat this issue as a minor one for now, as we expect it to occur rarely on contracts that
// are not verified and with a non standard events structure.

export const getAndCacheAbi = (params: AbiStore.AbiParams) =>
Effect.gen(function* () {
if (params.event === '0x' || params.signature === '0x') {
Expand Down
23 changes: 19 additions & 4 deletions packages/transaction-decoder/src/abi-store.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Context, Effect, RateLimiter, Function, Layer } from 'effect'
import { Context, Effect, RateLimiter, Function, Layer, MetricLabel } from 'effect'
import { ContractABI, ContractAbiResolverStrategy, GetContractABIStrategyParams } from './abi-strategy/request-model.js'

import * as CircuitBreaker from './circuit-breaker/circuit-breaker.js'
import * as RequestPool from './circuit-breaker/request-pool.js'
export interface AbiParams {
chainID: number
address: string
Expand Down Expand Up @@ -32,11 +33,16 @@ export interface AbiStore {
readonly set: (key: AbiParams, value: ContractAbiResult) => Effect.Effect<void, never>
readonly get: (arg: AbiParams) => Effect.Effect<ContractAbiResult, never>
readonly getMany?: (arg: Array<AbiParams>) => Effect.Effect<Array<ContractAbiResult>, never>
readonly circuitBreaker: CircuitBreaker.CircuitBreaker<unknown>
readonly requestPool: RequestPool.RequestPool
}

export const AbiStore = Context.GenericTag<AbiStore>('@3loop-decoder/AbiStore')

export const make = ({ strategies: strategiesWithoutRateLimit, ...rest }: AbiStore) =>
export const make = ({
strategies: strategiesWithoutRateLimit,
...rest
}: Omit<AbiStore, 'circuitBreaker' | 'requestPool'>) =>
Effect.gen(function* () {
const strategies = yield* Effect.reduce(
Object.entries(strategiesWithoutRateLimit),
Expand All @@ -60,10 +66,19 @@ export const make = ({ strategies: strategiesWithoutRateLimit, ...rest }: AbiSto
}
}),
)

const circuitBreaker = yield* CircuitBreaker.make({
metricLabels: [MetricLabel.make('service', 'abi-loader')],
})

const requestPool = yield* RequestPool.make({ metricLabels: [MetricLabel.make('service', 'abi-loader')] })

return {
strategies,
circuitBreaker,
requestPool,
...rest,
}
})

export const layer = (args: AbiStore) => Layer.scoped(AbiStore, make(args))
export const layer = (args: Omit<AbiStore, 'circuitBreaker' | 'requestPool'>) => Layer.scoped(AbiStore, make(args))
99 changes: 72 additions & 27 deletions packages/transaction-decoder/src/abi-strategy/blockscout-abi.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,68 @@
import { Effect } from 'effect'
import * as RequestModel from './request-model.js'

type FetchResult =
| { type: 'success'; data: RequestModel.ContractABI[] }
| { type: 'missing'; reason: string }
| { type: 'error'; cause: unknown }

async function fetchContractABI(
{ address, chainId }: RequestModel.GetContractABIStrategyParams,
config: { apikey?: string; endpoint: string },
): Promise<RequestModel.ContractABI[]> {
const endpoint = config.endpoint
): Promise<FetchResult> {
try {
const endpoint = config.endpoint

const params: Record<string, string> = {
module: 'contract',
action: 'getabi',
address,
}
const params: Record<string, string> = {
module: 'contract',
action: 'getabi',
address,
}

if (config?.apikey) {
params['apikey'] = config.apikey
}
if (config?.apikey) {
params['apikey'] = config.apikey
}

const searchParams = new URLSearchParams(params)
const searchParams = new URLSearchParams(params)

const response = await fetch(`${endpoint}?${searchParams.toString()}`)
const json = (await response.json()) as { status: string; result: string; message: string }
const response = await fetch(`${endpoint}?${searchParams.toString()}`)
const json = (await response.json()) as { status: string; result: string; message: string }

if (json.status === '1') {
return [
{
chainID: chainId,
address,
abi: json.result,
type: 'address',
},
]
}
if (json.status === '1') {
return {
type: 'success',
data: [
{
chainID: chainId,
address,
abi: json.result,
type: 'address',
},
],
}
}

throw new Error(`Failed to fetch ABI for ${address} on chain ${chainId}`)
// If the API request was successful but no ABI was found
if (
json.status === '0' &&
(json.message?.includes('not verified') || json.result === 'Contract source code not verified')
) {
return {
type: 'missing',
reason: `No verified ABI found: ${json.message || json.result}`,
}
}

return {
type: 'error',
cause: json,
}
} catch (error) {
return {
type: 'error',
cause: error,
}
}
}

export const BlockscoutStrategyResolver = (config: {
Expand All @@ -45,9 +74,25 @@ export const BlockscoutStrategyResolver = (config: {
type: 'address',
resolver: (req: RequestModel.GetContractABIStrategyParams) =>
Effect.withSpan(
Effect.tryPromise({
try: () => fetchContractABI(req, config),
catch: () => new RequestModel.ResolveStrategyABIError('Blockscout', req.address, req.chainId),
Effect.gen(function* () {
const result = yield* Effect.promise(() => fetchContractABI(req, config))

if (result.type === 'success') {
return result.data
} else if (result.type === 'missing') {
return yield* Effect.fail(
new RequestModel.MissingABIStrategyError(
req.address,
req.chainId,
'blockscout-strategy',
undefined,
undefined,
result.reason,
),
)
} else {
return yield* Effect.fail(new RequestModel.ResolveStrategyABIError('Blockscout', req.address, req.chainId))
}
}),
'AbiStrategy.BlockscoutStrategyResolver',
{ attributes: { chainId: req.chainId, address: req.address } },
Expand Down
Loading
Loading