In [None]:
import torch #הספריה מרכזית ליצירת המודל
from torch import nn #קיצור ליצירת שכבות
from tqdm.auto import tqdm #נתונים בזמן ריצת לולאה
from torchvision import transforms #עיבוד הנתונים
from torchvision.utils import make_grid #מאפשר פלט מסודר של תמונות
from torch.utils.data import DataLoader, Dataset #עם אלו נטען את הנתונים ונכין אותם לאימון
import matplotlib.pyplot as plt #ספריה המאפשר יצרית גרפים ולהראות תמונות
import os #משמש לעבודה עם תיקיות
from PIL import Image #ספריה המאפשרת לעבוד עם תמונות
import numpy as np #ספריה לעבודה עם מערכים

<div dir=rtl>
גישה לאיחסון בגוגל דרייב
</div>

In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

<div dir=rtl>
הפונקציה הזאת נלקחה מאחד התרגילים בהתמחות ה GAN והיא מציגה את התמונות בתוך באץ'(batch) של תמונות אחד ליד השני

השימוש בה נעשה בזמן אימון
</div>

In [None]:
def show_tensor_images(image_tensor, num_images=25, size=(3, 256, 256)):
    '''
    Function for visualizing images: Given a tensor of images, number of images, and
    size per image, plots and prints the images in an uniform grid.
    '''
    image_tensor = (image_tensor + 1) / 2 #מעביר את ערכי בפיקסלים מ 1- עד 1 ל 0 עד 1
    image_shifted = image_tensor
    image_unflat = image_shifted.detach().cpu().view(-1, *size)
    image_grid = make_grid(image_unflat[:num_images], nrow=5)
    plt.imshow(image_grid.permute(1, 2, 0).squeeze())
    plt.show()

# model

[Residual Blocks](https://towardsdatascience.com/residual-blocks-building-blocks-of-resnet-fd90ca15d6ec)
<div dir=rtl>
יצירת שכבת ריזידואל.

שומר על ערך הקלט ומוסיף אותו לערך הפלט. השימוש בשכבה מהסוג הזה מאפשר לשים הרבה שכבות מבלי חשש שישבשו את תהליך הלמידה, ובכך מאפשר למודל יותר כוח.
קונבולוציה, נרמול ערכים, אקטיבציה ReLU, קונבולוציה, נרמול ערכים.
</div>

In [None]:
class ResidualBlock(nn.Module):

    def __init__(self, input_channels): #מקבל את כמות הצ'אנלים של הקלט
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1, padding_mode='reflect') #שכבת קונבולוציה
        self.conv2 = nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1, padding_mode='reflect') #שכבת קונבולוציה
        self.instancenorm = nn.InstanceNorm2d(input_channels) #שכבת נרמול
        self.activation = nn.ReLU() #אקטיבציה

    def forward(self, x): 

        original_x = x.clone() #שכפול ושמירת הקלט
        x = self.conv1(x)
        x = self.instancenorm(x)
        x = self.activation(x)
        x = self.conv2(x)
        x = self.instancenorm(x)
        return original_x + x #מחזיר את הסכום של הקלט והפלט

<div dir=rtl>
DownsampleLayer

שכבה המקטינה את "גודל" התמונה ומגדילה את כמות הצ'אנלים(channels), עיבוד של הנתונים.

שכבת קונבולוציה אחת. מאפשרת בחירה של פונקציית האקטיבציה, בין ReLU ל Leaky ReLU. 
שכבת נרמול - מנרמל את הערכי בכך שלא יהיה ערכים יוצאי דופן שיוצרים שונות גדולה, ובכך מתאפשר למידה טובה יותר.
שכבת dropout מוריד ערכים באקראיות ובכך מוריד את הסיכוי להתאמת יתר (overfitting)


UpsampleLayer

שכבה המגדילה את "גודל" התמונה ומקטינה את כמות הצ'אנלים(channels), אחרי עיבוד של התמונה "מחזיר" אותה לגודל המתבקש.

מתבצע בעזרה קונבולוציה הפוכה. שימוש בשכבת נרמול. ואז פונקציית אקטיבציה ReLU
</div>

