In [12]:
from subgrounds import Subgrounds

import polars as pl

In [13]:
# Snapshot time (unix seconds): 1677891498 = 2023-03-04 00:58:18 UTC, i.e. the
# evening of March 3rd in US time. Used as the `timestamp_lt` upper bound on the
# trades and Chainlink price queries.
timestamp = 1_677_891_498

# Row count requested via `first=` (the swaps and price queries use 5x this)
query_size = 25_000

# Minimum USD size per trade/swap leg, applied in the `*_gt` where-filters
filter_usd = 1_000

# Token contract addresses (lowercase hex) the CoW trades are restricted to
token_addr_list = [
    "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",  # WETH
    "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",  # USDC
]

In [14]:
# Subgrounds client; used below to load subgraphs and to run query_df
sg = Subgrounds()

In [15]:
# Load the CoW Protocol subgraph from The Graph endpoint below
cow_sg = sg.load_subgraph('https://api.thegraph.com/subgraphs/name/cowprotocol/cow')

In [16]:
# Names of every object type in the CoW subgraph schema.
# Relies on subgrounds' private `_schema` attribute.
schema_list = [
    type_name
    for type_name, schema_type in cow_sg._schema.type_map.items()
    if schema_type.is_object
]

In [28]:
# Display the object type names collected above
schema_list

['Bundle',
 'DailyTotal',
 'HourlyTotal',
 'Order',
 'Pair',
 'PairDaily',
 'PairHourly',
 'Query',
 'Settlement',
 'Subscription',
 'Token',
 'TokenDailyTotal',
 'TokenHourlyTotal',
 'TokenTradingEvent',
 'Total',
 'Trade',
 'UniswapPool',
 'UniswapToken',
 'User',
 '_Block_',
 '_Meta_']

In [25]:
# Select the 'Trade' entity name from the schema's object types.
# This is an exact match rather than a substring test, so another type that
# merely contains "Trade" in its name can never be picked by accident.
# Raises IndexError if the schema has no 'Trade' type.
trade_schema = [s for s in schema_list if s == "Trade"][0]

In [26]:
# Display the selected entity name
trade_schema

'Trade'

In [30]:
# TypeRef is subgrounds' namespace for GraphQL schema type references;
# TypeRef.graphql renders a field's type as GraphQL text (e.g. 'BigInt!').
from subgrounds.schema import TypeRef

# Every (field name, GraphQL type) pair declared on the Trade object
[(trade_field.name, TypeRef.graphql(trade_field.type_)) for trade_field in cow_sg.Trade._object.fields]

[('id', 'ID!'),
 ('timestamp', 'Int!'),
 ('gasPrice', 'BigInt!'),
 ('feeAmount', 'BigInt!'),
 ('txHash', 'Bytes!'),
 ('settlement', 'Settlement!'),
 ('buyAmount', 'BigInt!'),
 ('sellAmount', 'BigInt!'),
 ('sellToken', 'Token!'),
 ('buyToken', 'Token!'),
 ('order', 'Order!'),
 ('buyAmountEth', 'BigDecimal'),
 ('sellAmountEth', 'BigDecimal'),
 ('buyAmountUsd', 'BigDecimal'),
 ('sellAmountUsd', 'BigDecimal')]

In [92]:
# Field path for the top-level `trades` query.
# NOTE(review): not referenced by the cells shown here; the next cell calls
# cow_sg.Query.trades directly.
cow_trades_qp = cow_sg.Query.trades

In [93]:
# Newest-first trades before `timestamp`. Both legs must exceed `filter_usd` USD,
# and both the sold and the bought token must be in `token_addr_list`.
trades_qp = cow_sg.Query.trades(
    first=query_size,
    orderBy=cow_sg.Query.trades.timestamp,
    orderDirection="desc",
    where={
        "timestamp_lt": timestamp,
        "buyAmountUsd_gt": filter_usd,
        "sellAmountUsd_gt": filter_usd,
        "sellToken_in": token_addr_list,
        "buyToken_in": token_addr_list,
    },
)

