In [78]:
# Mount Google Drive so we can access data
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [79]:
# Install idx2numpy package for extracting data
!pip install idx2numpy



In [80]:
# Import packages
import os
import json
import gzip
import torch
import torchvision
import numpy as np 
import pandas as pd

import idx2numpy
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms.functional as transforms
import torchvision.transforms as trans
import matplotlib.pyplot as plt

from collections import OrderedDict 

In [81]:
def get_best_results(results):
  '''
  Take in a results dictionary 
  and return epoch on which 
  minimum validation loss was
  reached, minimum val. loss on
  that epoch, and min. accuracy
  on that epoch
  '''
  # Find the epoch on which the minimum validation loss was reached
  best_val_epoch = np.argmin(np.array(results['val_loss']))
  
  # Store validation metrics
  best_epoch_val_loss = np.array(results['val_loss'][best_val_epoch])

  # Store training metrics
  best_epoch_train_loss = np.array(results['train_loss'][best_val_epoch])
  
  # Store best results in a list
  best_results = [best_val_epoch, 
                  best_epoch_val_loss,
                  best_epoch_train_loss]

  # Return statement
  return(best_results)


def plot_loss(results, lab, model_num):
  '''
  Convenience function to plot results
  '''
  # Store model name and destination path
  model_name = str(model_num) + '_' + lab + '_results.png'
  path = os.path.join('/content/drive/MyDrive/', '2_figs', model_name)
  
  # Plot the results
  plt.plot(results['train_' + lab], label='Train')
  plt.plot(results['val_' + lab], label='Validation')
  
  # Add annotations
  plt.legend()
  plt.title(lab.title() + ' by Epoch')
  
  # Save the figure and close the plot
  plt.savefig(path)
  plt.clf()

In [82]:
def load_one_dataset(path):
    '''
    Convenience function to load a single dataset
    '''
    f = gzip.open(path, 'rb')
    data = torch.from_numpy(idx2numpy.convert_from_file(f).astype('float64'))
    f.close()
    
    return(data)


def load_all_datasets(train_imgs, train_labs, test_imgs, test_labs, batch_size):
    '''
    Load training as well as test images here
    '''
    train_images = load_one_dataset(train_imgs).type(torch.float32)/255.0
    train_labels = load_one_dataset(train_labs).type(torch.long)
    train = list(zip(train_images, train_labels))

    test_images = load_one_dataset(test_imgs).type(torch.float32)/255.0
    test_labels = load_one_dataset(test_labs).type(torch.long)
    test = list(zip(test_images, test_labels))
    
    train_loader = torch.utils.data.DataLoader(train, batch_size=batch_size, shuffle=True, num_workers=2)
    test_loader = torch.utils.data.DataLoader(test, batch_size=batch_size, shuffle=False, num_workers=2)
    
    return(train_loader, test_loader)

In [83]:
def add_noise(img, quadrants):
  '''
  Randomly remove 1 or 2 quadrants
  from the input image.
  '''
  # Get the number of quadrants to erase
  n_quads_to_erase = np.random.choice([1, 2])

  # Get which quadrants to erase
  quads_to_erase = np.random.choice([1, 2, 3, 4], size = n_quads_to_erase)

  # Create a copy of the image
  noisy_img = img.clone()

  # Now erase the quadrants
  for quad in quads_to_erase:
    noisy_img = transforms.erase(noisy_img, *quadrants[quad])
  
  # Return statement
  return(noisy_img)

In [84]:
def plot_image(img):
  '''
  Take an image stored as a Torch
  tensor and display it in the notebook
  '''
  # Display the img
  plt.imshow(img.numpy(), cmap= 'gray')

