In [1]:
import os, psutil

# Determine number of available (idle) cores
IDLE_THRESHOLD = 20.0  # percent
cpu_usages = psutil.cpu_percent(percpu=True, interval=1)
available_cores = sum(usage < IDLE_THRESHOLD for usage in cpu_usages)
available_cores = max(1, available_cores) - 4 # At least 1

available_cores = os.cpu_count()

import mplfinance
import numpy as np
import pandas as pd
import random
import matplotlib.pyplot as plt

# Stable-Baselines3 imports
from stable_baselines3 import PPO,A2C
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.vec_env import SubprocVecEnv  # Use this instead of DummyVecEnv
from stable_baselines3.common.callbacks import BaseCallback

# Gymnasium (updated from Gym)
import gymnasium as gym
from gymnasium import Env,Wrapper
from gymnasium.spaces import Discrete, Box


In [2]:
# Load Data
df = pd.read_csv("MGOL.csv")  # Replace with actual file
df['datetime'] = pd.to_datetime(df['datetime'], format='%m/%d/%y %H:%M')
df.set_index('datetime', inplace=True)

In [3]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler

df.index = df.index + pd.Timedelta(hours=3)
df.index.name = 'Date'
df = df.drop(columns=['symbol', 'frame'])
df = df.iloc[:10]  # Select the first 30 rows
df_original = df
df = df[["close"]]



In [None]:
# ActionMasker

class ActionMasker(gym.Wrapper):
    """
    Wrapper for action masking in environments.
    Adds action mask as a part of the environment step.
    """
    def __init__(self, env: gym.Env, mask_fn: callable):
        super().__init__(env)
        self.mask_fn = mask_fn

    def step(self, action):
        obs, reward, done, truncated, info = self.env.step(action)
        
        # Get the action mask
        action_mask = self.mask_fn(self.env)
        
        # Add the action mask to the info dictionary
        info['action_mask'] = action_mask
        
        return obs, reward, done, truncated, info
    
    def reset(self, **kwargs):
        obs = self.env.reset(**kwargs)
        return obs


In [8]:
# MaskedPPOPolicy

from stable_baselines3.common.policies import ActorCriticPolicy
#import torch
#import torch.nn.functional as F

class MaskedPPOPolicy(ActorCriticPolicy):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def forward(self, combined_obs, action_mask=None, deterministic=False):
        combined_obs = combined_obs.to(dtype=torch.float32)  # Ensure float32

        # Extract latent features
        latent_pi, latent_vf = self.mlp_extractor(combined_obs.to(dtype=torch.float32))  

        # Compute action distribution and value
        distribution = self._get_action_dist_from_latent(latent_pi)
        values = self.value_net(latent_vf).to(dtype=torch.float32)  # Ensure float32

        # Ensure action_mask is properly handled
        if action_mask is None:
            action_mask = combined_obs[:, 5:]  # Extract last 3 elements as action mask

        if action_mask is not None:
            action_mask_tensor = torch.as_tensor(action_mask, dtype=torch.float32, device=combined_obs.device)

            # Ensure shape matches logits
            action_mask_tensor = action_mask_tensor.view(-1, distribution.distribution.logits.shape[-1])

            # Apply masking to logits
            distribution.distribution.logits = distribution.distribution.logits.masked_fill(action_mask_tensor == 0, -1e9)

        # Action selection
        if deterministic:
            actions = torch.argmax(distribution.distribution.probs, dim=-1)
        else:
            actions = distribution.sample()

        log_probs = distribution.log_prob(actions)

        if debug: print(f"  - Selected Actions: {actions}")

        return actions, values, log_probs

    def predict(self, observation, state=None, mask=None, action_mask=None, deterministic=False):
        with torch.no_grad():
            observation = torch.as_tensor(observation, device=self.device, dtype=torch.float32)
            actions, _, _ = self.forward(observation, action_mask=action_mask, deterministic=deterministic)
            actions = actions.cpu().numpy()
        return actions, state


In [6]:
debug = False

class RewardCallback(BaseCallback):
    def __init__(self, verbose=0, debug=False):
        super(RewardCallback, self).__init__(verbose)
        self.debug = debug
        
        # Metrics for tracking actions and rewards
        self.episode_rewards = []
        self.episode_steps = []
        self.iteration_rewards = []
        self.iteration_invalid_actions = []
        self.invalid_actions = []
        self.valid_actions = []
        self.current_episode_steps = 0
        
        # Metrics for TensorBoard logging
        self.total_reward = 0
        self.reward = 0
        self.num_trades = 0

    def _on_step(self) -> bool:
        # Collect rewards and actions
        rewards = self.locals.get("rewards", [])
        actions = self.locals.get("actions", [])
        
        if len(rewards) > 0:  # Check if rewards is not empty
            self.episode_rewards.extend(rewards)
        if len(actions) > 0:  # Check if actions is not empty
            infos = self.locals.get("infos", [])
            for idx, info in enumerate(infos):
                valid_actions = info.get("valid_actions", [0, 1, 2])
                action = actions[idx]
                if action not in valid_actions:
                    self.invalid_actions.append(action)
                else:
                    self.valid_actions.append(action)

        self.current_episode_steps += 1

        # Access the environment metrics using get_attr for SubprocVecEnv
        if isinstance(self.training_env, SubprocVecEnv):
            try:
                inner_envs = self.training_env.get_attr('env')  # ActionMasker
                for env in inner_envs:
                    if hasattr(env, 'env'):  # Unwrap ActionMasker
                        env = env.env
                    self.total_reward += getattr(env, "total_reward", 0)
                    self.num_trades += getattr(env, "round_trip_trades", 0)
            except Exception as e:
                if self.debug:
                    print(f"Failed to access env attributes: {e}")
        else:
            # For DummyVecEnv or single environments
            for env in self.training_env.envs:
                if hasattr(env, 'env'):
                    env = env.env
                self.total_reward += getattr(env, "total_reward", 0)
                self.num_trades += getattr(env, "round_trip_trades", 0)

        # TensorBoard logging
        self.logger.record("custom/num_trades", self.num_trades)
        self.logger.record("custom/total_reward", self.total_reward)

        # Entropy logging
        if hasattr(self.model.policy, "action_dist"):
            action_dist = self.model.policy.action_dist
            entropy = action_dist.entropy().mean().item()
            self.logger.record("policy/entropy", entropy)
        elif hasattr(self.model.policy, "get_distribution"):
            obs = self.locals.get("obs", [])
            if len(obs) > 0:  # Check if observations exist
                action_dist = self.model.policy.get_distribution(obs)
                entropy = action_dist.entropy().mean().item()
                self.logger.record("policy/entropy", entropy)

        # Value loss logging
        if "value_loss" in self.locals:
            value_loss = self.locals["value_loss"]
            self.logger.record("loss/value_loss", value_loss)

        # Episode done handling
        dones = self.locals.get("dones", [])
        if any(dones):
            self.episode_steps.append(self.current_episode_steps)
            self.current_episode_steps = 0
            total_reward = np.sum(self.episode_rewards)
            self.iteration_rewards.append(total_reward)
            self.episode_rewards = []

            invalid_count = len(self.invalid_actions)
            valid_count = len(self.valid_actions)
            self.iteration_invalid_actions.append(invalid_count)

            if self.debug:
                print(f"Invalid actions in this episode: {invalid_count}")
                print(f"Valid actions in this episode: {valid_count}")
                print(f"Invalid actions: {self.invalid_actions}")

            self.invalid_actions = []

        return True




