## <center> **Deep learning Project** </center>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## <center> **Night to Day image conversion** <center>

In [None]:
!unzip -d /content/drive/MyDrive/Colab\ Notebooks/datasets/data/ /content/drive/MyDrive/Colab\ Notebooks/datasets/archive.zip

In [None]:
import os
import shutil

day_path = '/content/drive/MyDrive/Colab Notebooks/datasets/data/day/day/'
night_path = '/content/drive/MyDrive/Colab Notebooks/datasets/data/night/night/'

day_files = os.listdir('/content/drive/MyDrive/Colab Notebooks/datasets/data/day/day/')
night_files = os.listdir('/content/drive/MyDrive/Colab Notebooks/datasets/data/night/night/')

test_loc_day = '/content/drive/MyDrive/Colab Notebooks/datasets/test/day/'
test_loc_night = '/content/drive/MyDrive/Colab Notebooks/datasets/test/night/'

day_files.sort()
night_files.sort()

train_files_day = day_files[:10000]
test_files_day = day_files[10000:]
train_files_night = night_files[:10000]
test_files_night = night_files[10000:]

for file in test_files_day:
  shutil.move(day_path + file, test_loc_day)

for file in test_files_night:
  shutil.move(night_path + file, test_loc_night)

In [None]:
import torch
import torchvision
import torch.nn as nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
from torch.optim.lr_scheduler import StepLR

import os
from PIL import Image

### Loading dataset

In [None]:
class NDdataset(Dataset):
  def __init__(self, day_path, night_path, tfs):
    self.day_path = day_path
    self.night_path = night_path
    self.transforms = tfs
    self.day_images = os.listdir(day_path)
    self.day_images.sort()
    self.night_images = os.listdir(night_path)
    self.night_images.sort() 

  def __len__(self):
    return len(self.day_images)

  def __getitem__(self, idx):
    day = Image.open(self.day_path + self.day_images[idx])
    night = Image.open(self.night_path + self.night_images[idx])

    day = self.transforms(day)
    night = self.transforms(night)

    return day, night

In [None]:
path = '/content/drive/MyDrive/Colab Notebooks/datasets/data/'
batch_size = 100


tfs = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
     transforms.Resize((256, 256))]
)
dataset = NDdataset(path + 'day/day/', path + 'night/night/', tfs)
data_loader = DataLoader(dataset, batch_size = batch_size, shuffle = True)

In [None]:
class NDtestset(Dataset):
  def __init__(self, path, tfs):
    self.path = path
    self.transforms = tfs
    self.night_images = os.listdir(path)
    self.night_images.sort() 

  def __len__(self):
    return len(self.night_images)

  def __getitem__(self, idx):
    night = Image.open(self.path + self.night_images[idx])
    night = self.transforms(night)

    return night

In [None]:
path = '/content/drive/MyDrive/Colab Notebooks/datasets/test/night/'
batch_size = 100

tfs = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
     transforms.Resize((256, 256))]
)
testset = NDtestset(path, tfs)
test_loader = DataLoader(testset, batch_size = batch_size, shuffle = True)

### The discriminator model

In [None]:
class CNNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(CNNBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(
                in_channels, out_channels, 4, stride, 1, bias=False, padding_mode="reflect"
            ),
            nn.BatchNorm2d(out_channels),
            nn.LeakyReLU(0.2),
        )

    def forward(self, x):
        return self.conv(x)

In [None]:
class Discriminator(nn.Module):
    def __init__(self, in_channels=3, features=[64, 128, 256, 512]):
        super().__init__()
        self.initial = nn.Sequential(
            nn.Conv2d(
                in_channels * 2,
                features[0],
                kernel_size=4,
                stride=2,
                padding=1,
                padding_mode="reflect",
            ),
            nn.LeakyReLU(0.2),
        )

        layers = []
        in_channels = features[0]
        for feature in features[1:]:
            layers.append(
                CNNBlock(in_channels, feature, stride=1 if feature == features[-1] else 2),
            )
            in_channels = feature

        layers.append(
            nn.Conv2d(
                in_channels, 1, kernel_size=4, stride=1, padding=1, padding_mode="reflect"
            ),
        )

        self.model = nn.Sequential(*layers)

    def forward(self, x, y):
        x = torch.cat([x, y], dim=1)
        x = self.initial(x)
        x = self.model(x)
        return x

