# Impermanent Loss Agent

The `ImpermanentLossAgent` monitors a when value of investment falls below a certain threshold using ``UniswapImpLoss`` object. It integrates DeFiPyâ€™s LPQuote for exits and Web3Scoutâ€™s event feeds for real-time updates, supporting off-chain testing and on-chain execution.

### ðŸ“˜ Notable Classes

---

* **Class**: ðŸ“˜ ``defipy.agents.config.ImpermanentLossConfig`` 
    * **Purpose**: Configuration for volume threshold agent.
        * **Parameters**:
            * `volume_threshold`: Volume threshold to check (`float`).
            * `pool_address`: UniV2 pool address (`str`).
            * `provider_url`: Provider URL (eg, infura.io)(`str`).
            * `platform`: Platform where pool resides (eg, uniswap) (`str`).
            * `abi_name`: ABI handle pointing to pool contract in JSON (eg, UniswapV2Pair) (`str`).

---
         
* **Class**: ðŸ“˜ ``defipy.agents.ImpermanentLossAgent`` 
    * **Purpose**: Reactive DeFi agent for determining price threshold.
        * **Parameters**:
            * `config`: Agent configuration parameters (`PriceThresholdConfig`).
    * **Methods**:
        * ``apply()``
            * **Purpose**: Apply price threshold agent
        * ``run_batch(lp: UniswapExchange, tkn: ERC20, user_nm: str, events: dict)``
            * **Purpose**: Run AI price agent on batch data 
            * **Parameters**:
                * `lp`: Swap token (`UniswapExchange`).  
                * `tkn`: Swap token (`ERC20`).
                * `user_nm`: Account name (`str`).
                * `events`: Dictionary of sync events (`dict`).
        * ``apply(lp: UniswapExchange, tkn: ERC20, user_nm: str, block_num: int)``
            * **Purpose**: Apply TVL check
            * **Parameters**:
                * `lp`: Exchange (`UniswapExchange`).  
                * `tkn`: Swap token (`ERC20`).
                * `user_nm`: Account name (`str`).
                * `block_num`: Block number (`int`).          
        * ``check_condition(tkn: ERC20, threshold: float)``
            * **Purpose**: Check if position is below threshold  
            * **Parameters**:
                * `tkn`: Swap token (`ERC20`).
                * `threshold`: override config price threshold (optional) (`float`).
        * ``get_current_position_value(tkn: ERC20)``
            * **Purpose**: Check if TVL is below threshold  
            * **Parameters**:
                * `tkn`: Swap token (`ERC20`).
        * ``take_mock_position(lp: UniswapExchange, tkn: ERC20, user_nm: str, amt: float)``
            * **Purpose**: Check if TVL is below threshold  
            * **Parameters**:
                * `lp`: Exchange (`UniswapExchange`).  
                * `tkn`: Swap token (`ERC20`).
                * `user_nm`: Account name (`str`).
                * `amt`: Mock token amount (`float`).
        * ``withdraw_mock_position(lp: UniswapExchange, tkn: ERC20, user_nm: str, lp_amt: float)``
            * **Purpose**: Check if TVL is below threshold  
            * **Parameters**:
                * `lp`: Exchange (`UniswapExchange`).  
                * `tkn`: Swap token (`ERC20`).
                * `user_nm`: Account name (`str`).
                * `lp_amt`: Mock LP amount (`float`).
        * ``update_mock_pool(lp: UniswapExchange, cur_block: int)``
            * **Purpose**: Check if TVL is below threshold  
            * **Parameters**:
                * `lp`: Exchange (`UniswapExchange`).  
                * `cur_block`: Current block number (`int`).
        * ``prime_mock_pool(start_block: int, user_nm: str)``
            * **Purpose**: Initialize off-chain pool using information in start_block
            * **Parameters**:
                * `start_block`: Start block (`int`).
                * `user_nm`: Account name of mock off-chain pool (`str`).
        * ``get_impermanent_loss()``
            * **Purpose**: Getter function for impermanent loss of position measured in (%)
        * ``def get_current_position_value()``
            * **Purpose**: Getter function for current position value of investment
        * ``get_w3()``
            * **Purpose**: Getter function for web3.py connector object
        * ``get_abi()``
            * **Purpose**: Getter function for ABI json data
        * ``get_contract_instance()``
            * **Purpose**: Getter function for pool contract instance
        * ``get_lp_data()``
            * **Purpose**: Getter function for pool data associated with `pool_address` from config
        * ``def get_iloss()``
            * **Purpose**: Getter function for ImpLoss object.


In [1]:
from defipy import *
from web3scout import *

In [2]:
# class ImpermanentLossAgent:
#     def __init__(self, config: ImpermanentLossConfig, verbose: bool = False):
#         self.config = config
#         self.abi = ABILoad(self.config.platform, self.config.abi_name)  # Load ABI here  
#         self.connector = ConnectW3(self.config.provider_url)  # Web3Scout setup
#         self.connector.apply()
#         self.verbose = verbose
#         self.user_position = config.user_position
#         self.exit_percentage = config.exit_percentage
#         self.iLoss = None
#         self.lp_contract = None
#         self.lp_data = None
#         self.lp_state = None

