<a href="https://colab.research.google.com/github/avrymi-asraf/Garden-of-GAN/blob/main/1-Basic-And-Principles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Basic and Principle

## Installations and Import

In [2]:
!pip install -q torchvision plotly tqdm

In [3]:
import torch
from torch import nn, optim
import torch.utils.data.dataloader as dataloader
from torchvision import datasets, transforms
import plotly.express as px
import pandas as pd
import plotly.figure_factory as ff

In [4]:
from typing import Optional, Callable

## Simple Model

In [5]:
# Define the generator network
class Generator(nn.Module):
  def __init__(self, noise_dim, output_dim):
    super(Generator, self).__init__()
    # MLP with dense layers and activations
    self.fc1 = nn.Linear(noise_dim, 128)
    self.relu = nn.LeakyReLU(0.2)
    self.fc2 = nn.Linear(128, output_dim)

  def forward(self, z):
    x = self.fc1(z)
    x = self.relu(x)
    return self.fc2(x)

# Define the discriminator network
class Discriminator(nn.Module):
  def __init__(self, input_dim):
    super(Discriminator, self).__init__()
    # MLP with dense layers and activations
    self.fc1 = nn.Linear(input_dim, 128)
    self.relu = nn.LeakyReLU(0.2)
    self.fc2 = nn.Linear(128, 1)
    self.sigmoid = nn.Sigmoid()

  def forward(self, x):
    x = self.fc1(x)
    x = self.relu(x)
    return self.sigmoid(self.fc2(x))


In [12]:
def gaussians(
    batch_size: int = 64,
    mean: Optional[torch.Tensor] = None,
    var: Optional[torch.Tensor] = None,
) -> Callable[[], torch.Tensor]:
    """creates a function that returns a tensor of gaussian distributed numbers

    Args:
        batch_size (int, optional): batch size. Defaults to 64.
        mean (Optional[torch.Tensor], optional): if determinte the mean or determin randomly. Defaults to None.
        var (Optional[torch.Tensor], optional): if determine the var or randomly. Defaults to None.

    Returns:
        Callable[[],torch.Tensor]: callable function that returns a tensor of gaussian distributed numbers
    """
    mean = var if mean else torch.rand(1) * torch.randint(-5, 5, [1])
    var = var if var else torch.rand(1) * torch.randint(0, 10, [1])
    return lambda: (torch.randn(batch_size) * var + mean).unsqueeze(1)

In [15]:
f1, f2 = gaussians(), gaussians()
print(f1().shape)
ff.create_distplot([f1().squeeze().tolist(), f2().squeeze().tolist()], ["1", "2"])

torch.Size([64, 1])


In [19]:
# Create the models
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 64
noise_dim = 100
output_dim = 1
generator = Generator(noise_dim, output_dim).to(device)
discriminator = Discriminator(output_dim).to(device)
sampler = gaussians(batch_size)
# Define loss functions and optimizers
loss_function_D = nn.BCELoss()
loss_function_G = nn.BCELoss()
optimizer_D = torch.optim.Adam(discriminator.parameters(), lr=0.0002)
optimizer_G = torch.optim.Adam(generator.parameters(), lr=0.0002)

In [20]:
from IPython.display import clear_output

# Training loop
for epoch in range(10000):
    # Sample real data and noise
    real_data = sampler().to(device)
    noise = torch.randn(batch_size, noise_dim).to(device)

    # Train discriminator
    real_labels = torch.ones((batch_size, 1)).to(device)
    fake_labels = torch.zeros((batch_size, 1)).to(device)

    # Real data pass
    discriminator_output = discriminator(real_data.float())
    D_real_loss = loss_function_D(discriminator_output, real_labels)

    # Fake data pass
    fake_data = generator(noise)
    discriminator_output = discriminator(fake_data)
    D_fake_loss = loss_function_D(discriminator_output, fake_labels)

    # Total discriminator loss
    D_loss = D_real_loss + D_fake_loss

    # Reset and update discriminator
    optimizer_D.zero_grad()
    D_loss.backward()
    optimizer_D.step()

    # Train generator
    fake_labels = torch.ones((batch_size, 1)).to(device)
    generator_output = generator(noise)
    G_loss = loss_function_G(discriminator(generator_output), fake_labels)

    # Reset and update generator
    optimizer_G.zero_grad()
    G_loss.backward()
    optimizer_G.step()

    # Print losses and visualize progress (optional)
    if epoch % 500 == 0:
        clear_output(wait=True)
        print("Epoch:", epoch, "D_loss:", D_loss.item(), "G_loss:", G_loss.item())
        ff.create_distplot(
            [
                sampler().view((batch_size)).tolist(),
                generator_output.view((batch_size)).cpu().detach().tolist(),
            ],
            ["rael", "generator"],
        ).show()

