IMPORTING LIBRARIES FOR MODEL COMPRESSION AND DATA EXTRACTION

# New Section

In [None]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from sklearn.metrics import f1_score
from sklearn.cluster import KMeans
from sklearn.model_selection import StratifiedShuffleSplit
import numpy as np
import pandas as pd
from tqdm import tqdm

EXTRACTING DATA

In [None]:
import zipfile

# Unzip the dataset
zip_path = '/content/archive.zip'
extract_path = '/content/archive/'

with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(extract_path)

print("✅ Dataset extracted!")


✅ Dataset extracted!


COMPRESSING ORIGINAL BASELINE MODEL WITH STRUCTURED PRUNING,DYNAMIC QUANTIZATION,KNOWLEDGE DISTILLATION AND ARCHITECTURE OPTIMIZATION

In [None]:
# Install specific torch-pruning version
!pip install torch-pruning==0.2.7

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torch_pruning as tp
import copy
import os
import time
import numpy as np
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
import json

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
cpu_device = torch.device("cpu")

# Step 1: Pruning
def prune_model(model, pruning_rate=0.2):
    print("Starting pruning...")
    model = copy.deepcopy(model).to(device)
    model.eval()
    example_inputs = torch.randn(1, 3, 224, 224).to(device)
    try:
        DG = tp.DependencyGraph()
        print("Building dependency graph...")
        DG.build_dependency(model, example_inputs=example_inputs)
        pruner = tp.pruner.MagnitudePruner(
            model,
            example_inputs=example_inputs,
            importance=tp.importance.MagnitudeImportance(p=1),
            iterative_steps=1,
            ch_sparsity=pruning_rate,
            ignored_layers=[model.fc]
        )
        print("Applying pruning...")
        pruner.step()
        print("Verifying pruned model...")
        with torch.no_grad():
            output = model(example_inputs)
            print(f"Pruned model output shape: {output.shape}")
    except Exception as e:
        print(f"Pruning failed: {e}")
        return model
    print("Pruning completed.")
    return model.to(device)

# Step 2: Dynamic Quantization
def quantize_model(model):
    print("Starting dynamic quantization...")
    model.eval()
    try:
        quantized_model = torch.quantization.quantize_dynamic(
            model.to("cpu"), {nn.Conv2d, nn.Linear}, dtype=torch.qint8
        )
    except Exception as e:
        print(f"Dynamic quantization failed: {e}")
        return model
    print("Dynamic quantization completed.")
    return quantized_model

# Step 3: Architecture Optimization (SlimResNet18)
class SlimResNet18(nn.Module):
    def  __init__(self, num_classes=100):
        super(SlimResNet18, self). __init__()
        base_model = models.resnet18(weights="IMAGENET1K_V1")
        self.conv1 = nn.Conv2d(3, 32, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(32)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(base_model.layer1, in_channels=32, out_channels=32)
        self.layer2 = self._slim_layer(base_model.layer2, in_channels=32, out_channels=64)
        self.layer3 = self._slim_layer(base_model.layer3, in_channels=64, out_channels=128)
        self.layer4 = self._slim_layer(base_model.layer4, in_channels=128, out_channels=256)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256, num_classes)

    def _make_layer(self, layer, in_channels, out_channels):
        new_blocks = []
        for block in layer:
            downsample = None
            if block.downsample is not None or in_channels != out_channels:
                downsample = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=block.stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
            new_block = models.resnet.BasicBlock(
                inplanes=in_channels,
                planes=out_channels,
                stride=block.stride,
                downsample=downsample
            )
            new_block.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=block.stride, padding=1, bias=False)
            new_block.bn1 = nn.BatchNorm2d(out_channels)
            new_block.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
            new_block.bn2 = nn.BatchNorm2d(out_channels)
            new_blocks.append(new_block)
            in_channels = out_channels
        return nn.Sequential(*new_blocks)

    def _slim_layer(self, layer, in_channels, out_channels):
        new_blocks = []
        for block in layer:
            downsample = None
            if block.downsample is not None or in_channels != out_channels:
                downsample = nn.Sequential(
                    nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=block.stride, bias=False),
                    nn.BatchNorm2d(out_channels)
                )
            new_block = models.resnet.BasicBlock(
                inplanes=in_channels,
                planes=out_channels,
                stride=block.stride,
                downsample=downsample
            )
            new_block.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=block.stride, padding=1, bias=False)
            new_block.bn1 = nn.BatchNorm2d(out_channels)
            new_block.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
            new_block.bn2 = nn.BatchNorm2d(out_channels)
            new_blocks.append(new_block)
            in_channels = out_channels
        return nn.Sequential(*new_blocks)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Compression pipeline (without training)
