In [None]:
import csv
import gym
from gym import spaces
import numpy as np

In [None]:
tickers = ["AAPL", "AMZN", "GOOGL", "MSFT", "NVDA", "TSLA"]

# Load the price table as {row_index: {ticker: price}}.
# The file is opened in a `with` block so the handle is closed as soon as the
# rows are consumed (the previous inline `open(...)` was never closed), and
# with newline="" as recommended by the csv module for files passed to readers.
with open("nasdaq_stock_prices.csv", mode="r", newline="") as csv_file:
    data = {
        i: {t: float(row[t]) for t in tickers}
        for i, row in enumerate(csv.DictReader(csv_file, delimiter=","))
    }

In [None]:
class Environment(gym.Env):
    """Multi-asset trading environment replaying a table of historical prices.

    `data` maps a step index (0, 1, 2, ...) to a `{ticker: price}` dict. At each
    step the agent submits one number per ticker in [-1, 1]:

    * a negative value sells that fraction of the shares currently held,
    * a positive value invests that fraction of the *current* cash balance,
    * zero leaves the position untouched.

    Orders are executed ticker by ticker, in `self.tickers` order, at the prices
    of the current step. The step then advances and the portfolio is valued at
    the new prices; the reward is the resulting change in portfolio value.

    NOTE: `observation_space` is declared as a flat Box, but `_get_state`
    returns a tuple of Python lists (prices, balances, shares, values); callers
    in this notebook consume the tuple form.
    """

    def __init__(self, data: dict, window_size: int, initial_balance: float, verbose: bool = False):
        """Store the price history and initialise an empty portfolio.

        Args:
            data: mapping step index -> {ticker: price}; must contain key 0.
            window_size: number of most recent steps exposed by `_get_state`
                (expected to be >= 1).
            initial_balance: starting cash.
            verbose: when True, `reset` and `step` print a status summary.
        """
        self.history_prices = data
        self.max_steps = len(self.history_prices) - 1
        self.tickers = list(self.history_prices[0].keys())
        self.window_size = window_size

        self.initial_balance = initial_balance
        self.initial_shares = {t: 0 for t in self.tickers}
        self.initial_value = self.initial_balance

        self.action_space = spaces.Box(low = -1.0, high = 1.0, shape = (len(self.tickers),))
        self.observation_dimension = self.window_size * (2 * len(self.tickers) + 2)  #window * (prices (n) + current_shares (n) + current_balance (1) + current_value (1) )
        self.observation_space = spaces.Box(low = -np.inf, high = np.inf, shape = (self.observation_dimension,))

        self.verbose = verbose

        self._restart_episode()

    def _restart_episode(self):
        """Put step counter, prices, portfolio and histories back to step 0."""
        self.current_step = 0
        self.current_prices = self.history_prices[self.current_step]

        self.history_balance = {0: self.initial_balance}
        self.current_balance = self.initial_balance

        # Independent copies: `step` mutates `current_shares` in place, and that
        # must never leak into `initial_shares` or the recorded history.
        self.history_shares = {0: self.initial_shares.copy()}
        self.current_shares = self.initial_shares.copy()

        self.initial_value = self.initial_balance
        self.history_value = {0: self.initial_value}
        self.current_value = self.initial_value

        self.done = False

    def _print_status(self):
        """Print step index, prices, cash balance, holdings and portfolio value."""
        print(f"\n📈 Step: {self.current_step}")
        print(f"🟦 Prices: {[round(self.current_prices[t], 2) for t in self.tickers]}")
        print(f"💰 Balance: {self.current_balance:.2f}")
        print(f"📊 Shares: { {t: round(self.current_shares[t], 2) for t in self.tickers} }")
        print(f"📦 Value: {self.current_value:.2f}")

    def reset(self):
        """Start a new episode and return the initial state.

        The status summary is printed only when `verbose` is True, consistent
        with `step`.
        """
        self._restart_episode()
        if self.verbose:
            self._print_status()
        return self._get_state()
    
    def render(self):
        """Return the balance, shares and value histories recorded so far."""
        return self.history_balance, self.history_shares, self.history_value
    
    def _get_state(self):
        """Return the last `window_size` recorded steps, current step included.

        Returns:
            (prices_window, balance_window, shares_window, value_window), each a
            list ordered from oldest to newest. Early in an episode the lists
            are shorter than `window_size` (no padding is applied).
        """
        end = self.current_step + 1
        # `end - window_size` (not `current_step - window_size`) so the window
        # holds exactly `window_size` entries, matching `observation_dimension`.
        start = max(0, end - self.window_size)
        prices_window = [self.history_prices[i] for i in range(start, end)]
        balance_window = [self.history_balance[i] for i in range(start, end)]
        shares_window = [self.history_shares[i] for i in range(start, end)]
        value_window = [self.history_value[i] for i in range(start, end)]
        return prices_window, balance_window, shares_window, value_window
        
    def step(self, action: np.ndarray):
        """Execute `action` at the current prices, then advance one step.

        Args:
            action: one value per ticker, in `self.tickers` order.

        Returns:
            (state, reward, done, info) where `reward` is the portfolio value at
            the new step's prices minus the value recorded for the step the
            action was taken in, and `info` is an empty dict. Once the episode
            is done, further calls return a zero reward without changing state.

        Raises:
            ValueError: if the positive (buy) entries sum to more than 1.0, or
                if any entry is below -1.0.
        """
        if self.done:
            return self._get_state(), 0, self.done, {}

        # Only the buy entries count towards the buy budget; including the
        # negative sell entries would let sells mask an over-allocated buy.
        total_buy_fraction = np.sum(np.maximum(action, 0.0))
        if total_buy_fraction > 1.0:
            raise ValueError(f"Invalid action: total buy fraction = {total_buy_fraction:.2f} > 1.0")
        
        if any(a < -1.0 for a in action):
            raise ValueError("Invalid action: sell fraction < -1.0")

        for i, ticker in enumerate(self.tickers):
            act = action[i]
            if act < 0:
                shares_to_sell = self.current_shares[ticker] * (-act)
                proceeds = shares_to_sell * self.current_prices[ticker]
                self.current_balance += proceeds
                self.current_shares[ticker] -= shares_to_sell

            elif act > 0:
                # Fraction of the cash left after the orders already executed.
                amount_to_invest = self.current_balance * act
                shares_to_buy = amount_to_invest / self.current_prices[ticker]
                cost = shares_to_buy * self.current_prices[ticker]
                self.current_balance -= cost
                self.current_shares[ticker] += shares_to_buy

        previous_value = self.history_value[self.current_step]

        self.current_step += 1
        self.done = self.current_step >= self.max_steps
        self.current_prices = self.history_prices[self.current_step]

        # Value the portfolio at the NEW prices. Trading at a given price does
        # not change total value, so valuing before the price update would
        # attribute each action's profit or loss to the following action.
        self.current_value = self.current_balance + sum(self.current_shares[t] * self.current_prices[t] for t in self.tickers)
        reward = self.current_value - previous_value

        self.history_balance[self.current_step] = self.current_balance
        self.history_shares[self.current_step] = self.current_shares.copy()
        self.history_value[self.current_step] = self.current_value

        if self.verbose:
            self._print_status()
            print(f"🔄 Reward: {reward:.2f} (Δ from {previous_value:.2f})")
            print(f"🎯 Action taken: {np.round(action, 2)}")

        return self._get_state(), reward, self.done, {}

