In [None]:
####### Improved reward system with proper structure
import os
import numpy as np
import pandas as pd
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

# =========================
# Constants
# =========================
STABLE_THRESHOLD = 0.001  # 0.1% movement considered stable
# =========================
class CustomCallback(BaseCallback):
    def __init__(self, verbose=0):
        super(CustomCallback, self).__init__(verbose)
        self.episode = 0
        self.num_correct = 0
        self.total_steps = 0
        self.current_ep_reward = 0
        self.episode_rewards = []

    def _on_step(self) -> bool:
        info = self.locals['infos'][0]
        action = self.locals['actions'][0]
        reward = self.locals['rewards'][0]
        done = self.locals['dones'][0]

        step = info['step']
        delta_pct = info['delta_pct'] * 100  # convert to % for readability
        action_str = ['Hold', 'Buy', 'Sell'][action]

        if abs(delta_pct) < info['threshold_pct'] * 100:
            movement = 'stable'
        elif delta_pct > 0:
            movement = 'up'
        else:
            movement = 'down'
        movement_str = f"{movement} ({delta_pct:.2f}%)"

        print(f"Step {step}, chosen action {action_str}, next price movement {movement_str}, reward {reward:.4f}")

        # Track accuracy (reward >= 0 considered correct)
        # self.total_steps += 1
        # if reward >= 0:
        #     self.num_correct += 1
        # accuracy = (self.num_correct / self.total_steps) * 100
        # print(f"Running accuracy: {accuracy:.2f}%")

        # self.current_ep_reward += reward

        if step == 0:
            print(f"Episode {self.episode + 1}")

        if done:
            print(f"Episode {self.episode + 1} completed.")
            self.episode_rewards.append(self.current_ep_reward)
            self.current_ep_reward = 0
            self.episode += 1

        return True


# =========================
# Custom BTC Trading Environment
# =========================
class BTCTradingEnv(gym.Env):
    def __init__(self, data, lookback_window=12, transaction_cost=0.001):
        super(BTCTradingEnv, self).__init__()
        self.data = data.drop(columns=['OpenTime'])
        self.data = self.data.astype(np.float32)

        self.lookback = lookback_window
        self.transaction_cost = transaction_cost

        obs_shape = (self.lookback * self.data.shape[1],)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=obs_shape, dtype=np.float32)
        self.action_space = spaces.Discrete(3)  # Hold, Buy, Sell

        self.current_step = 0

    def reset(self, *, seed=None, options=None):
        self.current_step = self.lookback  # start after enough history
        obs = self._get_obs()
        return obs, {}

    def _calculate_reward(self, action, delta_pct):
    # Define thresholds
        up_threshold = 0.001   # >0.1% = up
        down_threshold = -0.001 # <-0.1% = down

        reward = 0.0

        if action == 1:  # Buy
            if delta_pct > up_threshold:
                reward = 1 + abs(delta_pct)  # correct, stronger move = bigger reward
            else:
                reward = -1                  # wrong prediction

        elif action == 2:  # Sell
            if delta_pct < down_threshold:
                reward = 1 + abs(delta_pct)  # correct, stronger down = bigger reward
            else:
                reward = -1

        else:  # Hold
            if abs(delta_pct) <= up_threshold:   # stable market
                reward = 0.5                     # small positive reward
            else:
                reward = -0.5                    # should have predicted moves

        return reward

    def step(self, action):
        current_close = self.data.iloc[self.current_step]['Close']
        next_close = self.data.iloc[self.current_step + 1]['Close']
        delta = next_close - current_close
        delta_pct = delta / current_close

        # Calculate reward using improved system
        reward = self._calculate_reward(action, delta_pct)

        info = {
            'step': self.current_step,
            'delta': delta,
            'delta_pct': delta_pct,
            'threshold_pct': STABLE_THRESHOLD,
            'reward': reward,
            'action': action,
        }

        self.current_step += 1
        terminated = self.current_step >= len(self.data) - 1
        truncated = False

        obs = self._get_obs() if not terminated else np.zeros(self.observation_space.shape, dtype=np.float32)
        return obs, reward, terminated, truncated, info

    def _get_obs(self):
        start = self.current_step - self.lookback
        end = self.current_step
        window = self.data.iloc[start:end].values
        return window.flatten().astype(np.float32)


# =========================
# Evaluation function
# =========================
def evaluate_model(model, vec_env):
    obs = vec_env.reset()
    terminated = False
    total_steps = 0
    num_correct = 0
    total_reward = 0

    print("Starting evaluation episode")
    while not terminated:
        action, _ = model.predict(obs, deterministic=True)
        obs, rewards, term, trunc, infos = vec_env.step(action)
        terminated = term[0] or trunc[0]

        info = infos[0]
        reward = rewards[0]
        delta_pct = info['delta_pct'] * 100
        step = info['step']
        action_val = action[0]

        action_str = ['Hold', 'Buy', 'Sell'][action_val]
        if abs(delta_pct) < info['threshold_pct'] * 100:
            movement = 'stable'
        elif delta_pct > 0:
            movement = 'up'
        else:
            movement = 'down'

        print(f"Step {step}, chosen action {action_str}, next price movement {movement} ({delta_pct:.2f}%), reward {reward:.4f}")

        total_steps += 1
        total_reward += reward
        if reward >= 0:
            num_correct += 1

    accuracy = (num_correct / total_steps) * 100 if total_steps > 0 else 0
    print("Evaluation completed.")
    print(f"Final evaluation accuracy: {accuracy:.2f}%")
    print(f"Total cumulative reward: {total_reward:.4f}")


# =========================
# Main
# =========================
if __name__ == "__main__":
    df = pd.read_csv('BTC-USD_with_Indicators.csv')
    env = BTCTradingEnv(df, lookback_window=12, transaction_cost=0.001)
    vec_env = DummyVecEnv([lambda: env])

    model_path = 'btc_dqn_agent.zip'
    trained = False
    if os.path.exists(model_path):
        model = DQN.load(model_path, env=vec_env)
        print("Loaded existing model.")
    else:
        # model = DQN("MlpPolicy", vec_env, verbose=1)
        model = DQN(
            "MlpPolicy",
            vec_env,
            verbose=1,
            learning_rate=1e-4,
            buffer_size=50000,
            learning_starts=1000,
            batch_size=32,
            tau=0.05,
            gamma=0.99,
            train_freq=4,
            target_update_interval=500,
            exploration_fraction=0.2,
            exploration_final_eps=0.05,
        )
        callback = CustomCallback()

        steps_per_episode = len(df) - 1
        total_timesteps = 20 * steps_per_episode

        model.learn(total_timesteps=total_timesteps, callback=callback)
        model.save(model_path)
        print("Trained and saved new model.")
        trained = True

        plt.plot(callback.episode_rewards)
        plt.xlabel('Episode')
        plt.ylabel('Total Reward')
        plt.title('Training Reward History')
        plt.show()

    evaluate_model(model, vec_env)

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


Using cpu device
