In [3]:
import torch
from torch import nn
import torch.optim as optim
from tqdm import tqdm
import numpy as np

import pickle
import pandas as pd

In [10]:
df = pd.read_csv('dummies_data_1.csv')
df = df.iloc[0:753305, :]

In [11]:
df.shape

(753305, 53)

# Generator

In [15]:
class Generator(nn.Module):
    def __init__(self, latent_dim, output_activation=None):
        """A generator for mapping a latent space to a sample space"""
        super(Generator, self).__init__()
        self.linear1 = nn.Linear(latent_dim, 159)
        self.leaky_relu = nn.LeakyReLU()
        self.linear2 = nn.Linear(159, 106)
        self.linear3 = nn.Linear(106, 53)
        self.output_activation = output_activation

    def forward(self, input_tensor):
        """Forward pass; map latent vectors to samples"""
        intermediate = self.linear1(input_tensor)
        intermediate = self.leaky_relu(intermediate)
        intermediate = self.linear2(intermediate)
        intermediate = self.leaky_relu(intermediate)
        intermediate = self.linear3(intermediate)
        if self.output_activation is not None:
            intermediate = self.output_activation(intermediate)
        return intermediate

# Discriminator

In [16]:
class Discriminator(nn.Module):
    def __init__(self, input_dim, layers):
        """A discriminator for discerning real from generated samples"""
        super(Discriminator, self).__init__()
        self.input_dim = input_dim
        self._init_layers(layers)

    def _init_layers(self, layers):
        """Initialize the layers and store as self.module_list"""
        self.module_list = nn.ModuleList()
        last_layer = self.input_dim
        for index, width in enumerate(layers):
            self.module_list.append(nn.Linear(last_layer, width))
            last_layer = width
            if index + 1 != len(layers):
                self.module_list.append(nn.LeakyReLU())
        else:
            self.module_list.append(nn.Sigmoid())

    def forward(self, input_tensor):
        """Forward pass; map samples to confidence they are real [0, 1]"""
        intermediate = input_tensor
        for layer in self.module_list:
            intermediate = layer(intermediate)
        return intermediate

# VanillaGAN

In [17]:
class VanillaGAN():
    def __init__(self, generator, discriminator, noise_fn, data_fn,
                 batch_size=32, device='cpu', lr_d=1e-3, lr_g=2e-4):
        """A GAN class for holding and training a generator and discriminator"""
        self.generator = generator
        self.generator = self.generator.to(device)
        self.discriminator = discriminator
        self.discriminator = self.discriminator.to(device)
        self.noise_fn = noise_fn
        self.data_fn = data_fn
        self.batch_size = batch_size
        self.device = device
        self.criterion = nn.BCELoss()
        self.optim_d = optim.Adam(discriminator.parameters(),
                                  lr=lr_d, betas=(0.5, 0.999))
        self.optim_g = optim.Adam(generator.parameters(),
                                  lr=lr_g, betas=(0.5, 0.999))
        self.target_ones = torch.ones((batch_size, 53)).to(device)
        self.target_zeros = torch.zeros((batch_size, 53)).to(device)

    def generate_samples(self, latent_vec=None, num=None):
        """Sample from the generator"""
        num = self.batch_size if num is None else num
        latent_vec = self.noise_fn(num) if latent_vec is None else latent_vec
        with torch.no_grad():
            samples = self.generator(latent_vec)
        return samples

    def train_step_generator(self):
        """Train the generator one step and return the loss"""
        self.generator.zero_grad()

        latent_vec = self.noise_fn(self.batch_size)
        generated = self.generator(latent_vec)
        classifications = self.discriminator(generated)
        loss = self.criterion(classifications, self.target_ones)
        loss.backward()
        self.optim_g.step()
        return loss.item()

    def train_step_discriminator(self):
        """Train the discriminator one step and return the losses"""
        self.discriminator.zero_grad()
        # real samples
        real_samples = self.data_fn(self.batch_size)
 
        pred_real = self.discriminator(real_samples)
        loss_real = self.criterion(pred_real, self.target_ones)

        # generated samples
        latent_vec = self.noise_fn(self.batch_size)
        with torch.no_grad():
            fake_samples = self.generator(latent_vec)
        pred_fake = self.discriminator(fake_samples)
        loss_fake = self.criterion(pred_fake, self.target_zeros)

        # combine
        loss = (loss_real + loss_fake) / 2
        loss.backward()
        self.optim_d.step()
        return loss_real.item(), loss_fake.item()

    def train_step(self):
        """Train both networks and return the losses"""
        loss_d = self.train_step_discriminator()
        loss_g = self.train_step_generator()
        return loss_g, loss_d