In [5]:
# latest DebugCallback

from stable_baselines3.common.callbacks import BaseCallback

class DebugCallback(BaseCallback):
    def __init__(self, debug_episodes=None, verbose=0, debug=False):
        super().__init__(verbose)
        self.debug_episodes = set(debug_episodes) if debug_episodes is not None else set()
        self.episode_counts = []
        self.episode_step_counts = {}  # Track steps per environment
        self.debug = debug
        self.debug_triggered = False  # Optional: Only needed if you want to trigger once
        self.debug_on_step = None     # Optional: If you plan to use this
        self.last_printed_episodes = []  # To store the last printed episode number
        self.max_episode_steps = {}  # Track the max steps per environment

    def _init_callback(self) -> None:
        n_envs = self.training_env.num_envs
        # Initialize the counts and step tracking for all environments
        self.episode_counts = [0] * n_envs
        self.last_printed_episodes = [None] * n_envs  # Make sure this list is of the correct size
        self.episode_step_counts = {i: 0 for i in range(n_envs)}  # Initialize step counts for all environments
        self.max_episode_steps = {i: 0 for i in range(n_envs)}  # Initialize the max episode steps per environment

    def _on_training_start(self) -> None:
        num_envs = getattr(self.training_env, "num_envs", 1)
        self.episode_counts = [0] * num_envs
        self.current_episode_steps = [0] * num_envs

    def on_training_end(self) -> None:
        # Print the maximum steps per environment after training ends
        for env_id, max_steps in self.max_episode_steps.items():
            if self.debug: print(f"Max steps in episode for env {env_id}: {max_steps}")

    def _on_step(self) -> bool:
        dones = self.locals.get("dones", [])
        infos = self.locals.get("infos", [])

        for i in range(len(dones)):
            # Ensure that episode_step_counts has the environment index initialized
            if i not in self.episode_step_counts:
                self.episode_step_counts[i] = 0

            # Track steps
            self.episode_step_counts[i] += 1

            # Update the maximum episode steps for each environment
            if self.episode_step_counts[i] > self.max_episode_steps[i]:
                self.max_episode_steps[i] = self.episode_step_counts[i]
            
            episode_num = self.episode_counts[i]

            # Print start of episode (optional)
            if self.last_printed_episodes[i] != episode_num:
                if self.debug: print(f"Current episode (env {i}): {episode_num}")
                self.last_printed_episodes[i] = episode_num

            # Debug output if in debug_episodes list
            if episode_num in self.debug_episodes:
                if self.debug: print(f"Current step (env {i}): {self.episode_step_counts[i]}")
                if self.debug: print(f"[Env {i}] dones: {dones[i]} infos: {infos[i]}")

            # If episode is done, print step count and reset counter
            if dones[i]:
                if self.debug: print(f"Episode {episode_num} (env {i}) finished in {self.episode_step_counts[i]} steps.")
                self.episode_counts[i] += 1
                self.episode_step_counts[i] = 0  # Reset for next episode

        return True


In [None]:
class RewardCallback(BaseCallback):
    def __init__(self, debug=False, verbose=0):
        super().__init__(verbose)
        self.iteration_rewards_per_env = []
        self.iteration_invalid_actions_per_env = []
        self.debug = debug
        self.n_envs = None

        self.current_episode_steps = 0

    def _on_training_start(self) -> None:
        self.n_envs = self.training_env.num_envs
        self.current_rewards = [0.0 for _ in range(self.n_envs)]
        self.invalid_actions = [0 for _ in range(self.n_envs)]

    def _on_step(self) -> bool:
        rewards = self.locals["rewards"]
        dones = self.locals["dones"]
        infos = self.locals["infos"]

        for i in range(self.n_envs):
            self.current_rewards[i] += rewards[i]
            if infos[i].get("invalid_action", False):
                self.invalid_actions[i] += 1

            if dones[i]:
                if self.debug:
                    print(f"[Env {i}] Done. Number of steps: {self.current_episode_steps}, Reward: {self.current_rewards[i]:.2f}, Invalid: {self.invalid_actions[i]}")
                self.iteration_rewards_per_env.append((i, self.current_rewards[i]))
                self.iteration_invalid_actions_per_env.append((i, self.invalid_actions[i]))
                self.current_rewards[i] = 0.0
                self.invalid_actions[i] = 0

        return True

    def _on_training_end(self) -> None:
        # Compute summary per environment
        from collections import defaultdict

        reward_sums = defaultdict(float)
        invalid_sums = defaultdict(int)

        for env_id, reward in self.iteration_rewards_per_env:
            reward_sums[env_id] += reward
        for env_id, invalid in self.iteration_invalid_actions_per_env:
            invalid_sums[env_id] += invalid

        print("\n=== Final Per-Environment Summary ===")
        for i in range(self.n_envs):
            print(f"[Env {i}] Total Reward: {reward_sums[i]:.2f}, Invalid Actions: {invalid_sums[i]}")

        # Print full list format for rewards and invalids
        rewards_list = [reward_sums[i] for i in range(self.n_envs)]
        invalids_list = [invalid_sums[i] for i in range(self.n_envs)]

        print("\nRewards per environment (list):", rewards_list)
        print("Invalid actions per environment (list):", invalids_list)


