In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
import shutil
from sklearn.model_selection import train_test_split
import time
from torchvision.models import resnet18
# Device configuration: use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Report the choice so the run log shows where tensors will be placed.
print(f"Using device: {device}")


In [None]:
import os
import torch
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt

# Dataset path
datapath = '/kaggle/input/trash-type-image-dataset/TrashType_Image_Dataset'

# Shared preprocessing settings
target_size = (224, 224)
batch_size = 16

# Training pipeline: resize, mild augmentation, then ImageNet normalization.
train_transforms = transforms.Compose([
    transforms.Resize(target_size),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=10),  # Mild rotation
    transforms.ColorJitter(brightness=0.2, contrast=0.2),  # Mild color variation
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalization for pre-trained models
])

# Validation pipeline: deterministic resize + normalization only (no augmentation).
val_transforms = transforms.Compose([
    transforms.Resize(target_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load the same folder twice so each split can use its own transform pipeline.
# Splitting a single ImageFolder would make the validation subset inherit the
# random training augmentations (val_transforms would never be applied).
train_dataset = datasets.ImageFolder(root=datapath, transform=train_transforms)
eval_view_dataset = datasets.ImageFolder(root=datapath, transform=val_transforms)

val_size = int(0.1 * len(train_dataset))
train_size = len(train_dataset) - val_size

# random_split picks the indices; the validation indices are re-bound to the
# un-augmented view of the same files.
train_data, val_split = torch.utils.data.random_split(train_dataset, [train_size, val_size])
val_data = torch.utils.data.Subset(eval_view_dataset, val_split.indices)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False)

print(f"Training samples: {len(train_data)}")
print(f"Validation samples: {len(val_data)}")

# Show some random images from the training set
def show_images(dataloader, class_names):
    """Display up to nine images from the first batch of `dataloader` in a 3x3 grid.

    Args:
        dataloader: iterable yielding (images, labels) batches; images are
            indexed as channel-first tensors normalized with the ImageNet
            mean/std used in this notebook.
        class_names: sequence indexed by the integer label to get the title.
    """
    images, labels = next(iter(dataloader))
    # Constants used to undo the Normalize transform before plotting.
    std = torch.tensor([0.229, 0.224, 0.225])
    mean = torch.tensor([0.485, 0.456, 0.406])

    plt.figure(figsize=(10, 10))
    # Bound the loop by the batch actually received, not the global batch_size,
    # so a short batch cannot raise IndexError.
    for i in range(min(len(images), 9)):
        plt.subplot(3, 3, i + 1)
        img = images[i].permute(1, 2, 0) * std + mean
        plt.imshow(img.clamp(0, 1))
        plt.title(class_names[labels[i].item()])
        plt.axis('off')
    plt.show()

# Class names from the dataset; show_images indexes this list with the integer labels.
class_names = train_dataset.classes
print(f"Class names: {class_names}")
# Preview one batch drawn from the (shuffled, augmented) training loader.
show_images(train_loader, class_names)


In [None]:
import os
import random
import matplotlib.pyplot as plt
from pathlib import Path

# Root folder of the RealWaste dataset; each sub-directory is treated as one class
# by plot_sample_images below.
realwaste_dataset_path = "/kaggle/input/realwaste/realwaste-main/RealWaste"

# Function to plot sample images from a dataset
def plot_sample_images(dataset_path, num_samples=3):
    """Draw a grid with one row per class folder and up to `num_samples` random images per row.

    Args:
        dataset_path: directory whose sub-directories are the classes.
        num_samples: number of grid columns, and the maximum images drawn per class.
    """
    plt.figure(figsize=(15, 15))

    # Keep only the entries that are directories; each one is a class.
    class_dirs = []
    for entry in os.listdir(dataset_path):
        if os.path.isdir(os.path.join(dataset_path, entry)):
            class_dirs.append(entry)
    row_count = len(class_dirs)

    for row, class_dir in enumerate(class_dirs):
        class_path = os.path.join(dataset_path, class_dir)
        file_names = os.listdir(class_path)
        picks = random.sample(file_names, min(num_samples, len(file_names)))

        # Subplot slots are 1-based and laid out row by row.
        first_slot = row * num_samples + 1
        for slot, file_name in enumerate(picks, start=first_slot):
            pixels = plt.imread(os.path.join(class_path, file_name))
            plt.subplot(row_count, num_samples, slot)
            plt.imshow(pixels)
            plt.axis("off")
            plt.title(class_dir)

    plt.tight_layout()
    plt.show()

# Plot up to 3 random images (the num_samples default) from each class in the RealWaste dataset
plot_sample_images(realwaste_dataset_path)


In [None]:
# Empty /kaggle/working so the dataset-building cells below start from a clean output directory.
# WARNING: destructive — this removes everything matched by /kaggle/working/* (including any
# previously saved checkpoints or archives stored there).
!rm -rf /kaggle/working/*

In [None]:
import os
import shutil
import random

# Source datasets (read-only Kaggle inputs); each contains one sub-directory per class.
trash_dataset = "/kaggle/input/trash-type-image-dataset/TrashType_Image_Dataset"
real_dataset = "/kaggle/input/realwaste/realwaste-main/RealWaste"

# Output folders that will receive the held-out test copies of each dataset.
trash_test_path = "/kaggle/working/trash_test"
real_test_path = "/kaggle/working/real_test"

# Ensure test directories exist (exist_ok makes re-running the cell safe).
os.makedirs(trash_test_path, exist_ok=True)
os.makedirs(real_test_path, exist_ok=True)

# Function to copy test images
def split_data(src_folder, dest_folder, test_ratio=0.1, seed=None):
    """Copy a random fraction of every class folder in `src_folder` to `dest_folder`.

    Files are copied, not moved, so the source dataset is left untouched.

    Args:
        src_folder: directory containing one sub-directory per class.
        dest_folder: directory that receives `<class_name>/<file>` copies.
        test_ratio: fraction of each class to copy; the count is truncated
            with int(), so small classes may contribute zero files.
        seed: optional seed for a private random generator. When given, the
            same files are selected on every run; when None, the module-level
            `random` state is used as before.
    """
    rng = random.Random(seed) if seed is not None else random

    # os.listdir order is arbitrary; sorting makes the selection reproducible
    # for a given seed.
    for class_name in sorted(os.listdir(src_folder)):
        class_path = os.path.join(src_folder, class_name)
        if not os.path.isdir(class_path):
            continue

        # Only regular files can be copied with shutil.copy; skip nested
        # directories instead of crashing half-way through a class.
        images = sorted(
            name for name in os.listdir(class_path)
            if os.path.isfile(os.path.join(class_path, name))
        )
        test_count = int(len(images) * test_ratio)
        test_images = rng.sample(images, test_count)

        class_test_path = os.path.join(dest_folder, class_name)
        os.makedirs(class_test_path, exist_ok=True)

        for img in test_images:
            shutil.copy(os.path.join(class_path, img), os.path.join(class_test_path, img))

# Copy the default 10% of each class from both datasets into their test folders.
# split_data copies rather than moves, so the source datasets still contain these images.
split_data(trash_dataset, trash_test_path)
split_data(real_dataset, real_test_path)

print("Testing sets copied successfully.")


In [None]:
import os
import shutil
import random
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter
from PIL import Image

# Source datasets and the folder that will hold the merged result.
trash_dataset = "/kaggle/input/trash-type-image-dataset/TrashType_Image_Dataset"
real_dataset = "/kaggle/input/realwaste/realwaste-main/RealWaste"
output_dir = "/kaggle/working/combined_dataset"

# Class mapping: key = merged class folder to create, value = source folder names
# that are copied into it. Each name is looked up in BOTH datasets; names that do
# not exist in a dataset are skipped by copy_images.
# NOTE(review): presumably the lowercase names come from the TrashType dataset and
# the capitalised ones from RealWaste — confirm against the actual folder listings.
class_mapping = {
    "Cardboard": ["cardboard", "Cardboard"],
    "Glass": ["glass", "Glass"],
    "Metal": ["metal", "Metal"],
    "Paper": ["paper", "Paper"],
    "Plastic": ["plastic", "Plastic"],
    "Miscellaneous Trash": ["trash", "Miscellaneous Trash"],
    "Organics": ["Food Organics", "Vegetation"],  # two source classes merged into one
    "Textile Trash": ["Textile Trash"]
}

# Ensure the output root and one sub-directory per merged class exist.
os.makedirs(output_dir, exist_ok=True)
for class_name in class_mapping.keys():
    os.makedirs(os.path.join(output_dir, class_name), exist_ok=True)

# Function to copy images and count class distribution
# Maps destination folder path -> number of files copied into it.
class_counts = Counter()

def copy_images(src_folder, class_names, dest_folder):
    """Copy every file of the listed source classes into one destination folder.

    Args:
        src_folder: dataset root containing one sub-directory per class.
        class_names: candidate sub-directory names; names that are not
            directories under `src_folder` are silently skipped.
        dest_folder: existing directory receiving the copies. Files keep
            their base name, so a later file with the same name overwrites
            an earlier one.

    Side effects:
        Increments the module-level `class_counts[dest_folder]` once per file
        actually copied.
    """
    for class_name in class_names:
        class_path = os.path.join(src_folder, class_name)
        if not os.path.isdir(class_path):
            continue

        for img in os.listdir(class_path):
            src_img_path = os.path.join(class_path, img)
            # shutil.copy cannot copy directories; skip them so one stray
            # sub-folder does not abort the run with the counts already inflated.
            if not os.path.isfile(src_img_path):
                continue
            shutil.copy(src_img_path, os.path.join(dest_folder, img))
            class_counts[dest_folder] += 1

# Copy images from both datasets into the new structure.
# Every source name in the mapping is tried against both dataset roots; copy_images
# ignores names that are not directories in a given root.
for new_class, old_classes in class_mapping.items():
    dest_folder = os.path.join(output_dir, new_class)
    copy_images(trash_dataset, old_classes, dest_folder)
    copy_images(real_dataset, old_classes, dest_folder)

# Plot sample images
def plot_sample_images():
    """Show up to five random images for each merged class, one class per row.

    Reads the module-level `class_mapping` (row order) and `output_dir`
    (location of the merged class folders).
    """
    # squeeze=False keeps `axes` two-dimensional even for a single class.
    fig, axes = plt.subplots(len(class_mapping), 5, figsize=(15, 20), squeeze=False)

    # Hide every axis up front so cells left empty (classes with fewer than
    # five images) do not show stray ticks and frames.
    for ax in axes.ravel():
        ax.axis("off")

    for i, class_name in enumerate(class_mapping):
        class_folder = os.path.join(output_dir, class_name)
        images = os.listdir(class_folder)
        sample_images = random.sample(images, min(5, len(images)))  # Take up to 5 images

        for j, img_name in enumerate(sample_images):
            img_path = os.path.join(class_folder, img_name)
            # Close the file handle as soon as the image has been drawn.
            with Image.open(img_path) as img:
                axes[i, j].imshow(img)
            if j == 0:
                axes[i, j].set_title(class_name, fontsize=12)

    plt.tight_layout()
    plt.show()

# Plot class imbalance chart
def plot_class_distribution():
    """Bar chart of the number of images copied into each merged class.

    Reads the module-level `class_counts`, whose keys are destination folder
    paths; only the final path component is used as the tick label so the
    axis shows class names instead of full paths.
    """
    labels = [os.path.basename(os.path.normpath(str(key))) for key in class_counts.keys()]
    plt.figure(figsize=(10, 6))
    sns.barplot(x=labels, y=list(class_counts.values()))
    plt.xticks(rotation=45, ha="right")
    plt.ylabel("Number of Images")
    plt.title("Class Distribution in Combined Dataset")
    plt.show()

# Show the sample grid and the per-class image counts for the merged dataset.
plot_sample_images()
plot_class_distribution()

print("New dataset with merged classes created successfully.")


In [None]:
import os
import shutil
import random

# Source folders: clothes and shoes classes of the garbage-classification dataset.
clothes_folder = "/kaggle/input/garbage-classification/garbage_classification/clothes"
shoes_folder = "/kaggle/input/garbage-classification/garbage_classification/shoes"

# Destination: the "Textile Trash" class of the combined dataset.
textile_trash_folder = "/kaggle/working/combined_dataset/Textile Trash"

# Ensure the destination folder exists
os.makedirs(textile_trash_folder, exist_ok=True)

# Function to copy a subset of images
def copy_images(src_folder, dest_folder, num_images):
    """Copy a random sample of at most `num_images` entries from one folder to another.

    Args:
        src_folder: directory to sample from.
        dest_folder: existing directory receiving the copies (same file names).
        num_images: requested sample size; capped at the number of entries available.
    """
    available = os.listdir(src_folder)
    sample_size = min(num_images, len(available))  # never ask for more than exist

    for file_name in random.sample(available, sample_size):
        shutil.copy(
            os.path.join(src_folder, file_name),
            os.path.join(dest_folder, file_name),
        )

# Add up to 400 randomly chosen images from clothes to Textile Trash
copy_images(clothes_folder, textile_trash_folder, 400)

# Add up to 200 randomly chosen images from shoes. All of them go to Textile Trash;
# no material-based sorting is performed here.
copy_images(shoes_folder, textile_trash_folder, 200)  # Assuming most are fabric/leather
# If material separation is needed, manual sorting might be required

# NOTE(review): the message below is fixed text; copy_images caps the sample at the
# number of files available, so the real counts may be lower.
print("Added 400 images from clothes and 200 images from shoes to the dataset.")


In [None]:
# Inspect the merged dataset: list its entries and how many items each one holds.
dataset_path="/kaggle/working/combined_dataset"
# List subdirectories (classes); every entry of the folder is assumed to be a class directory.
classes = os.listdir(dataset_path)
print(f"Classes: {classes}")

# Count the number of entries in each class folder
for cls in classes:
    cls_path = os.path.join(dataset_path, cls)
    num_images = len(os.listdir(cls_path))
    print(f"{cls}: {num_images} images")


In [None]:
import os
import shutil

# Paths for the source and destination datasets
source_dataset_path = "/kaggle/input/data-cus/custom_test"  # Source folder with one sub-folder per class
destination_dataset_path = "/kaggle/working/newcombined1"  # Writable copy used by the later split cell

# Ensure the destination folder exists
os.makedirs(destination_dataset_path, exist_ok=True)

# List the entries of the source dataset; non-directories are filtered out inside the loop
classes = os.listdir(source_dataset_path)

# Mirror each class folder into the destination, keeping the original file names
for class_name in classes:
    class_source_path = os.path.join(source_dataset_path, class_name)
    class_dest_path = os.path.join(destination_dataset_path, class_name)

    if os.path.isdir(class_source_path):
        # Create the class folder in the destination directory if it doesn't exist
        os.makedirs(class_dest_path, exist_ok=True)

        # Every entry of the class folder is copied (no extension filtering here)
        images = os.listdir(class_source_path)

        # Copy each file to the corresponding folder in the destination dataset
        for img_name in images:
            img_source_path = os.path.join(class_source_path, img_name)
            img_dest_path = os.path.join(class_dest_path, img_name)

            # Copy the image
            shutil.copy(img_source_path, img_dest_path)

        print(f"Copied {len(images)} images to {class_name} class.")

print("Image copying completed successfully!")


In [None]:
# Inspect the copied dataset: list its entries and how many items each one holds.
dataset_path="/kaggle/working/newcombined1"
# List subdirectories (classes); every entry of the folder is assumed to be a class directory.
classes = os.listdir(dataset_path)
print(f"Classes: {classes}")

# Count the number of entries in each class folder
for cls in classes:
    cls_path = os.path.join(dataset_path, cls)
    num_images = len(os.listdir(cls_path))
    print(f"{cls}: {num_images} images")


In [None]:
import os
import random
import matplotlib.pyplot as plt

# Path to the merged dataset (Maindataset).
# The copy cell above writes to "newcombined1"; "newcombined" is never created in this
# notebook, so pointing there makes os.listdir raise FileNotFoundError.
maindataset_path = "/kaggle/working/newcombined1"

# Function to plot sample images from a dataset
def plot_sample_images(dataset_path, num_samples=3):
    """Plot a class-by-sample grid of randomly chosen images.

    Each sub-directory of `dataset_path` becomes one grid row holding at most
    `num_samples` images picked at random from that directory.
    """
    plt.figure(figsize=(15, 15))

    # Sub-directories are the classes; plain files at the top level are ignored.
    class_dirs = list(
        filter(lambda name: os.path.isdir(os.path.join(dataset_path, name)),
               os.listdir(dataset_path))
    )
    total_rows = len(class_dirs)

    row = 0
    for class_dir in class_dirs:
        folder = os.path.join(dataset_path, class_dir)
        candidates = os.listdir(folder)
        how_many = min(num_samples, len(candidates))
        chosen = random.sample(candidates, how_many)

        col = 0
        for picture_name in chosen:
            picture = plt.imread(os.path.join(folder, picture_name))
            # Grid positions are 1-based, filled left to right within the class row.
            plt.subplot(total_rows, num_samples, row * num_samples + col + 1)
            plt.imshow(picture)
            plt.axis("off")
            plt.title(class_dir)
            col += 1
        row += 1

    plt.tight_layout()
    plt.show()

# Plot up to 3 random images (the num_samples default) from each class in the Maindataset
plot_sample_images(maindataset_path)


In [None]:
import os
from collections import Counter
import matplotlib.pyplot as plt

# Count the number of images in each class
def count_class_images(dataset_path):
    """Return a dict mapping each class sub-directory name to its number of entries.

    Only directories directly under `dataset_path` are counted as classes;
    every entry inside a class directory is counted, whatever its type.
    """
    class_dirs = [
        entry for entry in os.listdir(dataset_path)
        if os.path.isdir(os.path.join(dataset_path, entry))
    ]
    return {
        class_dir: len(os.listdir(os.path.join(dataset_path, class_dir)))
        for class_dir in class_dirs
    }

# Plot class distribution
def plot_class_distribution(class_counts):
    """Draw a horizontal bar chart of image counts per class.

    Args:
        class_counts: mapping of class name -> number of images.
    """
    names = list(class_counts)
    totals = list(class_counts.values())

    plt.figure(figsize=(10, 6))
    plt.barh(names, totals, color='skyblue')
    plt.xlabel('Number of Images')
    plt.title('Class Distribution')
    plt.show()

# Get class counts (this rebinds the module-level name `class_counts` to a plain dict)
class_counts = count_class_images(maindataset_path)

# Print the class counts
for class_name, count in class_counts.items():
    print(f"Class: {class_name}, Count: {count}")

# Plot the class distribution
plot_class_distribution(class_counts)

# Check for class imbalance: flag every class whose count is below the mean count per class
total_images = sum(class_counts.values())
imbalanced_classes = {cls: count for cls, count in class_counts.items() if count < total_images / len(class_counts)}

if imbalanced_classes:
    print("\nClasses with fewer images than average (potentially imbalanced):")
    for class_name, count in imbalanced_classes.items():
        print(f"{class_name}: {count} images")
else:
    # Reached only when no class is below the mean (e.g. all classes have equal counts)
    print("\nNo class imbalance detected.")


In [None]:
import shutil

# Path to the directory to be zipped
directory_to_zip = "/kaggle/working/combined_dataset"
# Path where the zip file will be saved
zip_file_path = "/kaggle/working/combined_dataset.zip"

# Create a zip file of the directory.
# make_archive appends the ".zip" suffix itself, so the suffix is stripped from the base name first.
shutil.make_archive(zip_file_path.replace('.zip', ''), 'zip', directory_to_zip)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
import shutil
from sklearn.model_selection import train_test_split
import time

In [None]:
data_dir = "/kaggle/working/newcombined1"
output_dir = "/kaggle/working/waste-split"

# Target proportions of each class assigned to the three splits.
train_ratio = 0.7
val_ratio = 0.2
test_ratio = 0.1

# Extensions are compared case-insensitively so files such as "IMG_1.JPG" are kept.
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

for split in ['train', 'val', 'test']:
    os.makedirs(os.path.join(output_dir, split), exist_ok=True)

# Sorting the listings makes the seeded split reproducible: os.listdir order is
# arbitrary, and a different order would send files to different splits on a
# re-run, leaving stale copies that leak between train/val/test.
for class_name in sorted(os.listdir(data_dir)):
    class_dir = os.path.join(data_dir, class_name)
    if not os.path.isdir(class_dir):
        continue

    images = sorted(img for img in os.listdir(class_dir) if img.lower().endswith(IMAGE_EXTENSIONS))

    if len(images) == 0:
        print(f"Warning: No images found for class '{class_name}'. Skipping.")
        continue

    # train_test_split raises ValueError when a side of the split would be empty,
    # so very small classes are handled explicitly instead of aborting the run.
    if len(images) < 2:
        print(f"Warning: class '{class_name}' has only {len(images)} image(s); using them for training only.")
        train_images, temp_images = images, []
    else:
        train_images, temp_images = train_test_split(images, test_size=(val_ratio + test_ratio), random_state=42)

    if len(temp_images) < 2:
        # Not enough held-out images to populate both val and test.
        val_images, test_images = temp_images, []
    else:
        val_images, test_images = train_test_split(temp_images, test_size=(test_ratio / (val_ratio + test_ratio)), random_state=42)

    for split, split_images in zip(['train', 'val', 'test'], [train_images, val_images, test_images]):
        split_class_dir = os.path.join(output_dir, split, class_name)
        os.makedirs(split_class_dir, exist_ok=True)

        for img in split_images:
            src_path = os.path.join(class_dir, img)
            dest_path = os.path.join(split_class_dir, img)
            shutil.copy(src_path, dest_path)

print(f"Dataset split complete. Splits saved in: {output_dir}")


In [None]:
pip install umap-learn


In [None]:
import umap.umap_ as umap
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

# NOTE(review): this cell reads `model`, `transform_val`, `device`, `datasets` and
# `DataLoader` from the kernel. It calls `model.conv_block1 ... global_avg_pool`, so
# `model` must be the CNNModel student, and `transform_val` must already be defined —
# run the cells that create them first.

# Load test dataset
test_dataset = datasets.ImageFolder('/kaggle/working/waste-split/test', transform=transform_val)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)

features = []
labels_list = []

# Get the class names from the dataset
class_names = test_dataset.classes

# Keep the weights on the same device as the batches; otherwise the forward
# pass fails with a device mismatch when CUDA is in use.
model.to(device)
model.eval()
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)

        # Extract features before the fully connected layers. All five conv
        # blocks are applied, matching CNNModel.forward, so the embeddings are
        # the ones the classifier head actually receives.
        x = model.conv_block1(images)
        x = model.conv_block2(x)
        x = model.conv_block3(x)
        x = model.conv_block4(x)
        x = model.conv_block5(x)
        embeddings = model.global_avg_pool(x)  # Use the global average pooling layer
        embeddings = torch.flatten(embeddings, 1)

        features.append(embeddings.cpu().numpy())
        labels_list.extend(labels.cpu().numpy())

features = np.vstack(features)
labels_list = np.array(labels_list)

# UMAP Visualization with distinct colors for each class.
# A fixed random_state makes the 2-D layout reproducible between runs.
reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, metric='euclidean', random_state=42)
umap_embeddings = reducer.fit_transform(features)

# One colour per class index. plt.get_cmap replaces plt.cm.get_cmap, which is
# no longer available in recent matplotlib releases.
# NOTE: "tab10" provides 10 colours; more than 10 classes would need another palette.
num_plot_classes = len(class_names)
cmap = ListedColormap(plt.get_cmap("tab10").colors[:num_plot_classes])

# Create the UMAP plot. vmin/vmax pin label i to colour i even when some class
# has no test samples, so the scatter colours always agree with the legend.
plt.figure(figsize=(8, 6))
scatter = plt.scatter(
    umap_embeddings[:, 0], umap_embeddings[:, 1],
    c=labels_list, cmap=cmap, vmin=0, vmax=max(num_plot_classes - 1, 1), alpha=0.6,
)

# Create a legend for class names
handles = [plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=cmap(i), markersize=10) for i in range(num_plot_classes)]
plt.legend(handles, class_names, title="Classes", bbox_to_anchor=(1.05, 1), loc='upper left')

# Remove ticks and add the title
plt.xticks([])
plt.yticks([])
plt.title("UMAP Visualization of Feature Embeddings")
plt.show()


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
import os
from tqdm import tqdm
import numpy as np

# ------------------------------- Data Preparation ------------------------------- #
# Training pipeline: resize, light geometric/blur augmentation, then ImageNet normalization.
transform_train = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), shear=10),
    transforms.GaussianBlur(kernel_size=3),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Validation pipeline: deterministic resize and the same normalization (no augmentation).
transform_val = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load datasets
train_dataset = datasets.ImageFolder('/kaggle/working/waste-split/train', transform=transform_train)
val_dataset = datasets.ImageFolder('/kaggle/working/waste-split/val', transform=transform_val)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# ------------------------------- Compute Class Weights ------------------------------- #
# Inverse-frequency weights: classes with fewer training images weigh more in the loss.
class_counts = np.bincount(train_dataset.targets)
total_samples = len(train_dataset)
class_weights = total_samples / class_counts  # Inverse of frequency
# Use the configured `device` instead of an unconditional .cuda(), which raises
# on machines without a GPU.
class_weights = torch.tensor(class_weights, dtype=torch.float32).to(device)

# ------------------------------- Load Pre-trained Model ------------------------------- #
# Load ResNet50 with pre-trained weights
model = models.resnet50(weights='IMAGENET1K_V1')

# Modify the last fully connected layer to match the number of classes in your dataset
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 8)  # Assuming 8 classes in your dataset

# ------------------------------- Step 1: Freeze All Layers ------------------------------- #
# Freeze every parameter first ...
for param in model.parameters():
    param.requires_grad = False

# ... then re-enable gradients for the new classification head only.
for param in model.fc.parameters():
    param.requires_grad = True

# Define the optimizer to only update FC layers
optimizer = optim.Adam(model.fc.parameters(), lr=1e-4)

# Weighted Cross-Entropy Loss
criterion = nn.CrossEntropyLoss(weight=class_weights)

# ------------------------------- Step 2: Train Only FC Layer ------------------------------- #
def train_fc(model, train_loader, val_loader, optimizer, criterion, num_epochs=5):
    """Train `model` for `num_epochs`, validating after every epoch.

    Whenever the validation accuracy improves on the best value seen during
    this call, the model's state_dict is saved to the fixed checkpoint path.

    Args:
        model: network to train; batches are moved to the device its
            parameters are on.
        train_loader: iterable of (inputs, labels) training batches.
        val_loader: iterable of (inputs, labels) validation batches.
        optimizer: optimizer stepping the trainable parameters.
        criterion: loss called as criterion(outputs, labels).
        num_epochs: number of passes over `train_loader`.
    """
    # Follow the model's device instead of assuming CUDA, so the loop also
    # runs on CPU-only machines.
    run_device = next(model.parameters()).device

    model.train()
    best_val_accuracy = 0.0  # To track the best validation accuracy
    best_model_path = '/kaggle/working//best_fine_tuned_teacher_model.pth'  # Path to save the best model

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        # Use tqdm for progress bar
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} Training", unit='batch'):
            inputs, labels = inputs.to(run_device), labels.to(run_device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Training loss (mean over batches) and accuracy (percent of samples)
        train_loss = running_loss / len(train_loader)
        train_accuracy = 100 * correct / total
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Validation
        model.eval()
        correct = 0
        total = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validation", unit='batch'):
                inputs, labels = inputs.to(run_device), labels.to(run_device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Validation loss and accuracy
        val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

        # Save the model if validation accuracy improves
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved best model with Validation Accuracy: {val_accuracy:.2f}%")

        # Back to training mode for the next epoch
        model.train()

# Move model to the configured device (GPU if available, otherwise CPU).
# An unconditional .cuda() raises on machines without a GPU.
model = model.to(device)

# Train the model (FC layer only)
train_fc(model, train_loader, val_loader, optimizer, criterion, num_epochs=5)

# ------------------------------- Step 3: Unfreeze Last 20 Layers ------------------------------- #
# Unfreeze the last 20 layers of the model
def unfreeze_last_layers(model, num_layers=20):
    """Enable gradients for the last `num_layers` parameterized layers of `model`.

    A "layer" here is any module that directly owns parameters (for example a
    convolution, batch-norm or linear layer), taken in `model.modules()` order.
    Iterating over `model.children()` instead would only see the top-level
    containers, so a nested network would be unfrozen almost entirely.

    Args:
        model: network whose parameters are (partly) frozen.
        num_layers: how many trailing parameterized layers to unfreeze;
            values <= 0 unfreeze nothing.
    """
    # Modules that own parameters themselves, ignoring pure containers.
    param_layers = [
        module for module in model.modules()
        if next(module.parameters(recurse=False), None) is not None
    ]

    # Guard num_layers <= 0 explicitly: a [-0:] slice would select every layer.
    selected = param_layers[-num_layers:] if num_layers > 0 else []

    for layer in selected:
        for param in layer.parameters(recurse=False):
            param.requires_grad = True
    print(f"Unfroze the last {len(selected)} layers.")

# Unfreeze last 20 layers and fine-tune
unfreeze_last_layers(model, num_layers=20)

# Define optimizer again with a 10x smaller learning rate than the FC-only stage (1e-5 vs 1e-4).
# All parameters are handed to Adam; this replaces the optimizer that only held model.fc.
optimizer = optim.Adam(model.parameters(), lr=1e-5)

# ------------------------------- Step 4: Fine-tune the Model ------------------------------- #
def fine_tune(model, train_loader, val_loader, optimizer, criterion, num_epochs=5):
    """Fine-tune `model` for `num_epochs`, validating after every epoch.

    The best validation accuracy is tracked from 0.0 within this call; each
    improvement overwrites the checkpoint at the fixed path below (the same
    file the FC-only stage writes to).

    Args:
        model: network to fine-tune; batches are moved to the device its
            parameters are on.
        train_loader: iterable of (inputs, labels) training batches.
        val_loader: iterable of (inputs, labels) validation batches.
        optimizer: optimizer stepping the trainable parameters.
        criterion: loss called as criterion(outputs, labels).
        num_epochs: number of passes over `train_loader`.
    """
    # Follow the model's device instead of assuming CUDA, so the loop also
    # runs on CPU-only machines.
    run_device = next(model.parameters()).device

    model.train()
    best_val_accuracy = 0.0  # To track the best validation accuracy
    best_model_path = '/kaggle/working//best_fine_tuned_teacher_model.pth'  # Path to save the best model

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        # Use tqdm for progress bar
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} Fine-tuning", unit='batch'):
            inputs, labels = inputs.to(run_device), labels.to(run_device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Training loss (mean over batches) and accuracy (percent of samples)
        train_loss = running_loss / len(train_loader)
        train_accuracy = 100 * correct / total
        print(f"Fine-tune Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Validation
        model.eval()
        correct = 0
        total = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validation", unit='batch'):
                inputs, labels = inputs.to(run_device), labels.to(run_device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Validation loss and accuracy
        val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        print(f"Fine-tune Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

        # Save the model if validation accuracy improves
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(model.state_dict(), best_model_path)
            print(f"Saved best fine-tuned model with Validation Accuracy: {val_accuracy:.2f}%")

        # Back to training mode for the next epoch
        model.train()

# Fine-tune the model for 5 more epochs using the optimizer redefined above (lr=1e-5)
fine_tune(model, train_loader, val_loader, optimizer, criterion, num_epochs=5)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch.nn.functional as F
import numpy as np

# ------------------------------- CNN Model (Student) ------------------------------- #
class CNNModel(nn.Module):
    """Five-stage convolutional classifier used as the distillation student.

    Each stage is two 3x3 conv + batch-norm + ReLU groups followed by a 2x2
    max-pool. The stages are followed by global average pooling and a
    two-layer fully connected head with dropout in between.
    """

    def __init__(self, num_classes=8):
        super().__init__()

        # Channel progression: 3 -> 64 -> 128 -> 256 -> 512 -> 512
        self.conv_block1 = self._create_conv_block(3, 64)
        self.conv_block2 = self._create_conv_block(64, 128)
        self.conv_block3 = self._create_conv_block(128, 256)
        self.conv_block4 = self._create_conv_block(256, 512)
        self.conv_block5 = self._create_conv_block(512, 512)

        # Head: pool each feature map to a single value, then classify
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(512, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.dropout = nn.Dropout(0.3)

    def _create_conv_block(self, in_channels, out_channels):
        """Build one stage: (conv3x3 -> BN -> ReLU) twice, then a 2x2 max-pool."""
        stage = []
        # The first conv maps in_channels -> out_channels; the second keeps out_channels.
        for conv_in in (in_channels, out_channels):
            stage.append(nn.Conv2d(conv_in, out_channels, kernel_size=3, padding=1))
            stage.append(nn.BatchNorm2d(out_channels))
            stage.append(nn.ReLU(inplace=True))
        stage.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch `x`."""
        stages = (
            self.conv_block1,
            self.conv_block2,
            self.conv_block3,
            self.conv_block4,
            self.conv_block5,
        )
        for stage in stages:
            x = stage(x)
        pooled = self.global_avg_pool(x)
        flat = pooled.view(pooled.size(0), -1)
        hidden = self.dropout(torch.relu(self.fc1(flat)))
        return self.fc2(hidden)

# ------------------------------- Knowledge Distillation Loss ------------------------------- #
def distillation_loss(student_outputs, teacher_outputs, temperature=2.0):
    """Soft-target distillation loss between student and teacher logits.

    Computes KL(teacher || student) on the temperature-softened class
    distributions, averaged over the batch, and multiplies it by
    temperature**2. Softening by T shrinks the soft-target gradients by
    roughly 1/T**2; the T**2 factor keeps this term on a comparable scale to
    the hard-label loss it is added to when the temperature changes.

    Args:
        student_outputs: student logits, classes along dim 1.
        teacher_outputs: teacher logits with the same shape.
        temperature: softmax temperature (> 0).

    Returns:
        Scalar tensor holding the scaled KL divergence.
    """
    teacher_probs = torch.nn.functional.softmax(teacher_outputs / temperature, dim=1)
    # kl_div expects log-probabilities for the input and probabilities for the target.
    student_log_probs = torch.nn.functional.log_softmax(student_outputs / temperature, dim=1)
    kl = torch.nn.functional.kl_div(student_log_probs, teacher_probs, reduction='batchmean')
    return kl * (temperature ** 2)

# ------------------------------- Load Pretrained Teacher Model ------------------------------- #
# Build the architecture without downloading ImageNet weights: every parameter and
# buffer is overwritten by the fine-tuned checkpoint loaded just below.
teacher_model = models.resnet50(weights=None)
num_ftrs = teacher_model.fc.in_features
teacher_model.fc = nn.Linear(num_ftrs, 8)  # Adjust to match your dataset's class count

# Load the saved teacher model's weights (make sure to save the teacher model after fine-tuning it).
# map_location lets a checkpoint saved on GPU be restored on a CPU-only machine.
teacher_model.load_state_dict(
    torch.load('/kaggle/working//best_fine_tuned_teacher_model.pth', map_location=device)
)
teacher_model = teacher_model.to(device)  # GPU if available, otherwise CPU
teacher_model.eval()  # Set the teacher model to evaluation mode

# ------------------------------- Train Student Model with Knowledge Distillation ------------------------------- #
def train_student_model(student_model, teacher_model, train_loader, val_loader, optimizer, criterion, scheduler,
                        num_epochs=10, temperature=2.0,
                        best_model_path='/kaggle/working//best_fine_tuned_teacher_model.pth'):
    """Train the student on hard labels plus the teacher's soft targets.

    For every batch the loss is criterion(student, labels) +
    distillation_loss(student, teacher, temperature). After each epoch the
    student is validated with `criterion` only, the scheduler is stepped with
    the validation loss, and the student's state_dict is saved whenever the
    validation accuracy beats the best value seen in this call (tracking
    restarts at 0.0 on every call).

    Args:
        student_model: network being trained; batches follow its device.
        teacher_model: network providing soft targets (called under no_grad).
        train_loader: iterable of (inputs, labels) training batches.
        val_loader: iterable of (inputs, labels) validation batches.
        optimizer: optimizer over the student's parameters.
        criterion: hard-label loss called as criterion(outputs, labels).
        scheduler: stepped as scheduler.step(val_loss) once per epoch.
        num_epochs: number of passes over `train_loader`.
        temperature: softmax temperature passed to distillation_loss.
        best_model_path: where the best student checkpoint is written.
            WARNING: the default is the file the teacher checkpoint is saved
            to, so the first improvement overwrites the teacher's weights on
            disk. Pass a dedicated path to keep both (and load the student
            from that path afterwards).
    """
    # Follow the student's device instead of assuming CUDA.
    run_device = next(student_model.parameters()).device

    student_model.train()
    best_val_accuracy = 0.0

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} Training", unit='batch'):
            inputs, labels = inputs.to(run_device), labels.to(run_device)

            optimizer.zero_grad()

            # Get the teacher's soft targets (no gradients flow into the teacher)
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)

            # Get the student model's predictions
            student_outputs = student_model(inputs)

            # Compute distillation loss and cross-entropy loss
            distill_loss = distillation_loss(student_outputs, teacher_outputs, temperature)
            ce_loss = criterion(student_outputs, labels)

            # Combine both losses with equal weight
            total_loss = ce_loss + distill_loss

            total_loss.backward()
            optimizer.step()

            running_loss += total_loss.item()

            # Compute accuracy
            _, predicted = torch.max(student_outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Training loss (mean over batches) and accuracy (percent of samples)
        train_loss = running_loss / len(train_loader)
        train_accuracy = 100 * correct / total
        print(f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Validation (hard-label loss only)
        student_model.eval()
        correct = 0
        total = 0
        val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc="Validation", unit='batch'):
                inputs, labels = inputs.to(run_device), labels.to(run_device)
                student_outputs = student_model(inputs)
                loss = criterion(student_outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(student_outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        # Validation loss and accuracy
        val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        print(f"Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

        # Save the model if validation accuracy improves
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            torch.save(student_model.state_dict(), best_model_path)
            print(f"Saved best student model with Validation Accuracy: {val_accuracy:.2f}%")

        # Step the learning rate scheduler based on validation loss
        scheduler.step(val_loss)

        # Back to training mode for the next epoch
        student_model.train()

# ------------------------------- Set Up Optimizer, Loss, and Scheduler ------------------------------- #
# Define optimizer and criterion for student model.
# The student is placed on the configured device (GPU if available, otherwise CPU)
# instead of calling .cuda() unconditionally.
student_model = CNNModel(num_classes=8).to(device)
optimizer = optim.Adam(student_model.parameters(), lr=1e-4)
# Plain (unweighted) cross-entropy on the hard labels
criterion = nn.CrossEntropyLoss()

# Define learning rate scheduler (ReduceLROnPlateau): halve the LR after 3 epochs
# without improvement in the monitored validation loss
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

# ------------------------------- Start Training with Knowledge Distillation ------------------------------- #
train_student_model(student_model, teacher_model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs=5)


In [None]:
# Continue training the same student for 15 more epochs. The model, optimizer and
# scheduler objects are reused; the best-accuracy tracking inside the function
# starts again from 0.0 on each call.
train_student_model(student_model, teacher_model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs=15)


In [None]:
# Continue training the same student for 20 more epochs. The model, optimizer and
# scheduler objects are reused; the best-accuracy tracking inside the function
# starts again from 0.0 on each call.
train_student_model(student_model, teacher_model, train_loader, val_loader, optimizer, criterion, scheduler, num_epochs=20)


In [None]:
# Rebuild the student architecture and restore its best checkpoint.
# NOTE: despite its name, this file holds the student weights at this point, because
# train_student_model saves to the same path by default.
model = CNNModel(num_classes=8)
# map_location lets a checkpoint saved on GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load('/kaggle/working/best_fine_tuned_teacher_model.pth', map_location=device))
# Later cells move their batches to `device`; the weights must live there too.
model = model.to(device)
model.eval()  # Set the model to evaluation mode before inference


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# ------------------------------- Function: Compute Per-Class Accuracy ------------------------------- #
def evaluate_model(model, data_loader, device, num_classes):
    """Run `model` over `data_loader`, print per-class accuracy, and collect predictions.

    Args:
        model: callable network; switched to eval mode before inference.
        data_loader: iterable yielding `(inputs, labels)` batches.
        device: device each batch is moved to before the forward pass.
        num_classes: number of class indices (0..num_classes-1) to tally.

    Returns:
        `(all_preds, all_labels)` — two flat lists with one entry per sample,
        in the order the loader produced them.
    """
    model.eval()
    hits = torch.zeros(num_classes).to(device)
    seen = torch.zeros(num_classes).to(device)
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch_inputs, batch_labels in data_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            # Index of the largest logit per row is the predicted class.
            batch_preds = torch.max(model(batch_inputs), 1)[1]

            all_preds.extend(batch_preds.cpu().numpy())
            all_labels.extend(batch_labels.cpu().numpy())

            for cls in range(num_classes):
                in_class = batch_labels == cls
                hits[cls] += (in_class & (batch_preds == cls)).sum().item()
                seen[cls] += in_class.sum().item()

    # The small epsilon keeps classes with no samples from dividing by zero.
    per_class_acc = hits / (seen + 1e-6)
    print("\n🚀 **Per-Class Accuracy on Test Set:**")
    for cls in range(num_classes):
        print(f"Class {cls}: {per_class_acc[cls].item()*100:.2f}%")

    return all_preds, all_labels

# ------------------------------- Function: Plot Confusion Matrix ------------------------------- #
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Draw the confusion matrix of `y_pred` against `y_true` as an annotated heatmap.

    Args:
        y_true: ground-truth labels, one per sample.
        y_pred: predicted labels, aligned with `y_true`.
        class_names: tick labels used on both axes.
    """
    matrix = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    heatmap_options = dict(
        annot=True,
        fmt="d",
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    sns.heatmap(matrix, **heatmap_options)
    # Rows are the actual classes, columns the predicted ones.
    plt.ylabel("Actual")
    plt.xlabel("Predicted")
    plt.title("Confusion Matrix")
    plt.show()

# ------------------------------- Load Test Data ------------------------------- #
# Deterministic preprocessing only (resize + ImageNet normalization), no augmentation.
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# One sub-folder per class under the test directory; shuffle is off so the
# prediction order follows the dataset order.
test_dataset = datasets.ImageFolder('/kaggle/working/waste-split/test', transform=transform_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# ------------------------------- Model Evaluation on Test Set ------------------------------- #
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Class Names for Confusion Matrix
# NOTE(review): ImageFolder assigns label indices from the sorted folder names;
# confirm this hand-written order matches `test_dataset.classes`, otherwise the
# axis labels and report rows are attached to the wrong classes.
class_names = ["Cardboard", "Misc. Trash", "Glass", "Organics", "Metal", "Plastic", "Textile Trash", "Paper"]

# Run Evaluation
# evaluate_model returns (predictions, labels) in that order.
y_pred, y_true = evaluate_model(model, test_loader, device, num_classes=8)

# Plot Confusion Matrix
plot_confusion_matrix(y_true, y_pred, class_names)

# Print Detailed Classification Report
print("\n **Classification Report on Test Set:**")
print(classification_report(y_true, y_pred, target_names=class_names))


In [None]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report

# ------------------------------- Function: Compute Per-Class Accuracy ------------------------------- #
def evaluate_model(model, data_loader, device, num_classes):
    """Evaluate `model` on `data_loader`, printing per-class accuracy.

    Args:
        model: callable network; put into eval mode first.
        data_loader: iterable of `(inputs, labels)` batches.
        device: device the batches are moved to.
        num_classes: how many class indices (starting at 0) are tallied.

    Returns:
        `(all_preds, all_labels)` as flat per-sample lists in loader order.
    """
    model.eval()
    correct_counts = torch.zeros(num_classes).to(device)
    sample_counts = torch.zeros(num_classes).to(device)
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in data_loader:
            features, targets = batch
            features = features.to(device)
            targets = targets.to(device)
            logits = model(features)
            # torch.max over dim 1 yields (values, indices); the indices are the classes.
            guesses = torch.max(logits, 1)[1]

            all_preds.extend(guesses.cpu().numpy())
            all_labels.extend(targets.cpu().numpy())

            for class_idx in range(num_classes):
                class_mask = targets == class_idx
                correct_counts[class_idx] += (guesses[class_mask] == class_idx).sum().item()
                sample_counts[class_idx] += class_mask.sum().item()

    # Epsilon guards against classes that never appear in the loader.
    per_class_acc = correct_counts / (sample_counts + 1e-6)
    print("\n🚀 **Per-Class Accuracy on Test Set:**")
    for class_idx in range(num_classes):
        print(f"Class {class_idx}: {per_class_acc[class_idx].item()*100:.2f}%")

    return all_preds, all_labels

# ------------------------------- Function: Plot Confusion Matrix ------------------------------- #
def plot_confusion_matrix(y_true, y_pred, class_names):
    """Render an annotated confusion-matrix heatmap for the given labels.

    Args:
        y_true: ground-truth labels, one per sample.
        y_pred: predicted labels, aligned with `y_true`.
        class_names: names shown on the x (predicted) and y (actual) ticks.
    """
    counts = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))
    sns.heatmap(
        counts,
        annot=True,          # write the count inside every cell
        fmt="d",             # counts are integers
        cmap="Blues",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title("Confusion Matrix")
    plt.xlabel("Predicted")
    plt.ylabel("Actual")
    plt.show()

# ------------------------------- Load Test Data ------------------------------- #
# Deterministic preprocessing only (resize + ImageNet normalization), no augmentation.
transform_test = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Same evaluation as the previous cell, pointed at the '/kaggle/working/custom' folder.
test_dataset = datasets.ImageFolder('/kaggle/working/custom', transform=transform_test)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# ------------------------------- Model Evaluation on Test Set ------------------------------- #
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Class Names for Confusion Matrix
# NOTE(review): ImageFolder assigns label indices from the sorted folder names;
# confirm this hand-written order matches `test_dataset.classes`.
class_names = ["Cardboard", "Misc. Trash", "Glass", "Organics", "Metal", "Plastic", "Textile Trash", "Paper"]

# Run Evaluation
# evaluate_model returns (predictions, labels) in that order.
y_pred, y_true = evaluate_model(model, test_loader, device, num_classes=8)

# Plot Confusion Matrix
plot_confusion_matrix(y_true, y_pred, class_names)

# Print Detailed Classification Report
print("\n📊 **Classification Report on Test Set:**")
print(classification_report(y_true, y_pred, target_names=class_names))


# ResNet50

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, ConcatDataset
from torchvision import transforms, datasets
from sklearn.utils.class_weight import compute_class_weight
from torch.optim import lr_scheduler
from torchvision.models import resnet50
from sklearn.model_selection import train_test_split
import numpy as np

# Data Augmentation and Preprocessing
# Training pipeline: resize, then random flip / rotation / colour jitter / affine
# perturbations, then tensor conversion and ImageNet normalization.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Simple Transform for Validation and Test (No augmentation)
# Only the deterministic steps of the training pipeline are kept.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Custom dataset class
class CustomDataset(datasets.ImageFolder):
    """Thin `ImageFolder` subclass; it only forwards `root` and `transform` to the parent."""

    def __init__(self, root, transform=None):
        # No extra state is added on top of what ImageFolder sets up.
        super(CustomDataset, self).__init__(root, transform)

# Merge RealWaste and Custom Datasets
# Both folders are read with the augmenting training transform.
# NOTE(review): each ImageFolder derives its label indices from its own sorted
# sub-folder names; confirm both roots contain the same class folders, otherwise
# the same index means different classes in the two datasets.
realwaste_data = CustomDataset(root='/kaggle/input/realwaste/realwaste-main/RealWaste', transform=train_transform)
custom_data = CustomDataset(root='/kaggle/input/custom-data/custom_test', transform=train_transform)

# Combine datasets (RealWaste + Custom)
# ConcatDataset indexes RealWaste samples first, followed by the custom samples.
combined_dataset = ConcatDataset([realwaste_data, custom_data])


In [None]:

# Split into Train, Validation, and Test (70% / 20% / remainder)
train_size = int(0.7 * len(combined_dataset))
val_size = int(0.2 * len(combined_dataset))
test_size = len(combined_dataset) - train_size - val_size

train_dataset, val_split, test_split = random_split(combined_dataset, [train_size, val_size, test_size])

# Validation and test samples must not be augmented. Setting `.transform` on the
# ConcatDataset behind a Subset has no effect: each ImageFolder inside it keeps
# using its own transform. So build an un-augmented view of the same image
# folders and select the held-out samples from it with the same split indices.
eval_dataset = ConcatDataset([
    CustomDataset(root=source.root, transform=val_transform)
    for source in (realwaste_data, custom_data)
])
# Both views scan the same folders, so index i refers to the same image in each.
assert len(eval_dataset) == len(combined_dataset)
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

# DataLoader for training, validation, and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Class Weights Calculation
target_labels = []

# Iterate over individual datasets in combined dataset to collect labels
for dataset in [realwaste_data, custom_data]:
    target_labels.extend(dataset.targets)  # `targets` contains the class labels in ImageFolder

# 'balanced' weights are inversely proportional to class frequency.
# NOTE(review): the weights are computed over all samples, including the
# validation and test portions.
class_weights = compute_class_weight('balanced', classes=np.unique(target_labels), y=target_labels)
# Use the GPU only when one exists; an unconditional `.cuda()` fails on CPU-only runtimes.
class_weights = torch.tensor(class_weights, dtype=torch.float).to(
    torch.device("cuda" if torch.cuda.is_available() else "cpu")
)


In [None]:

class FocalLoss(nn.Module):
    """Multi-class focal loss with optional per-class weighting.

    Per sample the loss is `alpha * (1 - p_t) ** gamma * CE * w[target]`, where
    `p_t` is the softmax probability of the true class and `CE = -log(p_t)`.

    Args:
        gamma: focusing exponent; larger values down-weight easy samples more.
        alpha: constant scale applied to every sample.
        reduction: 'mean', 'sum', or anything else for the unreduced per-sample loss.
        weight: optional 1-D tensor of per-class weights. When omitted, the
            notebook-level `class_weights` tensor is used, as before.
    """

    def __init__(self, gamma=2., alpha=0.25, reduction='mean', weight=None):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction
        self.weight = weight

    def forward(self, inputs, targets):
        """Return the focal loss for logits `inputs` (N, C) and class indices `targets` (N,)."""
        # Fall back to the notebook-level weights when none were passed in.
        weight = self.weight if self.weight is not None else class_weights

        # p_t must come from the *unweighted* cross-entropy: exp(-w * CE) equals
        # p_t ** w, which distorts the (1 - p_t) ** gamma modulating factor.
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        p_t = torch.exp(-ce_loss)
        focal = self.alpha * (1 - p_t) ** self.gamma * ce_loss

        # Apply the class weight once, as a plain per-sample scale.
        focal = focal * weight.to(device=inputs.device, dtype=focal.dtype)[targets]

        if self.reduction == 'mean':
            return focal.mean()
        elif self.reduction == 'sum':
            return focal.sum()
        else:
            return focal


# Load Pretrained ResNet50 and Modify for Fine-tuning
model = resnet50(pretrained=True)

# Replace the last fully connected layer
# The new head has one output per distinct label found in `target_labels`.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(np.unique(target_labels)))  # Number of classes in your dataset

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function: the FocalLoss defined above (it wraps a class-weighted cross-entropy).
criterion = FocalLoss(gamma=2., alpha=0.25).to(device)

# Optimizer and Learning Rate Scheduler
# All parameters are trained; StepLR multiplies the learning rate by 0.1 every 5 epochs.
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Fine-tuning with Validation
num_epochs = 10
best_val_accuracy = 0.0
patience = 5  # Early stopping patience
counter = 0  # epochs since validation accuracy last improved

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    # Training loop
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Statistics
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # One scheduler step per epoch, after all optimizer steps of that epoch.
    scheduler.step()

    # Validation loop
    model.eval()
    val_loss = 0.0  # accumulated below but not reported or used for model selection
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_accuracy = 100 * val_correct / val_total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%, Val Accuracy: {val_accuracy:.2f}%")

    # Check if this is the best model so far
    # Model selection and early stopping are both driven by validation accuracy.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        counter = 0  # Reset patience counter
        torch.save(model.state_dict(), 'best_model.pth')  # Save the best model
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

# Load the best model for testing
model.load_state_dict(torch.load('best_model.pth'))

# Testing the model
# Plain top-1 accuracy over the held-out test loader.
model.eval()
test_correct = 0
test_total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
!pip install shap


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, ConcatDataset
from torchvision import transforms, datasets
from sklearn.utils.class_weight import compute_class_weight
from torch.optim import lr_scheduler
from torchvision.models import resnet50
from sklearn.model_selection import train_test_split
import numpy as np

class FocalLoss(nn.Module):
    """Multi-class focal loss with optional per-class weighting.

    Per sample the loss is `alpha * (1 - p_t) ** gamma * CE * w[target]`, where
    `p_t` is the softmax probability of the true class and `CE = -log(p_t)`.

    Args:
        gamma: focusing exponent; larger values down-weight easy samples more.
        alpha: constant scale applied to every sample.
        reduction: 'mean', 'sum', or anything else for the unreduced per-sample loss.
        weight: optional 1-D tensor of per-class weights. When omitted, the
            notebook-level `class_weights` tensor is used, as before.
    """

    def __init__(self, gamma=2., alpha=0.25, reduction='mean', weight=None):
        super(FocalLoss, self).__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.reduction = reduction
        self.weight = weight

    def forward(self, inputs, targets):
        """Return the focal loss for logits `inputs` (N, C) and class indices `targets` (N,)."""
        # Fall back to the notebook-level weights when none were passed in.
        weight = self.weight if self.weight is not None else class_weights

        # p_t must come from the *unweighted* cross-entropy: exp(-w * CE) equals
        # p_t ** w, which distorts the (1 - p_t) ** gamma modulating factor.
        ce_loss = nn.functional.cross_entropy(inputs, targets, reduction='none')
        p_t = torch.exp(-ce_loss)
        focal = self.alpha * (1 - p_t) ** self.gamma * ce_loss

        # Apply the class weight once, as a plain per-sample scale.
        focal = focal * weight.to(device=inputs.device, dtype=focal.dtype)[targets]

        if self.reduction == 'mean':
            return focal.mean()
        elif self.reduction == 'sum':
            return focal.sum()
        else:
            return focal


# Data Augmentation and Preprocessing
# Training pipeline: resize, then random flip / rotation / colour jitter / affine
# perturbations, then tensor conversion and ImageNet normalization.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Simple Transform for Validation and Test (No augmentation)
# Only the deterministic steps of the training pipeline are kept.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Custom dataset class
class CustomDataset(datasets.ImageFolder):
    """`ImageFolder` under a project-specific name; construction is passed straight through."""

    def __init__(self, root, transform=None):
        # Delegate everything to ImageFolder; nothing is customised here.
        datasets.ImageFolder.__init__(self, root, transform)

# Merge RealWaste and Custom Datasets
# NOTE(review): each ImageFolder derives its label indices from its own sorted
# sub-folder names; confirm both roots contain the same class folders.
realwaste_data = CustomDataset(root='/kaggle/input/realwaste/realwaste-main/RealWaste', transform=train_transform)
custom_data = CustomDataset(root='/kaggle/input/custom-data/custom_test', transform=train_transform)

# Combine datasets (RealWaste + Custom)
combined_dataset = ConcatDataset([realwaste_data, custom_data])

# Split into Train, Validation, and Test (70% / 20% / remainder)
train_size = int(0.7 * len(combined_dataset))
val_size = int(0.2 * len(combined_dataset))
test_size = len(combined_dataset) - train_size - val_size

train_dataset, val_split, test_split = random_split(combined_dataset, [train_size, val_size, test_size])

# Validation and test samples must not be augmented. Setting `.transform` on the
# ConcatDataset behind a Subset has no effect: each ImageFolder inside it keeps
# using its own transform. So build an un-augmented view of the same image
# folders and select the held-out samples from it with the same split indices.
eval_dataset = ConcatDataset([
    CustomDataset(root=source.root, transform=val_transform)
    for source in (realwaste_data, custom_data)
])
# Both views scan the same folders, so index i refers to the same image in each.
assert len(eval_dataset) == len(combined_dataset)
val_dataset = torch.utils.data.Subset(eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(eval_dataset, test_split.indices)

# DataLoader for training, validation, and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Class Weights Calculation
target_labels = []

# Iterate over individual datasets in combined dataset to collect labels
for dataset in [realwaste_data, custom_data]:
    target_labels.extend(dataset.targets)  # `targets` contains the class labels in ImageFolder

# 'balanced' weights are inversely proportional to class frequency.
# NOTE(review): the weights are computed over all samples, including the
# validation and test portions.
class_weights = compute_class_weight('balanced', classes=np.unique(target_labels), y=target_labels)
# Use the GPU only when one exists; an unconditional `.cuda()` fails on CPU-only runtimes.
class_weights = torch.tensor(class_weights, dtype=torch.float).to(
    torch.device("cuda" if torch.cuda.is_available() else "cpu")
)

# Load Pretrained ResNet50 and Modify for Fine-tuning
model = resnet50(pretrained=True)

# Replace the last fully connected layer
# The new head has one output per distinct label found in `target_labels`.
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, len(np.unique(target_labels)))  # Number of classes in your dataset

# Move the model to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function: the FocalLoss defined above (it wraps a class-weighted cross-entropy).
criterion = FocalLoss(gamma=2., alpha=0.25).to(device)

# Optimizer and Learning Rate Scheduler
# All parameters are trained; StepLR multiplies the learning rate by 0.1 every 5 epochs.
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Fine-tuning with Validation
num_epochs = 10
best_val_accuracy = 0.0
patience = 5  # Early stopping patience
counter = 0  # epochs since validation accuracy last improved

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    # Training loop
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Statistics
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # One scheduler step per epoch, after all optimizer steps of that epoch.
    scheduler.step()

    # Validation loop
    model.eval()
    val_loss = 0.0  # accumulated below but not reported or used for model selection
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_accuracy = 100 * val_correct / val_total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%, Val Accuracy: {val_accuracy:.2f}%")

    # Check if this is the best model so far
    # Model selection and early stopping are both driven by validation accuracy.
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        counter = 0  # Reset patience counter
        torch.save(model.state_dict(), 'best_model.pth')  # Save the best model
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

# Load the best model for testing
model.load_state_dict(torch.load('best_model.pth'))

# Testing the model
# Plain top-1 accuracy over the held-out test loader.
model.eval()
test_correct = 0
test_total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
# Test Transform (same as training transform without augmentation)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Load the custom test dataset
custom_test_data = datasets.ImageFolder(root='/kaggle/input/test-last/test', transform=test_transform)

# DataLoader for test set
test_loader = DataLoader(custom_test_data, batch_size=32, shuffle=False, num_workers=4)

# Load the best model saved during training.
# `model` and `device` come from the training cell above. map_location makes a
# checkpoint written on a GPU loadable when this runs on a CPU-only runtime.
model.load_state_dict(
    torch.load('/kaggle/input/best_model/pytorch/default/1/best_model (1).pth', map_location=device)
)
model.eval()

# Testing the model with custom test data
correct = 0
total = 0
class_correct = [0] * len(custom_test_data.classes)
class_total = [0] * len(custom_test_data.classes)

with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Track class-wise accuracy.
        # Convert each batch to plain ints once instead of indexing Python lists
        # with (possibly GPU-resident) 0-dim tensors sample by sample.
        for label, pred in zip(labels.tolist(), predicted.tolist()):
            class_correct[label] += int(pred == label)
            class_total[label] += 1

# Overall accuracy
accuracy = 100 * correct / total
print(f"Overall Test Accuracy on Custom Data: {accuracy:.2f}%")

# Class-wise accuracy (classes with no samples are skipped)
for i in range(len(custom_test_data.classes)):
    if class_total[i] > 0:
        class_accuracy = 100 * class_correct[i] / class_total[i]
        print(f"Accuracy for class {custom_test_data.classes[i]}: {class_accuracy:.2f}%")


In [None]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from torchvision import models, datasets
from torch.utils.data import DataLoader
from collections import defaultdict

# Paths
model_path = "/kaggle/input/best_model/pytorch/default/1/best_model (1).pth"
# NOTE(review): `dataset_path` is only referenced by the commented-out loader below.
dataset_path = "/kaggle/input/realwaste/realwaste-main/RealWaste/test"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the ResNet-50 model
# Rebuild the architecture with a 9-way head so the checkpoint's fc weights fit.
model = models.resnet50(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, 9)  # 9 classes
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval().to(device)

# NOTE(review): the loader below is commented out, so the `test_loader` used later
# in this cell is whatever an earlier cell left in the kernel.
# # Define transformations
# transform = transforms.Compose([
#     transforms.Resize((224, 224)),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
# ])

# # Load test dataset
# test_dataset = datasets.ImageFolder(root=dataset_path, transform=transform)
# test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# Class names for visualization
# NOTE(review): ImageFolder label indices follow the sorted folder names; confirm
# this hand-written order matches the index-to-class mapping used in training.
class_names = ['Metal', 'Glass', 'Paper', 'Vegetation', 
               'Cardboard', 'Textile Trash', 'Food Organics', 
               'Plastic', 'Miscellaneous Trash']

# Function to get one incorrect image per predicted class
def get_incorrect_images_per_class(model, test_loader):
    """Collect the first misclassified sample for every *predicted* class.

    Args:
        model: network producing per-class scores for a batch of images.
        test_loader: iterable yielding `(images, labels)` batches.

    Returns:
        Three dicts keyed by the predicted class index (int):
        the image tensor, its true label, and the prediction, all on the CPU.
        The dicts are empty when every sample is classified correctly.
    """
    # Run on the device the model actually lives on rather than depending on a
    # notebook-level `device` variable; parameter-free models fall back to the CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    # Plain dicts: a missing key simply means "no mistake seen for this class yet".
    incorrect_images_per_class = {}
    incorrect_labels_per_class = {}
    incorrect_preds_per_class = {}

    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(model_device), labels.to(model_device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)

            # Keep only the first mistake encountered for each predicted class.
            for img, label, pred in zip(images, labels, preds):
                pred_idx = pred.item()
                if pred_idx != label.item() and pred_idx not in incorrect_images_per_class:
                    incorrect_images_per_class[pred_idx] = img.cpu()
                    incorrect_labels_per_class[pred_idx] = label.cpu()
                    incorrect_preds_per_class[pred_idx] = pred.cpu()

    return incorrect_images_per_class, incorrect_labels_per_class, incorrect_preds_per_class

# Fetch one incorrect image per class
# NOTE(review): `test_loader` is not created in this cell; it is the loader left
# behind by an earlier cell — confirm it is the intended dataset.
incorrect_images, incorrect_labels, incorrect_preds = get_incorrect_images_per_class(model, test_loader)

# SHAP Visualization for one image per incorrect predicted class
def plot_shap_per_incorrect_class(model, incorrect_images, incorrect_labels, incorrect_preds, class_names):
    """Overlay SHAP attributions on each misclassified image, one column per class.

    Args:
        model: network to explain.
        incorrect_images: mapping predicted-class index -> image tensor (C, H, W).
        incorrect_labels: mapping predicted-class index -> true-label tensor.
        incorrect_preds: mapping predicted-class index -> prediction tensor.
        class_names: display names indexed by class index; one subplot column each.
    """
    # Statistics of the Normalize transform used for the loaders in this notebook;
    # they are needed to bring the tensors back to displayable [0, 1] RGB.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # Use up to 4 of the misclassified images as the explainer's background set.
    background = torch.stack(list(incorrect_images.values())[:4]).to(device)
    explainer = shap.GradientExplainer(model, background)

    n_rows, n_cols = len(incorrect_images), len(class_names)
    # squeeze=False always returns a 2-D axes array, so axes[i, j] is valid even
    # when there is a single row or a single column.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, n_rows * 4), squeeze=False)

    for i, (pred_class, img) in enumerate(incorrect_images.items()):
        test_image = img.unsqueeze(0).to(device)
        # NOTE(review): indexing as shap_values[j][0] assumes a per-class list of
        # arrays; confirm against the installed shap version.
        shap_values = explainer.shap_values(test_image)

        # Per-image values do not depend on the column, so compute them once.
        img_np = test_image.cpu().numpy().transpose(0, 2, 3, 1)[0]
        img_np = np.clip(std * img_np + mean, 0, 1)  # undo the normalization for display
        true_label = class_names[incorrect_labels[pred_class].item()]
        pred_label = class_names[incorrect_preds[pred_class].item()]

        for j in range(n_cols):
            ax = axes[i, j]
            # Collapse the channel axis into one absolute-attribution map.
            shap_contrib = np.abs(shap_values[j][0]).sum(axis=0)

            ax.imshow(img_np)
            ax.imshow(shap_contrib, cmap='hot', alpha=0.5)
            ax.axis('off')
            ax.set_title(f"({class_names[j]})\nTrue: {true_label}\nPred: {pred_label}")

    plt.tight_layout()
    plt.show()

# Call the function if there are any incorrect predictions
# An empty mapping is falsy, so the plot is skipped when nothing was misclassified.
if incorrect_images:
    plot_shap_per_incorrect_class(model, incorrect_images, incorrect_labels, incorrect_preds, class_names)
else:
    print("No incorrect predictions found!")


In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Initialize lists to store true labels and predictions
all_labels = []
all_predictions = []

# Evaluate on custom test data
# NOTE(review): `test_loader`, `custom_test_data`, `model`, and `device` are not
# defined in this cell; they are taken from earlier cells.
model.eval()
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Calculate confusion matrix
# Without an explicit `labels=` argument the matrix covers only the classes that
# occur in the labels or predictions.
# NOTE(review): confirm that set always has the same size as `custom_test_data.classes`.
cm = confusion_matrix(all_labels, all_predictions)

# Display the confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=custom_test_data.classes)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix for Custom Test Data")
plt.show()


In [None]:
import torch
import shap
import numpy as np
import matplotlib.pyplot as plt
import torchvision.transforms as transforms
from torchvision import models, datasets
from torch.utils.data import DataLoader
from collections import defaultdict
import os

# Paths for custom test data
model_path = "/kaggle/input/best_model/pytorch/default/1/best_model (1).pth"
custom_test_dataset_path = "/kaggle/input/test-last/test"  # Directory where your custom test data is located
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the ResNet-50 model
# Rebuild the architecture with a 9-way head so the checkpoint's fc weights fit.
model = models.resnet50(pretrained=False)
model.fc = torch.nn.Linear(model.fc.in_features, 9)  # 9 classes
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)  # Ensure the model is moved to the correct device
model.eval()

# Define transformations for custom test data
# Deterministic preprocessing only: resize plus ImageNet normalization.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load custom test dataset
custom_test_dataset = datasets.ImageFolder(root=custom_test_dataset_path, transform=transform)
custom_test_loader = DataLoader(custom_test_dataset, batch_size=32, shuffle=False)

# Class names for visualization
# NOTE(review): ImageFolder label indices follow the sorted folder names; confirm
# this hand-written order matches `custom_test_dataset.classes` and the mapping
# used in training.
class_names = ['Metal', 'Glass', 'Paper', 'Vegetation', 
               'Cardboard', 'Textile Trash', 'Food Organics', 
               'Plastic', 'Miscellaneous Trash']

# Function to get one incorrect image per predicted class from custom test data
def get_incorrect_images_per_class(model, test_loader):
    """Collect the first misclassified sample for every *predicted* class.

    Args:
        model: network producing per-class scores for a batch of images.
        test_loader: iterable yielding `(images, labels)` batches.

    Returns:
        Three dicts keyed by the predicted class index (int):
        the image tensor, its true label, and the prediction, all on the CPU.
        The dicts are empty when every sample is classified correctly.
    """
    # Run on the device the model actually lives on rather than depending on a
    # notebook-level `device` variable; parameter-free models fall back to the CPU.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    # Plain dicts: a missing key simply means "no mistake seen for this class yet".
    incorrect_images_per_class = {}
    incorrect_labels_per_class = {}
    incorrect_preds_per_class = {}

    with torch.no_grad():
        for images, labels in test_loader:
            # Move inputs to the same device as the model
            images, labels = images.to(model_device), labels.to(model_device)
            outputs = model(images)
            _, preds = torch.max(outputs, 1)

            # Keep only the first mistake encountered for each predicted class.
            for img, label, pred in zip(images, labels, preds):
                pred_idx = pred.item()
                if pred_idx != label.item() and pred_idx not in incorrect_images_per_class:
                    incorrect_images_per_class[pred_idx] = img.cpu()
                    incorrect_labels_per_class[pred_idx] = label.cpu()
                    incorrect_preds_per_class[pred_idx] = pred.cpu()

    return incorrect_images_per_class, incorrect_labels_per_class, incorrect_preds_per_class

# Fetch one incorrect image per class from custom test data
# Each returned mapping is keyed by the predicted (wrong) class index.
incorrect_images, incorrect_labels, incorrect_preds = get_incorrect_images_per_class(model, custom_test_loader)

# SHAP Visualization for one image per incorrect predicted class from custom test data
def plot_shap_per_incorrect_class(model, incorrect_images, incorrect_labels, incorrect_preds, class_names):
    """Overlay SHAP attributions on each misclassified image, one column per class.

    Args:
        model: network to explain.
        incorrect_images: mapping predicted-class index -> image tensor (C, H, W).
        incorrect_labels: mapping predicted-class index -> true-label tensor.
        incorrect_preds: mapping predicted-class index -> prediction tensor.
        class_names: display names indexed by class index; one subplot column each.
    """
    # Statistics of the Normalize transform applied to the custom test data;
    # they are needed to bring the tensors back to displayable [0, 1] RGB.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    # Ensure background images are on the same device
    background = torch.stack(list(incorrect_images.values())[:4]).to(device)  # Use up to 4 images for background
    explainer = shap.GradientExplainer(model, background)

    n_rows, n_cols = len(incorrect_images), len(class_names)
    # squeeze=False always returns a 2-D axes array, so axes[i, j] is valid even
    # when there is a single row or a single column.
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, n_rows * 4), squeeze=False)

    for i, (pred_class, img) in enumerate(incorrect_images.items()):
        test_image = img.unsqueeze(0).to(device)  # Ensure test image is on the same device
        # NOTE(review): indexing as shap_values[j][0] assumes a per-class list of
        # arrays; confirm against the installed shap version.
        shap_values = explainer.shap_values(test_image)

        # Per-image values do not depend on the column, so compute them once.
        img_np = test_image.cpu().numpy().transpose(0, 2, 3, 1)[0]
        img_np = np.clip(std * img_np + mean, 0, 1)  # undo the normalization for display
        true_label = class_names[incorrect_labels[pred_class].item()]
        pred_label = class_names[incorrect_preds[pred_class].item()]

        for j in range(n_cols):
            ax = axes[i, j]
            # Collapse the channel axis into one absolute-attribution map.
            shap_contrib = np.abs(shap_values[j][0]).sum(axis=0)

            ax.imshow(img_np)
            ax.imshow(shap_contrib, cmap='hot', alpha=0.5)
            ax.axis('off')
            ax.set_title(f"({class_names[j]})\nTrue: {true_label}\nPred: {pred_label} ")

    plt.tight_layout()
    plt.show()

# Call the function if there are any incorrect predictions
# An empty mapping is falsy, so the plot is skipped when nothing was misclassified.
if incorrect_images:
    plot_shap_per_incorrect_class(model, incorrect_images, incorrect_labels, incorrect_preds, class_names)
else:
    print("No incorrect predictions found!")


In [None]:
import torch
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import DataLoader
from torchvision import transforms, datasets

# Test Transform (same as training transform without augmentation)
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Load the custom test dataset
custom_test_data = datasets.ImageFolder(root='/kaggle/input/test-last/test', transform=test_transform)

# DataLoader for test set
test_loader = DataLoader(custom_test_data, batch_size=32, shuffle=False, num_workers=4)

# Load the best model saved during training.
# `model` and `device` come from an earlier cell. map_location makes a checkpoint
# written on a GPU loadable when this runs on a CPU-only runtime.
model.load_state_dict(
    torch.load('/kaggle/input/best_model/pytorch/default/1/best_model (1).pth', map_location=device)
)
model.eval()

# ImageNet mean and std for unnormalization
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

# Testing the model with custom test data
correct = 0
total = 0
class_correct = [0] * len(custom_test_data.classes)
class_total = [0] * len(custom_test_data.classes)

# Get the class names from the dataset
class_names = custom_test_data.classes

# List to store images and labels for later plotting
images_to_show = []
true_labels = []
predicted_labels = []

# Start testing
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # Convert the batch once: plain ints for the tallies and one
        # (N, H, W, C) array for display, instead of per-sample tensor indexing.
        batch_images = inputs.cpu().numpy().transpose((0, 2, 3, 1))
        for img, label, pred in zip(batch_images, labels.tolist(), predicted.tolist()):
            # Track class-wise accuracy
            class_correct[label] += int(pred == label)
            class_total[label] += 1

            # Unnormalize using ImageNet stats and clamp to the displayable range
            images_to_show.append(np.clip(std * img + mean, 0, 1))
            true_labels.append(class_names[label])
            predicted_labels.append(class_names[pred])

# Overall accuracy
accuracy = 100 * correct / total
print(f"Overall Test Accuracy on Custom Data: {accuracy:.2f}%")

# Class-wise accuracy (classes with no samples are skipped)
for i in range(len(custom_test_data.classes)):
    if class_total[i] > 0:
        class_accuracy = 100 * class_correct[i] / class_total[i]
        print(f"Accuracy for class {custom_test_data.classes[i]}: {class_accuracy:.2f}%")

# Now display all the images sequentially with their true and predicted labels.
# A fixed 8x8 grid raises once there are more than 64 images, so add rows as
# needed; for 64 images or fewer the layout is the original 8x8 at 15x15 inches.
n_cols = 8
n_rows = max(8, -(-len(images_to_show) // n_cols))  # ceiling division
plt.figure(figsize=(15, 15 * n_rows / 8))
for i in range(len(images_to_show)):
    plt.subplot(n_rows, n_cols, i + 1)
    plt.imshow(images_to_show[i])
    plt.title(f"True: {true_labels[i]}\nPred: {predicted_labels[i]}")
    plt.axis('off')

plt.tight_layout()
plt.show()


# Custom CNN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, ConcatDataset
from torchvision import transforms, datasets
from sklearn.utils.class_weight import compute_class_weight
from torch.optim import lr_scheduler
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

# CBAM: Convolutional Block Attention Module
class CBAM(nn.Module):
    """Convolutional Block Attention Module: channel attention followed by spatial attention.

    Args:
        channels: number of channels of the incoming feature map.
        reduction: bottleneck ratio of the shared channel-attention MLP
            (hidden width is ``channels // reduction``, but never less than 1).
        kernel_size: kernel size of the spatial-attention convolution. Padding is
            ``kernel_size // 2``, which keeps the spatial size unchanged for odd sizes.

    The forward pass returns a tensor with the same shape as its input, scaled
    element-wise by both attention maps.
    """
    def __init__(self, channels, reduction=16, kernel_size=7):
        super(CBAM, self).__init__()
        # Guard against channels < reduction, which would otherwise create a
        # zero-channel bottleneck and turn channel attention into a constant.
        hidden_channels = max(channels // reduction, 1)
        # Channel attention
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        # Shared MLP (as 1x1 convolutions) applied to both pooled descriptors
        self.fc = nn.Sequential(
            nn.Conv2d(channels, hidden_channels, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(hidden_channels, channels, 1, bias=False)
        )
        self.sigmoid_channel = nn.Sigmoid()
        # Spatial attention
        self.spatial = nn.Sequential(
            nn.Conv2d(2, 1, kernel_size, padding=kernel_size // 2, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        # Channel attention: one weight in (0, 1) per channel
        avg_out = self.fc(self.avg_pool(x))
        max_out = self.fc(self.max_pool(x))
        channel_att = self.sigmoid_channel(avg_out + max_out)
        x = x * channel_att

        # Spatial attention: one weight in (0, 1) per location, computed from the
        # channel-wise mean and max maps
        avg_out = torch.mean(x, dim=1, keepdim=True)
        max_out, _ = torch.max(x, dim=1, keepdim=True)
        spatial_att = self.spatial(torch.cat([avg_out, max_out], dim=1))
        x = x * spatial_att
        return x

# Bottleneck Residual Block with CBAM
class BottleneckBlock(nn.Module):
    """Residual bottleneck (1x1 -> 3x3 -> 1x1) with CBAM applied before the skip addition.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the block; the inner width is a quarter of it.
        stride: stride of the 3x3 convolution.
        downsample: optional module applied to the input to form the skip branch;
            when None the input itself is added.
        reduction: reduction ratio forwarded to CBAM.
    """
    def __init__(self, in_channels, out_channels, stride=1, downsample=None, reduction=16):
        super(BottleneckBlock, self).__init__()
        width = out_channels // 4

        # 1x1 convolution into the narrow inner width
        self.conv1 = nn.Conv2d(in_channels, width, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(width)

        # 3x3 convolution; the only layer that applies the stride
        self.conv2 = nn.Conv2d(width, width, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(width)

        # 1x1 convolution back out to the block's output width
        self.conv3 = nn.Conv2d(width, out_channels, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_channels)

        self.cbam = CBAM(out_channels, reduction=reduction)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(out)))
        # No activation after the third stage: attention is applied to the
        # normalised features, then the skip branch is added
        out = self.cbam(self.bn3(self.conv3(out)))

        shortcut = x if self.downsample is None else self.downsample(x)
        out += shortcut
        return self.relu(out)

# Advanced CNN using multiple Bottleneck Blocks
class AdvancedCNN(nn.Module):
    """Classifier built from a conv stem and four stages of BottleneckBlock (3, 4, 6, 3 blocks).

    Args:
        num_classes: size of the final linear layer's output.
    """
    def __init__(self, num_classes=9):
        super(AdvancedCNN, self).__init__()
        # Stem: stride-2 7x7 convolution followed by a stride-2 max-pool
        stem_modules = [
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),  # 224 -> 112
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2, padding=1),  # 112 -> 56
        ]
        self.stem = nn.Sequential(*stem_modules)

        # Four stages; every stage after the first halves the resolution
        self.layer1 = self._make_layer(64, 256, blocks=3, stride=1)     # 56x56
        self.layer2 = self._make_layer(256, 512, blocks=4, stride=2)    # 56 -> 28
        self.layer3 = self._make_layer(512, 1024, blocks=6, stride=2)   # 28 -> 14
        self.layer4 = self._make_layer(1024, 2048, blocks=3, stride=2)  # 14 -> 7

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(2048, num_classes)

    def _make_layer(self, in_channels, out_channels, blocks, stride):
        """Build one stage: a first block that may change shape, then ``blocks - 1`` plain blocks."""
        projection = None
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            # 1x1 projection so the skip branch matches the main branch's shape
            projection = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        stage = [BottleneckBlock(in_channels, out_channels, stride, projection)]
        stage.extend(BottleneckBlock(out_channels, out_channels) for _ in range(blocks - 1))
        return nn.Sequential(*stage)

    def forward(self, x):
        x = self.stem(x)
        # Channel widths after each stage: 256, 512, 1024, 2048
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)
        pooled = self.avgpool(x)
        return self.fc(torch.flatten(pooled, 1))




In [None]:

# ========================
# Data Augmentation and Dataset Setup
# ========================
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# A custom dataset class (uses ImageFolder)
class CustomDataset(datasets.ImageFolder):
    """Thin ImageFolder subclass; adds no behaviour of its own."""
    def __init__(self, root, transform=None):
        super().__init__(root, transform)

# Replace the paths below with your dataset directories
REALWASTE_ROOT = '/kaggle/input/realwaste/realwaste-main/RealWaste'
CUSTOM_ROOT = '/kaggle/input/custom-data/custom_test'
SPLIT_SEED = 42  # fixed seed so the train/val/test split is the same on every run

# Training view (augmented).
# NOTE(review): each ImageFolder derives label indices from its own folder names —
# confirm both roots contain the same class folders, otherwise labels will not line up.
realwaste_data = CustomDataset(root=REALWASTE_ROOT, transform=train_transform)
custom_data = CustomDataset(root=CUSTOM_ROOT, transform=train_transform)
combined_dataset = ConcatDataset([realwaste_data, custom_data])

# Evaluation view: the same files in the same order, without augmentation.
# Assigning `.transform` on a random_split subset's `.dataset` only sets an attribute on
# the shared ConcatDataset, which the member datasets never read — validation and test
# images would keep being augmented. A second set of dataset objects avoids that.
combined_eval_dataset = ConcatDataset([
    CustomDataset(root=REALWASTE_ROOT, transform=val_transform),
    CustomDataset(root=CUSTOM_ROOT, transform=val_transform),
])

# Split dataset: 70% train, 20% validation, 10% test (or adjust as needed)
total_size = len(combined_dataset)
train_size = int(0.7 * total_size)
val_size = int(0.2 * total_size)
test_size = total_size - train_size - val_size
train_dataset, val_split, test_split = random_split(
    combined_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Re-use the split's indices on the non-augmented view for validation and test
val_dataset = torch.utils.data.Subset(combined_eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(combined_eval_dataset, test_split.indices)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

# ========================
# Class Weights Calculation
# ========================
# Collect all targets from each individual dataset (weights are computed over the
# full combined data, not only the training split)
target_labels = []
for dataset in [realwaste_data, custom_data]:
    target_labels.extend(dataset.targets)  # ImageFolder stores labels in .targets

class_weights = compute_class_weight('balanced', classes=np.unique(target_labels), y=target_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)


In [None]:
# ========================
# Model, Loss, Optimizer, Scheduler Setup
# ========================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AdvancedCNN(num_classes=len(np.unique(target_labels)))
model = model.to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
optimizer = optim.AdamW(model.parameters(), lr=1e-4)

num_epochs = 50
# Anneal over the whole run: with T_max shorter than the number of epochs the cosine
# schedule passes its minimum and the learning rate rises again afterwards.
scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs)


def _evaluate_classifier(net, loader, loss_fn, target_device):
    """Evaluate `net` on every batch of `loader` without tracking gradients.

    Puts `net` in eval mode (the caller must switch back to train mode).

    Returns:
        (mean per-batch loss, accuracy in percent); both are 0 for an empty loader.
    """
    net.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(target_device), labels.to(target_device)
            outputs = net(inputs)
            loss_sum += loss_fn(outputs, labels).item()
            _, predicted = torch.max(outputs, 1)
            n_seen += labels.size(0)
            n_correct += (predicted == labels).sum().item()
    # max(..., 1) keeps an empty loader from raising ZeroDivisionError
    return loss_sum / max(len(loader), 1), 100 * n_correct / max(n_seen, 1)


# ========================
# Training Loop with Early Stopping
# ========================
best_val_accuracy = 0.0
patience = 5  # epochs without a new best validation accuracy before stopping
counter = 0

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    scheduler.step()

    # Validation pass
    val_loss, val_acc = _evaluate_classifier(model, val_loader, criterion, device)

    train_acc = 100 * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {running_loss/len(train_loader):.4f}, "
          f"Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # Early stopping check: checkpoint on every new best, stop after `patience` misses
    if val_acc > best_val_accuracy:
        best_val_accuracy = val_acc
        counter = 0
        torch.save(model.state_dict(), 'best_model_custom.pth')
    else:
        counter += 1
        if counter >= patience:
            print("Early stopping!")
            break

# ========================
# Testing the Best Model
# ========================
# map_location lets the checkpoint load even if it was written on a different device
model.load_state_dict(torch.load('best_model_custom.pth', map_location=device))
_, test_accuracy = _evaluate_classifier(model, test_loader, criterion, device)
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
# Load the previously saved model weights (best validation checkpoint so far)
model.load_state_dict(torch.load('best_model_custom.pth', map_location=device))

# Continue training for 20 more epochs, numbering them after the earlier run.
# The optimizer, scheduler and best_val_accuracy carry over from the previous cell.
num_additional_epochs = 20
starting_epoch = 30  # 0-based index of the first additional epoch (printed as epoch 31)

for epoch in range(starting_epoch, starting_epoch + num_additional_epochs):
    # Validation below switches the model to eval mode every epoch, so training mode
    # has to be restored here each time; setting it once before the loop would leave
    # every epoch after the first training with BatchNorm in eval mode.
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    scheduler.step()

    # Validation loop
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total
    val_acc = 100 * val_correct / val_total
    print(f"Epoch [{epoch+1}/{starting_epoch + num_additional_epochs}], "
          f"Train Loss: {running_loss/len(train_loader):.4f}, "
          f"Train Acc: {train_acc:.2f}%, Val Loss: {val_loss/len(val_loader):.4f}, "
          f"Val Acc: {val_acc:.2f}%")

    # Keep the checkpoint with the best validation accuracy (no early stopping in this cell)
    if val_acc > best_val_accuracy:
        best_val_accuracy = val_acc
        torch.save(model.state_dict(), 'best_model_custom.pth')

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# True labels and predictions accumulated over the whole test loader
all_labels, all_predictions = [], []

# Run the model over test_loader in inference mode
model.eval()
with torch.no_grad():
    for batch_images, batch_targets in test_loader:
        logits = model(batch_images.to(device))
        batch_predictions = torch.max(logits, 1)[1]  # index of the largest logit per row

        all_labels += list(batch_targets.cpu().numpy())
        all_predictions += list(batch_predictions.cpu().numpy())

# Confusion matrix: rows are true classes, columns are predicted classes
cm = confusion_matrix(all_labels, all_predictions)

# NOTE(review): tick labels come from `custom_test_data`, which is defined in another
# cell — confirm its class list matches the label indices produced by test_loader.
disp = ConfusionMatrixDisplay(cm, display_labels=custom_test_data.classes)
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix for Custom Test Data")
plt.show()



In [None]:

from sklearn.metrics import confusion_matrix, classification_report
import matplotlib.pyplot as plt
import seaborn as sns

# Separate hold-out folder, read with the non-augmenting transform
test_custom_dataset = datasets.ImageFolder(
    root='/kaggle/input/test-last/test',
    transform=val_transform,
)
test_custom_loader = DataLoader(
    test_custom_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4,
)

all_preds, all_labels = [], []

model.eval()
with torch.no_grad():
    for batch_images, batch_targets in test_custom_loader:
        logits = model(batch_images.to(device))
        batch_preds = torch.max(logits, 1)[1]  # predicted class index per image
        all_preds.extend(batch_preds.cpu().numpy())
        all_labels.extend(batch_targets.cpu().numpy())

# Confusion matrix as an annotated heatmap (rows: true class, columns: predicted class)
# NOTE(review): label indices come from this folder's own class directories — confirm
# they match the class order the model was trained with.
cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(10, 8))
heatmap_ax = sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
heatmap_ax.set_xlabel("Predicted Label")
heatmap_ax.set_ylabel("True Label")
heatmap_ax.set_title("Confusion Matrix on Custom Test Data")
plt.show()

# Per-class precision / recall / F1
print(classification_report(all_labels, all_preds))


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, ConcatDataset
from torchvision import transforms, datasets
from sklearn.utils.class_weight import compute_class_weight
from torch.optim import lr_scheduler
import numpy as np

# Squeeze-and-Excitation Block
class SELayer(nn.Module):
    """Squeeze-and-Excitation: rescale each channel by a learned gate in (0, 1).

    Args:
        channel: number of channels of the incoming feature map.
        reduction: bottleneck ratio of the gating MLP (hidden width is
            ``channel // reduction``, but never less than 1).
    """
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        # Guard against channel < reduction, which would otherwise create a
        # zero-width hidden layer and make the gate a constant.
        hidden = max(channel // reduction, 1)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        # Squeeze: global average per channel -> (b, c)
        y = self.avg_pool(x).view(b, c)
        # Excite: per-channel gate, reshaped so it broadcasts over H and W
        y = self.fc(y).view(b, c, 1, 1)
        return x * y

# Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv/BN stages with an optional SE gate and a residual connection.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the block.
        stride: stride of the first convolution (and of the projection shortcut).
        use_se: apply an SELayer to the main branch when True, otherwise pass it through.
    """
    def __init__(self, in_channels, out_channels, stride=1, use_se=True):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        if use_se:
            self.se = SELayer(out_channels)
        else:
            self.se = nn.Identity()

        # The skip branch needs a 1x1 projection whenever the main branch changes
        # resolution or width; otherwise an empty Sequential passes the input through.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.conv2(main)
        main = self.se(self.bn2(main))
        main += self.shortcut(x)
        return F.relu(main)

# Enhanced CNN Model
class EnhancedCNN(nn.Module):
    """Four ResidualBlocks (64 -> 128 -> 256 -> 512 channels), global pooling and an MLP head.

    Args:
        num_classes: size of the final linear layer's output.
    """
    def __init__(self, num_classes=10):
        super(EnhancedCNN, self).__init__()
        # Feature extractor; blocks 2-4 use stride 2
        self.layer1 = ResidualBlock(3, 64)
        self.layer2 = ResidualBlock(64, 128, stride=2)
        self.layer3 = ResidualBlock(128, 256, stride=2)
        self.layer4 = ResidualBlock(256, 512, stride=2)
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        # Classification head on the pooled 512-dimensional feature vector
        head_modules = [
            nn.Flatten(),
            nn.Linear(512, 1024),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        ]
        self.fc = nn.Sequential(*head_modules)

    def forward(self, x):
        for block in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = block(x)
        pooled = self.global_pool(x)
        return self.fc(pooled)

# Data Augmentation and Preprocessing
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Simple Transform for Validation and Test (No augmentation)
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Custom dataset class
class CustomDataset(datasets.ImageFolder):
    """Thin ImageFolder subclass; adds no behaviour of its own."""
    def __init__(self, root, transform=None):
        super(CustomDataset, self).__init__(root, transform)

REALWASTE_ROOT = '/kaggle/input/realwaste/realwaste-main/RealWaste'
CUSTOM_ROOT = '/kaggle/input/custom-data/custom_test'
SPLIT_SEED = 42  # fixed seed so the train/val/test split is the same on every run

# Merge RealWaste and Custom Datasets (training view, augmented).
# NOTE(review): each ImageFolder derives label indices from its own folder names —
# confirm both roots contain the same class folders, otherwise labels will not line up.
realwaste_data = CustomDataset(root=REALWASTE_ROOT, transform=train_transform)
custom_data = CustomDataset(root=CUSTOM_ROOT, transform=train_transform)

# Combine datasets (RealWaste + Custom)
combined_dataset = ConcatDataset([realwaste_data, custom_data])

# Evaluation view: the same files in the same order, without augmentation.
# Assigning `.transform` on a random_split subset's `.dataset` only sets an attribute on
# the shared ConcatDataset, which the member datasets never read — validation and test
# images would keep being augmented. A second set of dataset objects avoids that.
combined_eval_dataset = ConcatDataset([
    CustomDataset(root=REALWASTE_ROOT, transform=val_transform),
    CustomDataset(root=CUSTOM_ROOT, transform=val_transform),
])

# Split into Train, Validation, and Test
train_size = int(0.7 * len(combined_dataset))
val_size = int(0.2 * len(combined_dataset))
test_size = len(combined_dataset) - train_size - val_size

train_dataset, val_split, test_split = random_split(
    combined_dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Re-use the split's indices on the non-augmented view for validation and test
val_dataset = torch.utils.data.Subset(combined_eval_dataset, val_split.indices)
test_dataset = torch.utils.data.Subset(combined_eval_dataset, test_split.indices)

# DataLoader for training, validation, and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Class Weights Calculation
target_labels = []

# Iterate over individual datasets in combined dataset to collect labels
for dataset in [realwaste_data, custom_data]:
    target_labels.extend(dataset.targets)  # `targets` contains the class labels in ImageFolder

class_weights = compute_class_weight('balanced', classes=np.unique(target_labels), y=target_labels)
# Follow the CPU fallback instead of calling .cuda() unconditionally, which raises
# on a machine without a GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Build the EnhancedCNN classifier with one output per distinct label
model = EnhancedCNN(num_classes=len(np.unique(target_labels)))

# Use the GPU when one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Class-weighted cross-entropy
criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)

# Adam with the learning rate multiplied by 0.1 every 5 epochs
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Training with per-epoch validation and early stopping
num_epochs = 50
best_val_accuracy = 0.0
patience = 5  # epochs without a new best validation accuracy before stopping
counter = 0

for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for batch_inputs, batch_labels in train_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        logits = model(batch_inputs)
        batch_loss = criterion(logits, batch_labels)
        batch_loss.backward()
        optimizer.step()

        # Running statistics for the epoch summary
        running_loss += batch_loss.item()
        batch_preds = torch.max(logits, 1)[1]
        total += batch_labels.size(0)
        correct += (batch_preds == batch_labels).sum().item()

    scheduler.step()

    # ---- validation pass ----
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0

    with torch.no_grad():
        for batch_inputs, batch_labels in val_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            logits = model(batch_inputs)
            val_loss += criterion(logits, batch_labels).item()
            batch_preds = torch.max(logits, 1)[1]
            val_total += batch_labels.size(0)
            val_correct += (batch_preds == batch_labels).sum().item()

    val_accuracy = 100 * val_correct / val_total
    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%, Val Accuracy: {val_accuracy:.2f}%")

    # New best: checkpoint, reset the patience counter and move on to the next epoch
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        counter = 0
        torch.save(model.state_dict(), 'best_model.pth')
        continue

    # No improvement this epoch
    counter += 1
    if counter >= patience:
        print("Early stopping!")
        break

# Restore the best checkpoint before measuring test accuracy
model.load_state_dict(torch.load('best_model.pth'))

# ---- test pass ----
model.eval()
test_correct, test_total = 0, 0
with torch.no_grad():
    for batch_inputs, batch_labels in test_loader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        batch_preds = torch.max(model(batch_inputs), 1)[1]
        test_total += batch_labels.size(0)
        test_correct += (batch_preds == batch_labels).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split, ConcatDataset
from torchvision import transforms, datasets
from sklearn.utils.class_weight import compute_class_weight
from torch.optim import lr_scheduler
import numpy as np
from torchvision.datasets import ImageFolder
from fastai.vision.all import *

# Custom dataset class with augmentation
class CustomDataset(datasets.ImageFolder):
    """ImageFolder that can list every image several times so it is drawn more often.

    Every index yields exactly one ``(image, label)`` pair, so batches collate with
    other ImageFolder datasets. Returning a list of six images per index (one of them
    an untransformed PIL image) cannot be batched by the default collate function.

    Args:
        root: directory with one sub-folder per class.
        transform: transform applied to each loaded image.
        augment: when True every image is listed COPIES_PER_IMAGE times in
            ``samples`` / ``targets``; with a random ``transform`` each listing gets
            its own augmentation when it is loaded.

    NOTE(review): the repeated listings point at the same file, so a random split made
    after construction can place copies of one image in both training and validation.
    """
    # Listings per image when augment=True: the image itself plus 5 further draws
    COPIES_PER_IMAGE = 6

    def __init__(self, root, transform=None, augment=False):
        super(CustomDataset, self).__init__(root, transform)
        self.augment = augment
        self.original_data = list(self.samples)  # one (path, label) entry per file on disk
        if augment:
            # Keep samples, imgs and targets in sync so len(), .targets and indexing agree
            self.samples = self.original_data * self.COPIES_PER_IMAGE
            self.imgs = self.samples
            self.targets = [label for _, label in self.samples]

# Squeeze-and-Excitation Block
class SELayer(nn.Module):
    """Squeeze-and-Excitation: rescale each channel by a learned gate in (0, 1).

    Args:
        channel: number of channels of the incoming feature map.
        reduction: bottleneck ratio of the gating MLP (hidden width is
            ``channel // reduction``, but never less than 1).
    """
    def __init__(self, channel, reduction=16):
        super(SELayer, self).__init__()
        # Guard against channel < reduction, which would otherwise create a
        # zero-width hidden layer and make the gate a constant.
        hidden = max(channel // reduction, 1)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channel, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        # Squeeze: global average per channel -> (b, c)
        y = self.avg_pool(x).view(b, c)
        # Excite: per-channel gate, reshaped so it broadcasts over H and W
        y = self.fc(y).view(b, c, 1, 1)
        return x * y

# Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv/BN stages with an optional SE gate and a residual connection.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by the block.
        stride: stride of the first convolution (and of the projection shortcut).
        use_se: apply an SELayer to the main branch when True, otherwise pass it through.
    """
    def __init__(self, in_channels, out_channels, stride=1, use_se=True):
        super(ResidualBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        if use_se:
            self.se = SELayer(out_channels)
        else:
            self.se = nn.Identity()

        # The skip branch needs a 1x1 projection whenever the main branch changes
        # resolution or width; otherwise an empty Sequential passes the input through.
        shape_changes = stride != 1 or in_channels != out_channels
        if shape_changes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        main = self.conv1(x)
        main = F.relu(self.bn1(main))
        main = self.conv2(main)
        main = self.se(self.bn2(main))
        main += self.shortcut(x)
        return F.relu(main)

# Enhanced CNN Model
class EnhancedCNN(nn.Module):
    """Four ResidualBlocks (64 -> 128 -> 256 -> 512 channels), global pooling and an MLP head.

    Args:
        num_classes: size of the final linear layer's output.
    """
    def __init__(self, num_classes=10):
        super(EnhancedCNN, self).__init__()
        # Feature extractor; blocks 2-4 use stride 2
        self.layer1 = ResidualBlock(3, 64)
        self.layer2 = ResidualBlock(64, 128, stride=2)
        self.layer3 = ResidualBlock(128, 256, stride=2)
        self.layer4 = ResidualBlock(256, 512, stride=2)
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        # Classification head on the pooled 512-dimensional feature vector
        head_modules = [
            nn.Flatten(),
            nn.Linear(512, 1024),
            nn.Dropout(0.5),
            nn.ReLU(),
            nn.Linear(1024, num_classes),
        ]
        self.fc = nn.Sequential(*head_modules)

    def forward(self, x):
        for block in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = block(x)
        pooled = self.global_pool(x)
        return self.fc(pooled)

# Data Augmentation and Preprocessing
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.8, 1.2)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Pretrained model normalization
])

# Simple Transform for Validation and Test (No augmentation)
# The normalisation must match train_transform exactly; a green-channel mean of 0.0
# would shift validation/test inputs relative to what the model saw during training.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Load RealWaste and Custom Dataset
REALWASTE_ROOT = '/kaggle/input/realwaste/realwaste-main/RealWaste'
CUSTOM_ROOT = '/kaggle/input/custom-data/custom_test'
TARGET_PER_CLASS = 600  # images per RealWaste class after resampling
CUSTOM_COPIES = 6       # draws per custom training image and epoch (the image + 5 further augmented draws)
SPLIT_SEED = 42         # fixed seed so the train/val/test split is the same on every run

realwaste_data = CustomDataset(root=REALWASTE_ROOT, transform=train_transform)
# augment is left off so every index yields one (tensor, label) pair that collates with
# RealWaste batches; the extra copies are added to the training indices further down.
custom_data = CustomDataset(root=CUSTOM_ROOT, transform=train_transform)

from collections import Counter

# Function to count the images per class in a dataset
def count_images_per_class(dataset):
    """Return a Counter mapping class index -> number of samples.

    Accepts an ImageFolder-style dataset (its ``.samples`` list is used) or a plain
    list of ``(path, class_index)`` pairs.
    """
    samples = getattr(dataset, 'samples', dataset)
    return Counter(class_idx for _, class_idx in samples)

# Resample RealWaste Data to 600 images per class (downsampling or oversampling).
# This has to run before the counts are printed: the list is what gets counted.
balanced_realwaste_data = []
for class_idx in range(len(realwaste_data.classes)):
    class_samples = [sample for sample in realwaste_data.samples if sample[1] == class_idx]
    if not class_samples:
        continue  # an empty class cannot be resampled (and would divide by zero)
    repeats = -(-TARGET_PER_CLASS // len(class_samples))  # ceiling division
    # Repeat the class often enough, then cut to exactly TARGET_PER_CLASS entries
    balanced_realwaste_data += (class_samples * repeats)[:TARGET_PER_CLASS]

# Print count of images per class before augmentation
print("Before Augmentation:")
print("RealWaste Data (Balanced to 600 images per class):")
balanced_realwaste_data_counts = count_images_per_class(balanced_realwaste_data)
for class_idx, count in balanced_realwaste_data_counts.items():
    print(f"Class {realwaste_data.classes[class_idx]}: {count} images")

# Combine RealWaste data with Custom data (training view, augmented).
# NOTE(review): `balanced_realwaste_data` is only reported above; the combined dataset
# still reads the full RealWaste folder. Feeding the oversampled list in before the
# random split would put duplicates of one image into both training and validation.
combined_data = ConcatDataset([ImageFolder(root=REALWASTE_ROOT, transform=train_transform), custom_data])

# Evaluation view: the same files in the same order, without augmentation.
# Assigning `.transform` on a random_split subset's `.dataset` only sets an attribute on
# the shared ConcatDataset, which the member datasets never read.
combined_eval_data = ConcatDataset([
    ImageFolder(root=REALWASTE_ROOT, transform=val_transform),
    CustomDataset(root=CUSTOM_ROOT, transform=val_transform),
])

# Split combined dataset into train, validation, and test
train_size = int(0.7 * len(combined_data))
val_size = int(0.2 * len(combined_data))
test_size = len(combined_data) - train_size - val_size

train_split, val_split, test_split = random_split(
    combined_data,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Oversample custom images in the training split only, so repeated copies of an image
# never end up in validation or test. ConcatDataset places custom_data after RealWaste.
first_custom_index = len(combined_data) - len(custom_data)
custom_train_indices = [i for i in train_split.indices if i >= first_custom_index]
train_indices = list(train_split.indices) + custom_train_indices * (CUSTOM_COPIES - 1)

train_dataset = torch.utils.data.Subset(combined_data, train_indices)
val_dataset = torch.utils.data.Subset(combined_eval_data, val_split.indices)
test_dataset = torch.utils.data.Subset(combined_eval_data, test_split.indices)

# DataLoader for training, validation, and test sets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

# Class Weights Calculation
target_labels = []
for dataset in [realwaste_data, custom_data]:
    target_labels.extend(dataset.targets)

class_weights = compute_class_weight('balanced', classes=np.unique(target_labels), y=target_labels)
# Follow the CPU fallback instead of calling .cuda() unconditionally, which raises
# on a machine without a GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Initialize EnhancedCNN Model
model = EnhancedCNN(num_classes=len(realwaste_data.classes))

# Move model to device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss Function: Weighted Cross-Entropy
criterion = nn.CrossEntropyLoss(weight=class_weights).to(device)

# Optimizer and Learning Rate Scheduler for a manual PyTorch loop.
# NOTE(review): the fastai Learner below creates its own optimizer and one-cycle
# schedule, so these two objects are not used by it.
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

# Wrap the existing loaders and model in a fastai Learner. `cnn_learner` expects a
# DataLoaders object and a pretrained-architecture factory and has no `lr_find`
# argument, so it cannot be used with this hand-written model.
dls = DataLoaders(train_loader, val_loader)
learn = Learner(dls, model, loss_func=criterion, metrics=accuracy)

# Learning Rate Finder using FastAI (plots loss against learning rate)
learn.lr_find()

# Train with the one-cycle policy.
# NOTE(review): lr_max is still the hand-picked 1e-4; replace it with a value read
# off the finder plot if that was the intent.
learn.fit_one_cycle(10, lr_max=1e-4)

# Test model accuracy: the loader must be passed as `dl=`; the first positional
# argument of validate() is a dataset index, not a loader
learn.validate(dl=test_loader)


In [None]:
import json

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input

import shap


In [None]:
# Keras ResNet50 with ImageNet weights, plus SHAP's bundled ImageNet sample.
# NOTE(review): this rebinds `model` and `class_names`, which earlier cells use for
# the PyTorch classifier and its classes.
model = ResNet50(weights="imagenet")
X, y = shap.datasets.imagenet50()

# Class names come from the cached ImageNet class-index JSON; the second element of
# each entry is used as the name
url = "https://s3.amazonaws.com/deep-learning-models/image-models/imagenet_class_index.json"
with open(shap.datasets.cache(url)) as file:
    class_index = json.load(file)
class_names = [entry[1] for entry in class_index.values()]
# print("Number of ImageNet classes:", len(class_names))
# print("Class names:", class_names)


In [None]:
def f(x):
    """Prediction function handed to SHAP: preprocess a copy of `x` and run the model on it.

    Args:
        x: array of images as supplied by the SHAP masker; it is copied and left unmodified.

    Returns:
        The output of `model` for the preprocessed images.
    """
    tmp = x.copy()
    # Feed the returned array to the model: preprocess_input only overwrites its
    # argument in place when the dtype allows it, so relying on the side effect can
    # silently pass unprocessed pixels.
    return model(preprocess_input(tmp))


# Masker that hides partitions of an input image ("inpaint_telea" mask value),
# configured with the shape of a single image
image_shape = X[0].shape
masker = shap.maskers.Image("inpaint_telea", image_shape)

# Explainer over the prediction function, labelling outputs with the class names
explainer = shap.Explainer(f, masker, output_names=class_names)

# Explain two images (X[1] and X[2]) with max_evals=1000 evaluations of the
# underlying model, keeping the first four outputs of the flipped argsort
top_outputs = shap.Explanation.argsort.flip[:4]
shap_values = explainer(
    X[1:3],
    max_evals=1000,
    batch_size=50,
    outputs=top_outputs,
)


In [None]:
# Plot the SHAP values computed by the explainer for the explained images
shap.image_plot(shap_values)
