In [None]:
import torch
import torch.nn as nn
from PIL import Image
import os
import numpy as np
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
import sys
from tqdm import tqdm
from torch.utils.data import DataLoader
import torch.optim as optim
from torchvision.utils import save_image
from torch.utils.data import Dataset
import numpy as np
import warnings
warnings.filterwarnings("ignore")

In [None]:
base_path = '/kaggle/working/' 
monet_path = os.path.join(base_path, 'monet') 
actual_path = os.path.join(base_path, 'actual')
os.makedirs(monet_path, exist_ok=True)
os.makedirs(actual_path, exist_ok=True)

# **Configuration**

In [None]:
def save_checkpoint(model, optimizer, filename="/kaggle/working/.pth.tar"):
    print("=> Saving checkpoint")
    checkpoint = {
        "state_dict": model.state_dict(),
        "optimizer": optimizer.state_dict(),
    }
    torch.save(checkpoint, filename)


def load_checkpoint(checkpoint_file, model, optimizer, lr):
    print("=> Loading checkpoint")
    checkpoint = torch.load(checkpoint_file, map_location=DEVICE)
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

    # If we don't do this then it will just have learning rate of old checkpoint
    # and it will lead to many hours of debugging \:
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr


def seed_everything(seed=42):
    os.environ["PYTHONHASHSEED"] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
#TRAIN_DIR = "data/train"
#VAL_DIR = "data/val"
BATCH_SIZE = 1
LEARNING_RATE = 1e-5
LAMBDA_IDENTITY = 0.0
LAMBDA_CYCLE = 10
NUM_WORKERS = 4
NUM_EPOCHS = 25
LOAD_MODEL = False
SAVE_MODEL = True
CHECKPOINT_GEN_A = "/kaggle/working/genA.pth.tar"
CHECKPOINT_GEN_M = "/kaggle/working/genM.pth.tar"
CHECKPOINT_CRITIC_A = "/kaggle/working/criticA.pth.tar"
CHECKPOINT_CRITIC_M = "/kaggle/working/criticM.pth.tar"

transforms = A.Compose(
    [
        A.Resize(width=256, height=256),
        A.HorizontalFlip(p=0.5),
        A.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], max_pixel_value=255),
        ToTensorV2(),
    ],
    additional_targets={"image0": "image"},
)

# **Generator**

In [None]:
class ConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, down=True, use_act=True, **kwargs):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, padding_mode="reflect", **kwargs)
            if down
            else nn.ConvTranspose2d(in_channels, out_channels, **kwargs),
            nn.InstanceNorm2d(out_channels),
            nn.ReLU(inplace=True) if use_act else nn.Identity(),
        )

    def forward(self, x):
        return self.conv(x)


class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.block = nn.Sequential(
            ConvBlock(channels, channels, kernel_size=3, padding=1),
            ConvBlock(channels, channels, use_act=False, kernel_size=3, padding=1),
        )

    def forward(self, x):
        return x + self.block(x)


class Generator(nn.Module):
    def __init__(self, img_channels, num_features=64, num_residuals=9):
        super().__init__()
        self.initial = nn.Sequential(
            nn.Conv2d(
                img_channels,
                num_features,
                kernel_size=7,
                stride=1,
                padding=3,
                padding_mode="reflect",
            ),
            nn.InstanceNorm2d(num_features),
            nn.ReLU(inplace=True),
        )
        self.down_blocks = nn.ModuleList(
            [
                ConvBlock(
                    num_features, num_features * 2, kernel_size=3, stride=2, padding=1
                ),
                ConvBlock(
                    num_features * 2,
                    num_features * 4,
                    kernel_size=3,
                    stride=2,
                    padding=1,
                ),
            ]
        )
        self.res_blocks = nn.Sequential(
            *[ResidualBlock(num_features * 4) for _ in range(num_residuals)]
        )
        self.up_blocks = nn.ModuleList(
            [
                ConvBlock(
                    num_features * 4,
                    num_features * 2,
                    down=False,
                    kernel_size=3,
                    stride=2,
                    padding=1,
                    output_padding=1,
                ),
                ConvBlock(
                    num_features * 2,
                    num_features * 1,
                    down=False,
                    kernel_size=3,
                    stride=2,
                    padding=1,
                    output_padding=1,
                ),
            ]
        )

        self.last = nn.Conv2d(
            num_features * 1,
            img_channels,
            kernel_size=7,
            stride=1,
            padding=3,
            padding_mode="reflect",
        )

    def forward(self, x):
        x = self.initial(x)
        for layer in self.down_blocks:
            x = layer(x)
        x = self.res_blocks(x)
        for layer in self.up_blocks:
            x = layer(x)
        return torch.tanh(self.last(x))


