# Seminar - Example of getting chain data
Getting current state of Carmine Options AMM from StarkNet.

In [41]:
from dataclasses import dataclass
import asyncio
import time
import datetime
import os

from starknet_py.net.full_node_client import FullNodeClient
from starknet_py.hash.selector import get_selector_from_name
from starknet_py.net.client_models import Call

In [4]:
# Contract address that all Carmine AMM reads in this notebook are sent to.
AMM_ADDRESS = 0x047472e6755afc57ada9550b6a3ac93129cc4b5f98f51c73e0644d129fd208d9 # Carmine Options Amm address

# Divisor applied to the raw value returned by 'get_value_of_pool_position'
# to turn it into a float (presumably a fixed-point number with 64 fractional
# bits - TODO confirm against the AMM's fixed-point library).
CUBIT_SHIFT = 2**64

# RPC setup
# NET is the single node client shared by every call in the notebook.
RPC_URL = 'https://starknet-mainnet.public.blastapi.io'
NET = FullNodeClient(RPC_URL)

In [6]:
def felt_to_str(felt) -> str:
    """
    Decode an integer felt into the text it encodes.

    The integer is written out as the shortest big-endian byte string that can
    hold it, and those bytes are decoded as UTF-8 (so 0 decodes to '').
    """
    byte_count, leftover_bits = divmod(felt.bit_length(), 8)
    if leftover_bits:
        # A partially filled leading byte still occupies a whole byte
        byte_count += 1
    return felt.to_bytes(byte_count, "big").decode("utf-8")

In [7]:
async def function_call(client: FullNodeClient, address: int, selector: str, calldata: list[int | None]) -> list[int]:
    """
    Call a function of a StarkNet contract and hand back the raw result.

    Parameters:
    - client: FullNodeClient used to reach the node
    - address: address of the contract to call
    - selector: name of the contract function (converted to a selector here)
    - calldata: arguments passed to the function

    Returns:
    - the result of `client.call_contract` for this call; no block is
      specified, so the client's default block is used
    """
    entry_point = get_selector_from_name(selector)
    request = Call(to_addr=address, selector=entry_point, calldata=calldata)
    return await client.call_contract(request)

async def function_call_from_block(client: FullNodeClient, address: int, selector: str, calldata: list[int], block_num: int) -> list[int]:
    """
    Call a function of a StarkNet contract as of a given block (historical data).

    Parameters:
    - client: FullNodeClient used to reach the node
    - address: address of the contract to call
    - selector: name of the contract function (converted to a selector here)
    - calldata: arguments passed to the function
    - block_num: number of the block the call is evaluated against

    Returns:
    - the result of `client.call_contract` for this call at `block_num`
    """
    entry_point = get_selector_from_name(selector)
    request = Call(to_addr=address, selector=entry_point, calldata=calldata)
    return await client.call_contract(request, block_number=block_num)

In [8]:
# Fetch list of lptokens
lptokens = await function_call(NET, AMM_ADDRESS, 'get_all_lptoken_addresses', [])
lptokens = lptokens[1:] # First element is length of the array 

In [9]:
@dataclass
class Token:
    """
    Class representing single Starknet token.

    Attributes:
    - decimals: first value returned by the token contract's 'decimals' function
    - symbol: text decoded from the first value returned by 'symbol'
    - address: address of the token contract
    """
    decimals: int
    symbol: str
    address: int

    @classmethod
    async def from_address(cls, address: int, client: FullNodeClient) -> 'Token':
        """
        Builds a token by reading 'decimals' and 'symbol' from the contract at `address`.

        Parameters:
        - address: address of the token contract
        - client: FullNodeClient used for the two reads

        Returns:
        - instance of `cls` (so subclasses get their own type back)
        """
        # The two reads do not depend on each other, so run them concurrently
        decimals, symbol = await asyncio.gather(
            function_call(client, address, 'decimals', []),
            function_call(client, address, 'symbol', []),
        )

        return cls(
            decimals=decimals[0],
            symbol=felt_to_str(symbol[0]),
            address=address,
        )

    def __repr__(self) -> str:
        # Multi-line form with the address shown in hex for readability
        return f"""Token(
    decimals={self.decimals},
    symbol={self.symbol},
    address={hex(self.address)}
)"""


