In [75]:
from scripts.fees_report_v2 import generate_reports
from pprint import pprint
import logging

POOL_ID = "0x32296969ef14eb0c6d29669c550d4a0449130230000200000000000000000080"
result = await generate_reports(POOL_ID, cycles=1)
swaps, joins = result

import asyncio
import json
import pprint
import pandas as pd
import httpx

LLAMA_API_URL = "https://coins.llama.fi/batchHistorical"

async def get(url, **kwargs):
    async with httpx.AsyncClient() as client:
        r = await client.get(url, **kwargs)
        try:
            r.raise_for_status()
        except Exception as e:
            print(r.text)
            raise e
        return r.json()


async def batch_request(url, coins_dict, search_width=300, batch_size=20):
    async def single_request(url, batch_coins, search_width):
        response = await get(url, params={"coins": json.dumps(batch_coins), "searchWidth": search_width})
        return response

    tasks = []

    for token, timestamps in coins_dict.items():
        n = len(timestamps)
        for i in range(0, n, batch_size):
            batch_timestamps = timestamps[i:i + batch_size]
            batch_coins = {token: batch_timestamps}
            tasks.append(single_request(url, batch_coins, search_width))

    results = await asyncio.gather(*tasks)

    logging.info(f"Batch request finished. {len(results)} results received.")
    return results


def process_tokens(df, col_name):
    df[col_name] = df[col_name].apply(lambda x: ['ethereum:' + t if ':' not in t else t for t in x] if isinstance(x, list) else 'ethereum:' + x if ':' not in x else x)
    
    # Check if the input is a list of tokens or a single token
    if df[col_name].apply(lambda x: isinstance(x, list)).any():
        tokens_agg = df.explode(col_name).groupby(col_name).agg(list).to_dict(orient="index")
    else:
        tokens_agg = df[[col_name, "timestamp"]].groupby(col_name).agg(list).to_dict(orient="index")
        
    return {k: v["timestamp"] for k, v in tokens_agg.items()}

pool_tokens_dict = process_tokens(joins, "pool.tokensList")
def merge_results(results):
    merged_list = []
    for result in results:
        for token_address, token_data in result["coins"].items():
            token_timestamps = [(token_address, t["timestamp"], t["price"], t["confidence"]) for t in token_data["prices"]]
            merged_list.extend(token_timestamps)

    return merged_list 

# Process tokenIn and tokenOut columns
token_in_dict = process_tokens(swaps, "tokenIn")
token_out_dict = process_tokens(swaps, "tokenOut")
all_tokens_dict = {}

for k, v in token_in_dict.items():
    if k in all_tokens_dict:
        all_tokens_dict[k].extend(v)
    else:
        all_tokens_dict[k] = v

for k, v in token_out_dict.items():
    if k in all_tokens_dict:
        all_tokens_dict[k].extend(v)
    else:
        all_tokens_dict[k] = v
all_tokens_response = await batch_request(LLAMA_API_URL, all_tokens_dict)
import pandas as pd
df =pd.DataFrame(merge_results(all_tokens_response), columns=["token", "timestamp", "price", "confidence"])
df = df.drop_duplicates()

def find_closest_timestamp_and_price(df, tokens, timestamp):
    if isinstance(tokens, list):
        closest_timestamps = []
        closest_prices = []
        
        for token in tokens:
            token_df = df[df["token"] == token]
            
            if token_df.empty:
                closest_timestamps.append(None)
                closest_prices.append(None)
            else:
                closest_timestamp = token_df.iloc[(token_df["timestamp"] - timestamp).abs().argsort()[:1]]["timestamp"].values[0]
                closest_price = token_df[token_df["timestamp"] == closest_timestamp]["price"].values[0]
                closest_timestamps.append(closest_timestamp)
                closest_prices.append(closest_price)
                
        return closest_timestamps, closest_prices
    else:
        token_df = df[df["token"] == tokens]
        
        if token_df.empty:
            return None, None

        closest_timestamp = token_df.iloc[(token_df["timestamp"] - timestamp).abs().argsort()[:1]]["timestamp"].values[0]
        closest_price = token_df[token_df["timestamp"] == closest_timestamp]["price"].values[0]
        return closest_timestamp, closest_price

swaps["closest_timestamp_tokenIn"], swaps["closest_timestamp_tokenIn_rate"] = zip(*swaps.apply(lambda x: find_closest_timestamp_and_price(df, x["tokenIn"], x["timestamp"]), axis=1))
swaps["closest_timestamp_tokenInUSD"] = swaps["tokenAmountIn"].astype(float) * swaps["closest_timestamp_tokenIn_rate"].astype(float)

