In [None]:
#| default_exp web3

# Web3
> A Web3 wrapper that fails over across multiple RPC endpoints

## Imports -

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import asyncio
from typing import Union, Optional, Literal
from datetime import datetime
from functools import update_wrapper
import inspect

from fastcore.all import custom_dir, test_fail, listify, test_eq, test
from web3 import Web3, AsyncWeb3
from web3.method import Method
from web3.middleware import async_geth_poa_middleware

from web3_data_tools.core import ainterpolation_search, interpolation_search , batched

In [None]:
#| export
# First proof-of-stake block (presumably Ethereum mainnet's first post-Merge block); used as a
# lower bound for timestamp searches in the examples below.
FIRST_POS_BLOCK = 15_537_394

In [None]:
x = 'a'
# Scratch check: a plain string is neither a dict nor a list.
isinstance(x, (dict, list))

False

## MultiRPCWeb3 -

In [None]:
#| export
class MultiRPCWeb3:
    """Web3 object that tries to execute a method in multiple RPCs until one succeeds.

    Attribute access is delegated to the wrapped RPC objects (see `__getattr__`): callables
    are wrapped so that an exception on one RPC retries the call on the next one, and
    sub-objects stored in an RPC's instance `__dict__` (such as `eth`) are wrapped in a nested
    `MultiRPCWeb3`, so their methods get the same failover.
    """

    def __init__(self
                 , *rpcs # list of Web3 objects
                 , providers=None # list of provider objects
                 , poa=False # whether the network is a POA network
                 ):
        """Initialize the MultiRPCWeb3 object."""
        self.rpcs = rpcs
        self.providers = providers
        if not self.providers:
            self.providers = [rpc.provider for rpc in self.rpcs if hasattr(rpc, 'provider')]

        # hardcode genesis block timestamp (RPCs don't have it right)
        # NOTE(review): presumably Ethereum mainnet's genesis time — wrong for other chains.
        self.cache = {0: 1438269973}

        # add get_block_receipts method. Some RPCs have it some don't.
        for rpc in self.rpcs:
            if isinstance(rpc, (Web3, AsyncWeb3)):
                rpc.eth.attach_methods({"get_block_receipts": Method("eth_getBlockReceipts")})

        if poa:
            self._inject_poa_middleware()

    def _inject_poa_middleware(self):
        """Inject the geth POA middleware into every wrapped Web3 / AsyncWeb3 instance."""
        # Only needed for sync RPCs when `poa=True`, so imported lazily here.
        from web3.middleware import geth_poa_middleware
        # Inject into each RPC directly (not via `__getattr__`, whose failover stops at the
        # first RPC that succeeds), picking the middleware matching the RPC's sync/async type.
        for rpc in self.rpcs:
            if isinstance(rpc, AsyncWeb3):
                rpc.middleware_onion.inject(async_geth_poa_middleware, layer=0)
            elif isinstance(rpc, Web3):
                rpc.middleware_onion.inject(geth_poa_middleware, layer=0)

    @classmethod
    def from_rpcs(obj
                  , *rpcs # list of RPC URIs
                  , poa=False # whether the network is a POA network
                  ):
        """Create a MultiRPCWeb3 object from a list of RPC URIs."""
        return obj(*[Web3(Web3.HTTPProvider(rpc)) for rpc in rpcs], poa=poa)

    @classmethod
    def async_from_rpcs(obj
                        , *rpcs # list of RPC URIs
                        , poa=False # whether the network is a POA network
                        ):
        """Create an async MultiRPCWeb3 object from a list of RPC URIs."""
        return obj(*[AsyncWeb3(AsyncWeb3.AsyncHTTPProvider(rpc)) for rpc in rpcs], poa=poa)

    def __dir__(self):
        """Expose the first RPC's attributes to `dir()` / tab completion."""
        return custom_dir(self, add=self.rpcs[0].__dir__() if self.rpcs else [])

    def __getattr__(self, attr):
        """Delegate `attr` to the wrapped RPCs (only called when normal lookup fails).

        Resolution uses the first RPC that exposes `attr`:
        - coroutine functions -> async wrapper retrying the call on each RPC in order,
        - other callables -> sync wrapper retrying the call on each RPC in order,
        - attributes stored in the RPC's instance `__dict__` (e.g. `eth`) -> a nested
          `MultiRPCWeb3` over that attribute of every RPC, sharing `self.providers`,
        - any other value (e.g. a property result) -> returned as-is, without failover.

        Raises `AttributeError` if no RPC exposes `attr`.
        """
        # Reaching here for our own state means `__init__` has not run (e.g. `copy`/`pickle`
        # probing `__setstate__` on a fresh instance). Reading `self.rpcs` would re-enter
        # `__getattr__` forever, so fail fast instead.
        if attr in ('rpcs', 'providers', 'cache'):
            raise AttributeError(attr)
        for rpc in self.rpcs:
            try:
                rpc_attr = rpc.__getattribute__(attr)
            except AttributeError:
                continue  # this RPC lacks the attribute; try the next one
            if inspect.iscoroutinefunction(rpc_attr):
                return update_wrapper(self._async_failover(attr), rpc_attr)
            if callable(rpc_attr):
                return update_wrapper(self._failover(attr), rpc_attr)
            if attr in getattr(rpc, '__dict__', {}):
                return MultiRPCWeb3(*[r.__getattribute__(attr) for r in self.rpcs], providers=self.providers)
            return rpc_attr
        raise AttributeError(f"Attribute '{attr}' not found in any of the RPCs")

    def _provider_label(self, i):
        """Label for RPC `i` in log messages: its provider when known, else the RPC itself."""
        return self.providers[i] if i < len(self.providers) else self.rpcs[i]

    def _log_failure(self, i, attr, e):
        """Print why RPC `i` failed on `attr` and whether another RPC will be tried."""
        print(f'{self._provider_label(i)} failed with: {e}')
        if i < len(self.rpcs) - 1:
            print(f'Trying {self._provider_label(i + 1)}')
        else:
            print(f"All the RPCs failed to execute '{attr}'")

    def _failover(self, attr):
        """Build a sync function calling `attr` on each RPC in order until one succeeds."""
        def wrapper(*args, **kwargs):
            last = len(self.rpcs) - 1
            for i, rpc in enumerate(self.rpcs):
                try:
                    return rpc.__getattribute__(attr)(*args, **kwargs)
                except Exception as e:
                    self._log_failure(i, attr, e)
                    if i == last:
                        raise
            raise RuntimeError(f"No RPCs available to execute '{attr}'")
        return wrapper

    def _async_failover(self, attr):
        """Build an async function awaiting `attr` on each RPC in order until one succeeds."""
        async def awrapper(*args, **kwargs):
            last = len(self.rpcs) - 1
            for i, rpc in enumerate(self.rpcs):
                try:
                    return await rpc.__getattribute__(attr)(*args, **kwargs)
                except Exception as e:
                    self._log_failure(i, attr, e)
                    if i == last:
                        raise
            raise RuntimeError(f"No RPCs available to execute '{attr}'")
        return awrapper

    def _rank_providers(self, rpcs, providers, block_numbers, tolerance):
        """Reorder `self.rpcs`/`self.providers` by descending block number; return endpoint URIs.

        `block_numbers[i]` is the head reported by `rpcs[i]`, or -1 when the query failed.
        """
        if not block_numbers:
            return []
        head = max(block_numbers)
        # Working RPCs within `tolerance` blocks of the best head count as fully synced; the
        # stable sort then keeps them in their original order. Failed ones (-1) are never
        # promoted, however large `tolerance` is.
        floor = max(0, head - tolerance)
        ranks = [head if n >= floor else n for n in block_numbers]
        order = sorted(range(len(ranks)), key=ranks.__getitem__, reverse=True)
        self.rpcs = [rpcs[i] for i in order]
        self.providers = [providers[i] for i in order]
        return [p.endpoint_uri for p in self.providers]

    def sort_providers(self
                       , tolerance: int=0 # number of blocks to consider the same
                       ):
        """Sort provider by block number, so that the one with the highest block number is first."""
        rpcs, providers = list(self.rpcs), list(self.providers)
        block_numbers = []
        for rpc, provider in zip(rpcs, providers):
            # Query each RPC directly (no failover) so a failure is attributed to this provider.
            try:
                block_numbers.append(rpc.eth.get_block_number())
            except Exception as e:
                print(f'{provider} failed with: {e}')
                block_numbers.append(-1)
        return self._rank_providers(rpcs, providers, block_numbers, tolerance)

    async def asort_providers(self
                              , tolerance: int=0 # number of blocks to consider the same
                              ):
        """Sort provider by block number, so that the one with the highest block number is first."""
        # Snapshot the lists so a concurrent reorder during the awaits cannot shift indices.
        rpcs, providers = list(self.rpcs), list(self.providers)
        block_numbers = []
        for rpc, provider in zip(rpcs, providers):
            try:
                block_number = rpc.eth.get_block_number()
                # AsyncWeb3 returns an awaitable; a sync Web3 returns the number directly.
                if inspect.isawaitable(block_number):
                    block_number = await block_number
                block_numbers.append(block_number)
            except Exception as e:
                print(f'{provider} failed with: {e}')
                block_numbers.append(-1)
        return self._rank_providers(rpcs, providers, block_numbers, tolerance)

    def _cache_timestamp(self, block_id, block):
        """Store and return `block`'s timestamp.

        Integer requests are cached under the requested number. Tags ('latest', 'finalized', ...)
        and other ids are cached under the block's own number, so a moving tag never serves a
        stale timestamp and cache keys stay integers.
        """
        timestamp = block['timestamp']
        key = block_id if isinstance(block_id, int) else block['number']
        if key is not None:  # e.g. a pending block may have no number yet
            self.cache[key] = timestamp
        return timestamp

    def __getitem__(self
                    , block_number: int # block number to get the timestamp of
                    ):
        """The user can get the timestamp of a block by using the syntax `web3[block_number]`.

        `-1` means the latest block. Timestamps are cached per block number.
        """
        if block_number in self.cache:
            return self.cache[block_number]
        block_id = 'latest' if block_number == -1 else block_number
        return self._cache_timestamp(block_id, self.eth.get_block(block_id))

    async def agetitem(self
                       , block_number: int # block number to get the timestamp of
                       ):
        """Async counterpart of `web3[block_number]`: `await web3.agetitem(block_number)` returns the block's timestamp.

        `-1` means the latest block. Timestamps are cached per block number.
        """
        if block_number in self.cache:
            return self.cache[block_number]
        block_id = 'latest' if block_number == -1 else block_number
        return self._cache_timestamp(block_id, await self.eth.get_block(block_id))

    def __len__(self):
        """Returns the latest block number reported by the RPCs.

        Block numbers start at 0, so this is one less than the count of blocks.
        Only works with sync RPCs (`len()` needs an int); use `alen` with AsyncWeb3.
        """
        return self.eth.get_block_number()

    async def alen(self):
        """Async counterpart of `len(web3)`: returns the latest block number."""
        return await self.eth.get_block_number()

    def _search_args(self, timestamp, high, how):
        """Normalise search inputs to (unix timestamp, upper bound, interpolation side)."""
        if isinstance(timestamp, datetime):
            timestamp = timestamp.timestamp()
        side = {'after': 'right', 'before': 'left'}[how]
        if high is None:
            # Narrow the search with the smallest cached block already past `timestamp`.
            later = [k for k, ts in self.cache.items() if ts > timestamp]
            high = min(later, default=None)
        return timestamp, high, side

    def find_block_at_timestamp(self
                                , timestamp: Union[datetime, int] # timestamp to search for
                                , low: Optional[int]=None # block number to start the search from
                                , high: Optional[int]=None # block number to end the search at
                                , how: Literal['after', 'before'] ='after' # whether to search for the block after or before the timestamp
                                ):
        """Finds a block at a specific timestamp.

        A `datetime` is converted with `.timestamp()`; naive datetimes are taken as local time.
        When `high` is omitted, cached block timestamps are used to bound the search.
        """
        timestamp, high, how = self._search_args(timestamp, high, how)
        return interpolation_search(self, timestamp, low=low, high=high, how=how)

    async def afind_block_at_timestamp(self
                                       , timestamp: Union[datetime, int] # timestamp to search for
                                       , low: Optional[int]=None # block number to start the search from
                                       , high: Optional[int]=None # block number to end the search at
                                       , how: Literal['after', 'before'] ='after' # whether to search for the block after or before the timestamp
                                       ):
        """Async counterpart of `find_block_at_timestamp`."""
        timestamp, high, how = self._search_args(timestamp, high, how)
        return await ainterpolation_search(self, timestamp, low=low, high=high, how=how)

    async def abatch_method(self
                          , method # method to execute
                          , input_list # list of arguments to pass to the method
                          , method_args=() # extra arguments to pass to the method
                          , method_kwargs={} # extra keyword arguments to pass to the method
                          , batch_size=72 # batch size to use
                          ):
        """Execute a method asynchronously in batches.

        `method` names an `eth` method (e.g. 'get_block'). Each item of `input_list` is turned
        into leading positional arguments with `listify`, followed by `method_args` and
        `method_kwargs`. Calls within one batch run concurrently via `asyncio.gather`; results
        are returned in input order.
        """
        # Resolve the failover wrapper once instead of once per input item.
        call = self.eth.__getattr__(method)
        data = []
        for batch in batched(input_list, batch_size):
            tasks = [call(*listify(o), *method_args, **method_kwargs) for o in batch]
            data.extend(await asyncio.gather(*tasks))
        return data


In [None]:
# Render the nbdev API documentation for the class.
show_doc(MultiRPCWeb3)

---

[source](https://github.com/flashbots/web3-data-tools/blob/main/web3_data_tools/web3.py#L26){target="_blank" style="float:right; font-size:smaller"}

### MultiRPCWeb3

>      MultiRPCWeb3 (*rpcs, providers=None, poa=False)

*Web3 object that tries to execute a method in multiple RPCs until one succeeds.*

In [None]:
# Render the nbdev API documentation for `sort_providers`.
show_doc(MultiRPCWeb3.sort_providers)

---

[source](https://github.com/flashbots/web3-data-tools/blob/main/web3_data_tools/web3.py#L98){target="_blank" style="float:right; font-size:smaller"}

### MultiRPCWeb3.sort_providers

>      MultiRPCWeb3.sort_providers (tolerance:int=0)

*Sort provider by block number, so that the one with the highest block number is first.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| tolerance | int | 0 | number of blocks to consider the same |

In [None]:
# Render the nbdev API documentation for block-timestamp indexing (`web3[block_number]`).
show_doc(MultiRPCWeb3.__getitem__)

---

[source](https://github.com/flashbots/web3-data-tools/blob/main/web3_data_tools/web3.py#L147){target="_blank" style="float:right; font-size:smaller"}

### MultiRPCWeb3.__getitem__

>      MultiRPCWeb3.__getitem__ (block_number:int)

*The user can get the timestamp of a block by using the syntax `web3[block_number]`.*

|    | **Type** | **Details** |
| -- | -------- | ----------- |
| block_number | int | block number to get the timestamp of |

In [None]:
# Render the nbdev API documentation for `find_block_at_timestamp`.
show_doc(MultiRPCWeb3.find_block_at_timestamp)

---

[source](https://github.com/flashbots/web3-data-tools/blob/main/web3_data_tools/web3.py#L177){target="_blank" style="float:right; font-size:smaller"}

### MultiRPCWeb3.find_block_at_timestamp

>      MultiRPCWeb3.find_block_at_timestamp
>                                            (timestamp:Union[datetime.datetime,
>                                            int], low:Optional[int]=None,
>                                            high:Optional[int]=None, how:Litera
>                                            l['after','before']='after')

*Finds a block at a specific timestamp.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| timestamp | Union |  | timestamp to search for |
| low | Optional | None | block number to start the search from |
| high | Optional | None | block number to end the search at |
| how | Literal | after | whether to search for the block after or before the timestamp |

In [None]:
# Render the nbdev API documentation for `abatch_method`.
show_doc(MultiRPCWeb3.abatch_method)

---

### MultiRPCWeb3.abatch_method

>      MultiRPCWeb3.abatch_method (method, input_list, method_args=(),
>                                  method_kwargs={}, batch_size=72)

*Execute a method asynchronously in batches.*

|    | **Type** | **Default** | **Details** |
| -- | -------- | ----------- | ----------- |
| method |  |  | method to execute |
| input_list |  |  | list of arguments to pass to the method |
| method_args | tuple | () | extra arguments to pass to the method |
| method_kwargs | dict | {} | extra keyword arguments to pass to the method |
| batch_size | int | 72 | batch size to use |

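## Examples

The examples below expect an Ethereum mainnet node serving JSON-RPC at `http://localhost:8545`; `http://bad_provider:1234` is deliberately unreachable so the failover and provider sorting can be exercised.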

In [None]:
from datetime import datetime, timezone, timedelta

In [None]:
urls = ['http://bad_provider:1234', 'http://localhost:8545']
w3 = MultiRPCWeb3.from_rpcs(*urls)
test_eq(urls, [p.endpoint_uri for p in w3.providers])
# Sorting must move the reachable local node ahead of the unreachable one.
w3.sort_providers(tolerance=0)
test_eq(list(reversed(urls)), [p.endpoint_uri for p in w3.providers])

In [None]:
w3 = MultiRPCWeb3.from_rpcs('http://localhost:8545')
# A live node can produce a block between two RPC calls, so read `len` first and allow the
# head to advance by at most one block instead of requiring exact equality (flaky otherwise).
head_before = len(w3)
head_after = w3.eth.get_block_number()
assert head_before <= head_after <= head_before + 1, (head_before, head_after)
test_eq(w3.find_block_at_timestamp(datetime(2022, 10, 1, tzinfo=timezone.utc).timestamp(), low=FIRST_POS_BLOCK, how='after'), 15649595)
# A timestamp in the future has no matching block, so the search must fail.
test_fail(w3.find_block_at_timestamp, args=(datetime.today() + timedelta(days=1),), kwargs=dict(low=FIRST_POS_BLOCK, how='after'))

## Async Interface

In [None]:
aw3 = MultiRPCWeb3.async_from_rpcs('http://bad_provider:1234', 'http://localhost:8545')
test_eq(['http://bad_provider:1234', 'http://localhost:8545'], [p.endpoint_uri for p in aw3.providers])
await aw3.asort_providers(tolerance=0)
test_eq(['http://localhost:8545', 'http://bad_provider:1234'], [p.endpoint_uri for p in aw3.providers])

RPC connection http://bad_provider:1234 failed with: Cannot connect to host bad_provider:1234 ssl:default [nodename nor servname provided, or not known]
All the RPCs failed to execute 'get_block_number'
RPC connection http://bad_provider:1234 failed with: Cannot connect to host bad_provider:1234 ssl:default [nodename nor servname provided, or not known]


In [None]:
aw3 = MultiRPCWeb3.async_from_rpcs('http://localhost:8545')
test_eq(await aw3.alen(), await aw3.eth.get_block_number())
test_eq(await aw3.agetitem(19989282), 1717152047)
test_eq(await aw3.afind_block_at_timestamp(datetime(2022, 10, 1, tzinfo=timezone.utc).timestamp(), low=FIRST_POS_BLOCK, how='after'), 15649595)

### Get Many Blocks Fast with `abatch_method`

In [None]:
import web3

aw3 = MultiRPCWeb3.async_from_rpcs('http://localhost:8545')
block_numbers = range(FIRST_POS_BLOCK, FIRST_POS_BLOCK + 72)
blocks = await aw3.abatch_method('get_block', block_numbers, method_args=(True,), method_kwargs={}, batch_size=72)
test_eq(len(blocks), 72)
test_eq(all([isinstance(b, web3.datastructures.AttributeDict) for b in blocks]), True)

## Export -

In [None]:
#| hide
import nbdev
nbdev.nbdev_export()  # write the `#| export` cells to the library module