In [3]:
import numpy as np

class PriceModel():
    '''
    Note:This is a stateless class, gathering price evolution models in one place
    '''
    # TODO: write the concrete price model
    def price_model_1(current_price, current_action, tau, vol_matrix, perm_impact_matrix, random_vector):
        return current_price + tau**0.5 * vol_matrix.dot(random_vector) - perm_impact_matrix.dot(current_action) 
    def price_model_2(current_price):
        pass

In [None]:
import gym
from gym import spaces
import pandas as pd

class LiquidationEnv(gym.Env):
    metadata = {'render.modes': ['human']}
    
    def __init__(self, 
                 n_assets=3, 
                 initial_shares=100, 
                 initial_prices=100, 
                 max_steps=5,
                 price_model=PriceModel):
        super(LiquidationEnv, self).__init__()
        
        # Environment parameters
        self.n_assets = n_assets
        self.initial_shares = np.full(n_assets, initial_shares, dtype=np.float32)
        self.initial_prices = np.full(n_assets, initial_prices, dtype=np.float32)
        self.max_steps = max_steps
        self.price_generator = price_model.price_model_1
        
        # Define action and observation spaces
        self.action_space = spaces.Box(
            low=0,
            high= 1,
            shape=(n_assets,),
            dtype=np.float32
        )
        
        self.observation_space = spaces.Dict({
            "prices": spaces.Box(low = -np.inf, high=np.inf, shape=(n_assets,), dtype=np.float32),
            "remaining": spaces.Box(low = 0, high=initial_shares, shape=(n_assets,), dtype=np.float32),
            "acc_revenue": spaces.Box(low = 0, high=np.inf, shape=(1,), dtype=np.float32)
        })
        
        # Initialize state
        self.state = None
        self.current_step = 0
        self.reset()

    def _get_obs(self):
        return {
            "prices": self.state['prices'].copy(),
            "remaining": self.state['remaining'].copy(),
            "acc_revenue": np.array([self.state['acc_revenue']], dtype=np.float32)
        }

    def _next_price(self, current_price , current_action, tau, vol_matrix, perm_impact_matrix, random_vector):
        # TODO: not sure do we need action value when calculating the next price
        actual_action = self.state['remaining'] * current_action
        return self.price_generator(current_price, actual_action, tau, vol_matrix, perm_impact_matrix, random_vector)

    def reset(self):
        # Reset initial prices (customize with your price initialization)
        self.state = {
            'prices': self.initial_prices.copy(),
            'remaining': self.initial_shares.copy(),
            'acc_revenue': 0.0
        }
        self.current_step = 0
        return self._get_obs()
    
    def _get_reward(self, state, action, temp_price_matrix):
        '''
        TODO: The function to calculate the reward
        '''
        actual_action = action * state['remaining']
        reward = actual_action.dot(state['prices'] - temp_price_matrix.dot(actual_action))
        return reward

    def step(self, action, temp_price_matrix):
        # TODO: need a better way than clipping
        # action = np.clip(action, 0, self.state['remaining'])        # Clip actions to valid range
        actual_action = self.state['remaining'] * action
        # Update state
        self.state['remaining'] -= actual_action
        self.state['prices'] = self._next_price()
        step_revenue = np.sum(actual_action * (self.state['prices'] - temp_price_matrix.dot(actual_action))) # Calculate revenue from current prices
        self.state['acc_revenue'] += step_revenue # TODO: what's the third part of the state? what's the formulor to calculate it?
        
        # Update step counter
        self.current_step += 1
        
        # Check termination conditions
        done = (np.sum(self.state['remaining']) <= 0) or (self.current_step >= self.max_step)
        if done:
            reward = 0.
        else:
            reward = self._get_reward
        
        return self._get_obs(), reward, done, {}

    def render(self, mode='human'):
        print(f"Step: {self.current_step}")
        print(f"Prices: {self.state['prices']}")
        print(f"Remaining: {self.state['remaining']}")
        print(f"Accumulated Revenue: {self.state['acc_revenue']:.2f}\n")

    def close(self):
        pass

In [2]:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env

# Create environment
env = LiquidationEnv(n_assets=3, initial_shares=100)

# Verify environment compatibility
check_env(env)

# Create and train model
model = PPO("MultiInputPolicy", env, verbose=1)
model.learn(total_timesteps=5)

# Test trained model
obs = env.reset()
for _ in range(100):
    action, _states = model.predict(obs)
    obs, rewards, done, info = env.step(action)
    if done:
        break

  "We recommend you to use a symmetric and normalized Box action space (range=[-1, 1]) "


AttributeError: 'LiquidationEnv' object has no attribute 'max_step'