In [43]:
# Step 1: Imports

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from collections import deque
import random
import datetime

import gym
from gym import spaces
import yfinance as yf


from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam

In [None]:
# Step 2: Load & Prepare Data

# Load CSVs
df_regime = pd.read_csv('regime.csv', parse_dates=['date'])
df_price = pd.read_csv('price.csv', parse_dates=['date'])

# Merge on date
df = pd.merge(df_regime, df_price, on='date')
df.sort_values('date', inplace=True)

# Calculate absolute prediction error
df['error'] = np.abs(df['actual_price'] - df['predicted_price'])

# Compute 7-day rolling average error
df['rolling_error'] = df['error'].rolling(window=7).mean()

# Drop initial rows where rolling average is NaN
df = df.dropna().reset_index(drop=True)

# Preview
df.head()


In [44]:
# Step 2 (Alternative): Generate mock regime and price data without CSVs


# Generate 200 days of mock data
np.random.seed(42)
n_days = 200
dates = pd.date_range(end=pd.Timestamp.today(), periods=n_days)

# Simulate actual prices (e.g., trending stock with noise)
actual_price = np.cumsum(np.random.randn(n_days) * 2 + 0.5) + 100

# Simulate predicted prices with some noise
predicted_price = actual_price * (1 + np.random.normal(0, 0.02, n_days))

# Simulate regime based on price movement
price_diff = np.diff(actual_price, prepend=actual_price[0])
regime = np.where(price_diff > 1, 1, np.where(price_diff < -1, -1, 0))

# Construct mock DataFrame
df = pd.DataFrame({
    'date': dates,
    'regime': regime,
    'actual_price': actual_price,
    'predicted_price': predicted_price
})

# Compute error and 7-day rolling error
df['error'] = np.abs(df['actual_price'] - df['predicted_price'])
df['rolling_error'] = df['error'].rolling(window=7).mean()

# Drop rows with NaN (from rolling average)
df = df.dropna().reset_index(drop=True)

# Preview
#df.head()


