In [2]:
import numpy as np
import pandas as pd
import json
import os
from time import time
import matplotlib.pyplot as plt


# Torch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ExponentialLR
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, get_rank, get_world_size, destroy_process_group

# Sklearn imports
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

# Import classes
from Agent import Agent
from DataVisualizer import DataVisualizer
from DataManager import DataManager
from NumpyEncoder import NumpyEncoder

# Register this process as rank 0 of a one-process job for torch.distributed
os.environ.update({'RANK': '0', 'WORLD_SIZE': '1'})

def setup_distributed(local_rank):
    """Initialise the default torch.distributed process group for this process.

    Rendezvous settings are read from the environment (``env://``). Any value
    that is not already present is filled in with the single-process defaults
    (localhost:12355, world size 1, rank 0); values already exported by a
    launcher are left untouched.

    Args:
        local_rank: Index of the CUDA device this process should use.

    Returns:
        The ``local_rank`` that was passed in.
    """
    # Fill in defaults only, so a launcher-provided configuration wins.
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '12355')
    os.environ.setdefault('WORLD_SIZE', '1')
    os.environ.setdefault('RANK', '0')

    # Initialising twice raises, so skip it when a group already exists
    # (e.g. when the notebook cell is executed again).
    if not dist.is_initialized():
        dist.init_process_group(backend='gloo', init_method='env://')

    # Selecting a CUDA device on a machine without CUDA raises; only bind the
    # device when one is actually available.
    if torch.cuda.is_available():
        torch.cuda.set_device(local_rank)

    return local_rank


