In [1]:
import numpy as np
import random
import math
import cv2
import os

from torchsummary import summary
import torch.nn.parallel
import torch.backends.cudnn as cudnn

import torchvision.transforms as transforms
from torchvision.utils import save_image

from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable

import torch.nn as nn
import torch.nn.functional as F
import torch

manualSeed = random.randint(1, 10000)
print("Random Seed: ", manualSeed)
random.seed(manualSeed)
torch.manual_seed(manualSeed)
cudnn.benchmark = True

Random Seed:  5159


## Define Layers

In [2]:
class Residual_Unit_GN(nn.Module):
    def __init__(self, channels, kernel_size, padding):
        super(Residual_Unit_GN, self).__init__()
        
        self.Conv1 = nn.Conv2d(in_channels=channels, out_channels=channels, 
                               kernel_size=kernel_size, stride=1, padding=padding) 
        self.Conv2 = nn.Conv2d(in_channels=channels, out_channels=channels, 
                               kernel_size=kernel_size, stride=1, padding=padding)
        self.BN1   = nn.BatchNorm2d(num_features=channels)
        self.BN2   = nn.BatchNorm2d(num_features=channels)
        self.ReLU  = nn.ReLU()
        
        
    def forward(self, x):
        identity = x
        
        out = self.Conv1(x)
        out = self.BN1(out)
        out = self.ReLU(out)
        out = self.Conv2(out)
        out = self.BN2(out)
        out = out + identity
        
        return out
    

class Conv_Unit_DN(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, strides_1, strides_2, padding):
        super(Conv_Unit_DN, self).__init__()
        
        self.Conv1  = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, 
                                kernel_size=kernel_size, stride=strides_1, padding=padding)
        self.Conv2  = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, 
                                kernel_size=kernel_size, stride=strides_2, padding=padding)
        self.BN1    = nn.BatchNorm2d(num_features=out_channels)
        self.BN2    = nn.BatchNorm2d(num_features=out_channels)
        self.LReLU1 = nn.LeakyReLU()
        self.LReLU2 = nn.LeakyReLU()
        
        
    def forward(self, x):
        out = self.Conv1(x)
        out = self.BN1(out)
        out = self.LReLU1(out)
        out = self.Conv2(out)
        out = self.BN2(out)
        out = self.LReLU2(out)
        
        return out


class Residual_Unit_DN(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding):
        super(Residual_Unit_DN, self).__init__()
        
        first_channels = int(out_channels/2)
        
        self.Conv1  = nn.Conv2d(in_channels=in_channels   , out_channels=first_channels, 
                        kernel_size=kernel_size, stride=1, padding=padding)
        self.Conv2  = nn.Conv2d(in_channels=first_channels, out_channels=out_channels, 
                        kernel_size=kernel_size, stride=1, padding=padding)
        self.BN1    = nn.BatchNorm2d(num_features=first_channels)
        self.BN2    = nn.BatchNorm2d(num_features=out_channels)
        self.LReLU1 = nn.LeakyReLU()

    def forward(self, x):
        identity = x

        out = self.Conv1(x)
        out = self.BN1(out)
        out = self.LReLU1(out)
        out = self.Conv2(out)
        out = self.BN2(out)
        out = out + identity
        
        return out

## Generator

In [3]:
class Generator(nn.Module):
    def __init__(self, ngpu):
        super(Generator, self).__init__()
        self.ngpu = ngpu
        
        self.Conv1   = nn.Conv2d(in_channels=1 , out_channels=64, kernel_size=3, stride=1, padding=1)
        self.Conv2   = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.Conv3   = nn.Conv2d(in_channels=64, out_channels=1 , kernel_size=3, stride=1, padding=1)
        self.ReLU    = nn.ReLU()
        self.BN      = nn.BatchNorm2d(num_features=64)
        self.Tanh    = nn.Tanh()
        self.RUnit1  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit2  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit3  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit4  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit5  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit6  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit7  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit8  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit9  = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit10 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit11 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit12 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit13 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit14 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit15 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        self.RUnit16 = Residual_Unit_GN(channels=64, kernel_size=3, padding=1)
        
    def forward(self, x):
        out = self.Conv1(x)
        out = self.ReLU(out)
        identical = out
        
        out = self.RUnit1(out)
        out = self.RUnit2(out)
        out = self.RUnit3(out)
        out = self.RUnit4(out)
        out = self.RUnit5(out)
        out = self.RUnit6(out)
        out = self.RUnit7(out)
        out = self.RUnit8(out)
        out = self.RUnit9(out)
        out = self.RUnit10(out)
        out = self.RUnit11(out)
        out = self.RUnit12(out)
        out = self.RUnit13(out)
        out = self.RUnit14(out)
        out = self.RUnit15(out)
        out = self.RUnit16(out)
        out = self.Conv2(out)
        out = self.BN(out)
        out = out + identical
        out = self.Conv3(out)
        out = self.Tanh(out)
        
        return out

