<h1>WGANGP

https://arxiv.org/pdf/1704.00028.pdf

<h2> Download von "Opendatasets" und "Torchsummary"

In [1]:
!pip install opendatasets
!pip install torchsummary

Collecting torchsummary
  Using cached torchsummary-1.5.1-py3-none-any.whl (2.8 kB)
Installing collected packages: torchsummary
Successfully installed torchsummary-1.5.1


<h2> Importieren der Pakete

In [2]:
import torch
import torch.nn.functional as F
from tqdm.auto import tqdm
from tkinter.tix import IMAGE
from matplotlib import image
from torchvision.utils import save_image  # Speichern von Bildern
import torch.optim as optim  # Optimierungs-Algorithmen
import torch.nn as nn  # Neuronales Netz
from torchvision.utils import make_grid
import matplotlib.pyplot as plt  # plotten von Grafen/ Bildern
import torchvision.transforms as transforms  # Transformieren von Bildern
import torchvision.datasets as ImageFolder
import torch.utils.data as DataLoader
from torchvision import datasets
import torchvision
import torch as t
import numpy as np
import os                 # Dient zum lokalen Speichern des Datasets
import opendatasets as od
from random import random, weibullvariate
from torch.autograd import Variable
import torch.autograd as autograd
from torchsummary import summary

<h2> Definition der Parameter/ Variablen 

In [3]:
IMAGE_SIZE = 64  # Größe der Bilder
BATCH_SIZE = 64  # Anzahl der Batches
WORKERS = 2  # Anzahl der Kerne beim Arbeiten auf der GPU
# Normalisierung mit 0.5 Mittelwert und Standardabweichung für alle drei Channels der Bilder
NORM = (0.5, 0.5, 0.5), (0.5, 0.5, 0.5)
WORKERS = 2  # Anzahl der Kerne beim Arbeiten auf der GPU
NUM_EPOCH = 4  # Anzahl der Epochen
LR = 1e-4  # Learningrate
LATENT_SIZE = 100  # Radom Input für den Generator
N_CRITIC = 5
LAMBDA_GP = 10  # Penalty Koeffizient
no_of_channels = 3
cur_step = 0
display_step = 500

<h2> Download des Datasets von Kaggle 

In [6]:
# Anlegen eines Ordners für Bilder
data_dir = '../data/'
os.makedirs(data_dir, exist_ok=True)

# Erklärung zum Umgang mit Opendata und Kaggle - https://pypi.org/project/opendatasets/
# Datensatz:Anime-Faces werden von Kaggle geladen
# Hierfür wird der User-API-KEY benötigt
# APIKEY {"username":"kimmhl","key":"f585163b4ee30f0a5b44b1a902dc56e6"}
dataset_url = 'https://www.kaggle.com/splcher/animefacedataset'
# Images werden in './animefacedataset' gespeichert
od.download(dataset_url, data_dir)

# zeigt Ordner unter "../data/" an
print(os.listdir(data_dir))  

# gibt 10 Bezeichnungen von Bildern aus (Prüfung ob Bilder geladen worden)
print(os.listdir(data_dir+'animefacedataset/images')[:10])

Skipping, found downloaded files in "../data/animefacedataset" (use force=True to force download)
['animefacedataset', '.gitkeep', 'outputs']
['4426_2003.jpg', '38921_2012.jpg', '55591_2016.jpg', '8777_2004.jpg', '56274_2017.jpg', '24208_2008.jpg', '13759_2006.jpg', '19302_2007.jpg', '14698_2006.jpg', '30569_2010.jpg']


<h2> Vorbereiten& Erstellen des Dataloaders

In [7]:
# Transformer
transform = transforms.Compose([
    # Resize der Images auf 64 der kürzesten Seite; Andere Seite wird
    transforms.Resize(IMAGE_SIZE),
    # skaliert, um das Seitenverhältnis des Bildes beizubehalten.
    # Zuschneiden auf die Mitte des Images, sodass ein quadratisches Bild mit 64 x 64 Pixeln entsteht
    transforms.CenterCrop(IMAGE_SIZE),
    # Umwandeln in einen Tensor (Bildern in numerische Werte umwandeln)
    transforms.ToTensor(),
    # Normalisierung Mean & Standardabweichung von 0.5 für alle Channels
    # Anzahl: 3 für farbige Bilder
    # Pixelwerte liegen damit zwischen (-1;1)
    transforms.Normalize(*NORM)])          


