# Setup Environment and Dependencies
Install and import required libraries including gym, tensorflow/pytorch, pandas-datareader, numpy, and matplotlib.

In [1]:
# Install required libraries
# !pip install gym tensorflow pandas-datareader numpy matplotlib

# Import required libraries
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym import spaces
import tensorflow as tf
import random
from collections import deque

# Data Collection and Preprocessing
Fetch historical stock data, calculate technical indicators, and prepare the data for training.

In [2]:
# Data Collection and Preprocessing

# Fetch historical stock data
def fetch_stock_data(ticker, start_date, end_date):
    stock = yf.Ticker(ticker)
    df = stock.history(start=start_date, end=end_date)
    return df

# Calculate technical indicators
def calculate_technical_indicators(df):
    try:
        df['SMA'] = df['Close'].rolling(window=20).mean()
        df['EMA'] = df['Close'].ewm(span=20, adjust=False).mean()
        df['Momentum'] = df['Close'] - df['Close'].shift(4)
        df['Volatility'] = df['Close'].rolling(window=20).std()
        df.dropna(inplace=True)
        return df
    except Exception as e:
        print(f"Error calculating indicators: {e}")
        return None

# Prepare the data for training
def prepare_data(df):
    df['Return'] = df['Close'].pct_change()
    df.dropna(inplace=True)
    features = df[['SMA', 'EMA', 'Momentum', 'Volatility']].values
    labels = df['Return'].values
    return features, labels

# Example usage
ticker = 'AAPL'
start_date = '2010-01-01'
end_date = '2020-01-01'

# Fetch and preprocess data
stock_data = fetch_stock_data(ticker, start_date, end_date)
print("Raw data shape:", stock_data.shape)
stock_data = calculate_technical_indicators(stock_data)
print("Processed data shape:", stock_data.shape)
print("\nColumns:", stock_data.columns)
features, labels = prepare_data(stock_data)

# Display the first few rows of the preprocessed data
stock_data.head()

Raw data shape: (2516, 7)
Processed data shape: (2497, 11)

Columns: Index(['Open', 'High', 'Low', 'Close', 'Volume', 'Dividends', 'Stock Splits',
       'SMA', 'EMA', 'Momentum', 'Volatility'],
      dtype='object')


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits,SMA,EMA,Momentum,Volatility,Return
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
2010-02-02 00:00:00-05:00,5.902122,5.914473,5.825901,5.900615,698342400,0.0,0.0,6.210872,6.161553,-0.362122,0.204166,0.005803
2010-02-03 00:00:00-05:00,5.879825,6.031362,5.85723,6.002139,615328000,0.0,0.0,6.188051,6.146371,-0.001806,0.200499,0.017206
2010-02-04 00:00:00-05:00,5.926823,5.976231,5.771369,5.78583,757652000,0.0,0.0,6.159552,6.112034,-0.0003,0.215356,-0.036039
2010-02-05 00:00:00-05:00,5.803303,5.90483,5.749677,5.888561,850306800,0.0,0.0,6.136776,6.090751,0.021991,0.218872,0.017755
2010-02-08 00:00:00-05:00,5.895489,5.961467,5.844575,5.84819,478270800,0.0,0.0,6.109872,6.06765,-0.052424,0.21966,-0.006856


# Define Trading Environment
Create a custom OpenAI Gym environment that simulates the stock trading environment with state space, action space, and reward function.

In [3]:
# Define Trading Environment
from gym import spaces

class StockTradingEnv(gym.Env):
    def __init__(self, df):
        super(StockTradingEnv, self).__init__()
        self.df = df
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0]), 
            high=np.array([np.inf, np.inf, np.inf, np.inf]),
            dtype=np.float32
        )
        
    def reset(self):
        self.current_step = 0
        self.balance = 10000  # Initial balance
        self.shares_held = 0
        self.total_shares_sold = 0
        self.total_sales_value = 0
        return self._next_observation()
    
    def _next_observation(self):
        obs = self.df.iloc[self.current_step][['SMA', 'EMA', 'Momentum', 'Volatility']].values
        return obs
    
    def step(self, action):
        self._take_action(action)
        self.current_step += 1
        
        if self.current_step >= len(self.df) - 1:
            done = True
        else:
            done = False
        
        reward = self.balance + (self.shares_held * self.df.iloc[self.current_step]['Close']) - 10000
        obs = self._next_observation()
        
        return obs, reward, done, {}
    
    def _take_action(self, action):
        current_price = self.df.iloc[self.current_step]['Close']
        
        if action == 1:  # Buy
            shares_bought = self.balance // current_price
            self.balance -= shares_bought * current_price
            self.shares_held += shares_bought
        
        elif action == 2:  # Sell
            self.balance += self.shares_held * current_price
            self.total_shares_sold += self.shares_held
            self.total_sales_value += self.shares_held * current_price
            self.shares_held = 0

# Example usage
env = StockTradingEnv(stock_data)
obs = env.reset()
print(obs)

[ 6.21087234  6.1615531  -0.36212158  0.20416609]


# Create Trading Agent
Implement a trading agent class with methods for selecting actions, storing experiences, and updating the policy.

