# Model

In [1]:
import torch
from torch import nn
from torch.nn import Parameter
import torch.nn.functional as F

In [3]:
def l2normalize(v, eps=1e-12):
    """Scale ``v`` by the inverse of its norm.

    ``eps`` is added to the norm before dividing so an all-zero input does
    not divide by zero.
    """
    magnitude = v.norm()
    return v / (magnitude + eps)

class SpectralNorm(nn.Module):
    """Wrap a module and rescale one of its weights on every forward pass.

    The wrapped module's parameter ``name`` is replaced by three parameters:
    ``<name>_bar`` (the trainable raw weight) and ``<name>_u`` / ``<name>_v``
    (non-trainable vectors refined by power iteration). Before each forward
    call, ``<name>`` is recomputed as ``<name>_bar / sigma`` where
    ``sigma = u^T W v`` and ``W`` is the raw weight flattened to 2-D.
    """

    _SUFFIXES = ("_u", "_v", "_bar")

    def __init__(self, module, name='weight', power_iterations=1):
        super().__init__()
        self.module = module
        self.name = name
        self.power_iterations = power_iterations
        # Only split the weight if the module has not been wrapped already.
        if not self._made_params():
            self._make_params()

    def _update_u_v(self):
        """Run the power iterations and write the rescaled weight onto the module."""
        target, prefix = self.module, self.name
        u = getattr(target, prefix + "_u")
        v = getattr(target, prefix + "_v")
        w = getattr(target, prefix + "_bar")

        # 2-D view of the weight: first axis kept, everything else flattened.
        w_mat = w.view(w.data.shape[0], -1)
        for _ in range(self.power_iterations):
            # .data keeps the u/v refinement out of the autograd graph.
            v.data = l2normalize(torch.mv(w_mat.data.t(), u.data))
            u.data = l2normalize(torch.mv(w_mat.data, v.data))

        # sigma is built from w itself (not .data) so gradients reach w_bar.
        sigma = torch.dot(u, torch.mv(w_mat, v))
        setattr(target, prefix, w / sigma.expand_as(w))

    def _made_params(self):
        """Return True when the module already carries the u, v and bar attributes."""
        return all(
            hasattr(self.module, self.name + suffix) for suffix in self._SUFFIXES
        )

    def _make_params(self):
        """Replace the original weight parameter with its u / v / bar triple."""
        weight = getattr(self.module, self.name)

        rows = weight.data.shape[0]
        cols = weight.view(rows, -1).data.shape[1]

        def _random_unit(size):
            # Same dtype/device as the weight, drawn from N(0, 1) then normalised.
            vec = Parameter(weight.data.new(size).normal_(0, 1), requires_grad=False)
            vec.data = l2normalize(vec.data)
            return vec

        u = _random_unit(rows)
        v = _random_unit(cols)
        w_bar = Parameter(weight.data)

        # The plain weight is no longer a parameter; it becomes a derived
        # attribute that _update_u_v sets before each forward call.
        del self.module._parameters[self.name]

        for suffix, param in zip(self._SUFFIXES, (u, v, w_bar)):
            self.module.register_parameter(self.name + suffix, param)

    def forward(self, *args):
        """Refresh the normalised weight, then delegate to the wrapped module."""
        self._update_u_v()
        return self.module.forward(*args)



def upconv(in_channels, out_channels, kernel_size, stride=2, padding=2, batch_norm=True, init_zero_weights=True, spectral_norm=False):
    """Creates an upsample-and-convolution layer, with optional batch normalization.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        kernel_size: Convolution kernel size (int or tuple, as accepted by nn.Conv2d).
        stride: Upsampling factor. An nn.Upsample layer is inserted only when
            this is greater than 1.
        padding: Padding passed to the convolution.
        batch_norm: If True, append nn.BatchNorm2d(out_channels).
        init_zero_weights: If True, re-initialise the convolution weights from
            a zero-mean Gaussian with standard deviation 0.001 (near zero,
            not exactly zero).
        spectral_norm: If True, wrap the convolution in SpectralNorm.

    Returns:
        An nn.Sequential holding the layers in order:
        [Upsample] -> Conv2d (or SpectralNorm(Conv2d)) -> [BatchNorm2d].
    """
    layers = []

    # The resolution increase comes from nn.Upsample; the convolution itself
    # always runs at stride 1.
    if stride > 1:
        layers.append(nn.Upsample(scale_factor=stride))

    conv_layer = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=1, padding=padding, bias=False)

    if init_zero_weights:
        # Initialise in place so the shape comes from the layer itself; this
        # also works when kernel_size is a tuple.
        nn.init.normal_(conv_layer.weight, mean=0.0, std=0.001)

    layers.append(SpectralNorm(conv_layer) if spectral_norm else conv_layer)

    if batch_norm:
        layers.append(nn.BatchNorm2d(out_channels))

    return nn.Sequential(*layers)