In [None]:
from abc import ABC, abstractmethod
import numpy as np

class BaseAgent(ABC):
    @abstractmethod
    def reset(self):
        pass

    @abstractmethod
    def act(self, state: np.ndarray) -> tuple[np.ndarray, int | None]:
        pass

    @abstractmethod
    def update(self, *args):
        pass

    def train(self):
        pass

In [None]:
def train_agent(agent: BaseAgent, env: Environment, episodes: int = 1, verbose: bool = False):
    """Run `agent` in `env` for a number of episodes and collect the returns.

    Each episode resets both the environment and the agent, then loops
    act -> step until the environment reports it is done. Agents exposing a
    `remember` method have every transition recorded with it and are updated
    once, without arguments, at the end of the episode; all other agents are
    updated after every step with `(action_id, reward)`.

    Args:
        agent: the agent to train.
        env: the environment to interact with.
        episodes: number of episodes to run.
        verbose: when True, print the total reward of every episode.

    Returns:
        A list holding the total reward of each episode, in order.
    """
    episode_returns = []

    for episode_index in range(episodes):
        observation = env.reset()
        agent.reset()
        finished = False
        episode_return = 0

        while not finished:
            action, action_id = agent.act(observation)
            next_observation, reward, finished, _info = env.step(action)

            # Episodic agents buffer the transition; online agents learn now.
            if hasattr(agent, "remember"):
                agent.remember(observation, action_id, reward)
            else:
                agent.update(action_id, reward)

            episode_return += reward
            observation = next_observation

        # Episodic agents learn from the whole trajectory once it is complete.
        if hasattr(agent, "update") and hasattr(agent, "remember"):
            agent.update()

        episode_returns.append(episode_return)
        if verbose:
            print(f"🎯 Épisode {episode_index + 1}/{episodes} — Total reward: {episode_return:.2f}")

    return episode_returns

