In [0]:
import optuna
import mlflow

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torch.nn.utils import weight_norm

from torch.optim.lr_scheduler import StepLR
from mlflow import pytorch

import pandas as pd
import numpy as np

import datetime

In [0]:
from pyspark.sql.functions import split, explode, col
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.types import *
import struct, time
import numpy as np
import pandas as pd
import urllib
import json

tickers = ["GOOG", "AAPL"]

df = spark.createDataFrame([(i,) for i in tickers],("ticker",))

return_type = ArrayType(MapType(StringType(), StringType()))

@udf(returnType=return_type)
def yahoo_udf(ticker_label):

    headers={'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:74.0) Gecko/20100101 Firefox/74.0'}
    end_ts = int(time.time())
    start_ts = end_ts - 3600*24*7
    ticker = ticker_label
    data_url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?symbol={ticker}&period1={start_ts}&period2={end_ts}&useYfid=true&interval=1m&includePrePost=true"
    values = None
    data_dict = {i: [] for i in ['timestamp', 'low', 'high', 'close', 'open', 'volume']}
    try: 
        f = urllib.request.urlopen(urllib.request.Request(data_url, headers=headers))
        status = f.status
        if status == 200:
            jresp = json.loads(f.read().decode("utf-8"))    
            data_dict['timestamp'] = jresp['chart']['result'][0]['timestamp']
            for k in ['low', 'high', 'close', 'open', 'volume']:
                data_dict[k] = jresp['chart']['result'][0]['indicators']['quote'][0][k]
    except:
        pass
    df = pd.DataFrame(data_dict)
    values = df.to_dict("index").values()
    return list(values)



extracted = yahoo_udf("ticker")
exploded = explode(extracted).alias("exploded")
expanded = [
    col("exploded").getItem(k).alias(k) for k in ['timestamp', 'low', 'high', 'close', 'open', 'volume']
]

pdf = df.select("ticker", exploded).select("ticker", *expanded).toPandas()
pdf.to_csv("data.csv")
pdf

Unnamed: 0,ticker,timestamp,low,high,close,open,volume
0,GOOG,1662451200,109.01,109.4,109.28,109.01,0
1,GOOG,1662451260,109.31,109.5,109.5,109.5,0
2,GOOG,1662451320,109.31,109.5,109.37,109.5,0
3,GOOG,1662451380,109.33,109.4,109.33,109.38,0
4,GOOG,1662451440,109.4,109.5,109.4,109.5,0
...,...,...,...,...,...,...,...
6988,AAPL,1662767700,157.46,157.48,157.46,157.48,0
6989,AAPL,1662767760,157.45,157.5,157.48,157.47,0
6990,AAPL,1662767820,157.48,157.52,157.51,157.48,0
6991,AAPL,1662767880,157.5,157.53,157.52,157.51,0


In [0]:
class DatasetLoader(Dataset):

    def __init__(self, filename, normalize=True):
        min_seq_arr = []
        all_v = []
        pdf = pd.read_csv(filename)
        for t in pdf.groupby('ticker').apply(lambda df: df.sample(1))['ticker'].values.tolist():
            a_values_arr = []
            for k in ['low', 'high', 'close', 'open']:
                df = pdf.loc[pdf['ticker']==t][['timestamp', k]].rename(columns={k:'value'})
                df['timestamp'] = pd.to_datetime(df["timestamp"], unit='s')
                df = df.set_index("timestamp")
                df['value'] = df['value'].astype(np.float32)
                df['value'] = df['value'].interpolate()
                min_seq = min([df.loc[df.index.day==d].shape[0] for d in df.index.day.unique().tolist()])
                if min_seq > 300:
                    a_values_arr += [df.value]
                    min_seq_arr += [min_seq]
            all_v.append([group[1].to_numpy().tolist()[:min(min_seq_arr)] for i in a_values_arr for group in i.groupby(df.index.date)])

        all_v = [i for i in all_v[:] if len(i)]
        min_seq = min([np.array(i).shape[1] for i in all_v])
        data = torch.from_numpy(np.expand_dims(np.array([np.array(j).T[:min_seq].T for i in all_v for j in i]), -1)).float()
        self.data = self.normalize(data) if normalize else data
        self.seq_len = data.size(1)

    def normalize(self, x):
        self.max = x.max()
        self.min = x.min()
        return (2 * (x - x.min())/(x.max() - x.min()) - 1)
    
    def denormalize(self, x):
        return 0.5 * (x*self.max - x*self.min + self.max + self.min)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]