Epoch: 9500 D_loss: 1.3900351524353027 G_loss: 0.6880781650543213


In [None]:
class Generator(nn.Module):
    def __init__(self, letant_dim: int, im_dim):
        super().__init__()
        self.im_dim = im_dim
        self.len_im = im_dim[0] * im_dim[1]
        self.letant_dim = letant_dim
        self.model = nn.Sequential(
            nn.Linear(letant_dim, 256),
            nn.LeakyReLU(),
            nn.Linear(256, self.len_im),
            nn.Tanh(),
        )

    def forward(self, X):
        return self.model(X.view((-1, self.letant_dim))).view((-1, *self.im_dim))


class Discrimnator(nn.Module):
    def __init__(self, im_dim) -> None:
        super().__init__()
        self.im_dim = im_dim
        self.len_im = im_dim[0] * im_dim[1]
        self.model = nn.Sequential(
            nn.Linear(self.len_im, 128), nn.LeakyReLU(), nn.Linear(128, 1), nn.Sigmoid()
        )

    def forward(self, X):
        return self.model(X.view(-1, self.len_im)).view(-1)

In [None]:
transfomer = transforms.Compose([transforms.ToTensor(), transforms.Normalize(0.5, 0.5)])
mnist_data = datasets.MNIST("/dataset", download=True, transform=transfomer)

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
letant_dim = 100
im_dim = (28, 28)
generator = Generator(letant_dim, im_dim).to(device)
discrimnator = Discrimnator(im_dim).to(device)

In [None]:
lr = 3e-4
optim_g = optim.Adam(generator.parameters(), lr=lr)
optim_d = optim.Adam(discrimnator.parameters(), lr=lr)
loss_f = nn.BCELoss()

In [None]:
from math import ceil

num_epochs = 32
batch_size = 64
out_data = pd.DataFrame(
    {"epoch": pd.NA, "batch": pd.NA, "loss_g": pd.NA, "loss_d": pd.NA},
    index=range(num_epochs * ceil(len(mnist_data) / batch_size)),
)

In [None]:
ind_out_data = 0
for epoch_ind in range(num_epochs):
    loader = dataloader.DataLoader(mnist_data, batch_size=batch_size, shuffle=True)
    for i_batch, (X, _) in enumerate(loader):
        optim_g.zero_grad()
        optim_d.zero_grad()
        noise = torch.rand(X.shape[0], letant_dim).to(device)
        fake = generator(noise)
        loss_d_fake = loss_f(discrimnator(fake), torch.zeros(fake.shape[0]).to(device))
        loss_d_real = loss_f(
            discrimnator(X.to(device)), torch.ones(X.shape[0]).to(device)
        )
        loss_d = (loss_d_fake + loss_d_real) / 2
        loss_d.backward(retain_graph=True)
        optim_d.step()

        loss_g = loss_f(discrimnator(fake), torch.ones(fake.shape[0]).to(device))
        loss_g.backward()
        optim_g.step()
        out_data.loc[ind_out_data] = [epoch_ind, i_batch, loss_g.item(), loss_d.item()]
        ind_out_data += 1
    with torch.no_grad():
        noise = torch.rand(10, letant_dim)
        im = generator(noise.to(device))
        clear_output(wait=True)
        px.imshow(im.cpu().detach(), facet_col=0, facet_col_wrap=5).show()
        px.line(out_data, x="batch", y="loss_g", color="epoch").show()

In [None]:
from math import ceil
from IPython.display import clear_output

num_epochs = 20
batch_size = 32
out_data = pd.DataFrame(
    {"epochs": pd.NA, "batch": pd.NA, "loss_g": pd.NA, "loss_d": pd.NA},
    index=range(num_epochs * ceil(len(mnist_data) / batch_size)),
)
index_data = 0
for epoch in range(num_epochs):
    loader = dataloader.DataLoader(mnist_data, batch_size=batch_size, shuffle=True)
    for batch_i, (X, _) in enumerate(loader):
        curr_batch_size = X.shape[0]

        ### Train Discriminator: max log(D(x)) + log(1 - D(G(z)))
        noise = torch.randn(curr_batch_size, letant_dim).to(device)
        fake = generator(noise)
        disc_real = discrimnator(X.to(device)).view(-1)
        lossD_real = loss_f(disc_real, torch.ones_like(disc_real).to(device))
        disc_fake = discrimnator(fake).view(-1)
        lossD_fake = loss_f(disc_fake, torch.zeros_like(disc_fake).to(device))
        lossD = (lossD_real + lossD_fake) / 2
        optim_d.zero_grad()
        lossD.backward(retain_graph=True)
        optim_d.step()

        ### Train Generator: min log(1 - D(G(z))) <-> max log(D(G(z))
        # where the second option of maximizing doesn't suffer from
        # saturating gradients
        output = discrimnator(fake).view(-1)
        lossG = loss_f(output, torch.ones_like(output).to(device))
        optim_g.zero_grad()
        lossG.backward()
        optim_g.step()

        out_data.loc[index_data] = [epoch, batch_i, lossG.item(), lossD.item()]
        index_data += 1

    with torch.no_grad():
        fake = generator(torch.rand(10, 100).to(device))
        clear_output(wait=True)
        px.imshow(fake.cpu().detach(), facet_col=0, facet_col_wrap=5).show()
        px.line(out_data, x="batch", y=["loss_g"], color="epochs").show()