In [None]:
import numpy as np
#from agents.base_agent import BaseAgent

class MAB_Agent(BaseAgent):
    """Epsilon-greedy multi-armed bandit over a fixed set of action templates.

    The arms are: one "buy a single ticker" action per ticker (rows of an
    identity matrix), one action of all -1 entries, and one action of all
    zeros. `Q` holds the running mean reward of each arm and `N` the number of
    times each arm has been updated.
    """

    def __init__(self, env: Environment, epsilon: float = 0.1):
        """Build the action templates from `env.tickers` and zero the estimates.

        Args:
            env: environment providing the list of tickers.
            epsilon: probability of picking a uniformly random arm.
        """
        self.epsilon = epsilon
        self.tickers = env.tickers
        self.n_arms = len(self.tickers)

        # Rows: n_arms single-ticker buys, then all -1, then all 0.
        self.action_templates = np.vstack([
            np.eye(self.n_arms, dtype=np.float32),
            -np.ones((1, self.n_arms), dtype=np.float32),
            np.zeros((1, self.n_arms), dtype=np.float32),
        ])

        template_count = len(self.action_templates)
        self.Q = np.zeros(template_count)
        self.N = np.zeros(template_count)

    def reset(self):
        """Discard all value estimates and visit counts."""
        template_count = len(self.action_templates)
        self.Q = np.zeros(template_count)
        self.N = np.zeros(template_count)

    def select_action_id(self) -> int:
        """Return a random arm with probability epsilon, else the best-valued arm."""
        exploring = np.random.rand() < self.epsilon
        if not exploring:
            return np.argmax(self.Q)
        return np.random.randint(len(self.action_templates))

    def act(self, state=None):
        """Pick an arm (the state is ignored) and return `(action, action_id)`."""
        chosen = self.select_action_id()
        return self.action_templates[chosen], chosen

    def update(self, action_id, reward):
        """Fold `reward` into the running mean of arm `action_id`."""
        self.N[action_id] += 1
        # A step size of 1/N makes Q the sample average of the observed rewards.
        step_size = 1 / self.N[action_id]
        error = reward - self.Q[action_id]
        self.Q[action_id] += step_size * error

In [None]:
#import numpy as np
#from agents.base_agent import BaseAgent

