In [62]:
from web3 import Web3, HTTPProvider

# Get your node JSON-RPC URL
json_rpc_url = "https://mainnet.infura.io/v3/eb83c1f3e720455a8ebd9a940ba18ced"
web3 = Web3(HTTPProvider(json_rpc_url))

In [63]:
import os
import pandas as pd

# Define the folder and file names with corresponding columns
data_structure = {
    'uniswap-v3-poolcreated.csv': [
        "block_number", "timestamp", "tx_hash", "log_index", "factory_contract_address", 
        "pool_contract_address", "fee", "token0_address", "token0_symbol", "token1_address", 
        "token1_symbol"
    ],
    'uniswap-v3-swap.csv': [
        "block_number", "timestamp", "tx_hash", "log_index", "pool_contract_address", 
        "amount0", "amount1", "sqrt_price_x96", "liquidity", "tick"
    ],
    'uniswap-v3-mint.csv': [
        "block_number", "timestamp", "tx_hash", "log_index", "pool_contract_address", 
        "tick_lower", "tick_upper", "amount", "amount0", "amount1"
    ],
    'uniswap-v3-burn.csv': [
        "block_number", "timestamp", "tx_hash", "log_index", "pool_contract_address", 
        "tick_lower", "tick_upper", "amount", "amount0", "amount1"
    ]
}

folder_name = 'data'

# Create the 'data' folder if it does not exist
if not os.path.exists(folder_name):
    os.makedirs(folder_name)

# Generate and save CSV files with specific columns
for file_name, columns in data_structure.items():
    # Define the file path
    file_path = os.path.join(folder_name, file_name)
    
    # Check if the file already exists
    if not os.path.exists(file_path):
        # If not, create an empty DataFrame with specific columns
        df = pd.DataFrame(columns=columns)
        
        # Save the DataFrame as a CSV file
        df.to_csv(file_path, index=False)
        print(f'{file_name} created successfully!')
    else:
        print(f'{file_name} already exists.')

print('Operation completed!')

uniswap-v3-poolcreated.csv created successfully!
uniswap-v3-swap.csv created successfully!
uniswap-v3-mint.csv created successfully!
uniswap-v3-burn.csv created successfully!
Operation completed!


Fetch all the on-chain transactions of Uniswap v3

In [64]:
from eth_defi.uniswap_v3.constants import UNISWAP_V3_FACTORY_CREATED_AT_BLOCK
from eth_defi.uniswap_v3.events import fetch_events_to_csv
from eth_defi.event_reader.json_state import JSONFileScanState

start_block = UNISWAP_V3_FACTORY_CREATED_AT_BLOCK
end_block = UNISWAP_V3_FACTORY_CREATED_AT_BLOCK + 50000

# Stores the last block number of event data we store
state = JSONFileScanState("/tmp/uniswap-v3-price-scan-12.json")
 
fetch_events_to_csv(json_rpc_url, state, output_folder=folder_name, end_block=end_block)

eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6


No previous scan done, starting fresh from block 12,369,621, total 50,000 blocks
Scanning block range 12,369,621 - 12,419,621


  0%|          | 0/50000 [00:00<?, ?it/s]

eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io -> mainnet.infura.io
Retrying in 0.100000 seconds, retry #0 / 6
eth_chainId(())
Switching providers mainnet.infura.io ->

Wrote 855 PoolCreated events
Wrote 230065 Swap events
Wrote 14077 Mint events
Wrote 4619 Burn events


Generate pseudo addresses for demonstration purpose

In [65]:
import os
import uuid

def generate_ethereum_address():
    return "0x" + uuid.uuid4().hex[:40]

def add_ethereum_address(folder_name, file_names):
    unique_addresses = set()
    for file_name in file_names:
        file_path = os.path.join(folder_name, file_name)
        if os.path.exists(file_path):
            df = pd.read_csv(file_path)
            df['ethereum_address'] = [generate_ethereum_address() for _ in range(len(df))]
            unique_addresses.update(df['ethereum_address'])
            df.to_csv(file_path, index=False)
            print(f'Ethereum addresses added to {file_name}')
        else:
            print(f'{file_name} does not exist')

    # Save the unique addresses to a CSV file
    unique_addresses_df = pd.DataFrame(list(unique_addresses), columns=['ethereum_address'])
    unique_addresses_df.to_csv(os.path.join(folder_name, 'uniswap-v3-user.csv'), index=False)
    print('Unique Ethereum addresses saved to unique_ethereum_addresses.csv')

