* https://web3-ethereum-defi.readthedocs.io/tutorials/uniswap-v3-liquidity-analysis.html

In [9]:
import os
from web3 import Web3, HTTPProvider
from eth_defi.event_reader.filter import Filter
from eth_defi.provider.multi_provider import create_multi_provider_web3

# Get your node JSON-RPC URL from the environment rather than the notebook:
# provider URLs embed an API key, so they are credentials and must not be committed.
# Set it before starting Jupyter, for example:
#   export JSON_RPC_ETHEREUM="https://mainnet.infura.io/v3/<your-project-id>"
json_rpc_url = os.environ.get("JSON_RPC_ETHEREUM")
if not json_rpc_url:
    raise RuntimeError("Set the JSON_RPC_ETHEREUM environment variable to your Ethereum node JSON-RPC URL")
web3 = create_multi_provider_web3(json_rpc_url)

In [2]:
from eth_defi.uniswap_v3.constants import UNISWAP_V3_FACTORY_CREATED_AT_BLOCK
from eth_defi.uniswap_v3.events import fetch_events_to_csv
from eth_defi.uniswap_v3.events import get_event_mapping
from eth_defi.event_reader.json_state import JSONFileScanState
from eth_defi.event_reader.lazy_timestamp_reader import TrackedLazyTimestampReader
from eth_defi.event_reader.multithread import MultithreadEventReader
from eth_defi.event_reader.logresult import LogContext
from pathlib import Path
import csv
from tqdm.auto import tqdm
import datetime
import time

# Scan window: a 5,000 block range beginning at the block where the
# Uniswap v3 factory was created
start_block = UNISWAP_V3_FACTORY_CREATED_AT_BLOCK
end_block = start_block + 5_000

# JSON file that remembers the last block number whose event data was stored
state = JSONFileScanState("/tmp/uniswap-v3-liquidity-scan.json")

In [4]:
# Connection used for reading the events
web3 = create_multi_provider_web3(json_rpc_url)

# Event name -> per-event details (contract event, CSV field names, decoder)
event_mapping = get_event_mapping(web3)
contract_events = [
    details["contract_event"]
    for details in event_mapping.values()
]

# address=None: do not restrict to a single contract, so events from
# any Uniswap v3 pool match, for all the event types we are interested in
filter = Filter.create_filter(
    address=None,
    event_types=contract_events,
)

# Pick up the scan position from the state file.
# NOTE(review): presumably falls back to start_block when nothing was saved - confirm
restored, restored_start_block = state.restore_state(start_block)
original_block_range = end_block - start_block

In [7]:
# Per-event bookkeeping (row buffer, counter, open CSV handle), keyed by event name
buffers = {}
output_folder = "/tmp"
log_info = print
max_blocks_once = 2000
max_threads = 10

for event_name, mapping in event_mapping.items():
    # Each event type gets its own CSV
    file_path = f"{output_folder}/uniswap-v3-{event_name.lower()}.csv"
    csv_path = Path(file_path)

    # Only keep what is on disk when we are resuming a scan AND the file
    # actually holds data. A fresh scan starts over from start_block, so
    # appending to an old file would duplicate rows and put a second
    # header line in the middle of the CSV.
    exists_already = csv_path.exists()
    resume_existing = restored and exists_already and csv_path.stat().st_size > 0

    # newline="" leaves line endings to the csv module, as its documentation requires
    file_handler = open(file_path, "a" if resume_existing else "w", encoding="utf-8", newline="")
    csv_writer = csv.DictWriter(file_handler, fieldnames=mapping["field_names"])
    if not resume_existing:
        headers = ", ".join(mapping["field_names"])
        log_info(f"Creating a new CSV file: {file_path}, with headers: {headers}")
        csv_writer.writeheader()

    # For each event, we have its own
    # counters and handlers in the context dictionary
    buffers[event_name] = {
        "buffer": [],
        "total": 0,
        "file_handler": file_handler,
        "csv_writer": csv_writer,
        "file_path": file_path,
    }

log_info(f"Saving Uniswap v3 data for block range {restored_start_block:,} - {end_block:,}")
if restored_start_block > end_block:
    # A state file left by an earlier run with a larger range produces this
    log_info("Warning: the restored start block is past the end block - the saved scan state is ahead of the requested range")

# Shared by the progress callback (header count) and the event reader (timestamp extraction)
timestamp_reader = TrackedLazyTimestampReader()

