# **ConvNet From Scratch with augmentations**

This will contain:
1. image augmentation for increasing the volume of input  
2. visualization of intermediate features and images
3. Fine-tuning the known architecture for the sake of learning

All will be coded in PyTorch




In [None]:
import torch
from torch import nn
from torchvision import datasets
from torchvision import transforms
import numpy as np
import matplotlib.pyplot as plt

: 

In [None]:
#Loading the datasets
transform = transforms.Compose([
    transforms.ToTensor()
])

train_dataset = datasets.CIFAR10(
    root = 'data',
    train = True,
    download = True,
    transform = transform
)

test_dataset = datasets.CIFAR10(
    root = 'data',
    train = False,
    download = True,
    transform = transform
)

: 

"Since we are using the concept of data augmentation, we will be using less samples of data. Currently, we have 5000 samples for cat and dogs each. But, we will use the dataset with 1000 training images, 500 validation imgages and 1000 test images"

In [None]:
from torch.utils.data import Subset
def create_balanced_subset(dataset, class_indices, start_index, end_index):

  targets = np.array(dataset.targets)
  selected_indices = []

  for cls in class_indices:
    indices = np.where(targets == cls)[0]
    selected_indices.extend(indices[start_index: end_index])

  dataset = Subset(dataset, selected_indices)
  return dataset

train_data = create_balanced_subset(train_dataset, [3,5], 0, 1000)
validation_data = create_balanced_subset(train_dataset, [3,5], 1000, 1500)
test_data = create_balanced_subset(test_dataset, [3,5], 0, 1000)

: 

In [None]:
def check_subset_balance(subset, name="Subset"):
    # This pulls labels from the original dataset using the subset's internal indices
    labels = [subset.dataset.targets[i] for i in subset.indices]

    unique, counts = np.unique(labels, return_counts=True)
    print(f"--- {name} ---")
    print(f"Total samples: {len(subset)}")
    for cls, count in zip(unique, counts):
        print(f"Class {cls}: {count} samples")

check_subset_balance(train_data, "Training Set")
check_subset_balance(validation_data, "Validation Set")

: 

In [None]:
#Changing the class 3 -> 0 and 5 -> 1
class DatasetWrapper:
  def __init__(self, subset, mapping = {3 : 0, 5: 1}, transform = None):
    self.subset = subset
    self.mapping = mapping
    self.transform = transform

  def __getitem__(self, index):
    image, target = self.subset[index]
    if self.transform:
      image = self.transform(image)
    return image, self.mapping[target]

  def __len__(self):
    return len(self.subset)

train_ready = DatasetWrapper(train_data)
validation_ready = DatasetWrapper(validation_data)
test_ready = DatasetWrapper(test_data)

: 

In [None]:
#Creating dataloader
from torch.utils.data import DataLoader

batch_size = 64
train_loader = DataLoader(train_ready, batch_size, shuffle = True)
val_loader = DataLoader(validation_ready, batch_size, shuffle = True)
test_loader = DataLoader(test_ready, batch_size, shuffle = True)

: 

In [None]:
images, labels = next(iter(train_loader))

images = images[:32].numpy()
images = np.transpose(images, (0, 2, 3, 1)) #Channel to the last
label_mappings  = {
    0: "Cat",
    1: "Dog"
}
plt.figure(figsize = (20, 10))
for i in range(len(images)):
  plt.subplot(4, 8, i + 1)
  plt.imshow(images[i])
  plt.xlabel(f"Label: {label_mappings[labels[i].item()]}")
  plt.xticks([])
  plt.yticks([])
plt.show()
#Visualizing the first batch of the images

: 

In [None]:
import torch
from torch import nn

device = torch.accelerator.current_accelerator().type if torch.accelerator.is_available() else "cpu"
print(f"Using {device} device")


: 

In [None]:
from torch.nn.modules import activation
from torchsummary import summary
#Defining the model