In [22]:
from stable_baselines3.common.callbacks import BaseCallback
from collections import defaultdict
import numpy as np
from stable_baselines3.common.vec_env import SubprocVecEnv

class RewardCallback(BaseCallback):
    def __init__(self, verbose=0, debug=False):
        super(RewardCallback, self).__init__(verbose)
        self.debug = debug
        
        # Episode-level tracking
        self.episode_rewards = []
        self.episode_steps = []
        self.iteration_rewards = []
        self.iteration_invalid_actions = []
        self.invalid_actions = []
        self.valid_actions = []
        self.current_episode_steps = 0
        
        # TensorBoard logging
        self.total_reward = 0
        self.reward = 0
        self.num_trades = 0

        self.iteration_rewards_per_env = []
        self.iteration_invalid_actions_per_env = []
        self.n_envs = None
        
        # Per-environment tracking
        self.per_env_rewards = defaultdict(list)
        self.per_env_invalid_actions = defaultdict(list)

        self.per_env_rewards_total = defaultdict(list)
        self.per_env_invalid_actions_total = defaultdict(list)

    def _on_training_start(self) -> None:
        self.n_envs = self.training_env.num_envs
        self.current_rewards = [0.0 for _ in range(self.n_envs)]
        self.invalid_actions = [0 for _ in range(self.n_envs)]

    def _on_training_end(self):
        print("\n=== Final Per-Environment Summary ===")
        # Summary across all episodes
        for env_idx in sorted(self.per_env_rewards_total.keys()):
            total_reward = np.sum(self.per_env_rewards_total[env_idx])
            total_invalid = len(self.per_env_invalid_actions_total[env_idx])
            print(f"[Env {env_idx}] Total Reward: {total_reward:.2f}, Invalid Actions: {total_invalid}")

    
    def _on_step(self) -> bool:
        rewards = self.locals.get("rewards", [])
        actions = self.locals.get("actions", [])
        infos = self.locals.get("infos", [])

        for idx, info in enumerate(infos):
            valid_actions = info.get("valid_actions", [0, 1, 2])
            action = actions[idx]
            reward = rewards[idx]
        
            self.per_env_rewards[idx].append(reward)
            self.per_env_rewards_total[idx].append(reward)
        
            if action not in valid_actions:
                self.invalid_actions.append(action)
                self.per_env_invalid_actions[idx].append(action)
                self.per_env_invalid_actions_total[idx].append(action)
            else:
                self.valid_actions.append(action)
        
            self.episode_rewards.append(reward)


        self.current_episode_steps += 1

        # Log metrics from environment
        if isinstance(self.training_env, SubprocVecEnv):
            try:
                inner_envs = self.training_env.get_attr('env')  # Get ActionMasker
                for env in inner_envs:
                    if hasattr(env, 'env'):
                        env = env.env
                    self.total_reward += getattr(env, "total_reward", 0)
                    self.num_trades += getattr(env, "round_trip_trades", 0)
            except Exception as e:
                if self.debug:
                    print(f"Failed to access env attributes: {e}")
        else:
            for env in self.training_env.envs:
                if hasattr(env, 'env'):
                    env = env.env
                self.total_reward += getattr(env, "total_reward", 0)
                self.num_trades += getattr(env, "round_trip_trades", 0)

        # Logging to TensorBoard
        self.logger.record("custom/num_trades", self.num_trades)
        self.logger.record("custom/total_reward", self.total_reward)

        # Entropy logging
        if hasattr(self.model.policy, "action_dist"):
            action_dist = self.model.policy.action_dist
            entropy = action_dist.entropy().mean().item()
            self.logger.record("policy/entropy", entropy)
        elif hasattr(self.model.policy, "get_distribution"):
            obs = self.locals.get("obs", [])
            if len(obs) > 0:
                action_dist = self.model.policy.get_distribution(obs)
                entropy = action_dist.entropy().mean().item()
                self.logger.record("policy/entropy", entropy)

        # Value loss logging
        if "value_loss" in self.locals:
            value_loss = self.locals["value_loss"]
            self.logger.record("loss/value_loss", value_loss)

        # Episode done
        dones = self.locals.get("dones", [])
        if any(dones):
            self.episode_steps.append(self.current_episode_steps)
            self.current_episode_steps = 0
            total_reward = np.sum(self.episode_rewards)
            self.iteration_rewards.append(total_reward)
            self.episode_rewards = []

            self.iteration_invalid_actions.append(len(self.invalid_actions))

            if self.debug:
                print("\n--- Episode finished ---")
                for env_idx in sorted(self.per_env_rewards.keys()):
                    total_env_reward = np.sum(self.per_env_rewards[env_idx])
                    total_env_invalid = len(self.per_env_invalid_actions[env_idx])
                    print(f"[Env {env_idx}] Total reward: {total_env_reward:.2f}, Invalid actions: {total_env_invalid}")

            # Reset per-episode and per-env stats
            self.invalid_actions.clear()
            self.valid_actions.clear()
            self.per_env_rewards.clear()
            self.per_env_invalid_actions.clear()

        return True