## Discriminator

In [4]:
class Discriminator(nn.Module):
    def __init__(self, ngpu):
        super(Discriminator, self).__init__()
        self.ngpu = ngpu
        
        self.Conv    = nn.Conv2d(in_channels=1 , out_channels=32, kernel_size=3, stride=1 ,padding=1)
        self.LReLU1  = nn.LeakyReLU()
        self.LReLU2  = nn.LeakyReLU()
        self.Flatten = nn.Flatten()
        self.Dense   = nn.Linear(in_features=128, out_features=2)
        self.Sigmoid = nn.Sigmoid()
        self.CUnit1  = Conv_Unit_DN(in_channels=32 , out_channels=64 , kernel_size=3, 
                                    strides_1=1, strides_2=2, padding=1)
        self.CUnit2  = Conv_Unit_DN(in_channels=64 , out_channels=128, kernel_size=3, 
                                    strides_1=1, strides_2=1, padding=1)
        self.CUnit3  = Conv_Unit_DN(in_channels=128, out_channels=128, kernel_size=3, 
                                    strides_1=1, strides_2=2, padding=1)
        self.CUnit4  = Conv_Unit_DN(in_channels=128, out_channels=256, kernel_size=3, 
                                    strides_1=1, strides_2=1, padding=1)
        self.CUnit5  = Conv_Unit_DN(in_channels=256, out_channels=256, kernel_size=3, 
                                    strides_1=1, strides_2=2, padding=1)
        self.CUnit6  = Conv_Unit_DN(in_channels=256, out_channels=512, kernel_size=3, 
                                    strides_1=1, strides_2=2, padding=1)
        self.RUnit1  = Residual_Unit_DN(in_channels=512, out_channels=128, kernel_size=3, padding=1)
        
    def forward(self, x):
        out = self.Conv(x)
        out = self.LReLU1(out)
        out = self.CUnit1(out)
        out = self.CUnit2(out)
        out = self.CUnit3(out)
        out = self.CUnit4(out)
        out = self.CUnit5(out)
        out = self.CUnit6(out)
        out = self.RUnit1(out)
        out = self.LReLU2(out)
        out = self.Flatten(out)
        out = self.Dense(out)
        out = self.Sigmoid(out)
        
        return out
#         return output.view(-1, 1).squeeze(1)

## Data Loader

In [5]:
class Image(Dataset):

    def __init__(self, root_dir):
        self.path_dcp = os.path.join(root_dir, "Decompressed")
        self.path_ori = os.path.join(root_dir, "Original_pgm")
        
        self.dcp_name = os.listdir(self.path_dcp)
        self.dcp_name.sort()
        
    def __len__(self):
        return len(self.dcp_name)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        img_dcp_loc = os.path.join(self.path_dcp, self.dcp_name[idx])
        img_ori_loc = os.path.join(self.path_ori, self.dcp_name[idx])
        img_dcp = cv2.imread(img_dcp_loc, 0).reshape(1,64,64)
        img_ori = cv2.imread(img_ori_loc, 0).reshape(1,64,64)
        sample = {'Dcp': img_dcp, 'Ori': img_ori}
        
        return sample

In [6]:
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv2d') != -1:
        m.weight.data.normal_(0.0, 0.02)
    elif classname.find('BatchNorm2d') != -1:
        m.weight.data.normal_(1.0, 0.02)
        m.bias.data.fill_(0)

## Start Training

In [None]:
## 
Workers = 64
BatchSize = 64
num_epochs = 5
LR = 0.0002
Beta1 = 0.8
NGpu = 1