# Dataset
"""
ImageFolder() : Befehl erwartet, dass nach Images nach labeln organisiert sind (root/label/picture.png)
"""
org_dataset = torchvision.datasets.ImageFolder(root=data_dir, transform=transform)

# Dataloader
"""
Dataloader(): ermöglicht zufällige Stichproben der Daten auszugeben;
Dient dazu, dass das Modell nicht mit dem gesamten Dataset umgehen muss > Training effizienter
"""
org_loader = t.utils.data.DataLoader(org_dataset,               # Dataset (Images)
                                     batch_size=BATCH_SIZE,     # Es wird auf Batches trainiert, damit auf Basis eines Batch-Fehlers das NN angepasst wird
                                     shuffle=True,
                                     num_workers=WORKERS)

<h2> Abfrage des Devices (CPU o. GPU) und Laden des Tensors auf das jeweilige verfügbare Device

In [8]:
# Nutzen der GPU wenn vorhanden, ansonsten CPU

def get_default_device():
    if t.cuda.is_available():     # Wenn cuda verfügbar dann:
        return t.device('cuda')   # Nutze Device = Cuda (=GPU)
    else:                         # Ansonsten
        return t.device('cpu')    # Nutze Device = CPU


# Anzeigen welches Device verfügbar ist
device = get_default_device()
print(device)

cpu


*Hilfsklasse zum Verschieben des Dataloaders "org_loader" auf das jeweilige Device*

In [9]:
class DeviceDataLoader():

    # Initialisierung
    def __init__(self, dataloader, device):
        self.dataloader = dataloader
        self.device = device

    # Anzahl der Images pro Batch
    def __len__(self):
        return len(self.dataloader)

    # Erstellt einen Batch an Tensoren nach dem Verschieben auf das Device
    def __iter__(self):
        for batch in self.dataloader:
            yield tuple(tensor.to(self.device) for tensor in batch)


# Dataloader auf dem verfügbaren Device
dataloader = DeviceDataLoader(org_loader, device)

*Randomisierter Tensor*

In [10]:
def get_noise(n_samples, noise_dim, device=device):    
    return  torch.randn(n_samples,noise_dim, 1,1,device=device)

<h2> Generator

In [11]:
class Generator(nn.Module):
    def __init__(self, no_of_channels=no_of_channels, noise_dim=LATENT_SIZE, gen_dim=IMAGE_SIZE):
      super(Generator, self).__init__()
      self.generator = nn.Sequential(
          nn.ConvTranspose2d(noise_dim, gen_dim*8, 4, 1, 0, bias=False),
          nn.BatchNorm2d(gen_dim*8),
          nn.ReLU(True),
  
          nn.ConvTranspose2d(gen_dim*8, gen_dim*4, 4, 2, 1, bias=False),
          nn.BatchNorm2d(gen_dim*4),
          nn.ReLU(True),
  
          nn.ConvTranspose2d(gen_dim*4, gen_dim*2, 4, 2, 1, bias=False),
          nn.BatchNorm2d(gen_dim*2),
          nn.ReLU(True),
          
          nn.ConvTranspose2d(gen_dim*2, gen_dim, 4, 2, 1, bias=False),
          nn.BatchNorm2d(gen_dim),
          nn.ReLU(True),
  
          nn.ConvTranspose2d(gen_dim, no_of_channels, 4, 2, 1, bias=False),
          nn.Tanh()
      )
  
    def forward(self, input):
      output = self.generator(input)
      return output

gen = Generator().to(device)

<h2> Critic/ Diskrimnator