[Leaky ReLU](https://paperswithcode.com/method/leaky-relu)

In [None]:
class DownsampleLayer(nn.Module):
    def __init__(self, in_channel, out_channel, kernel_size, stride, padding, padding_mode, dropout, activation_type="relu"):
        super(DownsampleLayer, self).__init__()
        self.activation = None
        self.instancenorm = nn.InstanceNorm2d(out_channel) #נרמול ערכים
        self.dropout = nn.Dropout(dropout) #שכבת דרופאוט
        self.conv1 = nn.Conv2d(in_channel, out_channel, kernel_size, stride, padding, padding_mode=padding_mode) #שכבת קונבולוציה

        if activation_type == "relu":
            self.activation = nn.ReLU(inplace=True) #אקטיבציה ReLU
        elif activation_type == "l_relu":
            self.activation = nn.LeakyReLU(0.2,inplace=True) #אקטיבציה leaky ReLU

    def forward(self, x):
        x = self.conv1(x)
        x = self.instancenorm(x)
        x = self.activation(x)
        x = self.dropout(x)
        return x

class UpsampleLayer(nn.Module):
    def __init__(self, in_channel, out_channel, kernel_size, stride, padding, out_padding, padding_mode):
        super(UpsampleLayer, self).__init__()
        
        self.main = nn.Sequential(
            nn.ConvTranspose2d(in_channel, out_channel, kernel_size, stride, padding, #קונבולוציה הפוכה
                                output_padding=out_padding , padding_mode=padding_mode),
            nn.InstanceNorm2d(out_channel), #נרמול ערכים
            nn.ReLU() #אקטיבציה ReLU
        )

    def forward(self, x):
        x = self.main(x)

        return x

קונבולוציה הפוכה
![Alt Text](https://miro.medium.com/max/1400/1*kOThnLR8Fge_AJcHrkR3dg.gif)

# Generator

<div dir=rtl>
יצירת המייצר
</div>

In [None]:
class Generator(nn.Module):

    def __init__(self):
        super(Generator, self).__init__()

        self.DSL1 = DownsampleLayer(3, 64, 7, 1, 'same', "reflect", 0)
        self.DSL2 = DownsampleLayer(64,128, 3, 2, 1, "reflect",0)
        self.DSL3 = DownsampleLayer(128,256,3,2, 1, "reflect", 0)

        # 9 שכבות ריזידואל
        # 256 צ'אנלים
        self.res1 = ResidualBlock(256)
        self.res2 = ResidualBlock(256)
        self.res3 = ResidualBlock(256)
        self.res4 = ResidualBlock(256)
        self.res5 = ResidualBlock(256)
        self.res6 = ResidualBlock(256)
        self.res7 = ResidualBlock(256)
        self.res8 = ResidualBlock(256)
        self.res9 = ResidualBlock(256)

        self.USL1 = UpsampleLayer(256,128,3,2,1,1, "zeros")
        self.USL2 = UpsampleLayer(128,64,3,2,1,1, "zeros")
        self.conv1 = nn.Conv2d(64, 3, 7, 1, padding='same', padding_mode="reflect") #שכבת קונבולוציה
        self.activation = nn.Tanh() #אקטיבציה טנגנס היפרבולי (בין 1- ל 1)

    def forward(self, x):

        x1 = self.DSL1(x)
        x2 = self.DSL2(x1)
        x3 = self.DSL3(x2)

        x4 = self.res1(x3)
        x5 = self.res2(x4)
        x6 = self.res3(x5)
        x7 = self.res4(x6)
        x8 = self.res5(x7)
        x9 = self.res6(x8)
        x10 = self.res7(x9)
        x11 = self.res8(x10)
        x12 = self.res9(x11)

        x13 = self.USL1(x12)
        x14 = self.USL2(x13)
        x15 = self.conv1(x14)
        xn = self.activation(x15)

        return xn

# Discriminator

<div dir=rtl>
יצירת המבחין
</div>

In [None]:
class Discriminator(nn.Module):

    def __init__(self):
        super(Discriminator, self).__init__()

        self.conv1 = nn.Conv2d(3, 64, 4, 2, 1,padding_mode="reflect") #שכבת קונובלוציה
        self.activation = nn.LeakyReLU(0.2) #אקטיבציה leaky ReLU עם שיפוע 0.2

        self.DSL1 = DownsampleLayer(64, 128, 4, 2, 1, "reflect", 0,activation_type="l_relu")
        self.DSL2 = DownsampleLayer(128, 256, 4, 2, 1, "reflect", 0,activation_type="l_relu")
        self.DSL3 = DownsampleLayer(256, 512, 4, 1, 1, "reflect", 0,activation_type="l_relu")

        self.conv2 = nn.Conv2d(512, 1, 4, padding=1, padding_mode="reflect") #שכבת קונבולוציה

    def forward(self, x):
        x1 = self.conv1(x)
        x2 = self.activation(x1)
        x3 = self.DSL1(x2)
        x4 = self.DSL2(x3)
        x5 = self.DSL3(x4)
        xn = self.conv2(x5)

        return xn

# losses

<div dir=rtl>
פונקציית העלות של המבחין.
מקבל: תמונה אמיתית X, תמונה מזוייפת X, מודל מבחין X, ופונקציית עלות

מחזיר: ערך השגיאה
</div>

In [None]:
def get_disc_loss(real_X, fake_X, disc_X, adv_criterion):

    disc_fake_X_hat = disc_X(fake_X.detach()) #מנתק את התמונה המזוייפת מהמייצר ומעביר במבחין
    disc_fake_X_loss = adv_criterion(disc_fake_X_hat, torch.zeros_like(disc_fake_X_hat)) #משיג את ערך השגיאה לפי ערך המבחין על תמונה מזוייפת והשוואה עם 1
    disc_real_X_hat = disc_X(real_X) #מעביר תמונה אמיתית במבחין
    disc_real_X_loss = adv_criterion(disc_real_X_hat, torch.ones_like(disc_real_X_hat)) #משיג את ערך השגיאה לפי ערך המבחין על תמונה אמיתית והשוואה עם 0
    disc_loss = (disc_fake_X_loss + disc_real_X_loss) / 2 #מחבר את ערכי השגיאה ומחלק ב2

    return disc_loss

<div dir=rtl>
פונקציית עלות של המייצר לפי המבחין.

מקבל: תמונה אמיתית X, מבחין Y, מייצר XY, פונקציית עלות.
מחזיר: ערך השגיאה, תמונה מזוייפת Y(על מנת לחסוך חישוב נוסף בזמן אימון)
</div>

In [None]:
def get_gen_adversarial_loss(real_X, disc_Y, gen_XY, adv_criterion):

    fake_Y = gen_XY(real_X) #מעביר תמונה אמיתית ומשיג תמונה מזוייפת
    disc_fake_Y_hat = disc_Y(fake_Y) #הציון שקיבל מהמבחין
    adversarial_loss = adv_criterion(disc_fake_Y_hat, torch.ones_like(disc_fake_Y_hat)) # משיג את ערך השגיאה לפי ערך שקיבל מהמבחין והשוואה ל1

    return adversarial_loss, fake_Y

<div dir=rtl>
פונקציית עלות נוספת של המייצר.

מקבל: תמונה אמיתית X, מייצר YX, פונקציית עלות.
מחזיר: ערך השגיאה, ותמונה אמיתית X שעברה במייצר YX(על מנת לחסוך חישוב נוספים בזמן אימון)
</div>

In [None]:
def get_identity_loss(real_X, gen_YX, identity_criterion):

    identity_X = gen_YX(real_X) #מעביר תמונה אמיתית ומחזיר תמונה מאותו הסוג
    identity_loss = identity_criterion(identity_X, real_X) #מחזיר את ערך השגיאה בין הפיקסלים

    return identity_loss, identity_X

<div dir=rtl>
פונקציית עלות נוספת של המייצר.


מקבל: תמונה X אמיתית, תמונה Y מזוייפת, מייצר YX, פונקציית עלות.

משלים את ה"סיבוב", מקבל תמונה מזוייפת Y ומנסה להחזיר לX, ואז מוצא את ערך השגיאה שהוא המרחק בין הפיקסלים
</div>

In [None]:
def get_cycle_consistency_loss(real_X, fake_Y, gen_YX, cycle_criterion):

    cycle_X = gen_YX(fake_Y) #משלים "סיבוב", מקבל תמונה מזוייפת ומנסה להחזיר למקור
    cycle_loss = cycle_criterion(cycle_X, real_X) #מחזיר את ערך השגיאה בין הפיקסלים
    return cycle_loss, cycle_X

<div dir=rtl>
חיבור כל פונקציות העלות לפונקציה אחת בה נשתמש באימון.

מקבל: תמונה A, תמונה B, מייצר AB, מייצר BA, מבחין A, מבחין B, שלוש פונקציות עלות לכל פונקציה, למבדה לכל ערך שגיאה על מנת לתת משקל מתאים.

מחזיר: סך כל השגיאה של המייצרים, תמונה מזוייפת A, תמונה מזוייפת B(על מנת לחסוך חיסוב נוסף בזמן אימון).
</div>

In [None]:
def get_gen_loss(real_A, real_B, gen_AB, gen_BA, disc_A, disc_B, adv_criterion, identity_criterion, cycle_criterion, lambda_identity=0.1, lambda_cycle=10):

    # Adversarial Loss
    adv_loss_BA, fake_A = get_gen_adversarial_loss(real_B, disc_A, gen_BA, adv_criterion) #מחזיר ערך שגיאה של מייצר תמונה, ותמונה מזוייפת
    adv_loss_AB, fake_B = get_gen_adversarial_loss(real_A, disc_B, gen_AB, adv_criterion) #מחזיר ערך שגיאה של מייצר ציור, וציור מזוייף
    gen_adversarial_loss = adv_loss_BA + adv_loss_AB #סוכם את ערכי השגיאה

    # Identity Loss
    identity_loss_A, identity_A = get_identity_loss(real_A, gen_BA, identity_criterion) #מחזיר ערך שגיאה נוסף של מייצר תמונה
    identity_loss_B, identity_B = get_identity_loss(real_B, gen_AB, identity_criterion) #מחזיר ערך שגיאה נוסף של מייצר ציור
    gen_identity_loss = identity_loss_A + identity_loss_B #סוכם את ערכי השגיאה

    # Cycle consistency Loss
    cycle_loss_BA, cycle_A = get_cycle_consistency_loss(real_A, fake_B, gen_BA, cycle_criterion) #מחזיר ערך שגיאה נוסף של מייצר תמונה
    cycle_loss_AB, cycle_B = get_cycle_consistency_loss(real_B, fake_A, gen_AB, cycle_criterion) #מחזיר ערך שגיאה נוסף של מייצר ציור
    gen_cycle_loss = cycle_loss_BA + cycle_loss_AB #סוכם את ערכי השגיאה

    # Total loss
    gen_loss = lambda_identity * gen_identity_loss + lambda_cycle * gen_cycle_loss + gen_adversarial_loss #סוכם את כל ערכי השגיאה עם המשקל למבדה שלהם

    return gen_loss, fake_A, fake_B

# dataset (cool)

<div dir=rtl>
הכנת הנתונים לאימון.

בנאי מקבל: תיקייה בה שמור הנתונים, מצב אימון/מבחן, פונקצייה לעיבוד הנתונים
</div>

In [None]:
class MonetPhotoDataset(Dataset):
    def __init__(self, data_dir, mode='train', transforms=None):
        monet_dir = os.path.join(data_dir, 'monet_jpg') #מחבר את הנתיב לציורים
        photo_dir = os.path.join(data_dir, 'photo_jpg') #מחבר את הנתיב לתמונות
        
        if mode == 'train': #אם מיועד לאימון
            self.monet = [os.path.join(monet_dir, name) for name in sorted(os.listdir(monet_dir))[:295]] #מייצר רשימה של 295 ציורים מהתקייה
            self.photo = [os.path.join(photo_dir, name) for name in sorted(os.listdir(photo_dir))[:295]] #מייצר רשימה של 295 תמונות מהתקייה
        elif mode == 'test': #אם מיועד למבחן
            self.monet = [os.path.join(monet_dir, name) for name in sorted(os.listdir(monet_dir))[295:]] #מייצר רשימה של שאר התמונות בתיקייה
            self.photo = [os.path.join(photo_dir, name) for name in sorted(os.listdir(photo_dir))[295:]] #מייצר רשימה של שאר הציורים בתיקייה
        
        self.transforms = transforms #פונקצייה לעיבוד תמונה
        
    def __len__(self):
        return len(self.monet) #גודל סט הנתונים לאימון
    
    def __getitem__(self, index): #פונקציה הנדרשת בשביל לעבוד עם הנתונים
        monet = self.monet[index]
        photo = self.photo[index]
        
        monet = Image.open(monet)
        photo = Image.open(photo)
        
        if self.transforms is not None:
            monet = self.transforms(monet)
            photo = self.transforms(photo)
        
        return monet, photo

<div dir=rtl>
יצירת פונקציה לעיבוד התמונה.
</div>

In [None]:
dset_trans = transforms.Compose([
                transforms.Resize(256), #הופך את כולם לאותו גודל
                transforms.CenterCrop(256), #חותך את כולם לאותו גודל
                transforms.ToTensor(),
                transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)), #מעביר את ערכי הפיקסלים מ0 עד 1 ל1- עד 1
            ])