In [94]:
# Run the trades query; columns come back prefixed with 'trades_'
trades_df = sg.query_df(trades_qp)

In [95]:
# Inspect column dtypes (the raw buy/sell amounts come back as object)
trades_df.dtypes

trades_id                object
trades_timestamp          int64
trades_gasPrice           int64
trades_feeAmount          int64
trades_txHash            object
trades_settlement_id     object
trades_buyAmount         object
trades_sellAmount        object
trades_sellToken_id      object
trades_buyToken_id       object
trades_order_id          object
trades_buyAmountEth     float64
trades_sellAmountEth    float64
trades_buyAmountUsd     float64
trades_sellAmountUsd    float64
dtype: object

In [96]:
# Cast gas price, fee, and the raw buy/sell token amounts to float64.
# The raw amounts arrive as `object` because some exceed the int64 range
# (e.g. sell amounts around 2.5e20), so all four use float64.
for column in ('trades_gasPrice', 'trades_feeAmount', 'trades_buyAmount', 'trades_sellAmount'):
    trades_df[column] = trades_df[column].astype('float64')

In [97]:
# Convert the pandas trades frame to a polars DataFrame for the joins below
trades_pl = pl.from_pandas(trades_df)

In [98]:
# Report how many trades the query returned
print(f'query returned {trades_pl.height} rows')

query returned 21848 rows


### Cowswap Trades-Settlement Merge

In [99]:
# Unique settlement ids referenced by the trades. These are the settlement tx
# hashes: they are matched against settlements' txHash below.
# Sorted so the list order, and therefore which ids fall into each query chunk,
# is reproducible between runs (`unique()` alone does not guarantee an order).
trades_settlement_id_list = trades_pl['trades_settlement_id'].unique().sort().to_list()

In [100]:
# Number of unique settlement ids to fetch
query_index = len(trades_settlement_id_list)

In [101]:
# Chunk start offsets, one every 999 ids. Chunks must stay under 1000 ids;
# otherwise the CoW subgraph endpoint responds 413 Request Entity Too Large.
# This was not observed with the Univ3 subgraph queried further down, so it
# appears to be subgraph specific.
query_index_list = list(range(0, query_index, 999))

In [102]:
# Close the final chunk by appending the total id count as the last end offset
query_index_list += [query_index]

In [103]:
# Collects one pandas DataFrame per settlements query chunk
data = []

In [104]:
# Field path for the top-level settlements query; called with per-chunk filters below
settlements_fp = cow_sg.Query.settlements

In [105]:
# Fetch settlements chunk by chunk; each pair of consecutive offsets in
# query_index_list bounds one chunk of settlement ids.
for start, stop in zip(query_index_list[:-1], query_index_list[1:]):
    remaining = len(trades_settlement_id_list) - stop
    print(f'querying {start} to {stop}. Remaining: {remaining}, {remaining / len(trades_settlement_id_list) * 100:.2f}%')

    # the settlement ids belonging to this chunk
    partition = trades_settlement_id_list[start:stop]

    # settlements whose txHash is one of this chunk's ids
    qp = settlements_fp(first=query_size, where={"txHash_in": partition})

    df = sg.query_df(qp)
    data.append(df)

querying 0 to 999. Remaining: 19629, 95.16%
querying 999 to 1998. Remaining: 18630, 90.31%
querying 1998 to 2997. Remaining: 17631, 85.47%
querying 2997 to 3996. Remaining: 16632, 80.63%
querying 3996 to 4995. Remaining: 15633, 75.79%
querying 4995 to 5994. Remaining: 14634, 70.94%
querying 5994 to 6993. Remaining: 13635, 66.10%
querying 6993 to 7992. Remaining: 12636, 61.26%
querying 7992 to 8991. Remaining: 11637, 56.41%
querying 8991 to 9990. Remaining: 10638, 51.57%
querying 9990 to 10989. Remaining: 9639, 46.73%
querying 10989 to 11988. Remaining: 8640, 41.88%
querying 11988 to 12987. Remaining: 7641, 37.04%
querying 12987 to 13986. Remaining: 6642, 32.20%
querying 13986 to 14985. Remaining: 5643, 27.36%
querying 14985 to 15984. Remaining: 4644, 22.51%
querying 15984 to 16983. Remaining: 3645, 17.67%
querying 16983 to 17982. Remaining: 2646, 12.83%
querying 17982 to 18981. Remaining: 1647, 7.98%
querying 18981 to 19980. Remaining: 648, 3.14%
querying 19980 to 20628. Remaining: 0, 