In [45]:
class TradingEnv(gym.Env):
    """
    Custom Trading Environment for RL Agent.
    Observations:
        [regime (1/0/-1), predicted_price, 7-day average prediction error]
    Actions:
        0: Buy, 1: Hold, 2: Sell
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, merged_df, lookback=7):
        super(TradingEnv, self).__init__()
        self.data = merged_df.copy()
        self.lookback = lookback
        self.data['error'] = np.abs(self.data['actual_price'] - self.data['predicted_price'])
        self.max_error = self.data['error'].max()

        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(3,), dtype=np.float32)
        self.reset()

    def _get_obs(self):
        row = self.data.iloc[self.current_step]
        regime = row['regime']
        predicted = row['predicted_price']
        avg_error = self.data.iloc[self.current_step - self.lookback:self.current_step]['error'].mean()
        return np.array([regime, predicted, avg_error], dtype=np.float32)

    def reset(self):
        self.current_step = self.lookback
        return self._get_obs()

    def step(self, action):
        row = self.data.iloc[self.current_step]
        regime = row['regime']

        # Base reward matrix
        if regime == 1:  # Bull
            base_reward = 1.0 if action == 0 else (0.0 if action == 1 else -1.0)
        elif regime == -1:  # Bear
            base_reward = 1.0 if action == 2 else (0.0 if action == 1 else -1.0)
        else:  # Neutral
            base_reward = 1.0 if action == 1 else -0.1

        # Scaled penalty from prediction error
        obs = self._get_obs()
        avg_error = obs[2]
        alpha = 1.0
        error_penalty = alpha * (avg_error / (self.max_error + 1e-6))

        reward = base_reward - error_penalty

        self.current_step += 1
        done = self.current_step >= len(self.data)
        next_state = self._get_obs() if not done else np.zeros(self.observation_space.shape)
        return next_state, reward, done, {}

    def render(self, mode='human'):
        obs = self._get_obs()
        print(f"Step {self.current_step}: Regime={obs[0]}, Predicted={obs[1]:.2f}, Avg Error={obs[2]:.2f}")


In [46]:
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size  # 3: regime, predicted_price, avg_error
        self.action_size = action_size  # 3: Buy, Hold, Sell
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95    # discount factor
        self.epsilon = 1.0   # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state[np.newaxis, :], verbose=0)
        return np.argmax(act_values[0])

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target += self.gamma * np.amax(self.model.predict(next_state[np.newaxis, :], verbose=0)[0])
            target_f = self.model.predict(state[np.newaxis, :], verbose=0)
            target_f[0][action] = target
            self.model.fit(state[np.newaxis, :], target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay


In [None]:
env = TradingEnv(df)
agent = DQNAgent(state_size=3, action_size=3)
episodes = 50
batch_size = 32
rewards_history = []

for e in range(episodes):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

        if len(agent.memory) > batch_size:
            agent.replay(batch_size)

    rewards_history.append(total_reward)
    print(f"Episode {e+1}/{episodes} - Total Reward: {total_reward:.2f} - Epsilon: {agent.epsilon:.2f}")


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Episode 1/50 - Total Reward: -1.49 - Epsilon: 0.46
Episode 2/50 - Total Reward: 49.31 - Epsilon: 0.18
Episode 3/50 - Total Reward: 96.11 - Epsilon: 0.07
Episode 4/50 - Total Reward: 123.51 - Epsilon: 0.03
Episode 5/50 - Total Reward: 118.01 - Epsilon: 0.01
Episode 6/50 - Total Reward: 132.91 - Epsilon: 0.01


In [None]:
def animate_training_episode(env, agent):
    saved_epsilon = agent.epsilon
    agent.epsilon = 0.0  # deterministic policy

    state = env.reset()
    done = False
    step_log = []

    while not done:
        action = agent.act(state)
        reward = 0
        base_reward = 0
        regime = state[0]
        if regime == 1: base_reward = 1.0 if action == 0 else (0.0 if action == 1 else -1.0)
        elif regime == -1: base_reward = 1.0 if action == 2 else (0.0 if action == 1 else -1.0)
        else: base_reward = 1.0 if action == 1 else -0.1
        penalty = state[2] / (env.max_error + 1e-6)
        reward = base_reward - penalty

        step_log.append({
            'step': env.current_step,
            'predicted': state[1],
            'action': action,
            'penalty': penalty,
            'reward': reward
        })

        next_state, _, done, _ = env.step(action)
        state = next_state

    agent.epsilon = saved_epsilon

    # Plot Animation
    steps = [d['step'] for d in step_log]
    predicted = [d['predicted'] for d in step_log]
    actions = [d['action'] for d in step_log]
    penalties = [d['penalty'] for d in step_log]
    rewards = [d['reward'] for d in step_log]

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
    colors = {0: 'green', 1: 'blue', 2: 'red'}

    def init():
        ax1.clear()
        ax2.clear()
        ax1.set_title("Predicted Price with Action Markers")
        ax2.set_title("Penalty and Reward Over Time")
        ax1.set_ylabel("Predicted Price")
        ax2.set_ylabel("Value")
        ax2.set_xlabel("Step")

    def update(i):
        ax1.clear()
        ax2.clear()
        ax1.plot(steps[:i+1], predicted[:i+1], 'k--')
        for j in range(i+1):
            ax1.scatter(steps[j], predicted[j], color=colors[actions[j]], s=60)
        ax2.plot(steps[:i+1], penalties[:i+1], 'r-', label='Penalty')
        ax2.plot(steps[:i+1], rewards[:i+1], 'b-', label='Final Reward')
        ax2.legend()

    ani = animation.FuncAnimation(fig, update, frames=len(steps), init_func=init,
                                  interval=300, repeat=False)
    plt.tight_layout()
    plt.show()

animate_training_episode(env, agent)


In [35]:
plt.figure(figsize=(12, 4))
plt.plot(env.data['date'], env.data['error'].rolling(7).mean(), label='7-Day Avg Error')
plt.title("Rolling 7-Day Prediction Error")
plt.xlabel("Date")
plt.ylabel("Error")
plt.grid(True)
plt.legend()
plt.show()

In [None]:
# Re-run one deterministic episode to collect actions
state = env.reset()
agent.epsilon = 0.0
done = False
decision_points = []

while not done:
    action = agent.act(state)
    decision_points.append((env.data.iloc[env.current_step]['date'],
                            env.data.iloc[env.current_step]['actual_price'],
                            action))
    next_state, _, done, _ = env.step(action)
    state = next_state

# Plotting decisions
dates, prices, actions = zip(*decision_points)
colors = ['green' if a == 0 else 'blue' if a == 1 else 'red' for a in actions]

plt.figure(figsize=(12, 5))
plt.plot(env.data['date'], env.data['actual_price'], label='Actual Price', color='black')
plt.scatter(dates, prices, c=colors, label='Actions', s=50, alpha=0.7)
plt.title("Buy (Green), Hold (Blue), Sell (Red) Decisions")
plt.xlabel("Date")
plt.ylabel("Price")
plt.legend()
plt.grid(True)
plt.show()
