In [3]:
import os
import copy
import time
import pickle
import numpy as np
import pandas as pd
from datetime import datetime
import time
import argparse
import random
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, Subset, random_split
from torchvision import datasets, transforms
import torchvision
from torch.autograd import Variable
from torchvision.datasets import MNIST, EMNIST
import torch.nn.functional as F
from matplotlib.pyplot import subplots
from torchvision.utils import save_image
import torch.optim as optim
from tensorboardX import SummaryWriter
import matplotlib.pyplot as plt
seed = 42
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
bs = 100
n_epoch = 50
dim=100
#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
writer = SummaryWriter(os.path.join('../celeba', 'test'))
gpu = 0
device = 0

https://github.com/TomislavZupanovic/Data-Reconstruction/blob/main/train.py

In [2]:
class Generator(nn.Module):
    """ Generator part of DCGAN model """
    def __init__(self, latent_vector_size, feature_map, num_channels):
        super(Generator, self).__init__()
        self.latent_vector = latent_vector_size
        self.feature_map = feature_map
        self.channels = num_channels
        self.optimizer = None
        self.main = None

    def build(self):
        """ Builds model with Sequential definition """
        self.main = nn.Sequential(
            # Input is latent vector
            nn.ConvTranspose2d(self.latent_vector, self.feature_map * 8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(self.feature_map * 8),
            nn.ReLU(True),
            # state size: (feature_map*8) x 4 x 4
            nn.ConvTranspose2d(self.feature_map * 8, self.feature_map * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map * 4),
            nn.ReLU(True),
            # state size: (feature_map*4) x 8 x 8
            nn.ConvTranspose2d(self.feature_map * 4, self.feature_map * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map * 2),
            nn.ReLU(True),
            # state size: (feature_map*2) x 16 x 16
            nn.ConvTranspose2d(self.feature_map * 2, self.feature_map, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map),
            nn.ReLU(True),
            # state size: (feature_map) x 32 x 32
            nn.ConvTranspose2d(self.feature_map, self.channels, 4, 2, 1, bias=False),
            nn.Tanh()
            # Output is image: channels x 64 x 64
        )

    def forward(self, input):
        """ Perform forward pass """
        return self.main(input)

    def define_optim(self, learning_rate, beta1):
        self.optimizer = optim.Adam(self.main.parameters(), lr=learning_rate, betas=(beta1, 0.999))

    @staticmethod
    def init_weights(layers):
        """ Randomly initialize weights from Normal distribution with mean = 0, std = 0.02 """
        classname = layers.__class__.__name__
        if classname.find('Conv') != -1:
            nn.init.normal_(layers.weight.data, 0.0, 0.02)
        elif classname.find('BatchNorm') != -1:
            nn.init.normal_(layers.weight.data, 1.0, 0.02)
            nn.init.constant_(layers.bias.data, 0)


class Discriminator(nn.Module):
    """ Discriminator part of DCGAN model """
    def __init__(self, latent_vector_size, feature_map, num_channels):
        super(Discriminator, self).__init__()
        self.latent_vector = latent_vector_size
        self.feature_map = feature_map
        self.channels = num_channels
        self.criterion = None
        self.optimizer = None
        self.main = None

    def build(self):
        """ Builds model with Sequential definition """
        self.main = nn.Sequential(
            # Input is image: channels x 64 x 64
            nn.Conv2d(self.channels, self.feature_map, 4, 2, 1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
            # State size: (feature_map) x 32 x 32
            nn.Conv2d(self.feature_map, self.feature_map * 2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map * 2),
            nn.LeakyReLU(0.2, inplace=True),
            # State size: (feature_map * 2) x 16 x 16
            nn.Conv2d(self.feature_map * 2, self.feature_map * 4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map * 4),
            nn.LeakyReLU(0.2, inplace=True),
            # State size: (feature_map * 4) x 8 x 8
            nn.Conv2d(self.feature_map * 4, self.feature_map * 8, 4, 2, 1, bias=False),
            nn.BatchNorm2d(self.feature_map * 8),
            nn.LeakyReLU(0.2, inplace=True),
            # Output size: (feature_map * 8) x 4 x 4
            nn.Conv2d(self.feature_map * 8, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, input):
        """ Perform forward pass """
        return self.main(input)

    def define_optim(self, learning_rate, beta1):
        """ Initialize Loss Function and Optimizer """
        self.optimizer = optim.Adam(self.main.parameters(), lr=learning_rate, betas=(beta1, 0.999))

    @staticmethod
    def init_weights(layers):
        """ Randomly initialize weights from Normal distribution with mean = 0, std = 0.02 """
        classname = layers.__class__.__name__
        if classname.find('Conv') != -1:
            nn.init.normal_(layers.weight.data, 0.0, 0.02)
        elif classname.find('BatchNorm') != -1:
            nn.init.normal_(layers.weight.data, 1.0, 0.02)
            nn.init.constant_(layers.bias.data, 0)

In [4]:
class Data:
    def __init__(self, path):
        self.path = path
        self.train_dataloader = None
        self.test_dataloader = None

    def build_dataset(self, image_size, batch_size):
        """ Builds DataLoader for iterating through data """
        print('\nBuilding dataset...')
        transform = transforms.Compose([transforms.Resize(image_size),
                                        transforms.CenterCrop(image_size),
                                        transforms.ToTensor(),
                                        transforms.Normalize((0.5, 0.5, 0.5),
                                                             (0.5, 0.5, 0.5))])
        train_dataset = datasets.ImageFolder(root=self.path + '/train', transform=transform)
        test_dataset = datasets.ImageFolder(root=self.path + '/test', transform=transform)
        self.train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

    @staticmethod
    def mask_images(data, option='half'):
        """ Masks the input images with option for 50%, 80% and 90% of image as pixels to reconstruct """
        valid_options = ('half', 'half_random', '10_random', '20_random', '5_random')
        multipliers = {'half': 0.5, 'half_random': 0.5, '10_random': 0.9, '20_random': 0.8, '5_random': 0.95}
        if option not in valid_options:
            raise ValueError(f"Option must be one of: {valid_options}")
        real_img = data[0]
        img_size = real_img.shape[2]
        masked_img, real_part = real_img.clone(), real_img.clone()
        masking_equations = [-1.0, -1.0, -1.0]
        if option == 'half':
            mask = np.zeros(real_img.shape[2:])
            mask[:int(img_size / 2), :] = 1
            mask = mask.astype('bool')
            for equation in masking_equations:
                masked_img[:, :, mask] = equation
                real_part[:, :, ~mask] = equation
        else:
            random_array = np.random.choice(2, int(img_size ** 2), p=[1 - multipliers[option], multipliers[option]])
            mask = random_array.reshape(real_img.shape[2:]).astype('bool')
            for equation in masking_equations:
                masked_img[:, :, mask] = equation
                real_part[:, :, ~mask] = equation
        return masked_img, real_part, mask

    @staticmethod
    def resize_images(data):
        """ Resize Tensor Images to 4 times lower resolution """
        real_img = data[0]
        resize_images = real_img.clone()
        dim_size = int(resize_images.shape[2] / 4)
        resize_images = functional.interpolate(resize_images, size=(dim_size, dim_size), mode='bilinear')
        return resize_images

    def plot_samples(self):
        """ Plots some image samples from dataloader """
        print('\nPlotting some image samples...')
        # Iterate over data with specified batch_size of 128
        images, labels = next(iter(self.train_dataloader))
        fig = plt.figure(1, figsize=(15, 5))
        for idx in range(10):
            # Make plotting grid
            ax = fig.add_subplot(2, 10 / 2, idx + 1, xticks=[], yticks=[])
            # Use image from dataloader and transform to numpy array
            img = images[idx].numpy()
            # Transpose image so that last dim is number of channels and un-normalize
            transposed_img = np.transpose(img, (1, 2, 0))
            image = transposed_img * np.array((0.5, 0.5, 0.5)) + np.array((0.5, 0.5, 0.5))
            plt.imshow(image)
        plt.show()

In [5]:
data = Data('../cele_data/celeba')