In [None]:
## quantization test code - ENDG 511

In [None]:
# quantization (dynamic)

import torch
import torch.nn as nn
import torch.quantization
import os

# Load the pruned model (ensure it's on CPU for quantization)
model_pruned = torch.load(r"/Users/SeleemB/Desktop/ENDG511_Final_Project/models/model_language_mobilenet_20_epoch_new_normalize_0p6_dropout.pth")
model_pruned.eval()

# Apply dynamic quantization to Linear layers
quantized_model = torch.quantization.quantize_dynamic(
    model_pruned,                   # the model to quantize
    {nn.Linear},                    # layers to quantize (Conv2d not supported here)
    dtype=torch.qint8               # quantization data type to 8-bit int type
)

# Save the quantized model
quantized_model_path = "models/quantized_iterative_pruned_model.pth"
torch.save(quantized_model, quantized_model_path)

# Report model sizes
original_size = os.path.getsize("/Users/SeleemB/Desktop/ENDG511_Final_Project/models/model_language_mobilenet_20_epoch_new_normalize.pth") / 1e6
quantized_size = os.path.getsize(quantized_model_path) / 1e6

print(f"Original pruned model size: {original_size:.2f} MB")
print(f"Quantized model size: {quantized_size:.2f} MB")

Original pruned model size: 9.85 MB
Quantized model size: 9.35 MB