def compress_resnet18(num_classes=100):
    print("Creating SlimResNet18...")
    student_model = SlimResNet18(num_classes=num_classes).to(device)
    print("Applying pruning...")
    pruned_model = prune_model(student_model, pruning_rate=0.1)
    print("Testing forward pass...")
    try:
        with torch.no_grad():
            dummy_input = torch.randn(1, 3, 224, 224).to(device)
            output = pruned_model(dummy_input)
            print(f"Forward pass successful. Output shape: {output.shape}")
    except Exception as e:
        print(f"Forward pass failed: {e}")
        raise e
    return pruned_model

# Step 4: Knowledge Distillation
def distill_model(student_model, teacher_model, train_loader, val_loader, epochs=5):
    print("Starting distillation...")
    teacher_model.eval()
    student_model.train()
    criterion = nn.KLDivLoss(reduction="batchmean")
    cross_entropy = nn.CrossEntropyLoss()
    optimizer = optim.Adam(student_model.parameters(), lr=0.001)

    alpha = 0.9
    temperature = 4.0

    for epoch in range(epochs):
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
            student_outputs = student_model(inputs)
            loss_distill = criterion(
                torch.log_softmax(student_outputs / temperature, dim=1),
                torch.softmax(teacher_outputs / temperature, dim=1)
            ) * (temperature ** 2)
            loss_ce = cross_entropy(student_outputs, labels)
            loss = alpha * loss_distill + (1 - alpha) * loss_ce
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch+1}, Loss: {running_loss / len(train_loader)}")
        val_accuracy = evaluate_accuracy(student_model, val_loader)
        print(f"Validation Accuracy: {val_accuracy}%")
    return student_model

# Evaluation functions
def evaluate_accuracy(model, data_loader, eval_device=device):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(eval_device), labels.to(eval_device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

def measure_latency(model, data_loader, eval_device=device, num_runs=100):
    model.eval()
    latencies = []
    with torch.no_grad():
        for i, (inputs, _) in enumerate(data_loader):
            if i >= num_runs:
                break
            inputs = inputs.to(eval_device)
            start_time = time.time()
            _ = model(inputs)
            latencies.append((time.time() - start_time) * 1000)
    return np.mean(latencies)

def get_model_size_mb(model, path="/content/temp_model.pth"):
    torch.save(model.state_dict(), path)
    size_mb = os.path.getsize(path) / (1024 * 1024)
    os.remove(path)
    return size_mb

# Training and evaluation pipeline
def train_and_evaluate(student_model):
    teacher_model = models.resnet50(weights="IMAGENET1K_V1")
    teacher_model.fc = nn.Linear(teacher_model.fc.in_features, 100)
    teacher_model = teacher_model.to(device)

    print("Training student model...")
    distilled_model = distill_model(student_model, teacher_model, train_loader, val_loader, epochs=7)

    print("Quantizing model...")
    quantized_model = quantize_model(distilled_model)

    print("Evaluating quantized model...")
    try:
        accuracy = evaluate_accuracy(quantized_model, val_loader_cpu, eval_device=cpu_device)
        latency = measure_latency(quantized_model, val_loader_cpu, eval_device=cpu_device)
        size_mb = get_model_size_mb(quantized_model)
    except Exception as e:
        print(f"Evaluation failed: {e}")
        return quantized_model

    print(f"Compressed Model Metrics:")
    print(f"Accuracy: {accuracy}%")
    print(f"Latency: {latency}ms")
    print(f"Model Size: {size_mb}MB")

    return quantized_model

# Load dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

data_dir = "/content/drive/MyDrive/archive_dataset"
try:
    dataset = ImageFolder(root=data_dir, transform=transform)
    print(f"Dataset loaded with {len(dataset.classes)} classes.")
except Exception as e:
    print(f"Dataset loading failed: {e}")
    raise e
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2)
val_loader_cpu = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=2, pin_memory=False)

# Run compression and training
try:
    print("Running Part 1: Compression")
    model = compress_resnet18(num_classes=100)
    print("Running Part 2: Training and Evaluation")
    compressed_model = train_and_evaluate(model)
except Exception as e:
    print(f"Error: {e}")

# Commented code for saving model and metadata
"""
# Save the model and metadata
model_path = "/content/compressed_resnet18.pth"
torch.save(compressed_model.state_dict(), model_path)

metadata = {
    "model_format": "pytorch",
    "architecture": "slim_resnet18",
    "num_classes": 100
}
with open("/content/metadata.json", "w") as f:
    json.dump(metadata, f)

# Export to TFLite
import tensorflow as tf
dummy_input = torch.randn(1, 3, 224, 224).to(device)
onnx_path = "/content/model.onnx"
torch.onnx.export(compressed_model, dummy_input, onnx_path, opset_version=11)

!pip install onnx onnx-tf tf2onnx
from onnx_tf.backend import prepare
import onnx
onnx_model = onnx.load(onnx_path)
tf_rep = prepare(onnx_model)
tflite_path = "/content/compressed_resnet18.tflite"
tf_rep.export_graph("/content/tf_model")
converter = tf.lite.TFLiteConverter.from_saved_model("/content/tf_model")
tflite_model = converter.convert()
with open(tflite_path, "wb") as f:
    f.write(tflite_model)

print(f"TFLite model saved to {tflite_path}")
"""