class TradingSystem:
    """Drives training and evaluation of a trading ``Agent`` over prepared data.

    Data is loaded through ``DataManager``; every entry of its train/test
    dictionaries is treated as one episode. Per-step records and summary
    scores are written below the ``Output/`` directory.
    """

    def __init__(self, initial_investment=10000, num_episodes=10, local_rank=0):
        """Store the run configuration and create the data manager.

        Args:
            initial_investment: Starting balance used at the start of every episode.
            num_episodes: Initial episode count; replaced by the number of
                training entries once data has been loaded.
            local_rank: Device index / local process rank for this process.
        """
        self.local_rank = local_rank
        self.INITIAL_INVESTMENT = initial_investment
        self.NUM_EPISODES = num_episodes
        self.trading_agent = None
        self.data_manager = DataManager()
        self.train_df_shape = None
        self.test_df_shape = None
        self.data_visualizer = None
        self.running_profit = []

    def load_and_prepare_data(self, directory):
        """Build the train/test dictionaries from the CSV files in ``directory``.

        Also sets ``NUM_EPISODES`` to the number of training entries.
        """
        # create the test and train df dictionaries for each csv file in the directory
        self.data_manager.prepare_data(directory)
        self.NUM_EPISODES = len(self.data_manager.train_data_dict)

    def manual_data_partition(self, data):
        """Return the slice of ``data`` assigned to this process.

        The data is split into ``world_size`` contiguous chunks of equal size;
        the last rank additionally receives the remainder. When no process
        group has been initialised the world size is taken to be 1.

        Args:
            data: Any sliceable sequence supporting ``len``.

        Returns:
            ``data[start:end]`` for this process's ``local_rank``.
        """
        # Querying the world size without an initialised group raises, so fall
        # back to a single-process view in that case.
        if dist.is_available() and dist.is_initialized():
            world_size = dist.get_world_size()
        else:
            world_size = 1

        total_size = len(data)
        per_gpu = total_size // world_size
        start = self.local_rank * per_gpu
        is_last_rank = self.local_rank >= world_size - 1
        end = total_size if is_last_rank else start + per_gpu
        return data[start:end]

    def setup_trading_agent(self):
        """Create the agent, load a saved model if one exists, and wrap it in DDP."""
        self.trading_agent = Agent(self.train_df_shape, self.NUM_EPISODES)
        if os.path.exists("Output/online_model/model.pt"):
            print("Loading the model")
            self.trading_agent.load_model()
        else:
            print("No model found, training from scratch")

        if torch.cuda.is_available():
            model = self.trading_agent.model.cuda(self.local_rank)
            self.trading_agent.model = DDP(model, device_ids=[self.local_rank])
        else:
            # DDP requires device_ids to be left unset for CPU modules.
            self.trading_agent.model = DDP(self.trading_agent.model)

    # Curriculum 1
    def train(self):
        """Run one training episode per entry of the training dictionary.

        For every time step the agent picks an action, trades, stores the
        transition in its memory and performs a training step. Per-step
        records are dumped to ``Output/episode_mem.json`` and a summary is
        appended to ``Output/training_scores.out``. The realized profit of
        each episode is appended to ``running_profit``.
        """
        train_data = self.data_manager.train_data_dict
        if not train_data:
            # Nothing to iterate over: the summary below would have no
            # episode to report on.
            print("No training data found, skipping training")
            return

        # Sized from the data itself so every episode has a slot even if
        # NUM_EPISODES was changed after loading (test() overwrites it).
        episode_mem = [{"Actions": [], "Inventory Size": [], "Portfolio Value": [], "Realized Profit": [], "Reward": [], "Done": [], "Epsilon": [], "MSE Loss": []} for _ in range(len(train_data))]
        t0 = time()
        episode_num = 0
        realized_profit = 0
        for accession_number, data in train_data.items():
            normalized_df, reference_df = data
            self.train_df_shape = normalized_df.shape
            # NOTE(review): a new Agent is built for every episode; whether
            # learned weights carry over depends on Agent's constructor /
            # save_model, which are not visible here -- confirm.
            self.trading_agent = Agent(self.train_df_shape, self.NUM_EPISODES)
            print(f"\n===== Episode {episode_num + 1} : {accession_number} =====")
            self.trading_agent.inventory = []
            state = self.trading_agent.get_state(0, normalized_df)
            balance = self.INITIAL_INVESTMENT
            portfolio_value_usd = 0
            self.trading_agent.portfolio = [0, self.INITIAL_INVESTMENT, portfolio_value_usd]
            done = False
            record = episode_mem[episode_num]
            print(len(normalized_df))
            for t in range(len(normalized_df) - 1):
                if done:
                    break
                action = self.trading_agent.get_action(state, t, reference_df)
                next_state = self.trading_agent.get_state(t + 1, normalized_df)
                reward = self.trading_agent.trade(t, action, normalized_df, reference_df, self.INITIAL_INVESTMENT, trading_fee_rate=0.05)
                balance += reward
                # The episode ends once the balance drops below the current close price.
                done = balance < reference_df["close"].iloc[t]

                self.trading_agent.memory.add_exp(state, action, reward, next_state, done)
                # A falsy result from the training step is recorded as 0.
                loss = self.trading_agent.train() or 0
                state = next_state

                record["Actions"].append(int(action))
                record["Inventory Size"].append(len(self.trading_agent.inventory))
                record["Portfolio Value"].append(balance + reference_df["close"].iloc[t] * len(self.trading_agent.inventory) - sum(self.trading_agent.inventory) - self.INITIAL_INVESTMENT)
                record["Realized Profit"].append(balance - self.INITIAL_INVESTMENT)
                record["Reward"].append(reward)
                record["Done"].append(done)
                record["Epsilon"].append(self.trading_agent.epsilon)
                record["MSE Loss"].append(loss)

                if t % 10 == 0:
                    print(f"""
                            Time step {t} / {len(normalized_df)}   
                            |   Inventory Size: {len(self.trading_agent.inventory)}
                            |   Action: {int(action)}
                            |   Portfolio Value: {round(record['Portfolio Value'][-1], 3)} 
                            |   Realized Profit: {round(record['Realized Profit'][-1], 3)}  
                            |   Inventory: {len(self.trading_agent.inventory)}   
                            |   {reference_df['ISSUERTRADINGSYMBOL'].iloc[t]}: ${round(reference_df['close'].iloc[t], 3)}   
                            |   Epsilon: {round(self.trading_agent.epsilon, 4)}   
                            |   MSE Loss: {loss}
                            """)
            episode_num += 1
            realized_profit = balance - self.INITIAL_INVESTMENT
            print(f"Realized Profit: {realized_profit}")
            self.running_profit.append(realized_profit)
            self.trading_agent.save_model()

        # Action codes as counted here: 0 = sell, 1 = hold, 2 = buy.
        total_sell_actions = sum(episode['Actions'].count(0) for episode in episode_mem)
        total_buy_actions = sum(episode['Actions'].count(2) for episode in episode_mem)
        total_hold_actions = sum(episode['Actions'].count(1) for episode in episode_mem)
        average_reward = np.mean([sum(episode['Reward']) for episode in episode_mem])

        # The final episode may have recorded no steps (single-row frame).
        final_losses = episode_mem[episode_num - 1]['MSE Loss']
        last_loss = final_losses[-1] if final_losses else 0

        os.makedirs("Output", exist_ok=True)
        with open('Output/training_scores.out', 'a') as f:
            # episode_num was already incremented after every episode, so it
            # is the number of completed episodes as-is.
            f.write(f"""
                    EPISODES Completed {episode_num} |  (runtime: {time() - t0})   
                    | Realized Profit: {realized_profit}   |
                    Total Sell Actions taken: {total_sell_actions}   |
                    Total Buy Actions taken: {total_buy_actions}   |
                    Total Hold Actions taken: {total_hold_actions}   |
                    Average Reward: {average_reward}   |
                    Epsilon is {round(self.trading_agent.epsilon, 3)}   |   
                    MSE Loss is {round(last_loss, 3)}\n
                    """)
        with open("Output/episode_mem.json", 'w') as f:
            json.dump(episode_mem, f, cls=NumpyEncoder)

    def test(self):
        """Run the agent greedily (epsilon 0) over every test entry.

        Per-step records of all test episodes are accumulated in one dict and
        dumped to ``Output/testing_mem.json``; a summary line is appended to
        ``Output/testing_scores.out``. Only the primary process (local rank 0,
        or global rank 0 of an initialised process group) writes files.
        """
        test_data = self.data_manager.test_data_dict
        if not test_data:
            # Without any episode there is no balance or portfolio value to report.
            print("No testing data found, skipping testing")
            return

        testing_mem = {"Actions": [], "Inventory Size": [], "Portfolio Value": [], "Realized Profit": [],
                       "Reward": [], "Done": []}
        t0 = time()
        self.NUM_EPISODES = len(test_data)
        balance = self.INITIAL_INVESTMENT

        for accession_number, data in test_data.items():
            normalized_df, reference_df = data
            self.test_df_shape = normalized_df.shape
            # NOTE(review): a new Agent is built here; whether it picks up the
            # trained weights depends on Agent's constructor -- confirm.
            self.trading_agent = Agent(self.test_df_shape, self.NUM_EPISODES)
            self.trading_agent.epsilon = 0
            self.trading_agent.inventory = []
            state = self.trading_agent.get_state(0, normalized_df)
            balance = self.INITIAL_INVESTMENT
            portfolio_value_usd = 0
            self.trading_agent.portfolio = [0, self.INITIAL_INVESTMENT, portfolio_value_usd]

            done = False
            for t in range(len(normalized_df) - 1):
                if done:
                    print("Done with testing")
                    break
                action = self.trading_agent.get_action(state, t, reference_df)
                next_state = self.trading_agent.get_state(t + 1, normalized_df)
                reward = self.trading_agent.trade(t, action, normalized_df, reference_df, self.INITIAL_INVESTMENT, trading_fee_rate=0.05)
                balance += reward
                done = balance < reference_df["close"].iloc[t]
                state = next_state
                testing_mem["Actions"].append(int(action))
                testing_mem["Inventory Size"].append(len(self.trading_agent.inventory))
                testing_mem["Portfolio Value"].append(float(balance + reference_df["close"].iloc[t] * len(self.trading_agent.inventory) - sum(self.trading_agent.inventory)) - self.INITIAL_INVESTMENT)
                testing_mem["Realized Profit"].append(float(balance - self.INITIAL_INVESTMENT))
                testing_mem["Reward"].append(float(reward))
                testing_mem["Done"].append(bool(done))
                # testing_mem spans all episodes, so the value for this step
                # is the most recent entry, not index t.
                print(f"""
                            Time step {t} / {len(normalized_df)}   
                            |   Inventory Size: {len(self.trading_agent.inventory)}  
                            |  Portfolio Value: {round(testing_mem['Portfolio Value'][-1], 3)}   
                            |  Action: {int(action)}  |  Reward: {round(reward, 3)}
                            """)

        is_local_primary = self.local_rank == 0
        is_global_primary = (
            dist.is_available() and dist.is_initialized() and dist.get_rank() == 0
        )

        if is_local_primary:
            realized_profit = balance - self.INITIAL_INVESTMENT
            print(f"Realized Profit: {realized_profit}")
            self.running_profit.append(realized_profit)

        if is_local_primary or is_global_primary:
            # No step may have been recorded if every frame had a single row.
            portfolio_values = testing_mem['Portfolio Value']
            final_value = portfolio_values[-1] if portfolio_values else 0.0

            os.makedirs("Output", exist_ok=True)
            with open('Output/testing_scores.out', 'a') as f:
                f.write(f"""TESTING (runtime: {time() - t0})   |  
                        Portfolio Value is {round(final_value, 3)}\n
                        """)
            with open("Output/testing_mem.json", 'w') as f:
                json.dump(testing_mem, f, cls=NumpyEncoder)

    def run(self, directory):
        """Print environment info, then load data, train, test, save and visualise.

        Args:
            directory: Folder passed on to ``load_and_prepare_data``.
        """
        cuda_available = torch.cuda.is_available()

        print("PyTorch version " + torch.__version__)
        print("Num GPUs Available: ", torch.cuda.device_count())
        # grab the gpu id if available
        print("GPU available: ", cuda_available)
        # Device name / current index can only be queried when CUDA is usable.
        if cuda_available:
            print("GPU device: ", torch.cuda.get_device_name(0))
        print("GPU device count: ", torch.cuda.device_count())
        if cuda_available:
            print("GPU device index: ", torch.cuda.current_device())

        print(cuda_available)

        self.local_rank = setup_distributed(self.local_rank)

        self.load_and_prepare_data(directory)
        self.train()
        self.test()

        # Only save models or log information from one process to avoid conflicts
        if dist.get_rank() == 0:
            self.trading_agent.save_model()
            print("Model saved")

        self.data_visualizer = DataVisualizer()
        self.data_visualizer.visualize_data()


