In [None]:
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from bokeh.io import output_notebook
from bokeh.plotting import figure, show
from sklearn.manifold import TSNE
from tqdm import tqdm

# Configure bokeh so that figures passed to `show()` render inside the notebook.
output_notebook()
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Data

In [2]:
## CODE FROM TIME-GAN REPO 
def MinMaxScaler(data):
    """Rescale `data` to roughly [0, 1] along axis 0 (per column for 2-D input).

    Args:
        data: numpy array; minima and maxima are taken along axis 0.

    Returns:
        Array of the same shape: (data - min) / (max - min + 1e-7).
    """
    col_min = np.min(data, 0)
    col_span = np.max(data, 0) - col_min
    # The small epsilon keeps constant columns from dividing by zero.
    return (data - col_min) / (col_span + 1e-7)

def google_data_loading(seq_length):
    """Load 'GOOGLE.csv' and return normalised sliding windows in random order.

    Args:
        seq_length: number of consecutive rows in each window.

    Returns:
        A list of arrays, each `seq_length` rows long, shuffled with a
        permutation drawn from numpy's global RNG.
    """
    # Parse the CSV (first row skipped as a header) and reverse the row order;
    # the file is presumably stored newest-first, so this makes it chronological.
    raw = np.loadtxt('GOOGLE.csv', delimiter=",", skiprows=1)
    scaled = MinMaxScaler(raw[::-1])

    # Window start positions run over range(len - seq_length).
    windows = [
        scaled[start:start + seq_length]
        for start in range(len(scaled) - seq_length)
    ]

    # Hand the windows back in a random order.
    order = np.random.permutation(len(windows))
    return [windows[pos] for pos in order]

In [3]:
# https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
def chunks(lst, n, leave_last=False):
    """Yield consecutive slices of `lst`, each holding up to `n` items.

    Args:
        lst: any sliceable sequence with a length.
        n: slice size (step between slice starts).
        leave_last: when true, a trailing slice shorter than `n` is dropped
            instead of being yielded.
    """
    total = len(lst)
    for start in range(0, total, n):
        piece = lst[start:start + n]
        if len(piece) == n or not leave_last:
            yield piece
        else:
            # Short tail and the caller asked for full-size chunks only.
            return

In [4]:
def noise_generator():
    """Endlessly yield batches of standard-normal noise placed on `device`.

    Each batch has shape
    (args['batchsize'], args['seq_len'], args['noise_embed']). The module-level
    `args` dict is read again on every iteration.
    """
    while True:
        shape = (args['batchsize'], args['seq_len'], args['noise_embed'])
        yield torch.randn(*shape).to(device)

In [5]:
def stock_generator(batchsize, length, inf_flag):
    """Yield shuffled mini-batches of stock windows as float tensors on `device`.

    Args:
        batchsize: number of windows per batch. Only full batches are yielded;
            a trailing partial batch is dropped.
        length: window length forwarded to `google_data_loading`.
        inf_flag: if true, iterate forever and reshuffle after every pass;
            otherwise stop after a single pass over the data.

    Yields:
        Float tensor of `batchsize` windows. The last feature column of the
        loaded data is dropped.

    Raises:
        ValueError: if `inf_flag` is set but no full batch can be formed.
            Without this check the endless loop would spin forever without
            ever yielding.
    """
    # Load, normalise and window the CSV once, rather than on every pass.
    dset = np.array(google_data_loading(length))[:, :, :-1]
    n_windows = dset.shape[0]

    if inf_flag and (batchsize < 1 or n_windows < batchsize):
        raise ValueError(
            'cannot form a full batch of size {} from {} windows'.format(batchsize, n_windows)
        )

    def gen():
        # A fresh shuffle for every pass over the data.
        idxs = np.arange(n_windows)
        random.shuffle(idxs)

        for b_idx in chunks(idxs, batchsize, leave_last=True):
            yield torch.tensor(dset[b_idx]).float().to(device)

    if inf_flag:
        while True:
            yield from gen()
    else:
        yield from gen()