Saving Uniswap v3 data for block range 12,375,494 - 12,374,621


In [None]:
# NOTE(review): this cell repeats the event mapping / filter / state setup
# that an earlier cell in this notebook already performs.

# Event name -> per-event details (contract event, CSV field names, decoder)
event_mapping = get_event_mapping(web3)
contract_events = [
    details["contract_event"]
    for details in event_mapping.values()
]

# address=None: do not restrict to a single contract, so events from
# any Uniswap v3 pool match, for all the event types we are interested in
filter = Filter.create_filter(
    address=None,
    event_types=contract_events,
)

# Pick up the scan position from the state file.
# NOTE(review): presumably falls back to start_block when nothing was saved - confirm
restored, restored_start_block = state.restore_state(start_block)
original_block_range = end_block - start_block

In [None]:


# (Re)open one CSV per event type and register it in `buffers`.
for event_name, mapping in event_mapping.items():
    # Each event type gets its own CSV
    file_path = f"{output_folder}/uniswap-v3-{event_name.lower()}.csv"
    csv_path = Path(file_path)

    # `buffers` may already hold an open handle for this event (this loop
    # also exists in an earlier cell, and cells get re-run). Close it so its
    # pending output is flushed and the handle is not leaked.
    previous = buffers.get(event_name)
    if previous is not None:
        previous["file_handler"].close()

    # Only keep what is on disk when we are resuming a scan AND the file
    # actually holds data. A fresh scan starts over from start_block, so
    # appending to an old file would duplicate rows and put a second
    # header line in the middle of the CSV.
    exists_already = csv_path.exists()
    resume_existing = restored and exists_already and csv_path.stat().st_size > 0

    # newline="" leaves line endings to the csv module, as its documentation requires
    file_handler = open(file_path, "a" if resume_existing else "w", encoding="utf-8", newline="")
    csv_writer = csv.DictWriter(file_handler, fieldnames=mapping["field_names"])
    if not resume_existing:
        headers = ", ".join(mapping["field_names"])
        log_info(f"Creating a new CSV file: {file_path}, with headers: {headers}")
        csv_writer.writeheader()

    # For each event, we have its own
    # counters and handlers in the context dictionary
    buffers[event_name] = {
        "buffer": [],
        "total": 0,
        "file_handler": file_handler,
        "csv_writer": csv_writer,
        "file_path": file_path,
    }

log_info(f"Saving Uniswap v3 data for block range {restored_start_block:,} - {end_block:,}")
if restored_start_block > end_block:
    # A state file left by an earlier run with a larger range produces this
    log_info("Warning: the restored start block is past the end block - the saved scan state is ahead of the requested range")

In [None]:
# Counts the block headers fetched; shown in the progress bar description
timestamp_reader = TrackedLazyTimestampReader()

# Wrap everything in a TQDM progress bar, notebook friendly version.
# This cell only defines the callback; the bar is closed again as soon as
# the `with` block ends.
with tqdm(total=end_block - start_block) as progress_bar:

    def update_progress(
        current_block,
        start_block,
        end_block,
        chunk_size: int,
        total_events: int,
        last_timestamp: int,
        context: LogContext,
    ):
        """Refresh the progress bar description and advance it.

        :param current_block: Block number shown in the description
        :param start_block: Not used by this callback
        :param end_block: Not used by this callback
        :param chunk_size: How far the progress bar is advanced
        :param total_events: Event count shown in the description
        :param last_timestamp: UNIX timestamp rendered as a UTC date; left out of the description when falsy
        :param context: Not used by this callback
        """
        header_count = timestamp_reader.get_count()

        if last_timestamp:
            # Display progress with the date information.
            # Timezone-aware conversion: utcfromtimestamp() is deprecated since Python 3.12.
            d = datetime.datetime.fromtimestamp(last_timestamp, tz=datetime.timezone.utc)
            formatted_time = d.strftime("%Y-%m-%d")
            progress_bar.set_description(f"Block: {current_block:,}, events: {total_events:,}, time:{formatted_time}, block headers: {header_count:,}")
        else:
            progress_bar.set_description(f"Block: {current_block:,}, events: {total_events:,}, block headers: {header_count:,}")

        progress_bar.update(chunk_size)

In [None]:
# Counts the block headers fetched; also supplies the timestamp extractor for the reader
timestamp_reader = TrackedLazyTimestampReader()