file_names = list(data_structure.keys())

add_ethereum_address(folder_name, file_names)

Ethereum addresses added to uniswap-v3-poolcreated.csv
Ethereum addresses added to uniswap-v3-swap.csv
Ethereum addresses added to uniswap-v3-mint.csv
Ethereum addresses added to uniswap-v3-burn.csv
Unique Ethereum addresses saved to unique_ethereum_addresses.csv


Fetch information of tokens on Uniswap v3

In [66]:
import aiohttp
import asyncio
import pandas as pd
import nest_asyncio
from tqdm.notebook import tqdm

# Apply the patch to enable running asyncio code in Jupyter notebook
nest_asyncio.apply()

ALCHEMY_API_KEY = 'Po5QDQwc-ejbU22kv3lM4bG8slMP52DW'
BASE_URL = f'https://eth-mainnet.alchemyapi.io/v2/{ALCHEMY_API_KEY}'

async def fetch(session, url, json):
    async with session.post(url, json=json) as response:
        if response.status != 200:
            print(f"Failed to fetch data: {await response.text()}")
            return None
        return await response.json()

async def get_token_details(session, token_address):
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "alchemy_getTokenMetadata",
        "params": [token_address]
    }
    response = await fetch(session, BASE_URL, payload)
    if response and 'result' in response:
        token_data = response['result']
        token_data['token_address'] = token_address  # Include token address in the result
        return token_data
    print(f"Failed to get token details for {token_address}")
    return None

async def fetch_all_token_details(token_addresses):
    async with aiohttp.ClientSession() as session:
        tasks = [
            asyncio.ensure_future(get_token_details(session, token_address))
            for token_address in tqdm(token_addresses, desc="Fetching token details")
        ]
        return await asyncio.gather(*tasks)

df = pd.read_csv('data/uniswap-v3-poolcreated.csv')

# Get unique token addresses
token_addresses = set(df['token0_address'].tolist() + df['token1_address'].tolist())

print(f"Total unique token addresses: {len(token_addresses)}")
tokens_data = await fetch_all_token_details(token_addresses)

# Filter out None responses
tokens_data = [token for token in tokens_data if token]
print(f"Number of tokens fetched: {len(tokens_data)}")

# Convert to DataFrame, ensuring 'address' is the first column
column_order = ['token_address'] + [col for col in tokens_data[0] if col != 'token_address']
tokens_df = pd.DataFrame(tokens_data)[column_order]

tokens_df.to_csv(os.path.join(folder_name, 'uniswap-v3-token.csv'), index=False)

print("Data fetched and saved successfully!")


Total unique token addresses: 14


Fetching token details:   0%|          | 0/14 [00:00<?, ?it/s]

Number of tokens fetched: 14
Data fetched and saved successfully!


Generate liquidity pool entity from the four transaction logs table.

In [67]:
import os
import pandas as pd

# Paths to the folders
pool_created_file = os.path.join(folder_name, 'uniswap-v3-poolcreated.csv')
swap_file = os.path.join(folder_name, 'uniswap-v3-swap.csv')
output_file = os.path.join(folder_name, 'uniswap-v3-liquiditypool.csv')

# Read the CSV files
pool_created_df = pd.read_csv(pool_created_file)
swap_df = pd.read_csv(swap_file)

# Convert addresses to lower case for case-insensitive comparison
pool_created_df['pool_contract_address'] = pool_created_df['pool_contract_address'].str.lower()
swap_df['pool_contract_address'] = swap_df['pool_contract_address'].str.lower()

# Convert the block number to integer for proper comparison
swap_df['block_number'] = swap_df['block_number'].astype(int)

# Drop duplicates from the pool_created_df based on the 'pool_contract_address' column
pool_created_df = pool_created_df.drop_duplicates(subset='pool_contract_address')

# Initialize the DataFrame to store the liquidity pool information
liquidity_pool_data = {
    'liquidity_pool_address': [],
    'token0_address': [],
    'token1_address': [],
    'sqrtPriceX96': [],
    'liquidity': []
}