In [None]:
import random
class LanguageDataset(Dataset):
    """
    A PyTorch Dataset class for loading and processing spectrogram images of radio signals.

    This dataset:
    - Loads images from a specified directory.
    - Applies preprocessing transformations (grayscale conversion, resizing, normalization).
    - Computes class weights for handling class imbalance.
    - Returns image tensors along with their respective labels.

    Attributes:
    ----------
    data_dir : str
        Path to the dataset directory.
    class_labels : list
        List of signal class names.
    class_weights : torch.Tensor
        Normalized inverse frequency weights for each class.
    data_files : list
        List of all image file names in the dataset.
    transform : torchvision.transforms.Compose
        Transformations applied to each image.
    """

    def __init__(self, data_dir, augment=False):
        """
        Initializes the dataset by loading class names, computing class frequencies, and setting up transformations.

        Parameters:
        ----------
        data_dir : str
            Path to the dataset directory.
        """
        self.data_dir = data_dir

        # Define class labels (must match the dataset naming convention)
        self.class_labels = [ 'arabic', 'english', 'german', 'mandarin', 'spanish', 'garbage', 'french']
        #self.class_labels = [ 'arabic', 'german', 'mandarin', 'french']
        self.random_crop = RandomCrop(size=(224, 224))  # assuming final size
        self.augment = augment

        # Get all filenames from the dataset directory
        self.data_files = os.listdir(data_dir)

        # Compute class frequencies (how many samples per class exist)
        class_counts = {label: sum(1 for file in self.data_files if file.startswith(label)) for label in self.class_labels}

        # Compute class weights (inverse frequency) to handle class imbalance
        total_samples = sum(class_counts.values())
        class_weights = [1 / (count / total_samples) if count > 0 else 0 for count in class_counts.values()]

        # Normalize class weights so they sum to 1
        class_weights_sum = sum(class_weights)
        self.class_weights = torch.tensor([w / class_weights_sum for w in class_weights], dtype=torch.float)

        # Define image transformations
        self.transform = Compose([
            ToTensor(),
            Grayscale(num_output_channels=3),
            Resize((224, 224), interpolation=InterpolationMode.BICUBIC),
            RandomPerspective(distortion_scale=0.5, p=0.5),  # Perspective distortion
            Normalize(mean=[-0.9256, -0.8168, -0.5910], std=[0.1704, 0.1742, 0.1734])
        ])

    def __getitem__(self, index):
        """
        Loads an image, applies transformations, and returns it along with its label index.

        Parameters:
        ----------
        index : int
            Index of the sample in the dataset.

        Returns:
        -------
        tuple(torch.Tensor, torch.Tensor)
            Transformed image tensor and its corresponding label index.
        """
        # Get the filename of the sample
        file_name = self.data_files[index]
        file_path = os.path.join(self.data_dir, file_name)

        # Load the image
        image = Image.open(file_path)

        # Rotate 90 degrees (optional, remove if unnecessary)
        image = image.transpose(Image.ROTATE_90)

        # Ensure the image is in RGB mode (some formats might be grayscale)
        if image.mode != 'RGB':
            image = image.convert('RGB')
        
        image = self.random_crop(image)
        #image = self.transform(image)

       # Convert image to spectrogram (numpy array) and apply augmentation
        spectrogram = np.array(image)

        if self.augment:
            spectrogram = self.apply_spec_augment(spectrogram)

        # Convert to tensor and apply transformations
        image_tensor = self.transform(Image.fromarray(spectrogram))

        if self.augment:
            image_tensor = self.apply_noise(image_tensor)
        # Extract the class label from the filename
        class_label = file_name.split('_')[0]
        sample_number = file_name.split('_')[1]
        label_index = self.class_labels.index(class_label)

        # Apply transformations and return image with label
        return image_tensor, torch.tensor(label_index, dtype=torch.long), sample_number

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        Returns:
        -------
        int
            Number of files in the dataset.
        """
        return len(self.data_files)
    def apply_spec_augment(self, spectrogram):
        """Apply SpecAugment (time and frequency masking)"""
        # Apply time and frequency masking using librosa
        spectrogram = self.time_mask(spectrogram)
        spectrogram = self.freq_mask(spectrogram)
        return spectrogram

    def time_mask(self, spectrogram, max_mask_size=50):
        """Apply time masking to the spectrogram"""
        n_frames = spectrogram.shape[1]
        mask_start = random.randint(0, n_frames - max_mask_size)
        mask_end = mask_start + random.randint(1, max_mask_size)
        spectrogram[:, mask_start:mask_end] = 0
        return spectrogram

    def freq_mask(self, spectrogram, max_mask_size=10):
        """Apply frequency masking to the spectrogram"""
        n_freqs = spectrogram.shape[0]
        mask_start = random.randint(0, n_freqs - max_mask_size)
        mask_end = mask_start + random.randint(1, max_mask_size)
        spectrogram[mask_start:mask_end, :] = 0
        return spectrogram
    
    def apply_noise(self, img_tensor, noise_level=0.03):
        noise = torch.randn_like(img_tensor) * noise_level
        return torch.clamp(img_tensor + noise, 0.0, 1.0)  # Keep within valid image range


In [None]:
import sklearn
def stratified_split(dataset, train_test_split):
    """
    Implement a Stratified Split for an imbalanced dataset.

    Parameters:
    -----------
    dataset : Dataset
        The dataset to split.
    train_test_split : float
        The proportion of data to allocate for training.

    Returns:
    --------
    tuple : (train_dataset, test_dataset)
        The stratified training and validation datasets.
    """
    splitter = sklearn.model_selection.StratifiedShuffleSplit(train_size=train_test_split, random_state=None)

    labels_iterable = [dataset[i][1] for i in range(len(dataset))]

    for train_index, test_index in splitter.split(range(len(dataset)), labels_iterable):

            train_dataset = [dataset[i] for i in train_index]
            test_dataset = [dataset[i] for i in test_index]

    return train_dataset, test_dataset
    #raise NotImplementedError


In [None]:
import torch
import torch.nn as nn
import os
import time
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# --------- Load Validation Data ---------
# Basic transform: resize to match model input, convert to tensor
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

# Load validation dataset
dataset = LanguageDataset("languages")
dataset_train, dataset_val = stratified_split(dataset, train_test_split=0.8)

#set values as static values similar to LAB2_test_code.ipynb
data_loader_val = DataLoader(dataset_val, batch_size=16, num_workers=0,
                             pin_memory=True, drop_last=False)

# --------- Load Models ---------
# Pruned model path (original .pth)
pruned_model_path = "/Users/SeleemB/Desktop/ENDG511_Final_Project/models/model_language_mobilenet_20_epoch_new_normalize_pruned.pth"
quantized_model_path = "models/quantized_iterative_pruned_model.pth"

# Load models to CPU
model_pruned = torch.load(pruned_model_path, map_location='cpu')
model_pruned.eval()

quantized_model = torch.load(quantized_model_path, map_location='cpu')
quantized_model.eval()

# --------- Evaluation Function ---------
def evaluate_model(model, dataloader, max_batches=None):
    model.eval()
    correct = 0
    total = 0
    total_time = 0.0

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloader):
            inputs, labels = inputs.to('cpu'), labels.to('cpu')

            start = time.time()
            outputs = model(inputs)
            end = time.time()

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
            total_time += (end - start)

            if max_batches and i >= max_batches - 1:
                break

    accuracy = 100 * correct / total
    avg_inference_time = total_time / total
    return accuracy, avg_inference_time

# --------- Evaluate Models ---------
print("Evaluating pruned model...")
acc_pruned, time_pruned = evaluate_model(model_pruned, data_loader_val, max_batches=50)

print("Evaluating quantized model...")
acc_quant, time_quant = evaluate_model(quantized_model, data_loader_val, max_batches=50)

# --------- Get Model Sizes ---------
size_pruned = os.path.getsize(pruned_model_path) / 1e6
size_quant = os.path.getsize(quantized_model_path) / 1e6

# --------- Plot Comparison ---------
labels = ['Accuracy (%)', 'Avg Inference Time (s)', 'Model Size (MB)']
pruned_stats = [acc_pruned, time_pruned, size_pruned]
quant_stats = [acc_quant, time_quant, size_quant]

x = range(len(labels))
width = 0.35

fig, ax = plt.subplots(figsize=(10, 5))
ax.bar([p - width/2 for p in x], pruned_stats, width=width, label='Pruned Model')
ax.bar([p + width/2 for p in x], quant_stats, width=width, label='Quantized Model')

ax.set_ylabel('Value')
ax.set_title('Model Comparison: Pruned vs Quantized')
ax.set_xticks(list(x))
ax.set_xticklabels(labels)
ax.legend()
ax.grid(True, linestyle='--', alpha=0.6)
plt.tight_layout()
plt.show()

# --------- Print Final Stats ---------
print(f"\n--- Evaluation Summary ---")
print(f"Pruned Model     | Accuracy: {acc_pruned:.2f}% | Time: {time_pruned:.4f}s | Size: {size_pruned:.2f} MB")
print(f"Quantized Model  | Accuracy: {acc_quant:.2f}% | Time: {time_quant:.4f}s | Size: {size_quant:.2f} MB")


FileNotFoundError: Couldn't find any class folder in /Users/SeleemB/Desktop/ENDG511_Final_Project/languages.