In [1]:
from dotenv import load_dotenv

load_dotenv()  # take environment variables from .env.

from etherscan import Etherscan
import json
import os
from web3.contract import Contract, ContractFunction
from web3.auto import w3
from web3 import Web3, AsyncHTTPProvider, HTTPProvider
from web3.eth import AsyncEth
from web3.net import AsyncNet
from web3.geth import Geth, AsyncGethTxPool
from aiohttp import ClientSession, ClientTimeout
import aiohttp
import asyncio
import time

In [2]:
# Download the contract's ABI (a JSON string) from Etherscan, then decode it
# into the Python structure that web3 expects.
abi_json = Etherscan(
    api_key=os.environ.get('ETHERSCAN_API_KEY'),
).get_contract_abi(
    address="0x33cB657E7fd57F1f2d5f392FB78D5FA80806d1B4",
)
abi = json.loads(abi_json)

In [3]:
# Synchronous Web3 client pointed at the node URL from WEB3_PROVIDER_URI.
# Note: this rebinds `w3`, replacing the instance imported from `web3.auto`.
w3 = Web3(HTTPProvider(os.environ.get("WEB3_PROVIDER_URI")))
# Contract handle built from the ABI fetched from Etherscan for the same address.
contract = w3.eth.contract(address="0x33cB657E7fd57F1f2d5f392FB78D5FA80806d1B4", abi=abi)

In [4]:
# Block window used by every benchmark in this notebook. The loops iterate over
# range(start_block, end_block + 1), i.e. 1001 blocks with both ends included.
start_block = 14164514
end_block = start_block + 1000

# For loop

In [20]:
%%time

# Sequential baseline: one `getInventory` call per block, issued one after another.
# `results` is filled incrementally, so interrupting the cell keeps the entries
# fetched so far.
results = {}
for current_block in range(start_block, end_block + 1):
    # `block_identifier` selects the block at which the call is evaluated.
    results[current_block] = contract.functions.getInventory().call(block_identifier=current_block)

KeyboardInterrupt: 

In [21]:
# Show the block -> inventory mapping collected by the sequential loop.
results

{14164514: [0, 0], 14164515: [0, 0], 14164516: [0, 0], 14164517: [0, 0]}

# Threads

In [23]:

from queue import Queue

from threading import Thread


class Worker(Thread):
  """Daemon thread that endlessly pulls `(func, args, kwargs)` jobs from a queue.

  The thread starts itself on construction and never exits on its own; being a
  daemon, it does not keep the interpreter alive.
  """

  def __init__(self, tasks):
    """Bind the worker to `tasks` (a queue of job tuples) and start it immediately."""
    super().__init__(daemon=True)
    self.tasks = tasks
    self.start()

  def run(self):
    """Consume jobs forever, reporting (not propagating) any job failure."""
    while True:
      job, positional, keyword = self.tasks.get()
      try:
        job(*positional, **keyword)
      except Exception as err:
        # Best effort: a failing job must not kill the worker, so just report it.
        print(err)
      finally:
        # Always acknowledge the job so that `Queue.join()` can return.
        self.tasks.task_done()


class ThreadPool:
  """Fixed-size pool of `Worker` threads consuming tasks from one bounded queue.

  The queue holds at most `num_threads` pending tasks, so `add_task` / `map`
  block the caller while the queue is full.
  """

  def __init__(self, num_threads):
    """Create the task queue and start `num_threads` workers on it.

    Raises:
      ValueError: if `num_threads` is less than 1. Such a pool would have an
        unbounded queue and no workers, so `wait_completion()` would block
        forever as soon as a task was queued.
    """
    if num_threads < 1:
      raise ValueError(f"num_threads must be at least 1, got {num_threads!r}")
    self.tasks = Queue(num_threads)
    for _ in range(num_threads):
      # Each worker starts itself and keeps running as a daemon thread.
      Worker(self.tasks)

  def add_task(self, func, *args, **kargs):
    """Queue one call `func(*args, **kargs)`; blocks while the queue is full."""
    self.tasks.put((func, args, kargs))

  def map(self, func, args_list):
    """Queue `func(item)` for every item of `args_list`.

    Each item is passed as a single positional argument; return values are
    discarded, so `func` must store its own results.
    """
    for args in args_list:
      self.add_task(func, args)

  def wait_completion(self):
    """Block until every queued task has been marked done by a worker."""
    self.tasks.join()

In [24]:
%%time

results = {}

blocks = [block for block in range(start_block, end_block + 1)]
pool = ThreadPool(100)

def call(current_block: int):
    results[current_block] = contract.functions.getInventory().call(block_identifier=current_block)

pool.map(call, blocks)
pool.wait_completion()

CPU times: user 8.2 s, sys: 1.43 s, total: 9.63 s
Wall time: 21.9 s


In [180]:
# Show the mapping filled in by the worker threads; keys appear in insertion
# order, not in block order.
results