In [106]:
# Convert each per-chunk pandas settlements frame to polars
settlement_data_store_pl = list(map(pl.from_pandas, data))

In [107]:
# Stack the per-chunk polars frames vertically into a single settlements frame
settlements_pl = pl.concat(settlement_data_store_pl, how='vertical')

In [108]:
# Attach each trade's settlement row. A trade's settlement id is the settlement's
# tx hash. The join is inner, so trades without a fetched settlement are dropped.
cow_complete_pl = trades_pl.join(
    settlements_pl,
    how='inner',
    left_on='trades_settlement_id',
    right_on='settlements_txHash',
)

In [109]:
# Preview the first 10 trades ordered by settlement id. The sorted frame is only
# displayed, not assigned, so trades_pl keeps its original order.
trades_pl.sort('trades_settlement_id').head(10)

trades_id,trades_timestamp,trades_gasPrice,trades_feeAmount,trades_txHash,trades_settlement_id,trades_buyAmount,trades_sellAmount,trades_sellToken_id,trades_buyToken_id,trades_order_id,trades_buyAmountEth,trades_sellAmountEth,trades_buyAmountUsd,trades_sellAmountUsd
str,i64,f64,f64,str,str,f64,f64,str,str,str,f64,f64,f64,f64
"""0x48164abf321d...",1677197603,27773000000.0,0.0,"""0x00019b1104ef...","""0x00019b1104ef...",9.0638e+19,150000000000.0,"""0xa0b86991c621...","""0xc02aaa39b223...","""0x48164abf321d...",90.637563,90.914197,149543.579352,150000.0
"""0xcc686467ca76...",1649511944,31075000000.0,2777800000000000.0,"""0x00030d1548bc...","""0x00030d1548bc...",3569300000.0,1.11e+18,"""0xc02aaa39b223...","""0xa0b86991c621...","""0xcc686467ca76...",1.106663,1.11,3569.341541,3580.103516
"""0x862f39a61405...",1655912145,40015000000.0,5.674e+16,"""0x0005c3b48d9a...","""0x0005c3b48d9a...",278130000000.0,2.5321e+20,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x862f39a61405...",252.892022,253.207818,278133.767374,278481.083791
"""0xbeaec83a1e16...",1639800029,79102000000.0,1.0381e+16,"""0x000802bb157c...","""0x000802bb157c...",3448900000.0,9e+17,"""0xc02aaa39b223...","""0xa0b86991c621...","""0xbeaec83a1e16...",0.888747,0.9,3448.908738,3492.577356
"""0x2158084ae52e...",1677509399,23575000000.0,3091200000000000.0,"""0x0009d818aa53...","""0x0009d818aa53...",121540000000.0,7.3452e+19,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x2158084ae52e...",73.292774,73.45208,121536.232845,121800.398218
"""0x749a8f91bb97...",1676973371,27164000000.0,4397900000000000.0,"""0x000aecb19298...","""0x000aecb19298...",4036600000.0,2.4e+18,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x749a8f91bb97...",2.400228,2.4,4036.592754,4036.209219
"""0x06c6c8738c24...",1642578568,82893000000.0,5.5522e+16,"""0x00116fc14802...","""0x00116fc14802...",731680000000.0,2.4e+20,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x06c6c8738c24...",239.222251,240.0,731683.061112,734061.876897
"""0x09bffb961643...",1655311984,85205000000.0,2.1293e+16,"""0x0012c800956e...","""0x0012c800956e...",4422300000.0,4.1084e+18,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x09bffb961643...",4.09702,4.108368,4422.281461,4434.529818
"""0x081ad8ae645c...",1660187275,64022000000.0,23267987.0,"""0x0016ff2d2ceb...","""0x0016ff2d2ceb...",1e+18,1900800000.0,"""0xa0b86991c621...","""0xc02aaa39b223...","""0x081ad8ae645c...",1.0,1.012483,1877.329372,1900.763729
"""0x2cd19b08639f...",1659387701,15719000000.0,8630600000000000.0,"""0x001723f49308...","""0x001723f49308...",16264000000.0,1e+19,"""0xc02aaa39b223...","""0xa0b86991c621...","""0x2cd19b08639f...",10.015345,10.0,16263.982468,16239.063126