In [0]:
class _Block(nn.Module):
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(_Block, self).__init__()

        self.padding = padding
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size, stride=stride, padding=padding, dilation=dilation))
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size, stride=stride, padding=padding, dilation=dilation))       
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        
        self.resample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.resample != None:
            self.resample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        x0 = self.conv1(x)
        x0 = x0[:, :, :-self.padding].contiguous()
        x0 = self.relu1(x0)
        x0 = self.dropout1(x0)

        x0 = self.conv2(x0)
        x0 = x0[:, :, :-self.padding].contiguous()
        x0 = self.relu2(x0)
        out = self.dropout2(x0)

        res = x if self.resample is None else self.resample(x)
        return self.relu(out + res)



class CausalDialationBlock(nn.Module):
    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super(CausalDialationBlock, self).__init__()
        layers = []
        num_levels = len(num_channels)
         
        layers = [_Block(num_inputs, num_channels[0], kernel_size, stride=1, dilation=1, padding=kernel_size-1, dropout=dropout)]
        layers += [_Block(num_channels[i-1], num_channels[i], kernel_size, stride=1, dilation=((2 ** i) % 512), padding=(kernel_size-1) * ((2 ** i) % 512), dropout=dropout) for i in range(1, num_levels)]

        self.network = nn.Sequential(*layers)
        self.lstm = nn.LSTM(num_inputs, 256, 1, batch_first=True)
        self.linear = nn.Sequential(nn.Linear(256, 1), nn.Tanh())

    def  forward(self, x):
        return self.network(x)



class Discriminator(nn.Module):
    def __init__(self, in_dim, kernel_size=8, n_layers=1, hidden_dim=256, n_channel=10, cnn_layers=4, dropout=0):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        
        self.lstm = nn.LSTM(in_dim, hidden_dim, n_layers, batch_first=True)
        self.linear_1 = nn.Sequential(nn.Linear(hidden_dim, 1), nn.Sigmoid())

        num_channels = [n_channel] * cnn_layers

        self.causalBlock = CausalDialationBlock(in_dim, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.rectify = nn.ReLU()
        self.linear_2 = nn.Linear(num_channels[-1], 1)
        self.linear_2.weight.data.normal_(0, 0.01)

    def forward(self, input, channel_last=True):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        batch_size, seq_len = input.size(0), input.size(1)
        h_0 = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
        c_0 = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)

        recurrent_features, _ = self.lstm(input, (h_0, c_0))
        outputs = self.linear_1(recurrent_features.contiguous().view(batch_size * seq_len, self.hidden_dim))
        outputs = outputs.view(batch_size, seq_len, 1)

        y1 = self.causalBlock(outputs.transpose(1, 2) if channel_last else outputs)

        j = self.linear_2(y1.transpose(1, 2))
        j0 = self.rectify(j)
        r = torch.tanh(j0)
        return r