@dataclass
class PoolInfo:
    """
    Class representing Carmine Options AMM pool info.

    Footnote: ETH/USDC, ETH is the base token and USDC is the quote token.

    Attributes:
    - quote_token_address: address of the pool's quote token
    - base_token_address: address of the pool's base token
    - option_type: pool type; this notebook treats 0 as call and anything else as put
    - address: address of the pool's lptoken
    """
    quote_token_address: int
    base_token_address: int
    option_type: int
    address: int

    @classmethod
    async def from_address(cls, address: int, client: FullNodeClient) -> 'PoolInfo':
        """
        Constructs Pool info from the address of the lptoken.

        Parameters:
        - address: lptoken address identifying the pool
        - client: FullNodeClient used to query the AMM

        Returns:
        - instance of `cls` (so subclasses get their own type back)
        """
        res = await function_call(
            client,
            AMM_ADDRESS,
            'get_pool_definition_from_lptoken_address', [address]
        )
        # The first three returned values are read as quote token, base token, option type
        return cls(
            quote_token_address=res[0],
            base_token_address=res[1],
            option_type=res[2],
            address=address
        )

    def __repr__(self) -> str:
        # Multi-line form with all addresses shown in hex for readability
        return f"""PoolInfo(
    quote_token_address={hex(self.quote_token_address)},
    base_token_address={hex(self.base_token_address)},
    option_type={self.option_type},
    address={hex(self.address)}
)"""

    

In [10]:
tokens = [ Token.from_address(i, NET) for i in lptokens ]
tokens = await asyncio.gather(*tokens)

In [14]:
pool_infos = [ PoolInfo.from_address(i, NET) for i in lptokens ]
pool_infos = await asyncio.gather(*pool_infos)

In [16]:
for pool in pool_infos:
    # Fetch unlocked and value of position
    unlocked = await function_call(NET, AMM_ADDRESS, 'get_unlocked_capital', [pool.address])

    value_of_position = await function_call(NET, AMM_ADDRESS, 'get_value_of_pool_position', [pool.address])
    value_of_position = value_of_position[0] / CUBIT_SHIFT 

    base_token = await Token.from_address(pool.base_token_address, NET)
    quote_token = await Token.from_address(pool.quote_token_address, NET)

    if pool.option_type == 0: # Call options
        type_str = 'Call'
        capital_token = base_token.symbol

        pool_tvl = (unlocked[0] / 10**base_token.decimals) + value_of_position 
    else: # Put
        type_str = 'Put'
        capital_token = quote_token.symbol

        pool_tvl = (unlocked[0] / 10**quote_token.decimals) + value_of_position 


    print(
        '{:<17}'.format(f'TVL of {base_token.symbol}/{quote_token.symbol} ')
        + '{:<12}'.format(f'{type_str} pool: ')
        + '{:>9}'.format(str(float('%0.2f' % pool_tvl)))
        + f' {capital_token: >4}'
    )
    

TVL of ETH/USDC  Call pool:      19.02  ETH
TVL of ETH/USDC  Put pool:    53151.81 USDC
TVL of WBTC/USDC Call pool:       0.02 WBTC
TVL of WBTC/USDC Put pool:    16913.38 USDC
TVL of ETH/STRK  Call pool:       3.43  ETH
TVL of ETH/STRK  Put pool:    88859.89 STRK
TVL of STRK/USDC Call pool:  223789.39 STRK
TVL of STRK/USDC Put pool:    73670.63 USDC


### Interacting with Starknet
- Call vs. Invoke
- Call reads data (or dry-runs some function), invoke changes state

In [146]:
# Pragma docs:
# https://docs.pragmaoracle.com/Resources/Cairo%201/data-feeds/consuming-data

# Address of the Pragma oracle contract
PRAGMA_ADDRESS = 0x2a85bd616f912537c50a49a4076db02c00b29b2cdc8a197ce92ed1837fa875b

# Asset keys: the ASCII pair name read as one big-endian integer
BTC_USD_KEY = int.from_bytes(b'BTC/USD', 'big')  # 18669995996566340
SOL_USD_KEY = int.from_bytes(b'SOL/USD', 'big')  # 23449611697214276
ETH_USD_KEY = int.from_bytes(b'ETH/USD', 'big')  # 19514442401534788

