# Step 1: Install dependencies:
Installs the required libraries for data handling, plotting, and (deep) reinforcement learning.


In [2]:
!pip install yfinance pandas numpy matplotlib stable-baselines3[extra] shimmy>=2.0


# Step 2: Mount Google Drive:
Mounts Google Drive to save and load files directly from the Colab session

In [None]:
# Colab-only: mount Google Drive at /content/drive so that later cells can
# write their result files under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')


# Step 3: Import libraries:
Imports necessary packages for (deep) reinforcement learning, data processing, and visualization.

In [4]:
import gym
import numpy as np
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt

from gym import spaces
from stable_baselines3 import PPO, A2C
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.noise import OrnsteinUhlenbeckActionNoise
from stable_baselines3.common.noise import NormalActionNoise

from sklearn.model_selection import train_test_split


# Step 4: Custom train/test split:
Splits DataFrame into training and test sets based on the defined ratio.

In [5]:
def train_test_split(df, train_ratio=0.8):
    """Split `df` into a leading training part and a trailing test part.

    The split is positional (no shuffling): the first
    ``int(len(df) * train_ratio)`` rows form the training set and the
    remaining rows form the test set.

    Args:
        df: Data to split; sliced with ``df[:k]`` / ``df[k:]``.
        train_ratio: Fraction of rows assigned to the training set.

    Returns:
        Tuple ``(train, test)`` of independent copies of the two slices.
    """
    cut = int(len(df) * train_ratio)
    head = df[:cut].copy()
    tail = df[cut:].copy()
    return head, tail

# Step 5: Custom Trading Environment:
Defines the custom Gym environment that simulates stock trading with buy/sell/hold actions and a reward based on the change in portfolio value minus a small trading penalty.

