# Auto-encoders for image compression, generation and denoising



### Import libraries and data

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import os
import pickle
#Import pytorch:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import nibabel as nib
from sklearn.model_selection import train_test_split
from skimage.metrics import peak_signal_noise_ratio as psnr
import glob
def FileRead(file_path):
    """Load the image stored at ``file_path`` with nibabel and return its data.

    Parameters
    ----------
    file_path : str
        Path of the image file to open (the notebook passes ``.nii`` files).

    Returns
    -------
    The array produced by ``get_fdata()``, indexed over its first three axes.
    """
    image = nib.load(file_path)
    voxels = image.get_fdata()
    # The triple slice requires at least three axes; presumably each file
    # holds a 3-D volume -- TODO confirm against the dataset.
    return voxels[:, :, :]
def normalize(x):
    """Linearly rescale ``x`` so that its minimum maps to 0 and its maximum to 1.

    Parameters
    ----------
    x : array-like
        Values to rescale.

    Returns
    -------
    numpy.ndarray
        Float array with the same shape as ``x``. A constant input (maximum
        equal to minimum) yields all zeros instead of the NaNs that a 0/0
        division would produce.
    """
    lowest = np.min(x)
    value_range = np.max(x) - lowest
    if value_range == 0:
        # Constant input: there is no range to stretch, so avoid 0/0.
        return np.zeros_like(x, dtype=float)
    return (x - lowest) / value_range
def denormalize(x, x_min=None, x_max=None):
    """Map values back from a [0, 1] scale to the range ``[x_min, x_max]``.

    Parameters
    ----------
    x : numpy.ndarray
        Values to rescale.
    x_min, x_max : float, optional
        Bounds of the target range, i.e. the minimum and maximum of the data
        before it was normalised. Pass both to undo a min-max normalisation.
        When a bound is omitted, the corresponding extreme of ``x`` itself is
        used, which reproduces the historical behaviour of this function; note
        that this fallback cannot recover the original range of data that has
        already been rescaled to [0, 1].

    Returns
    -------
    numpy.ndarray
        ``x * (x_max - x_min) + x_min``.
    """
    lowest = np.min(x) if x_min is None else x_min
    highest = np.max(x) if x_max is None else x_max
    return x * (highest - lowest) + lowest
# Folder that holds the NIfTI volumes.
path='./data'
# glob returns files in arbitrary filesystem order; sorting keeps the slice
# order, and therefore the seeded train/test split below, reproducible.
listFiles = sorted(glob.glob(os.path.join(path, '*.nii')))
if not listFiles:
    raise FileNotFoundError('No .nii files found in ' + path)
# Normalise every volume independently, then concatenate them along the third
# axis (assumes each file is a 3-D volume with matching first two dimensions
# -- TODO confirm against the dataset).
totalData = np.dstack([normalize(FileRead(file)) for file in listFiles])
# Bring the concatenation axis to the front so that axis 0 indexes the slices.
totalData = np.moveaxis(totalData, -1, 0)
totalData = np.expand_dims(totalData, axis=-1)
X_train, X_test = train_test_split(totalData, test_size=0.1, random_state=42)
# Drop only the trailing singleton axis. np.squeeze would also remove the
# leading axis whenever a split happens to contain a single slice.
X_train = torch.tensor(X_train[..., 0])
X_test = torch.tensor(X_test[..., 0])
print('shape X train : ', X_train.shape)
print('shape X test : ', X_test.shape)

### Model definition and denoising dataset preparation