In [25]:
# Enums used by Pragma

# enum DataType {
#     SpotEntry: felt252,
#     FutureEntry: (felt252, u64),
#     GenericEntry: felt252,
# }

# enum AggregationMode {
#     Median: (),
#     Mean: (),
#     Error: (),
# }

# Note on using enums on Starknet from a client:
#   To pass an enum via the client, pass the index of the enum member, followed by the member's value,
#       e.g. to pass DataType::SpotEntry(BTC_USD_KEY) we pass [0, BTC_USD_KEY] (SpotEntry is the first member of the DataType enum).
#   This works slightly differently when calling/invoking via a Contract instance, as explained further down.

In [147]:
@dataclass
class PragmaPrice:
    """
    Dataclass representing Pragma price info entry.

    get_pragma_median_price fills the fields, in order, from the first four
    values returned by the oracle's 'get_data' call.
    """
    price: int  # raw integer price; presumably scaled by 10**decimals - TODO confirm against Pragma docs
    decimals: int  # presumably the number of decimals of `price` - TODO confirm
    last_updated_timestamp: int  # presumably a Unix timestamp in seconds - TODO confirm
    num_sources_aggregated: int

async def get_pragma_median_price(client: FullNodeClient, oracle_address: int, asset: int) -> PragmaPrice:
    """
    Reads the median spot price of an asset from the Pragma oracle.

    Parameters:
    - client: FullNodeClient instance
    - oracle_address: Pragma oracle contract address
    - asset: Asset key (ie 23449611697214276 for SOL_USD)

    Returns:
    - PragmaPrice -> struct containing the price info returned by the oracle
    """
    spot_entry = 0  # index of SpotEntry in the DataType enum
    median = 0      # index of Median in the AggregationMode enum

    response = await function_call(
        client,
        oracle_address,
        'get_data',
        [spot_entry, asset, median],
    )

    # The first four returned values map onto the PragmaPrice fields in order
    price, decimals, updated_at, source_count = (
        response[0], response[1], response[2], response[3]
    )
    return PragmaPrice(
        price=price,
        decimals=decimals,
        last_updated_timestamp=updated_at,
        num_sources_aggregated=source_count,
    )

In [151]:
@dataclass
class PragmaCheckpoint:
    """
    Dataclass representing Pragma checkpoint entry.

    get_pragma_checkpoint fills the first four fields from the first four
    values returned by 'get_last_checkpoint_before' and `index` from the last
    returned value.
    """
    timestamp: int  # the notebook renders it with datetime.fromtimestamp, i.e. treats it as Unix seconds
    value: int # Always with 8 decimals (value / 10**8 to get human readable price)
    aggregation_mode: int  # presumably the index of the AggregationMode member - TODO confirm
    num_sources_aggregated: int
    index: int  # presumably the checkpoint's position in the oracle's checkpoint list - TODO confirm

async def get_pragma_checkpoint(client: FullNodeClient, oracle_address: int, timestamp: int, asset: int) -> PragmaCheckpoint:
    """
    Reads the latest checkpoint set before a given time from the Pragma oracle.

    Parameters:
    - client: FullNodeClient instance
    - oracle_address: Pragma oracle contract address
    - timestamp: timestamp before which to get the latest checkpoint
    - asset: Asset key (ie 23449611697214276 for SOL_USD)

    Returns:
    - PragmaCheckpoint -> struct with the checkpoint info returned by the oracle

    Notes:
    - For given timestamp, always returns the last checkpoint set before the timestamp.
    """
    spot_entry = 0  # index of SpotEntry in the DataType enum
    median = 0      # index of Median in the AggregationMode enum

    response = await function_call(
        client,
        oracle_address,
        'get_last_checkpoint_before',
        [spot_entry, asset, timestamp, median],
    )

    # First four values are the checkpoint fields; the index is the last value
    checkpoint_time, checkpoint_value, mode, source_count = (
        response[0], response[1], response[2], response[3]
    )
    checkpoint_index = response[-1]
    return PragmaCheckpoint(
        timestamp=checkpoint_time,
        value=checkpoint_value,
        aggregation_mode=mode,
        num_sources_aggregated=source_count,
        index=checkpoint_index,
    )

### Setting checkpoint

