# Reinforcement Learning for Stock Trading in Data-Scarce Environments

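This notebook implements a tabular Q-learning agent, a simple reinforcement learning (RL) method, that trades a single stock using historical daily prices downloaded with `yfinance`.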


In [None]:

import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
from collections import deque

# Ticker symbol and date range of the price history to download.
STOCK_TICKER = 'AAPL'
START_DATE = '2020-01-01'
END_DATE = '2023-01-01'

# Fetch historical data
df = yf.download(STOCK_TICKER, start=START_DATE, end=END_DATE)

# Fail fast on an empty result (e.g. a failed download or a range with no
# rows); otherwise the problem only surfaces further down as an obscure
# column/row lookup error.
if df.empty:
    raise ValueError(
        f"No price data returned for {STOCK_TICKER} "
        f"between {START_DATE} and {END_DATE}."
    )

# Prefer the adjusted close when the download provides it, else the raw close.
price_col = "Adj Close" if "Adj Close" in df.columns else "Close"

# Per-row relative and absolute price moves.
df['Daily_Return'] = df[price_col].pct_change()
df['Price_Change'] = df[price_col].diff()

# The first row has no predecessor, so pct_change/diff leave NaN there;
# drop every row that contains a NaN.
df.dropna(inplace=True)

print(f"Data for {STOCK_TICKER} from {START_DATE} to {END_DATE}:")
print(df.head())
print(df.tail())

class StockTradingEnv:
    """Single-stock trading simulator that trades in fixed-size lots.

    The environment walks through the rows of a price DataFrame one step at
    a time. Each step the caller chooses an action:

    * ``0`` - hold
    * ``1`` - buy ``stock_quantity`` shares (only if the cash balance covers it)
    * ``2`` - sell ``stock_quantity`` shares (only if that many are held)

    Any other action value is treated like a hold.

    The observation is ``np.array([price, price_change, balance, shares_held])``
    for the current row.
    """

    def __init__(self, df, initial_balance=10000, stock_quantity=10):
        """Create an environment over ``df``.

        Args:
            df: DataFrame with a ``"Price_Change"`` column and either an
                ``"Adj Close"`` or a ``"Close"`` price column.
            initial_balance: Cash available at the start of each episode.
            stock_quantity: Number of shares bought or sold per trade.

        Raises:
            ValueError: If ``df`` has fewer than two rows, since a step needs
                a row to trade on and a following row to value the result.
        """
        # Use a 0..n-1 integer index so rows can be addressed by step number.
        self.df = df.reset_index()
        if len(self.df) < 2:
            raise ValueError(
                "StockTradingEnv needs at least two rows of price data, "
                f"got {len(self.df)}."
            )
        # Resolve the price column once instead of on every lookup.
        self._price_col = "Adj Close" if "Adj Close" in self.df.columns else "Close"
        self.initial_balance = initial_balance
        self.stock_quantity = stock_quantity
        self.current_step = 0
        self.balance = initial_balance
        self.shares_held = 0
        self.portfolio_value = initial_balance
        self.action_space = [0, 1, 2] # 0: Hold, 1: Buy, 2: Sell
        self.state_space_size = 4 # Current Price, Price Change, Balance, Shares Held

    def _value_at_current_step(self, column):
        """Return the scalar in ``column`` for the row at ``current_step``."""
        value = self.df.loc[self.current_step, column]
        # With multi-level columns the lookup yields a one-element Series
        # rather than a scalar; unwrap it.
        if isinstance(value, pd.Series):
            value = value.item()
        return value

    def _get_state(self):
        """Return the observation for the current row as a 4-element array."""
        current_price = self._value_at_current_step(self._price_col)
        price_change = self._value_at_current_step("Price_Change")
        return np.array([current_price, price_change, self.balance, self.shares_held])

    def reset(self):
        """Restart the episode at the first row and return the initial state."""
        self.current_step = 0
        self.balance = self.initial_balance
        self.shares_held = 0
        self.portfolio_value = self.initial_balance
        return self._get_state()

    def step(self, action):
        """Execute ``action`` at the current price and advance one row.

        The reward is the change in total portfolio value (cash plus shares)
        from before the trade to after the move to the next row, with the
        holdings valued at the next row's price. Valuing at the trade price
        instead would make the reward independent of the chosen action,
        because buying or selling at a price does not change the total value
        at that same price.

        Args:
            action: ``0`` hold, ``1`` buy, ``2`` sell.

        Returns:
            Tuple ``(next_state, reward, done, info)`` where ``done`` becomes
            True once the last row has been reached and ``info`` is an empty
            dict.
        """
        trade_price = self._value_at_current_step(self._price_col)
        lot_cost = self.stock_quantity * trade_price

        if action == 1: # Buy
            if self.balance >= lot_cost:
                self.balance -= lot_cost
                self.shares_held += self.stock_quantity
        elif action == 2: # Sell
            if self.shares_held >= self.stock_quantity:
                self.balance += lot_cost
                self.shares_held -= self.stock_quantity

        self.current_step += 1
        done = self.current_step >= len(self.df) - 1

        # Mark the post-trade holdings to market at the new row's price so the
        # reward reflects the consequence of the action just taken.
        next_price = self._value_at_current_step(self._price_col)
        new_portfolio_value = self.balance + self.shares_held * next_price
        reward = new_portfolio_value - self.portfolio_value
        self.portfolio_value = new_portfolio_value

        return self._get_state(), reward, done, {}

class QLearningAgent:
    """Tabular epsilon-greedy Q-learning agent over a bucketed state space.

    Continuous observations are mapped to coarse integer buckets; the string
    form of the bucket tuple is the key into ``q_table``, whose values are
    arrays holding one Q-value per action.
    """

    def __init__(self, state_size, action_size, learning_rate=0.1, discount_factor=0.95, epsilon=1.0, epsilon_decay_rate=0.995, min_epsilon=0.01):
        """Store the hyperparameters and start with an empty Q-table.

        Args:
            state_size: Number of features in an observation (stored only).
            action_size: Number of discrete actions.
            learning_rate: Step size of the Q-value update.
            discount_factor: Weight given to the estimated future value.
            epsilon: Initial probability of picking a random action.
            epsilon_decay_rate: Factor applied to ``epsilon`` after each
                finished episode.
            min_epsilon: Lower bound that ``epsilon`` decays towards.
        """
        self.state_size = state_size
        self.action_size = action_size
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay_rate = epsilon_decay_rate
        self.min_epsilon = min_epsilon
        self.q_table = {}

    def _discretize_state(self, state):
        """Map ``[price, price_change, balance, shares_held]`` to integer buckets."""
        current_price_bucket = int(state[0] / 10)
        price_change_bucket = int(state[1] * 10)
        balance_bucket = int(state[2] / 1000)
        shares_held_bucket = int(state[3] / 10)
        return (current_price_bucket, price_change_bucket, balance_bucket, shares_held_bucket)

    def _q_values(self, state):
        """Return the Q-value row for ``state``, creating a zero row if unseen."""
        key = str(self._discretize_state(state))
        if key not in self.q_table:
            self.q_table[key] = np.zeros(self.action_size)
        return self.q_table[key]

    def get_action(self, state):
        """Pick an action for ``state`` using the epsilon-greedy rule.

        Returns:
            The action index as a plain ``int``: a uniformly random action
            with probability ``epsilon``, otherwise the action with the
            highest Q-value.
        """
        q_row = self._q_values(state)

        if random.uniform(0, 1) < self.epsilon:
            return random.randrange(self.action_size)
        # Cast so both branches return the same type (argmax yields np.int64).
        return int(np.argmax(q_row))

    def learn(self, state, action, reward, next_state, done):
        """Apply one Q-learning update for the observed transition.

        Args:
            state: Observation before the action.
            action: Index of the action that was taken.
            reward: Reward received for the transition.
            next_state: Observation after the action.
            done: Whether the transition ended the episode. When True the
                update target is the reward alone, and ``epsilon`` is decayed
                once.
        """
        q_row = self._q_values(state)
        next_q_row = self._q_values(next_state)

        # Nothing follows a terminal transition, so do not bootstrap from the
        # next state's estimates when the episode is over.
        max_future_q_value = 0.0 if done else np.max(next_q_row)

        target = reward + self.discount_factor * max_future_q_value
        q_row[action] += self.learning_rate * (target - q_row[action])

        if done:
            self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay_rate)

# Training parameters
EPISODES = 100
CHECKPOINT_INTERVAL = 10

env = StockTradingEnv(df)
agent = QLearningAgent(state_size=env.state_space_size, action_size=len(env.action_space))

# Per-episode totals collected for the training plots.
episode_rewards = []
portfolio_values = []

# In-memory snapshots of the Q-table, keyed by the 1-based episode number at
# which each snapshot was taken.
q_table_checkpoints = {}

for episode in range(EPISODES):
    state = env.reset()
    total_reward = 0
    done = False

    # Run one full pass over the price history, learning after every step.
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.learn(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    episode_rewards.append(total_reward)
    portfolio_values.append(env.portfolio_value)

    print(f"Episode {episode + 1}/{EPISODES} - Total Reward: {total_reward:.2f} - Final Portfolio Value: {env.portfolio_value:.2f} - Epsilon: {agent.epsilon:.2f}")

    if (episode + 1) % CHECKPOINT_INTERVAL == 0:
        # Copy every row so later training updates do not alter the snapshot;
        # without this the message below would report a save that never
        # happened.
        q_table_checkpoints[episode + 1] = {
            key: values.copy() for key, values in agent.q_table.items()
        }
        print(f"Checkpoint: Q-table saved at episode {episode + 1}")

print("\nTraining complete!")

# Evaluation
# With epsilon at zero the agent never explores: it always takes the action
# with the highest learned Q-value.
agent.epsilon = 0.0

# Fresh environment over the same price data, plus the series to plot later.
env_eval = StockTradingEnv(df)
eval_portfolio_values = []
state_eval = env_eval.reset()

# Step until the environment reports the end of the data, recording the
# portfolio value after every step. No learning happens here.
while True:
    action_eval = agent.get_action(state_eval)
    next_state_eval, reward_eval, done_eval, _ = env_eval.step(action_eval)
    eval_portfolio_values.append(env_eval.portfolio_value)
    state_eval = next_state_eval
    if done_eval:
        break

print(f"\nEvaluation complete! Final Portfolio Value: {env_eval.portfolio_value:.2f}")

# Visualization
# One figure per series: (data, title, x-axis label, y-axis label).
plot_specs = (
    (episode_rewards, "Episode Rewards during Training", "Episode", "Total Reward"),
    (portfolio_values, "Portfolio Value during Training", "Episode", "Portfolio Value"),
    (eval_portfolio_values, "Portfolio Value during Evaluation", "Time Step", "Portfolio Value"),
)

# Draw and show the figures one after another, all with the same layout.
for series, title, x_label, y_label in plot_specs:
    plt.figure(figsize=(12, 6))
    plt.plot(series)
    plt.title(title)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.grid(True)
    plt.show()