Dataset loaded with 100 classes.
Running Part 1: Compression
Creating SlimResNet18...
Applying pruning...
Starting pruning...
Building dependency graph...
Applying pruning...
Verifying pruned model...
Pruned model output shape: torch.Size([1, 100])
Pruning completed.
Testing forward pass...
Forward pass successful. Output shape: torch.Size([1, 100])
Running Part 2: Training and Evaluation
Training student model...
Starting distillation...
Epoch 1, Loss: 0.45246759516000745
Validation Accuracy: 28.475%
Epoch 2, Loss: 0.4202399163246155
Validation Accuracy: 41.525%
Epoch 3, Loss: 0.39106116822361947
Validation Accuracy: 49.3875%
Epoch 4, Loss: 0.36937774324417116
Validation Accuracy: 52.6625%
Epoch 5, Loss: 0.3507634290754795
Validation Accuracy: 55.35%
Epoch 6, Loss: 0.3336819490045309
Validation Accuracy: 59.8375%
Epoch 7, Loss: 0.31795470909774304
Validation Accuracy: 60.4375%
Quantizing model...
Starting dynamic quantization...
Dynamic quantization completed.
Evaluating quantized mod

'\n# Save the model and metadata\nmodel_path = "/content/compressed_resnet18.pth"\ntorch.save(compressed_model.state_dict(), model_path)\n\nmetadata = {\n    "model_format": "pytorch",\n    "architecture": "slim_resnet18",\n    "num_classes": 100\n}\nwith open("/content/metadata.json", "w") as f:\n    json.dump(metadata, f)\n\n# Export to TFLite\nimport tensorflow as tf\ndummy_input = torch.randn(1, 3, 224, 224).to(device)\nonnx_path = "/content/model.onnx"\ntorch.onnx.export(compressed_model, dummy_input, onnx_path, opset_version=11)\n\n!pip install onnx onnx-tf tf2onnx\nfrom onnx_tf.backend import prepare\nimport onnx\nonnx_model = onnx.load(onnx_path)\ntf_rep = prepare(onnx_model)\ntflite_path = "/content/compressed_resnet18.tflite"\ntf_rep.export_graph("/content/tf_model")\nconverter = tf.lite.TFLiteConverter.from_saved_model("/content/tf_model")\ntflite_model = converter.convert()\nwith open(tflite_path, "wb") as f:\n    f.write(tflite_model)\n\nprint(f"TFLite model saved to {tfli

RECOMPRESSING MODEL RES-NET18 WITH STRUCTURED PRUNING AND KNOWLEDGE DISTILLATION

In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from tqdm import tqdm
import zipfile
import time
import torch.nn.utils.prune as prune

# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Paths
MODEL_PATH = '/content/compressed_resnet18.pth'
ZIP_PATH = '/content/archive.zip'
EXTRACT_PATH = '/content/archive/'

# Extract Dataset
with zipfile.ZipFile(ZIP_PATH, 'r') as zip_ref:
    zip_ref.extractall(EXTRACT_PATH)

print("✅ Dataset extracted!")

# Load Dataset
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])
train_dataset = torchvision.datasets.ImageFolder(root=EXTRACT_PATH, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=2, pin_memory=True)

# Define Model
num_classes = 100
model = torchvision.models.resnet18(pretrained=False)
model.fc = nn.Linear(model.fc.in_features, num_classes)

# Load Pretrained Weights Safely
checkpoint = torch.load(MODEL_PATH, map_location=device)
filtered_checkpoint = {k: v for k, v in checkpoint.items() if k in model.state_dict() and model.state_dict()[k].size() == v.size()}
model.load_state_dict(filtered_checkpoint, strict=False)
model.to(device)

# Apply Structured Pruning (Conv2d, structured)
parameters_to_prune = []
for name, module in model.named_modules():
    if isinstance(module, nn.Conv2d):
        parameters_to_prune.append((module, 'weight'))

for module, param in parameters_to_prune:
    prune.ln_structured(module, name=param, amount=0.4, n=2, dim=0)  # Structured channel pruning
    prune.remove(module, param)  # Make pruning permanent

# Knowledge Distillation Setup
teacher_model = torchvision.models.resnet18(pretrained=True)
teacher_model.fc = nn.Linear(teacher_model.fc.in_features, num_classes)
teacher_model.to(device)
teacher_model.eval()

