In [1]:
import json 
import os
import logging 
import asyncio

from datetime import timedelta
from pathlib import Path 
from pprint import PrettyPrinter

from subgrounds import Subgrounds
from web3 import Web3
from pycoingecko import CoinGeckoAPI

from prefect import task, flow 
from prefect.client import get_client
from prefect.tasks import task_input_hash

import pandas as pd 
import numpy as np 
import altair as alt 
import missingno as miss

from flywheel_util.constants import (
    colors_24,
    colors_28, 
    addresses, 
    url_infura, 
    url_snapshot, 
    url_subgraphs, 
    snapshot_api_max_records_per_request, 
    snapshot_api_max_skip,
)
from flywheel_util.tasks.general import df_to_sql
from flywheel_util.utils.util import (
    ddf, 
    first_row, 
    query_attrs,
    recursive_index_merge, 
    remove_prefix,
    df_cols_change_prefix, 
    df_cols_camel_to_snake, 
)

from sqlalchemy import create_engine
from sqlalchemy import text
# SQLAlchemy engine bound to the local SQLite file `votium_bribes.db` (pysqlite driver).
# echo=False keeps the emitted SQL out of the notebook output.
engine = create_engine("sqlite+pysqlite:///votium_bribes.db", echo=False, future=True)

# Quieter alternative to the DEBUG configuration below:
# logging.basicConfig(level=logging.INFO)

# NOTE(review): `logging` is already imported at the top of this cell; this second import is redundant.
import logging
# Configure the root logger at DEBUG level.
logging.basicConfig(level=logging.DEBUG)

# Shorthand: pp(obj) pretty-prints obj with a default PrettyPrinter.
pp = PrettyPrinter().pprint

# Lift Altair's row limit so the charts below can embed large frames.
alt.data_transformers.disable_max_rows()

  from cytoolz import (
  if LooseVersion(eth_abi.__version__) < LooseVersion("2"):


DataTransformerRegistry.enable('default')

In [2]:
# w3 = Web3(Web3.HTTPProvider(url_infura))

In [3]:
# Subgrounds client used to build and execute the subgraph queries in this notebook.
sg = Subgrounds()
# Handle to the subgraph at `url_subgraphs.convex.curve_vol_mainnet`; the query_* tasks build their queries from it.
sg_curve_vol = sg.load_subgraph(url_subgraphs.convex.curve_vol_mainnet)

In [4]:
# Display the endpoint URL that `sg_curve_vol` was loaded from.
url_subgraphs.convex.curve_vol_mainnet

'https://api.thegraph.com/subgraphs/name/convex-community/volume-mainnet'

In [5]:
@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    tags=["curve_subgraph"],
)
def query_pools(metapool_lp_token: str):
    """Query the pools whose coin list contains ``metapool_lp_token``.

    Builds a ``pools`` query (first 1000 entries) against ``sg_curve_vol``,
    filtered with ``coins_contains``, and selects each pool's id, symbol,
    coins and coin names.

    The task result is persisted and cached on its inputs for one day, and
    the task run is tagged ``curve_subgraph``.

    Args:
        metapool_lp_token: value matched against each pool's ``coins`` field.

    Returns:
        The output of ``sg.query_df`` for the four selected fields;
        ``process_pools`` hands it to ``recursive_index_merge``.
    """
    coin_filter = {"coins_contains": [metapool_lp_token]}
    pools = sg_curve_vol.Query.pools(first=1000, where=coin_filter)
    selected_fields = [
        pools.id,
        pools.symbol,
        pools.coins,
        pools.coinNames,
    ]
    return sg.query_df(selected_fields)
    
