In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
!nvidia-smi

Tue Nov 26 14:06:08 2019       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.33.01    Driver Version: 418.67       CUDA Version: 10.1     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla K80           Off  | 00000000:00:04.0 Off |                    0 |
| N/A   71C    P0    88W / 149W |      0MiB / 11441MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|  No ru

In [0]:
import os
import glob
import torch
import torch.nn as nn
import torch.utils as utils
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
import torchvision.datasets as dset
import torchvision.transforms as transforms
from torchvision.utils import save_image
import numpy as np
import matplotlib.pyplot as plt
from numpy import newaxis

import imgaug as ia
from imgaug import augmenters as iaa
from skimage import io, transform

In [0]:
# import warnings
# warnings.filterwarnings("ignore")
# plt.ion()

In [0]:
class DirtyDocumentsDataset(Dataset):
    def __init__(self, dirty_dir, clean_dir, transform=None):
        self.dirty_dir = dirty_dir
        self.clean_dir = clean_dir
        self.transform = transform
    
    def __len__(self):
        return len(os.listdir(self.dirty_dir))
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
            
        train_names = sorted(glob.glob(self.dirty_dir + "*.png"))
        train_image = io.imread(train_names[idx], as_gray=True)

        clean_names = sorted(glob.glob(self.clean_dir + "*.png"))
        clean_image = io.imread(clean_names[idx], as_gray=True)

        train_image = train_image[:,:,newaxis]
        clean_image = clean_image[:,:,newaxis]

        sample = { 'train_image' : train_image, 'clean_image':clean_image}

        if self.transform:
           sample = self.transform(sample)

        return sample