### The generator model

In [None]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, down=True, act="relu", use_dropout=False):
        super(Block, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, 4, 2, 1, bias=False, padding_mode="reflect")
            if down
            else nn.ConvTranspose2d(in_channels, out_channels, 4, 2, 1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU() if act == "relu" else nn.LeakyReLU(0.2),
        )

        self.use_dropout = use_dropout
        self.dropout = nn.Dropout(0.5)
        self.down = down

    def forward(self, x):
        x = self.conv(x)
        return self.dropout(x) if self.use_dropout else x

In [None]:
class Generator(nn.Module):
    def __init__(self, in_channels=3, features=64):
        super().__init__()
        self.initial_down = nn.Sequential(
            nn.Conv2d(in_channels, features, 4, 2, 1, padding_mode="reflect"),
            nn.LeakyReLU(0.2),
        )
        self.down1 = Block(features, features * 2, down=True, act="leaky", use_dropout=False)
        self.down2 = Block(
            features * 2, features * 4, down=True, act="leaky", use_dropout=False
        )
        self.down3 = Block(
            features * 4, features * 8, down=True, act="leaky", use_dropout=False
        )
        self.down4 = Block(
            features * 8, features * 8, down=True, act="leaky", use_dropout=False
        )
        self.down5 = Block(
            features * 8, features * 8, down=True, act="leaky", use_dropout=False
        )
        self.down6 = Block(
            features * 8, features * 8, down=True, act="leaky", use_dropout=False
        )
        self.bottleneck = nn.Sequential(
            nn.Conv2d(features * 8, features * 8, 4, 2, 1), nn.ReLU()
        )

        self.up1 = Block(features * 8, features * 8, down=False, act="relu", use_dropout=True)
        self.up2 = Block(
            features * 8 * 2, features * 8, down=False, act="relu", use_dropout=True
        )
        self.up3 = Block(
            features * 8 * 2, features * 8, down=False, act="relu", use_dropout=True
        )
        self.up4 = Block(
            features * 8 * 2, features * 8, down=False, act="relu", use_dropout=False
        )
        self.up5 = Block(
            features * 8 * 2, features * 4, down=False, act="relu", use_dropout=False
        )
        self.up6 = Block(
            features * 4 * 2, features * 2, down=False, act="relu", use_dropout=False
        )
        self.up7 = Block(features * 2 * 2, features, down=False, act="relu", use_dropout=False)
        self.final_up = nn.Sequential(
            nn.ConvTranspose2d(features * 2, in_channels, kernel_size=4, stride=2, padding=1),
            nn.Tanh(),
        )

    def forward(self, x):
        d1 = self.initial_down(x)
        d2 = self.down1(d1)
        d3 = self.down2(d2)
        d4 = self.down3(d3)
        d5 = self.down4(d4)
        d6 = self.down5(d5)
        d7 = self.down6(d6)
        bottleneck = self.bottleneck(d7)
        up1 = self.up1(bottleneck)
        up2 = self.up2(torch.cat([up1, d7], 1))
        up3 = self.up3(torch.cat([up2, d6], 1))
        up4 = self.up4(torch.cat([up3, d5], 1))
        up5 = self.up5(torch.cat([up4, d4], 1))
        up6 = self.up6(torch.cat([up5, d3], 1))
        up7 = self.up7(torch.cat([up6, d2], 1))
        return self.final_up(torch.cat([up7, d1], 1))

### Model definition

In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [None]:
gen = Generator(3).to(device)
disc = Discriminator().to(device)

# learning rate
lr = 2e-3

# optimizer for discriminator
opt_disc = torch.optim.Adam(disc.parameters(), lr = lr, betas=(0.7, 0.999))

# optimizer for generator 
opt_gen = torch.optim.Adam(gen.parameters(), lr = lr, betas=(0.7, 0.999))