In [24]:
class RewardCallback(BaseCallback):
    def __init__(self, debug=False, verbose=0):
        super().__init__(verbose)
        self.debug = debug
        self.n_envs = None
        self.episode_count = 0  # Total episode counter
        self.episode_count_per_env = None  # Episode counter per environment

    def _on_training_start(self) -> None:
        self.n_envs = self.training_env.num_envs
        self.episode_count_per_env = [0 for _ in range(self.n_envs)]
        self.current_rewards = [0.0 for _ in range(self.n_envs)]
        self.invalid_actions = [0 for _ in range(self.n_envs)]
        self.current_steps = [0 for _ in range(self.n_envs)]  # Steps counter per environment

    def _on_step(self) -> bool:
        rewards = self.locals["rewards"]
        dones = self.locals["dones"]
        infos = self.locals["infos"]

        for i in range(self.n_envs):
            # Track steps for each environment
            if not dones[i]:
                self.current_steps[i] += 1
            
            # Update reward and invalid actions
            self.current_rewards[i] += rewards[i]
            if infos[i].get("invalid_action", False):
                self.invalid_actions[i] += 1

            if dones[i]:
                self.episode_count += 1
                self.episode_count_per_env[i] += 1
                
                if self.debug:
                    print(f"[Env {i}] Episode {self.episode_count_per_env[i]} Done. "
                          f"Steps: {self.current_steps[i]}, "
                          f"Reward: {self.current_rewards[i]:.2f}, "
                          f"Invalid: {self.invalid_actions[i]}")
                
                # Reset episode-specific metrics
                self.current_steps[i] = 0
                self.current_rewards[i] = 0.0
                self.invalid_actions[i] = 0

        return True

    def _on_training_end(self) -> None:
        print("\n=== Final Training Summary ===")
        print(f"Total episodes completed: {self.episode_count}")
        for i in range(self.n_envs):
            print(f"[Env {i}] Total episodes: {self.episode_count_per_env[i]}")

In [11]:
class RewardCallback(BaseCallback):
    def __init__(self, debug=False, verbose=0):
        super().__init__(verbose)
        self.debug = debug
        self.n_envs = None
        self.episode_count = 0
        self.episode_count_per_env = None
        
        # New tracking
        self.current_rewards = None
        self.invalid_actions = None
        self.current_steps = None
        
        # Old tracking (for compatibility)
        self.iteration_rewards = []
        self.iteration_invalid_actions = []

    def _on_training_start(self) -> None:
        self.n_envs = self.training_env.num_envs
        self.episode_count_per_env = [0 for _ in range(self.n_envs)]
        self.current_rewards = [0.0 for _ in range(self.n_envs)]
        self.invalid_actions = [0 for _ in range(self.n_envs)]
        self.current_steps = [0 for _ in range(self.n_envs)]

    def _on_step(self) -> bool:
        rewards = self.locals["rewards"]
        dones = self.locals["dones"]
        infos = self.locals["infos"]

        for i in range(self.n_envs):
            if not dones[i]:
                self.current_steps[i] += 1
            self.current_rewards[i] += rewards[i]
            
            if infos[i].get("invalid_action", False):
                self.invalid_actions[i] += 1

            if dones[i]:
                self.episode_count += 1
                self.episode_count_per_env[i] += 1
                
                if self.debug:
                    print(f"[Env {i}] Episode {self.episode_count_per_env[i]} Done. "
                          f"Steps: {self.current_steps[i]}, "
                          f"Reward: {self.current_rewards[i]:.2f}, "
                          f"Invalid: {self.invalid_actions[i]}")
                
                # Add to old tracking system
                self.iteration_rewards.append(self.current_rewards[i])
                self.iteration_invalid_actions.append(self.invalid_actions[i])
                
                # Reset episode-specific metrics
                self.current_steps[i] = 0
                self.current_rewards[i] = 0.0
                self.invalid_actions[i] = 0

        return True

    def _on_training_end(self) -> None:
        print("\n=== Final Training Summary ===")
        print(f"Number of environments: {self.n_envs}")
        print(f"Total episodes completed: {self.episode_count}")
        print(f"Average episodes per environment: {self.episode_count / self.n_envs:.1f}")
        print(f"Total steps taken: {self.total_steps}")
        print(f"Average steps per episode: {self.total_steps / self.episode_count:.1f}")
        
        for i in range(self.n_envs):
            print(f"\n[Env {i}] Summary:")
            print(f"Total episodes: {self.episode_count_per_env[i]}")
            print(f"Total steps: {self.total_steps_per_env[i]}")
            print(f"Average steps per episode: {self.total_steps_per_env[i] / self.episode_count_per_env[i]:.1f}")
        
        # Print old format for compatibility
        formatted_rewards = ', '.join(f"{reward:.1f}" for reward in self.iteration_rewards)
        formatted_invalid_actions = ', '.join(str(invalid) for invalid in self.iteration_invalid_actions)
        print("\nTotal rewards per iteration:", formatted_rewards)
        print("Invalid actions per iteration:", formatted_invalid_actions)