def conv(in_channels, out_channels, kernel_size, stride=2, padding=2, batch_norm=True, init_zero_weights=True, spectral_norm=False):
    """Creates a convolutional layer, with optional batch normalization.

    Args:
        in_channels: Number of input channels of the convolution.
        out_channels: Number of output channels of the convolution.
        kernel_size: Convolution kernel size (int or tuple, as accepted by nn.Conv2d).
        stride: Stride of the convolution.
        padding: Padding passed to the convolution.
        batch_norm: If True, append nn.BatchNorm2d(out_channels).
        init_zero_weights: If True, re-initialise the convolution weights from
            a zero-mean Gaussian with standard deviation 0.001 (near zero,
            not exactly zero).
        spectral_norm: If True, wrap the convolution in SpectralNorm.

    Returns:
        An nn.Sequential holding Conv2d (or SpectralNorm(Conv2d)) followed by
        an optional BatchNorm2d.
    """
    layers = []

    conv_layer = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=stride, padding=padding, bias=False)

    if init_zero_weights:
        # Initialise in place so the shape comes from the layer itself; this
        # also works when kernel_size is a tuple.
        nn.init.normal_(conv_layer.weight, mean=0.0, std=0.001)

    layers.append(SpectralNorm(conv_layer) if spectral_norm else conv_layer)

    if batch_norm:
        layers.append(nn.BatchNorm2d(out_channels))

    return nn.Sequential(*layers)

In [4]:
class ResnetBlock(nn.Module):
    """Residual block: the input plus the output of one 3x3 conv layer.

    The conv layer keeps the channel count at ``conv_dim`` and uses stride 1
    with padding 1, so its output can be added to the input element-wise.
    """

    def __init__(self, conv_dim):
        super().__init__()
        self.conv_layer = conv(
            in_channels=conv_dim,
            out_channels=conv_dim,
            kernel_size=3,
            stride=1,
            padding=1,
        )

    def forward(self, x):
        """Return ``x + conv_layer(x)``."""
        residual = self.conv_layer(x)
        return x + residual

In [5]:
class DCGenerator(nn.Module):
    """Generator that maps a noise vector to a 3 x 32 x 32 image.

    The first stage (``linear_bn``) is a stride-1 ``upconv`` with a 2x2 kernel;
    with its default padding of 2 it turns a 1x1 input into a 4x4 map. Three
    further ``upconv`` stages each double the resolution.
    """

    def __init__(self, noise_size, conv_dim=32, spectral_norm=False):
        super().__init__()

        self.conv_dim = conv_dim

        self.linear_bn = upconv(
            in_channels=noise_size, out_channels=conv_dim*4,
            kernel_size=2, stride=1, spectral_norm=spectral_norm,
        )
        self.upconv1 = upconv(
            in_channels=conv_dim*4, out_channels=conv_dim*2,
            kernel_size=3, stride=2, padding=1, spectral_norm=spectral_norm,
        )
        self.upconv2 = upconv(
            in_channels=conv_dim*2, out_channels=conv_dim,
            kernel_size=3, stride=2, padding=1, spectral_norm=spectral_norm,
        )
        # Final stage produces the RGB image, so no batch norm before tanh.
        self.upconv3 = upconv(
            in_channels=conv_dim, out_channels=3,
            kernel_size=3, stride=2, padding=1, batch_norm=False,
            spectral_norm=spectral_norm,
        )

    def forward(self, z):
        """Generates an image given a sample of random noise.

            Input
            -----
                z: BS x noise_size x 1 x 1   -->  BSx100x1x1 (during training)

            Output
            ------
                out: BS x channels x image_width x image_height  -->  BSx3x32x32 (during training)

            Raises
            ------
                ValueError: if the result is not BS x 3 x 32 x 32.
        """
        n = z.size(0)

        hidden = F.relu(self.linear_bn(z))
        hidden = hidden.view(-1, self.conv_dim*4, 4, 4)    # BS x 128 x 4 x 4
        for stage in (self.upconv1, self.upconv2):         # -> 8 x 8 -> 16 x 16
            hidden = F.relu(stage(hidden))
        out = torch.tanh(self.upconv3(hidden))             # BS x 3 x 32 x 32

        actual = out.size()
        if actual == torch.Size([n, 3, 32, 32]):
            return out
        raise ValueError("expect {} x 3 x 32 x 32, but get {}".format(n, actual))