@task
def process_pools(dfs):
    """Turn the raw ``query_pools`` output into one pool/coin frame.

    Args:
        dfs: the value returned by ``query_pools``.

    Returns:
        The frame produced by ``recursive_index_merge(dfs)`` after the
        ``pools_`` column prefix is changed to ``pool_`` and the columns
        ``pool_coins`` / ``pool_coinNames`` are renamed to ``coin`` /
        ``coin_name``.
    """
    merged = recursive_index_merge(dfs)
    prefixed = df_cols_change_prefix(merged, "pools_", "pool_")
    column_renames = {"pool_coins": "coin", "pool_coinNames": "coin_name"}
    return prefixed.rename(columns=column_renames)

@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    tags=["curve_subgraph"],
)
def query_pool_snaps(metapool_lp_token: str):
    """Query daily pool snapshots for pools containing ``metapool_lp_token``.

    Requests up to 100000 ``dailyPoolSnapshots`` ordered by timestamp
    (descending), restricted to snapshots whose pool has the token in its
    ``coins`` field. Selects the pool's coins and id together with the
    snapshot's timestamp and ``reservesUSD``.

    The task result is persisted and cached on its inputs for one day, and
    the task run is tagged ``curve_subgraph``.

    Args:
        metapool_lp_token: value matched against the pool's ``coins`` field.

    Returns:
        The output of ``sg.query_df`` for the selected fields;
        ``process_pool_snaps`` hands it to ``recursive_index_merge``.
    """
    pool_filter = {"pool_": {"coins_contains": [metapool_lp_token]}}
    snapshots = sg_curve_vol.Query.dailyPoolSnapshots(
        first=100000,
        orderBy="timestamp",
        orderDirection="desc",
        where=pool_filter,
    )
    return sg.query_df([
        snapshots.pool.coins,
        snapshots.pool.id,
        snapshots.timestamp,
        snapshots.reservesUSD,
    ])

@task
def process_pool_snaps(dfs):
    """Turn the raw ``query_pool_snaps`` output into one snapshot frame.

    Args:
        dfs: the value returned by ``query_pool_snaps``.

    Returns:
        The merged frame after it has been passed through
        ``remove_prefix(..., "dailyPoolSnapshots_")`` and
        ``df_cols_camel_to_snake``, with ``pool_coins`` renamed to ``coin``
        and ``timestamp`` converted from epoch seconds to datetimes.
    """
    snaps = recursive_index_merge(dfs)
    snaps = df_cols_camel_to_snake(remove_prefix(snaps, "dailyPoolSnapshots_"))
    snaps = snaps.rename(columns={"pool_coins": "coin"})
    # The subgraph timestamps are interpreted as seconds since the Unix epoch.
    snaps.timestamp = pd.to_datetime(snaps.timestamp, unit='s')
    return snaps

@task(
    persist_result=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=1),
    tags=["curve_subgraph"],
)
def query_pool_vol(metapool_lp_token: str):
    """Query swap-volume snapshots for pools containing ``metapool_lp_token``.

    Requests up to 100000 ``swapVolumeSnapshots`` ordered by timestamp
    (descending) whose ``period`` equals 86400 (the number of seconds in a
    day) and whose pool has the token in its ``coins`` field. Selects the
    pool id, the snapshot timestamp and ``volumeUSD``.

    The task result is persisted and cached on its inputs for one day, and
    the task run is tagged ``curve_subgraph``.

    Args:
        metapool_lp_token: value matched against the pool's ``coins`` field.

    Returns:
        The output of ``sg.query_df`` for the three selected fields.
    """
    snapshot_filter = {
        "period": 86400,
        "pool_": {"coins_contains": [metapool_lp_token]},
    }
    volume_snaps = sg_curve_vol.Query.swapVolumeSnapshots(
        first=100000,
        orderBy="timestamp",
        orderDirection="desc",
        where=snapshot_filter,
    )
    return sg.query_df([
        volume_snaps.pool.id,
        volume_snaps.timestamp,
        volume_snaps.volumeUSD,
    ])

