<a href="https://colab.research.google.com/github/Navneeth08k/dayTradingModel/blob/main/DayTrading_Full_RL_approach_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


**Project by Navneeth Krishna**

**Reinforcement Learning model to learn how to day trade stocks**


Treat a day of stock trading as a game/puzzle in which the agent has to figure out when
a stock will rise and fall. It is rewarded for profitable trades and penalized for losing ones.


My approach here is purely reinforcement learning: I train the model to trade only Apple (AAPL) stock.

Pros: The model will (hopefully) be really good at predicting changes in Apple stock, because it can specialize in finding patterns in this one stock.

Cons: The model will probably not generalize to other stocks, and it may not find the real patterns that analysts use. Day traders often recognize patterns across a variety of stocks, which is what enables them to predict whether a stock will go up or down.

In [None]:
#install dependencies
!pip install gymnasium stable-baselines3 numpy pandas matplotlib

In [None]:
#import libraries
import gymnasium as gym
from gymnasium import spaces

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from stable_baselines3 import DQN
from stable_baselines3.common.env_checker import check_env


In [None]:
!pip install ta


In [None]:
# Install and import necessary libraries
!pip install yfinance ta
import yfinance as yf
import pandas as pd
import numpy as np
import ta

# 1. Download Apple intraday data (5-minute bars)
# NOTE(review): Yahoo Finance limits how far back intraday bars are served;
# confirm this fixed date range still returns data before relying on it.
df = yf.download('AAPL', start='2024-10-01', end='2024-11-10', interval='5m')

# 2. Check the columns before flattening
print("Columns before flattening:", df.columns)

# 3. Flatten MultiIndex columns such as ('Close', 'AAPL') into 'Close_AAPL';
#    columns that are already plain strings are left untouched
df.columns = ['_'.join(col).strip() if isinstance(col, tuple) else col for col in df.columns]

# 4. Rename columns for clarity
df.rename(columns={
    'Adj Close_AAPL': 'Adj_Close',
    'Close_AAPL': 'Close',
    'High_AAPL': 'High',
    'Low_AAPL': 'Low',
    'Open_AAPL': 'Open',
    'Volume_AAPL': 'Volume'
}, inplace=True)

# 5. Verify the column names and fail early (with a readable message) if the
#    download did not produce the price columns every later step relies on
print("Columns after renaming:", df.columns)
required_columns = ['Open', 'High', 'Low', 'Close', 'Volume']
missing_columns = [name for name in required_columns if name not in df.columns]
if missing_columns:
    raise ValueError(f"Downloaded data is missing required columns: {missing_columns}")

# 6. Check for missing values
print("Missing values before cleaning:", df.isna().sum())

# 7. Fill missing values using forward-fill and back-fill, then drop remaining NaNs
df.ffill(inplace=True)
df.bfill(inplace=True)
df.dropna(inplace=True)

# 8. Verify the DataFrame is not empty and check its length
print("DataFrame length after cleaning:", len(df))
if df.empty:
    raise ValueError("DataFrame is empty after preprocessing. Please check the input data.")

# 9. Prepare the data for indicator calculations
high_prices = df['High'].astype(float)
low_prices = df['Low'].astype(float)
close_prices = df['Close'].astype(float)

# 10. Calculate technical indicators

# Short-Term Moving Averages
df['SMA_5'] = df['Close'].rolling(window=5).mean()
df['EMA_10'] = df['Close'].ewm(span=10, adjust=False).mean()

# Volume-Weighted Average Price (VWAP)
# The running sums restart on every calendar day. Accumulating over the whole
# download would turn VWAP into a multi-week average that barely moves, which
# is of little use to an agent whose episodes each cover a single day.
typical_price = (df['High'] + df['Low'] + df['Close']) / 3
session_day = df.index.normalize()
cumulative_turnover = (typical_price * df['Volume']).groupby(session_day).cumsum()
cumulative_volume = df['Volume'].groupby(session_day).cumsum()
# A session that opens with zero-volume bars would divide by zero; mark those
# rows as NaN so the fill in step 11 handles them instead of producing inf.
df['VWAP'] = cumulative_turnover / cumulative_volume.replace(0, np.nan)

# Short-Term RSI (7-period)
rsi_short = ta.momentum.RSIIndicator(close=close_prices, window=7)
df['RSI_7'] = rsi_short.rsi()