# **Discriminator**

In [None]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(
                in_channels,
                out_channels,
                4,
                stride,
                1,
                bias=True,
                padding_mode="reflect",
            ),
            nn.InstanceNorm2d(out_channels),
            nn.LeakyReLU(0.2, inplace=True),
        )

    def forward(self, x):
        return self.conv(x)


class Discriminator(nn.Module):
    def __init__(self, in_channels=3, features=[64, 128, 256, 512]):
        super().__init__()
        self.initial = nn.Sequential(
            nn.Conv2d(
                in_channels,
                features[0],
                kernel_size=4,
                stride=2,
                padding=1,
                padding_mode="reflect",
            ),
            nn.LeakyReLU(0.2, inplace=True),
        )

        layers = []
        in_channels = features[0]
        for feature in features[1:]:
            layers.append(
                Block(in_channels, feature, stride=1 if feature == features[-1] else 2)
            )
            in_channels = feature
        layers.append(
            nn.Conv2d(
                in_channels,
                1,
                kernel_size=4,
                stride=1,
                padding=1,
                padding_mode="reflect",
            )
        )
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        x = self.initial(x)
        return torch.sigmoid(self.model(x))


# **Buliding**

In [None]:
class MonetToActual(Dataset):
    def __init__(self, root_monet, root_actual, transform=None):
        self.root_monet = root_monet
        self.root_actual = root_actual
        self.transform = transform

        self.monet_images = os.listdir(root_monet)
        self.actual_images = os.listdir(root_actual)[:7000]
        self.length_dataset = max(len(self.monet_images), len(self.actual_images)) # 1000, 1500
        self.actual_len = len(self.actual_images)
        self.monet_len = len(self.monet_images)

    def __len__(self):
        return self.length_dataset

    def __getitem__(self, index):
        monet_img = self.monet_images[index % self.monet_len]
        actual_img = self.actual_images[index % self.actual_len]

        actual_path = os.path.join(self.root_actual, actual_img)
        monet_path = os.path.join(self.root_monet, monet_img)

        monet_img = np.array(Image.open(monet_path).convert("RGB"))
        actual_img = np.array(Image.open(actual_path).convert("RGB"))

        if self.transform:
            augmentations = self.transform(image=monet_img, image0=actual_img)
            monet_img = augmentations["image"]
            actual_img = augmentations["image0"]

        return monet_img, actual_img

# **Loading Data**

In [None]:
dataset = MonetToActual(
        root_monet='/kaggle/input/gan-getting-started/monet_jpg',
        root_actual='/kaggle/input/gan-getting-started/photo_jpg',
        transform=transforms,
    )
loader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
        pin_memory=True,
    )

# **Training Function**

In [None]:
def train_fn(
    disc_A, disc_M, gen_M, gen_A, loader, opt_disc, opt_gen, l1, mse, d_scaler, g_scaler,epoch
):
    H_reals = 0
    H_fakes = 0
    A_reals=0
    A_fakes=0
    loop = tqdm(loader, leave=True)

    for idx, (monet, actual) in enumerate(loop):
        monet = monet.to(DEVICE)
        actual = actual.to(DEVICE)

        # Train Discriminators A and M
        with torch.cuda.amp.autocast():
            fake_actual = gen_A(monet)
            D_A_real = disc_A(actual)
            D_A_fake = disc_A(fake_actual.detach())
            A_reals += D_A_real.mean().item()
            A_fakes += D_A_fake.mean().item()
            D_A_real_loss = mse(D_A_real, torch.ones_like(D_A_real))
            D_A_fake_loss = mse(D_A_fake, torch.zeros_like(D_A_fake))
            D_A_loss = D_A_real_loss + D_A_fake_loss

            fake_monet = gen_M(actual)
            D_M_real = disc_M(monet)
            D_M_fake = disc_M(fake_monet.detach())
            D_M_real_loss = mse(D_M_real, torch.ones_like(D_M_real))
            D_M_fake_loss = mse(D_M_fake, torch.zeros_like(D_M_fake))
            D_M_loss = D_M_real_loss + D_M_fake_loss

            # put it togethor
            D_loss = (D_A_loss + D_M_loss) / 2

        opt_disc.zero_grad()
        d_scaler.scale(D_loss).backward()
        d_scaler.step(opt_disc)
        d_scaler.update()
