<a href="https://colab.research.google.com/github/ArianaBakhtyari/Thesis/blob/Jonahs-Branch/Jonah_Capstone_Autoencoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


Imports

In [None]:
import matplotlib.pyplot as plt
import pandas as pd 
import random 
import torch
import torchvision
from torchvision import transforms, datasets
from torch.utils.data import DataLoader,random_split
from torch import nn
from torchvision.datasets.folder import default_loader
import torch.nn.functional as F
import torch.optim as optim
import math
from PIL import Image
import numpy as np
import os
!nvidia-smi -L

GPU 0: A100-SXM4-40GB (UUID: GPU-f02fb8ed-9cdf-dad2-e985-59341c40d843)


Load Images

In [None]:
# Set the root directory for the images
root_dir = "drive/MyDrive/LandsatTrainingData/Sliced_Images_NoBlack/"

# Set the desired size for the images
image_size = (224, 224)

# Create a transformation that resizes the image and converts it to a tensor
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize(image_size),
    transforms.ToTensor()
])

# Load the images from a folder and apply the transformation
dataset = datasets.ImageFolder(root_dir, transform=transform)

print(len(dataset))


92755


Partition Dataset

In [None]:
ratio = 0.8
n_total = len(dataset)
n_train = int(n_total*ratio)

train, test = random_split(dataset, [n_train, n_total - n_train])
train, val = random_split(train, [int(n_train*ratio), n_train - int(n_train*ratio)])

In [None]:
batch_size=128
train_loader = torch.utils.data.DataLoader(train, batch_size=batch_size)
valid_loader = torch.utils.data.DataLoader(val, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test, batch_size=batch_size,shuffle=True)

In [None]:
for image_batch, _ in train_loader: # with "_" we just ignore the labels (the second element of the dataloader tuple)
        # Move tensor to the proper device
        # Encode data
        print(np.shape(image_batch))
        print(image_batch[0])

torch.Size([128, 1, 224, 224])
tensor([[[0.1216, 0.1216, 0.1176,  ..., 0.1608, 0.1608, 0.1647],
         [0.1176, 0.1216, 0.1216,  ..., 0.1608, 0.1647, 0.1647],
         [0.1216, 0.1216, 0.1216,  ..., 0.1647, 0.1608, 0.1490],
         ...,
         [0.1176, 0.1176, 0.1176,  ..., 0.1176, 0.1176, 0.1176],
         [0.1176, 0.1176, 0.1176,  ..., 0.1176, 0.1176, 0.1176],
         [0.1176, 0.1176, 0.1176,  ..., 0.1176, 0.1176, 0.1176]]])


KeyboardInterrupt: ignored

Autoencoder

