In [1]:
# Convert the Jupyter notebook to a plain Python script, since a script is easier to import and handle
!jupyter nbconvert --to script --output supporting_classes supporting_classes.ipynb

[NbConvertApp] Converting notebook supporting_classes.ipynb to script
[NbConvertApp] Writing 6723 bytes to supporting_classes.py


# TradingEnv Class

In [2]:

import pandas as pd
import numpy as np

import gymnasium as gym         # Gymnasium is a library for building RL environments
                                # It provides a standard interface so RL algorithms (like PPO from Stable Baselines3) can interact with your environment.
from gymnasium import spaces    # spaces defines the action space and observation space for your environment

class TradingEnv(gym.Env):
    """Gymnasium environment that rewards guessing the direction of the next candle.

    An observation is the ``window_size`` rows of ``data`` that come directly
    before ``current_step``. The agent answers with 0 = Hold, 1 = Buy (CALL) or
    2 = Sell (PUT) and is rewarded by comparing that answer with the
    Open -> Close move of the row AT ``current_step`` (the first row that is
    not part of the observation).

    Parameters
    ----------
    data : pd.DataFrame
        Candle data. Every column is used as an observation feature (all
        columns are cast to float32), and the 'Open' and 'Close' columns are
        additionally read to compute the reward and to render.
    window_size : int, default 50
        Number of previous rows included in each observation.

    Raises
    ------
    ValueError
        If ``data`` lacks an 'Open' or 'Close' column, or does not contain
        more than ``window_size`` rows (at least one row after the first
        window is needed so that a step can be scored).
    """

    # Columns read by _calculate_reward() and render().
    _REQUIRED_COLUMNS = ('Open', 'Close')

    def __init__(self, data: pd.DataFrame, window_size=50):
        super().__init__()                              # initialize the base class

        # Fail fast: without these checks the problem only surfaces later as a
        # KeyError/IndexError inside step(), or as an observation whose shape
        # silently disagrees with the declared observation_space.
        missing = [col for col in self._REQUIRED_COLUMNS if col not in data.columns]
        if missing:
            raise ValueError(f"data is missing required column(s): {missing}")
        if len(data) <= window_size:
            raise ValueError(
                f"data must contain more than window_size={window_size} rows, "
                f"got {len(data)}"
            )

        self.data = data.reset_index(drop=True)         # positional 0..n-1 index for easier slicing
        self.window_size = window_size                  # number of previous candles in the observation
        self.current_step = window_size                 # start right after the initial window

        # Observation: last N candles, one column per feature of ``data``
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,                   # values are unbounded
            shape=(window_size, self.data.shape[1]),    # (window_size rows, number of feature columns)
            dtype=np.float32
        )
        # Actions: 0 = Hold, 1 = Buy, 2 = Sell
        self.action_space = spaces.Discrete(3)

    def reset(self, seed=None, options=None):
        """Rewind to the first scorable step and return ``(observation, info)``.

        ``seed`` is forwarded to the base class; ``options`` is accepted but
        not used. ``info`` is always an empty dict.
        """
        super().reset(seed=seed)
        self.current_step = self.window_size
        return self._get_observation(), {}

    def step(self, action):
        """Score ``action`` against the current candle, then advance one row.

        Returns
        -------
        tuple
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes True once ``current_step`` reaches the last
            row index, so the final row of ``data`` is never scored.
            ``truncated`` is always False and ``info`` is always empty.
        """
        reward = self._calculate_reward(action)         # scored on the row at current_step, before advancing
        self.current_step += 1                          # move to the next time step
        done = self.current_step >= len(self.data) - 1  # episode ends when the last row index is reached
        return self._get_observation(), reward, done, False, {}

    def _get_observation(self):
        """Return the ``window_size`` rows before ``current_step`` as a float32 array."""
        first_row = self.current_step - self.window_size
        window = self.data.iloc[first_row:self.current_step].values  # end of the slice is exclusive
        return window.astype(np.float32)

    def _calculate_reward(self, action, hold_threshold=0.1, transaction_cost=0.01):
        """Reward ``action`` according to the Open -> Close move of the current row.

        The move is expressed in percent of the Open price. The "correct"
        action is HOLD when the absolute move is at most ``hold_threshold``
        percent, CALL (1) when it is above, and PUT (2) otherwise.

        Reward components:
          * correct action:  +0.7 for HOLD, +1.0 for CALL/PUT
          * wrong action:    -0.2 when the agent chose HOLD, -0.5 otherwise
          * magnitude bonus: ``|move %| / 100`` clipped to [0, 1], only for a
            correct CALL/PUT
          * ``transaction_cost`` is subtracted for every CALL/PUT

        Returns
        -------
        float
            Always a plain Python float, regardless of the dtype of ``data``.
        """
        start_price = self.data['Open'].iloc[self.current_step]
        end_price = self.data['Close'].iloc[self.current_step]
        price_diff_pct = (end_price - start_price) / start_price * 100

        # Determine correct action.
        # NOTE(review): a NaN move (e.g. missing prices) fails both comparisons
        # below and therefore falls through to PUT.
        if abs(price_diff_pct) <= hold_threshold:
            correct_action = 0  # HOLD
        elif price_diff_pct > hold_threshold:
            correct_action = 1  # CALL
        else:
            correct_action = 2  # PUT

        # Base reward
        if action == correct_action:
            reward = 0.7 if action == 0 else 1.0
        else:
            reward = -0.2 if action == 0 else -0.5

        # Bonus for magnitude (only if correct and not HOLD)
        if action == correct_action and action != 0:
            bonus = np.clip(abs(price_diff_pct) / 100, 0, 1.0)  # max bonus = 1.0 (reached at a 100 % move)
            reward += bonus

        # Transaction cost penalty for CALL/PUT
        if action in (1, 2):
            reward -= transaction_cost

        # The bonus is a NumPy scalar whose dtype follows the data, so without
        # this cast the return type would vary between branches.
        return float(reward)

    def render(self, action=None, reward=None):
        """Print the current step and Close price, plus action/reward when given."""
        msg = f"Step: {self.current_step}, Close Price: {self.data['Close'].iloc[self.current_step]}"
        if action is not None:  # if an action was taken, include it in the message
            msg += f", Action: {action}"
        if reward is not None:  # if a reward was given, include it in the message
            msg += f", Reward: {reward:.4f}"
        print(msg)