In [None]:
!pip install -q torchvision plotly black[jupyter] tqdm

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import dataloader
from torchvision import transforms, datasets
from plotly import express as px, graph_objects as go
import pandas as pd
import numpy as np
import os

In [None]:
# from google.colab import drive

# drive.mount("/content/drive")

In [None]:
# !black '/content/drive/MyDrive/gan_for_Day/chang_train_ratio_6/update_in_run_time_6.ipynb'
# !black '/content/drive/MyDrive/Colab Notebooks/gan_for_Day/chang_train_ratio_6/update_in_run_time_6.ipynb'

In [None]:
from typing import Tuple, List

ImageDimType = Tuple[int, int]
ImageType = torch.Tensor
TensorType = torch.Tensor
DataSetVisionType = datasets.vision.VisionDataset
LossFunctionType = nn.modules.loss._WeightedLoss
OptimizerType = optim.Optimizer
DeviceType = str

<div dir="rtl" lang="he" xml:lang="he">

## יצירת הדטה

In [None]:
f = lambda x: x**2 + 10
X = (torch.randn(1000) - 0.5) * 10
y = f(X)
Xy = torch.stack([X, y])  # 2x1000,on the first row is X, on the second row is y
data = Xy.T  # 1000x2, every row is a sample
px.scatter(x=Xy[0], y=Xy[1]).show()

<div dir="rtl" lang="he" xml:lang="he">

## יצירת המודלים


In [None]:
class Generator(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(2, 16),
            nn.LeakyReLU(),
            nn.Linear(16, 32),
            nn.LeakyReLU(),
            nn.Linear(32, 2),
        )

    def forward(self, x):
        return self.model(x)

In [None]:
px.scatter(*Generator()(torch.randn(10000, 2)).detach().T)