In [None]:
class AutoEncoderCNN(nn.Module):
    """Convolutional auto-encoder for batches of single-channel 2-D images.

    The encoder is four unpadded ``Conv2d`` layers (two of them with stride 2)
    and the decoder mirrors it with four ``ConvTranspose2d`` layers. With these
    kernel/stride settings a 256x256 image is encoded to a 58x58 map with
    ``embedding_dim`` channels and decoded back to 256x256. More generally the
    reconstruction has the input's size when height and width are multiples
    of 4; for other sizes the ``output_padding`` terms make it slightly larger.

    Parameters
    ----------
    C : int
        Number of channels of the hidden convolutional layers.
    embedding_dim : int
        Number of channels of the latent representation.
    """

    def __init__(self, C, embedding_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(1, C, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.Conv2d(C, C, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(C, C, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(C, embedding_dim, kernel_size=4, stride=1),
        )
        # Mirror of the encoder; output_padding=1 on the stride-2 layers
        # restores the size lost to the floor division in the encoder.
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(embedding_dim, C, kernel_size=4, stride=1),
            nn.ReLU(),
            nn.ConvTranspose2d(C, C, kernel_size=3, stride=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(C, C, kernel_size=5, stride=2, output_padding=1),
            nn.ReLU(),
            nn.ConvTranspose2d(C, 1, kernel_size=5, stride=1),
        )

    def encode(self, x):
        """Return the latent map of ``x``, a (batch, 1, H, W) tensor.

        Unlike ``forward``, no channel axis is inserted here.
        """
        return self.encoder(x)

    def decode(self, x):
        """Reconstruct a (batch, 1, H, W) image from a latent map ``x``."""
        return self.decoder(x)

    def forward(self, x):
        """Encode then decode ``x``, a (batch, H, W) tensor.

        Returns a (batch, H', W') tensor: the channel axis added for the
        convolutions is removed again before returning.
        """
        latent = self.encoder(x[:, None, :, :])
        reconstruction = self.decoder(latent)
        # Drop the singleton channel axis rather than reshaping to a
        # hard-coded 256x256, so other image sizes are supported too.
        return reconstruction.squeeze(1)

In [None]:
def display(losses,label='Training loss function'):
    """Plot a sequence of loss values as a red curve on a new figure.

    Parameters
    ----------
    losses : sequence of float
        Loss values, drawn in order against their index.
    label : str
        Legend entry for the curve.
    """
    _, axes = plt.subplots(figsize=(8, 6))
    axes.plot(losses, 'r-', lw=2, label=label)
    # Both axis titles share the same font size.
    for set_axis_title, title in ((axes.set_xlabel, 'N iterations'),
                                  (axes.set_ylabel, 'Loss')):
        set_axis_title(title, fontsize=18)
    plt.legend(loc='upper right', fontsize=16)

In [None]:
# Noise level (Modify here)
noise_scale = 0.1

# Add zero-mean Gaussian noise to the clean images. The noise is still drawn
# from NumPy's generator, but it is converted to a tensor before use so that
# the addition and the clipping are plain torch operations instead of relying
# on implicit tensor/ndarray interoperability.
noise = torch.from_numpy(
    np.random.normal(loc=0, scale=noise_scale, size=tuple(X_train.shape)))
# Keep the noisy images in [0, 1] and store them as float32.
X_train_noisy = torch.clamp(X_train + noise, 0., 1.).float()
print(type(X_train_noisy))
noise = torch.from_numpy(
    np.random.normal(loc=0, scale=noise_scale, size=tuple(X_test.shape)))
X_test_noisy = torch.clamp(X_test + noise, 0., 1.).float()

# Datasets building: each sample pairs a noisy image (input) with its clean
# counterpart (target).
train_Set_Denoise = TensorDataset(X_train_noisy, X_train)
test_Set_Denoise = TensorDataset(X_test_noisy, X_test)


### Building the model

In [None]:
# Report the PyTorch version and whether a CUDA device can be used.
print(torch.__version__)
cuda_ready = torch.cuda.is_available()
print(cuda_ready)
print("CUDA Available: ",cuda_ready)

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device('cuda' if cuda_ready else 'cpu')

# Create the denoising auto-encoder directly on the selected device.
denoiser = AutoEncoderCNN(C=16,embedding_dim=32).to(device)

print(denoiser)

### Train

In [None]:
# Training configuration: number of epochs, mini-batch size and Adam step size.
N_epochs, batch_size, learning_rate = 500, 8, 0.0005

# Mean squared error between the prediction and the clean target.
loss_function = nn.MSELoss()
optimizer = optim.Adam(denoiser.parameters(), lr=learning_rate)

# Files used to persist the trained weights and the loss history. When
# use_pretrained is True and both files exist, they are loaded instead of
# training again.
model_path = 'model_weights.pth'
losses_path = 'training_losses.pkl'
use_pretrained = True

# Both loaders share the same settings (shuffled batches, loading done in the
# main process).
loader_options = dict(batch_size=batch_size, shuffle=True, num_workers=0)
trainLoader = DataLoader(train_Set_Denoise, **loader_options)
testLoader = DataLoader(test_Set_Denoise, **loader_options)

In [None]:
def train(model, data_loader, opt, n_epochs, loss_fn=None,
          save_model_path=None, save_losses_path=None, log_every=100):
    """Train ``model`` and save its weights and loss history to disk.

    Parameters
    ----------
    model : torch.nn.Module
        Network to optimise; it is moved to the GPU when one is available.
    data_loader : iterable
        Yields ``(features, labels)`` batches of tensors.
    opt : torch.optim.Optimizer
        Optimiser already bound to ``model``'s parameters.
    n_epochs : int
        Number of passes over ``data_loader``.
    loss_fn : callable, optional
        ``loss_fn(prediction, labels)`` returning a scalar tensor. Defaults to
        the module-level ``loss_function``.
    save_model_path : str, optional
        Destination of the ``state_dict``. Defaults to the module-level
        ``model_path``.
    save_losses_path : str, optional
        Destination of the pickled loss list. Defaults to the module-level
        ``losses_path``.
    log_every : int
        Print the mean loss every ``log_every`` iterations; 0 disables logging.

    Returns
    -------
    list of float
        Loss of every processed batch, in order.
    """
    # Fall back to the notebook-level settings so existing calls keep working.
    if loss_fn is None:
        loss_fn = loss_function
    if save_model_path is None:
        save_model_path = model_path
    if save_losses_path is None:
        save_losses_path = losses_path

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.train()

    losses = []
    # The running sum spans epoch boundaries: it is only cleared after being
    # printed, so each printed mean really covers ``log_every`` iterations.
    running_loss = 0.0
    iteration = 0
    for epoch in range(n_epochs):
        for features, labels in data_loader:
            features = features.float().to(device)
            labels = labels.float().to(device)

            prediction = model(features)
            loss = loss_fn(prediction, labels)

            opt.zero_grad()
            loss.backward()
            opt.step()

            batch_loss = loss.item()
            losses.append(batch_loss)
            running_loss += batch_loss
            iteration += 1

            # print statistics
            if log_every and iteration % log_every == 0:
                print('[Epoch : %d, iteration: %5d] loss: %.3f'%
                      (epoch + 1, iteration, running_loss / log_every))
                running_loss = 0.0

    print('Training done')

    # Save the model's state_dict and the losses
    torch.save(model.state_dict(), save_model_path)
    with open(save_losses_path, 'wb') as f:
        pickle.dump(losses, f)
    print('Model and losses saved')

    return losses

In [None]:
# Reuse the saved weights and loss history only when requested and when both
# files are present; otherwise train from scratch.
checkpoint_available = (use_pretrained
                        and os.path.exists(model_path)
                        and os.path.exists(losses_path))
if not checkpoint_available:
    print('Training model')
    losses = train(denoiser, trainLoader, optimizer, N_epochs)
else:
    print('Model and losses found')
    # Without CUDA the stored tensors are remapped to the CPU; with CUDA the
    # default loading behaviour of torch.load is kept.
    map_location = None if torch.cuda.is_available() else torch.device('cpu')
    denoiser.load_state_dict(torch.load(model_path, map_location=map_location))
    # NOTE(review): pickle.load executes arbitrary code from the file; only
    # load loss histories written by this notebook.
    with open(losses_path, 'rb') as f:
        losses = pickle.load(f)

### Visualisation

In [None]:
# Display loss: plot the values stored in `losses` against their index
display(losses)

In [None]:
# Check if CUDA is available and set the device accordingly
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model and the noisy test images to the same device
denoiser = denoiser.to(device)
X_test_noisy = X_test_noisy.to(device)

# Print the device that holds the data
print(X_test_noisy.device)

# Compute the reconstructions of the noisy test images. Gradients are not
# needed for inference, so they are disabled to save memory.
denoiser.eval()
with torch.no_grad():
    reconstructed_images = denoiser(X_test_noisy)

# Bring both image sets back to the CPU as numpy arrays for display
reconstructed_images = reconstructed_images.cpu().numpy()
noisy_images = X_test_noisy.cpu().numpy()

# One figure per test image: noisy input on the left, reconstruction on the right
for noisy, restored in zip(noisy_images, reconstructed_images):
    plt.figure(figsize=(40,80))
    for position, image in enumerate((noisy, restored), start=1):
        ax = plt.subplot(1, 2, position)
        plt.imshow(image, cmap='gray')
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()