# scheduler_gen = StepLR(opt_gen, step_size = 10, gamma = 0.1)
# scheduler_disc = StepLR(opt_disc, step_size = 1, gamma = 0.1)

# Generator loss function
L1 = nn.L1Loss()
# discriminator loss function
bce = nn.BCEWithLogitsLoss()



# provides convenience methods for mixed precision
g_scaler = torch.cuda.amp.GradScaler()
d_scaler = torch.cuda.amp.GradScaler()

In [None]:
import shutil
if os.path.exists('log/fake'):
  shutil.rmtree('log/fake')
if os.path.exists('log/real'):
  shutil.rmtree('log/real')

writer_fake_day = SummaryWriter('log/fake/day')
writer_real_day = SummaryWriter('log/real/day')
writer_real_night = SummaryWriter('log/real/night')
step = 0

In [None]:
%reload_ext tensorboard
%tensorboard --logdir log

### Training

In [None]:
num_epochs = 1000
for e in range(num_epochs):
  for id, (day, night) in enumerate(data_loader):
    im_day = day.to(device)
    im_night = night.to(device)

    opt_gen.zero_grad()
    opt_disc.zero_grad()

    # training discriminator for both day and night
    with torch.cuda.amp.autocast():
      fake_day = gen(im_night)
      d_real = disc(im_day, im_night)
      disc_d_real_loss = bce(d_real, torch.ones_like(d_real))
      d_fake = disc(im_day, fake_day.detach())
      disc_d_fake_loss = bce(d_fake, torch.zeros_like(d_fake))
      disc_loss = (disc_d_real_loss + disc_d_fake_loss) / 2

    d_scaler.scale(disc_loss).backward()
    d_scaler.step(opt_disc)
    d_scaler.update()

    # training generator for both day and night
    with torch.cuda.amp.autocast():
      d_fake = disc(im_day, fake_day)
      gen_d_loss = bce(d_fake, torch.ones_like(d_fake))
      l1_loss = L1(fake_day, im_day) * 100
      gen_loss = gen_d_loss + l1_loss
    
    g_scaler.scale(gen_loss).backward()
    g_scaler.step(opt_gen)
    g_scaler.update()


    if id == 0:
      print(
          f"Epoch [{e}/{num_epochs}] Batch {id}/{len(data_loader)} \
                Loss D: {disc_loss:.4f}, loss G: {gen_loss:.4f}"
      )
      fixed_noise = torch.randn(batch_size, 3, 256, 256).to(device)
      with torch.no_grad():
          fake_day = gen(im_night)
    
          img_grid_fake_day = torchvision.utils.make_grid(fake_day,
                                                          normalize=True)
          img_grid_day = torchvision.utils.make_grid(im_day, normalize=True)
          img_grid_night = torchvision.utils.make_grid(im_night, normalize=True)
          writer_fake_day.add_image(
              "Fake day images", img_grid_fake_day, global_step=step
          )
          writer_real_day.add_image(
              "Real day images ", img_grid_day, global_step=step
          )
          writer_real_night.add_image(
              "Real night images", img_grid_night, global_step=step
          )
          step += 1

## Saving the model

In [None]:
save_path = '/content/drive/MyDrive/Colab Notebooks/models/'
torch.save(gen.state_dict(), save_path + 'gen/gen.pth')
torch.save(disc.state_dict(), save_path + 'disc/disc.pth')

### Loading the model

In [None]:
test_gen = Generator(3)
test_gen.load_state_dict(torch.load(save_path + 'gen/gen.pth'))
test_disc = torch.load(save_path + 'disc/disc.pth')

In [None]:
writer_test = SummaryWriter('log/test')
step = 0
with torch.no_grad():
  for id, (night) in enumerate(test_loader):
    test_gen = gen.to(device)
    night = night.to(device)

    gen_day = test_gen(night)
    img_grid_fake_day = torchvision.utils.make_grid(gen_day, normalize=True)
    writer_test.add_image("Fake day images", img_grid_fake_day,
                          global_step = step)
    step += 1