@task
def process_pool_vol(dfs):
    """Normalise the raw ``query_pool_vol`` output.

    Args:
        dfs: the value returned by ``query_pool_vol``.

    Returns:
        The frame after it has been passed through
        ``remove_prefix(..., "swapVolumeSnapshots_")`` and
        ``df_cols_camel_to_snake``, with ``timestamp`` converted from epoch
        seconds to datetimes.
    """
    # Start from the parameter: the previous body read the local `df_pool_vol`
    # before assigning it, which raised UnboundLocalError on every call.
    df_pool_vol = remove_prefix(dfs, "swapVolumeSnapshots_")
    df_pool_vol = df_cols_camel_to_snake(df_pool_vol)
    # The subgraph timestamps are interpreted as seconds since the Unix epoch.
    df_pool_vol.timestamp = pd.to_datetime(df_pool_vol.timestamp, unit='s')
    return df_pool_vol

@flow
async def flow_metapool(metapool_lp_token: str):
    """Collect reserves and volume history for one metapool LP token.

    Submits the three subgraph query tasks, post-processes their results and
    joins them onto the pool/coin frame.

    Args:
        metapool_lp_token: passed unchanged to ``query_pools``,
            ``query_pool_snaps`` and ``query_pool_vol``.

    Returns:
        A dict with two frames:
        ``"reserves"`` is the pool/coin frame merged with the snapshot frame
        on ``pool_id`` and ``coin``; ``"volume"`` is the distinct
        ``pool_id``/``pool_symbol`` pairs merged with the volume frame on
        ``pool_id``.
    """
    # Submit all three queries before processing any of their results.
    pools_future = query_pools.submit(metapool_lp_token)
    snaps_future = query_pool_snaps.submit(metapool_lp_token)
    vol_future = query_pool_vol.submit(metapool_lp_token)

    pool_coins = process_pools(pools_future)
    pool_snaps = process_pool_snaps(snaps_future)
    pool_vol = process_pool_vol(vol_future)

    for frame in (pool_coins, pool_snaps, pool_vol):
        first_row(frame)

    reserves = pool_coins.merge(pool_snaps, on=['pool_id', 'coin'])
    # One row per pool so the volume merge does not fan out over the pool's coins.
    pool_labels = pool_coins[['pool_id', 'pool_symbol']].drop_duplicates()
    volume = pool_labels.merge(pool_vol, on=['pool_id'])

    for frame in (reserves, volume):
        first_row(frame)

    return {"reserves": reserves, "volume": volume}

@flow
async def flow_fraxbp_vs_3pool():
    """Run ``flow_metapool`` for the '3crv' and 'crvfrax' LP tokens.

    Returns:
        A list with one ``flow_metapool`` result per key, in the order
        ``['3crv', 'crvfrax']``.
    """
    lp_token_keys = ['3crv', 'crvfrax']
    metapool_runs = [
        flow_metapool(addresses.token[key])
        for key in lp_token_keys
    ]
    return await asyncio.gather(*metapool_runs)

In [None]:
async with get_client() as client:
    # Limit task runs tagged "curve_subgraph" (the tag on the three query_* tasks) to 2 at a time.
    limit_id = await client.create_concurrency_limit(tag="curve_subgraph", concurrency_limit=2)

# Run the metapool flow for both LP tokens; `data` is the list returned by flow_fraxbp_vs_3pool().
data = await flow_fraxbp_vs_3pool()

--- Orion logging error ---
Traceback (most recent call last):
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
    self._adapt_connection._handle_exception(error)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
    raise error
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/util/_concurrency

--- Orion logging error ---
Traceback (most recent call last):
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
    self._adapt_connection._handle_exception(error)
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
    raise error
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/opt/miniconda3/envs/flywheel/lib/python3.10/site-packages/sqlalchemy/util/_concurrency

In [None]:
# first_row(df_pool_coin) 
# first_row(df_pool_snaps) 
# first_row(df_pool_vol) 

# df_reserves = df_pool_coin.merge(df_pool_snaps, on=['pool_id', 'coin'])
# df_volume = df_pool_coin[['pool_id', 'pool_symbol']].drop_duplicates().merge(df_pool_vol, on=['pool_id'])