In [6]:
# Smoke test: a batch of 4 noise vectors should come out as 4 x 3 x 32 x 32.
G = DCGenerator(noise_size=100, conv_dim=32)
x = torch.rand([4, 100, 1, 1], dtype=torch.float32)
# Call the module itself rather than .forward() so registered hooks run.
g = G(x)
g.shape

torch.Size([4, 3, 32, 32])

In [7]:
class DCDiscriminator(nn.Module):
    """Discriminator that maps a BS x 3 x 32 x 32 image batch to BS scores.

    Four strided ``conv`` stages reduce the 32x32 input to a single value per
    image, which is passed through a sigmoid.
    """

    def __init__(self, conv_dim=32, spectral_norm=False):
        super(DCDiscriminator, self).__init__()

        self.conv1 = conv(in_channels=3, out_channels=conv_dim, kernel_size=5, stride=2, spectral_norm=spectral_norm)
        self.conv2 = conv(in_channels=conv_dim, out_channels=conv_dim*2, kernel_size=5, stride=2, spectral_norm=spectral_norm)
        self.conv3 = conv(in_channels=conv_dim*2, out_channels=conv_dim*4, kernel_size=5, stride=2, spectral_norm=spectral_norm)
        # Last stage collapses the 4x4 map to 1x1; no batch norm on the score.
        self.conv4 = conv(in_channels=conv_dim*4, out_channels=1, kernel_size=5, stride=2, padding=1, batch_norm=False, spectral_norm=spectral_norm)

    def forward(self, x):
        """Score a batch of images.

            Input
            -----
                x: BS x 3 x 32 x 32

            Output
            ------
                out: 1-D tensor of BS sigmoid scores

            Raises
            ------
                ValueError: if the final feature map is not BS x 1 x 1 x 1.
        """
        batch_size = x.size(0)

        out = F.relu(self.conv1(x))   # BS x 32 x 16 x 16
        out = F.relu(self.conv2(out))    # BS x 64 x 8 x 8
        out = F.relu(self.conv3(out))    # BS x 128 x 4 x 4
        out = torch.sigmoid(self.conv4(out))    # BS x 1 x 1 x 1
        out_size = out.size()

        if tuple(out_size[1:]) != (1, 1, 1):
            raise ValueError("expect {} x 1, but get {}".format(batch_size, out_size))

        # Flatten explicitly to (BS,). An unqualified squeeze() would also drop
        # the batch dimension when BS == 1 and make a valid input look wrong.
        return out.view(batch_size)

In [8]:
# Smoke test: a batch of 4 random images should give 4 scores in (0, 1).
x = torch.rand([4, 3, 32, 32], dtype=torch.float32)
D = DCDiscriminator(32)
# Call the module itself rather than .forward() so registered hooks run.
d = D(x)
d.shape
d

tensor([0.4967, 0.4979, 0.4988, 0.5016], grad_fn=<SqueezeBackward0>)

# Utils

In [9]:
import os
from torch.autograd import Variable
import torch

                
def to_var(tensor, cuda=False):
    """Wraps a Tensor in a Variable, optionally placing it on the GPU.

        Arguments:
            tensor: A Tensor object.
            cuda: A boolean flag indicating whether to use the GPU.

        Returns:
            A Variable object, on the GPU if cuda==True.
    """
    # Move first (when requested), then wrap once.
    source = tensor.cuda() if cuda else tensor
    return Variable(source)

    
