<a href="https://colab.research.google.com/github/eshal26/PCA-CNN/blob/main/VGG_PCA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [23]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import os
import shutil
import sklearn
from sklearn.model_selection import train_test_split

In [24]:
train_dir = 'train_dataset'
val_dir = 'validation_dataset'
test_dir = 'test_dataset'

# Define transformations for training, validation, and testing data
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(degrees=15),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.8238, 0.8539, 0.9391], std=[0.1325, 0.1437, 0.0529])
])

val_test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.8238, 0.8539, 0.9391], std=[0.1325, 0.1437, 0.0529])
])

# Create datasets
train_dataset = datasets.ImageFolder(root=train_dir, transform=train_transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=val_test_transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=val_test_transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


In [25]:
from sklearn.decomposition import PCA, IncrementalPCA

class CustomVGGWithPCA(nn.Module):
    def __init__(self, num_classes=2, pca_components=40):
        super(CustomVGGWithPCA, self).__init__()

        # Feature extraction layers with added pooling to reduce dimensions
        self.features = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Output: 112x112x64

            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),  # Output: 56x56x128

            # Additional pooling layer to reduce dimensionality further
            nn.MaxPool2d(kernel_size=4, stride=4)   # Output: 14x14x128
        )

        # Initialize PCA to None
        self.pca = None

        # Classifier layers
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Linear(pca_components, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),
            nn.Linear(512, num_classes)
        )

    def extract_features(self, x):
        x = self.features(x)
        x = x.view(x.size(0), -1)  # Flatten the features for PCA
        return x

    def forward(self, x):
        x = self.extract_features(x)
        device = x.device

        # Apply PCA if it is fitted
        if self.pca is not None:


            # Convert tensor to NumPy array (detach and move to CPU)
            x_cpu = x.detach().cpu().numpy()

            # Apply PCA transformation
            x_pca = self.pca.transform(x_cpu)  # Apply PCA transformation on NumPy array

            # Convert the transformed data back to PyTorch tensor
            x = torch.from_numpy(x_pca).to(device, dtype=torch.float32)
        else:
            raise RuntimeError("PCA must be fitted before the forward pass")

        x = self.classifier(x)
        return x

In [27]:
from sklearn.decomposition import IncrementalPCA

def fit_pca_on_train_data(train_loader, model, device, pca_components ,  batch_size=64):
    # Dynamically determine the number of features from a sample batch
    with torch.no_grad():
        sample_batch, _ = next(iter(train_loader))
        sample_features = model.extract_features(sample_batch.to(device))
        num_features = sample_features.shape[1]  # Get feature dimension from sample batch


    # Initialize IncrementalPCA with the number of features dynamically set
    ipca = IncrementalPCA(n_components=pca_components, batch_size=batch_size)

    # Process each batch and fit IncrementalPCA
    for images, _ in train_loader:
        images = images.to(device)
        with torch.no_grad():
            features = model.extract_features(images)
            features = features.detach().cpu().numpy()  # Convert to numpy array for IncrementalPCA
            ipca.partial_fit(features)  # Fit incrementally

    # Assign the fitted IncrementalPCA back to model’s PCA attribute
    model.pca = ipca
    print("PCA fitting completed on training data.")

# Example usage
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
pca_components = 40
num_classes = 2

model = CustomVGGWithPCA(num_classes=num_classes, pca_components=pca_components).to(device)
fit_pca_on_train_data(train_loader,  model,  device, pca_components=pca_components)


PCA fitting completed on training data.


In [28]:
# Training function
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        running_loss, correct_predictions, total_samples = 0.0, 0, 0

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            correct_predictions += (predicted == labels).sum().item()
            total_samples += labels.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_accuracy = 100 * correct_predictions / total_samples
        print(f"Epoch {epoch+1}/{num_epochs}: Training Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%")


criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=40)


Epoch 1/40: Training Loss: 1.3678, Accuracy: 77.04%
Epoch 2/40: Training Loss: 0.9917, Accuracy: 77.50%
Epoch 3/40: Training Loss: 0.7822, Accuracy: 79.07%
Epoch 4/40: Training Loss: 0.6591, Accuracy: 80.34%
Epoch 5/40: Training Loss: 0.6228, Accuracy: 80.11%
Epoch 6/40: Training Loss: 0.5220, Accuracy: 81.87%
Epoch 7/40: Training Loss: 0.5104, Accuracy: 81.72%
Epoch 8/40: Training Loss: 0.4799, Accuracy: 82.26%
Epoch 9/40: Training Loss: 0.4716, Accuracy: 82.18%
Epoch 10/40: Training Loss: 0.4834, Accuracy: 82.03%
Epoch 11/40: Training Loss: 0.4651, Accuracy: 82.60%
Epoch 12/40: Training Loss: 0.4679, Accuracy: 82.49%
Epoch 13/40: Training Loss: 0.4736, Accuracy: 82.87%
Epoch 14/40: Training Loss: 0.4721, Accuracy: 83.14%
Epoch 15/40: Training Loss: 0.4380, Accuracy: 83.99%
Epoch 16/40: Training Loss: 0.4572, Accuracy: 82.68%
Epoch 17/40: Training Loss: 0.4614, Accuracy: 82.37%
Epoch 18/40: Training Loss: 0.4583, Accuracy: 83.29%
Epoch 19/40: Training Loss: 0.4555, Accuracy: 82.41%
Ep