In [85]:
def test_noise():
  '''
  Test the noise function
  '''
  # Just for testing out noise function
  data_dir = '/content/drive/MyDrive/data'

  # Set paths
  paths = {
        'train_imgs': os.path.join(data_dir, 'train-images-idx3-ubyte.gz'),
        'train_labs': os.path.join(data_dir, 'train-labels-idx1-ubyte.gz'),
        'test_imgs': os.path.join(data_dir,'t10k-images-idx3-ubyte.gz'),
        'test_labs': os.path.join(data_dir,'t10k-labels-idx1-ubyte.gz')
  }

  # Load datasets
  train_loader, test_loader = load_all_datasets(**paths, batch_size = 32)

  # Get the next batch from the train loader
  images, labels = iter(train_loader).next()

  # Store the quadrant definitions: move this into training loop later
  quadrants = {
      
      1: [0, 0, 14, 14, 0], 
      2: [0, 14, 14, 14, 0],
      3: [14, 0, 14, 14, 0],
      4: [14, 14, 14, 14, 0],
  }

  # Take the first image in the batch
  img = images[0]

  # Get noisy image
  noisy_img = add_noise(img, quadrants)

  # Return statement
  return(img, noisy_img)

In [86]:
class DenoisingEncoder(nn.Module):
  
  def __init__(self, encoder_units, decoder_units, input_dim, output_dim):
    
    # Conventional super-class declaration
    super(DenoisingEncoder,self).__init__()

    # Initialize lists to store layers
    encoder = []
    decoder = []

    # Add input and output dimensions to layer list for encoder
    self.encoder_units = [input_dim] + encoder_units
    self.decoder_units = [encoder_units[-1]] + decoder_units + [output_dim]

    # Compute the total no. of layers for the encoder/decoder
    self.encoder_layers = len(self.encoder_units)
    self.decoder_layers = len(self.decoder_units)

    # Append the hidden layers for the encoder
    for i in range(1, self.encoder_layers):
      
      # Add linear layer
      layer = ('Linear{}'.format(i), nn.Linear(self.encoder_units[i-1], self.encoder_units[i]))
      activation = ('RELU{}'.format(i), nn.ReLU(True))
      
      # Append
      encoder.append(layer)
      encoder.append(activation)
    
    # Append the hidden layers for the decoder
    for i in range(1, self.decoder_layers - 1):
      
      # Add the layers
      layer = ('Linear{}'.format(i), nn.Linear(self.decoder_units[i-1], self.decoder_units[i]))
      activation = ('RELU{}'.format(i), nn.ReLU(True))
      
      # Append to the lists
      decoder.append(layer)
      decoder.append(activation)

    # Create final output layer
    i = self.decoder_layers - 1
    layer = ('Linear{}'.format(i), nn.Linear(self.decoder_units[i-1], self.decoder_units[i]))
    activation = ('Sigmoid{}'.format(i), nn.Sigmoid())
    
    # Append to decoder list
    decoder.append(layer)
    decoder.append(activation)
    
    # Wrap this in a container and declare the encoder/decoder
    self.encoder = nn.Sequential(OrderedDict(encoder))
    self.decoder = nn.Sequential(OrderedDict(decoder))
    
  def forward(self,x):
    
    # First encode the noisy image and then decode
    x=self.encoder(x)
    x=self.decoder(x)
    
    return x