In [6]:
class SimpleTradingEnv(gym.Env):
    """Single-stock trading environment with one continuous action.

    The environment follows the classic Gym API used in this notebook:
    ``reset()`` returns only the observation and ``step()`` returns the
    4-tuple ``(obs, reward, done, info)``.

    Action:
        One float in [-1, 1]. Positive values buy that fraction of the
        maximum affordable number of whole shares, negative values sell that
        fraction of the shares currently held, and 0 holds.

    Observation:
        Flat float32 vector of length ``(4 + 2) * lookback + 2``: the OHLC
        values of the previous ``lookback`` rows, their SMA5 values, their
        SMA20 values, then current cash and number of shares owned.

    Reward:
        Change in total asset value (cash + shares * Close) between the
        current row and the next row, minus ``trade_penalty_factor`` per
        share traded.

    Args:
        df: DataFrame with the columns "Open", "High", "Low", "Close",
            "SMA5" and "SMA20". Its index is reset internally.
        lookback: Number of past rows included in each observation.
        initial_cash: Cash balance at the start of every episode.
        trade_penalty_factor: Penalty per share traded, subtracted from the
            reward to discourage overtrading.

    Raises:
        ValueError: If ``df`` has no row left to trade on after the first
            ``lookback`` rows.
    """

    def __init__(self, df, lookback=5, initial_cash=10000, trade_penalty_factor=0.01):
        super(SimpleTradingEnv, self).__init__()
        self.df = df.reset_index(drop=True)
        self.lookback = lookback
        self.n_steps = len(self.df)

        # The first tradable row is index `lookback`, so it has to exist.
        if self.n_steps <= self.lookback:
            raise ValueError(
                f"Need more than {self.lookback} rows of data, got {self.n_steps}"
            )

        self.current_step = self.lookback  # start after enough history for lookback window
        self.cash = initial_cash
        self.stock_owned = 0
        self.initial_cash = initial_cash

        # Observation: 4 OHLC + 2 SMA features per day × lookback days + cash + stock_owned
        self.state_size = (4 + 2) * self.lookback + 2
        # Action: -1 (sell all) to 1 (buy max)
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_size,), dtype=np.float32
        )
        # Penalty per share traded to discourage overtrading
        self.trade_penalty_factor = trade_penalty_factor

    def _get_obs(self):
        """Build the observation from the `lookback` rows before `current_step`."""
        frame = self.df.iloc[self.current_step - self.lookback:self.current_step]
        ohlc = frame[["Open", "High", "Low", "Close"]].values.flatten()
        sma5 = frame["SMA5"].values.flatten()
        sma20 = frame["SMA20"].values.flatten()
        # Combine all features into a single observation vector
        obs = np.concatenate([ohlc, sma5, sma20, [self.cash], [self.stock_owned]])
        return obs.astype(np.float32)

    def reset(self):
        """Restore the initial cash/position and return the first observation."""
        self.current_step = self.lookback
        self.cash = self.initial_cash
        self.stock_owned = 0
        return self._get_obs()

    def step(self, action):
        """Execute one trade at the current row's Close and advance one row.

        Args:
            action: Scalar or array-like; only the first element is used and
                it is clipped to [-1, 1].

        Returns:
            Tuple ``(next_obs, reward, done, info)`` where ``info`` is an
            empty dict and ``done`` becomes True once the last row is reached.
        """
        # Current price and portfolio value before the action
        row = self.df.iloc[self.current_step]
        price = float(row["Close"])
        prev_total_asset = self.cash + self.stock_owned * price

        # Extract a scalar from the action (accepts scalars, 0-d and 1-d arrays)
        # and clip it to [-1, 1]
        action = float(np.asarray(action, dtype=np.float64).reshape(-1)[0])
        action = float(np.clip(action, -1, 1))

        min_trade_amount = 1
        shares_traded = 0

        # Buy logic
        if action > 0:
            # Max whole shares we can afford; a non-positive price buys nothing
            # instead of raising on the division.
            max_buy = int(self.cash // price) if price > 0 else 0
            shares_bought = int(action * max_buy)  # fraction of max
            if shares_bought >= min_trade_amount:
                self.cash -= shares_bought * price
                self.stock_owned += shares_bought
                shares_traded = shares_bought

        # Sell logic
        elif action < 0:
            shares_sold = int(abs(action) * self.stock_owned)  # fraction of owned shares
            if shares_sold >= min_trade_amount:
                self.cash += shares_sold * price
                self.stock_owned -= shares_sold
                shares_traded = shares_sold

        self.current_step += 1
        done = self.current_step >= self.n_steps - 1
        next_obs = self._get_obs()

        # Value the portfolio at the next row's Close whenever that row exists.
        # On the terminal transition `current_step` equals `n_steps - 1`, which
        # is still a valid row, so the last price move counts towards the reward.
        # The old price is only a fallback when stepping past the end of the data.
        if self.current_step < self.n_steps:
            next_price = float(self.df.iloc[self.current_step]["Close"])
        else:
            next_price = price
        new_total_asset = self.cash + self.stock_owned * next_price

        # Reward: portfolio change minus penalty for trading too much
        reward = new_total_asset - prev_total_asset - self.trade_penalty_factor * shares_traded

        return next_obs, reward, done, {}

# Step 6: Download, preprocess, train and evaluate:
Downloads historical data for each Dow 30 stock, computes the SMA indicators, trains PPO and A2C agents, evaluates them on the test split, compares them to a Buy & Hold baseline, and saves the results to a CSV file.


In [None]:
# List of Dow Jones 30 tickers
dow30_tickers = [
    "AAPL", "AMGN", "AXP", "BA", "CAT", "CSCO", "CVX", "DIS", "DOW", "GS",
    "HD", "HON", "IBM", "INTC", "JNJ", "JPM", "KO", "MCD", "MMM", "MRK",
    "MSFT", "NKE", "PG", "CRM", "TRV", "UNH", "V", "VZ", "WBA", "WMT"
]

LOOKBACK = 5          # rows of history in each observation
TRAIN_RATIO = 0.8     # chronological train/test split
TOTAL_TIMESTEPS = 50000
BUY_HOLD_SHARES = 10  # size of the Buy & Hold benchmark position

# Dictionary to store results per stock
results = {}


def eval_agent(model, env):
    """Run one episode of `model` on `env` and return the final portfolio value.

    The position is marked to market at the Close of the last row the
    environment reached, which is the final row of `env.df` once the episode
    is done. This is the same row the Buy & Hold benchmark uses as its end
    price.
    """
    obs = env.reset()
    done = False
    while not done:
        action, _ = model.predict(obs)
        obs, reward, done, _ = env.step(action)
    last_idx = min(env.current_step, len(env.df) - 1)
    final_price = float(env.df["Close"].iloc[last_idx])
    return env.cash + env.stock_owned * final_price


# Loop through each stock ticker
for ticker in dow30_tickers:
    print(f"\n📈 Processing: {ticker}")

    try:
        # Download historical data for the stock
        raw = yf.download(ticker, start="2018-01-01", end="2025-01-01", group_by=None)

        # Flatten multi-index columns (if present) by keeping whichever level
        # holds the price field names, regardless of the level order.
        if isinstance(raw.columns, pd.MultiIndex):
            for level in range(raw.columns.nlevels):
                if "Close" in raw.columns.get_level_values(level):
                    raw.columns = raw.columns.get_level_values(level)
                    break

        # Select OHLC data and clean
        df = raw[["Open", "High", "Low", "Close"]].dropna().reset_index(drop=True)
        df = df.astype(float)

        # Add technical indicators: SMA5 and SMA20
        df["SMA5"] = df["Close"].rolling(window=5).mean()
        df["SMA20"] = df["Close"].rolling(window=20).mean()
        df = df.dropna().reset_index(drop=True)  # drop initial rows without SMA

        if df.empty:
            raise ValueError(f"no usable price data for {ticker}")

        # Split into train/test sets chronologically
        train_df, test_df = train_test_split(df, train_ratio=TRAIN_RATIO)

        # Create training and test environments. The frame is bound as a default
        # argument so the factory does not depend on the loop variable.
        train_env = DummyVecEnv(
            [lambda frame=train_df: SimpleTradingEnv(frame, lookback=LOOKBACK)]
        )
        test_env = SimpleTradingEnv(test_df, lookback=LOOKBACK)
        initial_cash = test_env.initial_cash

        # Initialize PPO and A2C agents
        ppo = PPO("MlpPolicy", train_env, verbose=0)
        a2c = A2C("MlpPolicy", train_env, verbose=0)

        # Train both models
        ppo.learn(total_timesteps=TOTAL_TIMESTEPS)
        a2c.learn(total_timesteps=TOTAL_TIMESTEPS)

        # Evaluate both trained agents
        ppo_val = eval_agent(ppo, test_env)
        a2c_val = eval_agent(a2c, test_env)

        # Simulate Buy & Hold strategy: buy up to 10 shares and hold till end.
        # NOTE(review): the benchmark starts at the first test row, while the
        # agents can only trade from row `LOOKBACK` onwards.
        start_price = float(test_df["Close"].iloc[0])
        end_price = float(test_df["Close"].iloc[-1])

        # Never hold more shares than the starting cash can pay for; otherwise
        # the benchmark would own stock it never bought.
        buy_hold_shares = min(BUY_HOLD_SHARES, int(initial_cash // start_price))
        cash_left = initial_cash - buy_hold_shares * start_price
        buy_hold_val = cash_left + buy_hold_shares * end_price

        # Calculate price change during test period
        price_change = end_price - start_price

        # Store results for this ticker
        results[ticker] = {
            "PPO_Absolute": round(ppo_val, 2),
            "PPO_Profit": round(ppo_val - initial_cash, 2),
            "PPO_ROI": round(((ppo_val - initial_cash) / initial_cash) * 100, 2),

            "A2C_Absolute": round(a2c_val, 2),
            "A2C_Profit": round(a2c_val - initial_cash, 2),
            "A2C_ROI": round(((a2c_val - initial_cash) / initial_cash) * 100, 2),

            "BuyHold_Absolute": round(buy_hold_val, 2),
            "BuyHold_Profit": round(buy_hold_val - initial_cash, 2),
            "BuyHold_ROI": round(((buy_hold_val - initial_cash) / initial_cash) * 100, 2),

            "Price_Change": round(price_change, 2),
        }

        # Print results to console
        print(f"✅ {ticker} → PPO: {results[ticker]['PPO_Absolute']}, A2C: {results[ticker]['A2C_Absolute']}, Buy & Hold: {results[ticker]['BuyHold_Absolute']}, Price Change: {results[ticker]['Price_Change']}")

    except Exception as e:
        # Best effort: if anything fails for a stock, skip it and log the error
        print(f"⚠️ Skipped {ticker} due to error: {e}")


# ✅ SAVE RESULTS TO CSV
results_df = pd.DataFrame.from_dict(results, orient="index")
results_df.index.name = "Ticker"

# Save to drive folder
results_df.to_csv("/content/drive/MyDrive/trading_results.csv")
print("📄 Results saved to Google Drive → trading_results.csv")

# Optional: Inspect policy decisions
Tests how each trained model responds to selected input states to verify policy behavior

In [None]:
# # Policy Response Inspection Block

# def test_policy_response(model, model_name, sample_states):
#     print(f"\n🔍 Policy inspection for {model_name}:")
#     for i, state in enumerate(sample_states):
#         action, _ = model.predict(np.array(state).reshape(1, -1))
#         print(f"Input {i+1}: {state} → Action: {action[0]}")


# sample_states = [
#     [0] * ((4+2)*5 + 2),                                # all zeros, adjusted for new state size
#     [100] * ((4)*5) + [10000]*5 + [100]*5 + [10000, 100],  # just example big values with indicators and holdings (adjust if needed)
#     [1] * ((4)*5) + [1]*5 + [1]*5 + [10000, 100],           # flat prices + SMA + holdings
#     test_env.reset(),                                      # real current market state
# ]

# test_policy_response(ppo, "PPO", sample_states)
# test_policy_response(a2c, "A2C", sample_states)
# # test_policy_response(ddpg, "DDPG", sample_states)


# Visual: Plot ROI comparison:
Plots bar chart comparing ROI across all strategies per stock.

In [None]:
# One row per ticker (alphabetical), one column per metric
results_df = pd.DataFrame(results).transpose().sort_index()

# Grouped bars: ROI of PPO, A2C and Buy & Hold for every stock
roi_columns = ["PPO_ROI", "A2C_ROI", "BuyHold_ROI"]
roi_ax = results_df[roi_columns].plot(kind="bar", figsize=(18, 6), alpha=0.85)

# Title, axis labels and vertical ticker labels for readability
roi_ax.set_title("Return on Investment (ROI) by Algorithm on Dow 30 Stocks (%)")
roi_ax.set_ylabel("ROI (%)")
roi_ax.set_xlabel("Ticker")
plt.setp(roi_ax.get_xticklabels(), rotation=90)
roi_ax.grid(True)
roi_ax.figure.tight_layout()
roi_ax.legend(loc="upper left")
plt.show()


# Visual: Plot portfolio value comparison:
Plots the final portfolio values per strategy for each stock.

In [None]:
# Grouped bars: final portfolio value of PPO, A2C and Buy & Hold per stock
value_columns = ["PPO_Absolute", "A2C_Absolute", "BuyHold_Absolute"]
value_ax = results_df[value_columns].plot(kind="bar", figsize=(18, 6), alpha=0.85)

# Title, axis labels and vertical ticker labels for readability
value_ax.set_title("Final Portfolio Value vs Buy & Hold on Dow 30 Stocks")
value_ax.set_ylabel("Portfolio Value ($)")
value_ax.set_xlabel("Ticker")
plt.setp(value_ax.get_xticklabels(), rotation=90)

# Grid for visual clarity
value_ax.grid(True)

# Red dashed reference line at the initial capital of $10,000
value_ax.axhline(10000, color='red', linestyle='--', linewidth=1.5, label='Initial Capital ($10,000)')
value_ax.figure.tight_layout()
value_ax.legend(loc="upper left")
plt.show()


# Visual: Cumulative reward curves
Plots cumulative reward during evaluation to visualize how the agent’s portfolio grows over time.

Not used in thesis

In [None]:
# Function to plot cumulative reward for a given model and environment
def plot_model_reward(model, env, label):
    """Run one episode of `model` on `env` and plot its cumulative reward.

    Args:
        model: Object whose ``predict(obs)`` returns ``(action, state)``.
        env: Environment exposing ``reset()``, ``step(action)`` and a ``df``
            attribute; at most ``len(env.df) - 1`` steps are taken.
        label: Legend label for the plotted curve.
    """
    observation = env.reset()
    episode_rewards = []
    max_steps = len(env.df) - 1
    steps_taken = 0

    # Roll out a single episode, stopping early when the env reports done
    while steps_taken < max_steps:
        action, _ = model.predict(observation)
        observation, reward, done, _ = env.step(action)
        # Rewards may arrive wrapped in a list/array; keep the first entry
        if isinstance(reward, (list, np.ndarray)):
            reward = reward[0]
        episode_rewards.append(float(reward))
        steps_taken += 1
        if done:
            break

    # Running total of the per-step rewards
    plt.plot(np.cumsum(episode_rewards), label=label)

plt.figure(figsize=(12, 5))

# Plot cumulative rewards for both PPO and A2C
plot_model_reward(ppo, test_env, "PPO")
plot_model_reward(a2c, test_env, "A2C")

plt.legend()
# `ppo`, `a2c` and `test_env` are whatever the training loop bound last, so the
# title uses the loop's final `ticker` instead of a hard-coded symbol.
# NOTE(review): if the last ticker was skipped because of an error, the models
# may still belong to an earlier ticker — confirm before using the figure.
plt.title(f"Cumulative Reward Curve on {ticker} (Test Set)")
plt.xlabel("Days")
plt.ylabel("Cumulative Reward ($)")
plt.grid(True)
plt.tight_layout()
plt.show()


# Optional: Check what action is taken and what reward is given at each step

The model variable has to be changed manually to inspect a different algorithm (PPO vs. A2C).

In [None]:
# Accumulate rewards in a dedicated variable so the `ppo_val` computed by the
# evaluation cell is not overwritten.
cumulative_reward = 0.0
obs = test_env.reset()
done = False
step = 0

# Step through the environment using the PPO agent
while not done:
    action, _ = ppo.predict(obs)  # Predict next action using PPO model
    print(f"Step {step}: Action = {action}")  # Print current action
    obs, reward, done, _ = test_env.step(action)  # Apply action and get feedback
    print(f"Reward = {reward}")  # Print reward for the step

    # Convert reward to scalar if it's in array/list format
    reward_val = reward[0] if isinstance(reward, (list, np.ndarray)) else reward
    cumulative_reward += reward_val  # Accumulate total reward
    step += 1

# The summed reward is the change in portfolio value net of trade penalties,
# not the portfolio value itself, so the two are reported separately.
print(f"💰 Cumulative PPO Reward: {round(cumulative_reward, 2)}")

# Mark the final position to market at the last row the environment reached
last_idx = min(test_env.current_step, len(test_env.df) - 1)
final_value = test_env.cash + test_env.stock_owned * float(test_env.df["Close"].iloc[last_idx])
print(f"💰 Final PPO Portfolio Value: {round(final_value, 2)}")