In [None]:
class Encoder(nn.Module):
    
    def __init__(self, encoded_space_dim,fc2_input_dim):
        super().__init__()
        
        # ### Convolutional section
        # self.encoder_cnn = nn.Sequential(
        #     nn.Conv2d(1, 8, 3, stride=2, padding=1),
        #     nn.ReLU(True),
        #     nn.Conv2d(8, 16, 3, stride=2, padding=1),
        #     nn.BatchNorm2d(16),
        #     nn.ReLU(True),
        #     nn.Conv2d(16, 32, 3, stride=2, padding=0),
        #     nn.ReLU(True)
        # )
        # 
        ### Flatten layer
        self.flatten = nn.Flatten(start_dim=1)
        
        ### Linear section
        self.encoder_lin = nn.Sequential(
            nn.Linear(224*224, 12000),
            nn.ReLU(True),
            nn.Linear(12000, 4800),
            nn.ReLU(True),
            nn.Linear(4800, 1200),
            nn.ReLU(True),
            nn.Linear(1200, encoded_space_dim),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        # x = self.encoder_cnn(x)
        x = self.flatten(x)
        x = self.encoder_lin(x)
        return x

class Decoder(nn.Module):
    
    def __init__(self, encoded_space_dim,fc2_input_dim):
        super().__init__()
        self.decoder_lin = nn.Sequential(
            nn.Linear(encoded_space_dim, 1200),
            nn.ReLU(True),
            nn.Linear(1200, 4800),
            nn.ReLU(True),
            nn.Linear(4800, 12000),
            nn.ReLU(True),
            nn.Linear(12000, 224*224)
        )

        self.unflatten = nn.Unflatten(dim=1, 
          unflattened_size=(1, 224, 224))

        # self.decoder_conv = nn.Sequential(
        #     nn.ConvTranspose2d(32, 16, 3, 
        #     stride=2, output_padding=0),
        #     nn.BatchNorm2d(16),
        #     nn.ReLU(True),
        #     nn.ConvTranspose2d(16, 8, 3, stride=2, 
        #     padding=1, output_padding=1),
        #     nn.BatchNorm2d(8),
        #     nn.ReLU(True),
        #     nn.ConvTranspose2d(8, 1, 3, stride=2, 
        #     padding=1, output_padding=1)
        # )
        
    def forward(self, x):
        x = self.decoder_lin(x)
        x = self.unflatten(x)
        # x = self.decoder_conv(x)
        x = torch.sigmoid(x)
        return x

In [None]:
### Define the loss function
loss_fn = torch.nn.MSELoss()

### Define an optimizer (both for the encoder and the decoder!)
lr= 0.001

### Set the random seed for reproducible results
torch.manual_seed(0)

### Initialize the two networks
d = 256

#model = Autoencoder(encoded_space_dim=encoded_space_dim)
encoder = Encoder(encoded_space_dim=d,fc2_input_dim=128)
decoder = Decoder(encoded_space_dim=d,fc2_input_dim=128)
params_to_optimize = [
    {'params': encoder.parameters()},
    {'params': decoder.parameters()}
]

optim = torch.optim.Adam(params_to_optimize, lr=lr, weight_decay=1e-05)

# Check if the GPU is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("gpu")
print(f'Selected device: {device}')

# Move both the encoder and the decoder to the selected device
encoder.to(device)
decoder.to(device)

Selected device: cuda


Decoder(
  (decoder_lin): Sequential(
    (0): Linear(in_features=256, out_features=1200, bias=True)
    (1): ReLU(inplace=True)
    (2): Linear(in_features=1200, out_features=4800, bias=True)
    (3): ReLU(inplace=True)
    (4): Linear(in_features=4800, out_features=12000, bias=True)
    (5): ReLU(inplace=True)
    (6): Linear(in_features=12000, out_features=50176, bias=True)
  )
  (unflatten): Unflatten(dim=1, unflattened_size=(1, 224, 224))
)

Quantizers

In [None]:
def uniform_quantizer(values, num_buckets):
    quantized_values = torch.zeros(values.size())
    for i, node_values in enumerate(values):
        for j, node_value in enumerate(node_values):
            for k in range(1, num_buckets + 1):
                if node_value < k / num_buckets:
                    quantized_values[i][j] = k / (num_buckets + 1)
                    break
    return quantized_values

Training Functions

In [None]:
### Training function
def train_epoch(encoder, decoder, device, dataloader, loss_fn, optimizer):
    # Set train mode for both the encoder and the decoder
    encoder.train()
    decoder.train()

    train_loss = []
    # Iterate the dataloader (we do not need the label values, this is unsupervised learning)
    for image_batch, _ in dataloader: # with "_" we just ignore the labels (the second element of the dataloader tuple)
        # Move tensor to the proper device
        image_batch = image_batch.to(device)
        # Encode data
        encoded_data = encoder(image_batch)
        # Decode data
        decoded_data = decoder(encoded_data)
        # Evaluate loss
        loss = loss_fn(decoded_data, image_batch)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Print batch loss
        #print('\t partial train loss (single batch): %f' % (loss.data))
        train_loss.append(loss.detach().cpu().numpy())

    return np.mean(train_loss)

In [None]:
### Testing function
def test_epoch(encoder, decoder, device, dataloader, loss_fn):
    # Set evaluation mode for encoder and decoder
    encoder.eval()
    decoder.eval()
    with torch.no_grad(): # No need to track the gradients
        # Define the lists to store the outputs for each batch
        conc_out = []
        conc_label = []
        for image_batch, _ in dataloader:
            # Move tensor to the proper device
            image_batch = image_batch.to(device)
            # Encode data
            encoded_data = encoder(image_batch)
            # Decode data
            decoded_data = decoder(encoded_data)
            # Append the network output and the original image to the lists
            conc_out.append(decoded_data.cpu())
            conc_label.append(image_batch.cpu())
        # Create a single tensor with all the values in the lists
        conc_out = torch.cat(conc_out)
        conc_label = torch.cat(conc_label) 
        # Evaluate global loss
        val_loss = loss_fn(conc_out, conc_label)
    return val_loss.data

Show results

In [None]:
def plot_ae_outputs(encoder,decoder,n=10):
    plt.figure(figsize=(16,7.75))
    targets = test_dataset.targets.numpy()
    t_idx = {i:np.where(targets==i)[0][0] for i in range(n)}
    for i in range(n):
      ax = plt.subplot(3,n,i+1)
      img = test_dataset[t_idx[i]][0].unsqueeze(0).to(device)
      encoder.eval()
      decoder.eval()
      with torch.no_grad():
        enc_img = uniform_quantizer(encoder(img), 8).to(device)
        q_rec_img  = decoder(enc_img)
        rec_img = decoder(encoder(img))
    
      plt.imshow(img.cpu().squeeze().numpy(), cmap='gist_gray')
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)  
      if i == n//2:
        ax.set_title('Original images')

      ax = plt.subplot(3, n, i + 1 + n)
      plt.imshow(rec_img.cpu().squeeze().numpy(), cmap='gist_gray')  
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)  
      if i == n//2:
         ax.set_title('Reconstructed images')

      ax = plt.subplot(3, n, i + 1 + n*2)
      plt.imshow(q_rec_img.cpu().squeeze().numpy(), cmap='gist_gray')  
      ax.get_xaxis().set_visible(False)
      ax.get_yaxis().set_visible(False)  
      if i == n//2:
         ax.set_title('Quantized Reconstructed images')
    plt.show()   

