In [1]:
# ========================
# Standard Library
# ========================
import os
import random
import itertools
from copy import deepcopy
from collections import defaultdict
from typing import Dict, List, Tuple, Optional

# ========================
# Core Scientific Stack
# ========================
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from scipy import stats
from scipy.special import boxcox as sp_boxcox

# ========================
# Machine Learning Utilities
# ========================
from sklearn.model_selection import train_test_split

# ========================
# Deep Learning (PyTorch)
# ========================
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch import autograd
from torch.utils.data import DataLoader, TensorDataset

# ========================
# Visualization
# ========================
import matplotlib.pyplot as plt
import seaborn as sns


In [2]:
def seed_all(seed: int = 42, deterministic: bool = True) -> None:
    import os, random, numpy as np, torch

    # Python built-ins
    random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)

    # NumPy
    np.random.seed(seed)

    # PyTorch
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

    # cuDNN settings
    torch.backends.cudnn.deterministic = deterministic
    torch.backends.cudnn.benchmark = not deterministic

    if torch.cuda.is_available():
        torch.backends.cuda.matmul.allow_tf32 = False
        torch.backends.cudnn.allow_tf32 = False


In [3]:
seed_all()

In [4]:
class Discriminator(nn.Module):
    def __init__(self, Hyper007_HD, data_types):
        super().__init__()
        HD, OD = Hyper007_HD, 1
        self.max_real = max(data_types.loc[data_types["type"] == "real", "index_end"])
        self.embedding_layers = nn.ModuleList()
        self.soft_embedding = []
        for _, row in data_types.iterrows():
            if row["type"] != "real":
                self.embedding_layers.append(nn.Embedding(row["num_classes"], row["embedding_size"]))
                idxs, idxe = row["index_start"], row["index_end"]
                self.soft_embedding.append(
                    lambda x, W, idxs=idxs, idxe=idxe: x[..., idxs:idxe] @ W
                )
        self.linear1 = nn.Linear(sum(data_types["embedding_size"]), HD)
        self.linear2 = nn.Linear(HD, HD)
        self.rnn_f = MyLSTM(HD, HD)
        self.rnn_r = MyLSTM(HD, HD)
        self.linear3 = nn.Linear(2 * HD, OD)
        self.leakyReLU = nn.LeakyReLU(0.1)

    def forward(self, x0):
        x_list = [x0[..., :self.max_real]] + [
            f(x0, emb.weight) for f, emb in zip(self.soft_embedding, self.embedding_layers)
        ]
        x1 = torch.cat(x_list, dim=-1)
        x2 = self.leakyReLU(self.linear1(x1))
        x3 = self.leakyReLU(self.linear2(x2))
        _, (x4_f, _) = self.rnn_f(x3)
        _, (x4_r, _) = self.rnn_r(x3.flip(dims=[1]))
        x4 = torch.cat((x4_f, x4_r), dim=1)
        return self.linear3(x4)

