# 1 - Data Collection

In this section, we fetch data from the Uniswap V3 subgraph and store it as JSON and CSV files for further processing.

In [None]:
# Standard Library
import datetime as dt
import glob
import json
import os
from pprint import pprint

# Third Party Library
import numpy as np
import pandas as pd
from flatdict import FlatDict
from gql import Client, gql
from gql.transport.aiohttp import AIOHTTPTransport

# Local Folder Library
from pyammanalysis.graphql_helper import run_query
from pyammanalysis.util import read_yaml

In [None]:
# refetch setting - if True, rerun GraphQL queries
refetch = True

# config
config = read_yaml("../config.yaml")
DATA_PATH = config["DATA_PATH"]
DATA_TOKEN_DAY_PATH = os.path.join(DATA_PATH, "token", "day")
DATA_POOL_DAY_PATH = os.path.join(DATA_PATH, "pool", "day")
UNISWAP_V3_SUBGRAPH_URL = config["UNISWAP_V3_SUBGRAPH_URL"]

# Always bind `transport`: the fetch helpers defined further down use it as a
# default argument value, which Python evaluates at `def` time. Leaving the
# name undefined when `refetch` is False would make those cells raise NameError.
transport = AIOHTTPTransport(url=UNISWAP_V3_SUBGRAPH_URL) if refetch else None

# start timestamp for time series
START_TIMESTAMP = 1619170975  # GMT: Friday, April 23, 2021 9:42:55 AM

# create folder if needed (exist_ok avoids the check-then-create race)
for folder in [DATA_PATH, DATA_TOKEN_DAY_PATH, DATA_POOL_DAY_PATH]:
    os.makedirs(folder, exist_ok=True)

token_dict = config["tokens"]
token_addr_dict = config["token_addr"]  # mapping from symbol to addr
whitelisted_symbols = np.sort(
    np.concatenate(list(FlatDict(token_dict).itervalues()))
)

# address-related config
# addresses in `config.yaml` follow EIP-55: Mixed-case checksum address encoding
# enforce lower case by `str.lower()`

# reverse lookup table, built once instead of on every `addr2sym` call
_ADDR_TO_SYMBOL = {
    address.lower(): token_symbol
    for token_symbol, address in config["token_addr"].items()
}


def sym2addr(symbol):
    """
    Maps a token symbol to its lower-cased address from `config.yaml`.
    Raises `KeyError` if the symbol is not configured.
    """
    return config["token_addr"][symbol].lower()


def addr2sym(addr):
    """
    Maps a token address to its symbol from `config.yaml`.
    The address is lower-cased before the lookup, so checksum-cased input works too.
    Raises `KeyError` if the address is not configured.
    """
    return _ADDR_TO_SYMBOL[addr.lower()]


whitelisted_addresses = np.array(
    [i.lower() for i in FlatDict(token_addr_dict).itervalues()]
)

## Uniswap V3 Global Data
For now, we only fetch the latest pool count and total value locked (TVL).

In [None]:
# where the factory-level snapshot is cached on disk
GLOBAL_DATA_PATH = os.path.join(DATA_PATH, "globalData.json")

# asks the factory entity for its pool count and its TVL in USD
CURRENT_GLOBAL_DATA_QUERY = """
{
    factory(id: "0x1F98431c8aD98523631AE4a59f267346ea31F984" ) {
        poolCount
        totalValueLockedUSD
    }
}
"""

if not refetch:
    # reuse the snapshot saved by an earlier run
    with open(GLOBAL_DATA_PATH, "r") as f:
        global_data = json.load(f)
else:
    # query the subgraph, keep only the factory payload, then cache it
    global_data = run_query(UNISWAP_V3_SUBGRAPH_URL, CURRENT_GLOBAL_DATA_QUERY)
    global_data = global_data["data"]["factory"]
    with open(GLOBAL_DATA_PATH, "w") as f:
        json.dump(global_data, f, indent=4)

pprint(global_data)

## Token Data

In [None]:
# cache location for the token table
TOKENS_DF_PATH = os.path.join(DATA_PATH, "tokens_df.csv")

# the 1000 tokens with the highest TVL (only the top 30 are analyzed later)
TOP_TOKENS_QUERY = """
{
    tokens(first: 1000, orderBy: totalValueLockedUSD, orderDirection: desc) {
        id
        symbol
        name
        totalValueLockedUSD
    }
}
"""

if not refetch:
    # load the table cached by an earlier run
    tokens_df = pd.read_csv(TOKENS_DF_PATH)