In [None]:
dataroot = "gan-getting-started" #התיקייה בה נמצאות התמונות
workers = 2 #לפי המלצת האינטרנט
batch_size = 1 #גודל הבאץ'של הנתונים

In [None]:
dataset = MonetPhotoDataset(dataroot,"train",dset_trans) #טעינת סט הנתונים לאימון
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, #הכנתם לאימון
                                         shuffle=True)

test_dataset = MonetPhotoDataset(dataroot,"test",dset_trans) #טעינת סט הנתונים למבחן
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, #הכנתם למבחן
                                         shuffle=False)

In [None]:
print(dataloader.batch_size)

<div dir=rtl>
הצגת תמונה וציור אקראיים מהנתונים
</div>

In [None]:
for real_A, real_B in tqdm(dataloader, 0):
            show_tensor_images(torch.cat([real_A, real_B]))
            break

# training


<div dir=rtl>
הגדרת פונקציות עלות ועוד משתנים לאימון.
</div>

In [None]:
adv_criterion = nn.MSELoss() #פונקציית עלות
recon_criterion = nn.L1Loss() #פונקציית עלות

n_epochs = 200 #מספר אפוקים
display_step = 200 #כל כמה צעדים להראות דוגמא
lr = 0.0002 #קצב למידה
device = 'cuda' #הגדרת ההתקן עליו נאמן GPU