In [13]:
class Discriminator(nn.Module):
    def __init__(self, no_of_channels=no_of_channels, disc_dim=IMAGE_SIZE):
        super(Discriminator, self).__init__()
        self.discriminator = nn.Sequential(
                
                nn.Conv2d(no_of_channels, disc_dim, 4, 2, 1, bias=False),
                nn.LeakyReLU(0.2, inplace=True),
                
                nn.Conv2d(disc_dim, disc_dim * 2, 4, 2, 1, bias=False),
                nn.InstanceNorm2d(disc_dim * 2, affine=True),
                nn.LeakyReLU(0.2, inplace=True),
                
                nn.Conv2d(disc_dim * 2, disc_dim * 4, 3, 2, 1, bias=False),
                nn.InstanceNorm2d(disc_dim * 4, affine=True),
                nn.LeakyReLU(0.2, inplace=True),
            
                nn.Conv2d(disc_dim * 4, disc_dim * 8, 3, 2, 1, bias=False),
                nn.InstanceNorm2d(disc_dim * 8, affine=True),
                nn.LeakyReLU(0.2, inplace=True),
                
                nn.Conv2d(disc_dim * 8, 1, 4, 1, 0, bias=False),
                
            )
    def forward(self, input):
        output = self.discriminator(input)
        return output.view(-1, 1).squeeze(1)
        #return output

critic =Discriminator().to(device)

<h2> Gewichtsinitialisierung von Generator und Critic/ Diskriminator

In [17]:
#  mean 0 and Standardabweichung 0.02
def w_initial(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight, mean=0.0, std=0.02)
        torch.nn.init.constant_(m.bias, val=0)
gen = gen.apply(w_initial)
critic = critic.apply(w_initial)

<h2> Optimizer

In [18]:
gen_opt = torch.optim.Adam(gen.parameters(), lr=LR, betas=(0, 0.9))
critic_opt = torch.optim.Adam(critic.parameters(), lr=LR, betas=(0, 0.9))               

<h2> Gradient Penalty

In [19]:
 def gradient_penalty( critic, real_image, fake_image, device=device):
    # Übernahme der Batchsize, Channels, Höhe und Breite des realen Images
    batch_size, channel, height, width= real_image.shape
    
    # alpha radomisiert zwischen 0 und 1 gewählt
    alpha= torch.rand(batch_size,1,1,1).repeat(1, channel, height, width).to(device)
    
    # interpoliertes Bild = zufällig gewichteter Durchschnitt zwischen einem realen und einem fake Image
    interpolatted_image = (alpha*real_image) + (1-alpha) * fake_image # Alpha *echtes Bild + (1 − Alpha) * gefälschtes Bild
    
    # Berechnung des critic-scores auf einem interpolierten Bild
    interpolated_score= critic(interpolatted_image)
    
    # Gradient Interpoliertes Bild
    gradient= torch.autograd.grad(inputs=interpolatted_image,
                                  outputs=interpolated_score,
                                  retain_graph=True,
                                  create_graph=True,
                                  grad_outputs=torch.ones_like(interpolated_score)                          
                                 )[0]
    gradient = gradient.view(gradient.shape[0],-1)
    gradient_norm =  gradient.norm(2,dim=1) # Normalisierung 
    gradient_penalty = torch.mean((gradient_norm-1)**2) # Mean
    return gradient_penalty

*Hilfsfunktionen: Normalisierung von Tensoren*

In [21]:
def tensor_norm(img_tensors):
    # print (img_tensors)
    # print (img_tensors * NORM [1][0] + NORM [0][0])
    return img_tensors * NORM[1][0] + NORM[0][0]

<h2> Ordner anlegen für die vom Generator erstellten Images, Anzeigen der genierten Images (Fakes)

In [23]:
# Ordner unter "../data/" für die genierten Fake Images anlegen
dir_gen_samples = '../data/outputs/'
os.makedirs(dir_gen_samples, exist_ok=True)    

# Funktion zum Speichern der generierten Bilder    
def saves_gen_samples(idx, random_Tensor):

    # Randomisierter Tensor wird an den Generator übergeben
    fake_img = gen(random_Tensor)

    # Setzen von Bildbezeichnungen für die Fake_Images
    fake_img_name = "gen_img-{0:0=4d}.png".format(idx)

    # Tensor-Normalisierung; Speichern der Fake_Images im Ordner "Outputs/dir_gen_samples/"
    save_image(tensor_norm(fake_img), os.path.join(
        dir_gen_samples, fake_img_name), nrow=8)
    print("Gespeichert")

