In [None]:
import torch
import dcgan
import data
%matplotlib inline
import matplotlib.pyplot as plt
import torchvision
import numpy as np
from tqdm.auto import tqdm

image_folder_path = r'D:\Python\gan\emoji_clean_4'
desired_image_size = (64, 64)

In [None]:
dataset = data.load_dataset(image_folder_path, desired_image_size)
dataloader = data.get_dataloader(dataset, 64)

def show_images(img_folder_path, num_samples=32, cols=8):
    """ 
    Plots some samples from the dataset 
    """
    dataset = torchvision.datasets.ImageFolder(img_folder_path)
    fig = plt.figure(figsize=(15,9))
    fig.patch.set_alpha(0) 
    axs = fig.axes
    for ax in axs:
        ax.remove()
    for i, img in enumerate(dataset):
        if i == num_samples:
            break
        ax = plt.subplot(int(num_samples/cols + 1), cols, i + 1)
        ax.axis("off")
        plt.imshow(img[0])

show_images(image_folder_path)

In [None]:
generator = dcgan.Generator()
discriminator = dcgan.Discriminator()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
generator = generator.to(device)
discriminator = discriminator.to(device)

In [None]:
# Initialize the ``BCELoss`` function
criterion = torch.nn.BCELoss()
l1_loss = torch.nn.L1Loss()

# Create batch of latent vectors that we will use to visualize
#  the progression of the generator
fixed_noise = torch.randn(64, 100, 1, 1, device=device)

# Establish convention for real and fake labels during training
real_label = 1.
fake_label = 0.

lr = 0.0002
beta1 = 0.5

# Setup Adam optimizers for both G and D
optimizerD = torch.optim.AdamW(generator.parameters(), lr=lr, betas=(beta1, 0.999))
optimizerG = torch.optim.AdamW(discriminator.parameters(), lr=lr, betas=(beta1, 0.999))

In [None]:
# Training Loop

# Lists to keep track of progress
img_list = []
G_losses = []
D_losses = []
iters = 0

In [None]:
def save(PATH):
    torch.save(
        {
            'generator_weights' : generator.state_dict(),
            'discriminator_weights' : discriminator.state_dict(),
            'generator_optimizer_weights' : optimizerG.state_dict(),
            'discriminator_optimizer_weights' : optimizerD.state_dict(),
        },
        PATH
    )

def load(PATH):
    checkpoint = torch.load(PATH)
    generator = dcgan.Generator()
    discriminator = dcgan.Discriminator()
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    generator = generator.to(device)
    discriminator = discriminator.to(device)
    optimizerD = torch.optim.Adam(generator.parameters(), lr=lr, betas=(beta1, 0.999))
    optimizerG = torch.optim.Adam(discriminator.parameters(), lr=lr, betas=(beta1, 0.999))
    generator.load_state_dict(checkpoint['generator_weights'])
    discriminator.load_state_dict(checkpoint['discriminator_weights'])
    optimizerG.load_state_dict(checkpoint['generator_optimizer_weights'])
    optimizerD.load_state_dict(checkpoint['discriminator_optimizer_weights'])
    return generator, discriminator, optimizerG, optimizerD

In [None]:
print("Starting Training Loop...")
num_epochs = 500
already_completed = 0
update_discriminator_every = 1
update_generator_every = 3
# For each epoch
generator.train()
discriminator.train()

def wasserstein_loss(y_pred, y_true):
    return torch.mean(torch.abs(y_true - y_pred))

for epoch in tqdm(range(1, 1+num_epochs)):
    # For each batch in the dataloader
    
    for i, data in enumerate(dataloader, 0):

        ############################
        # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
        ###########################
        ## Train with all-real batch
        discriminator.zero_grad()
        # Format batch
        real_cpu = data[0].to(device)
        b_size = real_cpu.size(0)
        label = torch.full((b_size,), real_label, dtype=torch.float, device=device)
        # Forward pass real batch through D
        output = discriminator(real_cpu).view(-1)
        # Calculate loss on all-real batch
        errD_real = wasserstein_loss(output, label)
        # Calculate gradients for D in backward pass
        errD_real.backward()
        D_x = output.mean().item()

        if i % update_discriminator_every == 0:
            ## Train with all-fake batch
            # Generate batch of latent vectors
            noise = torch.randn(b_size, 100, 1, 1, device=device)
            # Generate fake image batch with G
            fake = generator(noise)
            label.fill_(fake_label)
            # Classify all fake batch with D
            output = discriminator(fake.detach()).view(-1)
            # Calculate D's loss on the all-fake batch
            # errD_fake = criterion(output, label)
            errD_fake = wasserstein_loss(output, label)
            # Calculate the gradients for this batch, accumulated (summed) with previous gradients
            errD_fake.backward()
            D_G_z1 = output.mean().item()
            # Compute error of D as sum over the fake and the real batches
            errD = errD_real + errD_fake
            # Update D
            optimizerD.step()

        ############################
        # (2) Update G network: maximize log(D(G(z)))
        ###########################
        if i % update_generator_every == 0:
            generator.zero_grad()
            label.fill_(real_label)  # fake labels are real for generator cost
            # Since we just updated D, perform another forward pass of all-fake batch through D
            output = discriminator(fake).view(-1)
            # Calculate G's loss based on this output
            errG = -wasserstein_loss(output, label)
            # Calculate gradients for G
            errG.backward()
            D_G_z2 = output.mean().item()
            # Update G
            optimizerG.step()

        # # Output training stats
        # if i % 400 == 0:
        #     print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
        #           % (epoch, num_epochs, i, len(dataloader),
        #              errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))

        # Save Losses for plotting later
        G_losses.append(errG.item())
        D_losses.append(errD.item())

        # Check how the generator is doing by saving G's output on fixed_noise
        if (iters % 500 == 0) or ((epoch == num_epochs-1) and (i == len(dataloader)-1)):
            with torch.no_grad():
                fake = generator(fixed_noise).detach().cpu()
            img_list.append(torchvision.utils.make_grid(fake, padding=2, normalize=True))
        iters += 1

    print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
            % (epoch, num_epochs, i, len(dataloader),
            errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))
    
    save(r"D:\Python\gan\models\emoji_3\\" + str(already_completed+epoch) + "_epochs.pt")

    # Plot the fake images from the last epoch
    plt.subplot(1,2,2)
    plt.axis("off")
    plt.title("Fake Images")
    plt.imshow(np.transpose(img_list[-1],(1,2,0)))
    plt.show()



In [None]:
generator, discriminator, _, _ = load(r'D:\Python\gan\models\emoji_1\110_epochs.pt')

In [None]:
# Grab a batch of real images from the dataloader
real_batch = next(iter(dataloader))
generator.eval()

# Plot the real images
plt.figure(figsize=(15,15))
plt.subplot(1,2,1)
plt.axis("off")
plt.title("Real Images")
plt.imshow(np.transpose(torchvision.utils.make_grid(real_batch[0].to(device)[:64], padding=5, normalize=True).cpu(),(1,2,0)))

generator = generator.to(torch.device('cpu'))
plt.subplot(1,2,2)
plt.axis("off")
plt.title("Fake Images")
plt.imshow(np.transpose(torchvision.utils.make_grid(generator(torch.randn(64, 100, 1, 1)), padding=2, normalize=True),(1,2,0)))
generator = generator.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
plt.show()

