In [1]:
from queryportal.subgraphinterface import SubgraphInterface as sgi
from datetime import datetime

# polars dataframe manipulations
import polars as pl
pl.Config.set_fmt_str_lengths(200)

polars.config.Config

In [2]:
endpoints = [
    'https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-ethereum',
    'https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-arbitrum',
    # ComputeError: could not append {:?} to the builder; make sure that all rows have the same schema or consider increasing `schema_inference_length`
    # 'https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-polygon', # breaking, reason - https://github.com/pola-rs/polars/issues/4489#issuecomment-1220349624
    # 'https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-optimism', # breaking, reason - https://github.com/pola-rs/polars/issues/4489#issuecomment-1220349624
]

# instantiate univ3 class with subgraph key
univ3 = sgi(endpoints=endpoints)

In [3]:
query_paths = [
    'timestamp',
    'hash',
    'pool_name', 
    'pool_id',
    'account_id',
    # 'pool_fees',
    'gasLimit',
    'tokenIn_symbol', 
    'tokenOut_symbol', 
    'amountOutUSD', 
    'amountInUSD',
    'amountOut',
    'amountIn',
    ]

filter_dict={
    'timestamp_gte': int((datetime(2023, 4, 21).timestamp())),
    'timestamp_lte': int(datetime(2023, 4, 28).timestamp()),
    'tokenIn_symbol_in': ['WETH', 'USDC'],
    'tokenOut_symbol_in': ['WETH', 'USDC'],
    # 'pool_fees_feePercentage': 0.05 # PaginationError: [{'message': 'Store error: query execution failed: Child filters can not be nested'}]
}

query_size = 100000000

In [4]:
df_data = []

In [5]:
for subgraph_name in list(univ3.subject.subgraphs.keys())[1:]:
    print('subgraph_name: ', subgraph_name)
    df = univ3.query_entity(
        query_size=query_size,
        entity='swaps',
        name=subgraph_name,
        query_paths=query_paths,
        filter_dict=filter_dict,
        )
    # add subgraph endpoint
    df = df.with_columns(pl.lit(subgraph_name).alias('endpoint'))
    # save dataframe to parquet file.
    df.write_parquet(f'{subgraph_name}')
    df_data.append(df)

subgraph_name:  uniswap-v3-arbitrum
Subgraph endpoint: uniswap-v3-arbitrum with type <class 'subgrounds.subgraph.subgraph.Subgraph'>
Shape: (211662, 12)
Schema: {'timestamp': Int64, 'hash': Utf8, 'gasLimit': Int64, 'amountOutUSD': Float64, 'amountInUSD': Float64, 'amountOut': Float64, 'amountIn': Float64, 'pool_name': Utf8, 'pool_id': Utf8, 'account_id': Utf8, 'tokenIn_symbol': Utf8, 'tokenOut_symbol': Utf8}
Function 'query_entity' executed in 1937.7882s


In [6]:
# CHECKPOINT - load arbitrum and ethereum parquet files
df_arbitrum = pl.read_parquet('uniswap-v3-arbitrum')
df_ethereum = pl.read_parquet('uniswap-v3-ethereum')

# concat them into weth_usdc_df
weth_usdc_df = pl.concat([df_arbitrum, df_ethereum])

In [7]:
weth_usdc_df.shape

(247732, 13)

In [8]:
weth_usdc_df['endpoint'].unique()

endpoint
str
"""uniswap-v3-ethereum"""
"""uniswap-v3-arbitrum"""


In [9]:
# convert timestamp to datetime
weth_usdc_df = weth_usdc_df.with_columns(
       pl.from_epoch("timestamp")
   )

In [10]:
# All the pools in the data
weth_usdc_df[['pool_name', 'endpoint']].unique()

# use this to check tokens
# weth_usdc_df[['pool_name', 'tokenIn_symbol', 'tokenOut_symbol', 'endpoint']].unique()

pool_name,endpoint
str,str
"""Uniswap V3 WETH/USDC 0.01%""","""uniswap-v3-arbitrum"""
"""Uniswap V3 WETH/USDC 0.3%""","""uniswap-v3-arbitrum"""
"""Uniswap V3 WETH/USDC 1%""","""uniswap-v3-arbitrum"""
"""Uniswap V3 USD Coin/Wrapped Ether 0.3%""","""uniswap-v3-ethereum"""
"""Uniswap V3 WETH/Wrapped Ether 1%""","""uniswap-v3-arbitrum"""
"""Uniswap V3 WETH/USDC 0.05%""","""uniswap-v3-arbitrum"""
"""Uniswap V3 USD Coin/Wrapped Ether 0.05%""","""uniswap-v3-ethereum"""
"""Uniswap V3 USD Coin/Wrapped Ether 0.01%""","""uniswap-v3-ethereum"""
"""Uniswap V3 USD Coin/Wrapped Ether 1%""","""uniswap-v3-ethereum"""
"""Uniswap V3 USD Coin (PoS) (Wormhole)/USD Coin 0.01%""","""uniswap-v3-ethereum"""


In [11]:
# # drop columns when tokenIn_symbol = 'WETH' and tokenOut_symbol = 'WETH'
# filtered_sample = weth_usdc_df.filter((pl.col("tokenIn_symbol") == 'WETH') & (pl.col("tokenOut_symbol") == 'WETH'))
# filtered_sample.filter(pl.col('pool_name') == "Uniswap V3 Wrapped Ether (Wormhole)/Wrapped Ether 0.05%")

In [12]:
# change timestamp into daily with this code pl.from_epoch(weth_usdc_df["timestamp"], time_unit="d")
weth_usdc_df = weth_usdc_df.with_columns(pl.from_epoch(weth_usdc_df["timestamp"], time_unit="d").alias('timestamp'))

In [13]:
# save dataframe to parquet file.
weth_usdc_df.write_parquet('weth_usdc_df')