<a href="https://colab.research.google.com/github/breakthe-rule/6th-sem-project/blob/main/code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [31]:
import kagglehub
import shutil

# Download the dataset to the default cache location
path = kagglehub.dataset_download("bmadushanirodrigo/x-ray-and-non-x-ray-image-classification-data")

# Define the target path within /content directory
target_path = '/content/x-ray-and-non-x-ray-image-classification-data/'

# Move the dataset to the /content directory
shutil.move(path, target_path)

print("Dataset downloaded and moved to:", target_path)


Downloading from https://www.kaggle.com/api/v1/datasets/download/bmadushanirodrigo/x-ray-and-non-x-ray-image-classification-data?dataset_version_number=2...


100%|██████████| 99.7M/99.7M [00:00<00:00, 119MB/s]

Extracting files...





Dataset downloaded and moved to: /content/x-ray-and-non-x-ray-image-classification-data/


In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import os

In [33]:
# ============================
# 1. Dataset & DataLoader Setup
# ============================

# Define dataset root directory
dataset_root = "/content/x-ray-and-non-x-ray-image-classification-data/Xray_Classifier/Xray_Classifier"

# Paths for train, validation, and test sets
train_dir = os.path.join(dataset_root, "train")
val_dir = os.path.join(dataset_root, "val")
test_dir = os.path.join(dataset_root, "test")

# Image transformations
image_transforms = transforms.Compose([
    transforms.Resize((128, 128)),  # Resize images
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=[0.5], std=[0.5])  # Normalize
])

# Load datasets
train_dataset = datasets.ImageFolder(root=train_dir, transform=image_transforms)
val_dataset = datasets.ImageFolder(root=val_dir, transform=image_transforms)
test_dataset = datasets.ImageFolder(root=test_dir, transform=image_transforms)

# Define batch size
batch_size = 32

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# Print class mapping
print("Class Mapping:", train_dataset.class_to_idx)  # {'non_xray': 0, 'xray': 1}


Class Mapping: {'non_xray': 0, 'xray': 1}


In [34]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CustomCNN(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        # Convolutional layers with batch normalization
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # Global Average Pooling
        self.gap = nn.AdaptiveAvgPool2d(1)  # Output size (1,1)

        # Fully connected layers
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, num_classes)

        # Define masks for keeping specific neurons inactive
        self.mask_conv1 = torch.ones_like(self.conv1.weight)
        self.mask_conv1[5:15] = 0  # Fix neurons 5-9 inactive

        self.mask_conv2 = torch.ones_like(self.conv2.weight)
        self.mask_conv2[10:30] = 0  # Fix neurons 10-19 inactive

        self.mask_conv3 = torch.ones_like(self.conv3.weight)
        self.mask_conv3[20:60] = 0  # Fix neurons 20-39 inactive

        self.mask_fc1 = torch.ones_like(self.fc1.weight)
        self.mask_fc1[40:100] = 0  # Fix neurons 50-99 inactive

        self.mask_fc2 = torch.ones_like(self.fc2.weight)
        self.mask_fc2[10:100] = 0  # Fix neurons 10-49 inactive

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))

        x = self.gap(x)  # Global Average Pooling
        x = torch.flatten(x, start_dim=1)  # Shape: (batch_size, 128)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

    def zero_grad_neurons(self):
        """Ensures specific neurons never update and remain zero"""
        device = self.conv1.weight.device  # Get current device
        self.mask_conv1 = self.mask_conv1.to(device)
        self.mask_conv2 = self.mask_conv2.to(device)
        self.mask_conv3 = self.mask_conv3.to(device)
        self.mask_fc1 = self.mask_fc1.to(device)
        self.mask_fc2 = self.mask_fc2.to(device)

        # Zero out gradients for selected neurons
        if self.conv1.weight.grad is not None:
            self.conv1.weight.grad *= self.mask_conv1
        if self.conv2.weight.grad is not None:
            self.conv2.weight.grad *= self.mask_conv2
        if self.conv3.weight.grad is not None:
            self.conv3.weight.grad *= self.mask_conv3
        if self.fc1.weight.grad is not None:
            self.fc1.weight.grad *= self.mask_fc1
        if self.fc2.weight.grad is not None:
            self.fc2.weight.grad *= self.mask_fc2

        # Force weights to remain zero for masked neurons
        with torch.no_grad():
            self.conv1.weight *= self.mask_conv1
            self.conv2.weight *= self.mask_conv2
            self.conv3.weight *= self.mask_conv3
            self.fc1.weight *= self.mask_fc1
            self.fc2.weight *= self.mask_fc2