# Intraday Momentum Index (IMI): share of the last 14 bars' open-to-close
# movement that was upward, expressed in percent
df['Up'] = np.where(df['Close'] > df['Open'], df['Close'] - df['Open'], 0)
df['Down'] = np.where(df['Close'] < df['Open'], df['Open'] - df['Close'], 0)
up_sum = df['Up'].rolling(window=14).sum()
down_sum = df['Down'].rolling(window=14).sum()
# When all 14 bars are flat both sums are 0 and the ratio is NaN (filled in step 11)
df['IMI'] = 100 * (up_sum / (up_sum + down_sum))

# Average True Range (ATR)
atr_indicator = ta.volatility.AverageTrueRange(high=high_prices, low=low_prices, close=close_prices, window=14)
df['ATR'] = atr_indicator.average_true_range()

# Stochastic Oscillator
stochastic = ta.momentum.StochasticOscillator(high=high_prices, low=low_prices, close=close_prices, window=14, smooth_window=3)
df['Stoch_K'] = stochastic.stoch()
df['Stoch_D'] = stochastic.stoch_signal()

# 11. Check for NaNs and fill them again
# NOTE: the back-fill copies the first valid indicator value into the warm-up
# rows at the very start of the data, i.e. those rows see values from later bars.
print("Missing values before final cleaning:", df.isna().sum())
df.ffill(inplace=True)
df.bfill(inplace=True)
df.dropna(inplace=True)

# 12. Verify the DataFrame is not empty and display the first few rows
print("DataFrame length after final cleaning:", len(df))
if df.empty:
    raise ValueError("DataFrame is empty after computing indicators. Please check the input data.")
print(df.head())


In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd

class StockTradingEnv(gym.Env):
    """Gymnasium environment that day-trades a single share of one stock.

    Every episode replays one calendar day of ``df``, chosen at random in
    ``reset``. The agent steps through that day's bars one at a time.

    Actions (``Discrete(3)``): 0 = hold, 1 = buy one share, 2 = sell the held
    share. A buy must be followed by a sell before the next buy, and every
    executed trade starts a cooldown of ``cooldown_period`` steps during which
    the chosen action is replaced by hold.

    Observation: 14 ``float32`` features (scaled OHLC, cash, position and
    technical indicators), clipped to the declared ``[0, 1]`` box.

    Args:
        df: DataFrame with a ``DatetimeIndex`` and the columns ``Open``,
            ``High``, ``Low``, ``Close``, ``SMA_5``, ``EMA_10``, ``VWAP``,
            ``RSI_7``, ``IMI``, ``ATR``, ``Stoch_K`` and ``Stoch_D``.
        max_episode_length: Maximum number of steps per episode; reaching it
            before the day's data runs out sets ``truncated``. ``None``
            disables the limit.
        cooldown_period: Number of forced-hold steps after each trade.
    """

    def __init__(self, df, max_episode_length=500, cooldown_period=5):
        super().__init__()
        self.df = df
        # Calendar day of every row, computed once so reset() does not have to
        # re-normalize the whole index for every episode.
        self._row_days = self.df.index.normalize()
        self.trading_days = self._row_days.unique()
        self.max_episode_length = max_episode_length
        self.current_day = None
        self.day_data = None
        self.current_step = 0
        self.initial_balance = 10000
        self.balance = self.initial_balance
        self.shares_held = 0
        self.total_profit = 0  # Net cash result of all closed trades this episode
        self.last_buy_price = 0
        self.cooldown_period = cooldown_period
        self.cooldown_counter = 0
        self.can_buy = True  # Enforce a buy-sell cycle

        # Define action and observation space
        self.action_space = spaces.Discrete(3)  # Actions: 0 = Hold, 1 = Buy, 2 = Sell
        self.observation_space = spaces.Box(low=0, high=1, shape=(14,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode on a randomly selected trading day.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset``; it seeds
                ``self.np_random``, which is used to pick the day.
            options: Unused.

        Returns:
            Tuple of the first observation and an empty info dict.

        Raises:
            ValueError: If ``df`` contains no trading days.
        """
        super().reset(seed=seed)

        if len(self.trading_days) == 0:
            raise ValueError("Cannot reset: the DataFrame contains no trading days.")

        # Draw from the environment's own generator so that `seed` actually
        # controls which day is replayed (the global np.random ignores it).
        day_index = int(self.np_random.integers(len(self.trading_days)))
        self.current_day = self.trading_days[day_index]
        self.day_data = self.df[self._row_days == self.current_day]

        self.current_step = 0
        self.balance = self.initial_balance
        self.shares_held = 0
        self.total_profit = 0
        self.last_buy_price = 0
        self.can_buy = True  # Start with the ability to buy
        # A cooldown left over from the previous episode must not block the
        # first actions of this one.
        self.cooldown_counter = 0

        return self._next_observation(), {}

    def _next_observation(self):
        """Build the 14-feature observation for the current step."""
        # Clamp so the final observation of a one-bar day still maps to a row.
        row_index = min(self.current_step, len(self.day_data) - 1)
        row = self.day_data.iloc[row_index]
        frame = np.array([
            float(row['Open']) / 1000,
            float(row['High']) / 1000,
            float(row['Low']) / 1000,
            float(row['Close']) / 1000,
            self.balance / 10000,
            self.shares_held / 10,
            float(row['SMA_5']) / 1000,
            float(row['EMA_10']) / 1000,
            float(row['VWAP']) / 1000,
            float(row['RSI_7']) / 100,
            float(row['IMI']) / 100,
            float(row['ATR']) / 1000,
            float(row['Stoch_K']) / 100,
            float(row['Stoch_D']) / 100,
        ], dtype=np.float32)
        # Keep the observation inside the declared Box; e.g. cash above the
        # initial balance would otherwise exceed the upper bound of 1.
        return np.clip(frame, self.observation_space.low, self.observation_space.high)

    def step(self, action):
        """Apply ``action`` at the current bar's close and advance one bar.

        Args:
            action: 0 = hold, 1 = buy, 2 = sell.

        Returns:
            ``(observation, reward, terminated, truncated, info)`` where
            ``info`` has the keys ``realized_profit`` (net result of closed
            trades), ``unrealized_profit`` (cash plus position valued at the
            current price, minus the initial balance) and ``shares_held``.
        """
        current_price = float(self.day_data.iloc[self.current_step]['Close'])
        reward = 0.0
        transaction_cost = 0.001  # 0.1% transaction cost
        min_profit_threshold = current_price * 0.01  # 1% minimum profit threshold

        # Cooldown mechanism to prevent overtrading
        if self.cooldown_counter > 0:
            action = 0  # Force hold action during cooldown
            self.cooldown_counter -= 1

        if action == 1 and self.can_buy:
            buy_cost = current_price * (1 + transaction_cost)
            # Compare against the price including the fee so the cash balance
            # can never go negative.
            if self.balance >= buy_cost:
                self.shares_held += 1
                self.balance -= buy_cost
                self.last_buy_price = current_price
                self.can_buy = False  # Must sell before buying again
                self.cooldown_counter = self.cooldown_period
        elif action == 2 and not self.can_buy and self.shares_held > 0:
            sell_proceeds = current_price * (1 - transaction_cost)
            self.shares_held -= 1
            self.balance += sell_proceeds

            # Net cash result of the round trip, fees on both legs included
            trade_profit = sell_proceeds - self.last_buy_price * (1 + transaction_cost)
            self.total_profit += trade_profit

            # Reward only if the trade was profitable above the threshold
            if trade_profit > min_profit_threshold:
                reward = trade_profit * 1.1
            else:
                reward = -1.0  # Penalty for unprofitable trades

            self.can_buy = True  # Now the model can buy again
            self.cooldown_counter = self.cooldown_period

        # Reward for holding during an uptrend
        if action == 0 and not self.can_buy and current_price > self.last_buy_price:
            reward += 0.05

        # Update current step
        self.current_step += 1

        # The episode terminates on the day's last bar; it is truncated when
        # the step limit is reached first. No forced sell happens at the end.
        done = self.current_step >= len(self.day_data) - 1
        truncated = (
            not done
            and self.max_episode_length is not None
            and self.current_step >= self.max_episode_length
        )

        # Realized profit counts closed trades only. Deriving it from the cash
        # balance would book the purchase price of an open position as a loss
        # and penalize the agent on every step it holds a share.
        realized_profit = self.total_profit
        total_value = self.balance + self.shares_held * current_price
        unrealized_profit = total_value - self.initial_balance

        # Add a small reward proportional to realized profit
        reward += realized_profit * 0.002

        return self._next_observation(), reward, done, truncated, {
            "realized_profit": realized_profit,
            "unrealized_profit": unrealized_profit,
            "shares_held": self.shares_held,
        }

    def render(self):
        """Print the step number, cash, position and mark-to-market value."""
        current_close = float(self.day_data.iloc[self.current_step]['Close'])
        total_value = self.balance + self.shares_held * current_close
        print(f'Step: {self.current_step}')
        print(f'Balance: {self.balance:.2f}')
        print(f'Shares Held: {self.shares_held}')
        print(f'Total Value: {total_value:.2f}')
        print(f'Total Profit: {total_value - self.initial_balance:.2f}')


In [None]:
# Import necessary libraries
import gym

# Stable-Baselines3 ships `check_env`, which validates a custom environment
from stable_baselines3.common.env_checker import check_env

# Build the custom trading environment on top of the prepared DataFrame
env = StockTradingEnv(df)

# Run the validation; it raises or warns if the environment is set up incorrectly
check_env(env)

# Start an episode and show the very first observation the agent would see
initial_state = env.reset()
observation, info = initial_state
print("Initial Observation:", observation)


In [None]:
from stable_baselines3 import DQN
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import BaseCallback

# Monitor wraps the trading environment so that episode statistics get logged
env = Monitor(StockTradingEnv(df))

# Hyperparameters for the DQN agent, collected in one place
dqn_settings = {
    "verbose": 1,
    "learning_rate": 0.00005,
    "exploration_fraction": 0.45,
    "exploration_final_eps": 0.05,
}

# Create the DQN agent (MLP policy) on the monitored environment
model = DQN("MlpPolicy", env, **dqn_settings)


# Define a callback to log episode rewards
class RewardLoggingCallback(BaseCallback):
    """Record the total reward of every episode that finishes during training.

    After training, ``episode_rewards`` holds one plain float per completed
    episode, in the order the episodes ended.
    """

    def __init__(self):
        super().__init__()
        # Total reward of each completed episode
        self.episode_rewards = []
        # Running total of the episode in progress in the first environment
        self.episode_reward = 0
        # Running totals for every sub-environment; sized on the first step
        self._running_totals = None

    def _on_step(self) -> bool:
        """Accumulate this step's rewards and log any episode that just ended.

        Returns:
            Always ``True`` so that training continues.
        """
        # SB3 trains on vectorized environments, so these are expected to be
        # arrays with one entry per sub-environment. Handling each entry as a
        # scalar keeps the logged totals plain floats and avoids the ambiguous
        # truth value of a multi-element `dones` array.
        step_rewards = self.locals['rewards']
        step_dones = self.locals['dones']

        if self._running_totals is None:
            self._running_totals = [0.0] * len(step_rewards)

        for env_index, (step_reward, episode_over) in enumerate(zip(step_rewards, step_dones)):
            self._running_totals[env_index] += float(step_reward)
            if episode_over:
                # Log the finished episode and start a fresh total
                self.episode_rewards.append(self._running_totals[env_index])
                self._running_totals[env_index] = 0.0

        self.episode_reward = self._running_totals[0]
        return True

# Callback instance that collects the per-episode rewards while training
reward_callback = RewardLoggingCallback()

# Run training for one million environment steps, logging through the callback
model.learn(callback=reward_callback, total_timesteps=1_000_000)

# Persist the trained agent to disk and confirm
model.save("dqn_stock_trading_model")
print("Training completed and model saved.")


In [None]:
import matplotlib.pyplot as plt

# Draw the learning curve, or explain why there is nothing to draw
if not reward_callback.episode_rewards:
    print("No rewards logged. Try increasing the number of timesteps or reducing episode length.")
else:
    plt.figure(figsize=(12, 6))
    plt.plot(reward_callback.episode_rewards, label='Episode Reward')
    # Axis labels and title
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.title('Learning Curve: Model Performance Over Training Episodes')
    # Cosmetics, then display
    plt.grid()
    plt.legend()
    plt.show()


In [None]:
import random
import matplotlib.pyplot as plt

# Number of random test days
num_test_days = 5

# Get unique trading days from the DataFrame
# NOTE: these days come from the same DataFrame the model was trained on, so
# the figures below are in-sample and say little about unseen data.
trading_days = df.index.normalize().unique()
# random.sample raises ValueError when asked for more items than exist, so cap
# the sample size at the number of available days.
random_test_days = random.sample(list(trading_days), min(num_test_days, len(trading_days)))

# Variables to store aggregated results
total_model_profit = 0
total_buy_and_hold_profit = 0
days_tested = 0

# Iterate through each randomly selected test day
for test_day in random_test_days:
    print(f"Testing on {test_day.date()}")

    # Filter data for the selected trading day
    test_df = df[df.index.normalize() == test_day]
    if test_df.empty:
        print("No data available for this day, skipping...")
        continue

    # Initialize the environment for testing (it only contains this one day)
    test_env = StockTradingEnv(test_df)
    obs, _ = test_env.reset()
    initial_balance = test_env.initial_balance

    # Calculate buy-and-hold profit for one share held from open to close
    buy_and_hold_start_price = float(test_df.iloc[0]['Open'])
    buy_and_hold_end_price = float(test_df.iloc[-1]['Close'])
    buy_and_hold_profit = buy_and_hold_end_price - buy_and_hold_start_price

    # Histories start with the first bar so that list position i always
    # describes bar i; this keeps the trade markers aligned with the curves.
    stock_price_history = [float(test_df.iloc[test_env.current_step]['Close'])]
    balance_history = [test_env.balance]
    profit_history = [0.0]
    buy_points = []
    sell_points = []

    done = False
    truncated = False

    # Test the model on the selected day
    while not (done or truncated):
        # Remember where and at what price a trade in this step would execute
        step_index = test_env.current_step
        executed_price = float(test_df.iloc[step_index]['Close'])
        shares_before = test_env.shares_held

        # Use the greedy policy for evaluation instead of sampling exploratory actions
        action, _ = model.predict(obs, deterministic=True)
        obs, reward, done, truncated, info = test_env.step(int(action))

        # Record buy and sell points from the actual position change: the
        # environment overrides the requested action during cooldowns and
        # ignores buys/sells that are not allowed, so the raw action would
        # mark trades that never happened.
        if test_env.shares_held > shares_before:
            buy_points.append((step_index, executed_price))
        elif test_env.shares_held < shares_before:
            sell_points.append((step_index, executed_price))

        # Record current stock price and balance
        current_price = float(test_df.iloc[test_env.current_step]['Close'])
        stock_price_history.append(current_price)
        balance_history.append(test_env.balance)

        # Total profit of the model's strategy, open position valued at the current price
        total_value = test_env.balance + test_env.shares_held * current_price
        model_profit = total_value - initial_balance
        profit_history.append(model_profit)

    # Calculate final model profit
    final_model_profit = profit_history[-1]
    total_model_profit += final_model_profit
    total_buy_and_hold_profit += buy_and_hold_profit
    days_tested += 1

    # Print the results for this day
    print(f"Model Profit: ${final_model_profit:.2f}")
    print(f"Buy-and-Hold Profit: ${buy_and_hold_profit:.2f}")
    print(f"Stock open: ${buy_and_hold_start_price:.2f}")

    # Visualization for each day
    fig, ax1 = plt.subplots(figsize=(14, 7))
    ax1.set_xlabel('Time Step')
    ax1.set_ylabel('Stock Price ($)', color='blue')
    ax1.plot(stock_price_history, label='Stock Price', color='blue', alpha=0.6)
    ax1.tick_params(axis='y', labelcolor='blue')

    if buy_points:
        ax1.scatter(*zip(*buy_points), color='green', marker='^', s=100, label='Buy Signal', alpha=0.8)
    if sell_points:
        ax1.scatter(*zip(*sell_points), color='red', marker='v', s=100, label='Sell Signal', alpha=0.8)

    # Plot model's profit on the second y-axis
    ax2 = ax1.twinx()
    ax2.set_ylabel('Profit ($)', color='green')
    ax2.plot(profit_history, label='Model Profit', color='green', alpha=0.8)
    ax2.tick_params(axis='y', labelcolor='green')

    # Add a horizontal line for buy-and-hold profit
    ax2.axhline(y=buy_and_hold_profit, color='purple', linestyle='--', label='Buy-and-Hold Profit')

    # Combine legends for clarity
    handles1, labels1 = ax1.get_legend_handles_labels()
    handles2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(handles1 + handles2, labels1 + labels2, loc='upper left')

    # Adding title and grid
    plt.title(f'Stock Price vs Model Profit with Buy/Sell Signals on {test_day.date()}')
    plt.grid()
    plt.show()

# Print aggregated results (report the number of days actually evaluated)
print(f"\nAggregated Results Over {days_tested} Random Days:")
print(f"Total Model Profit: ${total_model_profit:.2f}")
print(f"Total Buy-and-Hold Profit: ${total_buy_and_hold_profit:.2f}")


if total_model_profit > total_buy_and_hold_profit:
    print("The model outperformed the buy-and-hold strategy overall.")
else:
    print("The model underperformed compared to the buy-and-hold strategy overall.")


In [None]:
from google.colab import files
# model.save("dqn_stock_trading_model") writes a ".zip" archive, so that is the
# file to download; the previous ".p5" name does not exist on disk.
files.download('dqn_stock_trading_model.zip')