<a href="https://colab.research.google.com/github/alllis/AmazonReviews2023/blob/main/PositionManagement.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install gym
!pip install numpy
!pip install 'shimmy>=2.0'
!pip install stable-baselines3

Collecting shimmy>=2.0
  Downloading Shimmy-2.0.0-py3-none-any.whl.metadata (3.5 kB)
Downloading Shimmy-2.0.0-py3-none-any.whl (30 kB)
Installing collected packages: shimmy
Successfully installed shimmy-2.0.0
Collecting stable-baselines3
  Downloading stable_baselines3-2.5.0-py3-none-any.whl.metadata (4.8 kB)
Collecting gymnasium<1.1.0,>=0.29.1 (from stable-baselines3)
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch<3.0,>=2.3->stable-baselines3)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collect

In [None]:
import gym
import numpy as np
from gym import spaces

In [None]:
import numpy as np
import gym
from gym import spaces

class MechanicalTradingEnv(gym.Env):
    """Rule-based trading environment with fixed buy and sell price levels.

    The price follows a Gaussian random walk. Whenever it is below a buy level
    or at/above a sell level, the environment trades the fraction given by the
    current action. The position is tracked as a fraction of capital.

    Observation: ``[current_price, current_position]`` (float32).
    Action: one value in ``[0, 1]`` per buy level followed by one per sell
    level; buy values are fractions of capital, sell values are fractions of
    the currently held position.
    Reward: profit realized by the sells executed in that step.

    Args:
        drift: Mean of the per-step price change.
        volatility: Standard deviation of the per-step price change.
        max_steps: Number of steps after which an episode is done.
    """

    # Upper bound on the invested fraction; must match the observation-space bound.
    MAX_POSITION = 1.0

    def __init__(self, drift=0.1, volatility=1.5, max_steps=100):
        super().__init__()

        # Fixed buy and sell levels
        self.buy_levels = np.array([90, 80])  # Buy below these price levels
        self.sell_levels = np.array([100, 105])  # Sell at/above these price levels

        # Allocations (initial values; overwritten by the action on every step)
        self.buy_allocations = np.array([0.4, 0.6])
        self.sell_allocations = np.array([0.4, 0.6])

        # Observation space: [current_price, current_position].
        # Bounds are created as float32 so they match the declared dtype.
        self.observation_space = spaces.Box(
            low=np.array([0.0, 0.0], dtype=np.float32),
            high=np.array([np.inf, self.MAX_POSITION], dtype=np.float32),
            dtype=np.float32,
        )

        # Action space: one allocation per buy level, then one per sell level
        self.action_space = spaces.Box(
            low=0.0,
            high=1.0,
            shape=(len(self.buy_levels) + len(self.sell_levels),),
            dtype=np.float32,
        )

        # Price-process and episode parameters
        self.drift = drift
        self.volatility = volatility
        self.max_steps = max_steps

        self._reset_state()

    def _reset_state(self):
        """Set all per-episode variables back to their starting values."""
        self.current_price = 100.0  # Start at neutral price
        self.remaining_capital = 1.0  # Total capital (normalized to 1)
        self.current_position = 0.0  # Fraction of capital invested
        self.total_profit = 0.0  # Cumulative realized profit of the episode
        self.last_buy_level = None
        self.last_sell_level = None
        self.time_step = 0
        self.trades = []  # Human-readable trade log

    def _observation(self):
        """Return the current observation as a float32 array."""
        return np.array([self.current_price, self.current_position], dtype=np.float32)

    def reset(self):
        """Start a new episode and return the initial observation.

        NOTE(review): the signature is intentionally kept argument-free.
        stable-baselines3 appears to choose its gym compatibility wrapper by
        checking whether ``reset`` accepts ``seed`` - confirm before adding it,
        since the newer API also expects different return shapes.
        """
        self._reset_state()
        return self._observation()

    def _execute_buys(self):
        """Buy at every level the current price has fallen below."""
        total_allocation = float(np.sum(self.buy_allocations))
        for i, level in enumerate(self.buy_levels):
            if self.current_price >= level:
                continue

            requested = float(self.buy_allocations[i])
            if i == 0:
                # First level: buy the full allocation while the position is
                # still below that allocation.
                target = requested
                amount = requested
            else:
                # Deeper levels: top the position up towards the combined
                # allocation, never buying more than this level's share.
                target = total_allocation
                amount = min(requested, total_allocation - self.current_position)

            # Never exceed the invested fraction promised by the observation
            # space, even when the action's buy allocations sum to more than 1.
            target = min(target, self.MAX_POSITION)
            if self.current_position >= target:
                continue
            amount = min(amount, self.MAX_POSITION - self.current_position)

            self.current_position = min(self.MAX_POSITION, self.current_position + amount)
            self.remaining_capital -= amount
            self.last_buy_level = level
            self.trades.append(f"Buy {amount:.2f} at {self.current_price:.2f} (Level: {level})")

    def _execute_sells(self):
        """Sell at every level the price has reached; return the realized profit."""
        realized = 0.0
        for i, level in enumerate(self.sell_levels):
            if self.current_price < level or self.current_position <= 0:
                continue
            # The first sell level fires at most once per episode:
            # last_sell_level is only cleared by reset().
            if i == 0 and self.last_sell_level == level:
                continue

            # Sell a fraction of whatever is currently held
            amount = float(self.sell_allocations[i]) * self.current_position
            self.current_position -= amount
            # NOTE(review): the position is a fraction of capital, but the
            # proceeds are scaled by the raw price, so remaining_capital mixes
            # units. Kept as-is because nothing reads it yet - confirm intent.
            self.remaining_capital += amount * self.current_price
            # NOTE(review): profit is measured against the sell level, not the
            # entry price - confirm this is the intended reward signal.
            realized += amount * (self.current_price - level)
            if i == 0:
                self.last_sell_level = level
            self.trades.append(f"Sell {amount:.2f} at {self.current_price:.2f} (Level: {level})")
        return realized

    def step(self, action):
        """Apply the allocations in ``action``, trade, and advance the price.

        Args:
            action: Array-like with one entry per buy level followed by one
                entry per sell level. Values are clipped to ``[0, 1]``.

        Returns:
            ``(observation, reward, done, info)`` where ``reward`` is the
            profit realized during this step only and ``info["total_profit"]``
            is the cumulative realized profit of the episode.
        """
        # Coerce and clip so the env also behaves when stepped directly with
        # lists or out-of-range values.
        allocations = np.clip(np.asarray(action, dtype=np.float64), 0.0, 1.0)
        n_buy = len(self.buy_levels)
        self.buy_allocations = allocations[:n_buy]
        self.sell_allocations = allocations[n_buy:]

        self._execute_buys()
        # The reward is the profit realized in THIS step. Returning the running
        # total would pay the agent again on every later step for the same sale.
        reward = self._execute_sells()
        self.total_profit += reward

        # Simulate price movement; floor at 0 so the observation stays inside
        # the declared bounds.
        shock = float(np.random.normal(self.drift, self.volatility))
        self.current_price = max(0.0, self.current_price + shock)

        # Increment time step
        self.time_step += 1
        done = self.time_step >= self.max_steps

        return self._observation(), reward, done, {"total_profit": self.total_profit}

# Create the single, notebook-wide environment instance with its default
# settings; the training and rollout code below uses this `env`.
env = MechanicalTradingEnv()


In [None]:
from stable_baselines3 import PPO

# Train a PPO agent with an MLP policy on the trading environment.
model = PPO(policy="MlpPolicy", env=env, verbose=1)
model.learn(total_timesteps=10_000)

# Roll the trained policy out directly on `env` for at most 100 steps,
# stopping early once the environment reports the episode as done.
obs, done, steps_taken = env.reset(), False, 0
while not done and steps_taken < 100:
    action, _ = model.predict(obs)
    obs, reward, done, _ = env.step(action)
    print(f"Action: {action}, Reward: {reward}, Next State: {obs}")
    steps_taken += 1

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 100      |
|    ep_rew_mean     | 0        |
| time/              |          |
|    fps             | 1112     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 100          |
|    ep_rew_mean          | 0.315        |
| time/                   |              |
|    fps                  | 819          |
|    iterations           | 2            |
|    time_elapsed         | 5            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0041102143 |
|    clip_fraction        | 0.0213       |
|    clip_range           | 0.2          |
|    en