def to_data(x):
    """Converts a tensor to a numpy array.

    The tensor is detached from the autograd graph and copied to host memory
    first. ``.cpu()`` is a no-op for tensors already on the CPU, so it is
    applied unconditionally instead of being gated on CUDA availability;
    this also covers tensors on other accelerator devices.
    """
    return x.detach().cpu().numpy()


def create_dir(directory):
    """Creates a directory (and any missing parents) if it doesn't already exist.

    Uses ``exist_ok=True`` rather than a separate existence check, so a
    directory created by another process between check and creation does
    not cause a failure.

    Raises:
        FileExistsError: if ``directory`` exists but is not a directory.
    """
    os.makedirs(directory, exist_ok=True)


def gan_checkpoint(iteration, G, D, opts):
    """Saves the parameters of the generator G and discriminator D.

    The state dicts are written to ``G_<iteration>.pkl`` and
    ``D_<iteration>.pkl`` inside ``opts.checkpoint_dir``.

    Arguments:
        iteration: Value embedded in the checkpoint file names.
        G: Generator; only ``G.state_dict()`` is used.
        D: Discriminator; only ``D.state_dict()`` is used.
        opts: Object with a ``checkpoint_dir`` attribute.
    """
    # torch.save does not create missing directories, so make sure the
    # target exists before writing.
    os.makedirs(opts.checkpoint_dir, exist_ok=True)

    for prefix, model in (('G', G), ('D', D)):
        path = os.path.join(opts.checkpoint_dir, f'{prefix}_{iteration}.pkl')
        torch.save(model.state_dict(), path)

def load_checkpoint(opts, iteration):
    """Loads the generator and discriminator models from checkpoints.

    Arguments:
        opts: Object providing ``load`` (checkpoint directory), ``noise_size``,
            ``g_conv_dim``, ``d_conv_dim`` and ``spectral_norm``.
        iteration: Value embedded in the checkpoint file names
            (``G_<iteration>.pkl`` / ``D_<iteration>.pkl``).

    Returns:
        A ``(G, D)`` tuple with the saved weights loaded, moved to the GPU
        when CUDA is available.
    """
    G_path = os.path.join(opts.load, f'G_{iteration}.pkl')
    D_path = os.path.join(opts.load, f'D_{iteration}.pkl')

    # Both networks must be built exactly as in create_model. SpectralNorm
    # renames the conv weights (weight_bar / weight_u / weight_v), so a
    # discriminator built without it cannot load a spectral-norm checkpoint.
    G = DCGenerator(noise_size=opts.noise_size, conv_dim=opts.g_conv_dim, spectral_norm=opts.spectral_norm)
    D = DCDiscriminator(conv_dim=opts.d_conv_dim, spectral_norm=opts.spectral_norm)

    # Load onto the CPU first so GPU-saved checkpoints also open on CPU-only hosts.
    G.load_state_dict(torch.load(G_path, map_location=lambda storage, loc: storage))
    D.load_state_dict(torch.load(D_path, map_location=lambda storage, loc: storage))

    if torch.cuda.is_available():
        G.cuda()
        D.cuda()
        print('Models moved to GPU.')

    return G, D


def gan_save_samples(G, fixed_noise, iteration, opts):
    """Generate samples from ``fixed_noise`` and convert them to numpy.

    NOTE: this is currently a stub. The converted images are not written
    anywhere and nothing is returned; ``iteration`` and ``opts`` are unused.
    """
    samples = to_data(G(fixed_noise))
    # TODO: save `samples` in the sample dir.

def create_model(opts):
    """Builds the generator and discriminator.

    Reads ``noise_size``, ``g_conv_dim``, ``d_conv_dim`` and ``spectral_norm``
    from ``opts``. Both networks are moved to the GPU when CUDA is available.

    Returns:
        A ``(G, D)`` tuple.
    """
    ### GAN
    G = DCGenerator(
        noise_size=opts.noise_size,
        conv_dim=opts.g_conv_dim,
        spectral_norm=opts.spectral_norm,
    )
    D = DCDiscriminator(
        conv_dim=opts.d_conv_dim,
        spectral_norm=opts.spectral_norm,
    )

    if not torch.cuda.is_available():
        return G, D

    G.cuda()
    D.cuda()
    print('Models moved to GPU.')
    return G, D