else:
    # query the subgraph and cache the result as CSV
    top_tokens = run_query(UNISWAP_V3_SUBGRAPH_URL, TOP_TOKENS_QUERY)["data"]
    tokens_df = pd.DataFrame(top_tokens["tokens"])
    tokens_df.to_csv(TOKENS_DF_PATH, index=False)

# enforce column dtypes for both the freshly fetched and the cached table
TOKEN_COLUMN_DTYPES = {
    "id": str,
    "symbol": str,
    "name": str,
    "totalValueLockedUSD": np.float64,
}
tokens_df = tokens_df.astype(TOKEN_COLUMN_DTYPES)

tokens_df.head()

## Token Day Time Series
Ref: https://github.com/Uniswap/v3-info/blob/770a05dc1a191cf229432ebc43c1f2ceb3666e3b/src/data/tokens/chartData.ts#L14

In [None]:
def get_token_key(symbol: str, addr: str) -> str:
    """
    Builds the key that identifies a token, in the form `<symbol>_<addr>`.
    The address alone makes the key unique;
    the symbol is put in front purely to keep the key human-readable.
    """
    parts = (symbol, addr)
    return "_".join(f"{part}" for part in parts)

In [None]:
# GraphQL query for the daily records (`tokenDayDatas`) of a single token.
# Variables:
#   $address   - value matched against the `token` field
#   $startTime - only records with `date` greater than this value are returned
#   $skip      - number of records to skip (used for paging)
# Each request returns at most 1000 records, ordered by `date` ascending,
# with `date`, `volumeUSD` and `totalValueLockedUSD` per record.
TOKEN_DAY_TIME_SERIES = """
    query tokenDayDatas($startTime: Int!, $skip: Int!, $address: String!) {
        tokenDayDatas(
            first: 1000
            skip: $skip
            where: { token: $address, date_gt: $startTime }
            orderBy: date
            orderDirection: asc
            subgraphError: allow
        ) {
            date
            volumeUSD
            totalValueLockedUSD
        }
    }
"""

In [None]:
fetch_token_error = []


async def fetch_token_chart_data(
    address: str, symbol: str, transport: AIOHTTPTransport = transport
):
    """
    Fetches every `tokenDayDatas` record of one token and writes them to
    `<DATA_TOKEN_DAY_PATH>/<symbol>_<address>.json`.

    Records are requested page by page (1000 per request); the `skip` variable
    is advanced after every full page until a shorter page is returned.

    If any request raises, the exception is printed, `address` is appended to
    the module-level `fetch_token_error` list and no file is written.

    Args:
        address: token address, sent as the `$address` query variable.
        symbol: token symbol, only used to build the output file name.
        transport: gql transport used to open the client session.
    """
    page_size = 1000  # must stay equal to `first: 1000` in TOKEN_DAY_TIME_SERIES
    error = False
    result = {"tokenDayDatas": []}
    query = gql(TOKEN_DAY_TIME_SERIES)  # parse the query once, not once per page

    async with Client(
        transport=transport,
        fetch_schema_from_transport=True,
        execute_timeout=8,  # TODO: investigate timeout
    ) as session:
        params = {"address": address, "startTime": START_TIMESTAMP, "skip": 0}
        try:
            while True:
                temp = await session.execute(query, variable_values=params)
                page = temp["tokenDayDatas"]
                result["tokenDayDatas"].extend(page)

                # a short page means there is nothing left to fetch
                if len(page) < page_size:
                    break

                # advance the offset that is actually sent with the next request;
                # without this the same first page would be requested forever
                params["skip"] += page_size
        except Exception as e:
            print(e)
            error = True
            fetch_token_error.append(address)

    if not error:
        os.makedirs(DATA_TOKEN_DAY_PATH, exist_ok=True)

        out_path = os.path.join(
            DATA_TOKEN_DAY_PATH, f"{get_token_key(symbol, address)}.json"
        )
        with open(out_path, "w") as f:
            json.dump(result, f, indent=4)

In [None]:
fetch_token_error = []

if refetch:
    # remove existing content in the out folder
    for f in glob.glob(DATA_TOKEN_DAY_PATH + "/*"):
        os.remove(f)

    # fetch token day data for each token
    for row in tokens_df.iloc[:30].itertuples():
        await fetch_token_chart_data(
            row.id,
            row.symbol,
        )
    print(fetch_token_error)

## Pool Data