class Generator(nn.Module):
    def __init__(self, in_dim, out_dim, n_channel, kernel_size, dropout=0.2, n_layers=1, hidden_dim=256):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.out_dim = out_dim

        self.lstm = nn.LSTM(in_dim, hidden_dim, n_layers, batch_first=True)
        self.linear_1 = nn.Sequential(nn.Linear(hidden_dim, out_dim), nn.Tanh())
        num_channels = [n_channel] * n_layers

        self.causalBlock = CausalDialationBlock(in_dim, num_channels, kernel_size=kernel_size, dropout=dropout)
        self.linear_2 = nn.Linear(num_channels[-1], 1)
        self.linear_2.weight.data.normal_(0, 0.01)

    def forward(self, input, channel_last=True):
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        batch_size, seq_len = input.size(0), input.size(1)
        h_0 = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
        c_0 = torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)

        recurrent_features, _ = self.lstm(input, (h_0, c_0))
        y1 = self.causalBlock(recurrent_features.transpose(1, 2) if channel_last else recurrent_features)
        j = self.linear_2(y1.transpose(1, 2))
        r = torch.tanh(j)
        return r

In [0]:
def train(netG, netD, device, dataloader, optimizerG, optimizerD, epochs):
    nz_dim = 100
    netG.train()
    netD.train()
    seq_len = dataloader.dataset[0].size(0)
    criterion = nn.BCELoss().to(device)

    real_label = 1.0
    fake_label = 0.0
    for epoch in range(epochs):
        for i, data in enumerate(dataloader, 0):
            if i == 0:
                real_zero = data.cpu()

            netD.zero_grad()

            real = data.to(device)
            batch_size, seq_len = real.size(0), real.size(1)
            label = torch.full((batch_size, seq_len, 1), real_label, device=device)
            output = netD(real)

            errD_real = criterion(output, label)
            errD_real.backward()
            D_x = output.mean().item()

            noise = torch.randn(batch_size, seq_len, nz_dim, device=device)

            fake = netG(noise)    

            label.fill_(fake_label)
            output = netD(fake.detach())

            errD_fake = criterion(output, label)
            errD_fake.backward()
            D_G_z1 = output.mean().item()
            errD = errD_real + errD_fake
            optimizerD.step()

            netG.zero_grad()
            label.fill_(real_label)
            output = netD(fake)

            errG = criterion(output, label)
            errG.backward()
            D_G_z2 = output.mean().item()

            optimizerG.step()
            print(f"[{epoch}/{epochs}][{i}/{len(dataloader)}] Loss_D:{errD.item():.4f} Loss_G:{errG.item():.4f} D(x): {D_x:.4f} D(G(z)): {D_G_z1:.4f} / {D_G_z2:.4f}")


def test(netG, device, dataloader):
    nz_dim = 100
    errG = []
    for i, data in enumerate(dataloader, 0):
        real = data.to(device)
        batch_size, seq_len = real.size(0), real.size(1)
        noise = torch.randn(batch_size, seq_len, nz_dim, device=device)
        mse = nn.MSELoss()
        with torch.no_grad():
            fake = netG(noise)    
            errG += [mse(fake, real)]
    v = torch.tensor(errG).float().mean().item()
    return v

In [0]:
def suggest_hyperparameters(trial):
    lr = trial.suggest_float("lr", 1e-4, 1e-1, log=True)
    dropoutG = trial.suggest_float("dropoutG", 0.0, 0.4, step=0.1)
    dropoutD = trial.suggest_float("dropoutD", 0.0, 0.4, step=0.1)
    optimizer_name = trial.suggest_categorical("optimizer_name", ["Adam", "Adadelta"])

    return lr, dropoutG, dropoutD, optimizer_name