In [0]:
class DirtyDocumentsDataset_Test(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
    
    def __len__(self):
        return len(os.listdir(self.root_dir))
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_names = sorted(glob.glob(self.root_dir + "*.png"))
        img_name = img_names[idx]
        image = io.imread(img_name)

        image = image[:,:,newaxis]

        sample = { 'image' : image}

        if self.transform:
           sample = self.transform(sample)

        return sample

In [0]:
# Test Loader Transform classes
class Rescale_Test(object):
    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size
    
    def __call__(self, sample):
        image = sample['image']

        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            if h > self.output_size:
                new_h = self.output_size
            else:
                new_h = self.output_size
        else:
            new_h, new_w = self.output_size
        
        img = transform.resize(image, (new_h,new_w))

        return {'image':img}
    
class ToTensor_Test(object):
    def __call__(self, sample):
        image = sample['image']

        image = image.transpose((2,0,1))
        return { 'image' : torch.from_numpy(image)}

In [0]:
# Train Loader Transfrom classes
class Rescale(object):
    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size
    
    def __call__(self, sample):
        train_image = sample['train_image']
        clean_image = sample['clean_image']

        h, w = train_image.shape[:2]
        if isinstance(self.output_size, int):
            if h > self.output_size:
                new_h = self.output_size
            else:
                new_h = self.output_size
        else:
            new_h, new_w = self.output_size
        
        tr_img = transform.resize(train_image, (new_h,new_w))
        cl_img = transform.resize(clean_image, (new_h,new_w))

        return {'train_image':tr_img, 'clean_image':cl_img}
    
class ToTensor(object):
    def __call__(self, sample):
        train_image = sample['train_image']
        clean_image = sample['clean_image']

        train_image = train_image.transpose((2,0,1))
        clean_image = clean_image.transpose((2,0,1))
        return { 'train_image' : torch.from_numpy(train_image),'clean_image' : torch.from_numpy(clean_image)}

class RandomCrop:
    def __init__(self, size, threshold):
        assert isinstance(size,(tuple))
        self.size = size
        self.threshold = threshold

    def __call__(self, sample):
        p = np.random.rand(1)

        if p <= self.threshold:
            clean_image = sample['clean_image']
            dirty_image = sample['train_image']

            clean_img = np.array(clean_image)
            dirty_img = np.array(dirty_image)

            x, y = clean_img.shape[:2]

            x = x - 1 - self.size[0]
            y = y - 1 - self.size[1]
            min_x = int(np.random.rand(1) * x)
            min_y = int(np.random.rand(1) * y)
            max_x = min_x + self.size[0]
            max_y = min_y + self.size[1]

            clean_crop = clean_img[min_x:max_x, min_y:max_y]
            dirty_crop = dirty_img[min_x:max_x, min_y:max_y]
        else:
            clean_crop = sample['clean_image']
            dirty_crop = sample['train_image']

        return {'clean_image' : clean_crop, 'train_image' : dirty_crop}

class ImgAugTransform:
    def __init__(self):
        self.sometimes = lambda aug: iaa.Sometimes(0.25,aug)
        self.aug = iaa.Sequential([
            self.sometimes(iaa.GaussianBlur(sigma=(0,3.0)))
        ])

    def __call__(self, sample):
        clean_image = sample['clean_image']
        dirty_image = sample['train_image']

        clean_img = np.array(clean_image)
        dirty_img = np.array(dirty_image)

        return {'clean_image' : clean_img, 'train_image' : self.aug.augment_image(dirty_img)}

In [0]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1,8,3,padding=1),
            nn.BatchNorm2d(8),     # batch x 8 x 256 x 540
            nn.ReLU(),
            nn.Conv2d(8,16,3,padding=1),
            nn.BatchNorm2d(16),    # batch x 16 x 256 x 540
            nn.ReLU(),
            nn.Conv2d(16,32,3,padding=1),
            nn.BatchNorm2d(32),   # batch x 32 x 256 x 540
            nn.ReLU(),
            nn.Conv2d(32,64,3,padding=1),
            nn.BatchNorm2d(64),   # batch x 64 x 256 x 540
            nn.ReLU(),
            nn.MaxPool2d(2,2)     # batch x 64 x 128 x 270
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(64,128,3,padding=1),
            nn.BatchNorm2d(128),  # batch x 64 x 128 x 270
            nn.ReLU(),
            nn.Conv2d(128,128,3,padding=1),
            nn.BatchNorm2d(128), # batch x 128 x 128 x 270
            nn.ReLU(),
            nn.MaxPool2d(2,2),   # batch x 128 x 64x 135
            nn.Conv2d(128,256,3,padding=1), # batch x 256 x 64 x 135
            nn.ReLU(),
            #nn.BatchNorm2d(256)
        )

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        # out = out.view(batch_size, -1)
        return out

