In [None]:
import torch
from torch import nn, optim
from torch.autograd.variable import Variable
import matplotlib.pyplot as plt
import random
from torch.utils.data import Dataset, DataLoader
import numpy as np
from torch import from_numpy
import os
from PIL import Image

In [None]:
# Run configuration: the values the rest of the notebook reads.

# Training schedule
num_epochs = 5000   # number of passes over the DataLoader
batchsize = 32      # batch_size handed to the DataLoader

# Tensor sizes
dim_noise = 120     # length of each latent vector produced by noise()
dim = 64            # images are resized to dim x dim pixels

# Filesystem locations. Both end with '/' because other cells build file
# paths from them by plain string concatenation.
dataset_path = 'image_dataset/Simpsons/perfect-noback/'
model_path = 'Simpsons_train3/'

In [None]:
# Load and Resize Images
#
# Every regular file in `dataset_path` is opened with PIL, resized to
# (dim, dim), converted to 3-channel RGB and stored channel-first (C, H, W).

resize = (dim, dim)
data_img = []
# sorted(): os.listdir() returns entries in arbitrary order; sorting makes the
# dataset order reproducible between runs.
for f in sorted(os.listdir(dataset_path)):
    image_file = os.path.join(dataset_path, f)
    if not os.path.isfile(image_file):
        # Skip sub-directories instead of crashing inside Image.open().
        continue
    # The context manager releases the file handle once the pixels are read.
    with Image.open(image_file) as source_img:  # image extension *.png,*.jpg
        img = source_img.resize(resize, Image.LANCZOS)
    # convert('RGB') drops an alpha channel exactly like slicing [:, :, :3]
    # would, and additionally handles grayscale / palette images, whose
    # arrays have no channel axis to slice.
    array_image = np.array(img.convert('RGB'))
    data_img.append(array_image.transpose(2, 0, 1))

if not data_img:
    # Fail here with a clear message rather than later with an indexing error.
    raise FileNotFoundError(f'No image files found in {dataset_path!r}')

# Normalize data [-1, 1]
data_x = (np.array(data_img) - 127.5) / 127.5

print('ALL DATA SHAPE:')
print(data_x.shape)

In [None]:
# Init Dataset and DataLoader
class MyDataset(Dataset):
    """In-memory dataset built from the module-level ``data_x`` array.

    Each item is an ``(image, label)`` pair. The image is one entry of a
    float32 copy of ``data_x``. The label is the matching entry of
    ``data_x[:, [0], [0]]`` (a slice of the pixel data itself); the training
    loop in this notebook discards it.
    """

    def __init__(self):
        # float32 copy of the image array, indexed per item in __getitem__.
        self.x_data = data_x.astype(np.float32)
        # Per-item "label" tensor sliced from the pixel data (keeps data_x's dtype).
        self.y_data = from_numpy(data_x[:, [0], [0]])
        # Number of images = length of the first axis of data_x.
        self.len = len(data_x)

    def __len__(self):
        return self.len

    def __getitem__(self, index):
        image, label = self.x_data[index], self.y_data[index]
        return image, label

# Wrap the in-memory dataset in a shuffling DataLoader.
dataset = MyDataset()
my_data_loader = DataLoader(
    dataset=dataset,
    batch_size=batchsize,
    shuffle=True,
)
print('NUMBER OF BATCHES', len(my_data_loader), sep='\n')

# ONE BATCH: pull a single batch and report the shape of its image part.
images, labels = next(iter(my_data_loader))
print(images.shape)

In [None]:
# Plot Functions

def _to_display_array(img):
    """Convert one channel-first image scaled to [-1, 1] into an (H, W, C) uint8 array.

    Args:
        img: a (C, H, W) torch tensor or array-like with values in [-1, 1].

    Returns:
        NumPy uint8 array of shape (H, W, C) with values in [0, 255].
    """
    if isinstance(img, torch.Tensor):
        # detach()/cpu() make the conversion work for tensors that carry
        # autograd history or live on another device.
        img = img.detach().cpu().numpy()
    pixels = np.asarray(img) * 127.5 + 127.5
    # (C, H, W) -> (H, W, C). The previous np.rot90(img.T, 3) is a transpose
    # followed by a flip along the width axis, i.e. it drew the image mirrored
    # left-to-right; a plain axis permutation keeps the orientation.
    pixels = np.transpose(pixels, (1, 2, 0))
    # Round (instead of truncating) so float round-off cannot darken a pixel by
    # one level, and clip so out-of-range values saturate instead of wrapping.
    return np.clip(np.rint(pixels), 0, 255).astype(np.uint8)