def sample_noise(batch_size, dim):
    """
    Generate a PyTorch Tensor of uniform random noise.

    Input:
    - batch_size: Integer giving the batch size of noise to generate.
    - dim: Integer giving the dimension of noise to generate.

    Output:
    - A PyTorch Tensor of shape (batch_size, dim, 1, 1) containing uniform
      random noise in the range [-1, 1).
    """
    # torch.rand samples [0, 1); stretch and shift to [-1, 1).
    flat_noise = torch.rand(batch_size, dim) * 2 - 1
    noise = to_var(flat_noise)
    # Append two singleton spatial axes: (B, dim) -> (B, dim, 1, 1).
    noise = noise.unsqueeze(2)
    return noise.unsqueeze(3)


# Dataset

Dataset download link: https://drive.google.com/file/d/1EW93WrocQ6gKXRB28QbT8C-HSKOgWhY1/view

In [10]:
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision import transforms

def rgba_to_rgb(image):
    """Keep only the first three channels of a channel-first image tensor.

    Assumes the alpha channel, if present, is the fourth channel.
    """
    return image[:3, :, :]

def get_emoji_loader(train_path, test_path, batch_size, image_size):
    """Build the train and test DataLoaders for the emoji image folders.

    Each image is resized, converted to a tensor, stripped to its first three
    channels and normalised with mean 0.5 / std 0.5 per channel.

    Returns:
        A ``(train_loader, test_loader)`` tuple. Only the train loader shuffles.
    """
    preprocessing = transforms.Compose([
        transforms.Resize(image_size),
        transforms.ToTensor(),
        rgba_to_rgb,  # custom RGBA -> RGB step
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])

    def _make_loader(root, shuffle):
        # One ImageFolder dataset per root, sharing the same preprocessing.
        dataset = datasets.ImageFolder(root, preprocessing)
        return DataLoader(dataset=dataset, batch_size=batch_size, shuffle=shuffle, num_workers=4)

    train_dloader = _make_loader(train_path, True)
    test_dloader = _make_loader(test_path, False)
    return train_dloader, test_dloader

# Train

In [11]:
import torch.optim as optim
import torch
import matplotlib.pyplot as plt
import os
import torch.nn.parallel
import torch.utils.data
import torchvision.utils as vutils