In [87]:
def train(config,
          epochs=2, 
          input_dim = 784, output_dim = 784, 
          data_dir = '/content/drive/MyDrive/data'):
    '''
    This is the main training loop
    '''
    # Set the seed so each experiment is reproducible
    np.random.seed(21390)
    torch.manual_seed(10394)

    # Set device
    if torch.cuda.is_available():
      device = torch.device("cuda")
    else:
      device = torch.device("cpu")
    
    # Set paths to datasets
    paths = {
        'train_imgs': os.path.join(data_dir, 'train-images-idx3-ubyte.gz'),
        'train_labs': os.path.join(data_dir, 'train-labels-idx1-ubyte.gz'),
        'test_imgs': os.path.join(data_dir,'t10k-images-idx3-ubyte.gz'),
        'test_labs': os.path.join(data_dir,'t10k-labels-idx1-ubyte.gz')
    }

    # Store the quadrant definitions: move this into training loop later
    quadrants = {
      
      1: [0, 0, 14, 14, 0], 
      2: [0, 14, 14, 14, 0],
      3: [14, 0, 14, 14, 0],
      4: [14, 14, 14, 14, 0],
    }

    # Load datasets
    train_loader, test_loader = load_all_datasets(**paths, batch_size = config['batch_size'])
    
    # Set parameters
    net = DenoisingEncoder(config['encoder_units'], config['decoder_units'], input_dim, output_dim)
    
    # Send net object to device memory
    net.to(device)
    
    # We use the cross-entropy loss
    criterion = nn.MSELoss()

    # We use mini-batch stochastic gradient descent with momentum
    optimizer = optim.SGD(net.parameters(), lr=config['lr'], momentum=config['momentum'], 
                                            weight_decay=config['weight_decay'])

    # Store results here
    results = {
      'train_loss': [], 
      'val_loss': [], 
      }

    # Loop over the dataset multiple times
    for epoch in range(epochs):  
        
        # Initialize running loss
        running_loss = 0.0
        running_accuracy = 0.0

        # Initialize the validation running loss
        val_running_loss = 0.0
        val_running_accuracy = 0.0
        
        # Iterate through data now
        for i, data in enumerate(train_loader):
            
            # Get the inputs: data is a list of [inputs, labels]
            clean_images, _ = data
            
            # Initialize container for noisy images
            noisy_images = []

            # Now get noisy images
            for img in clean_images: 
              noisy_images.append(add_noise(img, quadrants))
            
            # Convert noisy image list to Torch tensor
            noisy_images = torch.stack(noisy_images, dim =0)
            
            # Flatten noisy images
            noisy_images=noisy_images.view(noisy_images.size(0),-1).type(torch.FloatTensor)
            
            # Flatten clean images
            flat_clean_imgs = clean_images.view(clean_images.size(0),-1).type(torch.FloatTensor)

            # Send the inputs and labels to the memory of the device
            noisy_images, flat_clean_imgs = noisy_images.to(device), flat_clean_imgs.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward
            pred_images = net(noisy_images)
            
            # Calculate loss
            loss = criterion(pred_images, flat_clean_imgs)

            # Backward
            loss.backward()
            
            # Optimize
            optimizer.step()

            # Add to running loss
            running_loss += loss.item()
        
        # Loop through the validation data
        for j, data in enumerate(test_loader):
          
          # No need to calculate gradients for validation set
          with torch.no_grad():

              # Get the inputs: data is a list of [inputs, labels]
              val_clean_images, _ = data
            
              # Initialize container for noisy images
              val_noisy_images = []

              # Now get noisy images
              # Flatten clean images for loss calculation
              for img in val_clean_images: 
                val_noisy_images.append(add_noise(img, quadrants))
            
              # Convert noisy image list to Torch tensor
              # Flatten noisy validation images
              val_noisy_images = torch.stack(val_noisy_images, dim =0)
              val_noisy_images=val_noisy_images.view(val_noisy_images.size(0),-1).type(torch.FloatTensor)
              
              # Flatten clean validation images
              val_flat_clean_imgs = val_clean_images.view(val_clean_images.size(0),-1).type(torch.FloatTensor)

              # Send the inputs and labels to the memory of the device
              val_noisy_images, val_flat_clean_imgs = val_noisy_images.to(device), val_flat_clean_imgs.to(device)

              # Send the data item through the network to get output
              val_pred_images = net(val_noisy_images)

              # Compute the loss
              # Add to running loss
              val_loss = criterion(val_pred_images, val_flat_clean_imgs)
              val_running_loss += val_loss.item()
        
        # Rescale the training and validation perfomance metrics
        running_loss = (running_loss*config['batch_size'])/len(train_loader)
        
        # Rescale the validation loss
        val_running_loss = (val_running_loss*config['batch_size'])/len(test_loader)
        
        # Append to the results tracker
        results['train_loss'].append(np.float(running_loss))
        results['val_loss'].append(np.float(val_running_loss))

        # Make print message format string
        msg = "Epoch:{} | Training Loss:{} | Validation Loss: {}" "\n"

        # Print performance
        print(msg.format(epoch, running_loss, val_running_loss))
        
    # Print message
    print('Done training...')

    # Get the best results and print
    best_results = get_best_results(results)
    print(best_results)
    
    # Plot the losses
    plot_loss(results, lab='loss', model_num=config['model_num'])
    
    # Return statement
    return(results)

