# In [1]:  (notebook cell marker from the original .ipynb export)
import numpy as np
import pandas as pd
import json
import os
from time import time
import argparse

# Torch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ExponentialLR
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, get_rank, get_world_size, destroy_process_group

# Sklearn imports
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

# Import classes
from Agent import Agent
from CryptoData import CryptoData
from DataVisualizer import DataVisualizer

def setup_distributed():
    """Parse the local rank, join the default process group and select this rank's GPU.

    The rank comes from ``--local_rank`` / ``--local-rank`` (``torch.distributed.launch``)
    and falls back to the ``LOCAL_RANK`` environment variable (``torchrun``), then 0.
    Unrecognized command-line arguments (e.g. those Jupyter passes to its kernel) are
    ignored instead of aborting the program. Calling this more than once is safe: the
    process group is only initialized if it is not already.

    Returns:
        argparse.Namespace: parsed arguments; ``args.local_rank`` is always an int.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--local_rank",
        "--local-rank",  # spelling used by newer torch.distributed.launch
        type=int,
        default=int(os.environ.get("LOCAL_RANK", 0)),
    )
    args, _unknown = parser.parse_known_args()

    if not dist.is_initialized():
        # NCCL needs CUDA; fall back to Gloo so CPU-only runs can still initialize.
        backend = "nccl" if torch.cuda.is_available() else "gloo"
        dist.init_process_group(backend=backend, init_method="env://")
    if torch.cuda.is_available():
        torch.cuda.set_device(args.local_rank)
    return args


class TradingSystem:
    """Train and evaluate a trading ``Agent`` on crypto price data, optionally under DDP.

    When a ``torch.distributed`` process group is active, every process works on its
    own contiguous slice of the train/test data (see ``manual_data_partition``), and
    only global rank 0 writes files under ``Output/``, saves the model and plots.
    Without a process group everything runs single-process on the full data.
    """

    # Keys of the per-step logs dumped to testing_mem.json / episode_mem.json.
    _STEP_KEYS = ("Actions", "Inventory Size", "Portfolio Value", "Realized Profit", "Reward", "Done")
    _TRAIN_KEYS = _STEP_KEYS + ("Epsilon", "MSE Loss")
    OUTPUT_DIR = "Output"
    MODEL_PATH = "Output/online_model/model.pt"

    def __init__(self, initial_investment=10000, num_episodes=10, local_rank=0,
                 window_size=10, trading_fee_rate=0.05):
        """
        Args:
            initial_investment: starting balance for every episode.
            num_episodes: number of training passes over the training data.
            local_rank: CUDA device index used by this process.
            window_size: forwarded to ``Agent``. NOTE(review): this attribute was read
                but never set before; the default of 10 is a guess — confirm against Agent.
            trading_fee_rate: fee rate forwarded to ``Agent.trade``.
        """
        self.local_rank = local_rank
        self.INITIAL_INVESTMENT = initial_investment
        self.NUM_EPISODES = num_episodes
        self.window_size = window_size
        self.trading_fee_rate = trading_fee_rate
        self.trading_agent = None
        self.train_df = None
        self.test_df = None
        self.train_close = None
        self.test_close = None
        self.data_visualizer = None

    @staticmethod
    def _distributed_active():
        """Return True when a default torch.distributed process group exists."""
        return dist.is_available() and dist.is_initialized()

    @classmethod
    def _rank_and_world_size(cls):
        """Return (global rank, world size), or (0, 1) without a process group."""
        if cls._distributed_active():
            return dist.get_rank(), dist.get_world_size()
        return 0, 1

    def _is_main_process(self):
        """Return True on global rank 0 (or when not running distributed)."""
        return self._rank_and_world_size()[0] == 0

    def manual_data_partition(self, data):
        """Return this rank's contiguous slice of ``data``.

        Rows are split evenly by position; the last rank also takes the remainder.
        The global rank is used (not ``local_rank``) so that multi-node runs do not
        give the same slice to the same local index on every node.
        """
        rank, world_size = self._rank_and_world_size()
        total_size = len(data)
        per_rank = total_size // world_size
        start = rank * per_rank
        end = total_size if rank == world_size - 1 else start + per_rank
        # Positional slicing for pandas objects regardless of their index labels.
        rows = data.iloc if hasattr(data, "iloc") else data
        return rows[start:end]

    def load_data(self):
        """Load the pre-collected 80/20 train/test split and keep this rank's partition."""
        crypto_data = CryptoData()
        full_train_df, full_test_df, train_close, test_close = crypto_data.get_precollected_data(split_ratio=0.8)

        # Partition data for distributed training
        self.train_df = self.manual_data_partition(full_train_df)
        self.test_df = self.manual_data_partition(full_test_df)
        self.train_close = self.manual_data_partition(train_close)
        self.test_close = self.manual_data_partition(test_close)

    # Name that run() used originally; kept so existing callers keep working.
    load_and_prepare_data = load_data

    def setup_trading_agent(self):
        """Build the Agent, load a saved model if present, and wrap it for DDP."""
        self.trading_agent = Agent(self.train_df.shape, self.NUM_EPISODES, self.window_size)
        if os.path.exists(self.MODEL_PATH):
            print("Loading the model")
            self.trading_agent.load_model()
        else:
            print("No model found, training from scratch")

        model = self.trading_agent.model
        if torch.cuda.is_available():
            model = model.cuda(self.local_rank)
            device_ids = [self.local_rank]
        else:
            # DDP requires device_ids=None for CPU-resident modules.
            device_ids = None
        if self._distributed_active():
            model = DDP(model, device_ids=device_ids)
        self.trading_agent.model = model

    def _mark_to_market(self, balance, price):
        """Return balance + open inventory valued at ``price``, minus ``sum(inventory)``
        (presumably the entry prices) and the initial investment."""
        inventory = self.trading_agent.inventory
        return float(balance + price * len(inventory) - sum(inventory) - self.INITIAL_INVESTMENT)

    @staticmethod
    def _last(mem, key):
        """Return the last value logged under ``key``, or NaN when nothing was logged."""
        values = mem[key]
        return values[-1] if values else float("nan")

    def _run_episode(self, df, close, mem, learn):
        """Step the agent once through ``df``, appending one record per step to ``mem``.

        Args:
            df: frame passed to ``Agent.get_state`` / ``Agent.trade``.
            close: frame with a ``"Close"`` column, row-aligned with ``df``.
            mem: dict of lists keyed by ``_STEP_KEYS`` (plus ``"Epsilon"`` and
                ``"MSE Loss"`` when ``learn`` is true); mutated in place.
            learn: store each transition and call ``Agent.train`` after every step.

        Returns:
            True if the loop stopped early because ``done`` became true.
        """
        agent = self.trading_agent
        agent.inventory = []
        state = agent.get_state(0, df)
        # Balance = initial investment plus the accumulated trade rewards.
        balance = self.INITIAL_INVESTMENT
        agent.portfolio = [0, self.INITIAL_INVESTMENT, 0, 0]
        prices = close["Close"]
        done = False
        for t in range(len(df) - 1):
            if done:
                return True
            action = agent.get_action(state)
            next_state = agent.get_state(t + 1, df)
            reward = agent.trade(t, action, df, close, self.INITIAL_INVESTMENT,
                                 trading_fee_rate=self.trading_fee_rate)
            balance += reward
            price = prices.iloc[t]
            # Plain bool: the numpy.bool_ produced by the comparison is not JSON serializable.
            done = bool(balance < price)
            loss = 0
            if learn:
                agent.memory.add_exp(state, action, reward, next_state, done)
                loss = agent.train() or 0
            state = next_state

            record = {
                "Actions": int(action),
                "Inventory Size": len(agent.inventory),
                "Portfolio Value": self._mark_to_market(balance, price),
                "Realized Profit": float(balance - self.INITIAL_INVESTMENT),
                "Reward": float(reward),
                "Done": done,
            }
            if learn:
                record["Epsilon"] = float(agent.epsilon)
                record["MSE Loss"] = float(loss)
            for key, value in record.items():
                mem[key].append(value)
        return False

    # Curriculum 1
    def train(self):
        """Train for ``NUM_EPISODES`` passes over the training partition.

        Rank 0 appends one score line per episode to ``Output/training_scores.out``
        and dumps all per-step logs to ``Output/episode_mem.json``.

        Returns:
            list[dict]: one per-step log (``_TRAIN_KEYS``) per episode.
        """
        episode_mem = [{key: [] for key in self._TRAIN_KEYS} for _ in range(self.NUM_EPISODES)]
        is_main = self._is_main_process()
        if is_main:
            os.makedirs(self.OUTPUT_DIR, exist_ok=True)
        t0 = time()
        for s, mem in enumerate(episode_mem):
            print(f"\n===== Episode {s + 1} / {self.NUM_EPISODES} =====")
            self._run_episode(self.train_df, self.train_close, mem, learn=True)
            if is_main:
                # Written inside the loop so every episode gets a score line.
                with open(os.path.join(self.OUTPUT_DIR, "training_scores.out"), "a") as f:
                    f.write(
                        f"EPISODE {s} (runtime: {time() - t0})   | "
                        f"Portfolio Value is {round(self._last(mem, 'Portfolio Value'), 3)} "
                        f"Epsilon is {round(self.trading_agent.epsilon, 3)}   |   "
                        f"MSE Loss is {round(self._last(mem, 'MSE Loss'), 3)}\n"
                    )
        if is_main:
            with open(os.path.join(self.OUTPUT_DIR, "episode_mem.json"), "w") as f:
                json.dump(episode_mem, f)
        return episode_mem

    def test(self):
        """Run one greedy (epsilon = 0) pass over the test partition without learning.

        Rank 0 appends a score line to ``Output/testing_scores.out`` and dumps the
        per-step log to ``Output/testing_mem.json``.

        Returns:
            dict: per-step log keyed by ``_STEP_KEYS``.
        """
        testing_mem = {key: [] for key in self._STEP_KEYS}
        t0 = time()
        self.trading_agent.epsilon = 0
        if self._run_episode(self.test_df, self.test_close, testing_mem, learn=False):
            print("Done with testing")

        if self._is_main_process():
            os.makedirs(self.OUTPUT_DIR, exist_ok=True)
            with open(os.path.join(self.OUTPUT_DIR, "testing_scores.out"), "a") as f:
                f.write(f"TESTING (runtime: {time() - t0})   |  Portfolio Value is {round(self._last(testing_mem, 'Portfolio Value'), 3)}\n")
            with open(os.path.join(self.OUTPUT_DIR, "testing_mem.json"), "w") as f:
                json.dump(testing_mem, f)
        return testing_mem

    def run(self):
        """Run the pipeline: init distributed, load data, build agent, train, test, save."""
        print("PyTorch version " + torch.__version__)
        print("Num GPUs Available: ", torch.cuda.device_count())
        print("GPU available: ", torch.cuda.is_available())
        if torch.cuda.is_available():
            # get_device_name / current_device raise on machines without CUDA.
            print("GPU device: ", torch.cuda.get_device_name(0))
            print("GPU device count: ", torch.cuda.device_count())
            print("GPU device index: ", torch.cuda.current_device())

        # The __main__ entry point may already have initialized the process group;
        # a second init_process_group call raises.
        if not self._distributed_active():
            setup_distributed()

        self.load_data()
        self.setup_trading_agent()
        self.train()
        self.test()

        # Only save models, log or plot from one process to avoid file conflicts.
        if self._is_main_process():
            # NOTE(review): the model may be DDP-wrapped here; if Agent.save_model saves
            # self.model.state_dict(), keys carry a "module." prefix — confirm load_model copes.
            self.trading_agent.save_model()
            print("Model saved")
            self.data_visualizer = DataVisualizer(self.train_close, self.test_close)
            self.data_visualizer.visualize_data()

if __name__ == "__main__":
    args = setup_distributed()
    # Use the parsed --local_rank when available; otherwise fall back to the
    # LOCAL_RANK variable that torchrun exports (0 for single-process runs).
    local_rank = getattr(args, "local_rank", None)
    if local_rank is None:
        local_rank = int(os.environ.get("LOCAL_RANK", 0))
    trading_system = TradingSystem(local_rank=local_rank)  # Pass local_rank to TradingSystem
    try:
        trading_system.run()
    finally:
        # Release process-group resources even if training fails.
        if dist.is_available() and dist.is_initialized():
            destroy_process_group()


# KeyboardInterrupt:  (captured notebook output from an interrupted run)