In [12]:
def gan_training_loop(dataloader, test_dataloader, opts):
    """Train the generator and discriminator with squared-error (least-squares) GAN losses.

    Each outer iteration draws one real batch, runs ``opts.d_train_iters``
    discriminator updates on it, then one generator update. Every
    ``opts.log_step`` epochs a grid of images generated from a fixed noise
    batch is displayed, and the loss curves are plotted at the end.

    Arguments:
        dataloader: Training DataLoader yielding ``(images, labels)`` batches.
        test_dataloader: Currently unused.
        opts: Object providing ``lr``, ``beta1``, ``beta2``, ``batch_size``,
            ``noise_size``, ``epoch``, ``train_iters``, ``d_train_iters`` and
            ``log_step``, plus whatever ``create_model`` reads.
    """
    G, D = create_model(opts)

    # create_model may have moved the networks to the GPU; every tensor fed
    # to them has to live on the same device.
    device = next(G.parameters()).device

    g_optimizer = optim.Adam(G.parameters(), opts.lr, betas=(opts.beta1, opts.beta2))
    # The discriminator uses twice the generator's learning rate.
    d_optimizer = optim.Adam(D.parameters(), opts.lr * 2., betas=(opts.beta1, opts.beta2))

    train_iter = iter(dataloader)

    # Fixed noise so the periodic previews are comparable across epochs.
    fixed_noise = torch.randn(opts.batch_size, opts.noise_size, 1, 1, device=device)

    G_losses = []
    D_losses = []

    print("Starting Training Loop...")

    try:
        for epoch in range(opts.epoch):
            for _ in range(opts.train_iters):

                # Restart the data iterator whenever it runs out, so
                # train_iters need not line up with the loader length.
                try:
                    real_images, _labels = next(train_iter)
                except StopIteration:
                    train_iter = iter(dataloader)
                    real_images, _labels = next(train_iter)
                real_images = real_images.to(device)

                for d_i in range(opts.d_train_iters):
                    d_optimizer.zero_grad()

                    # 1. Compute the discriminator loss on real images
                    output = D(real_images).view(-1)
                    D_real_loss = torch.mean((output - 1) ** 2) / 2
                    D_real_loss.backward()

                    # 2. Sample noise
                    noise = torch.randn(opts.batch_size, opts.noise_size, 1, 1, device=device)

                    # 3. Generate fake images from the noise
                    fake_images = G(noise)

                    # 4. Compute the discriminator loss on the fake images.
                    #    detach() keeps this step from back-propagating into G.
                    output = D(fake_images.detach()).view(-1)
                    D_fake_loss = torch.mean(output ** 2) / 2
                    D_fake_loss.backward()

                    # Update the weights of the discriminator model.
                    D_total_loss = D_real_loss + D_fake_loss
                    d_optimizer.step()

                # Initialize the generator model gradient.
                g_optimizer.zero_grad()

                # 1. Sample noise
                noise = torch.randn(opts.batch_size, opts.noise_size, 1, 1, device=device)

                # 2. Generate fake images from the noise
                fake_images = G(noise)

                # 3. Compute the generator loss
                output = D(fake_images).view(-1)
                G_loss = torch.mean((output - 1) ** 2)
                G_loss.backward()
                g_optimizer.step()

                # Save Losses for plotting later
                G_losses.append(G_loss.item())
                D_losses.append(D_total_loss.item())

            if epoch % opts.log_step == 0:
                with torch.no_grad():
                    generated_images = G(fixed_noise)
                    grid = vutils.make_grid(generated_images, nrow=8, normalize=True)

                    plt.figure(figsize=(20,30))
                    # make_grid returns C x H x W; imshow expects H x W x C.
                    plt.imshow(grid.permute(1, 2, 0).cpu().numpy())
                    plt.title(f'Generated Images in epoch {epoch}')
                    plt.axis('off')
                    plt.show()

    except KeyboardInterrupt:
        # Allow manual interruption while still plotting what was collected.
        print('Exiting early from training.')

    plt.figure(figsize=(10,5))
    plt.title("Generator and Discriminator Loss During Training")
    plt.plot(G_losses,label="G")
    plt.plot(D_losses,label="D")
    plt.xlabel("iterations")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

In [13]:
def train(opts):
    """Build the emoji data loaders from ``opts`` and run the GAN training loop."""
    loaders = get_emoji_loader(
        opts.train_path,
        opts.test_path,
        opts.batch_size,
        opts.image_size,
    )
    train_loader, test_loader = loaders

    # Checkpoint / sample directory creation is currently disabled:
    # create_dir(opts.checkpoint_dir)
    # create_dir(opts.sample_dir)

    gan_training_loop(train_loader, test_loader, opts)

In [14]:
class Options:
    """Empty attribute container used as the training configuration object."""


# Load config file
args = Options()

vars(args).update(
    # --- data ---
    train_path='./emojis/',
    test_path='./emojis/',
    batch_size=8,
    image_size=32,
    # Disabled: checkpoint_dir = './dir/checkpoint/', sample_dir = './dir/sample/'
    # --- optimiser (Adam) ---
    lr=3.e-5,             # generator rate; the discriminator uses twice this
    beta1=0.5,
    beta2=0.999,
    # --- training schedule ---
    noise_size=100,
    train_iters=297,      # iterations per epoch
    d_train_iters=5,      # discriminator updates per generator update
    epoch=300,
    log_step=50,          # show generated samples every log_step epochs
    # Disabled: sample_every = 10, checkpoint_every = 10
    # --- model ---
    d_conv_dim=32,
    g_conv_dim=32,
    spectral_norm=False,
)

In [None]:
# Experiment 1: train with spectral normalization disabled
# (spectral_norm = False) in both networks.
args.spectral_norm = False

# Guarded so training starts only when this runs as the main module.
if __name__ == '__main__':
    train(args)

In [None]:
# Experiment 2: train with spectral normalization enabled
# (spectral_norm = True) in both networks.
args.spectral_norm = True

# Guarded so training starts only when this runs as the main module.
if __name__ == '__main__':
    train(args)