def plot_images(images):
    """Draw nine images in a 3x3 grid on a new 3x3-inch figure.

    When `images` holds at least nine entries the first nine are shown in
    order, so a fixed batch is rendered identically on every call. With fewer
    entries, each grid cell shows a randomly chosen image (repeats possible).
    The figure is left as the current pyplot figure; the caller decides
    whether to save or show it.

    Args:
        images: sequence of (C, H, W) images with values in [-1, 1].

    Raises:
        ValueError: if `images` is empty.
    """
    cols, rows = 3, 3
    n_cells = cols * rows
    if len(images) == 0:
        raise ValueError("plot_images() needs at least one image")

    figure = plt.figure(figsize=(3, 3))
    # `>=` (not `>`): exactly nine images are enough to fill the grid in order.
    in_order = len(images) >= n_cells

    for i in range(1, n_cells + 1):
        if in_order:
            picked = i - 1
        else:
            picked = random.randint(0, len(images) - 1)
        figure.add_subplot(rows, cols, i)
        plt.axis("off")
        # No cmap: imshow ignores it for RGB data.
        plt.imshow(_to_display_array(images[picked]))


def plot_image(img):
    """Print the shape of `img` and draw it on a new 3x3-inch figure.

    Args:
        img: a single (C, H, W) image with values in [-1, 1].
    """
    print(img.shape)
    plt.figure(figsize=(3, 3))
    plt.imshow(_to_display_array(img))

# TEST IMAGES
# Smoke test for the plot helpers: draw two grids, each from a batch taken
# from a fresh iterator over the shuffling loader, then one single image
# taken from a third batch.
for i in range(2):
  print(i)
  batchh = next(iter(my_data_loader))  # new iterator -> first batch of a new pass
  plot_images(batchh[0])  # batchh[0] is the image part of the (image, label) batch
  plt.show()
batchh = next(iter(my_data_loader))
plot_image(batchh[0][3])  # element 3 of the batch, so the batch needs >= 4 images


In [None]:
# Init Generator and Discriminator Nets

class DiscriminatorNet(torch.nn.Module):
    """
    Fully connected discriminator with three hidden stages.

    Input: flattened images of length dim*dim*3.
    Output: one sigmoid value per sample.
    Hidden stages are Linear -> LeakyReLU(0.2) -> Dropout(0.3) with widths
    512, 256 and 128 (1024/512/256 scaled by 0.5).
    """
    def __init__(self):
        super().__init__()
        in_features = dim * dim * 3
        scale = 0.5
        w0, w1, w2 = (int(scale * width) for width in (1024, 512, 256))

        def stage(n_in, n_out):
            # One hidden stage: affine map, leaky ReLU, dropout.
            return nn.Sequential(
                nn.Linear(n_in, n_out),
                nn.LeakyReLU(0.2),
                nn.Dropout(0.3),
            )

        # Attribute names and creation order are kept as-is: they determine the
        # state_dict keys and the order in which initial weights are drawn.
        self.hidden0 = stage(in_features, w0)
        self.hidden1 = stage(w0, w1)
        self.hidden2 = stage(w1, w2)
        self.out = nn.Sequential(
            torch.nn.Linear(w2, 1),
            torch.nn.Sigmoid(),
        )

    def forward(self, x):
        """Apply the three hidden stages and the output head to `x` in order."""
        for layer in (self.hidden0, self.hidden1, self.hidden2, self.out):
            x = layer(x)
        return x