{14164611: [0, 0],
 14164573: [0, 0],
 14164565: [0, 0],
 14164579: [0, 0],
 14164610: [0, 0],
 14164544: [0, 0],
 14164523: [0, 0],
 14164515: [0, 0],
 14164521: [0, 0],
 14164591: [0, 0],
 14164614: [0, 0],
 14164527: [0, 0],
 14164517: [0, 0],
 14164549: [0, 0],
 14164554: [0, 0],
 14164546: [0, 0],
 14164534: [0, 0],
 14164559: [0, 0],
 14164577: [0, 0],
 14164522: [0, 0],
 14164516: [0, 0],
 14164569: [0, 0],
 14164615: [0, 0],
 14164560: [0, 0],
 14164580: [0, 0],
 14164607: [0, 0],
 14164606: [0, 0],
 14164585: [0, 0],
 14164540: [0, 0],
 14164532: [0, 0],
 14164568: [0, 0],
 14164535: [0, 0],
 14164576: [0, 0],
 14164558: [0, 0],
 14164630: [0, 0],
 14164526: [0, 0],
 14164612: [0, 0],
 14164574: [0, 0],
 14164595: [0, 0],
 14164587: [0, 0],
 14164589: [0, 0],
 14164557: [0, 0],
 14164605: [0, 0],
 14164525: [0, 0],
 14164571: [0, 0],
 14164639: [0, 0],
 14164602: [0, 0],
 14164599: [0, 0],
 14164545: [0, 0],
 14164633: [0, 0],
 14164562: [0, 0],
 14164541: [0, 0],
 14164537: [

# Async

In [5]:
# Web3 client wired with the async provider and the async `eth`, `net` and
# `geth.txpool` modules; HTTP requests use a 60 s timeout.
# NOTE(review): an empty middleware list is passed explicitly - presumably
# because the default middlewares are not async-compatible; confirm.
async_w3 = Web3(
    AsyncHTTPProvider(
        os.environ.get("WEB3_PROVIDER_URI"),
        request_kwargs={"timeout": 60},
    ),
    modules={
        "eth": (AsyncEth,),
        "net": (AsyncNet,),
        "geth": (Geth, {"txpool": (AsyncGethTxPool,)}),
    },
    middlewares=[],
)

In [6]:
async def gather_with_concurrency(n, *tasks):
    """Await all `tasks`, letting at most `n` of them run at the same time.

    Args:
        n: maximum number of awaitables allowed past the semaphore at once.
        *tasks: awaitables (e.g. coroutine objects) to run.

    Returns:
        The list of results, in the same order as `tasks`.
    """
    limiter = asyncio.Semaphore(n)

    async def _bounded(awaitable):
        # Hold one semaphore slot for the whole lifetime of the awaitable.
        await limiter.acquire()
        try:
            return await awaitable
        finally:
            limiter.release()

    wrapped = [_bounded(awaitable) for awaitable in tasks]
    return await asyncio.gather(*wrapped)

In [7]:
# aiohttp connector and session; DNS lookups are cached for 300 seconds.
# NOTE(review): `limit=None` presumably lifts the cap on simultaneous
# connections - confirm against the installed aiohttp version.
# NOTE(review): `session` is not passed to `async_w3` or used by any cell
# visible in this section - confirm it is consumed (and closed) later.
conn = aiohttp.TCPConnector(limit=None, ttl_dns_cache=300)
session = aiohttp.ClientSession(connector=conn)

In [8]:
# Build the call payload for `getInventory` once, using the synchronous contract
# object, so that it can be reused for every block.
tx = contract.functions.getInventory().buildTransaction()
# NOTE(review): `chainId` is stripped before the dict is handed to `eth.call` -
# presumably because the node rejects that field in a call request; confirm.
del tx['chainId']

async def call_async(block):
    """Run the prepared `getInventory` call against the state at `block`.

    Args:
        block: block identifier at which the call is evaluated.

    Returns:
        Whatever `async_w3.eth.call` returns for the prepared `tx` (the raw,
        undecoded call result). It used to be awaited and then dropped, so every
        caller received `None`.
    """
    return await async_w3.eth.call(tx, block_identifier=block)

In [9]:
# Single call against the first block, to check the async path before fanning out.
await call_async(start_block)

In [10]:
blocks = [block for block in range(start_block, end_block + 1)]
concurrency = 300
s = time.perf_counter()
await gather_with_concurrency(concurrency, *[call_async(block) for block in blocks])
elapsed = time.perf_counter() - s
print(elapsed)

32.683786664


In [19]:
# Extrapolation in hours: (seconds per block) * (blocks from start_block up to
# block 14746827) / 3600.
# NOTE(review): 8.461942031000035 looks like an elapsed time pasted from an
# earlier run (it does not match the timing printed above), and the divisor
# 1000 does not match the 1001 blocks actually queried - confirm both.
8.461942031000035 / 1000 * (14746827 - start_block) / 3600

1.3687496805271453

# Batched