In [4]:

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.model = self._build_model()
    
    def _build_model(self):
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(24, input_shape=(self.state_size,), activation='relu'),
            tf.keras.layers.Dense(24, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear')
        ])
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=0.001))
        return model
    
    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        state = np.reshape(state, [1, self.state_size])
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])
    
    def replay(self, batch_size):
        if len(self.memory) < batch_size:
            return
            
        minibatch = random.sample(self.memory, batch_size)
        states = np.array([i[0][0] for i in minibatch])
        actions = np.array([i[1] for i in minibatch])
        rewards = np.array([i[2] for i in minibatch])
        next_states = np.array([i[3][0] for i in minibatch])
        dones = np.array([i[4] for i in minibatch])

        targets = rewards + self.gamma * (np.amax(self.model.predict(next_states, verbose=0), axis=1)) * (1 - dones)
        target_f = self.model.predict(states, verbose=0)
        
        for i, action in enumerate(actions):
            target_f[i][action] = targets[i]
        
        self.model.fit(states, target_f, epochs=1, verbose=0)
        
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

env = StockTradingEnv(stock_data)
state_size = 4  # Number of features
action_size = 3 # Hold, Buy, Sell
agent = DQNAgent(state_size=state_size, action_size=action_size)

# Usage example
state = env.reset()
state = np.reshape(state, [1, state_size])
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
agent.remember(state, action, reward, next_state, done)

if len(agent.memory) > 32:
    agent.replay(32)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


# Implement DQN Architecture
Build the Deep Q-Network model architecture with experience replay and target network.

In [5]:
# Implement DQN Architecture

import random
from collections import deque

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.model = self._build_model()
        self.target_model = self._build_model()
        self.update_target_model()
    
    def _build_model(self):
        # Neural Network for Deep Q-Learning Model
        model = tf.keras.Sequential()
        model.add(tf.keras.layers.Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(tf.keras.layers.Dense(24, activation='relu'))
        model.add(tf.keras.layers.Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(lr=0.001))
        return model
    
    def update_target_model(self):
        # Copy weights from model to target_model
        self.target_model.set_weights(self.model.get_weights())
    
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
    
    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])
    
    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma * np.amax(self.target_model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
    
    def load(self, name):
        self.model.load_weights(name)
    
    def save(self, name):
        self.model.save_weights(name)

# Example usage
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = DQNAgent(state_size, action_size)

# Example of storing an experience
state = np.reshape(obs, [1, state_size])
action = agent.act(state)
next_state, reward, done, _ = env.step(action)
next_state = np.reshape(next_state, [1, state_size])
agent.remember(state, action, reward, next_state, done)

# Example of updating the policy
agent.replay(32)

# Update target model
agent.update_target_model()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


ValueError: Argument(s) not recognized: {'lr': 0.001}

# Training Loop Implementation
Create the main training loop with epsilon-greedy exploration and model updates.

In [None]:
# Training Loop Implementation

num_episodes = 1000
batch_size = 32

for e in range(num_episodes):
    state = env.reset()
    state = np.reshape(state, [1, state_size])
    
    for time in range(500):
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        
        if done:
            agent.update_target_model()
            print(f"episode: {e}/{num_episodes}, score: {time}, e: {agent.epsilon:.2}")
            break
        
        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

# Backtesting and Evaluation
Test the trained agent on historical data and calculate performance metrics.

In [None]:
# Backtesting and Evaluation

# Test the trained agent on historical data and calculate performance metrics

# Define a function to backtest the agent
def backtest_agent(env, agent, episodes=10):
    total_rewards = []
    for e in range(episodes):
        state = env.reset()
        state = np.reshape(state, [1, state_size])
        total_reward = 0
        done = False
        while not done:
            action = agent.act(state)
            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            state = np.reshape(next_state, [1, state_size])
        total_rewards.append(total_reward)
    return total_rewards

# Calculate performance metrics
def calculate_performance_metrics(rewards):
    avg_reward = np.mean(rewards)
    std_reward = np.std(rewards)
    max_reward = np.max(rewards)
    min_reward = np.min(rewards)
    return avg_reward, std_reward, max_reward, min_reward

# Backtest the agent
rewards = backtest_agent(env, agent, episodes=10)

# Calculate performance metrics
avg_reward, std_reward, max_reward, min_reward = calculate_performance_metrics(rewards)

# Print performance metrics
print(f"Average Reward: {avg_reward}")
print(f"Standard Deviation of Reward: {std_reward}")
print(f"Maximum Reward: {max_reward}")
print(f"Minimum Reward: {min_reward}")

# Plot the rewards
plt.plot(rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('Total Rewards per Episode')
plt.show()

# Trading Strategy Visualization
Create visualizations of trading actions, portfolio value, and performance metrics.

In [None]:
# Trading Strategy Visualization

# Plot trading actions
def plot_trading_actions(df, actions):
    plt.figure(figsize=(14, 7))
    plt.plot(df['Close'], label='Close Price')
    buy_signals = df[actions == 1]
    sell_signals = df[actions == 2]
    plt.scatter(buy_signals.index, buy_signals['Close'], marker='^', color='g', label='Buy Signal', alpha=1)
    plt.scatter(sell_signals.index, sell_signals['Close'], marker='v', color='r', label='Sell Signal', alpha=1)
    plt.title('Trading Strategy Visualization')
    plt.xlabel('Date')
    plt.ylabel('Close Price')
    plt.legend()
    plt.show()

# Plot portfolio value
def plot_portfolio_value(portfolio_values):
    plt.figure(figsize=(14, 7))
    plt.plot(portfolio_values, label='Portfolio Value')
    plt.title('Portfolio Value Over Time')
    plt.xlabel('Time Step')
    plt.ylabel('Portfolio Value')
    plt.legend()
    plt.show()

# Example usage
# Assuming `actions` is a list of actions taken by the agent and `portfolio_values` is a list of portfolio values over time
actions = [env.action_space.sample() for _ in range(len(stock_data))]  # Random actions for example
portfolio_values = [10000 + i * 10 for i in range(len(stock_data))]  # Example portfolio values

plot_trading_actions(stock_data, np.array(actions))
plot_portfolio_value(portfolio_values)