#H->A
#Z->M
        # Train Generators H and Z
        with torch.cuda.amp.autocast():
            # adversarial loss for both generators
            D_A_fake = disc_A(fake_actual)
            D_M_fake = disc_M(fake_monet)
            loss_G_A = mse(D_A_fake, torch.ones_like(D_A_fake))
            loss_G_M = mse(D_M_fake, torch.ones_like(D_M_fake))

            # cycle loss
            cycle_monet = gen_M(fake_actual)
            cycle_actual = gen_A(fake_monet)
            cycle_monet_loss = l1(monet, cycle_monet)
            cycle_actual_loss = l1(actual, cycle_actual)

            # identity loss (remove these for efficiency if you set lambda_identity=0)
            identity_monet = gen_M(monet)
            identity_actual = gen_A(actual)
            identity_monet_loss = l1(monet, identity_monet)
            identity_actual_loss = l1(actual, identity_actual)

            # add all togethor
            G_loss = (
                loss_G_M
                + loss_G_A
                + cycle_monet_loss * LAMBDA_CYCLE
                + cycle_actual_loss * LAMBDA_CYCLE
                + identity_actual_loss * LAMBDA_IDENTITY
                + identity_monet_loss * LAMBDA_IDENTITY
            )

        opt_gen.zero_grad()
        g_scaler.scale(G_loss).backward()
        g_scaler.step(opt_gen)
        g_scaler.update()
        if idx % 500 == 0:
            save_image(fake_actual * 0.5 + 0.5, f"/kaggle/working/actual/FAKE ACTUAL : index:{idx}_epoch:{epoch}.png")
            save_image(fake_monet * 0.5 + 0.5, f"/kaggle/working/monet/FAKE MONET :index:{idx}_epoch:{epoch}.png")

        loop.set_postfix(A_real=A_reals / (idx + 1), A_fake=A_fakes / (idx + 1))

In [None]:
def main():
    disc_A = Discriminator(in_channels=3).to(DEVICE)
    disc_M = Discriminator(in_channels=3).to(DEVICE)
    gen_M = Generator(img_channels=3, num_residuals=9).to(DEVICE)
    gen_A = Generator(img_channels=3, num_residuals=9).to(DEVICE)
    opt_disc = optim.Adam(
        list(disc_A.parameters()) + list(disc_M.parameters()),
        lr=LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    opt_gen = optim.Adam(
        list(gen_M.parameters()) + list(gen_A.parameters()),
        lr=LEARNING_RATE,
        betas=(0.5, 0.999),
    )

    L1 = nn.L1Loss()
    mse = nn.MSELoss()


    dataset = MonetToActual(
        root_monet='/kaggle/input/gan-getting-started/monet_jpg',
        root_actual='/kaggle/input/gan-getting-started/photo_jpg',
        transform=transforms,
)
    loader = DataLoader(
        dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        num_workers=NUM_WORKERS,
        pin_memory=True,
    )
    g_scaler = torch.cuda.amp.GradScaler()
    d_scaler = torch.cuda.amp.GradScaler()

    for epoch in range(NUM_EPOCHS):
        train_fn(
            disc_A,
            disc_M,
            gen_M,
            gen_A,
            loader,
            opt_disc,
            opt_gen,
            L1,
            mse,
            d_scaler,
            g_scaler,
            epoch
        )
        save_checkpoint(gen_A, opt_gen, filename=CHECKPOINT_GEN_A)
        save_checkpoint(gen_M, opt_gen, filename=CHECKPOINT_GEN_M)
        save_checkpoint(disc_A, opt_disc, filename=CHECKPOINT_CRITIC_A)
        save_checkpoint(disc_M, opt_disc, filename=CHECKPOINT_CRITIC_M)
        
main()