In [1]:
"""
univ3-price.ipynb

Goal with this notebook is to plot the historical price timeseries
of a given pool over time.
"""
# switch to univ3-ape wd first for ape project to work properly for all contract address possibilities
import os
from pathlib import Path

if Path(*Path(os.getcwd()).parts[-3:]) == Path('univ3-ape/notebook/queries'):
    os.chdir('../..')

In [3]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import typing as tp
from ape import accounts, chain, Contract, networks, project

In [4]:
# SEE: https://gist.github.com/banteg/dcf6082ff7fc6ad51ce220146f29d9ff
networks.parse_network_choice('ethereum:mainnet:alchemy').__enter__()

<alchemy chain_id=1>

In [5]:
univ3 = project.dependencies['UniswapV3Core']['main']

In [8]:
# SEE: https://docs.uniswap.org/contracts/v3/reference/deployments
factory = Contract("0x1F98431c8aD98523631AE4a59f267346ea31F984")
factory

<UniswapV3Factory 0x1F98431c8aD98523631AE4a59f267346ea31F984>

In [6]:
head_block = chain.blocks.head.number

In [30]:
deploy_block = 12369621  # block when factory deployed
step = 2400  # sample price once per 8 hour
start_block = head_block - 7200 * 365 * 2 # last 2 years

In [31]:
# relevant block numbers to query for
blocks = range(start_block, head_block, step)
blocks

range(12509859, 17765859, 2400)

In [32]:
len(blocks)

2190

In [11]:
pool = univ3.UniswapV3Pool.at("0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")  # USDC/ETH 0.05%

In [12]:
print(f"token0: {pool.token0()}, token1: {pool.token1()}, fee: {pool.fee()}")

token0: 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48, token1: 0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2, fee: 500


In [14]:
slot0 = pool.slot0()
slot0

slot0_return(sqrtPriceX96=1842074316720006327801396470392331, tick=201091, observationIndex=112, observationCardinality=722, observationCardinalityNext=722, feeProtocol=0, unlocked=True)

In [33]:
%time

# query for tick data each block and insert into pandas dataframe
FILENAME = f"notebook/data/price_{pool.address}_{start_block}_{head_block}_{step}.csv"
is_head = True
for block in blocks:
    print(f"Processing block {block} ...", end='\r')
    
    # get the sqrt price data at block
    slot0 = pool.slot0(block_identifier=block)
    row = {'block_number': [block], 'sqrt_price_x96': [slot0.sqrtPriceX96]}
        
    # convert to pd dataframe then append to file
    df = pd.DataFrame(data=row)
    df.to_csv(FILENAME, mode='a', index=False, header=is_head)
        
    if is_head:
        is_head = False

CPU times: user 1 µs, sys: 0 ns, total: 1 µs
Wall time: 2.86 µs
Processing block 17763459 ...