# first_row(df_reserves) 
# first_row(df_volume) 

In [None]:
# @task(persist_result=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
# def query_pool_reserves(metapool_lp_token, ): 
#     # Get most recent snapshot for all metapools 
#     query_pools = sg_curve_vol.Query.pools(
#         first=1000, where={"coins_contains": [metapool_lp_token]}
#     )
#     query_snaps = query_pools.dailyPoolSnapshots(first=365*3, orderBy="timestamp", orderDirection="desc") 
#     query_vol = query_pools.swapVolumeSnapshots(first=365*3, orderBy="timestamp", orderDirection="desc")
#     data = sg.query_json([
#         query_pools.id, 
#         query_pools.symbol, 
#         query_pools.coins, 
#         query_pools.coinNames, 
#         query_snaps.timestamp, 
#         query_snaps.reservesUSD, 
#         query_vol.timestamp, 
#         query_vol.volumeUSD, 
#     ])
#     return data 
    
# @task 
# def process_pools(data): 
#     """The flattening performed by query_df is really messy so instead we use query_json 
#     and postprocess the output. 
#     """
#     assert len(data) == 1
#     data = data[0]
#     assert len(data) == 1
#     outer_alias = list(data.keys())[0]
#     expected_keys = ['id', 'symbol', 'coins', 'coinNames']
#     records = []
#     for pool_data in data[outer_alias]: 
#         diff_keys = set(pool_data.keys()).difference(set(expected_keys))
#         assert len(diff_keys) == 1 
#         inner_alias = diff_keys.pop() 
#         pool_snaps = pool_data[inner_alias] 
#         for s in pool_snaps: 
#             for coin, coin_name, reserves_usd in zip(
#                 pool_data['coins'], pool_data['coinNames'], s['reservesUSD']
#             ): 
#                 r = {
#                     "pool_id": pool_data['id'], 
#                     "pool_symbol": pool_data['symbol'], 
#                     "coin": coin, 
#                     "coin_name": coin_name, 
#                     "timestamp": pd.to_datetime(s['timestamp'], unit="s"), 
#                     "reserves_usd": reserves_usd, 
#                 }
#                 records.append(r)
#     df = pd.DataFrame(records)
#     return df

In [None]:
# @flow 
# def flow_fraxbp_vs_3pool(): 
#     data_fraxbp = query_pools.submit(addresses.token.crvfrax)
#     data_3pool = query_pools.submit(addresses.token['3crv'])
#     df_fraxbp = process_pools(data_fraxbp)  
#     df_3pool = process_pools(data_3pool) 
#     return df_fraxbp, df_3pool

In [None]:
# df_fraxbp, df_3pool = flow_fraxbp_vs_3pool()

In [None]:
def chart_metapool_lp_against_paired_asset(df, coin_name):
    """Area chart of reserves over time: ``coin_name`` versus everything else.

    Args:
        df: frame with ``coin_name``, ``timestamp`` and ``reserves_usd``
            columns.
        coin_name: coin whose reserves form the first segment; rows with any
            other ``coin_name`` are summed into the ``'paired asset'``
            segment.

    Returns:
        An Altair area chart with ``timestamp`` on x, ``reserves_usd`` on y
        and one colour per segment.
    """
    def _reserves_by_timestamp(rows, label):
        # Sum reserves per timestamp for the given rows and tag them with a segment label.
        totals = rows.groupby("timestamp")['reserves_usd'].sum().reset_index()
        totals['segment'] = label
        return totals

    lp_rows = df.loc[df.coin_name == coin_name]
    lp_totals = _reserves_by_timestamp(lp_rows, coin_name)

    paired_rows = df.loc[df.coin_name != coin_name]
    paired_totals = _reserves_by_timestamp(paired_rows, 'paired asset')

    stacked = pd.concat([lp_totals, paired_totals])

    encodings = dict(
        x="timestamp:T",
        y="reserves_usd:Q",
        color="segment:N",
        tooltip=["timestamp", "reserves_usd", "segment"],
    )
    return alt.Chart(stacked).mark_area().encode(**encodings)

