====================================================================================
# Project Name: Matrix Completion with Deep Learning
## Class Name: Applied Matrix Analysis
## Student Name: Shakil Ahmed Sumon
====================================================================================

# Deep Neural Networks (DNN) model

## Importing necessary libraries 

In [14]:
import torch
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn, optim
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor

## Loading the dataset

In [16]:
# Load the MNIST train and test splits from ./data; download=True lets
# torchvision fetch the files if they are not already there.
# No transform is passed, so the samples are handed to MaskedMNISTDataset
# unconverted (it calls np.array on each image itself).
train_data = datasets.MNIST(root = './data', train = True, download = True)
test_data = datasets.MNIST(root = './data', train = False, download = True)

## Creating a dataset class for loading the data into the dataloader

In [17]:
class MaskedMNISTDataset(Dataset):
    """Serve ``(image, image)`` pairs for reconstruction training.

    ``data`` is any indexable collection of ``(image, label)`` pairs. The
    label is discarded and the image, converted to a float tensor, is
    returned twice: once as the model input and once as the target.

    Despite the name, no masking happens here; the random masks are drawn
    per batch inside the training / validation / test loops.
    """

    def __init__(self, data):
        # No mask is stored on the dataset: the previous ``self.mask = mask``
        # referenced an undefined name and raised NameError on construction.
        self.data = data

    def __len__(self):
        """Return the number of samples in the wrapped collection."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as ``(input, target)``, both the same float tensor."""
        image, _ = self.data[idx]
        # np.array() copies the image into a writable array that
        # torch.from_numpy can wrap; .float() converts to float32.
        image = torch.from_numpy(np.array(image)).float()
        return image, image

In [18]:
# Create the MaskedMNISTDataset
# (wraps the raw MNIST splits so every item is an (image, image) pair)
train_dataset = MaskedMNISTDataset(train_data)
test_dataset = MaskedMNISTDataset(test_data)

# Split the train dataset into training and validation
# 80% goes to training; the remainder (not a rounded 20%) goes to validation
# so the two sizes always add up to the full length.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
# random_split is not seeded here, so the partition differs between runs.
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Create dataloaders
# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)
test_loader = DataLoader(test_dataset, batch_size=64)

