# Imports

In [13]:
import numpy as np
import random
import pandas as pd
from abc import abstractmethod
from typing import Tuple
import yfinance as yf
import matplotlib.pyplot as plt

# Data & Utilities

In [9]:
def fetch_and_save_stock_data(stock_symbol: str, start_date, end_date, file_name):
    """
    Download the price history of one stock, clean it and save it as a CSV file

    Args:
        stock_symbol: str, The ticker to download (e.g. "PEP")
        start_date: First day of the requested period, forwarded to yf.download
        end_date: End of the requested period, forwarded to yf.download
        file_name: Path of the CSV file to write

    Raises:
        ValueError: If the download returned no rows
    """
    stock_data = yf.download(stock_symbol, start=start_date, end=end_date)

    # Writing an empty file would only postpone the failure to the first price lookup
    if stock_data.empty:
        raise ValueError(
            f"No data downloaded for {stock_symbol} between {start_date} and {end_date}"
        )

    # Some yfinance versions return two-level columns (field, ticker) even for a
    # single ticker; to_csv would then write several header rows and the "Close"
    # column read back later would no longer be purely numeric.
    if isinstance(stock_data.columns, pd.MultiIndex):
        stock_data.columns = stock_data.columns.get_level_values(0)

    stock_data = preprocess_data(stock_data)
    stock_data.to_csv(file_name)
    print(f"Data for {stock_symbol} saved to {file_name}")