In [None]:
# first_row(df_fraxbp)

In [None]:
# `data` is the list returned by flow_fraxbp_vs_3pool(): one {"reserves", "volume"} dict
# per LP token, in the order the flow iterates its keys ('3crv' first, then 'crvfrax').
# Without these assignments df_fraxbp / df_3pool are undefined on a fresh kernel.
# NOTE(review): confirm the "reserves" frames are the intended inputs for the charts below.
data_3pool, data_fraxbp = data
df_3pool = data_3pool["reserves"]
df_fraxbp = data_fraxbp["reserves"]


def latest_snapshot_per_pool(df):
    """Keep, for every ``pool_symbol``, only the rows at that pool's latest timestamp.

    Args:
        df: frame with ``pool_symbol`` and ``timestamp`` columns; not modified.

    Returns:
        A filtered copy of ``df`` with an extra ``max_timestamp`` column
        holding each pool's latest timestamp.
    """
    df_last = df.copy()
    df_last['max_timestamp'] = df_last.groupby(['pool_symbol'])['timestamp'].transform("max")
    return df_last.loc[df_last.timestamp == df_last.max_timestamp]


df_fraxbp_last = latest_snapshot_per_pool(df_fraxbp)
df_3pool_last = latest_snapshot_per_pool(df_3pool)

ddf(df_fraxbp_last.sort_values('reserves_usd', ascending=False))

In [None]:
# len(df_3pool_last)
# ddf(df_3pool_last.sort_values('reserves_usd', ascending=False))

In [None]:
# Two area charts joined with Altair's `|` operator: crvFRAX metapools and 3Crv metapools,
# each split into the LP coin versus the 'paired asset' segment.
(
    chart_metapool_lp_against_paired_asset(df_fraxbp, 'crvFRAX').properties(title="FraxBP Metapools") | 
    chart_metapool_lp_against_paired_asset(df_3pool, '3Crv').properties(title="3Pool Metapools")
)

In [None]:
def chart_tvl_breakdown(df, asset_name, label_col=None):
    """Print the summed reserves of ``df`` and chart them as arcs per pool.

    Args:
        df: frame with a ``reserves_usd`` column and a pool label column.
        asset_name: name used in the printed total and in the chart title.
        label_col: column used for the arc colour and tooltip. When omitted,
            ``pools_name`` is used if ``df`` has it, otherwise
            ``pool_symbol``.

    Returns:
        An Altair arc chart with ``reserves_usd`` as theta, coloured by the
        label column.
    """
    if label_col is None:
        # The frames built by process_pools / flow_metapool carry `pool_symbol`, not
        # `pools_name`; keep honouring `pools_name` when a caller's frame still has it.
        label_col = "pools_name" if "pools_name" in df.columns else "pool_symbol"

    print(f"Total tvl ($) paired against {asset_name} in Metapools: ${df.reserves_usd.sum():,}")
    return (
        alt.Chart(df)
        .mark_arc()
        .encode(
            theta="reserves_usd",
            color=label_col,
            tooltip=[label_col, alt.Tooltip("reserves_usd", format=",d")]
        )
        .properties(title=f"Breakdown of TVL paired against {asset_name} in Metapools")
    )


In [None]:
# NOTE(review): this passes `df_fraxbp`, not `df_fraxbp_last`. If the frame holds several
# timestamps per pool, the printed total adds reserves across dates — confirm the intended input.
chart_tvl_breakdown(df_fraxbp, "crvFRAX")

In [None]:
# NOTE(review): this passes `df_3pool`, not `df_3pool_last`. If the frame holds several
# timestamps per pool, the printed total adds reserves across dates — confirm the intended input.
chart_tvl_breakdown(df_3pool, "3Crv")