In [5]:
# Install the packages this notebook imports. The environment below is built on
# `gymnasium` (the notebook never imports the legacy `gym` package), and `%pip`
# installs into the environment of the running kernel rather than whichever
# `pip` happens to be first on the shell PATH.
%pip install numpy pandas gymnasium stable-baselines3 torch matplotlib



In [33]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces

class TradingEnv(gym.Env):
    """Single-asset trading environment driven by a table of historical data.

    Each row of ``data`` is one time step. The agent observes the min-max
    scaled row and chooses one of three discrete actions:

    * ``0`` - hold
    * ``1`` - buy ``shares_per_trade`` shares at the current ``'Close'`` price
    * ``2`` - sell ``shares_per_trade`` shares at the current ``'Close'`` price

    The ``'Close'`` column is used directly as the trade price, so it should
    hold prices in their original units (not standardised values).

    Args:
        data: pandas DataFrame with numeric columns, including ``'Close'``,
            and at least two rows.
        initial_balance: Cash available at the start of every episode.
        shares_per_trade: Number of shares bought or sold by one buy/sell
            action. Cash moves by ``shares_per_trade * price`` so that cash
            and share count always stay consistent.

    Raises:
        ValueError: If ``data`` has no ``'Close'`` column or fewer than two rows.
    """

    def __init__(self, data, initial_balance=10000, shares_per_trade=1):
        super().__init__()

        if 'Close' not in data.columns:
            raise ValueError("data must contain a 'Close' column")
        if len(data) < 2:
            raise ValueError("data must contain at least two rows")

        self.data = data
        self.initial_balance = initial_balance
        self.shares_per_trade = shares_per_trade

        # Episode state; reset() restores exactly these values.
        self.current_step = 0
        self.balance = initial_balance
        self.shares_held = 0
        self.net_worth = self.balance

        # Per-column scaling constants, computed once here instead of
        # rescanning the whole frame on every step.
        self._obs_min = self.data.min().to_numpy(dtype=np.float64)
        obs_range = self.data.max().to_numpy(dtype=np.float64) - self._obs_min
        # A constant column has zero range; dividing by 1 maps it to 0.0
        # instead of producing NaN.
        self._obs_range = np.where(obs_range == 0, 1.0, obs_range)

        # Action space: hold, buy, sell
        self.action_space = spaces.Discrete(3)

        # Observation space: one min-max scaled value per dataset column
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(len(self.data.columns),), dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode at the first row.

        Returns:
            Tuple ``(observation, info)`` where ``info`` is an empty dict.
        """
        # Let gymnasium seed its own random number generator.
        super().reset(seed=seed)

        self.current_step = 0
        self.balance = self.initial_balance
        self.shares_held = 0
        self.net_worth = self.balance

        return self._next_observation(), {}

    def _next_observation(self):
        """Return the row at ``current_step`` min-max scaled into [0, 1] as float32."""
        row = self.data.iloc[self.current_step].to_numpy(dtype=np.float64)
        scaled = (row - self._obs_min) / self._obs_range
        # Clip to the bounds declared in observation_space.
        return np.clip(scaled, 0.0, 1.0).astype(np.float32)

    def step(self, action):
        """Execute one action at the current price and advance one row.

        Args:
            action: ``0`` (hold), ``1`` (buy) or ``2`` (sell).

        Returns:
            Tuple ``(observation, reward, terminated, truncated, info)``.
            ``reward`` is the change in net worth over the step, with the
            portfolio valued at the new row's ``'Close'`` price.
            ``terminated`` becomes True once the last row is reached;
            ``truncated`` is always False and ``info`` is an empty dict.
        """
        # Callers such as model.predict() may pass a 0-d numpy array.
        action = int(action)

        close = self.data['Close']
        trade_price = float(close.iloc[self.current_step])
        lot_cost = trade_price * self.shares_per_trade

        if action == 1:  # Buy
            if self.balance > lot_cost:
                self.shares_held += self.shares_per_trade
                self.balance -= lot_cost
        elif action == 2:  # Sell
            if self.shares_held >= self.shares_per_trade:
                self.shares_held -= self.shares_per_trade
                self.balance += lot_cost

        self.current_step += 1

        # Mark the portfolio to the price of the row the agent now observes.
        # The trade itself does not change net worth at the trade price, so
        # the reward equals shares held after the trade times the price move.
        previous_net_worth = self.net_worth
        new_price = float(close.iloc[self.current_step])
        self.net_worth = self.balance + self.shares_held * new_price
        reward = float(self.net_worth - previous_net_worth)

        done = bool(self.current_step >= len(self.data) - 1)

        return self._next_observation(), reward, done, False, {}

    def render(self, mode='human'):
        """Print the current step index and net worth."""
        print(f'Step: {self.current_step}, Net Worth: {self.net_worth}')

In [35]:
from stable_baselines3 import DQN
from stable_baselines3.common.env_checker import check_env
import pandas as pd


# Load the raw price history. TradingEnv reads the 'Close' column as the
# trade price, so the file must provide one.
raw_data = pd.read_csv('GME Data.csv')

# Keep the numeric columns only (this drops e.g. a date column, if the CSV has
# one) and leave the values in their original units. TradingEnv min-max scales
# every observation itself; z-scoring the frame here would centre 'Close' on
# zero, and the environment would then trade at those values, many negative.
data = raw_data.select_dtypes(include='number')

# Create the environment
env = TradingEnv(data)

# Check that the environment follows the interface Stable-Baselines3 expects
check_env(env)

# Train the DQN agent
model = DQN('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=10000)

In [36]:
# Roll the trained agent through one episode, printing net worth after each step.
# reset() returns (observation, info) and step() returns
# (observation, reward, terminated, truncated, info).
obs, info = env.reset()
for _ in range(len(data)):
    action, _state = model.predict(obs)
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    env.render()
    if done:
        break

## Evaluating the performance

In [None]:
import numpy as np

def evaluate_model(env, model, num_episodes=1):
    """Run the model for several episodes and report total reward and profit.

    Args:
        env: Gymnasium-style environment. ``reset()`` must return
            ``(observation, info)``, ``step(action)`` must return
            ``(observation, reward, terminated, truncated, info)``, and the
            environment must expose a ``net_worth`` attribute.
        model: Object whose ``predict(observation)`` returns
            ``(action, state)``.
        num_episodes: Number of episodes to run.

    Returns:
        Tuple ``(avg_reward, avg_profit)``: the mean per-episode sum of
        rewards and the mean per-episode change in ``env.net_worth``.
    """
    total_rewards = []
    profits = []

    for episode in range(num_episodes):
        obs, _info = env.reset()
        done = False
        total_reward = 0
        starting_net_worth = env.net_worth

        while not done:
            # NOTE(review): predict() is called with its default arguments;
            # consider whether evaluation should pass deterministic=True.
            action, _state = model.predict(obs)
            obs, reward, terminated, truncated, _info = env.step(action)
            # An episode ends on either signal.
            done = terminated or truncated
            total_reward += reward

        profit = env.net_worth - starting_net_worth
        total_rewards.append(total_reward)
        profits.append(profit)

        print(f"Episode {episode + 1}: Total Reward: {total_reward}, Profit: {profit}")

    avg_reward = np.mean(total_rewards)
    avg_profit = np.mean(profits)

    print(f"\nAverage Reward: {avg_reward}, Average Profit: {avg_profit}")
    return avg_reward, avg_profit

# Evaluate the trained model over 10 episodes. evaluate_model prints one line
# per episode plus the averages; the returned (avg_reward, avg_profit) tuple is
# the last expression of the cell and is therefore shown as its output.
evaluate_model(env, model, num_episodes=10)

In [None]:
import matplotlib.pyplot as plt

def visualize_performance(env, model, num_episodes=1):
    """Plot the environment's net worth over time for several episodes.

    Every episode is drawn as one labelled line on a shared figure, which is
    shown once all episodes have finished.

    Args:
        env: Gymnasium-style environment. ``reset()`` must return
            ``(observation, info)``, ``step(action)`` must return
            ``(observation, reward, terminated, truncated, info)``, and the
            environment must expose a ``net_worth`` attribute.
        model: Object whose ``predict(observation)`` returns
            ``(action, state)``.
        num_episodes: Number of episodes to run and plot.
    """
    for episode in range(num_episodes):
        obs, _info = env.reset()
        done = False
        # Start the curve at the net worth right after reset.
        net_worths = [env.net_worth]

        while not done:
            action, _state = model.predict(obs)
            obs, _reward, terminated, truncated, _info = env.step(action)
            # An episode ends on either signal.
            done = terminated or truncated
            net_worths.append(env.net_worth)

        # Plot net worth over time
        plt.plot(net_worths, label=f"Episode {episode + 1}")

    plt.title('Net Worth Over Time')
    plt.xlabel('Steps')
    plt.ylabel('Net Worth')
    plt.legend()
    plt.show()

# Visualize the model performance: run 3 episodes and draw each episode's
# net-worth curve as a separate labelled line on one figure.
visualize_performance(env, model, num_episodes=3)

In [None]:
def evaluate_trades(env, model, num_episodes=1):
    """Print the share of episodes that end with a higher net worth.

    An episode counts as a win when ``env.net_worth`` at the end is strictly
    greater than right after ``reset()``; otherwise it counts as a loss. The
    statistic is per episode, not per individual buy/sell action.

    Args:
        env: Gymnasium-style environment. ``reset()`` must return
            ``(observation, info)``, ``step(action)`` must return
            ``(observation, reward, terminated, truncated, info)``, and the
            environment must expose a ``net_worth`` attribute.
        model: Object whose ``predict(observation)`` returns
            ``(action, state)``.
        num_episodes: Number of episodes to run.
    """
    wins = 0
    losses = 0

    for _episode in range(num_episodes):
        obs, _info = env.reset()
        done = False
        starting_net_worth = env.net_worth

        while not done:
            action, _state = model.predict(obs)
            obs, _reward, terminated, truncated, _info = env.step(action)
            # An episode ends on either signal.
            done = terminated or truncated

        if env.net_worth > starting_net_worth:
            wins += 1
        else:
            losses += 1

    total_episodes = wins + losses
    # Guard against num_episodes == 0.
    win_rate = wins / total_episodes if total_episodes > 0 else 0
    print(f"Win Rate: {win_rate * 100:.2f}% ({wins} wins, {losses} losses)")
    
# Evaluate win/loss ratio: run 50 episodes and print how many ended with a
# higher net worth than they started with.
evaluate_trades(env, model, num_episodes=50)