In [110]:
# Print the shapes of the trades, settlements and joined frames
for frame_name, frame in (('trades_pl', trades_pl), ('settlements_pl', settlements_pl), ('cow_complete_pl', cow_complete_pl)):
    print(f'{frame_name} shape: {frame.shape}')

trades_pl shape: (21848, 15)
settlements_pl shape: (20628, 4)
cow_complete_pl shape: (21848, 18)


### Univ3 Swaps

In [111]:
# Load the Messari Uniswap v3 (Ethereum) subgraph
univ3_sg = sg.load_subgraph('https://api.thegraph.com/subgraphs/name/messari/uniswap-v3-ethereum')

In [112]:
# Field path for the top-level swaps query on the Uniswap v3 subgraph
uni_swaps_qp = univ3_sg.Query.swaps

In [113]:
# Pool ids of the two Uniswap v3 USDC/WETH pools queried below
weth_usdc_list = [
    "0x88e6a0c2ddd26feeb64f039a2c41296fcb3f5640",  # usdc/weth .05%
    "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8",  # usdc/weth .3%
]

# All CoW trade timestamps (one per trade, duplicates included)
timestamps_list = cow_complete_pl['trades_timestamp'].to_list()

# Unique timestamps, sorted so the chunking below (and therefore each swaps
# query) is reproducible between runs; set() iteration order is arbitrary.
cow_timestamps = sorted(set(timestamps_list))

In [114]:
# Number of unique CoW trade timestamps to look up swaps for
swaps_query_index = len(cow_timestamps)

In [115]:
# Chunk boundaries: a start offset every 999 timestamps, closed by the total count
cow_timestamp_query_list = [*range(0, len(cow_timestamps), 999), swaps_query_index]

In [116]:
# Collects one pandas DataFrame of swaps per timestamp chunk
swaps_data = []

In [118]:
# Fetch Uniswap v3 swaps in the two USDC/WETH pools that happened at the same
# timestamps as the CoW trades. Each query covers one chunk of at most 999
# timestamps, bounded by consecutive offsets in cow_timestamp_query_list.
for start, stop in zip(cow_timestamp_query_list[:-1], cow_timestamp_query_list[1:]):
    remaining = len(cow_timestamps) - stop
    print(f'querying {start} to {stop}. Remaining: {remaining}, {remaining / len(cow_timestamps) * 100:.2f}%')

    # the CoW trade timestamps belonging to this chunk
    partition = cow_timestamps[start:stop]

    # swaps at any of this chunk's timestamps, both legs above filter_usd, in the two pools
    swaps_qp = uni_swaps_qp(
        orderBy=uni_swaps_qp.timestamp,
        orderDirection='desc',
        first=query_size * 5,
        where={'timestamp_in': partition, 'amountInUSD_gt': filter_usd, 'amountOutUSD_gt': filter_usd, 'pool_in': weth_usdc_list},
    )

    df = sg.query_df(swaps_qp)

    # A chunk with no matching swaps may come back without the swaps_* columns
    # (the casts below would raise KeyError). It may also come back with empty
    # object-typed columns that break the later pl.concat. Either way it adds
    # no rows, so skip it.
    if df.empty:
        continue

    # cast integer-valued columns to float64 to avoid large int overflows
    for column in ('swaps_gasLimit', 'swaps_gasPrice', 'swaps_tick', 'swaps_amountIn', 'swaps_amountOut'):
        df[column] = df[column].astype('float64')

    swaps_data.append(df)