In [None]:
gen_AB = Generator().to(device) #יצירת מייצר והעברה לזיכרון GPU
gen_BA = Generator().to(device) #יצירת מייצר והעברה לזיכרון GPU
gen_opt = torch.optim.Adam(list(gen_AB.parameters()) + list(gen_BA.parameters()), lr=lr, betas=(0.5, 0.999)) #יצירת מייעל למייצרים
disc_A = Discriminator().to(device) #יצירת מבחין תמונות והעברה לזיכרון GPU
disc_A_opt = torch.optim.Adam(disc_A.parameters(), lr=lr, betas=(0.5, 0.999)) #יצירת מייעל למבחין תמונות
disc_B = Discriminator().to(device) #יצירת מבחין ציורים והעברה לזיכרון GPU
disc_B_opt = torch.optim.Adam(disc_B.parameters(), lr=lr, betas=(0.5, 0.999)) #יצירת מייעל למבחין ציורים
def weights_init(m): #פונקצייה לאתחול המשקלים במודלים
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.ConvTranspose2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
    if isinstance(m, nn.BatchNorm2d):
        torch.nn.init.normal_(m.weight, 0.0, 0.02)
        torch.nn.init.constant_(m.bias, 0)

# אתחול משקלים במודלים
gen_AB = gen_AB.apply(weights_init)
gen_BA = gen_BA.apply(weights_init)
disc_A = disc_A.apply(weights_init)
disc_B = disc_B.apply(weights_init)