class GeneratorNet(torch.nn.Module):
    """
    Fully connected generator with three hidden stages.

    Input: latent vectors of length dim_noise.
    Output: flattened images of length dim*dim*3, squashed by tanh.
    Hidden stages are Linear -> LeakyReLU(0.2) with widths 256, 512 and 1024.
    """
    def __init__(self):
        super().__init__()
        latent_size = dim_noise
        image_size = dim * dim * 3
        scale = 1
        w0, w1, w2 = (int(width * scale) for width in (256, 512, 1024))

        def stage(n_in, n_out):
            # One hidden stage: affine map followed by a leaky ReLU.
            return nn.Sequential(
                nn.Linear(n_in, n_out),
                nn.LeakyReLU(0.2),
            )

        # Attribute names and creation order are kept as-is: they determine the
        # state_dict keys and the order in which initial weights are drawn.
        self.hidden0 = stage(latent_size, w0)
        self.hidden1 = stage(w0, w1)
        self.hidden2 = stage(w1, w2)

        self.out = nn.Sequential(
            nn.Linear(w2, image_size),
            nn.Tanh(),
        )

    def forward(self, x):
        """Apply the three hidden stages and the output head to `x` in order."""
        for layer in (self.hidden0, self.hidden1, self.hidden2, self.out):
            x = layer(x)
        return x

# Build both networks (generator first, then discriminator).
generator = GeneratorNet()
discriminator = DiscriminatorNet()

# One Adam optimizer per network, both with the same learning rate.
d_optimizer, g_optimizer = (
    optim.Adam(network.parameters(), lr=0.00015)
    for network in (discriminator, generator)
)

# Loss function: binary cross-entropy on the discriminator's sigmoid output.
loss = nn.BCELoss()


In [None]:
# Random noise sampler
def noise(size):
    """Sample a batch of latent vectors.

    Args:
        size: number of vectors to draw.

    Returns:
        Tensor of shape (size, dim_noise) with standard-normal entries.
    """
    # The deprecated autograd.Variable wrapper is not needed: torch.randn
    # already returns a tensor that autograd can work with.
    return torch.randn(size, dim_noise)

# Testing noise sampler
# Draws nine latent vectors (one per cell of plot_images' 3x3 grid). The batch
# is kept in `test_noise` and reused by the training cell to render samples.
test_noise = noise(9)
test_noise.shape  # bare expression: displayed as the cell's output

In [None]:
# Convert images to vectors and vice versa
def images_to_vectors(images):
    """Flatten a batch of images into rows of length dim*dim*3 (batch axis kept)."""
    batch_len = images.size(0)
    flat_len = dim * dim * 3
    return images.view(batch_len, flat_len)
def vectors_to_images(vectors):
    """View a batch of flat vectors as (batch, 3, dim, dim) images."""
    batch_len = vectors.size(0)
    image_shape = (3, dim, dim)
    return vectors.view(batch_len, *image_shape)

# Target vectors
def ones_target(size):
    """Return a (size, 1) float tensor of ones, used as the target for "real".

    Args:
        size: number of rows (one per sample in the batch).
    """
    # The deprecated autograd.Variable wrapper is dropped: torch.ones already
    # returns a plain tensor, which is what the loss expects.
    return torch.ones(size, 1)
def zeros_target(size):
    """Return a (size, 1) float tensor of zeros, used as the target for "fake".

    Args:
        size: number of rows (one per sample in the batch).
    """
    # The deprecated autograd.Variable wrapper is dropped: torch.zeros already
    # returns a plain tensor, which is what the loss expects.
    return torch.zeros(size, 1)

In [None]:
# TRAIN FUNCTIONS

# Discriminator Trainer
def train_discriminator(optimizer, real_data, fake_data):
    """Run one optimisation step of the discriminator.

    The discriminator is pushed towards predicting ones for `real_data` and
    zeros for `fake_data`; gradients of both losses are accumulated before a
    single optimizer step.

    Args:
        optimizer: optimizer to zero and step (expected to hold the
            discriminator's parameters).
        real_data: batch of flattened real images.
        fake_data: batch of flattened generated images. It must have the same
            batch size as `real_data`, because the fake targets are sized from
            the real batch.

    Returns:
        Tuple ``(error_real + error_fake, prediction_real, prediction_fake)``.
    """
    N = real_data.size(0)
    # Reset gradients
    optimizer.zero_grad()

    # 1.1 Train on Real Data
    prediction_real = discriminator(real_data)
    # Calculate error and backpropagate
    error_real = loss(prediction_real, ones_target(N))
    error_real.backward()

    # 1.2 Train on Fake Data
    # detach(): this step must only update the discriminator. Without it the
    # backward pass would also run through whatever graph produced `fake_data`
    # (the generator), wasting compute and filling that network's gradients.
    # It is a no-op when the caller already passed a detached tensor.
    prediction_fake = discriminator(fake_data.detach())
    # Calculate error and backpropagate
    error_fake = loss(prediction_fake, zeros_target(N))
    error_fake.backward()

    # 1.3 Update weights with gradients
    optimizer.step()

    # Return error and predictions for real and fake inputs
    return error_real + error_fake, prediction_real, prediction_fake