#     def init(self):
#         self.lp_contract = self._init_lp_contract()
        
#         reserves = self.lp_contract.functions.getReserves().call()
#         token0_address = self.lp_contract.functions.token0().call()
#         token1_address = self.lp_contract.functions.token1().call()
#         reserve0 = reserves[0]; reserve1 = reserves[1]

#         w3 = self.connector.get_w3()
#         FetchERC20 = FetchToken(w3)
#         TKN0 = FetchERC20.apply(token0_address)
#         TKN1 = FetchERC20.apply(token1_address)

#         self.lp_data = UniswapPoolData(TKN0, TKN1, reserves)

#     def get_connector(self):
#         return self.connector

#     def get_abi(self):
#         return self.abi

#     def get_w3(self):
#         return self.connector.get_w3() 

#     def get_contract_instance(self):
#         return self.lp_contract

#     def get_lp_data(self):
#         return self.lp_data

#     def get_iloss(self):
#         return self.iLoss

#     def prime_mock_pool(self, start_block, user_nm = None):
#         w3 = self.get_w3() 
#         fetch_tkn = FetchToken(w3)
        
#         lp_contract = self._init_lp_contract()
#         tkn0_addr = lp_contract.functions.token0().call()
#         tkn1_addr = lp_contract.functions.token1().call()
#         total_supply = lp_contract.functions.totalSupply().call(block_identifier=start_block)
#         reserves = lp_contract.functions.getReserves().call(block_identifier=start_block)
        
#         # Step 2: Define tokens
#         tkn0 = fetch_tkn.apply(tkn0_addr)
#         tkn1 = fetch_tkn.apply(tkn1_addr)
        
#         amt0 = fetch_tkn.amt_to_decimal(tkn0, reserves[0])
#         amt1 = fetch_tkn.amt_to_decimal(tkn1, reserves[1])
        
#         # Step 3:  Initialize factory
#         factory = UniswapFactory("Pool factory", "0x2")
        
#         # Step 4: Set up exchange data for V2
#         exch_data = UniswapExchangeData(tkn0=tkn0, tkn1=tkn1, symbol="LP", address=self.config.pool_address)
        
#         # Step 5: Deploy pool
#         self.lp_state = factory.deploy(exch_data)
        
#         # Step 6: Add initial liquidity
#         join = Join()
#         join.apply(self.lp_state, user_nm, amt0, amt1)
#         self.lp_state.total_supply = total_supply # override total supply
    
#         return self.lp_state

#     def update_mock_pool(self, lp, cur_block):
#         w3 = self.get_w3() 
#         fetch_tkn = FetchToken(w3)
        
#         lp_contract = self._init_lp_contract()
#         tkn0_addr = lp_contract.functions.token0().call()
#         tkn1_addr = lp_contract.functions.token1().call()
#         total_supply = lp_contract.functions.totalSupply().call(block_identifier=int(cur_block))
#         reserves = lp_contract.functions.getReserves().call(block_identifier=int(cur_block))
        
#         tkn0 = self.get_lp_data().tkn0
#         tkn1 = self.get_lp_data().tkn1
#         amt0 = fetch_tkn.amt_to_decimal(tkn0, reserves[0])
#         amt1 = fetch_tkn.amt_to_decimal(tkn1, reserves[1])
        
#         prev_total_supply = lp.total_supply
#         lp.reserve0 = lp.convert_to_machine(amt0)      # override reserve0
#         lp.reserve1 = lp.convert_to_machine(amt1)      # override reserve1
#         lp.total_supply = total_supply                 # override total supply
#         lp.last_liquidity_deposit = abs(prev_total_supply - lp.total_supply)

#     def run_batch(self, lp, tkn, user_nm, events: dict):
#         """Process batched Sync events to check TVL and trigger exits."""
#         if not events:
#             print("No Sync events found in range.")
#             return
#         for k in events:
#             block_num = events[k]['blockNumber']
#             self.apply(lp, tkn, user_nm, block_num)

#     def apply(self, lp, tkn, user_nm, block_num):
#         """Execute liquidity exit if condition met."""
#         self.update_mock_pool(lp, block_num)
#         if self.check_condition(tkn, self.config.il_threshold):
#             val = self.get_current_position_value(tkn)
#             print(f"Block {block_num}: Value ({tkn.token_name}) = {val}, outside loss threshold {self.config.il_threshold}")
#             return val
#         else:
#             print(f"Block {block_num}: Value threshold condition met for {lp.name} LP")
#             return None
    
#     def take_mock_position(self, lp, tkn, user_nm, amt):
#         SwapDeposit().apply(lp, tkn, user_nm, amt)
#         self.mock_lp_pos_amt = lp.get_last_liquidity_deposit()
#         self.iLoss = UniswapImpLoss(lp, self.mock_lp_pos_amt)
#         return self.mock_lp_pos_amt