In [0]:
def objective(trial):
    best_val_loss = float('Inf')
    nz_dim = 100
    best_mse_val = None
    
    full_dataset = DatasetLoader("data.csv")
    train_size = int(0.8 * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = torch.utils.data.random_split(full_dataset, [train_size, test_size])

    with mlflow.start_run():

        lr, dropoutG, dropoutD, optimizer_name = suggest_hyperparameters(trial)
        n_epochs = 100
        torch.manual_seed(123)
        mlflow.log_params(trial.params)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        mlflow.log_param("device", device)

        
        train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=12, shuffle=True)
        test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=12, shuffle=True)

        netG = Generator(in_dim=nz_dim, n_channel=10, kernel_size=8, out_dim=1, hidden_dim=100, dropout=dropoutG).to(device)
        netD = Discriminator(in_dim=1, cnn_layers=4, n_layers=1, kernel_size=8, n_channel=10, dropout=dropoutD, hidden_dim=100).to(device)

        optimizerD = getattr(optim, optimizer_name)(netD.parameters(), lr=lr)
        optimizerG = getattr(optim, optimizer_name)(netG.parameters(), lr=lr)
        
        train(netG, netD, device, train_dataloader, optimizerG, optimizerD, n_epochs)
        mse_errG = test(netG, device, test_dataloader)
        
        if best_mse_val is None:
            best_mse_val = mse_errG
        best_mse_val = min(best_mse_val, mse_errG)
        mlflow.log_metric("mse_errG", mse_errG)

    return best_mse_val

In [0]:
run_tag = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")

experiment_id = mlflow.create_experiment(
    f"/Users/myusername/TSGAN_Exp_{run_tag}",
    tags={"version": "v1", "priority": "P1"},
)

mlflow.set_experiment(experiment_id=experiment_id)
study = optuna.create_study(study_name=f"TSGAN_study_{run_tag}", direction="minimize")
study.optimize(objective, n_trials=10)

[32m[I 2022-09-11 09:55:05,615][0m A new study created in memory with name: TSGAN_study_2022-09-11T09:55[0m
[0/100][0/3] Loss_D:2.0937 Loss_G:1.9368 D(x): 0.1440 D(G(z)): 0.1440 / 0.1442
[0/100][1/3] Loss_D:2.0925 Loss_G:1.9354 D(x): 0.1442 D(G(z)): 0.1442 / 0.1444
[0/100][2/3] Loss_D:2.0914 Loss_G:1.9339 D(x): 0.1444 D(G(z)): 0.1444 / 0.1446
[1/100][0/3] Loss_D:2.0901 Loss_G:1.9324 D(x): 0.1446 D(G(z)): 0.1446 / 0.1448
[1/100][1/3] Loss_D:2.0889 Loss_G:1.9309 D(x): 0.1448 D(G(z)): 0.1448 / 0.1450
[1/100][2/3] Loss_D:2.0877 Loss_G:1.9294 D(x): 0.1450 D(G(z)): 0.1450 / 0.1453
[2/100][0/3] Loss_D:2.0864 Loss_G:1.9279 D(x): 0.1453 D(G(z)): 0.1453 / 0.1455
[2/100][1/3] Loss_D:2.0851 Loss_G:1.9263 D(x): 0.1455 D(G(z)): 0.1455 / 0.1457
[2/100][2/3] Loss_D:2.0839 Loss_G:1.9248 D(x): 0.1457 D(G(z)): 0.1457 / 0.1459
[3/100][0/3] Loss_D:2.0826 Loss_G:1.9232 D(x): 0.1459 D(G(z)): 0.1459 / 0.1462
[3/100][1/3] Loss_D:2.0813 Loss_G:1.9217 D(x): 0.1462 D(G(z)): 0.1462 / 0.1464
[3/100][2/3] Loss_D:

In [0]:
params_trial = "\n".join([f"    {k}: {v}" for k, v in study.best_trial.params.items()])
print(f"Study statistics:\n Number of finished trials: {len(study.trials)}\n Best trial:\n  Trial number: {study.best_trial.number}\n  Trial value: {study.best_trial.value}\n  Params:\n{params_trial}")

Study statistics:
 Number of finished trials: 10
 Best trial:
  Trial number: 1
  Trial value: 0.5526883006095886
  Params:
    lr: 0.016045855026896795
    dropoutG: 0.2
    dropoutD: 0.30000000000000004
    optimizer_name: Adadelta