# Models

In [6]:
class Discriminator(nn.Module):
    """Three-layer bidirectional LSTM that emits one logit per timestep.

    Args:
        emebed: number of input features per timestep.
        hidden: LSTM hidden size per direction.
        length: accepted but not used by this class.
    """

    def __init__(self, emebed, hidden, length):
        super().__init__()
        self.lstm = nn.LSTM(input_size=emebed, hidden_size=hidden, num_layers=3, bidirectional=True)
        # Both directions are concatenated, hence hidden * 2 input features.
        self.linear = nn.Linear(in_features=hidden * 2, out_features=1)

    def forward(self, x):
        """Score a batch of sequences.

        Args:
            x: tensor laid out as (batch, seq_len, features).

        Returns:
            (scores, features): scores is (batch, seq_len, 1) raw logits,
            features is the (batch, seq_len, hidden * 2) LSTM output.
        """
        # The LSTM is sequence-first, so swap the batch and time axes going in...
        seq_first = x.permute(1, 0, 2)
        lstm_out, _ = self.lstm(seq_first)

        # ...and swap them back coming out.
        features = lstm_out.permute(1, 0, 2)
        scores = self.linear(features)
        return scores, features

class LSTMRCGenerator(nn.Module):
    """Single-layer LSTM generator mapping a noise sequence to a data sequence.

    Args:
        embed_size: number of noise features per timestep.
        hidden: LSTM hidden size.
        out: number of generated features per timestep.
        seq_len: accepted but not used by this class.
    """

    def __init__(self, embed_size, hidden, out, seq_len):
        super().__init__()
        self.lstm = nn.LSTM(input_size=embed_size, hidden_size=hidden, num_layers=1)
        self.linear = nn.Linear(in_features=hidden, out_features=out)

    def forward(self, x):
        """Generate one sequence per noise sequence.

        Args:
            x: noise tensor laid out as (batch, seq_len, embed_size).

        Returns:
            Tensor of shape (batch, seq_len, out) with values in [-1, 1] (tanh).
        """
        # The LSTM is sequence-first: (batch, seq, feat) -> (seq, batch, feat).
        seq_first = x.permute(1, 0, 2)
        lstm_out, _ = self.lstm(seq_first)

        # Back to batch-first: (batch, seq, hidden). The LSTM is unidirectional.
        features = lstm_out.permute(1, 0, 2)
        return torch.tanh(self.linear(features))

# Training

In [7]:
# Hyper-parameters shared by the model, data and training cells.
args = {
    'experiment_name': 'stock_gan',

    # --- model ---
    'noise_embed': 10,    # noise features per timestep fed to the generator
    'disc_hidden': 128,   # discriminator LSTM hidden size
    'enc_hidden': 128,    # generator LSTM hidden size
    'seq_len': 30,        # timesteps per sequence
    'gen_feat': 6,        # features per timestep produced by the generator
    'discrim_feat': 6,    # features per timestep expected by the discriminator

    # --- training ---
    'batchsize': 64,
    'max_steps': 50000,   # number of generator updates
    'k': 1,               # discriminator updates per generator update
}

In [None]:
# define models
generator = LSTMRCGenerator(args['noise_embed'], args['enc_hidden'], args['gen_feat'], args['seq_len']).to(device)
discriminator = Discriminator(args['discrim_feat'], args['disc_hidden'], args['seq_len']).to(device)

# RCGAN params: Adam with default settings for the generator,
# plain SGD with learning rate 0.1 for the discriminator.
gen_optim = torch.optim.Adam(generator.parameters())
disc_optim = torch.optim.SGD(discriminator.parameters(), 0.1)

data_gen = stock_generator(args['batchsize'], args['seq_len'], True)
noise_gen = noise_generator()

# The discriminator returns raw logits, so use the logits form of BCE.
# One stateless module instance is reused for every loss term.
bce_loss = torch.nn.BCEWithLogitsLoss()