querying 0 to 999. Remaining: 19604, 95.15%
querying 999 to 1998. Remaining: 18605, 90.30%
querying 1998 to 2997. Remaining: 17606, 85.45%
querying 2997 to 3996. Remaining: 16607, 80.60%
querying 3996 to 4995. Remaining: 15608, 75.76%
querying 4995 to 5994. Remaining: 14609, 70.91%
querying 5994 to 6993. Remaining: 13610, 66.06%
querying 6993 to 7992. Remaining: 12611, 61.21%
querying 7992 to 8991. Remaining: 11612, 56.36%
querying 8991 to 9990. Remaining: 10613, 51.51%
querying 9990 to 10989. Remaining: 9614, 46.66%
querying 10989 to 11988. Remaining: 8615, 41.81%
querying 11988 to 12987. Remaining: 7616, 36.97%
querying 12987 to 13986. Remaining: 6617, 32.12%
querying 13986 to 14985. Remaining: 5618, 27.27%
querying 14985 to 15984. Remaining: 4619, 22.42%
querying 15984 to 16983. Remaining: 3620, 17.57%
querying 16983 to 17982. Remaining: 2621, 12.72%
querying 17982 to 18981. Remaining: 1622, 7.87%
querying 18981 to 19980. Remaining: 623, 3.02%
querying 19980 to 20603. Remaining: 0, 

In [119]:
# Convert each per-chunk pandas swaps frame to polars (swaps_df is a list of frames)
swaps_df = list(map(pl.from_pandas, swaps_data))

In [120]:
# Stack the per-chunk polars swaps frames vertically into one frame
swaps_pl = pl.concat(swaps_df, how='vertical')

In [121]:
# Rows x columns of the concatenated swaps
print('swaps_pl shape:', swaps_pl.shape)

swaps_pl shape: (36444, 19)


In [122]:
# Median transaction gas *limit* across the fetched swaps. The column is
# gasLimit, which is an upper bound on the gas actually used, so this tends to
# overestimate real cost. For comparison, a typical one-hop V3 swap uses ~127k
# gas; multi-hop routes use more.
tx_gas_median = swaps_pl['swaps_gasLimit'].median()
print(f'transaction gas median is {tx_gas_median}')

transaction gas median is 328304.5


In [123]:
# Estimated gas cost per swap = gas units * gasPrice. This gives wei,
# assuming swaps_gasPrice is quoted in wei.
#  - one-hop: a fixed ONE_HOP_SWAP_GAS units (typical single-pool V3 swap)
#  - median:  tx_gas_median units (median gasLimit computed above)
ONE_HOP_SWAP_GAS = 127_000
swaps_pl = swaps_pl.with_columns([
    (ONE_HOP_SWAP_GAS * pl.col("swaps_gasPrice")).alias('transaction_gas_fee_one_hop'),
    (tx_gas_median * pl.col("swaps_gasPrice")).alias('transaction_gas_fee_median'),
])

In [124]:
# Gas-fee estimates in ETH: gas units * gasPrice (wei) / 10**18.
# 1 ETH = 10**18 wei; 1 gwei = 10**9 wei.
# Recomputed from swaps_gasPrice instead of dividing the existing columns in
# place, so re-running this cell gives the same values rather than dividing by
# 10**18 a second time.
#  - one-hop: fixed 127,000 gas units
#  - median:  tx_gas_median gas units
swaps_pl = swaps_pl.with_columns([
    (127_000 * pl.col("swaps_gasPrice") / 10**18).alias('transaction_gas_fee_one_hop'),
    (tx_gas_median * pl.col("swaps_gasPrice") / 10**18).alias('transaction_gas_fee_median'),
])

In [125]:
# print swaps shape
print(f'swaps_pl shape: {swaps_pl.shape}')

swaps_pl shape: (36444, 21)


### Merge Univ3 and CoW