def _write_buffered_events():
    """Write every buffered row to its event's CSV file, then empty the buffers."""
    for buffer_data in buffers.values():
        pending = buffer_data["buffer"]

        # write events to csv
        for entry in pending:
            buffer_data["csv_writer"].writerow(entry)
            buffer_data["total"] += 1

        # then reset buffer - once per event type, after all its rows are written
        buffer_data["buffer"] = []

        # Hand the rows to the OS now, so the scan state saved afterwards
        # never points past rows still sitting in Python's file buffer
        buffer_data["file_handler"].flush()


# Wrap everything in a TQDM progress bar, notebook friendly version
with tqdm(total=end_block - start_block) as progress_bar:

    def update_progress(
        current_block,
        start_block,
        end_block,
        chunk_size: int,
        total_events: int,
        last_timestamp: int,
        context: LogContext,
    ):
        """Reader notify callback: refresh the progress bar and persist progress.

        Writes all buffered events to the CSV files and then saves
        ``current_block`` to the scan state.

        :param current_block: Block number shown in the description and saved to the scan state
        :param start_block: Not used by this callback
        :param end_block: Not used by this callback
        :param chunk_size: How far the progress bar is advanced
        :param total_events: Event count shown in the description
        :param last_timestamp: UNIX timestamp rendered as a UTC date; left out of the description when falsy
        :param context: Not used by this callback
        """
        header_count = timestamp_reader.get_count()

        if last_timestamp:
            # Display progress with the date information.
            # Timezone-aware conversion: utcfromtimestamp() is deprecated since Python 3.12.
            d = datetime.datetime.fromtimestamp(last_timestamp, tz=datetime.timezone.utc)
            formatted_time = d.strftime("%Y-%m-%d")
            progress_bar.set_description(f"Block: {current_block:,}, events: {total_events:,}, time:{formatted_time}, block headers: {header_count:,}")
        else:
            progress_bar.set_description(f"Block: {current_block:,}, events: {total_events:,}, block headers: {header_count:,}")

        progress_bar.update(chunk_size)

        # Update event specific contexts
        _write_buffered_events()

        # Sync the state of updated events
        state.save_state(current_block)

    # Create a multi-threaded Solidity event reader
    # that has a callback to our progress bar notifier
    reader = MultithreadEventReader(
        json_rpc_url,
        notify=update_progress,
        max_blocks_once=max_blocks_once,
        max_threads=max_threads,
    )
    # NOTE(review): the reader may need an explicit shutdown once the scan is
    # done - check the MultithreadEventReader documentation.

    for log_result in reader(
        web3,
        restored_start_block,
        end_block,
        filter=filter,
        extract_timestamps=timestamp_reader.extract_timestamps_json_rpc_lazy,
    ):
        try:
            event_name = log_result["event"].event_name  # Which event this is: Swap, Burn,...
            buffer = buffers[event_name]["buffer"]  # Choose CSV buffer for this event
            decode_function = event_mapping[event_name]["decode_function"]  # Each event needs its own decoder
            buffer.append(decode_function(web3, log_result))
        except Exception as e:
            raise RuntimeError(f"Could not decode {log_result}") from e

    # Rows are otherwise only written from inside the notify callback, so
    # anything appended after its last call would never reach the CSV files.
    # The scan state is left as the callback last saved it.
    _write_buffered_events()


In [None]:
# Release the CSV file handles and report how many rows each event type got
for event_name in buffers:
    buffer = buffers[event_name]
    buffer["file_handler"].close()
    log_info(f"Wrote {buffer['total']} {event_name} events to {buffer['file_path']}")

In [None]:
import pandas as pd

In [None]:
# Load the Swap events CSV produced by the scan into a DataFrame
df = pd.read_csv("/tmp/uniswap-v3-swap.csv")

In [None]:
# Show the loaded swap events as the cell output
df

In [None]:
from eth_defi.uniswap_v3.liquidity import create_tick_delta_csv

# Feed the Mint and Burn event CSVs written by the scan into eth_defi's tick delta helper.
# NOTE(review): the name suggests the result refers to a generated CSV of
# per-tick liquidity changes - confirm against the eth_defi documentation.
tick_delta_csv = create_tick_delta_csv(
    "/tmp/uniswap-v3-mint.csv",
    "/tmp/uniswap-v3-burn.csv",
)