try:
    for i in tqdm(range(1, args['max_steps'] + 1)):
        # discrim training: k discriminator updates per generator update
        for _ in range(args['k']):
            bdata = next(data_gen)
            noise = next(noise_gen)

            # detach so the discriminator loss does not backprop into the generator
            fake = generator(noise).detach()

            fake_dscore, _ = discriminator(fake)
            true_dscore, _ = discriminator(bdata)

            # fake sequences are labelled 0, real sequences 1
            floss = bce_loss(fake_dscore, torch.zeros_like(fake_dscore))
            tloss = bce_loss(true_dscore, torch.ones_like(true_dscore))
            dloss = floss + tloss

            # discriminator update
            disc_optim.zero_grad()
            dloss.backward()
            disc_optim.step()

        # generator update: try to get fakes classified as real
        noise = next(noise_gen)
        fake = generator(noise)
        fake_dscore, f_features = discriminator(fake)

        gloss = bce_loss(fake_dscore, torch.ones_like(fake_dscore))

        gen_optim.zero_grad()
        gloss.backward()
        gen_optim.step()

        # gloss is the generator loss and dloss the discriminator loss;
        # the arguments must follow the label order in the format string.
        print('Generator Loss: {:.2f} Discrim Loss: {:.2f}'.format(gloss.item(), dloss.item()))

finally:
    # Runs on normal completion and on interruption/error alike. Create the
    # target directory first so the trained weights are not lost to a missing folder.
    os.makedirs('models', exist_ok=True)
    torch.save(generator.state_dict(), 'models/stock_gen.mdl')
    torch.save(discriminator.state_dict(), 'models/stock_disc.mdl')


# Visualize

In [9]:
def plot(t, obj):
    """Build a bokeh candlestick figure from open/high/low/close series.

    Args:
        t: figure title.
        obj: mapping with keys 'high', 'low', 'open' and 'close'; each value
            must support `.cpu().numpy()` (e.g. a torch tensor).

    Returns:
        The bokeh figure; the caller is responsible for showing it.
    """
    fig = figure(
        title=t,
        sizing_mode='stretch_both',
        tools="xpan,xwheel_zoom,reset,crosshair,save",
        active_drag='xpan',
        active_scroll='xwheel_zoom',
    )

    candle_width = 1  # one x-axis unit per candle

    # Move every series to host memory as a numpy array.
    high, low, open_, close = (
        obj[key].cpu().numpy() for key in ('high', 'low', 'open', 'close')
    )
    rising = close > open_
    falling = close < open_
    steps = np.arange(len(high))

    # Wicks first, then green bodies for rising and red for falling candles.
    # Candles whose open equals close get a wick only.
    fig.segment(steps, high, steps, low, color="black")
    fig.vbar(steps[rising], candle_width, open_[rising], close[rising], fill_color="green", line_color="black")
    fig.vbar(steps[falling], candle_width, open_[falling], close[falling], fill_color="red", line_color="black")

    return fig

In [10]:
def plot_sample(data):
    """Display one sequence as a candlestick chart.

    Args:
        data: 2-D tensor indexed as [timestep, feature]. Feature columns
            0, 1, 2, 3 are read as open, close, high, low respectively
            (NOTE(review): confirm this matches the column order of the CSV).
    """
    column_names = ('open', 'close', 'high', 'low')
    series = {name: data[:, col] for col, name in enumerate(column_names)}
    show(plot('data', series))

In [11]:
def get_sample(generator, noise_gen):
    """Generate a single sample from the next noise batch.

    Args:
        generator: callable mapping a noise batch to a generated batch.
        noise_gen: iterator yielding noise batches.

    Returns:
        The generator output for the first noise item only, detached from the
        graph, moved to the CPU, with all size-1 dimensions squeezed away.
    """
    single_noise = next(noise_gen)[:1]
    generated = generator(single_noise)
    return generated.detach().cpu().squeeze()

In [12]:
# Draw one generated sequence from the generator for inspection.
data = get_sample(generator, noise_gen)

In [13]:
# Render the generated sequence as a candlestick chart.
plot_sample(data)