In [1]:
# Deep Learning Libraries
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as data
from torchinfo import summary

# Data Manipulation and Analysis
import numpy as np
import pandas as pd
import collections # A module providing alternative data structures like named tuples, defaultdict, Counter, etc., compared to built-in Python containers.
import random

# Data Visualization
import matplotlib
import matplotlib.pyplot as plt
import seaborn

# File and System Interaction
import glob
import os
from pathlib import Path
import shutil

# Scientific Computing and Math
import math
import cmath

# Date and Time Handling
import time
import datetime

# Linear Algebra
from torch import linalg as LA
from python_scripts import generate_synthetic_data
from python_scripts import format_data
from python_scripts import dataset_processing


In [2]:
# Generate Data
M_train, M_Omega_train, M_test, M_Omega_test = generate_synthetic_data.generate(160, 320, 10, 40, 20, 0.45, 9)

# Each training/test set is of the shape (batch_size, width, height)
# Convert it to (batch_size, 1, width, height)

M_train, M_Omega_train, M_test, M_Omega_test = M_train[:, np.newaxis, :, :], M_Omega_train[:, np.newaxis, :, :], M_test[:, np.newaxis, :, :], M_Omega_test[:, np.newaxis, :, :]

print(f'M_train.shape: {M_train.shape}, M_Omega_train.shape: {M_Omega_train.shape}, M_test.shape: {M_test.shape}, M_Omega_test.shape: {M_Omega_test.shape}')

# Format and Save Data
format_data.format(M_train, M_Omega_train, M_test, M_Omega_test)

# Create DataLoaders
train_dataset = dataset_processing.ImageDataset(40, (160, 320), 0)
train_loader = data.DataLoader(train_dataset, batch_size = 5, shuffle = True)
test_dataset = dataset_processing.ImageDataset(20, (160, 320), 1)
test_loader = data.DataLoader(test_dataset, batch_size = 5, shuffle = True)

M_train.shape: (40, 1, 160, 320), M_Omega_train.shape: (40, 1, 160, 320), M_test.shape: (20, 1, 160, 320), M_Omega_test.shape: (20, 1, 160, 320)


In [10]:
import torch.nn as nn
import torch.nn.functional as F

# define the NN architecture
class ConvDenoiser(nn.Module):
    def __init__(self):
        super(ConvDenoiser, self).__init__()
        ## encoder layers ##
        # conv layer (depth from 1 --> 32), 3x3 kernels
        self.conv1 = nn.Conv2d(1, 32, 3, padding = 1)  
        # conv layer (depth from 32 --> 16), 3x3 kernels
        self.conv2 = nn.Conv2d(32, 16, 3, padding = 1)
        # conv layer (depth from 16 --> 8), 3x3 kernels
        self.conv3 = nn.Conv2d(16, 8, 3, padding = 1)
        # pooling layer to reduce x-y dims by two; kernel and stride of 2
        self.pool = nn.MaxPool2d(2, 2)
        
        ## decoder layers ##
        # transpose layer, a kernel of 2 and a stride of 2 will increase the spatial dims by 2
        self.t_conv1 = nn.ConvTranspose2d(8, 8, 2, stride = 2)  # kernel_size=3 to get to a 7x7 image output
        # two more transpose layers with a kernel of 2
        self.t_conv2 = nn.ConvTranspose2d(8, 16, 2, stride = 2)
        self.t_conv3 = nn.ConvTranspose2d(16, 32, 2, stride = 2)
        # one, final, normal conv layer to decrease the depth
        self.conv_out = nn.Conv2d(32, 1, 3, padding = 1)



    def forward(self, x):
        ## encode ##
        # add hidden layers with relu activation function
        # and maxpooling after
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        # add second hidden layer
        x = F.relu(self.conv2(x))
        x = self.pool(x)
        # add third hidden layer
        x = F.relu(self.conv3(x))
        x = self.pool(x)  # compressed representation
        
        ## decode ##
        # add transpose conv layers, with relu activation function
        x = F.relu(self.t_conv1(x))
        x = F.relu(self.t_conv2(x))
        x = F.relu(self.t_conv3(x))
        # transpose again, output should have a sigmoid applied
        x = F.sigmoid(self.conv_out(x))
                
        return x

# initialize the NN
model = ConvDenoiser()
print(model)