swaps["closest_timestamp_tokenOut"], swaps["closest_timestamp_tokenOut_rate"] = zip(*swaps.apply(lambda x: find_closest_timestamp_and_price(df, x["tokenOut"], x["timestamp"]), axis=1))
swaps["closest_timestamp_tokenOutUSD"] = swaps["tokenAmountOut"].astype(float) * swaps["closest_timestamp_tokenOut_rate"].astype(float)

swaps["closest_timestamp_tokenDifferenceUSD"] = swaps["closest_timestamp_tokenInUSD"].astype(float) - swaps["closest_timestamp_tokenOutUSD"].astype(float)
swaps.head()

swaps.groupby("tokenIn").agg({"closest_timestamp_tokenDifferenceUSD": "sum", "swapFeesUSD": "sum", "valueUSD": "sum", "closest_timestamp_tokenOutUSD": "sum", "closest_timestamp_tokenInUSD": "sum"}).sort_values("closest_timestamp_tokenDifferenceUSD", ascending=False)


# Merge pool tokens dictionary with all_tokens_dict to avoid making duplicate requests
for k, v in pool_tokens_dict.items():
    if k in all_tokens_dict:
        all_tokens_dict[k].extend(v)
    else:
        all_tokens_dict[k] = v

# Perform batch requests for all tokens
all_tokens_response = await batch_request(LLAMA_API_URL, all_tokens_dict)
# Merge results and create a DataFrame
df = pd.DataFrame(merge_results(all_tokens_response), columns=["token", "timestamp", "price", "confidence"])
df = df.drop_duplicates()

# Find the closest timestamp and price for each join/exit event
joins["closest_timestamp_tokens"], joins["closest_timestamp_tokens_rate"] = zip(*joins.apply(lambda x: find_closest_timestamp_and_price(df, x["pool.tokensList"], x["timestamp"]), axis=1))

# Calculate the value in USD for each protocol fee amount
joins["protocolFeeAmountsUSD"] = joins.apply(lambda x: [float(amount) * float(rate) for amount, rate in zip(x["protocolFeeAmounts"], x["closest_timestamp_tokens_rate"])], axis=1)

# Calculate the value in USD for each amount
joins["amountsUSD"] = joins.apply(lambda x: [float(amount) * float(rate) for amount, rate in zip(x["amounts"], x["closest_timestamp_tokens_rate"])], axis=1)


joins.head()
# and now let's get the total values for protocolFeeAmountsUSD
joins["protocolFeeAmountsUSD"].explode().sum()