def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean a price table in place: treat zeros as missing values and fill the gaps

    Zeros are replaced by NaN, then every NaN takes the previous valid value of
    its column (forward fill). Leading NaNs, which have no previous value, take
    the next valid one (backward fill).

    Args:
        df: pd.DataFrame, The table to clean. It is modified in place

    Returns:
        df: pd.DataFrame, The very same object that was passed in
    """
    df.replace(0, np.nan, inplace=True)

    # Forward and backward fill to handle NaNs.
    # ffill() / bfill() are used instead of fillna(method=...), which pandas
    # has deprecated.
    df.ffill(inplace=True)
    df.bfill(inplace=True)

    return df

# Example usage: download and cache the daily prices of both stocks
for ticker, csv_path in (("PEP", "pepsi_data.csv"), ("KO", "cola_data.csv")):
    fetch_and_save_stock_data(ticker, "2010-01-01", "2023-01-01", csv_path)

[*********************100%%**********************]  1 of 1 completed


Data for PEP saved to pepsi_data.csv
[*********************100%%**********************]  1 of 1 completed
Data for KO saved to cola_data.csv


  df.fillna(method="ffill", inplace=True)
  df.fillna(method="bfill", inplace=True)
  df.fillna(method="ffill", inplace=True)
  df.fillna(method="bfill", inplace=True)


# Basic Environment Class #
This class implements the core methods and properties of a trading environment. The environments needed to run the experiments inherit from this class.

In [10]:
class TradingEnvironment:
    def __init__(
        self, pepsi_file: str, cola_file: str, observation_dim: int, action_dim: int
    ):
        self.pepsi_data = pd.read_csv(pepsi_file)
        self.cola_data = pd.read_csv(cola_file)

        self.action_space = range(action_dim)
        self.state = np.zeros(observation_dim)

        self.current_step = 0
        self.portfolio_value = self._compute_portfolio_value()

    def step(self, action: int) -> Tuple[np.ndarray, float, bool]:
        """
        Update the environment with action taken by the agent

        Args:
            action: int, The action taken by the agent

        Returns:
            next_state_index: int, The index of the next state
            reward: float, The reward returned by the environment
            done: bool, Is the episode terminated or truncated
        """
        self._check_action_validity(action)
        self.state = self._trade(action)
        self.current_step += 1
        done = self.current_step >= len(self.pepsi_data) - 1
        reward = self._compute_reward()

        return self.state, reward, done

    @abstractmethod
    def reset(self) -> np.ndarray:
        # Not Implemented
        raise NotImplementedError

    @abstractmethod
    def _trade(self, action: int) -> np.ndarray:
        # Not Implemented
        raise NotImplementedError

    @abstractmethod
    def _get_indicator(self, stock_data: pd.DataFrame) -> int | float:
        # Not Implemented
        raise NotImplementedError

    @abstractmethod
    def _check_action_validity(self, action: int) -> None:
        # Not Implemented
        raise NotImplementedError

    @abstractmethod
    def _compute_portfolio_value(self) -> float:
        # Not Implemented
        raise NotImplementedError

    def _get_stock_price(self, step: int, stock_data: pd.DataFrame) -> float:
        """
        Fetch the price for the given step and stock
        """
        return stock_data.iloc[step]["Close"]

    def _get_stock_trend(self, step: int, stock_data: pd.DataFrame) -> float:
        """
        Fetch the trend for the given stock between the given step and the previous one
        """
        return stock_data.iloc[step]["Close"] - stock_data.iloc[step - 1]["Close"]

    def _compute_reward(self) -> float:
        """
        Computes and updates the portfolio value and returns the reward associated
        The reward is the difference between the current portfolio value and the previous one
        """
        current_portfolio_value = self._calculate_portfolio_value()
        reward = current_portfolio_value - self.previous_portfolio_value
        self.previous_portfolio_value = current_portfolio_value
        return reward

# Experiment 1
## Super simplified stock trading as a discrete MDP

### Q Learning
To fill

### Environment

In [11]:
class SimplifiedDiscreteTradingEnvironment(TradingEnvironment):
    """
    Super simplified trading problem expressed as a discrete MDP (Experiment 1)

    State: [Balance units, Shares Pepsi, Shares Cola, Trend Pepsi, Trend Cola]
        - Balance units: 0..max_balance_units, one unit being worth balance_unit
        - Shares: 0..max_shares_per_stock for each stock
        - Trend: 1 if the price went up since the previous step, else 0
    Actions: 0 = Sell all, 1 = Hold, 2 = Buy Pepsi, 3 = Buy Cola

    `step` returns the state encoded as a single integer index (see
    `convert_state_to_index`), while `reset` returns the state vector.
    """

    def __init__(self, pepsi_file: str, cola_file: str):
        """
        Args:
            pepsi_file: str, Path of the CSV file holding the Pepsi prices
            cola_file: str, Path of the CSV file holding the Cola prices
        """
        self.observation_dim = (
            5  # [Balance, Shares Pepsi, Shares Cola, Trend Pepsi, Trend Cola]
        )
        self.action_dim = 4  # 0 = Sell all, 1 = Hold, 2 = Buy Pepsi, 3 = Buy Cola

        # Set before calling the base constructor: it calls
        # _compute_portfolio_value(), which reads balance_unit.
        self.balance_unit = 10
        self.max_balance_units = 10
        self.max_shares_per_stock = 5

        # 11 balances, 6 shares each for Pepsi and Cola, 2 trends each
        self.max_state_index = (
            (self.max_balance_units + 1)
            * (self.max_shares_per_stock + 1) ** 2
            * 2
            * 2
        )

        super().__init__(pepsi_file, cola_file, self.observation_dim, self.action_dim)

        # Install the initial state and make portfolio_value consistent with it
        self.reset()

    def __str__(self) -> str:
        pepsi_price = self._get_stock_price(self.current_step, self.pepsi_data)
        cola_price = self._get_stock_price(self.current_step, self.cola_data)
        balance = self.state[0] * self.balance_unit
        return (
            "The environment is a Simplified Discrete Trading Problem (Experiment 1).\n"
            "It is using the stocks: Pepsi, Cola.\n"
            f"The episode is at the timestep {self.current_step}\n"
            f"The current stock prices are Pepsi: {pepsi_price}, Cola: {cola_price}\n"
            "Amount of shares held by the agent: "
            f"Pepsi: {self.state[1]}, Cola: {self.state[2]}\n"
            f"Left balance: {balance}"
        )

    def step(self, action: int) -> Tuple[int, float, bool]:
        """
        Apply the action and return (state index, reward, done)

        Raises:
            ValueError: If the action is not one of the valid actions
        """
        state, reward, done = super().step(action)
        state_index = self.convert_state_to_index(state)
        return state_index, reward, done

    def reset(self) -> np.ndarray:
        """
        Restart the episode and return the initial state vector
        """
        self.state = self._initial_state()  # Reset to initial state
        self.current_step = 0
        self.portfolio_value = self._compute_portfolio_value()
        return self.state

    def _initial_state(self) -> np.ndarray:
        """
        Build the initial state: full balance, no shares, no trend

        The balance starts at max_balance_units: a larger value (15 was used
        previously) cannot be represented by the state index encoding, which
        only has room for 0..max_balance_units.
        """
        return np.array([self.max_balance_units, 0, 0, 0, 0])

    def _check_action_validity(self, action: int) -> None:
        """
        Raise ValueError if the action is not in the action space
        """
        if action not in self.action_space:
            raise ValueError(
                f"Invalid action {action!r}: expected one of {list(self.action_space)}"
            )

    def _affordable_quantity(self, balance: float, price: float, held: int) -> int:
        """
        Number of shares that can be bought given the cash and the holding cap
        """
        return int(min(balance // price, self.max_shares_per_stock - held))

    def _trade(self, action: int) -> np.ndarray:
        """
        Trade the desired amount

        Args:
            action: int, The trade order, can be
                - 0: Sell all
                - 1: Hold
                - 2: Buy Pepsi
                - 3: Buy Cola

        Returns:
            state: np.ndarray, [Balance units, Shares Pepsi, Shares Cola, Trend Pepsi, Trend Cola]
        """
        balance_units, shares_pepsi, shares_cola = (int(v) for v in self.state[:3])
        balance = balance_units * self.balance_unit
        pepsi_price = self._get_stock_price(self.current_step, self.pepsi_data)
        cola_price = self._get_stock_price(self.current_step, self.cola_data)

        if action == 0:  # Sell all
            balance += shares_pepsi * pepsi_price + shares_cola * cola_price
            shares_pepsi, shares_cola = 0, 0
        elif action == 2:  # Buy Pepsi
            quantity = self._affordable_quantity(balance, pepsi_price, shares_pepsi)
            shares_pepsi += quantity
            balance -= quantity * pepsi_price
        elif action == 3:  # Buy Cola
            quantity = self._affordable_quantity(balance, cola_price, shares_cola)
            shares_cola += quantity
            balance -= quantity * cola_price

        # Update state with rounded balance, kept inside 0..max_balance_units so
        # that the state always maps to a valid index.
        new_balance = min(
            max(int(balance / self.balance_unit), 0), self.max_balance_units
        )

        trend_pepsi = self._get_indicator(self.current_step, self.pepsi_data)
        trend_cola = self._get_indicator(self.current_step, self.cola_data)

        return np.array(
            [new_balance, shares_pepsi, shares_cola, trend_pepsi, trend_cola]
        )

    def _get_indicator(self, step: int, stock_data: pd.DataFrame) -> int:
        """
        Binary trend of the stock at the given step: 1 if the price went up, else 0
        """
        if step <= 0:
            # No previous price to compare with at the first step
            return 0
        trend = self._get_stock_trend(step, stock_data)
        return int(trend > 0)

    def _compute_portfolio_value(self) -> float:
        """
        Cash balance plus the holdings valued at the prices of the current step
        """
        balance = self.state[0] * self.balance_unit
        pepsi_holdings_value = self.state[1] * self._get_stock_price(
            self.current_step, self.pepsi_data
        )
        cola_holdings_value = self.state[2] * self._get_stock_price(
            self.current_step, self.cola_data
        )
        return balance + pepsi_holdings_value + cola_holdings_value

    def convert_state_to_index(self, state: np.ndarray) -> int:
        """
        Encode a state vector as a single integer in 0..max_state_index - 1

        Mixed-radix encoding, least significant digit first:
        balance, Pepsi shares, Cola shares, Pepsi trend, Cola trend.
        """
        n_balances = self.max_balance_units + 1
        n_shares = self.max_shares_per_stock + 1

        balance_index, pepsi_shares, cola_shares, trend_pepsi, trend_cola = state
        index = balance_index
        index += pepsi_shares * n_balances
        index += cola_shares * n_balances * n_shares
        index += trend_pepsi * n_balances * n_shares * n_shares
        index += trend_cola * n_balances * n_shares * n_shares * 2
        return int(index)

    def convert_index_to_state(self, index: int) -> np.ndarray:
        """
        Decode an integer produced by `convert_state_to_index` back to a state vector
        """
        n_balances = self.max_balance_units + 1
        n_shares = self.max_shares_per_stock + 1

        trend_cola, index = divmod(index, n_balances * n_shares * n_shares * 2)
        trend_pepsi, index = divmod(index, n_balances * n_shares * n_shares)
        cola_shares, index = divmod(index, n_balances * n_shares)
        pepsi_shares, balance_index = divmod(index, n_balances)
        return np.array(
            [balance_index, pepsi_shares, cola_shares, trend_pepsi, trend_cola]
        )

### Agent

In [8]:
class QLearningAgent:
    """
    Tabular Q-Learning agent with an epsilon-greedy policy

    The exploration rate is multiplied by `exploration_decay` after every call
    to `train`, down to `exploration_min`. A snapshot of the Q Table is recorded
    after every update.
    """

    def __init__(
        self,
        state_space: int,
        action_space: int,
        learning_rate=0.01,
        discount_factor=0.99,
        exploration_rate=1.0,
    ):
        """
        Args:
            state_space: int, Number of possible states
            action_space: int, Number of possible actions
            learning_rate: Step size (alpha) of the Q value update
            discount_factor: Discount (gamma) applied to the next state value
            exploration_rate: Initial probability (epsilon) of a random action
        """
        # Env
        self.state_space = state_space
        self.action_space = action_space

        # Learning
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.exploration_min = 0.01
        self.exploration_decay = 0.995
        self.q_table = np.zeros((state_space, action_space))

        # Monitoring: snapshots are appended to a list (cheap) and only stacked
        # into one array on demand. Concatenating arrays at every update would
        # re-copy the whole history each time.
        self._q_table_snapshots = [self.q_table.copy()]

    def __str__(self) -> str:
        info = """The agent is using Q-Learning algorithm\n
        It is working on Simplified Discrete Trading Environment (Experiment 1)\n
        The current Q Table values can be fetch by calling get_current_q_values() method\n
        The history of Q Table values can be fetch by calling get_history_q_values() method"""
        return info

    @property
    def q_table_history(self) -> np.ndarray:
        """
        History of the Q Tables, stacked along a new first axis
        """
        return np.stack(self._q_table_snapshots)

    def choose_action(self, state_index: int) -> int:
        """
        Choose action according to current Q Table

        With probability `exploration_rate` a random action is returned,
        otherwise the action with the highest Q value for the state.
        """
        if np.random.rand() < self.exploration_rate:
            return random.randrange(self.action_space)
        # Cast so that both branches return a plain Python int
        return int(np.argmax(self.q_table[state_index]))

    def train(
        self,
        state_index: int,
        action: int,
        reward: float,
        next_state_index: int,
        done: bool = False,
    ) -> None:
        """
        Update Q values following Q Learning classical update

        Args:
            state_index: int, Index of the state the action was taken from
            action: int, The action taken
            reward: float, The reward received
            next_state_index: int, Index of the state reached
            done: bool, True if the state reached is terminal; the target is
                then the reward alone (no bootstrapping from the next state)
        """
        assert 0 <= state_index < self.state_space, "Invalid state_index"
        assert 0 <= next_state_index < self.state_space, "Invalid next_state_index"

        q_value = self.q_table[state_index, action]

        # Target = Rt + Gamma x max(Q[S(t+1), a]), or just Rt on a terminal state
        target = reward
        if not done:
            target += self.discount_factor * np.max(self.q_table[next_state_index])

        # Q[S(t), action] =  Q[S(t), action] + alpha x (Rt + Gamma x max(Q[S(t+1), a]) - Q[S(t), action])
        self.q_table[state_index, action] += self.learning_rate * (target - q_value)

        # Store a copy of the Q table (the live table keeps being modified)
        self._q_table_snapshots.append(self.q_table.copy())

        self.exploration_rate = max(
            self.exploration_rate * self.exploration_decay, self.exploration_min
        )

    def get_current_q_values(self) -> np.ndarray:
        """
        Fetch the current Q Table as a numpy array of shape:
            (number of possible states, number of possible actions)
        """
        return self.q_table

    def get_history_q_values(self) -> np.ndarray:
        """
        Fetch the history of Q Tables as a numpy array of shape:
            (number of updates done + 1, number of possible states, number of possible actions)
        The first entry is the initial (all zeros) Q Table.
        """
        return self.q_table_history


### Training

In [12]:
def train_QLearning_agent(env: SimplifiedDiscreteTradingEnvironment, 
                          agent: QLearningAgent, 
                          num_episodes: int):
    """
    Performs the training of the Agent for experiment 1

    Each episode resets the environment and lets the agent act and learn until
    the environment reports the end of the episode. The total reward of each
    episode is printed, and the curve is plotted once training is over.

    Args:
        env: The environment to train on
        agent: The agent to train
        num_episodes: int, Number of episodes to run

    Returns:
        rewards_per_episode: list, The total reward collected in each episode
    """

    rewards_per_episode = []

    for episode in range(num_episodes):
        # reset() returns the state vector whereas the agent works on state
        # indices (as returned by step()), hence the conversion.
        state_index = env.convert_state_to_index(env.reset())
        total_rewards = 0

        done = False
        while not done:
            action = agent.choose_action(state_index)
            next_state_index, reward, done = env.step(action)  # next_state_index is directly obtained here
            agent.train(state_index, action, reward, next_state_index)
            state_index = next_state_index
            total_rewards += reward

        rewards_per_episode.append(total_rewards)
        print(f"Episode: {episode}, Total Reward: {total_rewards}")

    plt.plot(rewards_per_episode)
    plt.title('Rewards per Episode')
    plt.xlabel('Episode')
    plt.ylabel('Total Reward')
    plt.show()

    return rewards_per_episode

### Experiment

In [14]:
# TODO