# OHLCScaler Class

In [3]:
# Since StandardScaler works column-wise, and all OHLC values should share the SAME scaling, we have to do a bit of manual work here
class OHLCScaler:
    """Standardise candles: one shared mean/std for OHLC, a separate one for Volume.

    The statistics are computed once from the frame passed to the constructor
    and then reused by :meth:`transform` and :meth:`re_transform`.

    Attributes
    ----------
    mean_ohlc, std_ohlc
        Mean and (population) standard deviation over ALL values of the
        'Open', 'High', 'Low' and 'Close' columns taken together.
    mean_volume, std_volume
        Mean and standard deviation of the 'Volume' column.
    scale_ohlc, scale_volume
        Divisors actually used for scaling. Equal to the corresponding std,
        except that a std of exactly 0 (constant data) is replaced by 1.0 so
        that scaling yields 0 instead of NaN/inf.
    """

    def __init__(self, train_df: pd.DataFrame):
        """Compute the scaling statistics from ``train_df``.

        ``train_df`` must provide the columns 'Open', 'High', 'Low', 'Close'
        and 'Volume'.
        """
        self.columns_ohlc = ['Open', 'High', 'Low', 'Close']        # all OHLC columns share one mean/std
        self.column_volume = 'Volume'                               # Volume gets its own mean/std

        ohlc_values = train_df[self.columns_ohlc].values.flatten()  # pool the values of all four price columns
        self.mean_ohlc = np.mean(ohlc_values)
        self.std_ohlc = np.std(ohlc_values)

        volume_values = train_df[self.column_volume].values
        self.mean_volume = np.mean(volume_values)
        self.std_volume = np.std(volume_values)

        # Keep the true std available, but never divide by zero.
        self.scale_ohlc = self._safe_scale(self.std_ohlc)
        self.scale_volume = self._safe_scale(self.std_volume)

    @staticmethod
    def _safe_scale(std):
        """Return ``std``, or 1.0 when ``std`` is exactly zero."""
        return 1.0 if std == 0 else std

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a scaled copy of ``df``; the input frame is not modified.

        Each OHLC column becomes ``(x - mean_ohlc) / scale_ohlc`` and the
        Volume column becomes ``(x - mean_volume) / scale_volume``. Any other
        columns are copied unchanged.
        """
        df_scaled = df.copy()
        for col in self.columns_ohlc:
            df_scaled[col] = (df_scaled[col] - self.mean_ohlc) / self.scale_ohlc
        df_scaled[self.column_volume] = (
            (df_scaled[self.column_volume] - self.mean_volume) / self.scale_volume
        )
        return df_scaled

    def re_transform(self, normalized_gain: float, normalized_price: float) -> tuple:
        """Map a scaled price difference and a scaled price back to real units.

        Returns
        -------
        tuple
            ``(real_gain, real_price)``.
        """
        real_price = normalized_price * self.scale_ohlc + self.mean_ohlc
        # A gain is a difference of two prices, so the mean cancels out and
        # only the scale factor has to be undone.
        real_gain = normalized_gain * self.scale_ohlc
        return real_gain, real_price