Skip to content

Commit

Permalink
moving indexing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
elicbarbieri committed Apr 16, 2024
1 parent e8c2493 commit 2808dc1
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 248 deletions.
159 changes: 2 additions & 157 deletions starknet_abi/dispatch.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
from bisect import bisect_right
from dataclasses import dataclass
from typing import Any, Sequence

from starknet_abi.abi_types import (
STARKNET_ACCOUNT_CALL,
STARKNET_V0_CALL,
AbiParameter,
StarknetArray,
StarknetType,
)
from starknet_abi.core import StarknetAbi
from starknet_abi.decode import decode_from_params, decode_from_types
from starknet_abi.decoding_types import DecodedEvent, DecodedFunction, DecodedOperation
from starknet_abi.exceptions import DispatcherDecodeError, InvalidCalldataError
from starknet_abi.decoding_types import DecodedEvent, DecodedFunction
from starknet_abi.exceptions import InvalidCalldataError

# fmt: off

Expand Down Expand Up @@ -86,52 +84,6 @@ class ClassDispatcher:
class_hash: bytes


def _parse_tx_calldata(
calldata: list[int],
tx_version: int,
) -> list[tuple[bytes, bytes, list[int]]]:
"""
Parses the calldata for a multicall operation, returning a list of tuples containing the contract address,
function selector, and function calldata for each call in the multicall operation.
:param calldata: list of integers representing the calldata
:param tx_version: version of the call data
:return: [(contract_address, selector, calldata), ...]
"""
match tx_version:
case 0:
decoded_calls = decode_from_types(
[StarknetArray(STARKNET_V0_CALL)], calldata
)
calldata.pop(0) # Discard call count

return [
(
bytes.fromhex(call["to"][2:]),
bytes.fromhex(call["selector"][2:]),
calldata[
call["data_offset"] : call["data_offset"] + call["data_len"]
],
)
for call in decoded_calls[0]
]

case 1 | 2 | 3:
decoded_calls = decode_from_types(
[StarknetArray(STARKNET_ACCOUNT_CALL)], calldata
)
return [
(
bytes.fromhex(call["to"][2:]),
bytes.fromhex(call["selector"][2:]),
[int(c, 16) for c in call["calldata"]],
)
for call in decoded_calls[0]
]
case _:
raise NotImplementedError(f"Unsupported Transaction Version: {tx_version}")


@dataclass(slots=True)
class DecodingDispatcher:
"""
Expand Down Expand Up @@ -162,11 +114,6 @@ class DecodingDispatcher:
],
]

contract_mapping: dict[
bytes, # Last 8 bytes of Contract Address
dict[int, bytes], # Mapping of Declaration Blocks to Class Hashes
]

def __init__(self):
self.class_ids = {}
self.event_types = {}
Expand Down Expand Up @@ -243,24 +190,6 @@ def add_abi(self, abi: StarknetAbi):
)
self.class_ids.update({class_id: class_dispatcher})

def add_contract(
self, contract_address: bytes, class_hash: bytes, declaration_block: int
):
"""
Adds a contract address and an implementation class to the contract mapping.
"""
if class_hash[-8:] not in self.class_ids:
raise DispatcherDecodeError(
f"Class 0x{class_hash.hex()} not present in dispatcher. Cannot add implementation for "
f"contract 0x{contract_address.hex()}"
)

contract_id = contract_address[-8:]
if contract_id not in self.contract_mapping:
self.contract_mapping.update({contract_id: {}})

self.contract_mapping[contract_id].update({declaration_block: class_hash})

def decode_function( # pylint: disable=too-many-locals
self,
calldata: list[int],
Expand Down Expand Up @@ -368,87 +297,3 @@ def decode_event(
name=event_dispatcher.event_name,
data=decoded_data,
)

def decode_multicall(
self,
calldata: list[int],
block: int,
tx_version: int = 3,
) -> list[DecodedOperation]:
"""
Decodes a multicall operation from transaction calldata using the internal mapping of contract_addresses to
implemented class hashes. If the class hash is not present in the dispatcher, the operation is skipped.
:param calldata: list of integers representing the calldata
:param block: block number to decode the transaction at
:param tx_version: transaction version. Earlier txns used a different calldata format
:return: [DecodedOperation, ...]
"""

_calldata = calldata.copy()

decoded_operations = []
for (
contract_address,
function_selector,
function_calldata,
) in _parse_tx_calldata(_calldata, tx_version):
decoded_op = self._decode_tx_call(
contract_address=contract_address,
function_selector=function_selector,
function_calldata=function_calldata,
block=block,
)
decoded_operations.append(decoded_op)

return decoded_operations

def _decode_tx_call(
self,
contract_address: bytes,
function_selector: bytes,
function_calldata: list[int],
block: int,
) -> DecodedOperation:
contract_implementations = self.contract_mapping.get(contract_address[-8:])
if contract_implementations is None:
return DecodedOperation(
operation_name="Unknown",
operation_params={"raw_calldata": function_calldata},
)

block_history = sorted(list(contract_implementations.keys()))
if block < block_history[0]:
raise DispatcherDecodeError(
f"Contract 0x{contract_address.hex()} has no implementation history before block {block_history[0]}."
f" Cannot decode transaction at block {block}"
)

if len(contract_implementations) == 1:
contract_class = contract_implementations[block_history[0]]
else:
impl_block = block_history[bisect_right(block_history, block) - 1]
contract_class = contract_implementations[impl_block]

if contract_class[-8:] not in self.class_ids:
return DecodedOperation(
operation_name="Unknown",
operation_params={"raw_calldata": function_calldata},
)

class_dispatcher = self.class_ids[contract_class[-8:]]
try:
function_decoder = class_dispatcher.function_ids[function_selector[-8:]]
function_types, _ = self.function_types[function_decoder.decoder_reference]
decoded_inputs = decode_from_params(function_types, function_calldata)
except Exception:
raise DispatcherDecodeError( # pylint: disable=raise-missing-from
f"Contract 0x{contract_address.hex()} mapped to Class 0x{class_dispatcher.class_hash.hex()} "
f"at block {block} -- Could Note Decode Function 0x{function_selector.hex()} with Calldata "
f"{function_calldata}"
)

return DecodedOperation(
operation_name=function_decoder.function_name,
operation_params=decoded_inputs,
)
91 changes: 0 additions & 91 deletions tests/test_dispatcher/test_multicall_decoding.py

This file was deleted.

0 comments on commit 2808dc1

Please sign in to comment.