In [2]:
# ============================
# 3. Training, Validation, and Testing
# ============================
from time import time

def train_model(model, train_loader, val_loader, num_epochs=10, lr=0.001):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    print(f'Device: {device}')

    criterion = nn.CrossEntropyLoss()  # For multi-class classification
    optimizer = optim.Adam(model.parameters(), lr=lr)

    for epoch in range(num_epochs):
        print(f'Epoch: {epoch}/{num_epochs}')
        model.train()
        running_loss = 0.0
        correct, total = 0, 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            model.zero_grad_neurons()  # Zero out specific gradients
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_acc = 100 * correct / total
        val_loss, val_acc = validate_model(model, val_loader, criterion, device)

        print(f"Train Loss: {running_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}% | "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

def validate_model(model, val_loader, criterion, device):
    model.eval()
    val_loss = 0.0
    correct, total = 0, 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_acc = 100 * correct / total
    return val_loss / len(val_loader), val_acc

def test_model(model, test_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    test_acc = 100 * correct / total
    print(f"Test Accuracy: {test_acc:.2f}%")


In [None]:
# ============================
# 4. Run Training & Testing
# ============================

# Initialize model
model = CustomCNN(num_classes=2)

# Train model
train_model(model, train_loader, val_loader, num_epochs=10, lr=0.001)

# Test model
test_model(model, test_loader)

Device: cuda
Epoch: 0/10
Train Loss: 0.2065, Train Acc: 94.26% | Val Loss: 0.1958, Val Acc: 99.58%
Epoch: 1/10
Train Loss: 0.0602, Train Acc: 98.23% | Val Loss: 0.0413, Val Acc: 98.14%
Epoch: 2/10
Train Loss: 0.0266, Train Acc: 99.16% | Val Loss: 0.0202, Val Acc: 99.92%
Epoch: 3/10
Train Loss: 0.0181, Train Acc: 99.66% | Val Loss: 0.1336, Val Acc: 99.49%
Epoch: 4/10
Train Loss: 0.2121, Train Acc: 96.96% | Val Loss: 0.2465, Val Acc: 96.54%
Epoch: 5/10
Train Loss: 0.2070, Train Acc: 95.61% | Val Loss: 0.0590, Val Acc: 98.40%
Epoch: 6/10
Train Loss: 0.0761, Train Acc: 97.55% | Val Loss: 0.0411, Val Acc: 99.16%
Epoch: 7/10
Train Loss: 0.0363, Train Acc: 99.16% | Val Loss: 0.0217, Val Acc: 99.41%
Epoch: 8/10
Train Loss: 0.0217, Train Acc: 99.24% | Val Loss: 0.0134, Val Acc: 99.58%
Epoch: 9/10
Train Loss: 0.0130, Train Acc: 99.75% | Val Loss: 0.0123, Val Acc: 99.58%
Test Accuracy: 99.64%


In [3]:
import kagglehub
import shutil

# Download latest version
path = kagglehub.dataset_download("hemanthsai7/solar-panel-dust-detection")

# Define the target path within /content directory
target_path = '/content/solar-panel-dust-detection/'

# Move the dataset to the /content directory
shutil.move(path, target_path)

print("Dataset downloaded and moved to:", target_path)


KeyboardInterrupt: 

In [7]:
import random
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset

# Image transformations (matching the previous implementation)
image_transforms = transforms.Compose([
    transforms.Resize(128),
    transforms.CenterCrop(128),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

# Load full MNIST dataset
full_dataset = datasets.MNIST(root="./data", train=True, transform=image_transforms, download=True)

# Filter indices: Keep only classes 0 and 1
filtered_indices = [idx for idx, (_, label) in enumerate(full_dataset) if label in [0, 1]]

# Create a new dataset with only images of "0" and "1"
filtered_dataset = Subset(full_dataset, filtered_indices)

# Separate indices for "0" and "1"
class_indices = {0: [], 1: []}

for idx in filtered_indices:
    _, label = full_dataset[idx]
    if len(class_indices[label]) < 1000:
        class_indices[label].append(idx)
    if all(len(indices) >= 1000 for indices in class_indices.values()):
        break

train_indices = class_indices[0] + class_indices[1]

# Remaining indices for validation and testing
remaining_class_indices = {0: [], 1: []}

for idx in set(filtered_indices) - set(train_indices):
    _, label = full_dataset[idx]
    remaining_class_indices[label].append(idx)

val_indices = []
test_indices = []

for label in [0, 1]:
    random.shuffle(remaining_class_indices[label])
    val_indices.extend(remaining_class_indices[label][:200])  # 20 per class for validation
    test_indices.extend(remaining_class_indices[label][200:400])  # 20 per class for testing

# Create subsets
train_dataset = Subset(full_dataset, train_indices)
val_dataset = Subset(full_dataset, val_indices)
test_dataset = Subset(full_dataset, test_indices)

# Define batch size
batch_size = 32

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# Print dataset sizes
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

# Print label counts to verify
print("Training Labels:", [full_dataset[i][1] for i in train_indices][:10])
print("Validation Labels:", [full_dataset[i][1] for i in val_indices][:10])
print("Test Labels:", [full_dataset[i][1] for i in test_indices][:10])


Train dataset size: 2000
Validation dataset size: 400
Test dataset size: 400
Training Labels: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Validation Labels: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Test Labels: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ComplementaryCNN(nn.Module):
    def __init__(self, num_classes=2):
        super().__init__()

        # Convolutional layers with batch normalization
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)  # 1 input channel for MNIST
        self.bn1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)

        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)

        # Global Average Pooling
        self.gap = nn.AdaptiveAvgPool2d(1)  # Output size (1,1)

        # Fully connected layers
        self.fc1 = nn.Linear(128, 256)
        self.fc2 = nn.Linear(256, num_classes)

        # Define masks (initialized but not moved to device yet)
        self.mask_conv1 = torch.zeros_like(self.conv1.weight)
        self.mask_conv1[:20] = 1  # Train first 20 filters

        self.mask_conv2 = torch.zeros_like(self.conv2.weight)
        self.mask_conv2[:40] = 1  # Train first 40 filters

        self.mask_conv3 = torch.zeros_like(self.conv3.weight)
        self.mask_conv3[:80] = 1  # Train first 80 filters

        self.mask_fc1 = torch.zeros_like(self.fc1.weight)
        self.mask_fc1[:60] = 1  # Train first 60 neurons

        self.mask_fc2 = torch.zeros_like(self.fc2.weight)
        self.mask_fc2[:90] = 1  # Train first 90 neurons

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))

        x = self.gap(x)  # Global Average Pooling
        x = torch.flatten(x, start_dim=1)  # Shape: (batch_size, 128)

        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        return x

    def zero_grad_neurons(self):
        """Ensures only specific neurons are trained and others remain zero"""
        device = next(self.parameters()).device  # Get model device

        # Move masks to the correct device before applying them
        self.mask_conv1 = self.mask_conv1.to(device)
        self.mask_conv2 = self.mask_conv2.to(device)
        self.mask_conv3 = self.mask_conv3.to(device)
        self.mask_fc1 = self.mask_fc1.to(device)
        self.mask_fc2 = self.mask_fc2.to(device)

        # Zero out gradients for unselected neurons
        if self.conv1.weight.grad is not None:
            self.conv1.weight.grad *= self.mask_conv1
        if self.conv2.weight.grad is not None:
            self.conv2.weight.grad *= self.mask_conv2
        if self.conv3.weight.grad is not None:
            self.conv3.weight.grad *= self.mask_conv3
        if self.fc1.weight.grad is not None:
            self.fc1.weight.grad *= self.mask_fc1
        if self.fc2.weight.grad is not None:
            self.fc2.weight.grad *= self.mask_fc2

        # Force weights of unselected neurons to remain zero
        with torch.no_grad():
            self.conv1.weight *= self.mask_conv1
            self.conv2.weight *= self.mask_conv2
            self.conv3.weight *= self.mask_conv3
            self.fc1.weight *= self.mask_fc1
            self.fc2.weight *= self.mask_fc2