criterion = nn.CrossEntropyLoss()
kd_loss = nn.KLDivLoss(reduction='batchmean')
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Training Loop with KD
for epoch in range(2):  # Light fine-tuning
    model.train()
    loop = tqdm(train_loader, desc=f"Epoch [{epoch+1}/2]")
    for images, labels in loop:
        images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(images)
        with torch.no_grad():
            teacher_outputs = teacher_model(images)

        loss = 0.7 * kd_loss(nn.functional.log_softmax(outputs / 4, dim=1),
                             nn.functional.softmax(teacher_outputs / 4, dim=1)) + \
               0.3 * criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss=loss.item())

# Save Fine-Tuned Model
torch.save(model.state_dict(), '/content/fine_tuned_resnet18_pruned.pth')

# Accuracy Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Final Accuracy on Training Data: {accuracy:.2f}%")

# Latency Test
input_sample = torch.randn(1, 3, 224, 224).to(device)

# Warm-up
for _ in range(10):
    _ = model(input_sample)

# Measure latency
start_time = time.time()
for _ in range(100):
    _ = model(input_sample)
avg_latency = (time.time() - start_time) / 100 * 1000
print(f"Average Latency (ms): {avg_latency:.2f}")

# Convert to Half Precision for Compression
model.half()
model.eval()
example_input = torch.randn(1, 3, 224, 224).to(device).half()
traced_model = torch.jit.trace(model, example_input)
traced_model_path = "/content/final_resnet18_compressed.pt"
traced_model.save(traced_model_path)
compressed_size = os.path.getsize(traced_model_path) / 1e6
print(f"Compressed Model Size (MB): {compressed_size:.2f}")

print("✅ Model pruned, fine-tuned, compressed, and ready for submission!")


✅ Dataset extracted!


  checkpoint = torch.load(MODEL_PATH, map_location=device)
  device=storage.device,
Epoch [1/2]: 100%|██████████| 1250/1250 [02:37<00:00,  7.96it/s, loss=0.688]
Epoch [2/2]: 100%|██████████| 1250/1250 [02:34<00:00,  8.08it/s, loss=0.576]


Final Accuracy on Training Data: 45.88%
Average Latency (ms): 2.99
Model Size (MB): 44.99
✅ Model pruned (structured), fine-tuned, accuracy calculated, and optimized for evaluation!


USING HALF PRECISION TO REDUCE SIZE  

In [None]:
# Convert to Half Precision for Compression
model.half()
model.eval()
example_input = torch.randn(1, 3, 224, 224).to(device).half()
traced_model = torch.jit.trace(model, example_input)
traced_model_path = "/content/final_resnet18_compressed.pt"
traced_model.save(traced_model_path)
compressed_size = os.path.getsize(traced_model_path) / 1e6
print(f"Compressed Model Size (MB): {compressed_size:.2f}")

print("✅ Model pruned, fine-tuned, compressed, and ready for submission!")

Compressed Model Size (MB): 22.64
✅ Model pruned, fine-tuned, compressed, and ready for submission!


EVALUATION METRIC FOR FINAL COMPRESSED MODEL

In [None]:
# Convert to Half Precision and Trace for Compression
model.half()
model.eval()
example_input = torch.randn(1, 3, 224, 224).to(device).half()
traced_model = torch.jit.trace(model, example_input)
traced_model_path = "/content/final_resnet18_compressed.pt"
traced_model.save(traced_model_path)

# Reload Compressed Model for Consistent Metric Evaluation
loaded_model = torch.jit.load(traced_model_path, map_location=device)
loaded_model.eval()

# Accuracy Evaluation after Compression
correct = 0
total = 0
with torch.no_grad():
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        images = images.half()  # important for half-precision input
        outputs = loaded_model(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
accuracy = 100 * correct / total

# Latency Test after Compression
input_sample = torch.randn(1, 3, 224, 224).to(device).half()
for _ in range(10):
    _ = loaded_model(input_sample)

start_time = time.time()
for _ in range(100):
    _ = loaded_model(input_sample)
avg_latency = (time.time() - start_time) / 100 * 1000

# Compressed Model Size
compressed_size = os.path.getsize(traced_model_path) / 1e6

# Print all metrics together
print("\n📊 Final Evaluation Metrics (Post-Compression):")
print(f"Final Accuracy on Training Data: {accuracy:.2f}%")
print(f"Average Latency (ms): {avg_latency:.2f}")
print(f"Compressed Model Size (MB): {compressed_size:.2f}")

print("✅ Metrics now reflect the compressed model state for fair scoring!")



📊 Final Evaluation Metrics (Post-Compression):
Final Accuracy on Training Data: 45.89%
Average Latency (ms): 3.09
Compressed Model Size (MB): 22.64
✅ Metrics now reflect the compressed model state for fair scoring!