In [None]:
def get_pool_key(symbol0: str, symbol1: str, fee_tier: int) -> str:
    """
    Builds the key that identifies a pool, in the form
    `<symbol0>_<symbol1>_<fee_tier>`.
    A pool is uniquely defined by `token0`, `token1` and `feeTier` together.
    Note that symbols are used here rather than token addresses, which carries a risk.
    """
    parts = (symbol0, symbol1, fee_tier)
    return "_".join(f"{part}" for part in parts)

In [None]:
# cache location for the pool table
POOLS_DF_PATH = os.path.join(DATA_PATH, "pools_df.csv")

# the 50 pools with the highest TVL (only the top 20 are analyzed)
TOP_POOLS_QUERY = """
{
    pools(first: 50, orderBy: totalValueLockedUSD, orderDirection: desc) {
        id
        token0 {
            id
            symbol
        }
        token1 {
            id
            symbol
        }
        feeTier
        totalValueLockedUSD
    } 
}
"""

if not refetch:
    # load the table cached by an earlier run
    pools_df = pd.read_csv(POOLS_DF_PATH)
else:
    # query the subgraph; nested token fields become "token0.id", "token0.symbol", ...
    top_pools = run_query(UNISWAP_V3_SUBGRAPH_URL, TOP_POOLS_QUERY)["data"]
    pools_df = pd.json_normalize(top_pools["pools"])
    pools_df.to_csv(POOLS_DF_PATH, index=False)

# enforce column dtypes for both the freshly fetched and the cached table
POOL_COLUMN_DTYPES = {
    "id": str,
    "feeTier": int,
    "totalValueLockedUSD": np.float64,
    "token0.id": str,
    "token0.symbol": str,
    "token1.id": str,
    "token1.symbol": str,
}
pools_df = pools_df.astype(POOL_COLUMN_DTYPES)

# keep the first 20 rows only
pools_df = pools_df.head(20)

pools_df.head()

In [None]:
# whitelist a pool if both its token0 and token1 are whitelisted
token0_whitelisted = pools_df["token0.id"].isin(whitelisted_addresses)
token1_whitelisted = pools_df["token1.id"].isin(whitelisted_addresses)
is_whitelisted_pool = token0_whitelisted & token1_whitelisted

# take an explicit copy: adding a column to a filtered slice would otherwise
# trigger pandas' SettingWithCopyWarning
pools_df = pools_df[is_whitelisted_pool].copy()

# add name, built from the configured symbols of both tokens and the fee tier
# (a list is used instead of `apply(axis=1)` so that an empty frame also works)
pools_df["name"] = [
    get_pool_key(addr2sym(token0_id), addr2sym(token1_id), fee_tier)
    for token0_id, token1_id, fee_tier in zip(
        pools_df["token0.id"], pools_df["token1.id"], pools_df["feeTier"]
    )
]

pools_df.head()

In [None]:
# print pandas' summary of `pools_df` (columns, non-null counts, dtypes)
pools_df.info()

## Pool Day Time Series
Ref: https://github.com/Uniswap/v3-info/blob/770a05dc1a191cf229432ebc43c1f2ceb3666e3b/src/data/pools/chartData.ts#L14

In [None]:
# GraphQL query for the daily records (`poolDayDatas`) of a single pool.
# Variables:
#   $address   - value matched against the `pool` field
#   $startTime - only records with `date` greater than this value are returned
#   $skip      - number of records to skip (used for paging)
# Each request returns at most 1000 records, ordered by `date` ascending,
# with `date`, `volumeUSD` and `tvlUSD` per record.
POOL_DAY_TIME_SERIES = """
    query poolDayDatas($startTime: Int!, $skip: Int!, $address: String!) {
        poolDayDatas(
            first: 1000
            skip: $skip
            where: { pool: $address, date_gt: $startTime }
            orderBy: date
            orderDirection: asc
            subgraphError: allow
        ) {
            date
            volumeUSD
            tvlUSD
        }
    }
"""

In [None]:
fetch_pool_error = []