# Initialize Model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ComplementaryCNN(num_classes=2).to(device)  # Move model to GPU


In [10]:
# ============================
# 4. Run Training & Testing
# ============================

import warnings
import os
warnings.filterwarnings("ignore")

# Initialize model
# model = CustomCNN(num_classes=2)
# model = ComplementaryCNN(num_classes=2)

# Train model
train_model(model, train_loader, val_loader, num_epochs=100, lr=0.00001)

# Test model
test_model(model, test_loader)

Device: cuda
Epoch: 0/100
Train Loss: 0.6934, Train Acc: 52.30% | Val Loss: 0.6873, Val Acc: 96.75%
Epoch: 1/100
Train Loss: 0.6804, Train Acc: 94.75% | Val Loss: 0.6717, Val Acc: 95.00%
Epoch: 2/100
Train Loss: 0.6684, Train Acc: 94.55% | Val Loss: 0.6618, Val Acc: 95.00%
Epoch: 3/100
Train Loss: 0.6601, Train Acc: 93.75% | Val Loss: 0.6556, Val Acc: 94.75%
Epoch: 4/100
Train Loss: 0.6538, Train Acc: 93.85% | Val Loss: 0.6481, Val Acc: 95.00%
Epoch: 5/100
Train Loss: 0.6471, Train Acc: 93.75% | Val Loss: 0.6433, Val Acc: 95.75%
Epoch: 6/100
Train Loss: 0.6407, Train Acc: 93.90% | Val Loss: 0.6337, Val Acc: 96.00%
Epoch: 7/100
Train Loss: 0.6336, Train Acc: 94.25% | Val Loss: 0.6279, Val Acc: 96.50%
Epoch: 8/100
Train Loss: 0.6259, Train Acc: 95.00% | Val Loss: 0.6195, Val Acc: 96.75%
Epoch: 9/100
Train Loss: 0.6183, Train Acc: 96.20% | Val Loss: 0.6114, Val Acc: 96.75%
Epoch: 10/100
Train Loss: 0.6108, Train Acc: 96.20% | Val Loss: 0.6016, Val Acc: 96.75%
Epoch: 11/100
Train Loss: 0.6