In [126]:
# Merge trades and swaps on timestamp. The outer join keeps every trade and every
# swap, including those with no counterpart at the same timestamp (the other
# side's columns are null).
# The join is many-to-many on timestamp: several trades and several swaps
# sharing one timestamp produce every trade x swap combination.
# NOTE(review): the (43182, 38) shape in the next cell implies the key was
# coalesced into 'trades_timestamp' (18 + 21 - 1 columns). Newer polars versions
# may keep both key columns for outer/full joins; confirm on upgrade.
cow_uni_outer_pl = cow_complete_pl.join(other=swaps_pl, left_on='trades_timestamp', right_on='swaps_timestamp', how='outer')

In [127]:
# Rows x columns of the timestamp-joined CoW/Uniswap frame
cow_uni_outer_pl.shape

(43182, 38)

In [128]:
# Keep only the trade, swap and gas-fee columns needed downstream, in this order
cow_uni_trunc_pl = cow_uni_outer_pl.select([
    'trades_timestamp',
    'trades_txHash',
    'trades_feeAmount',
    'trades_sellToken_id',
    'trades_buyToken_id',
    'trades_buyAmount',
    'trades_sellAmount',
    'swaps_pool_id',
    'swaps_hash',
    'swaps_tokenIn_id',
    'swaps_tokenOut_id',
    'swaps_amountIn',
    'swaps_amountOut',
    'swaps_blockNumber',
    'transaction_gas_fee_one_hop',
    'transaction_gas_fee_median',
])

In [129]:
# Check the truncated frame's size: same row count, 16 columns kept
cow_uni_trunc_pl.shape

(43182, 16)

In [130]:
# Load the Chainlink prices subgraph
chain_sg = sg.load_subgraph('https://api.thegraph.com/subgraphs/name/openpredict/chainlink-prices-subgraph')

In [131]:
# Field path for the top-level prices query
chain_price_qp = chain_sg.Query.prices

In [132]:
# Newest-first ETH/USD price updates before `timestamp`
chain_qp = chain_price_qp(
    first=query_size * 5,
    where={'timestamp_lt': timestamp, 'assetPair': "ETH/USD"},
    orderBy='timestamp',
    orderDirection='desc',
)

In [133]:
# Run the Chainlink price query into a pandas DataFrame
chain_df = sg.query_df(chain_qp)

In [None]:
# Convert the pandas price frame to a polars DataFrame
chain_pl = pl.from_pandas(chain_df)

In [None]:
# Drop the price entity id column; it is not used downstream
chain_pl = chain_pl.drop('prices_id')

In [None]:
# Rescale the raw ETH/USD answer by 10**8 (Chainlink USD feeds report 8
# decimals) and overwrite prices_price itself. The old alias 'prices_prices'
# created a second column, so the later column selection picked up the unscaled
# 'prices_price'.
# Not idempotent: re-running this cell divides by 10**8 again.
chain_pl = chain_pl.with_columns([
    (pl.col("prices_price") / 10**8).alias('prices_price')
])

In [None]:
# Rows x columns of the price frame
chain_pl.shape

In [None]:
# Outer-join the Chainlink prices (chain_pl) onto the truncated CoW/Uniswap frame
# (cow_uni_trunc_pl), matching trade timestamps to price timestamps. Rows from
# either side with no matching timestamp are kept, with nulls on the other side.
cow_uni_chain_outer_pl = cow_uni_trunc_pl.join(other=chain_pl, left_on='trades_timestamp', right_on='prices_timestamp', how='outer')

In [None]:
# Rows x columns after adding the Chainlink prices
cow_uni_chain_outer_pl.shape

### Price Calculations

#### Decimals

In [None]:
# Attach token decimals for every token-id column. After the earlier filters only
# WETH (18 decimals) and USDC (6 decimals) can appear: CoW trades were restricted
# to token_addr_list and swaps to the two USDC/WETH pools.
# Token ids are contract addresses, not symbols (CoW ids appear as lowercase
# '0xc02aaa39b223...' in the trades preview), so match on the WETH address.
# Comparing against the string 'WETH' never matches and tags every token with 6
# decimals.
# NOTE(review): assumes the Uniswap subgraph's token ids are lowercase addresses
# too; verify.
# Rows with a null id (the missing side of an outer join) fall through to 6. Their
# amounts are null as well, so the converted amounts stay null.
WETH_ADDRESS = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"