# Generator Trainer
def train_generator(optimizer, fake_data):
    """Run one optimisation step driven by the discriminator's verdict on `fake_data`.

    The loss compares the discriminator's predictions for `fake_data` against
    a target of ones, so it is small when the fakes are classified as real.

    Args:
        optimizer: optimizer to zero and step (expected to hold the
            generator's parameters).
        fake_data: generated batch, still attached to the graph that made it.

    Returns:
        The loss tensor for this step.
    """
    # Start from clean gradients.
    optimizer.zero_grad()

    # Score the generated batch with the discriminator.
    batch_len = fake_data.size(0)
    verdict = discriminator(fake_data)

    # Loss against the "real" target, then backpropagate.
    g_loss = loss(verdict, ones_target(batch_len))
    g_loss.backward()

    # Apply the update.
    optimizer.step()

    return g_loss

In [None]:
# Create TRAIN Directories
import pickle
import os

# Sample grids rendered during training are written below this folder.
result_img_path = model_path+'train_images/'
# exist_ok=True replaces the exists()/makedirs() pair: the cell can be re-run
# safely and there is no gap between the existence check and the creation.
os.makedirs(model_path, exist_ok=True)
os.makedirs(result_img_path, exist_ok=True)
print(f'TRAIN DIRECTORY: {model_path}')

In [None]:
# Training
#
# Each epoch walks the DataLoader once. For every batch the discriminator is
# updated first (real batch vs. a detached generated batch), then the
# generator is updated on a freshly generated batch. After the epoch, sample
# images are rendered from the fixed `test_noise` and the generator is saved.

save_every = 1  # render samples and save the generator every `save_every` epochs

for epoch in range(num_epochs):
    print(f' ************** EPOCH {epoch} ************** ')
    for n_batch, (real_batch, _) in enumerate(my_data_loader):
        N = real_batch.size(0)

        # 1. Train Discriminator
        real_data = images_to_vectors(real_batch)

        # Generate fake data and detach the generator OUTPUT, so the
        # discriminator update does not backpropagate into the generator.
        # (Detaching the noise instead would have no effect: the noise never
        # requires gradients.)
        fake_data = generator(noise(N)).detach()

        # Train D
        d_error, d_pred_real, d_pred_fake = \
              train_discriminator(d_optimizer, real_data, fake_data)

        # 2. Train Generator
        # Generate fake data (kept attached: the generator needs the gradients)
        fake_data = generator(noise(N))

        # Train G
        g_error = train_generator(g_optimizer, fake_data)

    # SAVE AND SHOW DATA
    if epoch % save_every == 0:
        # Rendering samples needs no autograd graph.
        with torch.no_grad():
            test_images = vectors_to_images(generator(test_noise))
        plot_images(test_images)
        sample_figure = plt.gcf()  # the figure plot_images just created
        plt.savefig(result_img_path+str(epoch)+'_epoch.png')
        plt.show()
        # Close explicitly so figures cannot pile up over thousands of epochs
        # on backends where show() does not dispose of them.
        plt.close(sample_figure)
        # NOTE: this pickles the whole module object, so loading it requires the
        # GeneratorNet class to be importable, and pickle files must only be
        # loaded from trusted sources.
        with open(model_path+'generator.model', 'wb') as model_file:
            pickle.dump(generator, model_file)
        print('DISCRIMINATOR ERROR')
        print(d_error)
        print('GENERATOR ERROR')
        print(g_error)