In [0]:
class Decoder(nn.Module):
    def __init__(self):
        super(Decoder,self).__init__()
        self.layer1 = nn.Sequential(
            nn.ConvTranspose2d(256,128,3,2,padding=1,output_padding=1),# batch * 128 * 128 * 270
            nn.BatchNorm2d(128), 
            nn.ReLU(),
            nn.ConvTranspose2d(128,128,3,1,1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128,64,3,1,1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64,32,3,1,1),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )
        self.layer2 = nn.Sequential(
            nn.ConvTranspose2d(32,16,3,1,1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.ConvTranspose2d(16,8,3,1,1),
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.ConvTranspose2d(8,1,3,2,1,1),
            nn.ReLU()
        )
    
    def forward(self, x):
        # out = x.view(batch_size,256,65,135)
        out = self.layer1(x)
        out = self.layer2(out)
        return out

In [0]:
# Set Hyperparameters and DataLoader

epoch = 1000
batch_size = 20
learning_rate = 0.0005

composed_test = transforms.Compose([ Rescale_Test((256,540)), ToTensor_Test()])
composed = transforms.Compose([ImgAugTransform(), RandomCrop((256,256),0.25),Rescale((256,540)),ToTensor()])

root_dir = "./drive/My Drive/Colab Notebooks/Denoising_test/"

train_dataset = DirtyDocumentsDataset(dirty_dir=root_dir+"train/",clean_dir=root_dir+"train_cleaned/", transform=composed)
test_dataset = DirtyDocumentsDataset_Test(root_dir=root_dir+"test/", transform=composed_test)

train_loader = DataLoader(dataset=train_dataset, batch_size= batch_size)
test_loader = DataLoader(dataset=test_dataset, batch_size= batch_size)

encoder = Encoder().cuda()
decoder = Decoder().cuda()

In [0]:
# loss func and optimizer

parameters = list(encoder.parameters())+ list(decoder.parameters())
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(parameters, lr=learning_rate)

In [0]:
# train encoder and decoder

# try:
#     encoder, decoder = torch.load('./drive/My Drive/Colab Notebooks/Denoising_test/models/denoising_autoencoder_1000.pkl')
#     print("\n--------model restored--------\n")
# except:
#     pass

In [0]:
min_loss = 1

for i in range(epoch):
    for image_pair in train_loader:
        train_image = image_pair['train_image']
        train_image = Variable(train_image.float()).cuda()
        clean_image = image_pair['clean_image']
        clean_image = Variable(clean_image.float()).cuda()

        optimizer.zero_grad()

        output = encoder(train_image)
        output = decoder(output)

        loss = loss_func(output,clean_image)
        loss.backward()
        optimizer.step()
    
    if min_loss > loss:
        torch.save([encoder,decoder],'./drive/My Drive/Colab Notebooks/Denoising_test/models/denoising_autoencoder_test1.pkl')
        min_loss = loss
    
    #save_image(output[3,:,:,:],'./drive/My Drive/Colab Notebooks/model/training/{}.png'.format(i))
    # Sam is the number of the last batch

    print("Epoch ", i, ": ", loss)

  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "
  "type " + obj.__name__ + ". It won't be checked "


Epoch  0 :  tensor(0.2262, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  1 :  tensor(0.1816, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  2 :  tensor(0.1522, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  3 :  tensor(0.1320, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  4 :  tensor(0.1170, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  5 :  tensor(0.0993, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  6 :  tensor(0.0885, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  7 :  tensor(0.0712, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  8 :  tensor(0.0677, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  9 :  tensor(0.0645, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  10 :  tensor(0.0514, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  11 :  tensor(0.0492, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  12 :  tensor(0.0408, device='cuda:0', grad_fn=<MseLossBackward>)
Epoch  13 :  tensor(0.0401, device='cuda:0', grad_fn=<MseLossBackward>)
Ep

In [0]:
# test encoder and decoder

try:
    encoder, decoder = torch.load('./drive/My Drive/Colab Notebooks/Denoising_test/models/denoising_autoencoder_test1.pkl')
    print("\n--------model loaded--------\n")
    flag = 0
except:
    print("\nmodel does not exist\n")
    flag = 1

test_loader = DataLoader(dataset=test_dataset, batch_size= 10)

if flag == 0:
    j = 0
    for image_test in test_loader:
        image_test = image_test['image']
        image_test = Variable(image_test.float()).cuda()

        output = encoder(image_test)
        output = decoder(output)

        for i in range(image_test.size(0)):
            save_image(image_test[i,:,:,:],'./drive/My Drive/Colab Notebooks/Denoising_test/test_results/test/{}.png'.format(j*batch_size+i))
        for i in range(output.size(0)):
            save_image(output[i,:,:,:],'./drive/My Drive/Colab Notebooks/Denoising_test/test_results/output_test1/{}.png'.format(j*batch_size+i))

        # for inp, out in zip(image_test, output):
        #     inp = inp.cpu()
        #     inp = inp.data.numpy()
        #     plt.title("Test Sample #{}".format(i))
        #     plt.imshow(inp[0],cmap='gray')
        #     plt.show()

        #     out = out.cpu()
        #     out = out.data.numpy()
        #     plt.title("Output Sample #{}".format(i))
        #     plt.imshow(out[0],cmap='gray')
        #     plt.show()

        #     i = i +1
        j= j + 1