In [19]:
# Define the model
class LinearModel(nn.Module):
    """Fully connected bottleneck network: 784 -> 56 -> 28 -> 784 -> 784.

    The input is flattened from dimension 1 onwards, ReLU is applied between
    consecutive linear layers, and the final layer is followed by tanh, so
    every output value lies in [-1, 1].

    NOTE(review): the training loop scales targets to [0, 1] and the
    checkpoint file name mentions "sigmoid"; confirm tanh is the intended
    output activation.
    """

    def __init__(self):
        super().__init__()
        pixels = 28 * 28
        wide, narrow = 28 * 2, 14 * 2
        self.flatten = nn.Flatten()
        self.linear_1 = nn.Linear(pixels, wide)
        self.linear_2 = nn.Linear(wide, narrow)
        self.linear_3 = nn.Linear(narrow, pixels)
        self.linear_4 = nn.Linear(pixels, pixels)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Map a batch of images to a ``(batch, 784)`` tensor of reconstructions."""
        activation = self.relu
        hidden = self.linear_1(self.flatten(x))
        hidden = self.linear_2(activation(hidden))
        hidden = self.linear_3(activation(hidden))
        pre_output = self.linear_4(activation(hidden))
        return torch.tanh(pre_output)

# Instantiate the model, loss function and optimizer
model = LinearModel()

# Mean-squared error between reconstruction and target; plain SGD, lr 0.01.
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Parallelize model in case of multiple GPUs
# NOTE: the optimizer above was built from the unwrapped model's parameters;
# the wrapper holds the same module, and state_dict() of the wrapper is what
# train_model saves.
model = nn.DataParallel(model)
model.to(device)

# Validation loss per epoch, appended to by train_model.
loss_array = []
# Training Loop
def train_model(model, criterion, optimizer, num_epochs=500,
                checkpoint_path='/nfs/hpc/share/sumons/model_linear_50_sigmoid.pt'):
  """Train ``model`` to reconstruct randomly masked images.

  For every batch a fresh random mask keeps each pixel with probability 0.5;
  the masked image is fed to the model and the loss is computed only on the
  kept (observed) pixels. After each epoch the model is validated, the
  validation loss is appended to the module-level ``loss_array``, and the
  weights are written to ``checkpoint_path`` whenever the validation loss
  improves.

  Relies on the module-level ``train_loader``, ``val_loader``, ``device``,
  ``loss_array`` and ``validate_model``.

  Args:
    model: network taking a ``(batch, 784)`` tensor.
    criterion: loss callable ``criterion(prediction, target)``.
    optimizer: optimizer already bound to ``model``'s parameters.
    num_epochs: number of passes over ``train_loader``.
    checkpoint_path: file that receives ``model.state_dict()`` on improvement.
  """
  # Start from +inf so the first epoch always produces a checkpoint, whatever
  # the scale of the loss.
  best_loss = float('inf')
  for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
      # Flatten each image to a 784-vector and rescale pixels to [0, 1].
      inputs = inputs.view(inputs.size(0), -1).to(device)/255.0
      labels = labels.view(labels.size(0), -1).to(device)/255.0

      # Draw the mask directly on the compute device so it can be used both
      # for the multiplication and for indexing without host<->device copies.
      mask = torch.rand(inputs.shape, device=inputs.device) > 0.5
      inputs = inputs * mask

      outputs = model(inputs)
      # Only observed pixels contribute to the loss.
      loss = criterion(outputs[mask], labels[mask])
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      running_loss += loss.item()

    val_loss = validate_model(model, val_loader, criterion)
    loss_array.append(val_loss)

    # Save the model whenever the validation loss reaches a new minimum.
    if val_loss < best_loss:
      best_loss = val_loss
      torch.save(model.state_dict(), checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss / len(train_loader)}, Validation Loss: {val_loss}')

# Validation function
def validate_model(model, dataloader, criterion):
    """Return the mean per-batch masked reconstruction loss over ``dataloader``.

    Each batch is flattened, scaled to [0, 1] and multiplied by a fresh random
    mask that keeps a pixel when its uniform draw exceeds 0.5. The loss is
    evaluated on the kept pixels only, with gradients disabled. Uses the
    module-level ``device``.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for images, targets in dataloader:
            flat_images = images.view(images.size(0), -1).to(device) / 255.0
            keep = torch.rand(flat_images.size()) > 0.5
            masked_images = flat_images * keep.to(device)
            flat_targets = targets.view(targets.size(0), -1).to(device) / 255.0
            predictions = model(masked_images)
            batch_loss = criterion(predictions[keep], flat_targets[keep])
            batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(dataloader)

## Training the model and then saving the loss in a numpy file

In [15]:
# Train with the default epoch count, then persist the per-epoch validation
# losses that train_model appended to loss_array.
train_model(model,criterion, optimizer)
np.save('/nfs/hpc/share/sumons/loss_dnn.npy', loss_array)

In [32]:
# function for testing the model on test data
# Also gives us reconstructed images
def test_model(model, dataloader, criterion):
  """Evaluate ``model`` on ``dataloader`` and keep every batch for inspection.

  Returns a 4-tuple: the mean per-batch loss on the kept pixels, then three
  lists with one tensor per batch: the masked inputs, the model outputs and
  the unmasked targets (all flattened to 784 values per image and scaled to
  [0, 1]). A pixel is kept when its uniform draw exceeds 0.5. Uses the
  module-level ``device``.
  """
  masked_batches, reconstructions, originals = [], [], []
  loss_total = 0.0
  model.eval()
  with torch.no_grad():
    for images, targets in dataloader:
      flat = images.view(images.size(0), -1).to(device) / 255.0
      keep = torch.rand(flat.size()) > 0.5
      flat = flat * keep.to(device)
      masked_batches.append(flat)

      targets = targets.view(targets.size(0), -1).to(device) / 255.0
      predicted = model(flat)
      reconstructions.append(predicted)
      originals.append(targets)

      loss_total += criterion(predicted[keep], targets[keep]).item()

  return loss_total / len(dataloader), masked_batches, reconstructions, originals

