## Creating an image decoder (Classifying)

In [1]:
import numpy as np
import torch
import os
import cv2

import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

In [2]:
# Prefer the first CUDA device when one is available; otherwise run on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
print("Using CUDA." if use_cuda else "Using CPU.")

Using CUDA.


In [3]:
# Import data
folder_path = 'video_frames/'

# os.listdir returns entries in arbitrary order. Sorting the names makes the
# index -> frame mapping (frame i is image_paths[i]) identical on every run.
# NOTE(review): a checkpoint trained against an unsorted listing may map
# indices to different frames -- verify or retrain after this change.
image_paths = [
    os.path.join(folder_path, file_name)
    for file_name in sorted(os.listdir(folder_path))
]

In [4]:
# Prepare data
frames = []
for img in image_paths:
    frame = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
    if frame is None:
        # cv2.imread reports an unreadable / non-image file by returning None
        # rather than raising; fail here with the offending path instead of
        # producing a ragged object array further down.
        raise ValueError(f"Could not read image file: {img}")
    frames.append(frame)

# Targets: one grayscale frame per index, stacked to (num_frames, height, width)
# and rescaled from 0..255 to 0..1 as float32.
y = torch.from_numpy(np.stack(frames)).float() / 255

# Inputs: the frame index itself. Built explicitly as int64 because
# nn.functional.one_hot (used in the model's forward) only accepts long tensors,
# and np.arange's default integer width is platform-dependent.
X = torch.arange(y.shape[0], dtype=torch.long)

In [5]:
# One class per frame; every target image shares the trailing (height, width) shape.
num_classes = len(y)
output_dimensions = y.shape[1:]
print(
    f"Num classes: {num_classes}",
    f"Output dimensions: {output_dimensions}",
    sep="\n",
)

Num classes: 6572
Output dimensions: torch.Size([180, 240])


### Decoder network: fully connected layer followed by transposed convolutions

In [6]:
class Conv_Decoder(nn.Module):
    """Decode an integer frame index into a 180x240 image with values in (0, 1).

    The index is one-hot encoded, projected by a linear layer to a
    ``latent_dim x 3 x 4`` feature map, and upsampled by five transposed-
    convolution blocks. Every block uses kernel_size == stride and no padding,
    so each one multiplies the spatial size by its stride:
    3x4 -> 15x20 -> 45x60 -> 45x60 -> 90x120 -> 180x240.

    Args:
        latent_dim: Number of channels of the 3x4 feature map produced by the
            linear layer (and input channels of the first deconvolution).
        num_classes: Number of distinct indices, i.e. the one-hot width.
    """

    def __init__(self, latent_dim=256, num_classes=num_classes):
        super().__init__()
        self.latent_dim = latent_dim
        self.num_classes = num_classes

        # FC layer for feature mapping: one-hot index -> flattened latent_dim x 3 x 4 map
        self.fc = nn.Linear(num_classes, latent_dim * 3 * 4)

        # Deconvolution layers. The first block takes `latent_dim` input
        # channels (this used to be a hard-coded 256, which broke any
        # non-default latent_dim at the first forward pass).
        self.deconv_block1 = self._upsample_block(latent_dim, 100, 5, nn.ReLU())
        self.deconv_block2 = self._upsample_block(100, 100, 3, nn.ReLU())
        self.deconv_block3 = self._upsample_block(100, 100, 1, nn.ReLU())
        self.deconv_block4 = self._upsample_block(100, 100, 2, nn.ReLU())
        # Final block: single output channel squashed into (0, 1) by the sigmoid
        self.deconv_block5 = self._upsample_block(100, 1, 2, nn.Sigmoid())

    @staticmethod
    def _upsample_block(in_channels, out_channels, scale, activation):
        """Build ConvTranspose2d (kernel == stride == scale) -> BatchNorm2d -> activation."""
        return nn.Sequential(
            nn.ConvTranspose2d(in_channels, out_channels,
                               kernel_size=scale, stride=scale, padding=0),
            nn.BatchNorm2d(out_channels),
            activation,
        )

    def forward(self, x):
        """Decode indices into images.

        Args:
            x: Integer (long) tensor of frame indices in [0, num_classes);
                a scalar or a 1-D batch.

        Returns:
            Float tensor of shape (batch, 180, 240).
        """
        # One-hot encode the indices and project them to the seed feature map
        x = nn.functional.one_hot(x, num_classes=self.num_classes).float()
        x = self.fc(x)
        x = x.view(-1, self.latent_dim, 3, 4)

        # "Skip connection": torch.sum(x) reduces over the WHOLE tensor (batch,
        # channels and space), so this is a single scalar added to every
        # element of every sample.
        # NOTE(review): this makes one sample's output depend on the rest of
        # the batch; kept as-is so existing checkpoints behave the same --
        # confirm whether a per-sample reduction was intended.
        skip = torch.sum(x) / 12

        x1 = self.deconv_block1(x)
        x2 = self.deconv_block2(x1) + skip

        x3 = self.deconv_block3(x2)
        x4 = self.deconv_block4(x3) + skip

        x5 = self.deconv_block5(x4)
        # Drop the singleton channel dimension: (batch, 1, 180, 240) -> (batch, 180, 240)
        output = x5.view(-1, 180, 240)

        return output