class NeuralNetwork(nn.Module):
  def __init__(self):
    super().__init__()
    self.ConV_stack = nn.Sequential(

        nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size = 2),

        nn.Conv2d(in_channels=32, out_channels=64, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size = 2),

        nn.Conv2d(in_channels = 64, out_channels=128, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size = 2),

        nn.Conv2d(in_channels = 128, out_channels = 256, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size = 2),

        nn.Conv2d(in_channels = 256, out_channels = 512, kernel_size = 3, padding = 1),
        nn.ReLU(),

        nn.AdaptiveAvgPool2d(1),
        nn.Flatten(),

        nn.Linear(512, 1),
        nn.Sigmoid()
    )

  def forward(self, X):
    Y = self.ConV_stack(X)
    return Y

: 

In [None]:
device = torch.accelerator.current_accelerator() if torch.accelerator.is_available() else "cpu"
model = NeuralNetwork().to(device)
summary(model, (3, 32, 32))

: 

In [None]:
#Defining the optimizer and the loss function for the model
optimizer = torch.optim.RMSprop(model.parameters(), lr = 1e-3)
criterion = nn.BCELoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = 'min', patience=5)

: 

In [None]:
#Defining the training function
from tqdm import tqdm

def train(dataloader, model, optimizer, loss_fn):
  num_batches = len(dataloader)

  model.train()
  running_loss = 0.0
  correct = 0
  total = 0

  pbar = tqdm(dataloader, desc = "Training", leave = True)

  for (X, y) in pbar:
    X, y = X.to(device), y.to(device).float().unsqueeze(1)

    y_pred = model.forward(X)
    loss = loss_fn(y_pred, y)

    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

    running_loss += loss.item()
    pred = (y_pred > 0.5).float()
    correct += (pred == y).sum().item()
    total += X.size(0)

    pbar.set_postfix(
          loss = f"{(running_loss/(total / X.size(0)))}",
          accuracy = f"{correct/total}"
    )

  train_loss = running_loss / num_batches
  accuracy = correct / total

  return train_loss, accuracy

: 

In [None]:
#Define func for Validation and Test
from tqdm import tqdm

def infer(dataloader, model, loss_fn):
  num_batches = len(dataloader)

  inference_loss = 0.0
  inference_accuracy = 0.0
  total = 0

  model.eval()

  with torch.no_grad():

   pbar = tqdm(dataloader, desc = "Validation", leave = True)

   for X, y in pbar:
      X, y = X.to(device), y.to(device).float().unsqueeze(1)

      y_pred = model.forward(X)
      loss = loss_fn(y_pred, y)

      inference_loss += loss.item()
      predicted = (y_pred > 0.5).float()
      correct = (predicted == y).sum().item()

      inference_accuracy += correct
      total += X.size(0)

      pbar.set_postfix(
          loss = f"{(inference_loss/(total / X.size(0)))}",
          accuracy = f"{inference_accuracy/total}"
      )

  inference_loss /= num_batches
  inference_accuracy /= total

  return inference_loss, inference_accuracy

: 

In [None]:
#Let's train, validate and test with model checkpoint
from tqdm import tqdm

epochs = 50
best_val_loss = float('inf')
save_path = "best_mode.pth"

history = {
    'train_loss': [],
    'train_acc' : [],
    'val_loss' : [],
    'val_acc': [],
}

for epoch in range(epochs):

    #Training
    train_loss, train_acc = train(train_loader, model, optimizer, criterion)

    #Validation
    val_loss, val_acc = infer(val_loader, model, criterion)
    scheduler.step(val_loss)

    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['train_acc'].append(train_acc)
    history['val_acc'].append(val_acc)

    #Model Checkpoint
    if val_loss < best_val_loss:
      best_val_loss = val_loss
      torch.save(model.state_dict(), save_path)
      print(f"Model saved to {save_path} ")

print("Training Complete")

#Testing
test_loss, test_acc = infer(test_loader, model, criterion)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {(test_acc * 100):.4f}")

In [None]:
import matplotlib.pyplot as plt

accuracy = history['train_acc']
val_accuracy = history['val_acc']
loss = history['train_loss']
val_loss = history['val_loss']
epochs = range(1, len(accuracy) + 1)