cow_uni_chain_outer_pl = cow_uni_chain_outer_pl.with_columns([
    pl.when(pl.col(token_id_col) == WETH_ADDRESS)
    .then(18)
    .otherwise(6)
    .cast(pl.UInt8)
    .alias(decimals_col)
    for token_id_col, decimals_col in (
        ('trades_sellToken_id', 'trades_sellToken_decimals'),
        ('trades_buyToken_id', 'trades_buyToken_decimals'),
        ('swaps_tokenIn_id', 'swaps_tokenIn_decimals'),
        ('swaps_tokenOut_id', 'swaps_tokenOut_decimals'),
    )
])

In [None]:
# Scale raw integer token amounts to whole-token units: amount / 10**decimals.
# The base is the float 10.0 so the power is evaluated in Float64. With an
# integer base and the UInt8 decimals column, some polars versions may keep the
# power in an integer dtype, where 10**18 does not fit.
# Null amounts or decimals (from the outer joins) propagate as null.
trades_swaps_converted_pl = cow_uni_chain_outer_pl.with_columns([
    (pl.col("trades_buyAmount") / (10.0 ** pl.col("trades_buyToken_decimals"))).alias('trades_buyAmount_converted'),
    (pl.col("trades_sellAmount") / (10.0 ** pl.col("trades_sellToken_decimals"))).alias('trades_sellAmount_converted'),
    (pl.col("swaps_amountIn") / (10.0 ** pl.col("swaps_tokenIn_decimals"))).alias('swaps_amountIn_converted'),
    (pl.col("swaps_amountOut") / (10.0 ** pl.col("swaps_tokenOut_decimals"))).alias('swaps_amountOut_converted'),
])

In [None]:
# Directional execution prices. Each ratio is one leg divided by the other,
# computed for both directions on trades and on swaps.
trades_swaps_converted_trunc_pl = trades_swaps_converted_pl.with_columns([
    (pl.col(numerator) / pl.col(denominator)).alias(ratio_name)
    for ratio_name, numerator, denominator in (
        ('trades_buy_sell_ratio', 'trades_buyAmount_converted', 'trades_sellAmount_converted'),
        ('trades_sell_buy_ratio', 'trades_sellAmount_converted', 'trades_buyAmount_converted'),
        ('swaps_amountIn_amountOut_ratio', 'swaps_amountIn_converted', 'swaps_amountOut_converted'),
        ('swaps_amountOut_amountIn_ratio', 'swaps_amountOut_converted', 'swaps_amountIn_converted'),
    )
])

In [None]:
# Keep only the columns needed for the price analysis, in this order.
# Uses an explicit select. Indexing with a bare tuple of names
# (df['a', 'b', ...]) goes through polars' tuple-indexing path, which reads
# exactly two names as df[row, col].
trades_swaps_converted_trunc_pl = trades_swaps_converted_trunc_pl.select([
    'trades_timestamp',
    'swaps_blockNumber',
    'trades_txHash',
    'trades_feeAmount',
    'trades_sellToken_id',
    'trades_buyToken_id',
    'trades_sellAmount_converted',
    'trades_buyAmount_converted',
    'swaps_pool_id',
    'swaps_tokenIn_id',
    'swaps_tokenOut_id',
    'swaps_amountIn_converted',
    'swaps_amountOut_converted',
    'transaction_gas_fee_one_hop',
    'transaction_gas_fee_median',
    'trades_buy_sell_ratio',
    'trades_sell_buy_ratio',
    'swaps_amountIn_amountOut_ratio',
    'swaps_amountOut_amountIn_ratio',
    'prices_assetPair_id',
    'prices_price',
])

In [None]:
# Rows x columns of the final truncated frame
trades_swaps_converted_trunc_pl.shape

In [None]:
# Checkpoint: write the final frame to parquet (path is relative to the
# notebook's working directory)
trades_swaps_converted_trunc_pl.write_parquet('cow_uni_chain_outer_pl_historical.parquet')