<a href="https://colab.research.google.com/github/VarunSaiCSE/Sem5_private/blob/main/Automated_Stock_Trading_Systems_using_Utility_Based_Agent_Learning_Agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Data Collection


In [2]:
import yfinance as yf
import pandas as pd
import numpy as np

# Download stock data from Yahoo Finance
def get_stock_data(ticker, start_date, end_date):
  """
  Download historical stock price data
  ticker: Stock symbol like 'AAPL'
  start_date: Starting date in 'YYYY-MM-DD' format
  end_date: Ending date in 'YYYY-MM-DD' format
  """
  data = yf.download(ticker, start=start_date, end=end_date)
  return data

# Example: Download Apple stock data
apple_data = get_stock_data('AAPL', '2015-01-01', '2023-12-31') # Added an end date for the example
print(apple_data.head()) # Show first 5 rows

  data = yf.download(ticker, start=start_date, end=end_date)
[*********************100%***********************]  1 of 1 completed

Price           Close       High        Low       Open     Volume
Ticker           AAPL       AAPL       AAPL       AAPL       AAPL
Date                                                             
2015-01-02  24.261040  24.729263  23.821664  24.718167  212818400
2015-01-05  23.577568  24.110144  23.391167  24.030258  257142000
2015-01-06  23.579794  23.839424  23.218085  23.641928  263188400
2015-01-07  23.910440  24.010298  23.677438  23.788391  160423600
2015-01-08  24.829126  24.886823  24.121244  24.238856  237458000





#Calculate Technical Indicators

In [16]:
import ta

def add_technical_indicators(df):
  """
  Add technical indicators to stock data

  df: DataFrame with stock price data
  """
  # Ensure the 'Close' column is a pandas Series
  close_prices = df['Close_AAPL']

  # RSI - shows if stock is overbought/oversold
  df['RSI'] = ta.momentum.RSIIndicator(close_prices).rsi()

  # MACD - trend following indicator
  macd = ta.trend.MACD(close_prices)
  df['MACD'] = macd.macd()
  df['MACD_Signal'] = macd.macd_signal()

  # Moving Averages - trend indicators
  df['SMA_20'] = ta.trend.SMAIndicator(close_prices, window=20).sma_indicator()
  df['SMA_50'] = ta.trend.SMAIndicator(close_prices, window=50).sma_indicator()


  # Remove any missing values
  df = df.fillna(method='bfill')

  return df

# Apply indicators to our data
apple_data = add_technical_indicators(apple_data)

  df = df.fillna(method='bfill')


#Create trading env

In [19]:
import gym
from gym import spaces
import numpy as np

class TradingEnvironment(gym.Env):
  """
  Custom environment for training AI trading agents
  """
  def __init__(self, data, initial_money=100000):
    self.data = data
    self.initial_money = initial_money
    self.current_step = 0

    # Define possible actions: 0=Hold, 1=Buy, 2=Sell
    self.action_space = spaces.Discrete(3)

    # Define what the agent can observe
    self.observation_space = spaces.Box(
        low=-np.inf, high=np.inf, shape=(8,), dtype=np.float32
    )

    self.reset()

  def reset(self):
    """Start new episode"""
    self.current_step = 0
    self.money = self.initial_money # Available cash
    self.shares = 0 # Number of shares owned
    self.total_value = self.initial_money # Total portfolio value
    return self._get_state()

  def step(self, action):
    """Execute one trading action"""
    current_price = self.data.iloc[self.current_step]['Close_AAPL'] # Use correct column name

    # Execute the chosen action
    if action == 1: # Buy shares
      shares_to_buy = self.money // current_price # Buy as many as possible
      self.shares += shares_to_buy
      self.money -= shares_to_buy * current_price
    elif action == 2: # Sell all shares
      self.money += self.shares * current_price
      self.shares = 0

    # Calculate new portfolio value
    self.total_value = self.money + self.shares * current_price

    # Calculate reward (profit or loss)
    reward = self.total_value - self.initial_money

    # Move to next day
    self.current_step += 1
    done = self.current_step >= len(self.data) - 1

    return self._get_state(), reward, done, {}

  def _get_state(self):
    """Get current market state for the agent"""
    row = self.data.iloc[self.current_step]
    state = np.array([
        row['Close_AAPL'], # Current price (Use correct column name)
        row['RSI'], # RSI indicator
        row['MACD'], # MACD indicator
        row['SMA_20'], # 20-day moving average
        row['SMA_50'], # 50-day moving average
        self.money, # Available cash
        self.shares, # Shares owned
        self.total_value # Total portfolio value
    ])
    return state

#Train AI agents


In [26]:
from stable_baselines3 import PPO, A2C, DQN
from stable_baselines3.common.vec_env import DummyVecEnv

# Create environment for training
env = DummyVecEnv([lambda: TradingEnvironment(train_data)])