Exception ignored in: <function _after_fork at 0x7ddd1d68b600>
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1663, in _after_fork
    thread._reset_internal_locks(False)
  File "/usr/lib/python3.11/threading.py", line 917, in _reset_internal_locks
    self._started._at_fork_reinit()
  File "/usr/lib/python3.11/threading.py", line 573, in _at_fork_reinit
    self._cond._at_fork_reinit()
  File "/usr/lib/python3.11/threading.py", line 267, in _at_fork_reinit
    def _at_fork_reinit(self):

KeyboardInterrupt: 


KeyboardInterrupt: 

In [24]:
import numpy as np

# Function to print model weights as NumPy arrays
def print_model_weights(model):
    for name, param in model.named_parameters():
        if param.requires_grad and ('conv1.weight' in name):
            np_array = param.cpu().detach().numpy()  # Convert to NumPy
            print(np_array.shape)
            print(f"{name} weights as NumPy array:\n")
            for i in np_array:
              print(i)

# After training, print weights
print_model_weights(model)


(32, 3, 3, 3)
conv1.weight weights as NumPy array:

[[[ 0.  0. -0.]
  [ 0. -0. -0.]
  [-0.  0. -0.]]

 [[-0. -0.  0.]
  [ 0.  0. -0.]
  [ 0. -0. -0.]]

 [[ 0.  0.  0.]
  [-0. -0.  0.]
  [ 0.  0. -0.]]]
[[[-0.12307724 -0.08977024  0.16669354]
  [-0.09600206  0.1897963  -0.06423801]
  [ 0.02577408  0.07815539 -0.06687439]]

 [[ 0.0320854   0.0860556   0.16490394]
  [ 0.01227864 -0.13354412 -0.08475346]
  [ 0.04780046  0.21088582  0.02029063]]

 [[-0.10095371  0.11851704 -0.17908937]
  [-0.1878032  -0.22275762  0.03052068]
  [-0.14742047 -0.08661371 -0.16748086]]]
[[[ 0.12408992 -0.17044146  0.0790939 ]
  [-0.18736403 -0.09220495 -0.07624567]
  [ 0.12770003 -0.02788427 -0.21036267]]

 [[ 0.13417016  0.00252966  0.01043558]
  [ 0.1579734  -0.07719019 -0.18018089]
  [ 0.050235    0.00956193  0.09973624]]

 [[-0.18657917  0.07340908 -0.05518973]
  [ 0.0400635   0.13163246  0.19825056]
  [-0.07612858  0.15967353  0.11016011]]]
[[[-0.20334475 -0.11002831 -0.05332335]
  [-0.217691    0.11863945