In [1]:
import pandas as pd
import torch.utils.data as Data
import torch.nn as nn
import numpy as np
import torch

In [16]:
NUM_FEATURES = 82
DAYS = 60
BATCH_SIZE = 128
LEARNING_RATE = 1e-3

In [3]:
class StockMarketDataReg(Data.Dataset):
    def __init__(self, train=True, split=0.2, target_market="S&P", days=60):
        self.days = days
        df = pd.read_csv(f"../data/Processed_{target_market}.csv")
        df['Date'] = pd.to_datetime(df['Date'], format='%Y-%m-%d')
        df = df.sort_values(by='Date')
        df = df.drop(columns=['Name', 'Date'])
        df = df.fillna(0)
        # outlier detection
        for col in df.columns:
            q1, q3 = df[col].quantile([0.25, 0.75])
            iqr = q3 - q1
            lb, rb = q1 - 1.5 * iqr, q3 + 1.5 * iqr
            for i in range(len(df)):
                if df[col][i] > rb:
                    df[col][i] = rb
                if df[col][i] < lb:
                    df[col][i] = lb
        self.features = df.shape[1]
        num_rows_init = df.shape[0]
        u = df.iloc[0:int(num_rows_init*(1-split))]
        self.mean = u.mean()
        self.std = u.std()
        if not train:
            u = df.iloc[int(num_rows_init*(1-split)) - self.days:]
        self.num_rows = u.shape[0]
        u = (u - self.mean)/self.std
        self.market_data = u
    
    def get_label(self, idx):
        return torch.tensor([float(self.market_data.iloc[idx + 1]["Close"])])
    
    def __getitem__(self, idx):
        end_idx = self.days + idx - 1
        lab = self.get_label(end_idx)
        df = self.market_data
        market_tens = torch.reshape(torch.from_numpy(np.array(df.iloc[idx:end_idx+1])), (self.features, self.days))
        return market_tens.float(), lab
    
    def __len__(self):
        return self.num_rows - self.days
    
    def var(self):
        return self.std['Close']**2

In [4]:
train_ds = StockMarketDataReg(train=True, target_market="S&P", split=0.2, days=DAYS)

In [5]:
prev = None
c1, c0 = 0, 0
for _, y in train_ds:
    if prev is None:
        prev = y
        continue
    c0 += (y < prev).item()
    c1 += (y >= prev).item()
c0 / (c0 + c1), c1 / (c0 + c1)

(0.09239842726081259, 0.9076015727391874)

In [6]:
train_dataloader = Data.DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)

In [7]:
class Generator(nn.Module):
    def __init__(self, num_features, days, batch_size, hidden_size=100):
        super().__init__()
        self.batch_size = batch_size
        self.num_features = num_features
        self.days = days
        self.hidden_size = hidden_size
        # model
        self.lstm = nn.LSTM(input_size=self.num_features, hidden_size=self.hidden_size, num_layers=1, batch_first=True)
        self.decoder = nn.Linear(self.hidden_size*self.days, 1)
    def forward(self, inp):
        inp = torch.transpose(inp, 1, 2)
        out1, _ = self.lstm(inp)
        out1 = out1.reshape((self.batch_size, -1))
        out2 = self.decoder(out1)
        out2 = out2.squeeze()
        return out2

In [20]:
gen = Generator(NUM_FEATURES, DAYS, BATCH_SIZE)

In [9]:
class Discriminator(nn.Module):
    def __init__(self, num_features, batch_size, c=4):
        # c denotes flatenning constant
        super().__init__()
        self.num_features = num_features
        self.batch_size = batch_size
        self.convs = nn.Sequential(
            nn.Conv1d(self.num_features, 32, kernel_size=5, stride=2),
            nn.LeakyReLU(0.01),
            nn.Conv1d(32, 64, kernel_size=5, stride=2),
            nn.LeakyReLU(0.01),
            nn.BatchNorm1d(64, momentum=0.9, eps=1e-05), 
            nn.Conv1d(64, 128, kernel_size=5, stride=2),
            nn.LeakyReLU(0.01),
            nn.BatchNorm1d(128, momentum=0.9, eps=1e-05),
        )
        self.dense = nn.Sequential(
            nn.Linear(128*c, 220),
            nn.BatchNorm1d(220, momentum=0.9, eps=1e-05),
            nn.LeakyReLU(0.01),
            nn.Linear(220, 220),
            nn.ReLU(),
            nn.Linear(220, 1)
        )
    def forward(self, inp):
        out1 = self.convs(inp)
        out1 = out1.reshape(self.batch_size, -1)
        out2 = self.dense(out1)
        out2 = out2.squeeze()
        return out2

In [10]:
disc = Discriminator(NUM_FEATURES, BATCH_SIZE)

In [55]:
adversarial_loss = torch.nn.BCELoss()

generator = Generator(NUM_FEATURES, DAYS, BATCH_SIZE)
discriminator = Discriminator(NUM_FEATURES, BATCH_SIZE)
    
    
optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.002)
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.002)


def Train_Loss(generator, discrimitator, optimizer_G, optimizer_D, generated_data, real_data):
    
    g_loss = adversarial_loss(discriminator(generated_data), valid)
    g_loss.backward()
    optimizer_G.step()
    optimizer_D.zero_grad()

    # Valid and fake are ground truths 
    real_loss = adversarial_loss(discriminator(real_data), valid)
    fake_loss = adversarial_loss(discriminator(generated_data.detach()), fake)
    d_loss = (real_loss + fake_loss) / 2

    d_loss.backward()
    optimizer_D.step()