In [None]:
from google.colab import drive

# Attach the user's Google Drive to the Colab VM; force_remount refreshes an
# already-mounted drive instead of reusing it.
drive.mount(mountpoint='/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
# Project root on the mounted Drive. Absolute, so it matches the
# '/content/gdrive' mount point regardless of the current working directory.
path = "/content/gdrive/MyDrive/iitisoc/"

In [None]:
import os
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image

class ImageDataset(Dataset):
    """Dataset of (noisy, clean) image pairs read from two directories.

    Files are paired by position after sorting each directory listing by
    name, so the clean and noisy directories are expected to contain the same
    number of files whose names sort into matching order.

    Args:
        clean_dir: Directory containing the clean (target) images.
        noisy_dir: Directory containing the noisy (input) images.
    """

    def __init__(self, clean_dir, noisy_dir):
        super(ImageDataset, self).__init__()
        # os.listdir() returns entries in arbitrary order; sort both listings
        # so that index i refers to the same image in both directories.
        self.clean_file_filenames = [os.path.join(clean_dir, x) for x in sorted(os.listdir(clean_dir))]
        self.noisy_file_filenames = [os.path.join(noisy_dir, x) for x in sorted(os.listdir(noisy_dir))]
        self.transform = transforms.ToTensor()

    def __getitem__(self, index):
        """Return the ``(noisy, clean)`` pair at ``index`` after applying ``self.transform``."""
        # Context managers close the underlying files once the pixel data has
        # been converted; convert("RGB") guarantees three channels.
        with Image.open(self.clean_file_filenames[index]) as clean_img:
            clean = self.transform(clean_img.convert("RGB"))
        with Image.open(self.noisy_file_filenames[index]) as noisy_img:
            noisy = self.transform(noisy_img.convert("RGB"))
        return noisy, clean

    def __len__(self):
        """Number of pairs, taken from the clean directory listing."""
        return len(self.clean_file_filenames)

In [None]:
import torch.nn as nn

class DnCNN(nn.Module):
    """Plain convolutional stack: Conv-ReLU, N x (Conv-BatchNorm-ReLU), Conv.

    Every convolution is 3x3 with padding 1 and no bias, so the spatial size
    of the input is preserved.

    Args:
        in_channels: Channels of the input image (default 3).
        out_channels: Channels of the produced image (default 3).
        num_features: Width of the hidden feature maps (default 64).
        num_hidden_layers: Number of Conv-BatchNorm-ReLU groups (default 10).

    The defaults reproduce the original fixed architecture, and the attribute
    names (``conv1``, ``layers``, ``conv2``) are unchanged so that existing
    state dicts keep loading.
    """

    def __init__(self, in_channels=3, out_channels=3, num_features=64, num_hidden_layers=10):
        super(DnCNN, self).__init__()

        self.conv1 = nn.Conv2d(in_channels=in_channels, out_channels=num_features,
                               kernel_size=3, padding=1, bias=False)
        self.relu1 = nn.ReLU(inplace=False)

        layers = []
        for _ in range(num_hidden_layers):
            layers.extend([
                nn.Conv2d(in_channels=num_features, out_channels=num_features,
                          kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(num_features),
                nn.ReLU(inplace=True),
            ])

        self.layers = nn.Sequential(*layers)
        self.conv2 = nn.Conv2d(in_channels=num_features, out_channels=out_channels,
                               kernel_size=3, padding=1, bias=False)

    def forward(self, x):
        """Apply the stack to ``x`` (N, in_channels, H, W); returns (N, out_channels, H, W)."""
        features = self.relu1(self.conv1(x))
        return self.conv2(self.layers(features))

In [None]:
import torch
from torch.utils.data import DataLoader
import torch.optim as optim
from torchvision import transforms

from math import log10
import matplotlib.pyplot as plt


# Paired (noisy, clean) datasets for training and for held-out evaluation.
train_dataset = ImageDataset(path + "/large/clean/", path + "/large/noisy/")
test_dataset = ImageDataset(path + "/large/test_clean/", path + "/large/test_noisy/")

batch_size = 1

# Reshuffle the training samples every epoch so SGD does not see them in the
# same fixed order each time; the evaluation loader keeps a stable order.
training_data_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
testing_data_loader = DataLoader(dataset=test_dataset, batch_size=batch_size)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = DnCNN().to(device)

criterion = nn.MSELoss()
lr = 1e-4
optimizer = optim.SGD(model.parameters(), lr=lr)


def train(epoch):
    """Run one optimisation pass over ``training_data_loader``.

    Uses the module-level ``model``, ``criterion``, ``optimizer`` and
    ``device``. Prints the loss of every batch and the epoch average.

    Args:
        epoch: Epoch number, used only for the log messages.

    Returns:
        The average batch loss as a Python float (NaN if the loader is empty).
    """
    model.train()
    num_batches = len(training_data_loader)
    epoch_loss = 0.0
    for i, (noisy, clean) in enumerate(training_data_loader, 1):
        # Follow the configured device instead of hard-coding CUDA so the
        # loop also runs on CPU-only machines.
        noisy = noisy.to(device)
        clean = clean.to(device)

        optimizer.zero_grad()
        out = model(noisy)
        loss = criterion(out, clean)
        loss.backward()
        optimizer.step()

        # .item() detaches the value; summing the tensor itself would keep
        # every batch's autograd graph alive for the whole epoch.
        batch_loss = loss.item()
        epoch_loss += batch_loss

        print("===> Epoch[{}]({}/{}): Loss: {:.4f}".format(epoch, i, num_batches, batch_loss))

    avg_loss = epoch_loss / num_batches if num_batches else float("nan")
    print("===> Epoch {} Complete: Avg. Loss: {:.4f}".format(epoch, avg_loss))
    return avg_loss

def validate():
    """Report the average PSNR of the model over ``testing_data_loader``.

    Each batch's output is compared with the clean target (``data[1]``), and
    the PSNR is computed as ``10 * log10(1 / mse)``, i.e. with a peak value
    of 1. Uses the module-level ``model``, ``criterion`` and ``device``; the
    model is put in eval mode for the pass and returned to train mode
    afterwards, even if an error occurs.

    Returns:
        The average PSNR in dB as a float (NaN if the loader is empty,
        ``inf`` if every batch is reproduced exactly).
    """
    num_batches = len(testing_data_loader)
    total_psnr = 0.0
    model.eval()
    try:
        with torch.no_grad():
            for noisy, clean in testing_data_loader:
                noisy = noisy.to(device)
                clean = clean.to(device)
                output = model(noisy)
                # Score against the clean target, not the noisy input.
                mse = criterion(output, clean).item()
                # A perfect reconstruction has zero error: report infinite
                # PSNR rather than dividing by zero.
                total_psnr += 10 * log10(1 / mse) if mse > 0 else float("inf")
    finally:
        model.train()

    avg_psnr = total_psnr / num_batches if num_batches else float("nan")
    print("===> Avg. PSNR: {:.4f} dB".format(avg_psnr))
    return avg_psnr


def save(state, epoch):
    """Write ``state`` to ``./model_epoch_<epoch>.pth`` and announce the path.

    Args:
        state: Object to serialise with ``torch.save``.
        epoch: Value substituted into the checkpoint file name.
    """
    checkpoint_file = "./model_epoch_{}.pth".format(epoch)
    torch.save(state, checkpoint_file)
    message = "Checkpoint saved to {}".format(checkpoint_file)
    print(message)

def training(checkpoint_path="./model_epoch_50.pth", num_epochs=50):
    """Train for ``num_epochs`` epochs, optionally resuming from a checkpoint.

    Every epoch that is a multiple of 10 is validated and checkpointed via
    ``validate()`` and ``save()``. Uses the module-level ``model``,
    ``optimizer`` and ``device``.

    Args:
        checkpoint_path: Checkpoint to resume from. Its ``'epoch'`` entry is
            used as the first epoch to run (``save`` stores the number of the
            *next* epoch). Pass ``None`` to train from scratch at epoch 1.
        num_epochs: Number of epochs to run in this call.
    """
    start_epoch = 1
    if checkpoint_path is not None:
        # NOTE(review): the checkpoint also stores a pickled model object
        # under 'arch', so loading it executes pickle code -- only load
        # trusted files. Recent PyTorch releases may additionally require
        # weights_only=False for such checkpoints; confirm against the
        # installed version.
        # map_location lets a checkpoint saved on GPU be resumed on CPU.
        checkpoint = torch.load(checkpoint_path, map_location=device)
        model.load_state_dict(checkpoint['state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        # Continue from the epoch recorded in the checkpoint rather than a
        # hard-coded number.
        start_epoch = checkpoint['epoch']

    for epoch in range(start_epoch, start_epoch + num_epochs):
        train(epoch)
        if epoch % 10 == 0:
            validate()
            save({
                'epoch': epoch + 1,
                'arch': model,
                'state_dict': model.state_dict(),
                'optimizer' : optimizer.state_dict(),
            }, epoch)

def testing(checkpoint_path="./model_epoch_20.pth",
            data_dir="../new_data/final_test",
            image_numbers=range(320, 330)):
    """Denoise ``noisy_<n>.jpg`` files and write them back as ``clean_<n>.jpg``.

    The network is taken from the ``'arch'`` entry of the checkpoint, loaded
    once, and run in eval mode without gradient tracking on the module-level
    ``device``.

    Args:
        checkpoint_path: Checkpoint file containing the pickled model.
        data_dir: Directory holding the ``noisy_<n>.jpg`` inputs; outputs are
            written to the same directory.
        image_numbers: Iterable of the ``<n>`` values to process.
    """
    to_tensor = transforms.ToTensor()
    to_pil = transforms.ToPILImage()

    # NOTE(review): 'arch' is a fully pickled model, so loading executes
    # pickle code -- only load trusted checkpoints. Recent PyTorch releases
    # may additionally require weights_only=False here; confirm against the
    # installed version.
    # Load once, outside the loop, onto whichever device is configured.
    checkpoint = torch.load(checkpoint_path, map_location=device)
    net = checkpoint['arch'].to(device)
    net.eval()  # use BatchNorm running statistics, not per-image statistics

    with torch.no_grad():
        for image_number in image_numbers:
            with Image.open(f"{data_dir}/noisy_{image_number}.jpg") as noisy_img:
                image = to_tensor(noisy_img.convert("RGB")).float()
            print(image.shape)
            image = image.unsqueeze(0).to(device)
            # train() optimises the network output directly against the clean
            # image, so the output itself is the denoised result; it must not
            # be subtracted from the input as if it were a noise residual.
            out = torch.clamp(net(image), 0., 1.)
            out = out.squeeze(0).cpu()
            print(out.shape)
            to_pil(out).save(f"{data_dir}/clean_{image_number}.jpg")

In [None]:
# Run training(), which loads a saved checkpoint and then continues training;
# the per-batch loss log is printed below.
training()

===> Epoch[51](1/32): Loss: 0.1045
===> Epoch[51](2/32): Loss: 0.1144
===> Epoch[51](3/32): Loss: 0.1015
===> Epoch[51](4/32): Loss: 0.0982
===> Epoch[51](5/32): Loss: 0.1143
===> Epoch[51](6/32): Loss: 0.1191
===> Epoch[51](7/32): Loss: 0.1192
===> Epoch[51](8/32): Loss: 0.1243
===> Epoch[51](9/32): Loss: 0.1172
===> Epoch[51](10/32): Loss: 0.1298
===> Epoch[51](11/32): Loss: 0.1173
===> Epoch[51](12/32): Loss: 0.1188
===> Epoch[51](13/32): Loss: 0.1217
===> Epoch[51](14/32): Loss: 0.1278
===> Epoch[51](15/32): Loss: 0.1044
===> Epoch[51](16/32): Loss: 0.0964
===> Epoch[51](17/32): Loss: 0.0977
===> Epoch[51](18/32): Loss: 0.1024
===> Epoch[51](19/32): Loss: 0.1002
===> Epoch[51](20/32): Loss: 0.1285
===> Epoch[51](21/32): Loss: 0.1247
===> Epoch[51](22/32): Loss: 0.1310
===> Epoch[51](23/32): Loss: 0.1370
===> Epoch[51](24/32): Loss: 0.1209
===> Epoch[51](25/32): Loss: 0.1288
===> Epoch[51](26/32): Loss: 0.1170
===> Epoch[51](27/32): Loss: 0.1109
===> Epoch[51](28/32): Loss: 0.1164
=

In [None]:
# Select the CUDA device when available, otherwise the CPU. The same
# assignment already appears in the training-setup cell.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")