In [16]:
# Evaluate on the test loader and keep the per-batch tensors; the plotting
# helpers below read masked_input / reconstructed_image / original_image.
# NOTE(review): this cell evaluates the in-memory model; the best checkpoint
# written by train_model is not reloaded here.
loss, masked_input, reconstructed_image, original_image = test_model(model, test_loader, criterion)
print(loss)

## Functions for plotting the reconstructed images from latent variables

In [17]:


def plot_reconstructed_image(index_1, index_2):
  """Return reconstruction ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``reconstructed_image`` list; nothing is drawn here.
  """
  sample = reconstructed_image[index_1][index_2]
  return sample.cpu().view(28, 28) * 255

def plot_original_image(index_1, index_2):
  """Return original image ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``original_image`` list; nothing is drawn here.
  """
  sample = original_image[index_1][index_2]
  return sample.cpu().view(28, 28) * 255

def plot_masked_image(index_1, index_2):
  """Return masked input ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``masked_input`` list; nothing is drawn here.
  """
  sample = masked_input[index_1][index_2]
  return sample.cpu().view(28, 28) * 255

def plot_images(index_1, index_2, save = False):
  """Show original, masked and reconstructed versions of one test image side by side.

  ``index_1`` selects the batch and ``index_2`` the image inside it. When
  ``save`` is true the figure is also written to a fixed PNG path before it
  is displayed.
  """
  # (title, image) for each of the three panels, left to right.
  panels = (
      ('Original Image', plot_original_image(index_1, index_2)),
      ('Masked Image', plot_masked_image(index_1, index_2)),
      ('Reconstructed Image', plot_reconstructed_image(index_1, index_2)),
  )

  _, axes = plt.subplots(1, 3, figsize=(15, 5))
  for axis, (title, picture) in zip(axes, panels):
    axis.imshow(picture)
    axis.set_title(title)

  # Hide ticks and frames so only the images remain.
  for axis in axes:
    axis.axis('off')

  # Save before show(): the figure must still be current when it is written.
  if save:
    plt.savefig('/nfs/hpc/share/sumons/linear_20_0.png')

  plt.show()

# Display image 6 of test batch 0 (save defaults to False, so no file is written).
plot_images(0,6)

# CNN Model

In [None]:
import torch
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn, optim
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
import torch.nn.functional as F

