In [1]:
!pip install stable-baselines3

Collecting stable-baselines3
  Downloading stable_baselines3-2.4.0-py3-none-any.whl.metadata (4.5 kB)
Collecting gymnasium<1.1.0,>=0.29.1 (from stable-baselines3)
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium<1.1.0,>=0.29.1->stable-baselines3)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading stable_baselines3-2.4.0-py3-none-any.whl (183 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m183.9/183.9 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium, stable-baselines3
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0 stable-baselines3-2.4

In [3]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
from typing import Tuple, Dict, Any
import yfinance as yf
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env
from sklearn.preprocessing import MinMaxScaler

In [19]:
class StockMarketEnv(gym.Env):
    metadata = {"render_modes": ["human"]}

    def __init__(self, stock_data: pd.DataFrame, initial_balance: float = 100000):
        super().__init__()

        if not isinstance(stock_data, pd.DataFrame) or "Close" not in stock_data.columns:
            raise ValueError("Stock data must be a DataFrame with a 'Close' column")

        # Ensure we don't have zero or negative prices
        if (stock_data["Close"] <= 0).any().any():
            raise ValueError("Stock data contains zero or negative prices")

        self.stock_data = stock_data.reset_index(drop=True)
        self.n_steps = len(stock_data)
        self.initial_balance = float(initial_balance)

        #This is to define action space
        self.action_space = spaces.Discrete(3)  # 2 actiosn -hold is 0, buy is 1, sell is 2

        # This is obs space
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0], dtype=np.float32),
            high=np.array([np.finfo(np.float32).max] * 4, dtype=np.float32),
            dtype=np.float32
        )

        self.reset()

    #This is to reset the env
    def reset(self, seed=None, options=None) -> Tuple[np.ndarray, Dict]:
        super().reset(seed=seed)
        self.current_balance = self.initial_balance
        self.current_step = 0
        self.total_shares = 0
        self.portfolio_value = self.initial_balance
        self.previous_portfolio_value = self.initial_balance
        return self._get_obs(), {}


    def _get_obs(self) -> np.ndarray:
        current_price = float(self.stock_data.iloc[self.current_step]["Close"])
        return np.array([
            current_price,
            self.current_balance,
            self.total_shares,
            self.portfolio_value
        ], dtype=np.float32)

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        if self.current_step >= self.n_steps - 1:
            raise RuntimeError("Episode has ended, please reset the environment")

        current_price = float(self.stock_data.iloc[self.current_step]["Close"])
        self.previous_portfolio_value = self.portfolio_value

        if action == 1: 
            max_shares = int(self.current_balance // current_price)
            if max_shares > 0:
                self.total_shares += max_shares
                self.current_balance -= max_shares * current_price
        elif action == 2: 
            if self.total_shares > 0:
                self.current_balance += self.total_shares * current_price
                self.total_shares = 0

        # To update portfolio value, to update steps as they are taken
        self.portfolio_value = float(self.current_balance + (self.total_shares * current_price))
        self.current_step += 1

        # reward as a percentage
        reward = ((self.portfolio_value - self.previous_portfolio_value) / self.previous_portfolio_value) * 100

        done = self.current_step >= self.n_steps - 1

        return self._get_obs(), reward, done, False, {}

    def render(self, mode="human"):
        """Renders the current state of the environment."""
        current_price = float(self.stock_data.iloc[self.current_step-1]["Close"])
        print("\nCurrent State:")
        print(f"Step: {self.current_step}")
        print(f"Stock Price: ${current_price:,.2f}")
        print(f"Cash Balance: ${self.current_balance:,.2f}")
        print(f"Shares Held: {self.total_shares:,}")
        print(f"Portfolio Value: ${self.portfolio_value:,.2f}")
        print(f"Total Return: {((self.portfolio_value - self.initial_balance) / self.initial_balance * 100):,.2f}%")

In [20]:
def prepare_data():
    data = yf.download("HDFCBANK.NS", start='2021-01-01', end='2024-01-01')

    scaled_data = data.copy()
    scaler = MinMaxScaler(feature_range=(0.1, 1.0))

    scaled_columns = {}
    for column in scaled_data.columns:
        values = scaled_data[column].values.reshape(-1, 1)
        scaled_values = scaler.fit_transform(values)
        scaled_data[column] = scaled_values.flatten()

    if not (scaled_data['Close'] > 0).all().item():
        raise ValueError("Scaling resulted in invalid prices")

    train_size = int(0.7 * len(scaled_data))
    val_size = int(0.2 * len(scaled_data))

    train_data = scaled_data[:train_size]
    val_data = scaled_data[train_size:train_size + val_size]
    test_data = scaled_data[train_size + val_size:]

    return train_data, val_data, test_data, scaler

def train_model(env):
    # this is used to create a vectorized env
    vec_env = make_vec_env(lambda: env, n_envs=1)

    # Making use of DQN here with multi layer perceptron policy
    model = DQN(
        "MlpPolicy",
        vec_env,
        verbose=1,
        learning_rate=0.001,
        buffer_size=5000,
        learning_starts=1000,
        exploration_fraction=0.1,
        exploration_final_eps=0.02
    )

    model.learn(total_timesteps=10000)
    return model

In [21]:
def evaluate_model(model, env, episodes=1):
    """Evaluate the trained model."""
    for episode in range(episodes):
        obs, _ = env.reset()
        done = False
        episode_reward = 0

        print(f"\nStarting Episode {episode + 1}")
        print(f"Initial Portfolio Value: ${env.portfolio_value:,.2f}")

        while not done:
            action, _ = model.predict(obs)
            obs, reward, done, _, _ = env.step(action[0])
            episode_reward += reward

            if env.current_step % 20 == 0:
                print(f"\nStep: {env.current_step}")
                print(f"Portfolio Value: ${env.portfolio_value:,.2f}")
                print(f"Action taken: {'Hold' if action == 0 else 'Buy' if action == 1 else 'Sell'}")

        final_value = env.portfolio_value
        total_return = ((final_value - env.initial_balance) / env.initial_balance) * 100
        print(f"\nEpisode {episode + 1} Summary:")
        print(f"Final Portfolio Value: ${final_value:,.2f}")
        print(f"Total Return: {total_return:,.2f}%")
        print(f"Total Reward: {episode_reward:,.2f}")

In [26]:
def main():
    train_data, val_data, test_data, scaler = prepare_data()

    env = StockMarketEnv(stock_data=train_data)

    vec_env = make_vec_env(lambda: env, n_envs=1)

    model = DQN(
            "MlpPolicy",
            vec_env,
            verbose=1,
            learning_rate=0.001,
            buffer_size=5000,
            learning_starts=1000
        )

    print("\nStarting model training...")
    model.learn(total_timesteps=10000)

    print("\nTesting trained model...")
    test_env = StockMarketEnv(stock_data=test_data)
    obs, _ = test_env.reset()
    done = False

    while not done:
          action, _ = model.predict(obs)
          obs, reward, done, _, _ = test_env.step(action)
          if test_env.current_step % 20 == 0:
                test_env.render()


if __name__ == "__main__":
    main()

[*********************100%***********************]  1 of 1 completed
  current_price = float(self.stock_data.iloc[self.current_step]["Close"])
  current_price = float(self.stock_data.iloc[self.current_step]["Close"])


Using cpu device

Starting model training...
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 517      |
|    ep_rew_mean      | 287      |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 728      |
|    time_elapsed     | 2        |
|    total_timesteps  | 2068     |
| train/              |          |
|    learning_rate    | 0.001    |
|    loss             | 1.15e+03 |
|    n_updates        | 266      |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 517      |
|    ep_rew_mean      | 331      |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 572      |
|    time_elapsed     | 7        |
|    total_timesteps  | 4136     |
| train/              |          |
|    learning_rate    | 0.001    |
|    loss 

  current_price = float(self.stock_data.iloc[self.current_step]["Close"])
  current_price = float(self.stock_data.iloc[self.current_step]["Close"])
  current_price = float(self.stock_data.iloc[self.current_step-1]["Close"])