In [None]:
class Discriminator(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(2, 16),
            nn.Dropout(0.2),
            nn.LeakyReLU(),
            nn.Linear(16, 16),
            nn.Dropout(0.2),
            nn.LeakyReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return self.model(x)

In [None]:
Discriminator()(torch.randn(10,2))

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
discriminator = Discriminator().to(device)
generator = Generator().to(device)
optim_d = optim.Adam(Discriminator().parameters())
optim_g = optim.Adam(Generator().parameters())
loss_fn = nn.BCELoss()

<div dir="rtl" lang="he" xml:lang="he">

## אימון

In [None]:
from math import ceil

epochs = 300
batch_size = 64
num_batch = ceil(len(data) / batch_size)
record_data = pd.DataFrame(
    {"epoch": int(), "batch": int(), "loss_d": float(), "loss_g": float()},
    index=range(epochs * num_batch),
)
example_gen = torch.empty(epochs, 10, 2)

In [None]:
run_ind = 0
from tqdm.notebook import tqdm
from IPython.display import clear_output

for epoch in range(epochs):
    data_loader = dataloader.DataLoader(data, batch_size=batch_size, shuffle=True)
    for batch_ind, points in enumerate(data_loader):
        points = points.to(device)
        # Train discriminator
        optim_d.zero_grad()
        fake = generator(torch.randn(len(points), 2, device=device))
        loss_d = loss_fn(
            discriminator(points), torch.ones(len(points), 1, device=device)
        )
        loss_d += loss_fn(
            discriminator(fake), torch.zeros(len(points), 1, device=device)
        )
        loss_d.backward(retain_graph=True)
        optim_d.step()
        # Train generator
        optim_g.zero_grad()
        fake = generator(torch.randn(len(points), 2, device=device))
        loss_g = loss_fn(discriminator(fake), torch.ones(len(points), 1, device=device))
        loss_g.backward()
        optim_g.step()
        # Record losses
        record_data.iloc[run_ind] = epoch, batch_ind, loss_d.item(), loss_g.item()
        run_ind += 1
    clear_output(wait=True)
    with torch.no_grad():
        example_gen[epoch] = generator(torch.rand(10, 2, device=device)).detach().cpu()
px.line(record_data, x="batch", y=["loss_d", "loss_g"], animation_frame="epoch").show()

In [None]:
example_gen = example_gen.transpose(2, 1)

In [None]:
# @title { run: "auto" }
ind = 279  # @param {type:"slider", min:0, max:299, step:1}
px.scatter(x=example_gen[ind, 0], y=example_gen[ind, 1]).show()

In [None]:
px.scatter(example_gen)


In [None]:
import random

var, mean = random.randint(0, 10), random.randint(0, 10)
p_real = (torch.randn(10000) * var + mean).reshape(-1, 1)
bin_centers, bin_count = torch.histogram(p_real, bins=30, density=True)
fig = go.Figure(
    data=[
        go.Scatter(
            x=bin_count,
            y=bin_centers,
            mode="lines",
        )
    ]
)
fig.show()

In [None]:
class D(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(1, 2),
            nn.Tanh(),
            nn.Linear(2, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        return self.model(x)


class G(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(1, 1), nn.Tanh(), nn.Linear(1, 1), nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)


line = torch.linspace(-10, 10, 10).reshape(-1, 1)
print(G()(line))
print(D()(line))

In [None]:
go.Figure(
    data=[
        go.Scatter(
            x=line.flatten(),
            y=G()(line).detach().flatten(),
            mode="lines",
        )
    ]
).show()

In [None]:
next(iter(dataloader.DataLoader(p_real, batch_size=10, shuffle=True)))

In [None]:
line_space = torch.linspace(-10, 10, 1000).reshape(-1, 1)
fig = go.Figure(
    data=[
        go.Scatter(
            x=line_space.flatten(),
            y=D()(line_space).detach().flatten(),
            mode="lines",
            name="discriminator",
        ),
        go.Scatter(
            x=line_space.flatten(),
            y=G()(line_space).detach().flatten(),
            mode="lines",
            name="generator",
        ),
        go.Scatter(x=bin_count, y=bin_centers, mode="lines", name="real"),
    ]
).show()

<div dir="rtl" lang="he" xml:lang="he">

## פונקצייות

<div dir="rtl" lang="he" xml:lang="he">

##הרצה

In [None]:
from math import ceil

device = "cuda" if torch.cuda.is_available() else "cpu"
epochs = 100
batch_size = 64
num_batch = ceil(len(p_real) / batch_size) * epochs
record_data = pd.DataFrame(
    {"epoch": int(), "batch": int(), "loss_d": float(), "loss_g": float()},
    index=range(num_batch),
)
example_gen = torch.empty(epochs, 100, 1)
generator = G().to(device)
discriminator = D().to(device)
optim_d = optim.Adam(D().parameters())
optim_g = optim.Adam(G().parameters())
loss_fn = nn.BCELoss()

In [None]:
run_ind = 0
from tqdm.notebook import tqdm
from IPython.display import clear_output

for epoch in tqdm(range(epochs)):
    data_loader = dataloader.DataLoader(p_real, batch_size=batch_size, shuffle=True)
    for batch_ind, points in enumerate(data_loader):
        points = points.to(device)
        # Train discriminator
        optim_d.zero_grad()
        fake = generator(torch.randn(len(points), 1, device=device))
        loss_d = loss_fn(
            discriminator(points), torch.ones(len(points), 1, device=device)
        )
        loss_d += loss_fn(
            discriminator(fake), torch.zeros(len(points), 1, device=device)
        )
        loss_d = loss_d / 2
        loss_d.backward(retain_graph=True)
        optim_d.step()
        # Train generator
        optim_g.zero_grad()
        fake = generator(torch.randn(len(points), 1, device=device))
        loss_g = loss_fn(discriminator(fake), torch.ones(len(points), 1, device=device))
        loss_g.backward()
        optim_g.step()
        # Record losses
        record_data.iloc[run_ind] = epoch, batch_ind, loss_d.item(), loss_g.item()
        run_ind += 1
    # clear_output(wait=True)
    # with torch.no_grad():
    #     example_gen[epoch] = generator(torch.rand(100, 1, device=device)).detach().cpu()
    #     px.line(
    #         record_data, x="batch", y=["loss_d", "loss_g"], animation_frame="epoch"
    #     ).show()
    #     px.histogram(example_gen[epoch], marginal="rug").show()

In [None]:
line_space = torch.linspace(-10, 10, 1000).reshape(-1, 1)
fig = go.Figure(
    data=[
        go.Scatter(
            x=line_space.flatten(),
            y=discriminator(line_space).detach().flatten(),
            mode="lines",
            name="discriminator",
        ),
        go.Scatter(
            x=line_space.flatten(),
            y=generator(line_space).detach().flatten(),
            mode="lines",
            name="generator",
        ),
        go.Scatter(x=bin_count, y=bin_centers, mode="lines", name="real"),
    ]
).show()