In [47]:
from starknet_py.contract import Contract
from starknet_py.net.signer.stark_curve_signer import KeyPair
from starknet_py.net.account.account import Account
from starknet_py.net.models.chains import StarknetChainId

In [149]:
# Upper bound passed as `max_fee` when the invoke transaction is executed
# (presumably denominated in wei - TODO confirm).
MAX_FEE = int(1e16)

# Load keys
# Wallet credentials come from environment variables so they never appear in
# the notebook; a missing variable raises KeyError here.
MAINNET_PRIVATE_KEY = os.environ['MAINNET_PRIVATE_KEY']
MAINNET_PUBLIC_KEY = os.environ['MAINNET_PUBLIC_KEY']
MAINNET_WALLET_ADDRESS = os.environ['MAINNET_WALLET_ADDRESS']

In [52]:
# Create Account instance containing our wallet info.
# Both keys are parsed as base-16 strings, so the environment variables must hold hex values.
account = Account(
    client=NET,
    address=MAINNET_WALLET_ADDRESS,
    key_pair=KeyPair(
        private_key=int(MAINNET_PRIVATE_KEY, 16),
        public_key=int(MAINNET_PUBLIC_KEY, 16)
    ),
    chain=StarknetChainId.MAINNET,
)

In [152]:
async def set_pragma_checkpoint(account: Account, oracle_address: int, asset: int) -> str:
    """
    Sends a transaction that sets an on-chain Pragma checkpoint for the given asset.

    Parameters:
    - account: Account instance that contains address, keys etc.
    - oracle_address: Pragma oracle address
    - asset: Pragma asset key

    Returns:
    - tx_hash: transaction hash as a hex string
    """
    # Contract instance for the oracle (fetches the list of functions from chain)
    oracle = await Contract.from_address(address=oracle_address, provider=account)

    # With the parsed ABI, an enum member is referred to by its name rather
    # than by its position; the available members are listed in
    # 'contract.data.parsed_abi'.
    data_type = ['SpotEntry', asset]     # DataType::SpotEntry(asset)
    aggregation_mode = ['Median', None]  # AggregationMode::Median carries no value, hence None

    set_checkpoint_call = oracle.functions['set_checkpoint'].prepare_invoke_v1(
        data_type,
        aggregation_mode,
    )

    # Sign and send the invoke through the account, capped at MAX_FEE
    response = await account.execute_v1(calls=set_checkpoint_call, max_fee=MAX_FEE)

    return hex(response.transaction_hash)

In [131]:
# Current Unix time in whole seconds, captured once so later cells can use it
# as the cut-off when looking up checkpoints.
now = int(time.time())

In [160]:
checkpoint_1 = await get_pragma_checkpoint(
    NET, PRAGMA_ADDRESS, now, ETH_USD_KEY
)
print('Checkpoint timestamp:')
print(f"{checkpoint_1.timestamp} -> {datetime.datetime.fromtimestamp(checkpoint_1.timestamp)}")
print(f'Checkpoint index: {checkpoint_1.index}')

Checkpoint timestamp:
1715073911 -> 2024-05-07 11:25:11
Checkpoint index: 105


In [154]:
invoke_tx_hash = await set_pragma_checkpoint(
    account,
    PRAGMA_ADDRESS,
    ETH_USD_KEY
)

In [157]:
# Show the hash of the set_checkpoint transaction and a block-explorer link to it
print('Tx Hash: ', invoke_tx_hash)
print(f'Tx Hash link: https://starkscan.co/tx/{invoke_tx_hash}')

Tx Hash:  0x7c76c9bc4578e9cfeedfd753cd1b32f7d323aef4716a487f0174d0a8a4eb904
Tx Hash link: https://starkscan.co/tx/0x7c76c9bc4578e9cfeedfd753cd1b32f7d323aef4716a487f0174d0a8a4eb904


In [159]:
checkpoint_2 = await get_pragma_checkpoint(
    NET, PRAGMA_ADDRESS, now, ETH_USD_KEY
)

print('Checkpoint timestamp:')
print(f"{checkpoint_2.timestamp} -> {datetime.datetime.fromtimestamp(checkpoint_2.timestamp)}")
print(f'Checkpoint index: {checkpoint_2.index}')

Checkpoint timestamp:
1715073911 -> 2024-05-07 11:25:11
Checkpoint index: 105