if __name__ == "__main__":
    # Single-process run: this process uses local rank 0.
    # NOTE: kept as a module-level name rather than inlined, because code
    # elsewhere in the file may look up `local_rank` globally.
    local_rank = 0

    # Folder handed to the trading system as its data directory
    directory = "C:\\Users\\ericb\\Desktop\\CS 535 - Big Data\\Term_Projcet\\Storm_ouput\\Data_Input\\"

    # Build the system with its starting configuration, then run it
    trading_system = TradingSystem(
        local_rank=local_rank,
        num_episodes=10,
        initial_investment=10000,
    )
    trading_system.run(directory)

PyTorch version 1.13.1
Num GPUs Available:  1
GPU available:  True
GPU device:  NVIDIA GeForce RTX 3080 Laptop GPU
GPU device count:  1
GPU device index:  0
True
sorted_storm_output_2022.csv
Loading the model

===== Episode 1 : 0001225208-22-009192 =====
299

                            Time step 0 / 299   
                            |   Inventory Size: 0
                            |   Action: 1
                            |   Portfolio Value: -0.1 
                            |   Realized Profit: -0.1  
                            |   Inventory: 0   
                            |   HBAN: $12.132   
                            |   Epsilon: 1.0   
                            |   MSE Loss: 0
                            

                            Time step 10 / 299   
                            |   Inventory Size: 0
                            |   Action: 1
                            |   Portfolio Value: -1.1 
                            |   Realized Profit: -1.1  
                

AttributeError: 'TradingSystem' object has no attribute 'train_reference'