class MC_Agent(BaseAgent):
    """Tabular every-visit Monte Carlo agent with epsilon-greedy action choice.

    Transitions are buffered with `remember` during an episode; `update` then
    walks the buffered trajectory backwards, accumulates the discounted return
    and sets each visited (state, action) value to the mean of all returns
    recorded for it so far.
    """

    def __init__(self, env: Environment, epsilon: float = 0.1, gamma: float = 1.0):
        """Build the action templates from `env.tickers` and empty the tables.

        Args:
            env: environment providing the list of tickers.
            epsilon: probability of picking a uniformly random action.
            gamma: discount factor applied when accumulating returns.
        """
        self.epsilon = epsilon
        self.gamma = gamma
        self.tickers = env.tickers
        self.n_actions = len(self.tickers) + 2  # buy one asset + sell all + hold
        self.action_templates = np.vstack([
            np.eye(len(self.tickers)),
            -np.ones((1, len(self.tickers))),
            np.zeros((1, len(self.tickers)))
        ]).astype(np.float32)

        self.Q = {}           # (state, action_id) → Q-value
        self.returns = {}     # (state, action_id) → list of returns
        self.trajectory = []  # list of (state, action_id, reward)

    def _state_to_key(self, state):
        """Convert a structured state into a hashable key (a tuple).

        `state` is unpacked into four lists (prices, balance, shares, value).
        All of their elements are flattened into a single vector: dict elements
        contribute their values, list elements their items, anything else is
        appended as-is. Values are rounded to 2 decimals to limit the number of
        distinct keys.
        """
        prices, balance, shares, value = state
        flat = []
        for d in prices + balance + shares + value:
            if isinstance(d, dict):
                flat.extend(list(d.values()))
            else:
                flat.extend(d if isinstance(d, list) else [d])
        return tuple(np.round(flat, 2))

    def act(self, state):
        """Return `(action, action_id)` chosen epsilon-greedily for `state`.

        Unseen (state, action) pairs are treated as having value 0.0.
        """
        state_key = self._state_to_key(state)
        if np.random.rand() < self.epsilon:
            action_id = np.random.randint(self.n_actions)
        else:
            q_vals = [self.Q.get((state_key, a), 0.0) for a in range(self.n_actions)]
            action_id = np.argmax(q_vals)
        return self.action_templates[action_id], action_id

    def update(self):
        """Apply an every-visit Monte Carlo update from the buffered trajectory.

        The trajectory is cleared afterwards.
        """
        G = 0

        # Walk backwards so G is the discounted return from each step onwards.
        for state, action_id, reward in reversed(self.trajectory):
            G = self.gamma * G + reward
            key = (state, action_id)

            # Every-visit: record G at every occurrence of the pair.
            self.returns.setdefault(key, []).append(G)

            # The value estimate is the mean of all recorded returns.
            self.Q[key] = np.mean(self.returns[key])

        self.trajectory = []  # Clear trajectory after update

    def remember(self, state, action_id, reward):
        """Append one transition, keyed by the hashable form of `state`."""
        state_key = self._state_to_key(state)
        self.trajectory.append((state_key, action_id, reward))

    def reset(self):
        """Drop the buffered trajectory; learned values are kept."""
        self.trajectory = []

In [None]:
import matplotlib.pyplot as plt

# Single environment instance shared by every agent below.
environment = Environment(data, window_size=2, initial_balance=1000, verbose=True)

# Three exploration rates per agent family.
mab_agent_optimal = MAB_Agent(environment, epsilon=0.5)
mab_agent_random = MAB_Agent(environment, epsilon=1.0)
mab_agent_greedy = MAB_Agent(environment, epsilon=0.1)
mc_agent_optimal = MC_Agent(environment, epsilon=0.5, gamma=0.7)
mc_agent_random = MC_Agent(environment, epsilon=1.0, gamma=0.7)
mc_agent_greedy = MC_Agent(environment, epsilon=0.1, gamma=0.7)

# Every agent is trained with the same settings, one after the other.
training_options = {"episodes": 100, "verbose": True}

results_mab_mab_agent_optimal = train_agent(mab_agent_optimal, environment, **training_options)
results_mab_agent_random = train_agent(mab_agent_random, environment, **training_options)
results_mab_agent_greedy = train_agent(mab_agent_greedy, environment, **training_options)
results_mc_agent_optimal = train_agent(mc_agent_optimal, environment, **training_options)
results_mc_agent_random = train_agent(mc_agent_random, environment, **training_options)
results_mc_agent_greedy = train_agent(mc_agent_greedy, environment, **training_options)

# (per-episode rewards, legend label) in drawing order.
reward_curves = [
    (results_mab_agent_greedy, f"Greedy {mab_agent_greedy.epsilon}"),
    (results_mab_agent_random, f"Random {mab_agent_random.epsilon}"),
    (results_mab_mab_agent_optimal, f"Optimal {mab_agent_optimal.epsilon}"),
    (results_mc_agent_optimal, f"MC {mc_agent_optimal.epsilon}"),
    (results_mc_agent_random, f"MC {mc_agent_random.epsilon}"),
    (results_mc_agent_greedy, f"MC {mc_agent_greedy.epsilon}"),
]
for episode_rewards, legend_label in reward_curves:
    plt.plot(episode_rewards, label=legend_label)
plt.legend()
plt.title("Total rewards over episodes")
plt.xlabel("Episodes")
plt.ylabel("Total rewards")
plt.show()

# Mean total reward per agent, in reporting order.
reward_summary = [
    ("Greedy: ", results_mab_agent_greedy),
    ("Random: ", results_mab_agent_random),
    ("Optimal: ", results_mab_mab_agent_optimal),
    ("MC Greedy: ", results_mc_agent_greedy),
    ("MC Random: ", results_mc_agent_random),
    ("MC Optimal: ", results_mc_agent_optimal),
]
for caption, episode_rewards in reward_summary:
    print(caption, np.mean(episode_rewards))