fig, axes = plt.subplots(1, 2, figsize = (20, 10))

axes[0].plot(epochs, accuracy, 'r--', label = "Training Accuracy")
axes[0].plot(epochs, val_accuracy, 'b', label = "Validation Accuracy")
axes[0].set_title("Training and Validation Accuracy")
plt.legend()

axes[1].plot(epochs, loss, 'r--', label = "Training Loss")
axes[1].plot(epochs, val_loss, 'b', label = "Validation Loss")
axes[1].set_title("Training and Validation Loss")
plt.legend()

plt.tight_layout()
plt.show()

In [None]:
#Visualize sample predictions

best_model = NeuralNetwork()
best_model.load_state_dict(torch.load('best_mode.pth'))

best_model.eval()
images, label = next(iter(test_loader))

image = images[0].to(device)
with torch.no_grad():
  pred = best_model(images)
  predicted = (pred > 0.5).float().cpu().squeeze()

print(f"Predicted {predicted}")

label_map = {
    0: "Cat",
    1: "Dog"
}
images = images.cpu().numpy()
images = np.transpose(images, (0, 2, 3, 1))

plt.figure(figsize=(16, 8))
for i in range(16):
  plt.subplot(4,4, i + 1)
  image = images[i]
  plt.imshow(image)
  plt.title(f"Correct Label: {label_map[label[i].item()]}")
  plt.xticks([])
  plt.yticks([])
  plt.xlabel(f"Predicted Lable: {label_map[predicted[i].item()]}")

plt.tight_layout()
plt.show()

Since this just gave the accuracy of about 70 %, we are going to use the data augmentation technique


In [None]:
import torchvision.transforms as transforms

train_transform = transforms.Compose([
    transforms.Resize((180, 180)),
    transforms.ToPILImage(), #Since the input is Tensor as we transformed in the top
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast = 0.2),
    transforms.ToTensor()
])

test_transform = transforms.Compose(
    [transforms.Resize((180, 180))]
)

augmented_train_ready = DatasetWrapper(train_data, transform=train_transform)
val_ready = DatasetWrapper(validation_data, transform=test_transform)
test_ready = DatasetWrapper(test_data, transform= test_transform)

: 

In [None]:
#Visualizing the Data Augmentation
batch_size = 64
train_loader = DataLoader(
    augmented_train_ready,
    batch_size, 
    shuffle = True, 
    num_workers=0,
    pin_memory=True
)

val_loader = DataLoader(
    val_ready, 
    batch_size, 
    shuffle = True
)

test_loader = DataLoader(
    test_ready, 
    batch_size, 
    shuffle = True
)

: 

In [None]:
images, labels = next(iter(train_loader))
images = np.transpose(images, (0, 2, 3, 1))

: 

In [None]:

plt.figure(figsize =(15, 10))
for i in range(16):
    plt.subplot(4,4, i+1)
    plt.imshow(images[i])
    plt.axis('off')

plt.show()

: 

In [None]:
#Visualization of the Data Augmentation
img = images[0]

plt.figure(figsize = (15, 10))

for i in range(8):
    plt.subplot(2, 4, i+1)
    aug_image, label = augmented_train_ready[0]

    aug_image = aug_image.permute(1, 2, 0).numpy()

    plt.subplot(2, 4, i +1)
    plt.imshow(aug_image)
    plt.title(f"Aug {i+1}")
    plt.axis('off')
plt.show()

: 

In [None]:
from torch.nn.modules import activation
from torchsummary import summary

# Defining the model


class AugNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.ConV_stack = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(512, 1),
            nn.Sigmoid(),
        )

    def forward(self, X):
        Y = self.ConV_stack(X)
        return Y

: 

In [None]:
model = AugNN().to(device)
summary(model, (3, 32, 32))

: 

In [None]:
# Defining the optimizer and the loss function for the model
optimizer = torch.optim.RMSprop(model.parameters(), lr=1e-3)
criterion = nn.BCELoss()
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", patience=5
)


: 