In [6]:
# latest RewardCallback
class RewardCallback(BaseCallback):
    def __init__(self, debug=False, verbose=0):
        super().__init__(verbose)
        self.debug = debug
        self.n_envs = None
        self.episode_count = 0
        self.episode_count_per_env = None
        self.total_steps_per_env = None
        
        # New tracking
        self.current_episode_rewards = None
        self.episode_rewards = None
        self.invalid_actions = None
        self.current_steps = None
        self.total_steps = 0

    def _on_training_start(self) -> None:
        self.n_envs = self.training_env.num_envs
        self.episode_rewards = []
        self.episode_invalid_actions = []
        self.episode_count_per_env = [0 for _ in range(self.n_envs)]
        self.total_steps_per_env = [0 for _ in range(self.n_envs)]
        self.episode_reward_per_env = [0.0 for _ in range(self.n_envs)]
        self.invalid_actions_per_env = [0 for _ in range(self.n_envs)]
        self.current_steps_per_env = [0 for _ in range(self.n_envs)]

    def _on_step(self) -> bool:
        rewards = self.locals["rewards"]
        dones = self.locals["dones"]
        infos = self.locals["infos"]

        for i in range(self.n_envs):
            # Increment step counters for every step
            self.current_steps_per_env[i] += 1
            self.total_steps += 1
            self.total_steps_per_env[i] += 1
            
            # Update rewards and check for invalid actions
            self.episode_reward_per_env[i] += rewards[i]
            
            if infos[i].get("is_invalid", False):
                self.invalid_actions_per_env[i] += 1

            # Handle episode completion
            if dones[i]:
                self.episode_count += 1
                self.episode_count_per_env[i] += 1
                
                if self.debug:
                    print(f"[Env {i}] Episode {self.episode_count_per_env[i]} Done. "
                        f"Steps: {self.current_steps_per_env[i]}, "
                        f"Reward: {self.episode_reward_per_env[i]:.2f}, "
                        f"Invalid: {self.invalid_actions_per_env[i]}")

        if all(dones):     
            # Log mean reward across all environments
            self.episode_rewards.append(sum(self.episode_reward_per_env))
            self.episode_invalid_actions.append(sum(self.invalid_actions_per_env))
            total_reward = sum(self.episode_rewards)
            self.logger.record("rewards/total_reward", total_reward)         

        for i in range(self.n_envs):
            if dones[i]:
                self.current_steps_per_env[i] = 0
                self.episode_reward_per_env[i] = 0.0
                self.invalid_actions_per_env[i] = 0

        return True

    def _on_training_end(self) -> None:
        print("\n=== Final Training Summary ===")
        print(f"Number of environments: {self.n_envs}")
        print(f"Total episodes completed: {self.episode_count}")
        print(f"Average episodes per environment: {self.episode_count / self.n_envs:.1f}")
        print(f"Total steps taken: {self.total_steps}")
        print(f"Average steps per episode: {self.total_steps / self.episode_count:.1f}")
        
        for i in range(self.n_envs):
            print(f"\n[Env {i}] Summary:")
            print(f"Total episodes: {self.episode_count_per_env[i]}")
            print(f"Total steps: {self.total_steps_per_env[i]}")
            print(f"Average steps per episode: {self.total_steps_per_env[i] / self.episode_count_per_env[i]:.1f}")
        
        # Print old format for compatibility
        formatted_rewards = ', '.join(f"{reward:.1f}" for reward in self.episode_rewards)
        formatted_invalid_actions = ', '.join(str(invalid) for invalid in self.episode_invalid_actions)
        print("\nTotal rewards per episode per env:", formatted_rewards)
        print("Invalid actions per episode per env:", formatted_invalid_actions)

        # Group by iterations (12 environments at a time)
        n_envs = self.n_envs
        n_iterations = len(self.episode_rewards) // n_envs

        """ print("\n=== Per-Iteration Summary ===")
        for i in range(n_iterations):
            start_idx = i * n_envs
            end_idx = (i + 1) * n_envs
            iter_rewards = self.episode_rewards[start_idx:end_idx]
            iter_invalid = self.episode_invalid_actions[start_idx:end_idx]
            
            print(f"\nIteration {i + 1}:")
            print(f"  Total rewards: {sum(iter_rewards):.1f}")
            print(f"  Total invalid actions: {sum(iter_invalid)}")
            print(f"  Avg reward per env: {np.mean(iter_rewards):.1f}")
            print(f"  Avg invalid per env: {np.mean(iter_invalid):.1f}") """

        # Print overall totals
        print("\n=== Combined Totals ===")
        print(f"Total combined rewards: {sum(self.episode_rewards):.1f}")
        print(f"Total invalid actions: {sum(self.episode_invalid_actions)}")

In [None]:
# Initialize parallel environments and train model
import torch
import importlib
import trading_env_sb3_ver1
importlib.reload(trading_env_sb3_ver1)
from trading_env_sb3_ver1 import TradingEnv
from stable_baselines3.common.callbacks import CallbackList

# Initialize the custom TradingEnv environment
env1 = TradingEnv(df)  # Your custom environment

# Define the mask_fn to get valid actions from the environment
def mask_fn(env: gym.Env) -> np.ndarray:
    return env.get_action_mask()  # Get valid action mask from the environment

# Wrap the environment with ActionMasker to apply action masking
env_masked = ActionMasker(env1, mask_fn)  # Apply the ActionMasker wrapper

# Define the number of CPU cores to use
num_cpu = available_cores  # Get the number of available CPU cores

#num_cpu = 4

# --- Function to create the wrapped environment ---
def make_env():
    def _init():
        env = TradingEnv(df)  # Your DataFrame must be accessible here
        return ActionMasker(env, mask_fn)
    return _init

# Create parallel environments using SubprocVecEnv
env = SubprocVecEnv([make_env() for _ in range(num_cpu)])

# Define PPO model with the custom policy using the vectorized environment
ppo_masked_model = PPO(
    MaskedPPOPolicy,  # Custom policy
    env,               # Your environment
    verbose=1,         
    tensorboard_log="./tensorboard_logs/",
    ent_coef=0.0,      # Small exploration penalty
    gamma=0.99,       # Discount factor for long-term rewards
    gae_lambda=0.9,    # Optimistic advantage estimation (strongly favors long-term)
    n_steps=512,      # Large number of timesteps per iteration
    clip_range=0.1,    # Clipping to allow more aggressive updates
    n_epochs=5,       # Number of passes over the data (many epochs to overfit)
    batch_size=128    # Large batch size for stability in updates
)

# Train the model with the callback
from multiprocessing import set_start_method
set_start_method('spawn', force=True)

debug_episodes = {}  # Or use range() for patterns
debug_callback = DebugCallback(debug_episodes=debug_episodes, debug=False)

# Initialize the callback
reward_callback = RewardCallback(debug=True)