# Put together

In [12]:
def data_fn(batches):
    global start_batch, length_data
    data_batch = df.iloc[start_batch:start_batch + batches, :].to_numpy().astype(np.float32)
    start_batch += batches
    return torch.from_numpy(data_batch).to('cpu')

In [18]:
from time import time
epochs = 5
batches = 35
generator = Generator(53)
discriminator = Discriminator(53, [159, 106, 53])
noise_fn = lambda x: torch.rand((x, 53), device='cpu')
length_data = df.shape[0]
gan = VanillaGAN(generator, discriminator, noise_fn, data_fn, batches, device='cpu')
loss_g, loss_d_real, loss_d_fake = [], [], []
start = time()
for epoch in range(epochs):
    start_batch = 0
    loss_g_running, loss_d_real_running, loss_d_fake_running = 0, 0, 0
    for batch in range(length_data // batches):
        lg_, (ldr_, ldf_) = gan.train_step()
        loss_g_running += lg_
        loss_d_real_running += ldr_
        loss_d_fake_running += ldf_
    loss_g.append(loss_g_running / batches)
    loss_d_real.append(loss_d_real_running / batches)
    loss_d_fake.append(loss_d_fake_running / batches)
    print(f"Epoch {epoch+1}/{epochs} ({int(time() - start)}s):"
        f" G={loss_g[-1]:.3f},"
        f" Dr={loss_d_real[-1]:.3f},"
        f" Df={loss_d_fake[-1]:.3f}")

Epoch 1/5 (68s): G=24608.870, Dr=47347.400, Df=1.173
Epoch 2/5 (136s): G=61494.286, Dr=61493.162, Df=0.000
Epoch 3/5 (206s): G=61494.286, Dr=61493.149, Df=0.000
Epoch 4/5 (289s): G=61494.286, Dr=61492.976, Df=0.000
Epoch 5/5 (356s): G=5605.045, Dr=33697.327, Df=301.015


In [19]:
generated_samples = gan.generate_samples(num=1).cpu().numpy()


In [20]:
generated_samples

array([[ 1.2350484e+02,  1.4096575e+00,  4.9467897e+00, -2.5002880e+00,
         4.5303035e+00,  7.9789171e+00, -2.9454010e+00,  5.0018346e-01,
         8.6151773e-01, -4.5869684e+00, -1.2231378e+01, -5.4069657e+00,
         1.1816949e+01, -6.3610392e+00,  4.6811323e+00,  3.1741593e+00,
        -5.2092929e+00, -1.3620521e+00, -8.7754126e+00,  9.1118164e+00,
        -2.9528558e+00,  1.0148350e+01,  2.1381004e+00, -6.4488797e+00,
        -2.8645508e+00, -7.6953536e-01,  4.6907535e+00, -3.6098516e+00,
         2.7437675e+00, -8.0166674e+00,  7.1550541e-02, -1.2247552e+00,
        -5.4691935e+00,  3.0750759e+00,  1.0432886e+01, -8.5669136e+00,
        -1.2513848e+01,  3.8519418e+00, -3.7671458e-02,  1.0727370e+00,
         6.0059862e+00,  5.7001381e+00,  7.8552852e+00,  6.8618016e+00,
        -4.2021661e+00, -7.0804864e-01,  9.2491751e+00,  5.7030454e+00,
         1.1312075e+00, -1.9444726e-01, -1.0848482e+01,  6.1816282e+00,
         1.4729663e+00]], dtype=float32)