#     def get_impermanent_loss(self) -> float:
#         """Calculate impermanent loss percentage based on initial and current reserves."""
#         returns_calc = self.iLoss.apply(fees = True)
#         return returns_calc

#     def get_current_position_value(self, tkn) -> float:
#         current_position_value = self.iLoss.current_position_value(tkn)
#         return current_position_value

#     def check_condition(self, tkn, threshold):
#         """Check if TVL is below threshold."""
#         position_value = self.get_current_position_value(tkn)
#         return position_value < threshold

#     def withdraw_mock_position(self, lp, tkn, user_nm, lp_amt = None):
#         assert self.mock_lp_pos_amt != None, 'TVLBasedLiquidityExitAgent: MOCK_POSITION_UNAVAILABLE' 
#         lp_amt = self.mock_lp_pos_amt if lp_amt == None else lp_amt
#         tkn_amt = LPQuote(False).get_amount_from_lp(lp, tkn0, lp_amt)
#         amount_out = WithdrawSwap().apply(lp, tkn0, user_nm, tkn_amt)
#         return amount_out
        
#     def _init_lp_contract(self): 
#         pair_address = self.config.pool_address
#         w3 = self.get_w3()       
#         abi_obj = self.get_abi()
#         lp_contract = abi_obj.apply(w3, pair_address)   
#         return lp_contract

In [3]:
il_threshold = 99.70
pair_address = "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc"
provider_url = "https://mainnet.infura.io/v3/9624e3e5c40f4ac3958b79fa5aa2562d"
platform = Platform.AGNOSTIC
abi_name = JSONContract.UniswapV2Pair
user_position = 100
exit_percentage = 5

config = ImpermanentLossConfig(
    il_threshold = il_threshold,
    pool_address = pair_address,
    provider_url = provider_url,
    platform = platform,
    abi_name = abi_name,
    user_position = user_position,
    exit_percentage = exit_percentage
)

agent = ImpermanentLossAgent(config)
agent.init()

print(f"Monitoring TVL changes @ pool address {pair_address}")

Monitoring TVL changes @ pool address 0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc


In [4]:
abi = ABILoad(platform, abi_name)
connect = ConnectW3(provider_url)
connect.apply()

last_block = connect.get_w3().eth.block_number
start_block = last_block - 100

# Grab batch sync events from pool
rEvents = RetrieveEvents(connect, abi)
events = rEvents.apply(EventType.SWAP, address = pair_address, start_block=start_block, end_block=last_block)
df_events = rEvents.to_dataframe(events)
df_events.head(2)

Unnamed: 0,blockNumber,event,address,blockHash,logIndex,transactionHash,transactionIndex,args
0,23199876,Swap,0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc,0x2f2f4a177492cde6dae2b7c2feb877fd4cbd3759c8ff...,43,0x4ba3725976e3693a965d1fe57f18599439ce5c8f29d0...,4,{'sender': '0x66a9893cC07D91D95644AEDD05D03f95...
1,23199878,Swap,0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc,0xbfd8a157df7b14f44b75ff2d3653d3a97db2b5f27a67...,430,0x99563999c0f311122d4cf6a66499a6fa9f085af7c0d6...,213,{'sender': '0x6E4141d33021b52C91c28608403db4A0...


### Agent Run #1

In [5]:
tkn0 = agent.get_lp_data().tkn0
tkn1 = agent.get_lp_data().tkn1
lp = agent.prime_mock_pool(start_block, 'user')
lp.summary()

Exchange USDC-WETH (LP)
Reserves: USDC = 15767706.893411, WETH = 3269.6180909995715
Liquidity: 0.08445443237134992 



In [6]:
# Take a position
lp_pos = agent.take_mock_position(lp, tkn0, 'user', 100)

In [7]:
# Run impermanent loss analysis
agent.run_batch(lp, tkn0, 'user', events)

Block 23199876: Value (USDC) = 99.69951623814174, outside loss threshold 99.7
Block 23199878: Value (USDC) = 99.69940818131374, outside loss threshold 99.7
Block 23199881: Value (USDC) = 99.69800078897019, outside loss threshold 99.7
Block 23199884: Value (USDC) = 99.69800119718501, outside loss threshold 99.7
Block 23199893: Value (USDC) = 99.68374713212131, outside loss threshold 99.7
Block 23199894: Value (USDC) = 99.6900701621897, outside loss threshold 99.7
Block 23199896: Value (USDC) = 99.68861200254122, outside loss threshold 99.7
Block 23199896: Value (USDC) = 99.68861200254122, outside loss threshold 99.7
Block 23199896: Value (USDC) = 99.68861200254122, outside loss threshold 99.7
Block 23199900: Value threshold condition met for USDC-WETH LP
Block 23199915: Value threshold condition met for USDC-WETH LP
Block 23199917: Value threshold condition met for USDC-WETH LP
Block 23199918: Value threshold condition met for USDC-WETH LP
Block 23199926: Value threshold condition met f