# Train PPO Agent
print("Training PPO Agent...")
ppo_agent = PPO('MlpPolicy', env, verbose=1)
ppo_agent.learn(total_timesteps=50000)
ppo_agent.save('ppo_trader')

# Train A2C Agent
print("Training A2C Agent...")
a2c_agent = A2C('MlpPolicy', env, verbose=1)
a2c_agent.learn(total_timesteps=50000)
a2c_agent.save('a2c_trader')

# Train DQN Agent
print("Training DQN Agent...")
dqn_agent = DQN('MlpPolicy', env, verbose=1)
dqn_agent.learn(total_timesteps=50000)
dqn_agent.save('dqn_trader')

# Step 5: Test and Evaluate
def test_agent(agent, test_data):
    """Test trained agent on unseen data"""
    env = TradingEnvironment(test_data)
    state = env.reset()
    total_reward = 0
    while True:
        # Agent makes decision
        action, _ = agent.predict(state)
        # Execute action in environment
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            break
    # Calculate performance metrics
    final_value = env.total_value
    return_pct = (final_value / env.initial_money - 1) * 100
    return {
        'Final Portfolio Value': final_value,
        'Total Return (%)': return_pct,
        'Profit/Loss': total_reward
    }

# Assuming test_data is defined elsewhere with the appropriate data
# Test all agents
# ppo_results = test_agent(ppo_agent, test_data)
# a2c_results = test_agent(a2c_agent, test_data)
# dqn_results = test_agent(dqn_agent, test_data)

# print("PPO Results:", ppo_results)
# print("A2C Results:", a2c_results)
# print("DQN Results:", dqn_results)



Training PPO Agent...
Using cpu device
-----------------------------
| time/              |      |
|    fps             | 1471 |
|    iterations      | 1    |
|    time_elapsed    | 1    |
|    total_timesteps | 2048 |
-----------------------------


  return datetime.utcnow().replace(tzinfo=utc)


-------------------------------------------
| time/                   |               |
|    fps                  | 1173          |
|    iterations           | 2             |
|    time_elapsed         | 3             |
|    total_timesteps      | 4096          |
| train/                  |               |
|    approx_kl            | 1.1350494e-09 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -1.1          |
|    explained_variance   | 0             |
|    learning_rate        | 0.0003        |
|    loss                 | 7.19e+11      |
|    n_updates            | 10            |
|    policy_gradient_loss | -2.05e-06     |
|    value_loss           | 1.4e+12       |
-------------------------------------------
-------------------------------------------
| time/                   |               |
|    fps                  | 1106          |
|    iterations           | 3             |
|    time_elapsed         | 5   

In [25]:
%pip install shimmy>=2.0

#Split into training/testing sets

In [23]:
# Split data into training and testing sets
train_size = int(len(apple_data) * 0.8) # 80% for training
train_data, test_data = apple_data[0:train_size], apple_data[train_size:len(apple_data)]

print("Training data shape:", train_data.shape)
print("Testing data shape:", test_data.shape)

Training data shape: (1811, 10)
Testing data shape: (453, 10)


In [21]:
%pip install stable-baselines3[extra]

Collecting stable-baselines3[extra]
  Downloading stable_baselines3-2.7.0-py3-none-any.whl.metadata (4.8 kB)
Downloading stable_baselines3-2.7.0-py3-none-any.whl (187 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m187.2/187.2 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: stable-baselines3
Successfully installed stable-baselines3-2.7.0


#Test and Eval


In [27]:
def test_agent(agent, test_data):
    """Test trained agent on unseen data"""
    env = TradingEnvironment(test_data)
    state = env.reset()
    total_reward = 0
    while True:
        # Agent makes decision
        action, _ = agent.predict(state)
        # Execute action in environment
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            break
    # Calculate performance metrics
    final_value = env.total_value
    return_pct = (final_value / env.initial_money - 1) * 100
    return {
        'Final Portfolio Value': final_value,
        'Total Return (%)': return_pct,
        'Profit/Loss': total_reward
    }

# Test all agents
ppo_results = test_agent(ppo_agent, test_data)
a2c_results = test_agent(a2c_agent, test_data)
dqn_results = test_agent(dqn_agent, test_data)

print("PPO Results:", ppo_results)
print("A2C Results:", a2c_results)
print("DQN Results:", dqn_results)

PPO Results: {'Final Portfolio Value': np.float64(121882.9393081665), 'Total Return (%)': np.float64(21.882939308166492), 'Profit/Loss': np.float64(1858249.6667861938)}
A2C Results: {'Final Portfolio Value': np.float64(100000.0), 'Total Return (%)': np.float64(0.0), 'Profit/Loss': np.float64(0.0)}
DQN Results: {'Final Portfolio Value': np.float64(139517.5265045166), 'Total Return (%)': np.float64(39.51752650451661), 'Profit/Loss': np.float64(6840964.692642212)}