INFO:root:query MySwaps ( Fetching data for 1658718000 - 1657508400 with skip 0, page size 1000, pages per group 1, max pages 1000
INFO:root:query JoinExits Fetching data for 1658718000 - 1657508400 with skip 0, page size 1000, pages per group 1, max pages 1000
INFO:root:query JoinExits Total items: 126
INFO:root:query JoinExits Last task items: 126
INFO:root:query MySwaps ( Total items: 638
INFO:root:query MySwaps ( Last task items: 638
INFO:root:Batch request finished. 64 results received.
INFO:root:Batch request finished. 78 results received.


104818.04610616481

In [22]:
import asyncio
import json
import logging
import pandas as pd
import httpx
from pprint import pprint
from scripts.fees_report_v2 import generate_reports
import logging

LLAMA_API_URL = "https://coins.llama.fi/batchHistorical"


async def get(url, **kwargs):
    async with httpx.AsyncClient() as client:
        r = await client.get(url, **kwargs)
        try:
            r.raise_for_status()
        except Exception as e:
            print(r.text)
            raise e
        return r.json()


async def batch_request(url, coins_dict, search_width=300, batch_size=20):

    async def single_request(url, batch_coins, search_width):
        response = await get(url, params={"coins": json.dumps(batch_coins), "searchWidth": search_width})
        return response

    tasks = []

    for token, timestamps in coins_dict.items():
        n = len(timestamps)
        for i in range(0, n, batch_size):
            batch_timestamps = timestamps[i:i + batch_size]
            batch_coins = {token: batch_timestamps}
            tasks.append(single_request(url, batch_coins, search_width))

    results = await asyncio.gather(*tasks)

    logging.info(f"Batch request finished. {len(results)} results received.")
    return results


def process_tokens(df, col_name):
    df[col_name] = df[col_name].apply(lambda x: ['ethereum:' + t if ':' not in t else t for t in x] if isinstance(x, list) else 'ethereum:' + x if ':' not in x else x)
    
    if df[col_name].apply(lambda x: isinstance(x, list)).any():
        tokens_agg = df.explode(col_name).groupby(col_name).agg(list).to_dict(orient="index")
    else:
        tokens_agg = df[[col_name, "timestamp"]].groupby(col_name).agg(list).to_dict(orient="index")
        
    return {k: v["timestamp"] for k, v in tokens_agg.items()}


def merge_results(results):
    merged_list = []
    for result in results:
        for token_address, token_data in result["coins"].items():
            token_timestamps = [(token_address, t["timestamp"], t["price"], t["confidence"]) for t in token_data["prices"]]
            merged_list.extend(token_timestamps)

    return merged_list 


def find_closest_timestamp_and_price(df, tokens, timestamp):
    if isinstance(tokens, list):
        closest_timestamps = []
        closest_prices = []
        
        for token in tokens:
            token_df = df[df["token"] == token]
            
            if token_df.empty:
                closest_timestamps.append(None)
                closest_prices.append(None)
            else:
                closest_timestamp = token_df.iloc[(token_df["timestamp"] - timestamp).abs().argsort()[:1]]["timestamp"].values[0]
                closest_price = token_df[token_df["timestamp"] == closest_timestamp]["price"].values[0]
                closest_timestamps.append(closest_timestamp)
                closest_prices.append(closest_price)
                
        return closest_timestamps, closest_prices
    else:
        token_df = df[df["token"] == tokens]
        
        if token_df.empty:
            return None, None

        closest_timestamp = token_df.iloc[(token_df["timestamp"] - timestamp).abs().argsort()[:1]]["timestamp"].values[0]
        closest_price = token_df[token_df["timestamp"] == closest_timestamp]["price"].values[0]
        return closest_timestamp, closest_price

async def get_all_tokens_rates(swaps, joins):
    # Process tokenIn and tokenOut columns
    all_tokens_dict = process_tokens(swaps, "tokenIn")
    all_tokens_dict.update(process_tokens(swaps, "tokenOut"))
    
    pool_tokens_dict = process_tokens(joins, "pool.tokensList")
    all_tokens_dict.update((k, all_tokens_dict.get(k, []) + v) for k, v in pool_tokens_dict.items())

    # Perform batch requests for all tokens
    all_tokens_response = await batch_request(LLAMA_API_URL, all_tokens_dict)
    
    # Merge results and create a DataFrame
    df = pd.DataFrame(merge_results(all_tokens_response), columns=["token", "timestamp", "price", "confidence"])
    df = df.drop_duplicates()
    return df

async def analyze_pool(pool_id, cycles=1) -> pd.DataFrame:
    swaps, joins = await generate_reports(pool_id, cycles=1)

    df = await get_all_tokens_rates(swaps, joins)

    # Calculate values for swaps
    swaps["closest_timestamp_tokenIn"], swaps["closest_timestamp_tokenIn_rate"] = zip(*swaps.apply(lambda x: find_closest_timestamp_and_price(df, x["tokenIn"], x["timestamp"]), axis=1))
    swaps["closest_timestamp_tokenInUSD"] = swaps["tokenAmountIn"].astype(float) * swaps["closest_timestamp_tokenIn_rate"].astype(float)

    swaps["closest_timestamp_tokenOut"], swaps["closest_timestamp_tokenOut_rate"] = zip(*swaps.apply(lambda x: find_closest_timestamp_and_price(df, x["tokenOut"], x["timestamp"]), axis=1))
    swaps["closest_timestamp_tokenOutUSD"] = swaps["tokenAmountOut"].astype(float) * swaps["closest_timestamp_tokenOut_rate"].astype(float)

    swaps["closest_timestamp_tokenDifferenceUSD"] = swaps["closest_timestamp_tokenInUSD"].astype(float) -  swaps["closest_timestamp_tokenOutUSD"].astype(float)

    # Calculate values for joins
    joins["closest_timestamp_tokens"], joins["closest_timestamp_tokens_rate"] = zip(*joins.apply(lambda x: find_closest_timestamp_and_price(df, x["pool.tokensList"], x["timestamp"]), axis=1))

    joins["protocolFeeAmountsUSD"] = joins.apply(lambda x: [float(amount) * float(rate) for amount, rate in zip(x["protocolFeeAmounts"], x["closest_timestamp_tokens_rate"])], axis=1)

    joins["amountsUSD"] = joins.apply(lambda x: [float(amount) * float(rate) for amount, rate in zip(x["amounts"], x["closest_timestamp_tokens_rate"])], axis=1)
    return joins, swaps

In [77]:
# now let's explode amountsUSD and group by cycle
custom_analysis = joins.explode("protocolFeeAmountsUSD").groupby("Cycle")["protocolFeeAmountsUSD"].sum() + swaps.groupby("Cycle")["closest_timestamp_tokenDifferenceUSD"].sum()

joins["protocolFeeUSD"] = joins["protocolFeeUSD"].astype(float)
swaps["swapFeesUSD"] = swaps["swapFeesUSD"].astype(float)

base_analysis = joins.groupby("Cycle")["protocolFeeUSD"].sum() + swaps.groupby("Cycle")["swapFeesUSD"].sum()

pd.concat([base_analysis, custom_analysis], axis=1)

Unnamed: 0_level_0,0,1
Cycle,Unnamed: 1_level_1,Unnamed: 2_level_1
1,118758.367193,167075.910105


In [114]:


fee_differences = swaps[["swapFeeBasedOnPercentageAndTokenIn", "swapFeesUSD", "closest_timestamp_tokenDifferenceUSD", "timestamp"]]
fee_differences["pct_based_vs_subgraph"] =( fee_differences["swapFeeBasedOnPercentageAndTokenIn"] - fee_differences["swapFeesUSD"])/fee_differences["swapFeesUSD"]
# fee_differences["pct_based_vs_subgraph"].hist(bins=100)

swaps["swapFeeBasedOnPercentageAndTokenIn"] = swaps["tokenAmountIn"].astype(float) * swaps["closest_timestamp_tokenIn_rate"].astype(float) * .0004
swaps.groupby("Cycle")["swapFeeBasedOnPercentageAndTokenIn"].sum(),swaps.groupby("Cycle")["swapFeesUSD"].sum()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  fee_differences["pct_based_vs_subgraph"] =( fee_differences["swapFeeBasedOnPercentageAndTokenIn"] - fee_differences["swapFeesUSD"])/fee_differences["swapFeesUSD"]


(Cycle
 1    14402.81311
 Name: swapFeeBasedOnPercentageAndTokenIn, dtype: float64,
 Cycle
 1    14346.767216
 Name: swapFeesUSD, dtype: float64)

In [122]:
joins["protocolFeeAmountsUSD"].explode().sum(), joins["protocolFeeUSD"].sum()

(104818.04148647649, 104411.5999766607)

In [106]:
fee_differences["pct_based_vs_subgraph"].describe()

count    638.000000
mean       0.001483
std        0.011237
min       -0.050686
25%       -0.002763
50%        0.000471
75%        0.005025
max        0.073910
Name: pct_based_vs_subgraph, dtype: float64

In [None]:

differences_df = swaps[["timestamp", "closest_timestamp_tokenIn", "closest_timestamp_tokenOut"]]
differences_df["difference_in"] = swaps["closest_timestamp_tokenIn"] - swaps["timestamp"]
differences_df["difference_out"] = swaps["closest_timestamp_tokenOut"] - swaps["timestamp"]
differences_df.describe()
# Swap fee is always given in the token in, and merely multiplied by the pool swap fee

In [107]:
from ydata_profiling import ProfileReport

profile = ProfileReport(fee_differences, title="Swaps report")
profile.to_file("fee_differences.html")

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(columns={"index": "df_index"}, inplace=True)
Summarize dataset: 100%|██████████| 39/39 [00:01<00:00, 20.83it/s, Completed]                                                                         
Generate report structure: 100%|██████████| 1/1 [00:00<00:00,  1.43it/s]
Render HTML: 100%|██████████| 1/1 [00:00<00:00,  2.68it/s]
Export report to file: 100%|██████████| 1/1 [00:00<00:00, 294.09it/s]


In [None]:
joins["protocolFeeUSD"] = joins["protocolFeeUSD"].astype(float)
swaps["swapFeesUSD"] = swaps["swapFeesUSD"].astype(float)
swaps["swapFeeBasedOnPercentageAndTokenIn"] = swaps["tokenAmountIn"].astype(float) * swaps["closest_timestamp_tokenIn_rate"].astype(float) * .0004
fee_differences = swaps[["swapFeeBasedOnPercentageAndTokenIn", "swapFeesUSD", "closest_timestamp_tokenDifferenceUSD", "timestamp"]]
fee_differences["pct_based_vs_subgraph"] =( fee_differences["swapFeeBasedOnPercentageAndTokenIn"] - fee_differences["swapFeesUSD"])/fee_differences["swapFeesUSD"]
fee_differences["pct_based_vs_subgraph"].hist(bins=100)