In [7]:
# Resume from the saved checkpoint when it exists; otherwise start from an
# untrained decoder so the notebook also runs from a clean checkout.
MODEL_PATH = 'models/Conv_Decoder.pkl'
if os.path.exists(MODEL_PATH):
    # SECURITY: weights_only=False unpickles a full nn.Module and can execute
    # arbitrary code -- only load checkpoint files you created yourself.
    # map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
    model = torch.load(MODEL_PATH, map_location=device, weights_only=False)
else:
    print(f"No checkpoint at {MODEL_PATH}; creating a new Conv_Decoder.")
    model = Conv_Decoder()
print(model)

Conv_Decoder(
  (fc): Linear(in_features=6572, out_features=3072, bias=True)
  (deconv_block1): Sequential(
    (0): ConvTranspose2d(256, 100, kernel_size=(5, 5), stride=(5, 5))
    (1): BatchNorm2d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (deconv_block2): Sequential(
    (0): ConvTranspose2d(100, 100, kernel_size=(3, 3), stride=(3, 3))
    (1): BatchNorm2d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (deconv_block3): Sequential(
    (0): ConvTranspose2d(100, 100, kernel_size=(1, 1), stride=(1, 1))
    (1): BatchNorm2d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (deconv_block4): Sequential(
    (0): ConvTranspose2d(100, 100, kernel_size=(2, 2), stride=(2, 2))
    (1): BatchNorm2d(100, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
  )
  (deconv_block5): Sequential(
    (0): ConvTranspose2d(100, 1, kernel_size=(2,

In [24]:
# Hyperparameters
# Decay rates of the moving averages of the gradient and of the squared
# gradient (passed to the optimizer as its two betas).
decay_1, decay_2 = 0.9, 0.98

lr = 1e-6
weight_decay = 0.0

# No cell shown in this notebook reads this value.
lr_decay_rate = 0.98

In [25]:
# Loss function, Optimizer and Scheduler
# Pixel-wise mean squared error between decoded and target frames.
criterion = nn.MSELoss()

adamw_options = {
    "lr": lr,
    "betas": (decay_1, decay_2),
    "weight_decay": weight_decay,
}
optimizer = torch.optim.AdamW(model.parameters(), **adamw_options)

# NOTE(review): with T_max=100 the cosine schedule reaches its minimum after
# 100 scheduler steps and then rises again; the training cell steps it once
# per epoch -- confirm this cycling is intended for longer runs.
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=100)

In [10]:
# Place the network and the complete dataset on the selected device.
model = model.to(device)
X, y = X.to(device), y.to(device)

In [11]:
# Sanity check: model parameters, inputs and targets should report the same device.
model_device = next(model.parameters()).device
inputs_device, labels_device = X.device, y.device

print(model_device, inputs_device, labels_device, sep="\n")

cuda:0
cuda:0
cuda:0


In [12]:
# A single input is a 0-dimensional index tensor.
X[2000].size()

torch.Size([])

In [13]:
# Smoke test: decode index 0 and inspect the shape of the resulting image.
# The model is invoked through model(...) rather than .forward so registered
# hooks still run, and in eval mode without gradients so this check does not
# update the BatchNorm running statistics. The previous mode is restored after.
was_training = model.training
model.eval()
with torch.no_grad():
    sample_output = model(torch.tensor(0, dtype=torch.long, device=device))
model.train(was_training)
sample_output[0].shape

torch.Size([180, 240])

In [19]:
# Training-loop settings
num_epochs = 1000
verbose = True

# The same (index, frame) pairs serve as both train and test data (deliberate overfit).
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, shuffle=True, batch_size=200)

In [26]:
# Training loop
loss_values = []  # loss of every batch, accumulated across all epochs
for epoch in range(num_epochs):
    running_loss = 0.0
    model.train()  # Set to train mode
    for inputs, labels in dataloader:

        # Zero the gradients left over from the previous step
        optimizer.zero_grad()

        # Feed-forward
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backprop and parameter update
        loss.backward()
        optimizer.step()

        # Record the batch loss as a plain Python float
        batch_loss = loss.item()
        running_loss += batch_loss
        loss_values.append(batch_loss)

    # Average loss for the epoch (mean over batches)
    avg_loss = running_loss / len(dataloader)

    # Update the learning rate at the end of each epoch
    scheduler.step()

    if verbose:
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.6f}")

Epoch 1/1000, Loss: 0.019742
Epoch 2/1000, Loss: 0.019773
Epoch 3/1000, Loss: 0.019740
Epoch 4/1000, Loss: 0.019759
Epoch 5/1000, Loss: 0.019632
Epoch 6/1000, Loss: 0.019842
Epoch 7/1000, Loss: 0.019668
Epoch 8/1000, Loss: 0.019624
Epoch 9/1000, Loss: 0.019588
Epoch 10/1000, Loss: 0.019600
Epoch 11/1000, Loss: 0.019600
Epoch 12/1000, Loss: 0.019574
Epoch 13/1000, Loss: 0.019660
Epoch 14/1000, Loss: 0.019630
Epoch 15/1000, Loss: 0.019532
Epoch 16/1000, Loss: 0.019707
Epoch 17/1000, Loss: 0.019575
Epoch 18/1000, Loss: 0.019521
Epoch 19/1000, Loss: 0.019538
Epoch 20/1000, Loss: 0.019466
Epoch 21/1000, Loss: 0.019488
Epoch 22/1000, Loss: 0.019538
Epoch 23/1000, Loss: 0.019531
Epoch 24/1000, Loss: 0.019554
Epoch 25/1000, Loss: 0.019507
Epoch 26/1000, Loss: 0.019453
Epoch 27/1000, Loss: 0.019434
Epoch 28/1000, Loss: 0.019592
Epoch 29/1000, Loss: 0.019488
Epoch 30/1000, Loss: 0.019466
Epoch 31/1000, Loss: 0.019455
Epoch 32/1000, Loss: 0.019418
Epoch 33/1000, Loss: 0.019506
Epoch 34/1000, Loss

In [None]:
# Show only the most recent batch losses; the full list holds one entry per
# batch per epoch and would flood the notebook output.
loss_values[-10:]

### Model is too small to fully learn the images. I will freeze current weights and add another layer.

In [None]:
# Display the module tree of the current model.
model

In [None]:
# Disabled cell: the code below is wrapped in a string literal, so nothing is
# executed (the cell only evaluates to that string). It refers to
# `model.deconv_block`, whereas the Conv_Decoder class defined in this notebook
# exposes deconv_block1 .. deconv_block5.
"""
# Freezing params
for module in model.deconv_block:
    for param in module.parameters():
        param.requires_grad = False
"""

In [None]:
# Disabled cell: the code below is wrapped in a string literal, so
# `new_deconv_block` is never actually defined by running this cell. It refers
# to `model.deconv_block`, whereas the Conv_Decoder class defined in this
# notebook exposes deconv_block1 .. deconv_block5.
"""
# Updating model design
new_deconv_block = nn.Sequential(
    model.deconv_block[0],  # First ConvTranspose2d layer
    model.deconv_block[1],  # ReLU
    model.deconv_block[2],  # Second ConvTranspose2d layer
    model.deconv_block[3],  # ReLU
    # Add the new layer in the middle (e.g., ConvTranspose2d)
    nn.ConvTranspose2d(100, 100, kernel_size=1, stride=1, padding=1),
    nn.ReLU(),
    model.deconv_block[4],  # Third ConvTranspose2d layer
    model.deconv_block[5],  # ReLU
    model.deconv_block[6],  # Fourth ConvTranspose2d layer
    model.deconv_block[7]   # Sigmoid
)
"""

In [None]:
# `new_deconv_block` is only built inside a cell that is currently disabled
# (wrapped in a string literal), so the name normally does not exist. Apply the
# replacement only when it has actually been defined, instead of raising
# NameError when the notebook is run top to bottom.
if 'new_deconv_block' in globals():
    model.deconv_block = new_deconv_block

In [None]:
# Display the module tree again to inspect the model after the preceding cells.
model

In [None]:
# Unfreezing newly added layer.
# Only meaningful for a model that carries a single `deconv_block` container;
# the Conv_Decoder class defined in this notebook has deconv_block1..5 instead,
# so skip rather than raise AttributeError on such a model.
if hasattr(model, "deconv_block"):
    for param in model.deconv_block[4].parameters():
        param.requires_grad = True

In [None]:
# Debug: show the last batch of inputs left over from the training loop.
print(inputs)

In [None]:
# Tensor.to returns a new tensor rather than moving the tensor in place, so
# the result has to be assigned back for `inputs` to end up on `device`.
inputs = inputs.to(device)

In [None]:
# Record which device the last training batch lives on
# (rebinds the `inputs_device` name used earlier for X.device).
inputs_device = inputs.device

In [None]:
# Debug: print the device recorded for the last training batch.
print(inputs_device)

In [None]:
# Count parameters and estimate their memory footprint. Bytes per element are
# read from each parameter's dtype instead of assuming 4-byte float32.
num_params = sum(p.numel() for p in model.parameters())
num_param_bytes = sum(p.numel() * p.element_size() for p in model.parameters())
model_size_mb = num_param_bytes / 1e6

print(f"Number of parameters: {num_params}")
print(f"Model size: {model_size_mb:.2f} MB")

In [None]:
# Release GPU memory without dropping `model`: the torch.save cell that follows
# still needs it, and deleting the name here makes that cell raise NameError
# when the notebook is run top to bottom. Moving the module to the CPU frees
# its GPU copy while keeping it available for saving.
model.cpu()
del inputs
del labels

# Hand cached, now-unused GPU blocks back to the driver.
torch.cuda.empty_cache()

In [48]:
# Persist the whole module object (pickled), matching how the load cell reads it.
# torch.save does not create missing parent directories, so make sure it exists.
os.makedirs('models', exist_ok=True)
torch.save(model, 'models/Conv_Decoder.pkl')