In [None]:
class MaskedMNISTDataset(Dataset):
    """Expose ``(image, label)`` samples as ``(image, image)`` float-tensor pairs.

    The label is dropped; the same tensor is returned as input and as
    reconstruction target. Masking is applied later, per batch, by the
    training / evaluation loops.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        """Number of wrapped samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` twice, converted to a float tensor."""
        pixels, _label = self.data[idx]
        tensor = torch.from_numpy(np.array(pixels)).float()
        return tensor, tensor

In [None]:
# Load the MNIST train and test splits from ./data (fetched if missing because
# download=True). No transform is passed; MaskedMNISTDataset converts each image.
train_data = datasets.MNIST(root = './data', train = True, download = True)
test_data = datasets.MNIST(root = './data', train = False, download = True)

In [None]:
# Create the MaskedMNISTDataset
# (every item becomes an (image, image) pair for reconstruction)
train_dataset = MaskedMNISTDataset(train_data)
test_dataset = MaskedMNISTDataset(test_data)

# Split the train dataset into training and validation
# 80% for training; validation gets the exact remainder.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
# Unseeded split: the partition changes from run to run.
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Create dataloaders
# Shuffling is enabled for training only.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)
test_loader = DataLoader(test_dataset, batch_size=64)

In [None]:
class CNNModel(nn.Module):
    """Small convolutional network that reconstructs a 28x28 image.

    Two 3x3 convolutions (1 -> 6 -> 16 channels, padding 1), each followed by
    ReLU and 2x2 max pooling, reduce the image to 16 maps of 7x7. Three linear
    layers (784 -> 120 -> 84 -> 784) follow, and a sigmoid keeps the output in
    [0, 1] before it is reshaped to ``(batch, 28, 28)``.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(16*7*7, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 28*28)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Reconstruct a ``(batch, 28, 28)`` input; returns the same shape."""
        # Add the single channel dimension expected by Conv2d.
        features = x.unsqueeze(1)
        for conv in (self.conv1, self.conv2):
            features = F.max_pool2d(self.relu(conv(features)), 2)

        flat = features.view(features.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        hidden = self.relu(self.fc2(hidden))
        pixels = torch.sigmoid(self.fc3(hidden))
        return pixels.view(-1, 28, 28)


In [11]:

# Instantiate the model, loss function and optimizer
model = CNNModel()
# Mean-squared reconstruction error, optimised with plain SGD (lr 0.01).
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Use the GPU when available; unlike the DNN cell, no DataParallel wrapper here.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Validation loss per epoch, appended to by train_model.
loss_array = []

# Training Loop
def train_model(model, criterion, optimizer, num_epochs=500,
                checkpoint_path='/nfs/hpc/share/sumons/model_CNN_50.pt'):
  """Train ``model`` to reconstruct randomly masked 28x28 images.

  For every batch a fresh random mask keeps each pixel with probability 0.5;
  the masked batch is fed to the model and the loss is computed on the kept
  pixels only. After each epoch the validation loss is appended to the
  module-level ``loss_array`` and the weights are saved to ``checkpoint_path``
  whenever that loss improves.

  Relies on the module-level ``train_loader``, ``val_loader``, ``device``,
  ``loss_array`` and ``validate_model``.

  Args:
    model: network taking and returning ``(batch, 28, 28)`` tensors.
    criterion: loss callable ``criterion(prediction, target)``.
    optimizer: optimizer already bound to ``model``'s parameters.
    num_epochs: number of passes over ``train_loader``.
    checkpoint_path: file that receives ``model.state_dict()`` on improvement.
  """
  # +inf instead of a magic number: the first epoch always checkpoints.
  best_loss = float('inf')
  for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
      # Rescale pixels to [0, 1].
      inputs = inputs.to(device)/255.0
      labels = labels.to(device)/255.0

      # Create the mask on the compute device so it serves both the
      # multiplication and the indexing without extra host<->device copies.
      mask = torch.rand(inputs.shape, device=inputs.device) > 0.5
      inputs = inputs * mask

      outputs = model(inputs)
      # Only observed pixels contribute to the loss.
      loss = criterion(outputs[mask], labels[mask])
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      running_loss += loss.item()

    val_loss = validate_model(model, val_loader, criterion)
    loss_array.append(val_loss)

    # Keep the weights of the best epoch seen so far.
    if val_loss < best_loss:
      best_loss = val_loss
      torch.save(model.state_dict(), checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss / len(train_loader)}, Validation Loss: {val_loss}')

# Validation function
def validate_model(model, dataloader, criterion):
    """Return the mean per-batch masked reconstruction loss over ``dataloader``.

    Batches are scaled to [0, 1] and multiplied by a fresh random mask that
    keeps a pixel when its uniform draw exceeds 0.5; the loss covers the kept
    pixels only and is computed without gradients. Uses the module-level
    ``device``.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for images, targets in dataloader:
            images = images.to(device) / 255.0
            keep = torch.rand(images.size()) > 0.5
            images = images * keep.to(device)
            targets = targets.to(device) / 255.0
            predictions = model(images)
            batch_losses.append(criterion(predictions[keep], targets[keep]).item())

    return sum(batch_losses) / len(dataloader)

In [18]:
# Train with the default epoch count, then persist the per-epoch validation
# losses that train_model appended to loss_array.
train_model(model,criterion, optimizer)
np.save('/nfs/hpc/share/sumons/loss_cnn.npy', loss_array)

In [19]:
# Reload the saved CNN validation losses (this rebinds loss_array from a list
# to a NumPy array) and plot them against the epoch index.
loss_array = np.load('/nfs/hpc/share/sumons/loss_cnn.npy')
print(loss_array)
plt.plot(loss_array)
plt.title('Loss vs Epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [9]:
def test_model(model, dataloader, criterion):
  """Evaluate ``model`` on ``dataloader`` and keep every batch for inspection.

  Returns a 4-tuple: the mean per-batch loss on the kept pixels, then three
  lists with one tensor per batch: the masked inputs, the model outputs and
  the unmasked targets, all scaled to [0, 1]. A pixel is kept when its uniform
  draw exceeds 0.5. Uses the module-level ``device``.
  """
  masked_batches, reconstructions, originals = [], [], []
  loss_total = 0.0
  model.eval()
  with torch.no_grad():
    for images, targets in dataloader:
      images = (images / 255.0).to(device)
      keep = torch.rand(images.size()) > 0.5
      images = images * keep.to(device)
      masked_batches.append(images)

      targets = (targets / 255.0).to(device)
      predicted = model(images)
      reconstructions.append(predicted)
      originals.append(targets)

      loss_total += criterion(predicted[keep], targets[keep]).item()

  return loss_total / len(dataloader), masked_batches, reconstructions, originals

In [20]:
# Evaluate on the test loader and keep the per-batch tensors for the plotting
# helpers below.
# NOTE(review): the in-memory model is evaluated; the saved best checkpoint is
# not reloaded in this cell.
loss, masked_input, reconstructed_image, original_image = test_model(model, test_loader, criterion)
print(loss)

In [21]:
def plot_reconstructed_image(index_1, index_2):
  """Return reconstruction ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``reconstructed_image`` list; nothing is drawn here.
  """
  return reconstructed_image[index_1][index_2].cpu().view(28, 28) * 255

def plot_original_image(index_1, index_2):
  """Return original image ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``original_image`` list; nothing is drawn here.
  """
  return original_image[index_1][index_2].cpu().view(28, 28) * 255

def plot_masked_image(index_1, index_2):
  """Return masked input ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``masked_input`` list; nothing is drawn here.
  """
  return masked_input[index_1][index_2].cpu().view(28, 28) * 255

def plot_images(index_1, index_2, save = False):
  """Show original, masked and reconstructed versions of one test image side by side.

  ``index_1`` selects the batch and ``index_2`` the image inside it. When
  ``save`` is true the figure is also written to a fixed PNG path before it
  is displayed.
  """
  original = plot_original_image(index_1, index_2)
  masked = plot_masked_image(index_1, index_2)
  reconstructed = plot_reconstructed_image(index_1, index_2)

  _, axes = plt.subplots(1, 3, figsize=(15, 5))
  titled_images = (
      ('Original Image', original),
      ('Masked Image', masked),
      ('Reconstructed Image', reconstructed),
  )
  for axis, (title, picture) in zip(axes, titled_images):
    axis.imshow(picture)
    axis.set_title(title)

  # Hide ticks and frames so only the images remain.
  for axis in axes:
    axis.axis('off')

  # Save before show(): the figure must still be current when it is written.
  if save:
    plt.savefig('/nfs/hpc/share/sumons/CNN_50%_4.png')

  plt.show()

# Display image 6 of test batch 0 and also write the figure to disk (save=True).
plot_images(0,6, True)

# Variational Autoencoder

In [1]:
import torch
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn, optim
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
import torch.nn.functional as F

In [2]:
class MaskedMNISTDataset(Dataset):
    """Turn ``(image, label)`` samples into ``(image, image)`` float-tensor pairs.

    The label is ignored. The returned pair is the same tensor twice, used as
    model input and reconstruction target; masking is applied later by the
    training / evaluation loops.
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        """Number of wrapped samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as an ``(input, target)`` pair of one float tensor."""
        sample = self.data[idx]
        as_tensor = torch.from_numpy(np.array(sample[0])).float()
        return as_tensor, as_tensor

In [3]:
# Load the MNIST train and test splits from ./data (fetched if missing because
# download=True). No transform is passed; MaskedMNISTDataset converts each image.
train_data = datasets.MNIST(root = './data', train = True, download = True)
test_data = datasets.MNIST(root = './data', train = False, download = True)

In [4]:
# Create the MaskedMNISTDataset
# (every item becomes an (image, image) pair for reconstruction)
train_dataset = MaskedMNISTDataset(train_data)
test_dataset = MaskedMNISTDataset(test_data)

# Split the train dataset into training and validation
# 80% for training; validation gets the exact remainder.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
# Unseeded split: the partition changes from run to run.
train_dataset, val_dataset = random_split(train_dataset, [train_size, val_size])

# Create dataloaders
# Shuffling is enabled for training only.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)
test_loader = DataLoader(test_dataset, batch_size=64)

In [5]:


class VAE(nn.Module):
    """Convolutional variational autoencoder for 28x28 images, 20-d latent space.

    Encoder: two stride-2 4x4 convolutions (1 -> 64 -> 128 channels) bring the
    image down to 128 maps of 7x7, followed by a 1024-unit linear layer and two
    heads producing the latent mean and log-variance.
    Decoder: two linear layers expand the latent code back to 128x7x7, and two
    stride-2 transposed convolutions restore a single 28x28 map passed through
    a sigmoid.
    """

    def __init__(self):
        super().__init__()

        # Encoder
        self.conv1 = nn.Conv2d(1, 64, kernel_size=4, stride=2, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=4, stride=2, padding=1)
        self.fc1 = nn.Linear(128*7*7, 1024)
        self.fc21 = nn.Linear(1024, 20)  # latent mean
        self.fc22 = nn.Linear(1024, 20)  # latent log-variance

        # Decoder
        self.fc3 = nn.Linear(20, 1024)
        self.fc4 = nn.Linear(1024, 128*7*7)
        self.convT1 = nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1)
        self.convT2 = nn.ConvTranspose2d(64, 1, kernel_size=4, stride=2, padding=1)

    def encode(self, x):
        """Return ``(mu, logvar)`` for a ``(batch, 1, 28, 28)`` input."""
        features = F.relu(self.conv2(F.relu(self.conv1(x))))
        hidden = F.relu(self.fc1(features.view(features.size(0), -1)))
        return self.fc21(hidden), self.fc22(hidden)

    def reparameterize(self, mu, logvar):
        """Sample ``mu + eps * std`` with ``eps`` standard normal and ``std = exp(logvar / 2)``."""
        std = torch.exp(0.5*logvar)
        noise = torch.randn_like(std)
        return mu + noise*std

    def decode(self, z):
        """Map latent codes to ``(batch, 1, 28, 28)`` images with values in [0, 1]."""
        hidden = F.relu(self.fc4(F.relu(self.fc3(z))))
        maps = hidden.view(hidden.size(0), 128, 7, 7)
        upsampled = F.relu(self.convT1(maps))
        return torch.sigmoid(self.convT2(upsampled))

    def forward(self, x):
        """Reconstruct ``(batch, 28, 28)`` images; returns ``(reconstruction, mu, logvar)``."""
        # Add, then later drop, the single channel dimension.
        mu, logvar = self.encode(x.unsqueeze(1))
        latent = self.reparameterize(mu, logvar)
        reconstruction = self.decode(latent).squeeze(1)
        return reconstruction, mu, logvar


In [6]:
# Build the VAE and move it to the GPU when one is available.
model = VAE()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define the optimizer
# (plain SGD, lr 1e-3; criterion is the reconstruction term used by loss_function)
optimizer = optim.SGD(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()

# Define the loss function for VAE
def loss_function(recon_x, x, mu, logvar):
    """Return the reconstruction loss plus the KL divergence to a standard normal.

    The reconstruction term is the module-level ``criterion`` applied to
    ``recon_x`` and ``x``. The KL term is
    ``-0.5 * sum(1 + logvar - mu^2 - exp(logvar))``, summed over every element
    of ``mu`` / ``logvar``.

    NOTE(review): ``criterion`` is created as ``nn.MSELoss()`` (mean
    reduction by default) while the KL term is a sum over the whole batch, so
    the two terms are on different scales; confirm this weighting is intended.
    """
    reconstruction = criterion(recon_x, x)
    kl_terms = 1 + logvar - mu.pow(2) - logvar.exp()
    divergence = -0.5 * torch.sum(kl_terms)
    return reconstruction + divergence

In [7]:
# Training Loop
loss_array = []
def train_model(model, optimizer, num_epochs=500,
                checkpoint_path='/nfs/hpc/share/sumons/model_vae_20.pt'):
  """Train the VAE to reconstruct heavily masked 28x28 images.

  For every batch a fresh random mask keeps a pixel when its uniform draw
  exceeds 0.8 (about 20% of the pixels). The masked batch is fed to the model
  and ``loss_function`` is evaluated on the kept pixels together with the
  latent statistics. After each epoch the validation loss is appended to the
  module-level ``loss_array`` and the weights are saved to ``checkpoint_path``
  whenever that loss improves.

  Relies on the module-level ``train_loader``, ``val_loader``, ``device``,
  ``loss_array``, ``loss_function`` and ``validate_model``.

  Args:
    model: network returning ``(reconstruction, mu, logvar)``.
    optimizer: optimizer already bound to ``model``'s parameters.
    num_epochs: number of passes over ``train_loader``.
    checkpoint_path: file that receives ``model.state_dict()`` on improvement.
  """
  # +inf instead of a fixed 1000: the loss includes a summed KL term whose
  # scale is not bounded, and the first epoch must always checkpoint.
  best_loss = float('inf')
  for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
      # Rescale pixels to [0, 1].
      inputs = inputs.to(device)/255.0
      labels = labels.to(device)/255.0

      # Create the mask on the compute device so it serves both the
      # multiplication and the indexing without extra host<->device copies.
      mask = torch.rand(inputs.shape, device=inputs.device) > 0.8
      inputs = inputs * mask

      outputs, mu, logvar = model(inputs)
      # Reconstruction error is measured on the observed pixels only.
      loss = loss_function(outputs[mask], labels[mask], mu, logvar)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      running_loss += loss.item()

    val_loss = validate_model(model, val_loader)
    loss_array.append(val_loss)

    # Keep the weights of the best epoch seen so far.
    if val_loss < best_loss:
      best_loss = val_loss
      torch.save(model.state_dict(), checkpoint_path)

    print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {running_loss / len(train_loader)}, Validation Loss: {val_loss}')

# Validation function
def validate_model(model, dataloader):
    """Return the mean per-batch VAE loss over ``dataloader``.

    Batches are scaled to [0, 1] and multiplied by a fresh random mask that
    keeps a pixel when its uniform draw exceeds 0.8. ``loss_function`` is
    evaluated on the kept pixels plus the latent statistics, without
    gradients. Uses the module-level ``device`` and ``loss_function``.
    """
    model.eval()
    batch_losses = []
    with torch.no_grad():
        for images, targets in dataloader:
            images = images.to(device) / 255.0
            keep = torch.rand(images.size()) > 0.8
            images = images * keep.to(device)
            targets = targets.to(device) / 255.0
            predictions, mu, logvar = model(images)
            batch_loss = loss_function(
                predictions[keep].to(device), targets[keep].to(device), mu, logvar
            )
            batch_losses.append(batch_loss.item())

    return sum(batch_losses) / len(dataloader)




In [22]:
# Train with the default epoch count, then persist the per-epoch validation
# losses that train_model appended to loss_array.
train_model(model,optimizer)
np.save('/nfs/hpc/share/sumons/loss_vae.npy', loss_array)

In [58]:
def test_model(model, dataloader, mask_threshold=0.6):
  """Evaluate the VAE on ``dataloader`` and keep every batch for inspection.

  Each batch is scaled to [0, 1] and multiplied by a fresh random mask that
  keeps a pixel when its uniform draw exceeds ``mask_threshold``. The loss is
  ``loss_function`` evaluated on the kept pixels, the same way the training
  and validation loops compute it, so the reported numbers are comparable.

  Relies on the module-level ``device`` and ``loss_function``.

  Args:
    model: network returning ``(reconstruction, mu, logvar)``.
    dataloader: yields ``(inputs, labels)`` batches of raw 0-255 images.
    mask_threshold: uniform-draw threshold above which a pixel is kept.
      NOTE(review): the default 0.6 differs from the 0.8 used during
      training; this looks deliberate (evaluation at another masking rate)
      but should be confirmed.

  Returns:
    ``(mean_loss, masked_input, reconstructed_image, original_image)`` where
    the last three are lists holding one tensor per batch.
  """
  masked_input = []
  reconstructed_image = []
  original_image = []
  model.eval()
  running_loss = 0.0
  with torch.no_grad():
    for inputs, labels in dataloader:
      inputs = (inputs/255.0).to(device)
      labels = (labels/255.0).to(device)

      # Draw the mask on the compute device so it can multiply and index
      # the batch without extra host<->device copies.
      mask = torch.rand(inputs.shape, device=inputs.device) > mask_threshold
      inputs = inputs * mask

      outputs, mu, logvar = model(inputs)
      masked_input.append(inputs)
      reconstructed_image.append(outputs)
      original_image.append(labels)

      # Index with the mask rather than multiplying by it: multiplying keeps
      # the zeroed pixels in the mean and so shrinks the reported loss.
      loss = loss_function(outputs[mask], labels[mask], mu, logvar)
      running_loss += loss.item()

  return running_loss / len(dataloader), masked_input, reconstructed_image, original_image

In [23]:
# Evaluate on the test loader and keep the per-batch tensors for the plotting
# helpers below.
# NOTE(review): the in-memory model is evaluated; the saved best checkpoint is
# not reloaded in this cell.
loss, masked_input, reconstructed_image, original_image = test_model(model, test_loader)
print(loss)

In [24]:
def plot_reconstructed_image(index_1, index_2):
  """Return reconstruction ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``reconstructed_image`` list; nothing is drawn here.
  """
  batch = reconstructed_image[index_1]
  return batch[index_2].cpu().view(28, 28) * 255

def plot_original_image(index_1, index_2):
  """Return original image ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``original_image`` list; nothing is drawn here.
  """
  batch = original_image[index_1]
  return batch[index_2].cpu().view(28, 28) * 255

def plot_masked_image(index_1, index_2):
  """Return masked input ``index_2`` of batch ``index_1`` as a 28x28 CPU tensor scaled by 255.

  Reads the module-level ``masked_input`` list; nothing is drawn here.
  """
  batch = masked_input[index_1]
  return batch[index_2].cpu().view(28, 28) * 255

def plot_images(index_1, index_2, save = False):
  """Show original, masked and reconstructed versions of one test image side by side.

  ``index_1`` selects the batch and ``index_2`` the image inside it. When
  ``save`` is true the figure is also written to a fixed PNG path before it
  is displayed.
  """
  # Panels are built left to right: original, masked, reconstructed.
  pictures = [
      plot_original_image(index_1, index_2),
      plot_masked_image(index_1, index_2),
      plot_reconstructed_image(index_1, index_2),
  ]
  titles = ['Original Image', 'Masked Image', 'Reconstructed Image']

  _, axes = plt.subplots(1, 3, figsize=(15, 5))
  for axis, picture, title in zip(axes, pictures, titles):
    axis.imshow(picture)
    axis.set_title(title)

  # Hide ticks and frames so only the images remain.
  for axis in axes:
    axis.axis('off')

  # Save before show(): the figure must still be current when it is written.
  if save:
    plt.savefig('/nfs/hpc/share/sumons/vae_20%transfer_60_2.png')

  plt.show()

# Display image 35 of test batch 0 and also write the figure to disk (save=True).
plot_images(0,35, True)

# Graphs for the model performance

In [36]:
# Loading the loss files 

# Per-epoch validation losses saved by the three training cells.
dnn = np.load('/nfs/hpc/share/sumons/loss_dnn.npy')
cnn = np.load('/nfs/hpc/share/sumons/loss_cnn.npy')
# NOTE(review): the VAE training cell in this file saves 'loss_vae.npy';
# 'vae_los_mses.npy' is not written anywhere here. Confirm where it comes from
# and that it is on the same scale as the other two curves.
vae = np.load('/nfs/hpc/share/sumons/vae_los_mses.npy')

In [25]:
import matplotlib.pyplot as plt

# Assuming dnn, cnn, and vae are your data
# The epoch axis is 1-based and sized from the DNN history.
epochs = range(1, len(dnn) + 1)

# One curve per model, each limited to the first 300 epochs:
# (loss history, line format, marker, legend label)
curves = (
    (dnn, 'b-.', 'o', 'DNN'),
    (cnn, 'r-.', 'v', 'CNN'),
    (vae, 'g-.', 's', 'VAE'),
)
for history, line_format, marker_shape, legend_label in curves:
    plt.plot(epochs[:300], history[:300], line_format,
             marker=marker_shape, markersize=1, label=legend_label)

# Title and axis labels
plt.title('Loss vs Epoch')
plt.xlabel('Epoch')
plt.ylabel('Loss')

# Grid and legend, then display
plt.grid(True)
plt.legend()
plt.show()