# Iterate through the liquidity pools
for index, row in pool_created_df.iterrows():
    pool_address = row['pool_contract_address']
    token0_address = row['token0_address']
    token1_address = row['token1_address']
    
    # Find the latest swap interaction for the current liquidity pool
    latest_swap = swap_df[swap_df['pool_contract_address'] == pool_address].sort_values(by=['block_number', 'log_index'], ascending=False).head(1)
    
    # Check if there is any swap interaction
    if not latest_swap.empty:
        sqrtPriceX96 = latest_swap.iloc[0]['sqrt_price_x96']
        liquidity = latest_swap.iloc[0]['liquidity']
    else:
        sqrtPriceX96 = 0
        liquidity = 0
    
    # Add the data to the dictionary
    liquidity_pool_data['liquidity_pool_address'].append(pool_address)
    liquidity_pool_data['token0_address'].append(token0_address)
    liquidity_pool_data['token1_address'].append(token1_address)
    liquidity_pool_data['sqrtPriceX96'].append(sqrtPriceX96)
    liquidity_pool_data['liquidity'].append(liquidity)

# Create a DataFrame from the dictionary
liquidity_pool_df = pd.DataFrame(liquidity_pool_data)

# Save the DataFrame to a CSV file
liquidity_pool_df.to_csv(output_file, index=False)

print('Liquidity pool information saved to', output_file)


Liquidity pool information saved to data/liquidity_pool.csv


Modify all the duplicate data that will be primary key in database schema

In [75]:
import pandas as pd
import random

def modify_duplicate(hash_value):
    """
    Modify the given hash by replacing one of its characters.
    """
    print("modifying the tx_hash: " + hash_value)
    # Convert the hash to a list
    hash_list = list(hash_value)
    
    # Randomly select an index to modify (avoiding the '0x' prefix)
    index = random.randint(2, len(hash_list) - 1)
    
    # Randomly select a hexadecimal character
    hex_chars = '0123456789abcdef'
    new_char = random.choice(hex_chars)
    
    # Ensure the new character is different
    while new_char == hash_list[index]:
        new_char = random.choice(hex_chars)
    
    # Replace the character at the selected index
    hash_list[index] = new_char
    
    # Convert the list back to a string and return
    return ''.join(hash_list)

def remove_duplicates(df):
    """
    Keep finding and modifying duplicate hash values until there are no more duplicates.
    """
    while True:
        # Find duplicated tx_hash values
        duplicated = df['tx_hash'].duplicated(keep=False)
        
        # If no duplicates, break out of the loop
        if not duplicated.any():
            break
            
        # Modify the duplicated tx_hash values
        df.loc[duplicated, 'tx_hash'] = df.loc[duplicated, 'tx_hash'].apply(modify_duplicate)
        
    return df


In [76]:
# Find duplicated tx_hash values and modify them
swap_df = pd.read_csv(swap_file)
swap_df = remove_duplicates(swap_df)
# Save the modified DataFrame to a new file
swap_df.to_csv(swap_file, index=False)

In [77]:
# Find duplicated tx_hash values and modify them
create_df = pd.read_csv(pool_created_file)
create_df = remove_duplicates(create_df)
# Save the modified DataFrame to a new file
create_df.to_csv(pool_created_file, index=False)

In [79]:
mint_file = os.path.join(folder_name, 'uniswap-v3-mint.csv')
mint_df = pd.read_csv(mint_file)
mint_df = remove_duplicates(mint_df)
# Save the modified DataFrame to a new file
mint_df.to_csv(mint_file, index=False)

In [80]:
burn_file = os.path.join(folder_name, 'uniswap-v3-burn.csv')
burn_df = pd.read_csv(burn_file)
burn_df = remove_duplicates(burn_df)
burn_df.to_csv(burn_file, index=False)

modifying the tx_hash: 0xfd9da3a9aa36fc28e5f5e70c8503dae321bf88e8143faea072b9cdf9a6783c67
modifying the tx_hash: 0xfd9da3a9aa36fc28e5f5e70c8503dae321bf88e8143faea072b9cdf9a6783c67
modifying the tx_hash: 0xf3719d3519a8a446cca050e1850f68623fc98ff085f240bc4a1f8a136e666be5
modifying the tx_hash: 0xf3719d3519a8a446cca050e1850f68623fc98ff085f240bc4a1f8a136e666be5
modifying the tx_hash: 0xbcf5afb528e6dadbb5e701278d2f7916472e0fa2b8227fa5ae0a2390b97ddaf3
modifying the tx_hash: 0xbcf5afb528e6dadbb5e701278d2f7916472e0fa2b8227fa5ae0a2390b97ddaf3