# Step 1: 讀取訓練資料（受損 / 原圖）
train_path = "/home/mj/HardDisk/Github/Image_Compressor/Dataset/Training_Data"  ## 受損影像路徑
# valid_path = "../../Dataset/Validation_Data"  ## 原圖影像路徑
train_data_loader = DataLoader(Image(root_dir = train_path), batch_size=BatchSize, shuffle=True, num_workers=Workers)
# valid_data_loader = DataLoader(Image(valid_path), batch_size=BatchSize, shuffle=True, num_workers=Workers)


# Step 2: 讀取 Models
print("[*] Loading Models")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
netG = Generator(NGpu).to(device)
netG.apply(weights_init)
# print(netG)

netD = Discriminator(NGpu).to(device)
netD.apply(weights_init)
# print(netD)


# Step 3: 初始化參數配置
print("[*] Initialized Parameters")
optimizer_G = torch.optim.Adam(netG.parameters(), lr=LR, betas=(Beta1, 0.999))  
loss_MSE = nn.MSELoss()

# Step : 先稍微訓練 Generative Network
torch.cuda.empty_cache()
print("[*] Start Training Generative Network")
for epoch in range(num_epochs):
    for i, data in enumerate(train_data_loader, 0):
        # clear out the gradients of all Variables
        optimizer_G.zero_grad()
        dcp, ori = data['Dcp'].to(device, dtype=torch.float, non_blocking=True, copy=False), data['Ori'].to(device, dtype=torch.float, non_blocking=True, copy=False)
        
        output = netG(dcp)
        loss = loss_MSE(output, ori)
        loss.backward()
        optimizer_G.step()

        if i % 10 == 0:
            print('[%d/%d][%.4f]\tLoss_D: %.4f'% (epoch, num_epochs, i/len(train_data_loader), loss.item()))        
        
#         if i % 10 == 0:
#             print('[%d/%d][%d/%d]\tLoss_D: %.4f\tLoss_G: %.4f\tD(x): %.4f\tD(G(z)): %.4f / %.4f'
#                   % (epoch, num_epochs, i, len(dataloader),
#                      errD.item(), errG.item(), D_x, D_G_z1, D_G_z2))


# Step : 加入 Discriminiative Network 一起訓練， 儲存權重

# Step : 儲存 Final 權重

[*] Loading Models
[*] Initialized Parameters
[*] Start Training Generative Network
[0/5][0.0000]	Loss_D: 16522.7949
[0/5][0.0003]	Loss_D: 14125.6680
[0/5][0.0006]	Loss_D: 14385.7305
[0/5][0.0009]	Loss_D: 16061.1299
[0/5][0.0013]	Loss_D: 18085.2363
[0/5][0.0016]	Loss_D: 17520.2695
[0/5][0.0019]	Loss_D: 14496.9287
[0/5][0.0022]	Loss_D: 17634.6016
[0/5][0.0025]	Loss_D: 16113.0693
[0/5][0.0028]	Loss_D: 20487.5352
[0/5][0.0031]	Loss_D: 17034.4609
[0/5][0.0034]	Loss_D: 16957.4570
[0/5][0.0038]	Loss_D: 16945.7969
[0/5][0.0041]	Loss_D: 14388.5361


## 歷史的眼淚

In [None]:
# def Residual_Unit_GN(x):
#     ## Connect Layers
#     identity = x

#     out = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)(x)
#     out = nn.BatchNorm2d(num_features=64)(out)
#     out = nn.ReLU()(out)
#     out = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1)(out)
#     out = nn.BatchNorm2d(num_features=64)(out)
#     out = out + identity

#     return out

# def Conv_Unit_DN(x, in_channels, out_channels, strides_1, strides_2):
#     out = nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=strides_1)(x)
#     out = nn.BatchNorm2d(num_features=out_channels_1)(out)
#     out = nn.LeakyReLU()(out)
#     out = nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=strides_2)(x)
#     out = nn.BatchNorm2d(num_features=out_channels_1)(out)
#     out = nn.LeakyReLU()(out)
    
#     return out
    
# def Residual_Unit_DN(x):
#     identity = x
    
#     out = nn.Conv2d(in_channels=512, out_channels=64, kernel_size=3, stride=1)(x)
#     out = nn.BatchNorm2d(num_features=64)(out)
#     out = nn.LeakyReLU()(out)
#     out = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1)(out)
#     out = nn.BatchNorm2d(num_features=64)(out)
#     out = out + identity
    
#     return out 