In [88]:
# Test configuration
test_config = {'lr': 0.7,
               'batch_size': 64,
               'weight_decay': 0.01,
               'encoder_units': [512, 256, 128, 64],
               'decoder_units': list(reversed([512, 256, 128, 64]))[1:],
               'momentum': 0.99,
               'done': 0,
               'model_num': 1}


# Try out the test configuration
train(test_config, epochs = 2)

Epoch:0 | Training Loss:10.73760139103383 | Validation Loss: 10.697904240553546

Epoch:1 | Training Loss:10.73941577268816 | Validation Loss: 10.691874473717562

Done training...
[1, array(10.69187447), array(10.73941577)]


{'train_loss': [10.73760139103383, 10.73941577268816],
 'val_loss': [10.697904240553546, 10.691874473717562]}

<Figure size 432x288 with 0 Axes>

In [None]:
def get_grid(archs, learning_rates, decays, momentums, batch_size = 64,
             checkpoints_dir = "/content/drive/MyDrive/2_checkpoints"):
  '''
  Get a random set of hyper-parameter combinations
  to test with for a given number of layers. 
  '''
  
  # Store the grid and results in this dictionary
  experiment = {
      
      'params': [], 
      'best_results': []

  }

  # Create experiment ID
  experiment_id = len([name for name in os.listdir(checkpoints_dir) if os.path.isfile(name)]) + 1

  # This will generate the list of parameter combinations to search
  for i, arch in enumerate(archs):
    for lr in learning_rates: 
      for d in decays: 
        for m in momentums:   
          config = {
                    'lr': lr,
                    'batch_size': batch_size,
                    'weight_decay': d,
                    'encoder_units': arch,
                    'decoder_units': reversed(arch)[1:],
                    'momentum': m,
                    'done': 0,
                    'model_num': i}
          
          experiment['params'].append(config)

  # This is the experiment path
  path = os.path.join(checkpoints_dir, experiment_id)

  # We write the experiment dictionary as a .json file
  with open(path, 'w', encoding='utf-8') as f:
    json.dump(experiment, f, ensure_ascii=False, indent=4)

In [None]:
def run_grid_search(experiment_id, checkpoints_dir = "/content/drive/MyDrive/2_checkpoints"):
  '''
  Run the grid search
  '''
  # First we load the experiment dictionary
  # Create the complete path
  path = os.path.join(checkpoints_dir, experiment_id)
  
  # Load the experiment data file
  with open(path, 'r', encoding='utf-8') as f:
    experiment = json.load(f)

  # Loop through experiments data file
  for i, config in enumerate(experiment['params']):
    
    # Print progress report
    print("This is combination {} of experiment {}".format(i, experiment_id))

    # Run experiments only for those parameter combinations which have not been tested
    if config['done'] == 0:
      print(config)
      results, best_results = train(config, epochs=max_epochs)
      experiment['best_results'].append(best_results) 
      config['done'] = 1
    
    # We update the .json dictionary every 15 iterations
    if i%15 == 0:
      with open(path, 'w', encoding='utf-8') as f:
        json.dump(experiment, f, ensure_ascii=False, indent=4)