# Funktion zum anzeigen von Images
def display_images(image_tensor, num_images=25, size=(3, 64, 64)):

    image = image_tensor.detach().cpu().view(-1, *size) # Images Flatten  
    image_grid = make_grid(image[:num_images], nrow=5)
    plt.imshow(image_grid.permute(1, 2, 0).squeeze())
    plt.show()

<h2> Training

In [24]:
Gen_losses = []
Critic_losses = []

# Iteration über Epochen    
for epoch in range(NUM_EPOCH):
    
    # Iteration über Batches
    for real_image, _ in tqdm(dataloader):
        
        # Aktuelle Batchsize
        cur_batch_size = real_image.shape[0]

        # Real Images auf Device
        real_image = real_image.to(device)
        
        #Iteration über Critic (=Discrimiator) Anzahl
        for _ in range(N_CRITIC):
            
            # Generieren von Radom-Noise
            fake_noise = get_noise(cur_batch_size, LATENT_SIZE, device=device)
            fake = gen(fake_noise)
            
            # Trainieren des Critics (=Discriminator)
            critic_fake_pred = critic(fake).reshape(-1)
            critic_real_pred = critic(real_image).reshape(-1)
            
            # Berechnung: gradient penalty auf den realen and fake Images (Generiert durch Generator)
            gp = gradient_penalty(critic, real_image, fake, device)
            critic_loss = -(torch.mean(critic_real_pred) -
                            torch.mean(critic_fake_pred)) + LAMBDA_GP * gp
            
            # Gradient = 0 
            critic.zero_grad()
            
            # Backprop. + Aufzeichnen dynamischen Graphen 
            critic_loss.backward(retain_graph=True)
            
            # Update Optimizer
            critic_opt.step()

        # Trainieren des Generators: max E[critic(gen_fake)] <-> min -E[critic(gen_fake)]
        gen_fake = critic(fake).reshape(-1)
        gen_loss = -torch.mean(gen_fake)
        
        # Gradient = 0 
        gen.zero_grad()
        
        # Backprop.
        gen_loss.backward()
        
        # Update optimizer
        gen_opt.step()

        # Visualisierung nach Anzahl Display_Step (=500)
        if cur_step % display_step == 0 and cur_step > 0:
            
            # Ausgabe des Gen-Loss und Critic-Loss
            print(
                f"Step {cur_step}: Generator loss: {gen_loss}, critic loss: {critic_loss}")

            # Speichern des Gesamtlosses von Critic/ Diskriminator und Generator
            Critic_losses.append(critic_loss)
            Gen_losses.append(gen_loss)
            
            # Anzeigen der Fake Images
            display_images(fake)
            
            #display_images(real_image)
            
            # 
            gen_loss = 0
            critic_loss = 0

            #Speichern der Fake Images
            saves_gen_samples(cur_step, fake_noise)
        cur_step += 1 # cur_step = cur_step+1

# Darstellung Loss 
EPOCH_COUNT_G= range(1,len(Gen_losses)+1) # Anzahl der Epochen vom Gen.
EPOCH_COUNT_C= range(1,len(Critic_losses)+1) # Anzahl der Epochen vom Dis.

G_losses = [gen.item() for gen in Gen_losses ]
C_losses = [critic.item() for critic in Critic_losses ]

plt.figure(figsize=(10,5))
plt.title("LOSS: Generator und Critic/ Discriminator während dem Training")
plt.plot(EPOCH_COUNT_G, G_losses,"r-", label="Generator")
plt. plot(EPOCH_COUNT_C,C_losses,"b-", label="Crtic")
plt.xlabel("EPOCH")
plt.ylabel("LOSS")
plt.legend()
plt.show()

  0%|          | 2/994 [01:33<12:52:26, 46.72s/it]


KeyboardInterrupt: 