Skip to content
18 changes: 15 additions & 3 deletions elfpy/data/acquire_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ def main(
postgres.add_checkpoint_infos([convert_data.convert_checkpoint_info(checkpoint_info_dict)], session)

# Query and add block transactions
block_transactions = convert_data.fetch_contract_transactions_for_block(web3, hyperdrive_contract, block_number)
block_transactions, wallet_deltas = convert_data.fetch_contract_transactions_for_block(
web3, hyperdrive_contract, block_number
)
postgres.add_transactions(block_transactions, session)
postgres.add_wallet_deltas(wallet_deltas, session)

# monitor for new blocks & add pool info per block
logging.info("Monitoring for pool info updates...")
Expand Down Expand Up @@ -160,9 +163,10 @@ def main(

# keep querying until it returns to avoid random crashes with ValueError on some intermediate block
block_transactions = None
wallet_deltas = None
for _ in range(RETRY_COUNT):
try:
block_transactions = convert_data.fetch_contract_transactions_for_block(
block_transactions, wallet_deltas = convert_data.fetch_contract_transactions_for_block(
web3, hyperdrive_contract, block_number
)
break
Expand All @@ -171,10 +175,18 @@ def main(
time.sleep(1)
continue

if block_transactions is None: # Proceed only if we have data, otherwise do nothing
# This case only happens if fetch_contract_transactions throws an exception
# e.g., the web3 call fails. fetch_contract_transactions_for_block will return
# empty lists (which doesn't execute the if statement below) if there are no hyperdrive
# transactions for the block
if block_transactions is None or wallet_deltas is None:
raise ValueError("Error in getting transactions")
Comment thread
slundqui marked this conversation as resolved.

postgres.add_transactions(block_transactions, session)
postgres.add_wallet_deltas(wallet_deltas, session)

# TODO put the wallet info query as an optional block,
# and check these wallet values with what we get from the deltas
wallet_info_for_transactions = convert_data.get_wallet_info(
hyperdrive_contract, base_contract, block_number, block_transactions, block_pool_info
)
Expand Down
257 changes: 243 additions & 14 deletions elfpy/data/convert_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _convert_scaled_value(input_val: int | None) -> float | None:
# TODO move this function to hyperdrive_interface and return a list of dictionaries
def fetch_contract_transactions_for_block(
web3: Web3, contract: Contract, block_number: BlockNumber
) -> list[db_schema.Transaction]:
) -> tuple[list[db_schema.Transaction], list[db_schema.WalletDelta]]:
"""Fetch transactions related to the contract.

Returns the block pool info from the Hyperdrive contract
Expand All @@ -67,15 +67,17 @@ def fetch_contract_transactions_for_block(

Returns
-------
list[Transaction]
A list of Transaction objects ready to be inserted into Postgres
tuple[list[Transaction], list[WalletDelta]]
A list of Transaction objects ready to be inserted into Postgres, and
a list of wallet delta objects ready to be inserted into Postgres
"""
block: BlockData = web3.eth.get_block(block_number, full_transactions=True)
transactions = block.get("transactions")
if not transactions:
logging.info("no transactions in block %s", block.get("number"))
return []
return ([], [])
out_transactions = []
out_wallet_deltas = []
for transaction in transactions:
if isinstance(transaction, HexBytes):
logging.warning("transaction HexBytes")
Expand All @@ -97,7 +99,234 @@ def fetch_contract_transactions_for_block(
logs = eth.get_transaction_logs(web3, contract, tx_receipt)
receipt: dict[str, Any] = _recursive_dict_conversion(tx_receipt) # type: ignore
out_transactions.append(_build_hyperdrive_transaction_object(transaction_dict, logs, receipt))
return out_transactions
# Build wallet deltas based on transaction logs
out_wallet_deltas.extend(_build_wallet_deltas(logs, transaction_dict["hash"], block_number))
return out_transactions, out_wallet_deltas


# TODO this function likely should be decoupled from postgres and added into
# hyperdrive interface returning a list of dictionaries, with a conversion function to translate
# into postgres
def _build_wallet_deltas(logs: list[dict], tx_hash: str, block_number) -> list[db_schema.WalletDelta]:
"""From decoded transaction logs, we look at the log that contains the trade summary

Arguments
---------
logs: list[dict]
The list of dictionaries that was decoded from `eth.get_transaction_logs`
tx_hash: str
The transaction hash that resulted in this wallet delta
block_number: BlockNumber
The current block number of the log

Returns
-------
list[Transaction]
A list of Transaction objects ready to be inserted into Postgres
"""
wallet_deltas = []
# We iterate through the logs looking for specific events that describe the transaction
# We then create a WalletDelta object with their corresponding token and base deltas for
# each action
for log in logs:
if log["event"] == "AddLiquidity":
wallet_addr = log["args"]["provider"]
token_delta = _convert_scaled_value(log["args"]["lpAmount"])
base_delta = _convert_scaled_value(-log["args"]["baseAmount"])
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="LP",
tokenType="LP",
delta=token_delta,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "OpenLong":
wallet_addr = log["args"]["trader"]
token_delta = _convert_scaled_value(log["args"]["bondAmount"])
base_delta = _convert_scaled_value(-log["args"]["baseAmount"])
maturity_time = log["args"]["maturityTime"]
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="LONG",
tokenType="LONG-" + str(maturity_time),
delta=token_delta,
maturityTime=maturity_time,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "OpenShort":
wallet_addr = log["args"]["trader"]
token_delta = _convert_scaled_value(log["args"]["bondAmount"])
base_delta = _convert_scaled_value(-log["args"]["baseAmount"])
maturity_time = log["args"]["maturityTime"]
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="SHORT",
tokenType="SHORT-" + str(maturity_time),
delta=token_delta,
maturityTime=maturity_time,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "RemoveLiquidity":
wallet_addr = log["args"]["provider"]
# Two deltas, one for withdrawal shares, one for lp tokens
Comment thread
slundqui marked this conversation as resolved.
lp_delta = _convert_scaled_value(-log["args"]["lpAmount"])
withdrawal_delta = _convert_scaled_value(log["args"]["withdrawalShareAmount"])
base_delta = _convert_scaled_value(log["args"]["baseAmount"])
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="LP",
tokenType="LP",
delta=lp_delta,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="WITHDRAWAL_SHARE",
tokenType="WITHDRAWAL_SHARE",
delta=withdrawal_delta,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "CloseLong":
wallet_addr = log["args"]["trader"]
token_delta = _convert_scaled_value(-log["args"]["bondAmount"])
base_delta = _convert_scaled_value(log["args"]["baseAmount"])
maturity_time = log["args"]["maturityTime"]
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="LONG",
tokenType="LONG-" + str(maturity_time),
delta=token_delta,
maturityTime=maturity_time,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "CloseShort":
wallet_addr = log["args"]["trader"]
token_delta = _convert_scaled_value(-log["args"]["bondAmount"])
base_delta = _convert_scaled_value(log["args"]["baseAmount"])
maturity_time = log["args"]["maturityTime"]
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="SHORT",
tokenType="SHORT-" + str(maturity_time),
delta=token_delta,
maturityTime=maturity_time,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)

elif log["event"] == "RedeemWithdrawalShares":
wallet_addr = log["args"]["provider"]
maturity_time = None
token_delta = _convert_scaled_value(-log["args"]["withdrawalShareAmount"])
base_delta = _convert_scaled_value(log["args"]["baseAmount"])
wallet_deltas.extend(
[
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="WITHDRAWAL_SHARE",
tokenType="WITHDRAWAL_SHARE",
delta=token_delta,
maturityTime=maturity_time,
),
db_schema.WalletDelta(
transactionHash=tx_hash,
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="BASE",
tokenType="BASE",
delta=base_delta,
),
]
)
# Every log should have either 0 (no op), 2(two deltas per transaction), or 3(in the case of remove liquidity)
# entries in the wallet delta
assert len(wallet_deltas) in (0, 2, 3)
return wallet_deltas


def _build_hyperdrive_transaction_object(
Expand Down Expand Up @@ -312,20 +541,20 @@ def get_wallet_info(
)

# Query and add withdraw tokens to wallet info
withdrawl_token_prefix = hyperdrive_interface.AssetIdPrefix.WITHDRAWAL_SHARE.value
# Withdrawl tokens always have 0 maturity
withdrawl_token_id = hyperdrive_interface.encode_asset_id(withdrawl_token_prefix, timestamp=0)
num_withdrawl_token = _query_contract_for_balance(
hyperdrive_contract, wallet_addr, block_number, withdrawl_token_id
withdrawal_token_prefix = hyperdrive_interface.AssetIdPrefix.WITHDRAWAL_SHARE.value
# Withdrawal tokens always have 0 maturity
withdrawal_token_id = hyperdrive_interface.encode_asset_id(withdrawal_token_prefix, timestamp=0)
num_withdrawal_token = _query_contract_for_balance(
hyperdrive_contract, wallet_addr, block_number, withdrawal_token_id
)
if num_withdrawl_token is not None:
if num_withdrawal_token is not None:
out_wallet_info.append(
db_schema.WalletInfo(
blockNumber=block_number,
walletAddress=wallet_addr,
baseTokenType="WITHDRAWL_SHARE",
tokenType="WITHDRAWL_SHARE",
tokenValue=num_withdrawl_token,
baseTokenType="WITHDRAWAL_SHARE",
tokenType="WITHDRAWAL_SHARE",
tokenValue=num_withdrawal_token,
maturityTime=None,
sharePrice=None,
)
Expand Down
24 changes: 23 additions & 1 deletion elfpy/data/db_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ class WalletInfo(Base):
sharePrice: Mapped[Union[float, None]] = mapped_column(Numeric, default=None)


class WalletDelta(Base):
Comment thread
slundqui marked this conversation as resolved.
"""Table/dataclass schema for wallet information."""

__tablename__ = "walletdelta"

# Default table primary key
# Note that we use postgres in production and sqlite in testing, but sqlite has issues with
# autoincrement with BigIntegers. Hence, we use the Integer variant when using sqlite in tests
id: Mapped[int] = mapped_column(
BigInteger().with_variant(Integer, "sqlite"), primary_key=True, init=False, autoincrement=True
)
transactionHash: Mapped[str] = mapped_column(String, ForeignKey("transactions.transactionHash"), index=True)
blockNumber: Mapped[int] = mapped_column(BigInteger, ForeignKey("poolinfo.blockNumber"), index=True)
walletAddress: Mapped[Union[str, None]] = mapped_column(String, index=True, default=None)
# baseTokenType can be BASE, LONG, SHORT, LP, or WITHDRAWAL_SHARE
baseTokenType: Mapped[Union[str, None]] = mapped_column(String, index=True, default=None)
# tokenType is the baseTokenType appended with "-<maturity_time>" for LONG and SHORT
tokenType: Mapped[Union[str, None]] = mapped_column(String, default=None)
delta: Mapped[Union[float, None]] = mapped_column(Numeric, default=None)
maturityTime: Mapped[Union[float, None]] = mapped_column(Numeric, default=None)


class PoolConfig(Base):
"""Table/dataclass schema for pool config."""

Expand Down Expand Up @@ -119,12 +141,12 @@ class Transaction(Base):
id: Mapped[int] = mapped_column(
BigInteger().with_variant(Integer, "sqlite"), primary_key=True, init=False, autoincrement=True
)
transactionHash: Mapped[str] = mapped_column(String, index=True, unique=True)

#### Fields from base transactions ####
blockNumber: Mapped[int] = mapped_column(BigInteger, ForeignKey("poolinfo.blockNumber"), index=True)
transactionIndex: Mapped[Union[int, None]] = mapped_column(Integer, default=None)
nonce: Mapped[Union[int, None]] = mapped_column(Integer, default=None)
transactionHash: Mapped[Union[str, None]] = mapped_column(String, default=None)
# Transaction receipt to/from
# Almost always from wallet address to smart contract address
txn_to: Mapped[Union[str, None]] = mapped_column(String, default=None)
Expand Down
Loading