Training

In [None]:
num_epochs = 100
diz_loss = {'train_loss':[],'val_loss':[]}
for epoch in range(num_epochs):
   train_loss =train_epoch(encoder,decoder,device,
   train_loader,loss_fn,optim)
   val_loss = test_epoch(encoder,decoder,device,test_loader,loss_fn)
   print('\n EPOCH {}/{} \t train loss {} \t val loss {}'.format(epoch + 1, num_epochs,train_loss,val_loss))
   diz_loss['train_loss'].append(train_loss)
   diz_loss['val_loss'].append(val_loss)

torch.Size([128, 1, 224, 224])
tensor([[[0.1882, 0.1882, 0.1882,  ..., 0.1765, 0.1765, 0.1765],
         [0.1882, 0.1882, 0.1882,  ..., 0.1725, 0.1725, 0.1725],
         [0.1882, 0.1843, 0.1843,  ..., 0.1725, 0.1765, 0.1686],
         ...,
         [0.1922, 0.1843, 0.1804,  ..., 0.1843, 0.1804, 0.1804],
         [0.1804, 0.1686, 0.1686,  ..., 0.1804, 0.1804, 0.1843],
         [0.1686, 0.1647, 0.1647,  ..., 0.1843, 0.1843, 0.1882]]])
torch.Size([128, 1, 224, 224])
tensor([[[0.1882, 0.1882, 0.1882,  ..., 0.1765, 0.1765, 0.1765],
         [0.1882, 0.1882, 0.1882,  ..., 0.1725, 0.1725, 0.1725],
         [0.1882, 0.1843, 0.1843,  ..., 0.1725, 0.1765, 0.1686],
         ...,
         [0.1922, 0.1843, 0.1804,  ..., 0.1843, 0.1804, 0.1804],
         [0.1804, 0.1686, 0.1686,  ..., 0.1804, 0.1804, 0.1843],
         [0.1686, 0.1647, 0.1647,  ..., 0.1843, 0.1843, 0.1882]]],
       device='cuda:0')


KeyboardInterrupt: ignored

Test

In [None]:
print(test_epoch(encoder,decoder,device,test_loader,loss_fn).item())
plot_ae_outputs(encoder,decoder,n=10)
for i in range(15):
  im = train_loader.dataset[8 * i][0].unsqueeze(0).to(device)
  encoder.eval()
  enc = encoder(im)
  print(max(enc[0]), min(enc[0]))

Super Fun Graphing Time

In [None]:
# Plot losses
plt.figure(figsize=(10,8))
plt.semilogy(diz_loss['train_loss'], label='Train')
plt.semilogy(diz_loss['val_loss'], label='Valid')
plt.xlabel('Epoch')
plt.ylabel('Average Loss')
#plt.grid()
plt.legend()
#plt.title('loss')
plt.show()