<img src="https://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

# Reinforcement Learning for Finance

**Chapter 03 — Financial Q-Learning (Keras 3 with PyTorch Backend)**

© Dr. Yves J. Hilpisch

<a href="https://tpq.io" target="_blank">https://tpq.io</a> | <a href="https://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>

**Adapted for Keras 3 API with PyTorch backend**

## Finance Environment

In [None]:
import os
import random

In [2]:
# Seed Python's `random` module so that draws made through it are repeatable.
random.seed(100)
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably does not change hashing in this session - confirm.
os.environ['PYTHONHASHSEED'] = '0'

In [3]:
class ActionSpace:
    """Two-valued action space: 0 or 1 (e.g. down/up or sell/buy)."""

    def sample(self):
        """Draw one action uniformly at random from {0, 1}."""
        return random.randrange(2)

In [4]:
# Stand-alone ActionSpace instance for trying out sample() interactively.
action_space = ActionSpace()

In [5]:
# Ten random draws; each one is either 0 or 1.
[action_space.sample() for _ in range(10)]

[0, 1, 1, 0, 1, 1, 1, 0, 0, 0]

In [6]:
import numpy as np
import pandas as pd

### Finance Environment Class

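This custom environment frames market-direction prediction (up/down) from historical price data as a reinforcement-learning task: the agent predicts, and the environment scores each prediction.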

**Key concepts:**
- **State**: Last N normalized prices or returns
- **Action**: Predict direction (0=down, 1=up)
- **Reward**: 1 if correct prediction, 0 if wrong
- **Done**: Episode ends when the data is exhausted, or when a wrong prediction leaves the running accuracy below `min_accuracy` after the initial warm-up (`bar > 15`)

In [7]:
class Finance:
    """Market-direction prediction environment backed by a price CSV."""

    # Location of the CSV file that _get_data() reads.
    url = 'https://certificate.tpq.io/rl4finance.csv'

    def __init__(self, symbol, feature, min_accuracy=0.485, n_features=4):
        """
        Store the configuration, then load and prepare the data.

        Args:
            symbol: Column of the CSV to work with (e.g., 'EUR=', 'AAPL.O')
            feature: Column served as state (the symbol itself for
                prices, 'r' for returns)
            min_accuracy: Accuracy threshold used to stop an episode early
            n_features: Number of lagged observations per state
        """
        # Plain configuration first ...
        self.symbol, self.feature = symbol, feature
        self.n_features, self.min_accuracy = n_features, min_accuracy
        self.action_space = ActionSpace()
        # ... then the data pipeline.
        self._get_data()
        self._prepare_data()

    def _get_data(self):
        """Read the CSV at `self.url` into `self.raw`, indexed by its first column."""
        self.raw = pd.read_csv(self.url, parse_dates=True, index_col=0)

In [8]:
class Finance(Finance):
    def _prepare_data(self):
        """Build the working frame (price, log return, direction) and its z-scored copy."""
        frame = pd.DataFrame(self.raw[self.symbol]).dropna()
        # Log return of each bar relative to the previous bar
        frame['r'] = np.log(frame / frame.shift(1))
        # Direction flag: 1 for a positive return, 0 otherwise
        frame['d'] = np.where(frame['r'] > 0, 1, 0)
        # The first bar has no return and is dropped here
        self.data = frame.dropna()
        # z-score every column with its full-sample mean and standard deviation
        centered = self.data - self.data.mean()
        self.data_ = centered / self.data.std()

    def reset(self):
        """Rewind to the first full observation window and return `(state, info)`."""
        self.treward = 0
        self.bar = self.n_features
        # The state is the block of n_features observations ending before `bar`
        start, stop = self.bar - self.n_features, self.bar
        window = self.data_[self.feature].iloc[start:stop]
        return window.values, {}

In [9]:
class Finance(Finance):
    def step(self, action):
        """Score `action` against the realised direction and advance one bar.

        Returns:
            Tuple `(next_state, reward, done, truncated, info)`; `truncated`
            is always False and `info` is always an empty dict.
        """
        # Reward 1 for a correct direction call, 0 otherwise
        hit = action == self.data['d'].iloc[self.bar]
        reward = 1 if hit else 0

        self.treward += reward
        self.bar += 1
        # Share of correct calls since the last reset()
        self.accuracy = self.treward / (self.bar - self.n_features)

        # The episode ends when the data runs out. A wrong call also ends it,
        # but only once bar > 15 and the running accuracy is below the minimum.
        done = self.bar >= len(self.data) or (
            reward != 1
            and self.accuracy < self.min_accuracy
            and self.bar > 15
        )

        # Observation window ending just before the new current bar
        start = self.bar - self.n_features
        next_state = self.data_[self.feature].iloc[start:self.bar].values

        return next_state, reward, done, False, {}

### Test the Finance Environment

In [10]:
# Price-based environment: the state is built from the 'EUR=' column itself.
# Constructing it reads the CSV at Finance.url, so that URL must be reachable.
fin = Finance(symbol='EUR=', feature='EUR=')

URLError: <urlopen error [WinError 10054] An existing connection was forcibly closed by the remote host>

In [None]:
# Column names of the loaded CSV (the values usable as `symbol`).
list(fin.raw.columns)

In [None]:
fin.reset()
# returns (state, info); the state holds four lagged, normalized price points

In [None]:
# Random action (0 or 1) drawn from the environment's own action space.
fin.action_space.sample()

In [None]:
# One step with a random action; returns (next_state, reward, done, truncated, info).
fin.step(fin.action_space.sample())

In [None]:
# Same symbol, but the state is now taken from the log-return column 'r'.
fin = Finance('EUR=', 'r')

In [None]:
fin.reset()
# returns (state, info); the state holds four lagged, normalized log returns

### Random Agent Baseline

In [None]:
class RandomAgent:
    """Baseline agent that makes random predictions"""

    def __init__(self):
        # Log-return based EUR= environment with default settings
        self.env = Finance('EUR=', 'r')

    def play(self, episodes=1, max_steps=99):
        """Play episodes with random actions and record their lengths.

        Args:
            episodes: Number of episodes to play.
            max_steps: Upper bound on the number of steps per episode.

        After the call, `self.trewards` holds exactly one entry per episode:
        the step at which the environment reported `done`, or `max_steps`
        when the cap was reached first. Capped episodes used to be dropped
        silently, which biased the baseline average downwards.
        """
        self.trewards = []
        for _ in range(episodes):
            self.env.reset()
            steps = 0
            for steps in range(1, max_steps + 1):
                action = self.env.action_space.sample()
                _, _, done, _, _ = self.env.step(action)
                if done:
                    break
            # `steps` is the terminating step, or max_steps if never done
            self.trewards.append(steps)

In [None]:
# Creates a Finance('EUR=', 'r') environment internally.
ra = RandomAgent()

In [None]:
# Play 15 episodes with random actions; results are stored in ra.trewards.
ra.play(15)

In [None]:
# Recorded episode lengths (in steps).
ra.trewards

In [None]:
# Average recorded episode length; raises ZeroDivisionError if nothing was recorded.
round(sum(ra.trewards) / len(ra.trewards), 2)

In [None]:
# Number of rows in the prepared data set (after NaN rows are dropped).
len(fin.data)

## DQL Agent with Keras 3 API

In [None]:
import os
import random
import warnings
import numpy as np

# Configure Keras to use PyTorch backend.
# This is set before the `import keras` below; presumably Keras reads
# KERAS_BACKEND at import time - confirm against the Keras documentation.
os.environ['KERAS_BACKEND'] = 'torch'

import keras
from keras import layers, models, optimizers
from collections import deque

# Report the installed Keras version and the backend actually in use.
print(f"Using Keras {keras.__version__} with backend: {keras.backend.backend()}")

In [None]:
# Silence Python warnings from here on.
# NOTE(review): this also hides deprecation warnings raised by libraries.
warnings.simplefilter('ignore')

In [None]:
# Learning rate for the Adam optimizer; read as a global by DQLAgent._create_model.
lr = 0.0001

### DQL Agent for Financial Market Prediction

**Task**: Learn to predict market direction (up/down) from historical patterns.

**Key differences from CartPole:**
- State: 4 lagged returns (financial time series)
- Action: Predict direction (0=down, 1=up)
- Reward: Binary (correct=1, incorrect=0)
- Done: Data exhausted, or prediction accuracy falls below the threshold

In [None]:
class DQLAgent:
    """Deep Q-learning agent for the `Finance` direction-prediction environment.

    A small dense network estimates one Q-value per action (0=down, 1=up)
    from a state of `n_features` lagged observations. Transitions are kept
    in a bounded replay memory and the network is fitted on random
    mini-batches drawn from it.
    """

    def __init__(self, symbol, feature, min_accuracy, n_features=4):
        """
        DQL Agent for financial market direction prediction using Keras 3 API.

        Args:
            symbol: Trading symbol (e.g., 'EUR=', 'AAPL.O')
            feature: Feature to use ('r' for returns, symbol for prices)
            min_accuracy: Minimum accuracy threshold
            n_features: Number of lagged observations in state
        """
        self.epsilon = 1.0            # current exploration rate
        self.epsilon_decay = 0.9975   # multiplicative decay per replay() call
        self.epsilon_min = 0.1        # decay stops once epsilon is at or below this
        self.memory = deque(maxlen=2000)  # replay memory of transitions
        self.batch_size = 32
        self.gamma = 0.5  # Lower gamma for short-term financial predictions
        self.trewards = []            # steps survived per finished training episode
        self.max_treward = 0
        self.n_features = n_features

        # Create Keras model
        self.model = self._create_model()

        # Initialize environment
        self.env = Finance(symbol, feature, min_accuracy, n_features)

    def _create_model(self):
        """Create neural network using Keras 3 Sequential API.

        The learning rate is read from the module-level `lr`.
        """
        model = models.Sequential([
            # Keras 3 expects an explicit Input object as the first entry;
            # passing `input_shape` to a layer is deprecated.
            layers.Input(shape=(self.n_features,)),
            layers.Dense(24, activation='relu'),
            layers.Dense(24, activation='relu'),
            layers.Dense(2, activation='linear')  # 2 actions: down or up
        ])

        model.compile(
            optimizer=optimizers.Adam(learning_rate=lr),
            loss='mse'
        )

        return model

    def _greedy_action(self, state):
        """Return the action with the highest predicted Q-value for `state`."""
        state = np.reshape(state, [1, self.n_features])
        q_values = self.model.predict(state, verbose=0)
        return int(np.argmax(q_values[0]))

    def act(self, state):
        """Epsilon-greedy action selection.

        With probability `epsilon` a random action is sampled from the
        environment's action space; otherwise the greedy action is returned.
        """
        if random.random() < self.epsilon:
            return self.env.action_space.sample()
        return self._greedy_action(state)

    def replay(self):
        """Experience replay: fit the network on one random mini-batch.

        Does nothing while the memory holds fewer than `batch_size`
        transitions. After fitting, `epsilon` is decayed once unless it is
        already at or below `epsilon_min`.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)

        # Transitions are stored as (state, action, next_state, reward, done)
        states, actions, next_states, rewards, dones = (
            np.array(column) for column in zip(*batch)
        )

        # Predict Q-values for current and next states
        q_values = self.model.predict(states, verbose=0)
        next_q_values = self.model.predict(next_states, verbose=0)

        # Bellman targets: r for terminal transitions,
        # r + gamma * max_a' Q(s', a') otherwise
        targets = np.where(
            dones,
            rewards,
            rewards + self.gamma * np.amax(next_q_values, axis=1),
        )
        # Only the Q-value of the action actually taken is overwritten
        q_values[np.arange(self.batch_size), actions] = targets

        # Train model using Keras fit API
        self.model.fit(states, q_values, epochs=1, verbose=0,
                       batch_size=self.batch_size)

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def learn(self, episodes):
        """Train the agent over multiple episodes.

        Each episode runs until the environment reports `done` (at most
        4999 steps). The number of steps survived is appended to
        `self.trewards`, and one replay pass follows every episode once
        the memory holds more than `batch_size` transitions.
        """
        for e in range(1, episodes + 1):
            state, _ = self.env.reset()

            for f in range(1, 5000):
                action = self.act(state)
                next_state, reward, done, _, _ = self.env.step(action)

                # Store experience in memory
                self.memory.append((state, action, next_state, reward, done))
                state = next_state

                if done:
                    self.trewards.append(f)
                    self.max_treward = max(self.max_treward, f)
                    print(f'episode={e:4d} | treward={f:4d} | max={self.max_treward:4d}', end='\r')
                    break

            # Experience replay after each episode
            if len(self.memory) > self.batch_size:
                self.replay()

        print()

    def test(self, episodes):
        """Run greedy evaluation episodes and print one summary line each.

        During evaluation the environment's accuracy threshold is set to a
        fixed 0.5; the previous value is restored afterwards, even if an
        episode raises.
        """
        ma = self.env.min_accuracy
        self.env.min_accuracy = 0.5
        try:
            for e in range(1, episodes + 1):
                state, _ = self.env.reset()

                for f in range(1, 5001):
                    # Always use best action (no exploration)
                    action = self._greedy_action(state)
                    state, reward, done, trunc, _ = self.env.step(action)

                    if done:
                        print(f'total reward={f} | accuracy={self.env.accuracy:.3f}')
                        break
        finally:
            # Restore original accuracy threshold
            self.env.min_accuracy = ma

### Train the DQL Agent

In [None]:
# Seed the random number generators before building and training the agent.
random.seed(250)
np.random.seed(250)
# NOTE(review): keras.utils.set_random_seed presumably seeds Python, NumPy and
# the backend in one call, which would make the two lines above redundant - confirm.
keras.utils.set_random_seed(250)

In [None]:
# EUR= log returns as state, 0.495 minimum accuracy, 4 lagged observations per state.
agent = DQLAgent('EUR=', 'r', 0.495, 4)

In [None]:
%time agent.learn(250)

### Test the Trained Agent

In [None]:
# Five greedy (no exploration) episodes; one summary line is printed per finished episode.
agent.test(5)

### Visualize Training Progress

In [None]:
import matplotlib.pyplot as plt

# Raw per-episode results, plus a smoothed trend once 50 episodes exist
plt.figure(figsize=(12, 6))
plt.plot(agent.trewards, label='Episode Reward', alpha=0.3)
if not len(agent.trewards) < 50:
    # Equal-weight 50-episode window; 'valid' keeps fully covered positions only
    plt.plot(
        np.convolve(agent.trewards, np.ones(50)/50, mode='valid'),
        label='50-Episode Moving Average',
    )
plt.title('DQL Agent Training Progress - EUR= Direction Prediction (Keras 3 + PyTorch)')
plt.ylabel('Total Reward (Steps Survived)')
plt.xlabel('Episode')
plt.grid(True, alpha=0.3)
plt.legend()
plt.show()

### Performance Analysis

In [None]:
# Emit the whole training summary in one call, one item per line
print(
    f"Training Episodes: {len(agent.trewards)}",
    f"Average Reward: {np.mean(agent.trewards):.2f}",
    f"Max Reward: {agent.max_treward}",
    f"Final Epsilon: {agent.epsilon:.4f}",
    f"\nInterpretation:",
    f"- Agent survived average {np.mean(agent.trewards):.0f} predictions per episode",
    f"- Best run: {agent.max_treward} consecutive predictions at ≥{agent.env.min_accuracy*100:.1f}% accuracy",
    f"- Total data points: {len(agent.env.data)}",
    sep='\n',
)

### Save/Load Model (Keras 3 API)

In [None]:
# Save model using Keras 3 API; the relative path is resolved against the
# current working directory.
agent.model.save('dql_finance_keras3.keras')

In [None]:
# Load model using Keras 3 API
# loaded_model = keras.models.load_model('dql_finance_keras3.keras')
# agent.model = loaded_model

<img src="https://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

<a href="https://tpq.io" target="_blank">https://tpq.io</a> | <a href="https://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>