In [5]:
class Execute_D004:
    def __init__(self,
                 Hyper001_BatchSize, Hyper002_Epochs,
                 Hyper003_G_iter, Hyper004_GP_Lambda, Hyper005_C_Lambda,
                 Hyper006_ID, Hyper007_HD,
                 Hyper008_LR, Hyper009_Betas,
                 data_types,
                 correlation_real,
                 continue_info=[False, 'G_SD', 'D_SD', 0]):
        super().__init__()
        self.batch_size = Hyper001_BatchSize
        self.epochs = Hyper002_Epochs
        self.G_iter = Hyper003_G_iter
        self.gp_weight = Hyper004_GP_Lambda
        self.c_weight = Hyper005_C_Lambda
        self.ID = Hyper006_ID
        self.HD = Hyper007_HD
        self.lr = Hyper008_LR
        self.betas = Hyper009_Betas
        self.correlation_real = correlation_real.cuda()
        self.CUDA = torch.cuda.is_available()
        self.G = Generator(Hyper006_ID, Hyper007_HD, data_types)
        self.D = Discriminator(Hyper007_HD, data_types)
        if self.CUDA:
            self.G = self.G.cuda()
            self.D = self.D.cuda()
        G_SD, D_SD = LoadPreTrain(continue_info)
        if G_SD != 0:
            self.G.load_state_dict(G_SD)
            self.D.load_state_dict(D_SD)
            self.PreviousEpoch = continue_info[3]
        else:
            self.PreviousEpoch = 0
        self.D_opt = optim.Adam(self.D.parameters(), lr=self.lr, betas=self.betas)
        self.G_opt = optim.Adam(self.G.parameters(), lr=self.lr, betas=self.betas)

    def generate_data(self, seq_len, num_samples=None):
        if num_samples is None:
            num_samples = self.batch_size
        z = torch.rand((num_samples, seq_len, self.ID)).cuda()
        return self.G(z)

    def _critic_train_iteration(self, data_real):
        data_fake = self.generate_data(data_real.shape[1], data_real.shape[0])
        D_real = self.D(data_real)
        D_fake = self.D(data_fake)
        with torch.backends.cudnn.flags(enabled=False):
            gradient_penalty = self._gradient_penalty(data_real, data_fake)
        self.D_opt.zero_grad()
        D_loss = D_fake.mean() - D_real.mean() + gradient_penalty
        D_loss.backward()
        self.D_opt.step()
        return D_loss.item(), gradient_penalty.item(), D_real.mean().item(), D_fake.mean().item()

    def _generator_train_iteration(self, seq_len):
        data_fake = self.generate_data(seq_len)
        D_fake = self.D(data_fake)
        corr_loss = self._correlation_loss(data_fake)
        self.G_opt.zero_grad()
        G_loss = -D_fake.mean() + self.c_weight * corr_loss
        G_loss.backward()
        self.G_opt.step()
        return G_loss.item(), corr_loss.item()

    def _correlation_loss(self, data_fake):
        correlation_fake = correlation(data_fake)
        criterion = nn.L1Loss(reduction="mean")
        temp = torch.rand_like(correlation_fake)
        temp_fake = torch.min(temp, torch.abs(correlation_fake))
        temp_real = torch.min(temp, torch.abs(self.correlation_real))
        correlation_fake = temp_fake * torch.sign(correlation_fake)
        correlation_real = temp_real * torch.sign(self.correlation_real)
        return criterion(correlation_fake, correlation_real)

    def _gradient_penalty(self, data_real, data_fake):
        alpha = torch.rand((self.batch_size, 1, 1)).cuda().expand_as(data_real)
        interpolated = alpha * data_real + (1 - alpha) * data_fake
        prob_interpolated = self.D(interpolated)
        gradients = autograd.grad(
            outputs=prob_interpolated,
            inputs=interpolated,
            grad_outputs=torch.ones_like(prob_interpolated).cuda(),
            create_graph=True,
            retain_graph=True,
        )[0]
        gradients = gradients.view(self.batch_size, -1)
        gradients_norm = torch.sqrt(torch.sum(gradients ** 2, dim=1) + 1e-12)
        return self.gp_weight * ((gradients_norm - 1) ** 2).mean()

    def train(self, All_loader):
        All_Length = sorted(All_loader.keys())
        for epoch in range(self.epochs - self.PreviousEpoch):
            for Cur_Len in All_Length:
                Cur_loader = All_loader[Cur_Len]
                for batch_idx, (data_real, _) in enumerate(Cur_loader):
                    data_real = data_real.cuda()
                    for _ in range(self.G_iter):
                        D_Loss, GP, D_real_mean, D_fake_mean = self._critic_train_iteration(data_real)
                    G_Loss, Corr_Loss = self._generator_train_iteration(seq_len=Cur_Len)
                    if (np.mod(batch_idx + 1, round(len(Cur_loader) / 5)) == 0) or \
                       (batch_idx + 1 == len(Cur_loader)):
                        print(f"[Epoch {self.PreviousEpoch + epoch + 1}] "
                              f"L={Cur_Len} step {batch_idx + 1}/{len(Cur_loader)} | "
                              f"D_loss={D_Loss:.4f} GP={GP:.4f} "
                              f"D_real={D_real_mean:.4f} D_fake={D_fake_mean:.4f} | "
                              f"G_loss={G_Loss:.4f} Corr={Corr_Loss:.4f}")
