diff --git a/elfpy/data/acquire_data.py b/elfpy/data/acquire_data.py index d7dbe823b1..96f1f28082 100644 --- a/elfpy/data/acquire_data.py +++ b/elfpy/data/acquire_data.py @@ -101,8 +101,11 @@ def main( postgres.add_checkpoint_infos([convert_data.convert_checkpoint_info(checkpoint_info_dict)], session) # Query and add block transactions - block_transactions = convert_data.fetch_contract_transactions_for_block(web3, hyperdrive_contract, block_number) + block_transactions, wallet_deltas = convert_data.fetch_contract_transactions_for_block( + web3, hyperdrive_contract, block_number + ) postgres.add_transactions(block_transactions, session) + postgres.add_wallet_deltas(wallet_deltas, session) # monitor for new blocks & add pool info per block logging.info("Monitoring for pool info updates...") @@ -160,9 +163,10 @@ def main( # keep querying until it returns to avoid random crashes with ValueError on some intermediate block block_transactions = None + wallet_deltas = None for _ in range(RETRY_COUNT): try: - block_transactions = convert_data.fetch_contract_transactions_for_block( + block_transactions, wallet_deltas = convert_data.fetch_contract_transactions_for_block( web3, hyperdrive_contract, block_number ) break @@ -171,10 +175,18 @@ def main( time.sleep(1) continue - if block_transactions is None: # Proceed only if we have data, otherwise do nothing + # This case only happens if fetch_contract_transactions throws an exception + # e.g., the web3 call fails. 
fetch_contract_transactions_for_block will return + # empty lists (which doesn't execute the if statement below) if there are no hyperdrive + # transactions for the block + if block_transactions is None or wallet_deltas is None: raise ValueError("Error in getting transactions") + postgres.add_transactions(block_transactions, session) + postgres.add_wallet_deltas(wallet_deltas, session) + # TODO put the wallet info query as an optional block, + # and check these wallet values with what we get from the deltas wallet_info_for_transactions = convert_data.get_wallet_info( hyperdrive_contract, base_contract, block_number, block_transactions, block_pool_info ) diff --git a/elfpy/data/convert_data.py b/elfpy/data/convert_data.py index 144da1906b..121e94d50b 100644 --- a/elfpy/data/convert_data.py +++ b/elfpy/data/convert_data.py @@ -51,7 +51,7 @@ def _convert_scaled_value(input_val: int | None) -> float | None: # TODO move this function to hyperdrive_interface and return a list of dictionaries def fetch_contract_transactions_for_block( web3: Web3, contract: Contract, block_number: BlockNumber -) -> list[db_schema.Transaction]: +) -> tuple[list[db_schema.Transaction], list[db_schema.WalletDelta]]: """Fetch transactions related to the contract. 
Returns the block pool info from the Hyperdrive contract @@ -67,15 +67,17 @@ def fetch_contract_transactions_for_block( Returns ------- - list[Transaction] - A list of Transaction objects ready to be inserted into Postgres + tuple[list[Transaction], list[WalletDelta]] + A list of Transaction objects ready to be inserted into Postgres, and + a list of wallet delta objects ready to be inserted into Postgres """ block: BlockData = web3.eth.get_block(block_number, full_transactions=True) transactions = block.get("transactions") if not transactions: logging.info("no transactions in block %s", block.get("number")) - return [] + return ([], []) out_transactions = [] + out_wallet_deltas = [] for transaction in transactions: if isinstance(transaction, HexBytes): logging.warning("transaction HexBytes") @@ -97,7 +99,234 @@ def fetch_contract_transactions_for_block( logs = eth.get_transaction_logs(web3, contract, tx_receipt) receipt: dict[str, Any] = _recursive_dict_conversion(tx_receipt) # type: ignore out_transactions.append(_build_hyperdrive_transaction_object(transaction_dict, logs, receipt)) - return out_transactions + # Build wallet deltas based on transaction logs + out_wallet_deltas.extend(_build_wallet_deltas(logs, transaction_dict["hash"], block_number)) + return out_transactions, out_wallet_deltas + + +# TODO this function likely should be decoupled from postgres and added into +# hyperdrive interface returning a list of dictionaries, with a conversion function to translate +# into postgres +def _build_wallet_deltas(logs: list[dict], tx_hash: str, block_number) -> list[db_schema.WalletDelta]: + """From decoded transaction logs, we look at the log that contains the trade summary + + Arguments + --------- + logs: list[dict] + The list of dictionaries that was decoded from `eth.get_transaction_logs` + tx_hash: str + The transaction hash that resulted in this wallet delta + block_number: BlockNumber + The current block number of the log + + Returns + ------- + 
list[Transaction] + A list of Transaction objects ready to be inserted into Postgres + """ + wallet_deltas = [] + # We iterate through the logs looking for specific events that describe the transaction + # We then create a WalletDelta object with their corresponding token and base deltas for + # each action + for log in logs: + if log["event"] == "AddLiquidity": + wallet_addr = log["args"]["provider"] + token_delta = _convert_scaled_value(log["args"]["lpAmount"]) + base_delta = _convert_scaled_value(-log["args"]["baseAmount"]) + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="LP", + tokenType="LP", + delta=token_delta, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif log["event"] == "OpenLong": + wallet_addr = log["args"]["trader"] + token_delta = _convert_scaled_value(log["args"]["bondAmount"]) + base_delta = _convert_scaled_value(-log["args"]["baseAmount"]) + maturity_time = log["args"]["maturityTime"] + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="LONG", + tokenType="LONG-" + str(maturity_time), + delta=token_delta, + maturityTime=maturity_time, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif log["event"] == "OpenShort": + wallet_addr = log["args"]["trader"] + token_delta = _convert_scaled_value(log["args"]["bondAmount"]) + base_delta = _convert_scaled_value(-log["args"]["baseAmount"]) + maturity_time = log["args"]["maturityTime"] + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + 
walletAddress=wallet_addr, + baseTokenType="SHORT", + tokenType="SHORT-" + str(maturity_time), + delta=token_delta, + maturityTime=maturity_time, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif log["event"] == "RemoveLiquidity": + wallet_addr = log["args"]["provider"] + # Two deltas, one for withdrawal shares, one for lp tokens + lp_delta = _convert_scaled_value(-log["args"]["lpAmount"]) + withdrawal_delta = _convert_scaled_value(log["args"]["withdrawalShareAmount"]) + base_delta = _convert_scaled_value(log["args"]["baseAmount"]) + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="LP", + tokenType="LP", + delta=lp_delta, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="WITHDRAWAL_SHARE", + tokenType="WITHDRAWAL_SHARE", + delta=withdrawal_delta, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif log["event"] == "CloseLong": + wallet_addr = log["args"]["trader"] + token_delta = _convert_scaled_value(-log["args"]["bondAmount"]) + base_delta = _convert_scaled_value(log["args"]["baseAmount"]) + maturity_time = log["args"]["maturityTime"] + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="LONG", + tokenType="LONG-" + str(maturity_time), + delta=token_delta, + maturityTime=maturity_time, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif 
log["event"] == "CloseShort": + wallet_addr = log["args"]["trader"] + token_delta = _convert_scaled_value(-log["args"]["bondAmount"]) + base_delta = _convert_scaled_value(log["args"]["baseAmount"]) + maturity_time = log["args"]["maturityTime"] + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="SHORT", + tokenType="SHORT-" + str(maturity_time), + delta=token_delta, + maturityTime=maturity_time, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + + elif log["event"] == "RedeemWithdrawalShares": + wallet_addr = log["args"]["provider"] + maturity_time = None + token_delta = _convert_scaled_value(-log["args"]["withdrawalShareAmount"]) + base_delta = _convert_scaled_value(log["args"]["baseAmount"]) + wallet_deltas.extend( + [ + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="WITHDRAWAL_SHARE", + tokenType="WITHDRAWAL_SHARE", + delta=token_delta, + maturityTime=maturity_time, + ), + db_schema.WalletDelta( + transactionHash=tx_hash, + blockNumber=block_number, + walletAddress=wallet_addr, + baseTokenType="BASE", + tokenType="BASE", + delta=base_delta, + ), + ] + ) + # Every log should have either 0 (no op), 2(two deltas per transaction), or 3(in the case of remove liquidity) + # entries in the wallet delta + assert len(wallet_deltas) in (0, 2, 3) + return wallet_deltas def _build_hyperdrive_transaction_object( @@ -312,20 +541,20 @@ def get_wallet_info( ) # Query and add withdraw tokens to wallet info - withdrawl_token_prefix = hyperdrive_interface.AssetIdPrefix.WITHDRAWAL_SHARE.value - # Withdrawl tokens always have 0 maturity - withdrawl_token_id = hyperdrive_interface.encode_asset_id(withdrawl_token_prefix, timestamp=0) - num_withdrawl_token = 
_query_contract_for_balance( - hyperdrive_contract, wallet_addr, block_number, withdrawl_token_id + withdrawal_token_prefix = hyperdrive_interface.AssetIdPrefix.WITHDRAWAL_SHARE.value + # Withdrawal tokens always have 0 maturity + withdrawal_token_id = hyperdrive_interface.encode_asset_id(withdrawal_token_prefix, timestamp=0) + num_withdrawal_token = _query_contract_for_balance( + hyperdrive_contract, wallet_addr, block_number, withdrawal_token_id ) - if num_withdrawl_token is not None: + if num_withdrawal_token is not None: out_wallet_info.append( db_schema.WalletInfo( blockNumber=block_number, walletAddress=wallet_addr, - baseTokenType="WITHDRAWL_SHARE", - tokenType="WITHDRAWL_SHARE", - tokenValue=num_withdrawl_token, + baseTokenType="WITHDRAWAL_SHARE", + tokenType="WITHDRAWAL_SHARE", + tokenValue=num_withdrawal_token, maturityTime=None, sharePrice=None, ) diff --git a/elfpy/data/db_schema.py b/elfpy/data/db_schema.py index d8fb32581b..8b80c05a06 100644 --- a/elfpy/data/db_schema.py +++ b/elfpy/data/db_schema.py @@ -46,6 +46,28 @@ class WalletInfo(Base): sharePrice: Mapped[Union[float, None]] = mapped_column(Numeric, default=None) +class WalletDelta(Base): + """Table/dataclass schema for wallet information.""" + + __tablename__ = "walletdelta" + + # Default table primary key + # Note that we use postgres in production and sqlite in testing, but sqlite has issues with + # autoincrement with BigIntegers. 
Hence, we use the Integer variant when using sqlite in tests + id: Mapped[int] = mapped_column( + BigInteger().with_variant(Integer, "sqlite"), primary_key=True, init=False, autoincrement=True + ) + transactionHash: Mapped[str] = mapped_column(String, ForeignKey("transactions.transactionHash"), index=True) + blockNumber: Mapped[int] = mapped_column(BigInteger, ForeignKey("poolinfo.blockNumber"), index=True) + walletAddress: Mapped[Union[str, None]] = mapped_column(String, index=True, default=None) + # baseTokenType can be BASE, LONG, SHORT, LP, or WITHDRAWAL_SHARE + baseTokenType: Mapped[Union[str, None]] = mapped_column(String, index=True, default=None) + # tokenType is the baseTokenType appended with "-" for LONG and SHORT + tokenType: Mapped[Union[str, None]] = mapped_column(String, default=None) + delta: Mapped[Union[float, None]] = mapped_column(Numeric, default=None) + maturityTime: Mapped[Union[float, None]] = mapped_column(Numeric, default=None) + + class PoolConfig(Base): """Table/dataclass schema for pool config.""" @@ -119,12 +141,12 @@ class Transaction(Base): id: Mapped[int] = mapped_column( BigInteger().with_variant(Integer, "sqlite"), primary_key=True, init=False, autoincrement=True ) + transactionHash: Mapped[str] = mapped_column(String, index=True, unique=True) #### Fields from base transactions #### blockNumber: Mapped[int] = mapped_column(BigInteger, ForeignKey("poolinfo.blockNumber"), index=True) transactionIndex: Mapped[Union[int, None]] = mapped_column(Integer, default=None) nonce: Mapped[Union[int, None]] = mapped_column(Integer, default=None) - transactionHash: Mapped[Union[str, None]] = mapped_column(String, default=None) # Transaction receipt to/from # Almost always from wallet address to smart contract address txn_to: Mapped[Union[str, None]] = mapped_column(String, default=None) diff --git a/elfpy/data/postgres.py b/elfpy/data/postgres.py index d718887d91..f7cc7a07ed 100644 --- a/elfpy/data/postgres.py +++ b/elfpy/data/postgres.py @@ 
-12,7 +12,16 @@ from sqlalchemy import URL, MetaData, Table, create_engine, exc, func, inspect from sqlalchemy.orm import Session, sessionmaker -from elfpy.data.db_schema import Base, CheckpointInfo, PoolConfig, PoolInfo, Transaction, UserMap, WalletInfo +from elfpy.data.db_schema import ( + Base, + CheckpointInfo, + PoolConfig, + PoolInfo, + Transaction, + UserMap, + WalletDelta, + WalletInfo, +) # classes for sqlalchemy that define table schemas have no methods. # pylint: disable=too-few-public-methods @@ -125,6 +134,7 @@ def initialize_session() -> Session: """ postgres_config = build_postgres_config() + # TODO add waiting in connecting to postgres to avoid exiting out before postgres spins up url_object = URL.create( drivername="postgresql", username=postgres_config.POSTGRES_USER, @@ -280,6 +290,26 @@ def add_transactions(transactions: list[Transaction], session: Session) -> None: raise err +def add_wallet_deltas(wallet_deltas: list[WalletDelta], session: Session) -> None: + """Add wallet deltas to the walletdelta table. + + Arguments + --------- + transactions : list[WalletDelta] + A list of WalletDelta objects to insert into postgres + session : Session + The initialized session object + """ + for wallet_delta in wallet_deltas: + session.add(wallet_delta) + try: + session.commit() + except exc.DataError as err: + session.rollback() + print(f"{wallet_deltas=}") + raise err + + def add_user_map(username: str, addresses: list[str], session: Session) -> None: """Add username mapping to postgres during evm_bots initialization. @@ -591,6 +621,41 @@ def get_current_wallet_info( return current_wallet_info +def get_wallet_deltas(session: Session, start_block: int | None = None, end_block: int | None = None) -> pd.DataFrame: + """Get all wallet_delta data in history and returns as a pandas dataframe. + + Arguments + --------- + session : Session + The initialized session object + start_block : int | None, optional + The starting block to filter the query on. 
start_block integers + matches python slicing notation, e.g., list[:3], list[:-3] + end_block : int | None, optional + The ending block to filter the query on. end_block integers + matches python slicing notation, e.g., list[:3], list[:-3] + + Returns + ------- + DataFrame + A DataFrame that consists of the queried wallet info data + """ + query = session.query(WalletDelta) + + # Support for negative indices + if (start_block is not None) and (start_block < 0): + start_block = get_latest_block_number_from_table(WalletDelta, session) + start_block + 1 + if (end_block is not None) and (end_block < 0): + end_block = get_latest_block_number_from_table(WalletDelta, session) + end_block + 1 + + if start_block is not None: + query = query.filter(WalletDelta.blockNumber >= start_block) + if end_block is not None: + query = query.filter(WalletDelta.blockNumber < end_block) + + return pd.read_sql(query.statement, con=session.connection()) + + def get_agents(session: Session, start_block: int | None = None, end_block: int | None = None) -> list[str]: """Get the list of all agents from the WalletInfo table. @@ -669,7 +734,7 @@ def get_latest_block_number(session: Session) -> int: def get_latest_block_number_from_table( - table_obj: Type[WalletInfo | PoolInfo | Transaction | CheckpointInfo], session: Session + table_obj: Type[WalletInfo | WalletDelta | PoolInfo | Transaction | CheckpointInfo], session: Session ) -> int: """Get the latest block number based on the specified table in the db. 
diff --git a/examples/hackweek_demo/calc_pnl.py b/examples/hackweek_demo/calc_pnl.py index 539a659444..a60fb3232b 100644 --- a/examples/hackweek_demo/calc_pnl.py +++ b/examples/hackweek_demo/calc_pnl.py @@ -7,7 +7,7 @@ from extract_data_logs import calculate_spot_price -def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_wallet: pd.DataFrame) -> pd.Series: +def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, wallet_deltas: pd.DataFrame) -> pd.Series: """Calculate the most current pnl values. Calculate_spot_price_for_position calculates the spot price for a position that has matured by some amount. @@ -18,8 +18,8 @@ def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_ Time-invariant pool configuration. pool_info : pd.DataFrame Pool information like reserves. This can contain multiple blocks, but only the most recent is used. - current_wallet : pd.DataFrame - Current agent holdings indexed by address and position (LP, LONG-YYYYMMDD, SHORT-YYYYMMDD, etc.) + wallet_deltas: pd.DataFrame + Wallet deltas for each agent and position. 
Returns ------- @@ -30,8 +30,21 @@ def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_ # Most current block timestamp latest_pool_info = pool_info.loc[pool_info.index.max()] block_timestamp = latest_pool_info["timestamp"].timestamp() + + # Calculate unrealized gains + current_wallet = wallet_deltas.groupby(["walletAddress", "tokenType"]).agg( + {"delta": "sum", "baseTokenType": "first", "maturityTime": "first"} + ) + + # Sanity check, no tokens except base should dip below 0 + assert (current_wallet["delta"][current_wallet["baseTokenType"] != "BASE"] >= 0).all() + + # Calculate for base # Base is valued at 1:1, since that's our numéraire (https://en.wikipedia.org/wiki/Num%C3%A9raire) - base_balance = current_wallet[current_wallet["baseTokenType"] == "BASE"]["tokenValue"] + wallet_base = current_wallet[current_wallet["baseTokenType"] == "BASE"] + base_returns = wallet_base["delta"] + + # Calculate for lp # LP value = users_LP_tokens * sharePrice # derived from: # total_lp_value = lpTotalSupply * sharePrice @@ -40,10 +53,12 @@ def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_ # users_LP_value = users_LP_tokens / lpTotalSupply * lpTotalSupply * sharePrice # users_LP_value = users_LP_tokens * sharePrice wallet_lps = current_wallet[current_wallet["baseTokenType"] == "LP"] - lp_returns = wallet_lps["tokenValue"] * latest_pool_info["sharePrice"] + lp_returns = wallet_lps["delta"] * latest_pool_info["sharePrice"] + # Calculate for withdrawal shares. Same as for LPs. 
- wallet_withdrawl = current_wallet[current_wallet["baseTokenType"] == "WITHDRAWL_SHARE"] - withdrawl_returns = wallet_withdrawl["tokenValue"] * latest_pool_info["sharePrice"] + wallet_withdrawal = current_wallet[current_wallet["baseTokenType"] == "WITHDRAWAL_SHARE"] + withdrawal_returns = wallet_withdrawal["delta"] * latest_pool_info["sharePrice"] + # Calculate for shorts # Short value = users_shorts * ( 1 - spot_price ) # this could also be valued at 1 + ( p1 - p2 ) but we'd have to know their entry price (or entry base 🤔) @@ -57,7 +72,8 @@ def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_ maturity_timestamp=wallet_shorts["maturityTime"], block_timestamp=block_timestamp, ) - shorts_returns = wallet_shorts["tokenValue"] * (1 - short_spot_prices) + shorts_returns = wallet_shorts["delta"] * (1 - short_spot_prices) + # Calculate for longs # Long value = users_longs * spot_price wallet_longs = current_wallet[current_wallet["baseTokenType"] == "LONG"] @@ -70,14 +86,15 @@ def calc_total_returns(pool_config: pd.Series, pool_info: pd.DataFrame, current_ maturity_timestamp=wallet_longs["maturityTime"], block_timestamp=block_timestamp, ) - long_returns = wallet_longs["tokenValue"] * long_spot_prices + long_returns = wallet_longs["delta"] * long_spot_prices + # Add pnl to current_wallet information # Current_wallet and *_pnl dataframes have the same index - current_wallet.loc[base_balance.index, "pnl"] = base_balance + current_wallet.loc[base_returns.index, "pnl"] = base_returns current_wallet.loc[lp_returns.index, "pnl"] = lp_returns current_wallet.loc[shorts_returns.index, "pnl"] = shorts_returns current_wallet.loc[long_returns.index, "pnl"] = long_returns - current_wallet.loc[withdrawl_returns.index, "pnl"] = withdrawl_returns + current_wallet.loc[withdrawal_returns.index, "pnl"] = withdrawal_returns return current_wallet.reset_index().groupby("walletAddress")["pnl"].sum() diff --git a/examples/hackweek_demo/run_demo.py 
b/examples/hackweek_demo/run_demo.py index e35843ab3a..d01bda6806 100644 --- a/examples/hackweek_demo/run_demo.py +++ b/examples/hackweek_demo/run_demo.py @@ -16,13 +16,15 @@ # pylint: disable=invalid-name -st.set_page_config(page_title="Trading Competition Dashboard", layout="centered") +st.set_page_config(page_title="Trading Competition Dashboard", layout="wide") st.set_option("deprecation.showPyplotGlobalUse", False) # Helper functions # TODO should likely move these functions to another file -def get_ticker(data: pd.DataFrame, lookup: pd.DataFrame) -> pd.DataFrame: +def get_ticker( + wallet_delta: pd.DataFrame, transactions: pd.DataFrame, pool_info: pd.DataFrame, lookup: pd.DataFrame +) -> pd.DataFrame: """Show recent trades. Arguments @@ -35,15 +37,37 @@ def get_ticker(data: pd.DataFrame, lookup: pd.DataFrame) -> pd.DataFrame: pd.DataFrame The filtered transaction data based on what we want to view in the ticker """ - # Return reverse of methods to put most recent transactions at the top - usernames = username_to_address(lookup, data["operator"]) - ticker_data = data.reset_index()[["timestamp", "blockNumber", "operator", "trade_type", "value"]].copy() - ticker_data.insert(2, "username", usernames.values.tolist()) - ticker_data.columns = ["Timestamp", "Block", "User", "Wallet", "Method", "Amount"] + # TODO these merges should really happen via an sql query instead of in pandas here + # Set ticker so that each transaction is a single row + ticker_data = wallet_delta.groupby(["transactionHash"]).agg( + {"blockNumber": "first", "walletAddress": "first", "baseTokenType": tuple, "delta": tuple} + ) + + # Expand column of lists into seperate dataframes, then str cat them together + token_type = pd.DataFrame(ticker_data["baseTokenType"].to_list(), index=ticker_data.index) + token_deltas = pd.DataFrame(ticker_data["delta"].to_list(), index=ticker_data.index) + token_diffs = token_type + ": " + token_deltas.astype("str") + # Aggregate columns into a single list, 
removing nans + token_diffs = token_diffs.stack().groupby(level=0).agg(list) + + # Gather other information from other tables + usernames = address_to_username(lookup, ticker_data["walletAddress"]) + timestamps = pool_info.loc[ticker_data["blockNumber"], "timestamp"] + trade_type = transactions.set_index("transactionHash").loc[ticker_data.index, "input_method"] + + ticker_data = ticker_data[["blockNumber", "walletAddress"]].copy() + ticker_data.insert(0, "timestamp", timestamps.values) # type: ignore + ticker_data.insert(2, "username", usernames.values) # type: ignore + ticker_data.insert(4, "trade_type", trade_type) + ticker_data.insert(5, "token_diffs", token_diffs) # type: ignore + ticker_data.columns = ["Timestamp", "Block", "User", "Wallet", "Method", "Token Deltas"] # Shorten wallet address string ticker_data["Wallet"] = ticker_data["Wallet"].str[:6] + "..." + ticker_data["Wallet"].str[-4:] + # Return reverse of methods to put most recent transactions at the top ticker_data = ticker_data.set_index("Timestamp").sort_index(ascending=False) + # Drop rows with nonexistant wallets + ticker_data = ticker_data.dropna(axis=0, subset="Wallet") return ticker_data @@ -83,7 +107,7 @@ def combine_usernames(username: pd.Series) -> pd.DataFrame: def get_leaderboard(pnl: pd.Series, lookup: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]: """Rank users by PNL, individually and bomined across their accounts.""" pnl = pnl.reset_index() # type: ignore - usernames = username_to_address(lookup, pnl["walletAddress"]) + usernames = address_to_username(lookup, pnl["walletAddress"]) pnl.insert(1, "username", usernames.values.tolist()) # Hard coded funding provider from migration account migration_addr = "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" @@ -165,7 +189,7 @@ def get_user_lookup() -> pd.DataFrame: return options_map.reset_index() -def username_to_address(lookup: pd.DataFrame, selected_list: pd.Series) -> pd.Series: +def address_to_username(lookup: pd.DataFrame, 
selected_list: pd.Series) -> pd.Series: """Look up selected users/addrs to all addresses. Arguments @@ -173,13 +197,12 @@ def username_to_address(lookup: pd.DataFrame, selected_list: pd.Series) -> pd.Se lookup: pd.DataFrame The lookup dataframe from `get_user_lookup` call selected_list: list[str] - A list of selected values from the multiselect input widget - These values can either be usernames or addresses + A list of addresses to look up usernames to Returns ------- list[str] - A list of addresses based on selected_list + A list of usernames based on selected_list """ selected_list_column = selected_list.name out = selected_list.to_frame().merge(lookup, how="left", left_on=selected_list_column, right_on="address") @@ -216,21 +239,18 @@ def username_to_address(lookup: pd.DataFrame, selected_list: pd.Series) -> pd.Se txn_data = postgres.get_transactions(session, -max_live_blocks) pool_info_data = postgres.get_pool_info(session, -max_live_blocks) combined_data = get_combined_data(txn_data, pool_info_data) - ticker = get_ticker(combined_data, user_lookup) - wallets = postgres.get_current_wallet_info(session) + wallet_deltas = postgres.get_wallet_deltas(session) + ticker = get_ticker(wallet_deltas, txn_data, pool_info_data, user_lookup) (fixed_rate_x, fixed_rate_y) = calc_fixed_rate(combined_data, config_data) ohlcv = calc_ohlcv(combined_data, config_data, freq="5T") - # temporary hack because we know they started with 1e6 base. 
- current_reutrns = calc_total_returns(config_data, pool_info_data, wallets) - # TODO: FIX PNL CALCULATIONS TO INCLUDE DEPOSITS - # agent PNL is their click trade pnl + bot pnls - # TODO: FIX BOT RESTARTS - # Add initial budget column to bots - # when bot restarts, use initial budget for bot's wallet address to set "budget" in Agent.Wallet + current_returns = calc_total_returns(config_data, pool_info_data, wallet_deltas) + ## TODO: FIX BOT RESTARTS + ## Add initial budget column to bots + ## when bot restarts, use initial budget for bot's wallet address to set "budget" in Agent.Wallet - comb_rank, ind_rank = get_leaderboard(current_reutrns, user_lookup) + comb_rank, ind_rank = get_leaderboard(current_returns, user_lookup) with ticker_placeholder.container(): st.header("Ticker") diff --git a/tests/data/test_transaction.py b/tests/data/test_transaction.py index 2b9176737a..743b983d0e 100644 --- a/tests/data/test_transaction.py +++ b/tests/data/test_transaction.py @@ -32,7 +32,7 @@ def test_create_transaction(self, session): # Note: this test is using inmemory sqlite, which doesn't seem to support # autoincrementing ids without init, whereas postgres does this with no issues # Hence, we explicitly add id here - transaction = Transaction(blockNumber=1, event_value=3.2) # add your other columns here... + transaction = Transaction(blockNumber=1, transactionHash="a", event_value=3.2) # add your other columns here... 
session.add(transaction) session.commit() @@ -43,7 +43,7 @@ def test_create_transaction(self, session): def test_update_transaction(self, session): """Update an entry""" - transaction = Transaction(blockNumber=1, event_value=3.2) + transaction = Transaction(blockNumber=1, transactionHash="a", event_value=3.2) session.add(transaction) session.commit() @@ -56,7 +56,7 @@ def test_update_transaction(self, session): def test_delete_transaction(self, session): """Delete an entry""" - transaction = Transaction(blockNumber=1, event_value=3.2) + transaction = Transaction(blockNumber=1, transactionHash="a", event_value=3.2) session.add(transaction) session.commit() @@ -72,14 +72,20 @@ class TestTransactionInterface: def test_latest_block_number(self, session): """Testing retrevial of transaction via interface""" - transaction_1 = Transaction(blockNumber=1, event_value=3.0) # add your other columns here... + transaction_1 = Transaction( + blockNumber=1, transactionHash="a", event_value=3.0 + ) # add your other columns here... postgres.add_transactions([transaction_1], session) latest_block_number = postgres.get_latest_block_number_from_table(Transaction, session) assert latest_block_number == 1 - transaction_2 = Transaction(blockNumber=2, event_value=3.2) # add your other columns here... - transaction_3 = Transaction(blockNumber=3, event_value=3.4) # add your other columns here... + transaction_2 = Transaction( + blockNumber=2, transactionHash="b", event_value=3.2 + ) # add your other columns here... + transaction_3 = Transaction( + blockNumber=3, transactionHash="c", event_value=3.4 + ) # add your other columns here... 
postgres.add_transactions([transaction_2, transaction_3], session) latest_block_number = postgres.get_latest_block_number_from_table(Transaction, session) @@ -87,9 +93,15 @@ def test_latest_block_number(self, session): def test_get_transactions(self, session): """Testing retrevial of transactions via interface""" - transaction_1 = Transaction(blockNumber=0, event_value=3.1) # add your other columns here... - transaction_2 = Transaction(blockNumber=1, event_value=3.2) # add your other columns here... - transaction_3 = Transaction(blockNumber=2, event_value=3.3) # add your other columns here... + transaction_1 = Transaction( + blockNumber=0, transactionHash="a", event_value=3.1 + ) # add your other columns here... + transaction_2 = Transaction( + blockNumber=1, transactionHash="b", event_value=3.2 + ) # add your other columns here... + transaction_3 = Transaction( + blockNumber=2, transactionHash="c", event_value=3.3 + ) # add your other columns here... postgres.add_transactions([transaction_1, transaction_2, transaction_3], session) transactions_df = postgres.get_transactions(session) @@ -97,9 +109,15 @@ def test_get_transactions(self, session): def test_block_query_transactions(self, session): """Testing querying by block number of transactions via interface""" - transaction_1 = Transaction(blockNumber=0, event_value=3.1) # add your other columns here... - transaction_2 = Transaction(blockNumber=1, event_value=3.2) # add your other columns here... - transaction_3 = Transaction(blockNumber=2, event_value=3.3) # add your other columns here... + transaction_1 = Transaction( + blockNumber=0, transactionHash="a", event_value=3.1 + ) # add your other columns here... + transaction_2 = Transaction( + blockNumber=1, transactionHash="b", event_value=3.2 + ) # add your other columns here... + transaction_3 = Transaction( + blockNumber=2, transactionHash="c", event_value=3.3 + ) # add your other columns here... 
postgres.add_transactions([transaction_1, transaction_2, transaction_3], session) transactions_df = postgres.get_transactions(session, start_block=1) diff --git a/tests/data/test_wallet_deltas.py b/tests/data/test_wallet_deltas.py new file mode 100644 index 0000000000..bf21a604f6 --- /dev/null +++ b/tests/data/test_wallet_deltas.py @@ -0,0 +1,118 @@ +"""CRUD tests for WalletDelta""" +import numpy as np +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from elfpy.data import postgres +from elfpy.data.db_schema import Base, WalletDelta + +engine = create_engine("sqlite:///:memory:") # in-memory SQLite database for testing +Session = sessionmaker(bind=engine) + +# fixture arguments in test function have to be the same as the fixture name +# pylint: disable=redefined-outer-name + + +@pytest.fixture(scope="function") +def session(): + """Session fixture for tests""" + Base.metadata.create_all(engine) # create tables + session_ = Session() + yield session_ + session_.close() + Base.metadata.drop_all(engine) # drop tables + + +class TestWalletDeltaTable: + """CRUD tests for WalletDelta table""" + + def test_create_wallet_delta(self, session): + """Create an entry""" + # Note: this test is using in-memory sqlite, which doesn't seem to support + # autoincrementing ids without init, whereas postgres does this with no issues + # Hence, we explicitly add id here + wallet_delta = WalletDelta(blockNumber=1, transactionHash="a", delta=3.2) # add your other columns here...
+ session.add(wallet_delta) + session.commit() + + retrieved_wallet_delta = session.query(WalletDelta).filter_by(blockNumber=1).first() + assert retrieved_wallet_delta is not None + # delta retrieved from postgres is in Decimal, cast to float + assert float(retrieved_wallet_delta.delta) == 3.2 + + def test_update_wallet_delta(self, session): + """Update an entry""" + wallet_delta = WalletDelta(blockNumber=1, transactionHash="a", delta=3.2) + session.add(wallet_delta) + session.commit() + + wallet_delta.delta = 5.0 + session.commit() + + updated_wallet_delta = session.query(WalletDelta).filter_by(blockNumber=1).first() + # delta retrieved from postgres is in Decimal, cast to float + assert float(updated_wallet_delta.delta) == 5.0 + + def test_delete_wallet_delta(self, session): + """Delete an entry""" + wallet_delta = WalletDelta(blockNumber=1, transactionHash="a", delta=3.2) + session.add(wallet_delta) + session.commit() + + session.delete(wallet_delta) + session.commit() + + deleted_wallet_delta = session.query(WalletDelta).filter_by(blockNumber=1).first() + assert deleted_wallet_delta is None + + +class TestWalletDeltaInterface: + """Testing postgres interface for the WalletDelta table""" + + def test_latest_block_number(self, session): + """Testing retrieval of wallet deltas via interface""" + wallet_delta_1 = WalletDelta(blockNumber=1, transactionHash="a", delta=3.0) # add your other columns here... + postgres.add_wallet_deltas([wallet_delta_1], session) + + latest_block_number = postgres.get_latest_block_number_from_table(WalletDelta, session) + assert latest_block_number == 1 + + wallet_delta_2 = WalletDelta(blockNumber=2, transactionHash="a", delta=3.2) # add your other columns here... + wallet_delta_3 = WalletDelta(blockNumber=3, transactionHash="a", delta=3.4) # add your other columns here...
+ postgres.add_wallet_deltas([wallet_delta_2, wallet_delta_3], session) + + latest_block_number = postgres.get_latest_block_number_from_table(WalletDelta, session) + assert latest_block_number == 3 + + def test_get_wallet_delta(self, session): + """Testing retrieval of wallet deltas via interface""" + wallet_delta_1 = WalletDelta(blockNumber=0, transactionHash="a", delta=3.1) # add your other columns here... + wallet_delta_2 = WalletDelta(blockNumber=1, transactionHash="a", delta=3.2) # add your other columns here... + wallet_delta_3 = WalletDelta(blockNumber=2, transactionHash="a", delta=3.3) # add your other columns here... + postgres.add_wallet_deltas([wallet_delta_1, wallet_delta_2, wallet_delta_3], session) + + wallet_delta_df = postgres.get_wallet_deltas(session) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.1, 3.2, 3.3])) + + def test_block_query_wallet_delta(self, session): + """Testing querying by block number of wallet deltas via interface""" + wallet_delta_1 = WalletDelta(blockNumber=0, transactionHash="a", delta=3.1) # add your other columns here... + wallet_delta_2 = WalletDelta(blockNumber=1, transactionHash="a", delta=3.2) # add your other columns here... + wallet_delta_3 = WalletDelta(blockNumber=2, transactionHash="a", delta=3.3) # add your other columns here...
+ postgres.add_wallet_deltas([wallet_delta_1, wallet_delta_2, wallet_delta_3], session) + + wallet_delta_df = postgres.get_wallet_deltas(session, start_block=1) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.2, 3.3])) + + wallet_delta_df = postgres.get_wallet_deltas(session, start_block=-1) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.3])) + + wallet_delta_df = postgres.get_wallet_deltas(session, end_block=1) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.1])) + + wallet_delta_df = postgres.get_wallet_deltas(session, end_block=-1) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.1, 3.2])) + + wallet_delta_df = postgres.get_wallet_deltas(session, start_block=1, end_block=-1) + np.testing.assert_array_equal(wallet_delta_df["delta"], np.array([3.2])) diff --git a/tests/solidity/test_close_long.py b/tests/solidity/test_close_long.py index 0af6c123f4..0797f6e1b2 100644 --- a/tests/solidity/test_close_long.py +++ b/tests/solidity/test_close_long.py @@ -360,7 +360,7 @@ def test_close_long_redeem_at_maturity_negative_variable_interest(self): """Close a long when the interest rate was negative. .. todo:: This test only verifies that a long can be closed with a negative interest rate. - There is a commented assert on the accounting that should pass after withdrawl shares are implemented. + There is a commented assert on the accounting that should pass after withdrawal shares are implemented. """ # Bob opens a long base_amount = FixedPoint("10.0") @@ -405,7 +405,7 @@ def test_close_long_half_through_term_negative_variable_interest(self): """Close a long when the interest rate was negative halfway through the term .. todo:: This test only verifies that a long can be closed with a negative interest rate. - There is a commented assert on the accounting that should pass after withdrawl shares are implemented. 
+ There is a commented assert on the accounting that should pass after withdrawal shares are implemented. """ # Bob opens a long base_amount = FixedPoint("10.0") # how much base the agent is using to open a long