ConvDenoiser(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv2): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (conv3): Conv2d(16, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (t_conv1): ConvTranspose2d(8, 8, kernel_size=(2, 2), stride=(2, 2))
  (t_conv2): ConvTranspose2d(8, 16, kernel_size=(2, 2), stride=(2, 2))
  (t_conv3): ConvTranspose2d(16, 32, kernel_size=(2, 2), stride=(2, 2))
  (conv_out): Conv2d(32, 1, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
)


In [11]:
# Visulase Model Layers
summary(model, input_size = [40, 1, 160, 320], col_names = ["input_size", "output_size", "num_params", "trainable"])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #                   Trainable
ConvDenoiser                             [40, 1, 160, 320]         [40, 1, 160, 320]         --                        True
├─Conv2d: 1-1                            [40, 1, 160, 320]         [40, 32, 160, 320]        320                       True
├─MaxPool2d: 1-2                         [40, 32, 160, 320]        [40, 32, 80, 160]         --                        --
├─Conv2d: 1-3                            [40, 32, 80, 160]         [40, 16, 80, 160]         4,624                     True
├─MaxPool2d: 1-4                         [40, 16, 80, 160]         [40, 16, 40, 80]          --                        --
├─Conv2d: 1-5                            [40, 16, 40, 80]          [40, 8, 40, 80]           1,160                     True
├─MaxPool2d: 1-6                         [40, 8, 40, 80]           [40, 8, 20, 40]           --                        --
├─ConvTra

---
## Training

We are only concerned with the training images, which we can get from the `train_loader`.

>In this case, we are actually **adding some noise** to these images and we'll feed these `noisy_imgs` to our model. The model will produce reconstructed images based on the noisy input. But, we want it to produce _normal_ un-noisy images, and so, when we calculate the loss, we will still compare the reconstructed outputs to the original images!

Because we're comparing pixel values in input and output images, it will be best to use a loss that is meant for a regression task. Regression is all about comparing quantities rather than probabilistic values. So, in this case, I'll use `MSELoss`. And compare output images and input images as follows:
```
loss = criterion(outputs, images)
```

In [12]:
# specify loss function
criterion = nn.MSELoss()

# specify loss function
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [13]:
for badata in train_loader:
    print(data[0].shape, data[1].shape)

torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])
torch.Size([5, 160, 320]) torch.Size([5, 160, 320])


In [14]:
# number of epochs to train the model
n_epochs = 20

# for adding noise to lowrank
noise_factor=0.5

for epoch in range(1, n_epochs+1):
    # monitor training loss
    train_loss = 0.0
    
    ###################
    # train the model #
    ###################
    for batch, (tensor) in train_loader:
        # _ stands in for labels, here
        # no need to flatten lowrank
        lowrank, groundtruth = tensor[0], tensor[1]
        lowrank, groundtruth = lowrank.unsqueeze(0), groundtruth.unsqueeze(0)
        # print(lowrank.shape, groundtruth.shape)
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        ## forward pass: compute predicted outputs by passing *noisy* lowrank to the model
        outputs = model(lowrank)
        # calculate the loss
        # the "target" is still the original, not-noisy lowrank
        loss = criterion(outputs, groundtruth)
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update running training loss
        train_loss += loss.item()*lowrank.size(0)
            
    # print avg training statistics 
    train_loss = train_loss/len(train_loader)
    print('Epoch: {} \tTraining Loss: {:.6f}'.format(
        epoch, 
        train_loss
        ))

Epoch: 1 	Training Loss: 0.227465
Epoch: 2 	Training Loss: 0.208176
Epoch: 3 	Training Loss: 0.186808
Epoch: 4 	Training Loss: 0.159951
Epoch: 5 	Training Loss: 0.124309
Epoch: 6 	Training Loss: 0.072996
Epoch: 7 	Training Loss: 0.016767
Epoch: 8 	Training Loss: 0.001155
Epoch: 9 	Training Loss: 0.000608
Epoch: 10 	Training Loss: 0.000676
Epoch: 11 	Training Loss: 0.000601
Epoch: 12 	Training Loss: 0.000677
Epoch: 13 	Training Loss: 0.000669
Epoch: 14 	Training Loss: 0.000615
Epoch: 15 	Training Loss: 0.000610
Epoch: 16 	Training Loss: 0.000652
Epoch: 17 	Training Loss: 0.000631
Epoch: 18 	Training Loss: 0.000643
Epoch: 19 	Training Loss: 0.000659
Epoch: 20 	Training Loss: 0.000681


In [17]:
# Evaluation/Inference Function

# Move values to device
def eval_model(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module):

    loss = 0

    model.eval()
    with torch.inference_mode():
      for batch, tensor in data_loader:
        # Send data to the target device
        lowrank, groundtruth = tensor[0], tensor[1]
        lowrank, groundtruth = lowrank.unsqueeze(0), groundtruth.unsqueeze(0)
        
        outputs = model(lowrank)
        # calculate the loss
        # the "target" is still the original, not-noisy lowrank
        loss = criterion(outputs, groundtruth)

        # update running training loss
        loss += loss.item() * lowrank.size(0)
            
      # print avg training statistics 
      loss = loss/len(data_loader)
      print('Testing Loss: {:.6f}'.format(
          loss
          ))

        

In [18]:
eval_model(model = model, data_loader = test_loader, loss_fn = criterion)

Testing Loss: 0.000000
Testing Loss: 0.000000
Testing Loss: 0.000000
Testing Loss: 0.000000


In [19]:
low, ground = next(iter(test_loader))
low.shape, ground.shape

(torch.Size([5, 160, 320]), torch.Size([5, 160, 320]))

In [25]:
torch.unique(ground[0])

tensor([0.])

In [21]:
for i in range(5):
    lowrank, groundtruth = low[i], ground[i]
    lowrank, groundtruth = lowrank.unsqueeze(0), groundtruth.unsqueeze(0)

    # Forward Pass
    output = model.forward(lowrank)

    # Loss
    loss = criterion(outputs, groundtruth)

    print(f'Loss of sample {i} = {loss}')

Loss of sample 0 = 1.256795627568863e-07
Loss of sample 1 = 1.256795627568863e-07
Loss of sample 2 = 1.256795627568863e-07
Loss of sample 3 = 1.256795627568863e-07
Loss of sample 4 = 1.256795627568863e-07