try:
    ppo_masked_model.learn(
        total_timesteps=500000,
        progress_bar=False,
        tb_log_name="sb3_ppo",
        callback=CallbackList([reward_callback, debug_callback])  # Combine callbacks
    )
finally:
    try:
        env.close()
    except EOFError:
        print("Warning: env subprocess already crashed, skipping close()")

# Save the model
ppo_masked_model.save('sb3_ppo_model')


Using cpu device
Observation with mask:Observation with mask:  Observation with mask: Observation with mask:Observation with mask: Observation with mask: Observation with mask: [3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00 1.000e+00 1.000e+00
 0.000e+00]
Step 0:[3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00 1.000e+00 1.000e+00
 0.000e+00]
Observation with mask:Step 0:
  [3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00 1.000e+00 1.000e+00
 0.000e+00]
Step 0:
  - Original Observation (shape (5,)): [3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00]
[3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00 1.000e+00 1.000e+00
 0.000e+00]  - Original Observation (shape (5,)): [3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00]

Step 0:
  - Action Mask (shape (3,)): [1. 1. 0.]
[3.386e-01 0.000e+00 1.000e+04 1.000e+04 0.000e+00 1.000e+00 1.000e+00
 0.000e+00]
Step 0:
  - Action Mask (shape (3,)): [1. 1. 0.]
  - Original Observation (shape (5,)): [3.386e-01 0.000e+00 1.000e+04 1.000e+04 0

Process ForkServerProcess-52:
Process ForkServerProcess-49:
Process ForkServerProcess-50:
Process ForkServerProcess-55:
Process ForkServerProcess-56:
Process ForkServerProcess-54:
Process ForkServerProcess-51:
Process ForkServerProcess-57:
Process ForkServerProcess-58:
Process ForkServerProcess-60:
Process ForkServerProcess-59:
Process ForkServerProcess-53:
Traceback (most recent call last):
  File "/opt/homebrew/Caskroom/mambaforge/base/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Caskroom/mambaforge/base/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/larka/jupyter_env/lib/python3.10/site-packages/stable_baselines3/common/vec_env/subproc_vec_env.py", line 35, in _worker
    observation, reward, terminated, truncated, info = env.step(data)
  File "/var/folders/h5/0dxd7qkj3ls9c04wvb460nhm0000gr/T/ipykernel_51843/59336617.py", line 11, in step
  File "/Us

EOFError: 

In [48]:
def test_agent(model, env, buy_signals_list, sell_signals_list, seed=42):
    """
    Test the trained model with action masking.
    
    Args:
        model: Trained PPO model.
        env: Environment with action masking.
        buy_signals_list: List to store buy signal timestamps.
        sell_signals_list: List to store sell signal timestamps.

    Returns:
        total_rewards: Total rewards accumulated during the test.
    """
    torch.manual_seed(seed)  # Ensure deterministic behavior

    # Reset the environment, get batch of observations
    obs = env.reset()  # This will return a batch of observations for each parallel environment
    done = [False] * len(obs)  # Done flags for each environment in the batch
    total_rewards = [0] * len(obs)  # Total rewards for each environment

    while not all(done):  # Run until all environments are done
        for i in range(30):  # Run for a fixed number of steps per environment

            # Extract the action mask for each environment (if applicable)
            action_mask = [env.get_action_mask() for env in env.envs]  # Get action masks for all parallel environments

            # Convert observations and masks to tensors
            obs_tensor = torch.as_tensor(obs, dtype=torch.float32, device=model.device)
            action_mask_tensor = torch.as_tensor(action_mask, dtype=torch.float32, device=model.device)

            # Predict actions with action masking
            with torch.no_grad():
                action, _, _ = model.policy.forward(combined_obs=obs_tensor, action_mask=action_mask_tensor, deterministic=True)

            # Convert actions to numpy
            action = action.cpu().numpy()

            # Step through the environment and get next observations, rewards, done flags, etc.
            obs, rewards, dones, truncateds, infos = env.step(action)

            # Debugging Output for each environment in the batch
            for idx, done_flag in enumerate(dones):
                print(f"Step {i+1}: Environment {idx+1} | Action {action[idx]} | Reward: {rewards[idx]:.2f} | Done: {done_flag}")

                total_rewards[idx] += rewards[idx]

                # Record buy or sell signals based on action
                current_step = env.envs[idx].current_step  # Access current_step for each environment
                if action[idx] == 1:  # Buy signal
                    buy_signals_list.append(df.index[current_step])
                elif action[idx] == 2:  # Sell signal
                    sell_signals_list.append(df.index[current_step])

                if done_flag or truncateds[idx]:  # If any environment ends
                    print(f"🎯 Total Reward for Environment {idx+1}: {total_rewards[idx]:.2f}")
                    obs[idx] = env.envs[idx].reset()  # Reset the environment and get new observation for that environment
                    done[idx] = True  # Mark that this environment has finished

    return sum(total_rewards)  # Return total rewards accumulated from all environments


In [None]:
# Test with a single environment inside SubprocVecEnv
from stable_baselines3.common.vec_env import SubprocVecEnv
import numpy as np

def make_env():
    return TradingEnv(df)

# Wrap the environment with SubprocVecEnv (with just one environment for now)
env = SubprocVecEnv([make_env])

# Reset environment
obs = env.reset()
done = False
while not done:
    action = np.random.choice(env.action_space.n)
    action = np.array([action])  # Wrap the action in a numpy array to match the expected format
    obs, reward, done, truncated, info = env.step(action)  # Pass the action as an array
    print(f"Action: {action} | Reward: {reward} | Done: {done}")



In [None]:
# Output results from one model

buy_signals = []
sell_signals = []

print("Recording signals")

test_agent(PPO.load("sb3_ppo_model"), env, buy_signals, sell_signals)

In [None]:
# Ensure directories exist
import os
import joblib  # or import pickle
from itertools import product


os.makedirs("./tensorboard_logs/", exist_ok=True)
os.makedirs("./saved_models/", exist_ok=True)

def evaluate_model(model, env, num_episodes=10):
    total_rewards = []
    for _ in range(num_episodes):
        # Access the environment wrapped by DummyVecEnv and ActionMasker
        env_inner = env.get_attr("env", 0)[0]  # Get the first environment
        
        # Check if env_inner has an 'env' attribute (i.e., it's wrapped)
        if hasattr(env_inner, "env"):  
            env_inner = env_inner.env  # Unwrap ActionMasker if applicable

        obs, info = env_inner.reset()  # Reset the environment
        episode_reward = 0
        done = False

        while not done:
            for i in range(30):  # Run for a fixed number of steps
                # Extract the action mask from the original environment
                action_mask = env_inner.get_action_mask()  # Access get_action_mask from TradingEnv inside ActionMasker
    
                # Convert observation and mask to tensors
                obs_tensor = torch.as_tensor(obs, dtype=torch.float32, device=model.device).unsqueeze(0)
                action_mask_tensor = torch.as_tensor(action_mask, dtype=torch.float32, device=model.device).unsqueeze(0)
    
                # Predict action with action masking
                with torch.no_grad():
                    action, _, _ = model.policy.forward(obs_tensor, action_mask=action_mask_tensor, deterministic=True)
    
                action = action.cpu().numpy()[0]  # Convert action tensor to numpy
    
                # Step the environment
                obs, reward, done, truncated, info = env_inner.step(action)
    
                episode_reward += reward

                if done or truncated:  # If the episode ends, reset the environment
                    print(f"🎯 Total Reward: {episode_reward:.2f}")
                    obs = env_inner.reset()  # Reset environment, which only returns the observation
                    break  # Exit loop if episode ends

        total_rewards.append(episode_reward)

    return np.mean(total_rewards)

# Perform grid search
results = []
models = []

# Define hyperparameter ranges
ent_coef_range = [0.0]       # Lower values for stability
gamma_range = [0.99]              # Lower values for short-term focus
gae_lambda_range = [0.9]        # Wider range to test variance
n_steps_range = [4096]           # Larger values for more experiences
clip_range_range = [0.1]         # Narrower range for stability
n_epochs_range = [5]               # Fewer epochs for faster training
batch_size_range = [128]           # Larger batches for better performance

force = True

timesteps = 500000

for ent_coef, gamma, gae_lambda, n_steps, clip_range, n_epochs, batch_size in product(
    ent_coef_range, gamma_range, gae_lambda_range, n_steps_range, clip_range_range, n_epochs_range, batch_size_range
):
    model_filename = f"./saved_models/model_ent_coef={ent_coef}_gamma={gamma}_gae_lambda={gae_lambda}_n_steps={n_steps}_clip_range={clip_range}_n_epochs={n_epochs}_batch_size={batch_size}_timesteps={timesteps}.zip"
    
    if os.path.exists(model_filename) and not force:
        print(f"Model already exists: {model_filename}, loading instead of training...")
        model = PPO.load(model_filename, env=env)  # Load existing model
    else:
        print(f"Training new model with: ent_coef={ent_coef}, gamma={gamma}, gae_lambda={gae_lambda}, "
              f"n_steps={n_steps}, clip_range={clip_range}, n_epochs={n_epochs}, batch_size={batch_size}, timesteps={timesteps}")
    
        model = PPO(
            MaskedPPOPolicy,  # Replace with your policy (e.g., "MlpPolicy" or "CnnPolicy")
            env,
            ent_coef=ent_coef,
            gamma=gamma,
            gae_lambda=gae_lambda,
            n_steps=n_steps,
            clip_range=clip_range,
            n_epochs=n_epochs,
            batch_size=batch_size,
            verbose=0,  # Set to 0 for less output
            tensorboard_log="./tensorboard_logs/"
        )
        
        # Train for x timesteps
        model.learn(total_timesteps=timesteps)
    
        # Save the model
        model.save(model_filename)
        print(f"Model saved: {model_filename}")
    
        # Store the model and its parameters
        models.append({
            "model_filename": model_filename,  # Store the path to the saved model
            "params": {
                "ent_coef": ent_coef,
                "gamma": gamma,
                "gae_lambda": gae_lambda,
                "n_steps": n_steps,
                "clip_range": clip_range,
                "n_epochs": n_epochs,
                "batch_size": batch_size,
            }
        })

    # Evaluate the model
    total_reward = evaluate_model(model, env)
    print('Average total reward:', total_reward)

    env.close()  # Close all subprocesses
    
    results.append({
        "ent_coef": ent_coef,
        "gamma": gamma,
        "gae_lambda": gae_lambda,
        "n_steps": n_steps,
        "clip_range": clip_range,
        "n_epochs": n_epochs,
        "batch_size": batch_size,
        "total_reward": total_reward,
        "model_filename": model_filename  # Link results to the saved model
    })

# Save results to a CSV file
results_df = pd.DataFrame(results)
csv_filename = f"grid_search_results_{timesteps}_steps.csv"
results_df.to_csv(csv_filename, index=False)

base_csv, csv_ext = os.path.splitext(csv_filename)

# Append a counter if the file already exists
counter = 1
while os.path.exists(csv_filename):
    csv_filename = f"{base_csv}_{counter}{csv_ext}"
    counter += 1

# Save the results DataFrame to CSV
results_df = pd.DataFrame(results)
results_df.to_csv(csv_filename, index=False)
print(f"Results saved to {csv_filename}")

# Now, do the same for the models file
models_filename = "./saved_models/models_list.joblib"
base_model, model_ext = os.path.splitext(models_filename)

counter = 1
while os.path.exists(models_filename):
    models_filename = f"{base_model}_{counter}{model_ext}"
    counter += 1

# Save the models variable
joblib.dump(models, models_filename)
print(f"Models saved to {models_filename}")

print("Grid search completed. Results saved to", csv_filename)

In [None]:
print('test')

In [None]:
buy_signals

In [None]:
sell_signals

In [None]:
# Show learned buy and sell signals

import mplfinance as mpf
import matplotlib.dates as mdates
import pandas as pd
import matplotlib.pyplot as plt

# Assuming df is your DataFrame, buy_signals_1000, sell_signals_1000, buy_signals_10000, and sell_signals_10000 are your lists of timestamps

# Function to plot the signals for a given buy and sell signal list
def plot_signals(buy_signals, sell_signals, title):
    # Convert buy/sell signals to DatetimeIndex
    buy_signals = pd.to_datetime(buy_signals)  
    buy_signals = buy_signals[buy_signals.isin(df_original.index)]  # Filter buy signals to match df.index

    sell_signals = pd.to_datetime(sell_signals)  
    sell_signals = sell_signals[sell_signals.isin(df_original.index)]  # Filter sell signals to match df.index

    # Create new columns to mark the buy and sell signals with NaN for non-signals
    buy_signal_prices = df_original['close'].copy()
    buy_signal_prices[~df_original.index.isin(buy_signals)] = float('nan')  # Set non-buy signals to NaN
    
    sell_signal_prices = df_original['close'].copy()
    sell_signal_prices[~df_original.index.isin(sell_signals)] = float('nan')  # Set non-sell signals to NaN

    # Create addplots for buy and sell signals
    buy_signal_plot = mpf.make_addplot(buy_signal_prices, type='scatter', markersize=40, marker='^', color='green')
    sell_signal_plot = mpf.make_addplot(sell_signal_prices, type='scatter', markersize=40, marker='v', color='red')

    # Plot with the added buy and sell signals
    fig, axes = mpf.plot(
        df_original,
        type='ohlc',
        datetime_format='%Y-%m-%d %H:%M',
        addplot=[buy_signal_plot, sell_signal_plot],
        returnfig=True,
        figsize=(16, 8),
        warn_too_much_data=10000,
        title=title #Adjust the size again to maintain consistency
    )

plot_signals(buy_signals, sell_signals, title="Buy/Sell Signals")


In [None]:
import mplfinance as mpf
import matplotlib.dates as mdates
import pandas as pd
import matplotlib.pyplot as plt

# Function to calculate profit based on buy and sell signals
def calculate_profit(buy_signals, sell_signals, df):
    buy_prices = df.loc[buy_signals, 'close']
    sell_prices = df.loc[sell_signals, 'close']

    # Ensure that the number of buy signals is less than or equal to the number of sell signals
    min_length = min(len(buy_prices), len(sell_prices))

    # Calculate profit for each pair of buy and sell signals
    profits = []
    for i in range(min_length):
        profit = sell_prices.iloc[i] - buy_prices.iloc[i]  # Profit = Sell Price - Buy Price
        profits.append(profit)

    total_profit = sum(profits)
    return total_profit, profits

# Function to plot the signals and compare profits
def plot_signals_and_compare_profit(buy_signals, sell_signals, df, title):
    # Convert buy/sell signals to DatetimeIndex
    buy_signals = pd.to_datetime(buy_signals)  
    buy_signals = buy_signals[buy_signals.isin(df.index)]  # Filter buy signals to match df.index

    sell_signals = pd.to_datetime(sell_signals)  
    sell_signals = sell_signals[sell_signals.isin(df.index)]  # Filter sell signals to match df.index

    # Create new columns to mark the buy and sell signals with NaN for non-signals
    buy_signal_prices = df['close'].copy()
    buy_signal_prices[~df.index.isin(buy_signals)] = float('nan')  # Set non-buy signals to NaN

    sell_signal_prices = df['close'].copy()
    sell_signal_prices[~df.index.isin(sell_signals)] = float('nan')  # Set non-sell signals to NaN

    # Create addplots for buy and sell signals
    buy_signal_plot = mpf.make_addplot(buy_signal_prices, type='scatter', markersize=40, marker='^', color='green')
    sell_signal_plot = mpf.make_addplot(sell_signal_prices, type='scatter', markersize=40, marker='v', color='red')

    # Plot with the added buy and sell signals
    fig, axes = mpf.plot(
        df,
        type='ohlc',
        style='charles',
        datetime_format='%Y-%m-%d %H:%M',
        addplot=[buy_signal_plot, sell_signal_plot],
        returnfig=True,
        figsize=(20, 10),
        warn_too_much_data=1000  # Adjust the size again to maintain consistency
    )

    # Customize x-axis
    #axes[0].xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d %H:%M'))
    #plt.xticks(rotation=45)  # Rotate x-axis labels for better readability

    # Add legend manually
    #plt.legend(['Buy Signals', 'Sell Signals'], loc='upper left')

    # Add title
    #plt.title(title)

    # Show the plot
    #plt.show()

    # Calculate and print profit for the signals
    total_profit, profits = calculate_profit(buy_signals, sell_signals, df)
    print(f"Total Profit for {title}: {total_profit:.2f}")
    return total_profit

# Calculate and plot for 1000 timesteps signals
profit_1000 = plot_signals_and_compare_profit(buy_signals_1000, sell_signals_1000, df, title="Profit for 1000 Timesteps")

# Calculate and plot for 10000 timesteps signals
profit_10000 = plot_signals_and_compare_profit(buy_signals_10000, sell_signals_10000, df, title="Profit for 10000 Timesteps")

# Calculate and plot for 100000 timesteps signals
#profit_100000 = plot_signals_and_compare_profit(buy_signals_100000, sell_signals_100000, df, title="Profit for 100000 Timesteps")


# Compare the total profits
print(f"Total Profit for 1000 Timesteps: {profit_1000:.2f}")
print(f"Total Profit for 10000 Timesteps: {profit_10000:.2f}")
#print(f"Total Profit for 100000 Timesteps: {profit_100000:.2f}")

In [None]:
!pip install tensorboard

In [2]:
import sys
print(sys.version)
print(sys.executable)

3.10.12 | packaged by conda-forge | (main, Jun 23 2023, 22:41:52) [Clang 15.0.7 ]
/Users/larka/jupyter_env/bin/python3