async def fetch_pool_chart_data(
    address: str,
    symbol0: str,
    symbol1: str,
    fee_tier: int,
    transport: AIOHTTPTransport = transport,
):
    """
    Fetches every `poolDayDatas` record of one pool and writes them to
    `<DATA_POOL_DAY_PATH>/<symbol0>_<symbol1>_<fee_tier>.json`.

    Records are requested page by page (1000 per request); the `skip` variable
    is advanced after every full page until a shorter page is returned.

    If any request raises, the exception is printed, `address` is appended to
    the module-level `fetch_pool_error` list and no file is written.

    Args:
        address: pool address, sent as the `$address` query variable.
        symbol0: symbol of token0, only used to build the output file name.
        symbol1: symbol of token1, only used to build the output file name.
        fee_tier: fee tier of the pool, only used to build the output file name.
        transport: gql transport used to open the client session.
    """
    page_size = 1000  # must stay equal to `first: 1000` in POOL_DAY_TIME_SERIES
    error = False
    result = {"poolDayDatas": []}
    query = gql(POOL_DAY_TIME_SERIES)  # parse the query once, not once per page

    async with Client(
        transport=transport,
        fetch_schema_from_transport=True,
        execute_timeout=8,  # TODO: investigate timeout
    ) as session:
        params = {"address": address, "startTime": START_TIMESTAMP, "skip": 0}
        try:
            while True:
                temp = await session.execute(query, variable_values=params)
                page = temp["poolDayDatas"]
                result["poolDayDatas"].extend(page)

                # a short page means there is nothing left to fetch
                if len(page) < page_size:
                    break

                # advance the offset that is actually sent with the next request;
                # without this the same first page would be requested forever
                params["skip"] += page_size
        except Exception as e:
            print(e)
            error = True
            fetch_pool_error.append(address)

    if not error:
        os.makedirs(DATA_POOL_DAY_PATH, exist_ok=True)

        out_path = os.path.join(
            DATA_POOL_DAY_PATH, f"{get_pool_key(symbol0, symbol1, fee_tier)}.json"
        )
        with open(out_path, "w") as f:
            json.dump(result, f, indent=4)

In [None]:
if refetch:
    # remove existing content in the out folder
    for f in glob.glob(DATA_POOL_DAY_PATH + "/*"):
        os.remove(f)

    # fetch pool data for each pool
    for i, row in pools_df.iterrows():
        await fetch_pool_chart_data(
            row["id"],
            row["token0.symbol"],
            row["token1.symbol"],
            row["feeTier"],
        )
    print(fetch_pool_error)

In [None]:
# reads pool day datas from json and combines them into one wide frame:
# one row per "date", and one "<pool>_<field>" column per pool and field
pool_day_df = pd.DataFrame(columns=["date"])
pool_names = []

# sorted() keeps the column order reproducible (os.listdir order is arbitrary)
for fullname in sorted(os.listdir(DATA_POOL_DAY_PATH)):
    # only consider JSON files; splitext keeps pool names that contain a "."
    pool_name, extension = os.path.splitext(fullname)
    if extension != ".json":
        continue

    with open(os.path.join(DATA_POOL_DAY_PATH, fullname), "r") as file:
        pool_day_datas = json.load(file)

    records = pool_day_datas["poolDayDatas"]
    if not records:
        # an empty list yields a frame without columns, which `astype` below
        # cannot cast; there is nothing to merge for this pool anyway
        continue

    # parse dict as df
    temp = pd.DataFrame.from_dict(records).astype(
        {"volumeUSD": np.float64, "tvlUSD": np.float64}
    )

    # prefix columns (except "date") with pool name
    cols = temp.columns[~temp.columns.isin(["date"])]
    pool_names.append(pool_name)
    temp.rename(columns=dict(zip(cols, pool_name + "_" + cols)), inplace=True)

    # outer join: union of items on "date"
    pool_day_df = pd.merge(pool_day_df, temp, how="outer", on=["date"])

# sort by "date" and discard the old index
pool_day_df = pool_day_df.sort_values(by="date").reset_index(drop=True)

pool_day_df.head()

In [None]:
# ["date"]: int -> date (in "YYYY-MM-DD")
pool_day_df["timestamp"] = pool_day_df["date"]  # keep timestamp in a new col

# Convert in UTC explicitly. `dt.date.fromtimestamp` interprets the value in
# the machine's local time zone, so the resulting calendar day would depend on
# where the notebook runs.
# NOTE(review): assumes the subgraph's `date` values mark day starts in UTC - confirm
pool_day_df["date"] = pool_day_df["date"].map(
    lambda ts: dt.datetime.fromtimestamp(ts, tz=dt.timezone.utc).date()
)

pool_day_df.head()

In [None]:
# print pandas' summary of `pool_day_df` (columns, non-null counts, dtypes)
pool_day_df.info()

In [None]:
# sanity check: how many days lie between the first and the last row
date_column = pool_day_df["date"]
d0, d1 = date_column.iloc[0], date_column.iloc[-1]
print(f"{d0} to {d1} has {(d1 - d0).days} days")

In [None]:
# persist the combined pool day table next to the other data files
pool_day_df.to_csv(
    path_or_buf=os.path.join(DATA_PATH, "pool_day_df.csv"),
    index=False,
)