In [None]:
plt.rcParams["figure.figsize"] = (10, 10)

#שמירת ערכי השגיאה במערכים
disc_A_loss_history = []
disc_B_loss_history = []
gen_loss_history = []

#אתחול סופר הצעדים
cur_step = 0

for epoch in range(n_epochs): #לולאה לריצה על מספר אפוקים

    #שמים את המודלים על מצב אימון
    gen_AB.train()
    gen_BA.train()
    disc_A.train()
    disc_B.train()
    # real_B photos
    # real_A monet
    for real_A, real_B in tqdm(dataloader, 0): #רצים על התמונות בסט נתונים

        #העברת התמונות לזיכרון GPU
        real_A = real_A.to(device)
        real_B = real_B.to(device)
        cur_batch_size = real_A.shape[0] #גודל באץ' נוכחי

        ### עדכון מבחין תמונות ###
        disc_A_opt.zero_grad() # מנקה נגזרות ישנות
        with torch.no_grad(): #מבלי לשמור נגזרות
            fake_A = gen_BA(real_B) #משיג תמונה מזוייפת
        disc_A_loss = get_disc_loss(real_A, fake_A, disc_A, adv_criterion) #פונקציית עלות
        disc_A_loss.backward(retain_graph=True) # עדכון נגזרות
        disc_A_opt.step() # עדכון משקלים בעזרת מייעל

        ### עדכון מבחין ציורים ###
        disc_B_opt.zero_grad() # מנקה נגזרות ישנות
        with torch.no_grad(): #מבלי לשמור נגזרות
            fake_B = gen_AB(real_A) #משיג ציור מזוייף
        disc_B_loss = get_disc_loss(real_B, fake_B, disc_B, adv_criterion) #פונקציית עלות
        disc_B_loss.backward(retain_graph=True) # עדכון נגזרות
        disc_B_opt.step() # עדכון משקלים בעזרת מייעל

        ### עדכון מייצרים ###
        gen_opt.zero_grad() #מנקה נגזרות ישנות
        gen_loss, fake_A, fake_B = get_gen_loss( #פונקציית עלות
            real_A, real_B, gen_AB, gen_BA, disc_A, disc_B, adv_criterion, recon_criterion, recon_criterion
        )
        gen_loss.backward() # עדכון נגזרות
        gen_opt.step() # עדכון משקלים בעזרת מייעל

        ### הצגת דוגמאות ###
        if cur_step % display_step == 0:
            print(f"Epoch {epoch}: Step {cur_step}: Generator loss: {gen_loss.item()}, Discriminator loss: {disc_A_loss.item()}")
            # מחבר תמונות ומציג אותם בעזרת פונקציה
            show_tensor_images(torch.cat([real_A, real_B])) 
            show_tensor_images(torch.cat([fake_B, fake_A]))

        cur_step += 1 #מוסיף צעד כל תמונה
    
    #שומר את ערכי השגיאה במערכים
    disc_A_loss_history.append(disc_A_loss.item())
    disc_B_loss_history.append(disc_B_loss.item())
    gen_loss_history.append(gen_loss.item())

#אחרי אימון שומר את המודלים והמייעלים
torch.save({
            'gen_AB': gen_AB.state_dict(),
            'gen_BA': gen_BA.state_dict(),
            'gen_opt': gen_opt.state_dict(),
            'disc_A': disc_A.state_dict(),
            'disc_A_opt': disc_A_opt.state_dict(),
            'disc_B': disc_B.state_dict(),
            'disc_B_opt': disc_B_opt.state_dict()
        }, f"cycleGAN-_-.pth")
#שומר את מערכי השגיאה במחשב
np.save('disc_A_loss_history', disc_A_loss_history)
np.save('disc_B_loss_history', disc_B_loss_history)
np.save('gen_loss_history', gen_loss_history)

In [None]:
plt.plot(disc_A_loss_history,'-')
plt.plot(gen_loss_history,'-')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['disc','gen'])
plt.title('Train vs Valid Accuracy')

plt.show()