In [None]:
# Training, Validating and Testing
from tqdm import tqdm

epochs = 50
best_val_loss = float("inf")
save_path = "best_mode.pth"

history = {
    "train_loss": [],
    "train_acc": [],
    "val_loss": [],
    "val_acc": [],
}

for epoch in range(epochs):

    # Training
    train_loss, train_acc = train(train_loader, model, optimizer, criterion)

    # Validation
    val_loss, val_acc = infer(val_loader, model, criterion)
    scheduler.step(val_loss)

    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)
    history["train_acc"].append(train_acc)
    history["val_acc"].append(val_acc)

    # Model Checkpoint
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), save_path)
        print(f"Model saved to {save_path} ")

print("Training Complete")

# Testing
test_loss, test_acc = infer(test_loader, model, criterion)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {(test_acc * 100):.4f}")

In [None]:
import matplotlib.pyplot as plt

accuracy = history["train_acc"]
val_accuracy = history["val_acc"]
loss = history["train_loss"]
val_loss = history["val_loss"]
epochs = range(1, len(accuracy) + 1)

fig, axes = plt.subplots(1, 2, figsize=(20, 10))

axes[0].plot(epochs, accuracy, "r--", label="Training Accuracy")
axes[0].plot(epochs, val_accuracy, "b", label="Validation Accuracy")
axes[0].set_title("Training and Validation Accuracy")
plt.legend()

axes[1].plot(epochs, loss, "r--", label="Training Loss")
axes[1].plot(epochs, val_loss, "b", label="Validation Loss")
axes[1].set_title("Training and Validation Loss")
plt.legend()

plt.tight_layout()
plt.show()

1. Feature Extraction without Data Augmentation

In [None]:
import timm
import torch
from torchsummary import summary

conv_base = timm.create_model('xception', pretrained=True, num_classes = 0, global_pool = '')
device = "cuda" if torch.cuda.is_available() else "cpu"
conv_base = conv_base.to(device)
summary(conv_base, (3, 180,180))

: 

In [None]:
test_image = torch.randn(1, 3, 180,180)
test_image = test_image.to(device)
output = conv_base(test_image)
output.shape

: 

In [None]:

from torchvision import transforms

preprocess = transforms.Compose([
    transforms.Resize((180, 180)),
])

train_ready = DatasetWrapper(train_data, transform = preprocess)
train_loader = DataLoader(train_ready, batch_size, shuffle=True)

images, _ = next(iter(train_loader))
images[0].min(), images[0].max()

: 

In [None]:
# Extracting the feature
conv_base.eval()
def get_features_and_labels(dataloader):
    all_features = []
    all_labels = []

    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device)
            features = conv_base(images)
            all_features.append(features.cpu().numpy())
            all_labels.append(labels)
        
    return np.concatenate(all_features), np.concatenate(all_labels)

train_features, train_labels = get_features_and_labels(train_loader)

: 

In [None]:
val_features, val_labels = get_features_and_labels(val_loader)

: 

In [None]:
test_features, test_labels = get_features_and_labels(test_loader)

: 

In [None]:
import torch
device = 'cuda' if torch.cuda.is_available() else "cpu"
device

: 

In [None]:
#Defining the classifier model
from torch import nn
from torchsummary import summary

class LinearClassifier(nn.Module):
    def __init__(self):
        super().__init__()

        self.gap = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.classifier = nn.Sequential(
            nn.Linear(2048, 256),
            nn.ReLU(), 
            nn.Dropout(0.25),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
    
    def forward(self, X):
        X = self.gap(X)
        X = self.flatten(X)
        return self.classifier(X)

model = LinearClassifier().to(device)
summary(model, ((2048, 6, 6)))

: 

In [None]:
import torch 
from torch.utils.data import TensorDataset, DataLoader

train_features_t = torch.from_numpy(train_features).float()
train_labels_t = torch.from_numpy(train_labels).float()

val_features_t = torch.from_numpy(val_features).float()
val_labels_t = torch.from_numpy(val_labels).float()

test_features_t = torch.from_numpy(test_features).float()
test_labels_t = torch.from_numpy(test_labels).float()

train_ds = TensorDataset(train_features_t, train_labels_t)
val_ds = TensorDataset(val_features_t, val_labels_t)
test_ds = TensorDataset(test_features_t, test_labels_t)

train_loader = DataLoader(train_ds, batch_size=32, shuffle = True)
val_loader = DataLoader(val_ds, batch_size = 32)
test_loader = DataLoader(test_ds, batch_size=32)

: 

In [47]:
train_labels_t.shape

torch.Size([2000])

In [None]:
# Training, Validating and Testing
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.BCELoss()

from tqdm import tqdm


epochs = 10

best_val_loss = float("inf")

save_path = "best_mode.pth"


history = {

    "train_loss": [],

    "train_acc": [],

    "val_loss": [],

    "val_acc": [],

}


for epoch in range(epochs):


    # Training

    train_loss, train_acc = train(train_loader, model, optimizer, criterion)


    # Validation

    val_loss, val_acc = infer(val_loader, model, criterion)

    scheduler.step(val_loss)


    history["train_loss"].append(train_loss)

    history["val_loss"].append(val_loss)

    history["train_acc"].append(train_acc)

    history["val_acc"].append(val_acc)


    # Model Checkpoint

    if val_loss < best_val_loss:

        best_val_loss = val_loss
        torch.save(model.state_dict(), save_path)

        print(f"Model saved to {save_path} ")


print("Training Complete")


# Testing

test_loss, test_acc = infer(test_loader, model, criterion)

print(f"Test Loss: {test_loss:.4f}")

print(f"Test Accuracy: {(test_acc * 100):.4f}")

Training: 100%|██████████| 63/63 [00:00<00:00, 153.50it/s, accuracy=0.539, loss=0.348296257019043]              
Validation: 100%|██████████| 32/32 [00:00<00:00, 258.03it/s, accuracy=0.526, loss=0.17703984594345093]            


Model saved to best_mode.pth 


Training: 100%|██████████| 63/63 [00:00<00:00, 205.98it/s, accuracy=0.5385, loss=0.3485322675704956]            
Validation: 100%|██████████| 32/32 [00:00<00:00, 250.49it/s, accuracy=0.526, loss=0.17703984594345093]            
Training: 100%|██████████| 63/63 [00:00<00:00, 210.29it/s, accuracy=0.5455, loss=0.3482172832489014]            
Validation: 100%|██████████| 32/32 [00:00<00:00, 251.52it/s, accuracy=0.526, loss=0.17703984594345093]            
Training: 100%|██████████| 63/63 [00:00<00:00, 198.94it/s, accuracy=0.5255, loss=0.348206063747406]             
Validation: 100%|██████████| 32/32 [00:00<00:00, 221.80it/s, accuracy=0.526, loss=0.17703984594345093]            
Training: 100%|██████████| 63/63 [00:00<00:00, 212.56it/s, accuracy=0.5245, loss=0.3487097144126892]            
Validation: 100%|██████████| 32/32 [00:00<00:00, 196.35it/s, accuracy=0.526, loss=0.17703984594345093]            
Training: 100%|██████████| 63/63 [00:00<00:00, 199.28it/s, accuracy=0.5495, loss=0.34828

Training Complete


Validation:   0%|          | 0/63 [00:00<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x3 and 2048x256)

In [None]:
import timm
import torch
from torchsummary import summary

conv_base = timm.create_model('xception', pretrained=True, num_classes = 0, global_pool = '')

for param in conv_base.parameters():
    param.requires_grad = False

class IntegratedModel(nn.Module):
    def __init__(self):
        self.base = conv_base
        self.gap = nn.AdaptiveAvgPool2d(1)
        self.flatten = nn.Flatten()
        self.classifier = nn.Sequential(
            nn.Linear(2048, 256),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )
    
    def forward(self, X):
        X = self.base(X)
        X = self.gap(X)
        X = self.flatten(X)
        return self.classifier(X)

model = IntegratedModel().to(device)
summary(model, (3, 180,180))