## Question 1: Baseline MNIST Classifier

**Objective:** Implement and train a fully-connected neural network for
MNIST digit classification.

**Implementation:** Fully-connected network with five 1024-unit hidden
layers and a 10-way output layer (784→1024→1024→1024→1024→1024→10),
using ReLU activations, dropout (0.2), and CrossEntropyLoss. Trained for
50 epochs with the Adam optimizer (lr=0.001) on the MNIST dataset.

In \[ \]:

    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torchvision
    import torchvision.transforms as transforms
    from torch.utils.data import DataLoader
    import matplotlib.pyplot as plt
    import numpy as np

    class BaselineMNISTClassifier(nn.Module):
        """Dense MNIST classifier: five 1024-unit hidden layers and a 10-way head.

        Every hidden layer is followed by ReLU and dropout (p=0.2); the output
        layer returns raw logits.
        """

        def __init__(self):
            super().__init__()
            n_pixels = 28 * 28
            width = 1024
            # Attribute names double as state-dict keys, so they are kept as-is.
            self.fc1 = nn.Linear(n_pixels, width)
            self.fc2 = nn.Linear(width, width)
            self.fc3 = nn.Linear(width, width)
            self.fc4 = nn.Linear(width, width)
            self.fc5 = nn.Linear(width, width)
            self.out = nn.Linear(width, 10)
            self.dropout = nn.Dropout(0.2)

        def forward(self, x):
            """Flatten ``x`` to rows of 784 values and return one logit row per image."""
            hidden = x.view(-1, 28 * 28)
            for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
                hidden = self.dropout(torch.relu(layer(hidden)))
            return self.out(hidden)

In \[4\]:


    class CompressedMNISTClassifier(nn.Module):
        """Copy of the baseline classifier whose hidden weights are truncated to rank D.

        Each of ``fc1`` .. ``fc5`` is replaced by the leading ``D`` singular
        triplets ``(U, S, V)`` of its weight matrix; the output layer is copied
        unchanged. Every tensor is cloned and registered as a buffer, so the
        model follows ``.to(device)``, is captured by ``state_dict()`` and never
        shares storage with the baseline's parameters.

        Args:
            baseline_model: module exposing linear layers ``fc1`` .. ``fc5`` and ``out``.
            D: number of singular values kept per hidden layer. Values larger
                than a layer's number of singular values are clamped.

        Raises:
            ValueError: if ``D`` is smaller than 1.
        """

        _HIDDEN_LAYER_NAMES = ("fc1", "fc2", "fc3", "fc4", "fc5")

        def __init__(self, baseline_model, D):
            super(CompressedMNISTClassifier, self).__init__()
            if D < 1:
                raise ValueError(f"D must be a positive integer, got {D!r}")
            self.D = D
            self._num_compressed = len(self._HIDDEN_LAYER_NAMES)

            # compress all layers except the last one
            for i, name in enumerate(self._HIDDEN_LAYER_NAMES):
                layer = getattr(baseline_model, name)
                # Reduced SVD: weight == U @ diag(S) @ Vh
                U, S, Vh = torch.linalg.svd(layer.weight.data, full_matrices=False)
                d = min(self.D, S.numel())

                # store the compressed weights in the (U, S, V) layout, V = Vh^T
                self.register_buffer(f"U{i}", U[:, :d].clone())
                self.register_buffer(f"S{i}", S[:d].clone())
                self.register_buffer(f"V{i}", Vh[:d, :].t().contiguous())
                self.register_buffer(f"bias{i}", layer.bias.data.clone())

            # the last layer without compression
            self.register_buffer("out_weight", baseline_model.out.weight.data.clone())
            self.register_buffer("out_bias", baseline_model.out.bias.data.clone())

        @property
        def compressed_layers(self):
            """List of ``(U, S, V)`` tuples, one per compressed hidden layer."""
            return [
                (getattr(self, f"U{i}"), getattr(self, f"S{i}"), getattr(self, f"V{i}"))
                for i in range(self._num_compressed)
            ]

        @property
        def biases(self):
            """List of the hidden-layer bias vectors, in layer order."""
            return [getattr(self, f"bias{i}") for i in range(self._num_compressed)]

        def forward(self, x):
            """Return class logits for a batch of images (flattened to 784 values each)."""
            x = x.view(-1, 28*28)
            for (U, S, V), bias in zip(self.compressed_layers, self.biases):
                # x @ W^T with W = U diag(S) V^T, applied factor by factor so the
                # dense (out x in) matrix is never rebuilt.
                x = torch.relu(torch.mm(torch.mm(x, V) * S, U.t()) + bias)
                if self.training:
                    x = torch.dropout(x, p=0.5, train=True)
            # final layer
            return torch.mm(x, self.out_weight.t()) + self.out_bias

In \[ \]:

    def count_parameters(model, compressed=False, D=None):
        """Count the scalars stored by the dense or the rank-D compressed classifier.

        Args:
            model: module whose ``parameters()`` are summed when ``compressed`` is
                False; not consulted when ``compressed`` is True.
            compressed: when True, return the analytic size of the SVD-compressed
                784->1024->1024->1024->1024->1024->10 network instead.
            D: number of singular values kept per hidden layer (required when
                ``compressed`` is True).

        Returns:
            Total number of stored scalars as an int.

        Raises:
            ValueError: if ``compressed`` is True and ``D`` is None.
        """
        if not compressed:
            return sum(p.numel() for p in model.parameters())
        if D is None:
            raise ValueError("D is required when compressed=True")

        # (in_features, out_features) of fc1 .. fc5
        hidden_shapes = [(784, 1024)] + [(1024, 1024)] * 4
        total_params = 0
        for n_in, n_out in hidden_shapes:
            # A layer has only min(in, out) singular values, so a larger D
            # cannot add any stored factors.
            d = min(D, n_in, n_out)
            total_params += n_out * d + d + n_in * d  # U + S + V
            total_params += n_out  # bias
        # Last layer is uncompressed: weight plus bias, counted exactly once
        total_params += 1024 * 10 + 10
        return total_params

In \[6\]:

    def train(model, train_loader, test_loader, epochs, device):
        """Fit ``model`` with Adam (lr=0.001) and cross-entropy, testing after each epoch.

        Args:
            model: network mapping a batch of images to class logits.
            train_loader: iterable of ``(images, labels)`` batches supporting ``len()``.
            test_loader: iterable of ``(images, labels)`` batches used for evaluation.
            epochs: number of passes over ``train_loader``.
            device: device every batch is moved to.

        Returns:
            ``(train_losses, test_accuracies)``: the per-epoch mean batch loss and
            the per-epoch test accuracy in percent.
        """
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        train_losses, test_accuracies = [], []

        for epoch in range(epochs):
            # ---- optimisation pass ----
            model.train()
            running_loss = 0.0
            for batch, targets in train_loader:
                batch = batch.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()
                batch_loss = criterion(model(batch), targets)
                batch_loss.backward()
                optimizer.step()
                running_loss += batch_loss.item()

            # ---- evaluation pass ----
            model.eval()
            hits = 0
            seen = 0
            with torch.no_grad():
                for batch, targets in test_loader:
                    batch = batch.to(device)
                    targets = targets.to(device)
                    guesses = torch.max(model(batch).data, 1)[1]
                    seen += targets.size(0)
                    hits += (guesses == targets).sum().item()

            accuracy = 100 * hits / seen
            print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}, Accuracy: {accuracy:.2f}%')

            train_losses.append(running_loss/len(train_loader))
            test_accuracies.append(accuracy)

        return train_losses, test_accuracies

In \[13\]:

    def main():
        """Train and save the baseline, then sweep SVD ranks and plot the trade-off."""
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

        # MNIST, converted to tensors and normalised with fixed mean/std constants
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ])
        train_dataset = torchvision.datasets.MNIST('./data', train=True, download=True, transform=transform)
        test_dataset = torchvision.datasets.MNIST('./data', train=False, transform=transform)
        train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=128, shuffle=True)

        # ---- baseline ----
        print("Training baseline model...")
        baseline_model = BaselineMNISTClassifier().to(device)
        baseline_losses, baseline_accuracies = train(
            baseline_model, train_loader, test_loader, epochs=50, device=device
        )

        baseline_params = count_parameters(baseline_model)
        print(f"\nBaseline model parameters: {baseline_params:,}")

        print(f"\nBaseline model Saved:", )
        torch.save(baseline_model.state_dict(), "mnist_baseline.pth")

        # ---- rank sweep ----
        compression_results = []
        for D in [10, 20, 50, 100, 200, 1024]:
            print(f"\nTesting compression with D={D}")

            compressed_model = CompressedMNISTClassifier(baseline_model, D).to(device)
            compressed_params = count_parameters(compressed_model, compressed=True, D=D)

            # Evaluate compressed model on the test set
            compressed_model.eval()
            hits, seen = 0, 0
            with torch.no_grad():
                for images, labels in test_loader:
                    images = images.to(device)
                    labels = labels.to(device)
                    predicted = torch.max(compressed_model(images).data, 1)[1]
                    seen += labels.size(0)
                    hits += (predicted == labels).sum().item()

            accuracy = 100 * hits / seen
            compression_ratio = baseline_params / compressed_params

            print(f"D={D}:")
            print(f"Parameters: {compressed_params:,}")
            print(f"Compression ratio: {compression_ratio:.2f}x")
            print(f"Accuracy: {accuracy:.2f}%")

            compression_results.append({
                'D': D,
                'params': compressed_params,
                'ratio': compression_ratio,
                'accuracy': accuracy,
            })

        # ---- plots: accuracy and compression ratio against D ----
        ranks = [entry['D'] for entry in compression_results]
        accuracies = [entry['accuracy'] for entry in compression_results]
        ratios = [entry['ratio'] for entry in compression_results]

        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        plt.plot(ranks, accuracies, 'bo-')
        plt.xlabel('D (Number of singular values)')
        plt.ylabel('Accuracy (%)')
        plt.title('Accuracy vs Compression Level')

        plt.subplot(1, 2, 2)
        plt.plot(ranks, ratios, 'ro-')
        plt.xlabel('D (Number of singular values)')
        plt.ylabel('Compression Ratio')
        plt.title('Compression Ratio vs D')

        plt.tight_layout()
        plt.show()

    if __name__ == '__main__':
        main()

    Training baseline model...
    Epoch 1, Loss: 0.2964, Accuracy: 96.37%
    Epoch 2, Loss: 0.1475, Accuracy: 97.55%
    Epoch 3, Loss: 0.1181, Accuracy: 97.36%
    Epoch 4, Loss: 0.0977, Accuracy: 96.91%
    Epoch 5, Loss: 0.0900, Accuracy: 97.61%
    Epoch 6, Loss: 0.0810, Accuracy: 97.80%
    Epoch 7, Loss: 0.0706, Accuracy: 97.93%
    Epoch 8, Loss: 0.0712, Accuracy: 97.83%
    Epoch 9, Loss: 0.0689, Accuracy: 98.23%
    Epoch 10, Loss: 0.0546, Accuracy: 98.10%
    Epoch 11, Loss: 0.0537, Accuracy: 98.17%
    Epoch 12, Loss: 0.0532, Accuracy: 98.01%
    Epoch 13, Loss: 0.0525, Accuracy: 98.01%
    Epoch 14, Loss: 0.0472, Accuracy: 98.08%
    Epoch 15, Loss: 0.0469, Accuracy: 98.08%
    Epoch 16, Loss: 0.0457, Accuracy: 98.21%
    Epoch 17, Loss: 0.0470, Accuracy: 98.16%
    Epoch 18, Loss: 0.0461, Accuracy: 98.30%
    Epoch 19, Loss: 0.0454, Accuracy: 98.54%
    Epoch 20, Loss: 0.0379, Accuracy: 98.20%
    Epoch 21, Loss: 0.0422, Accuracy: 98.34%
    Epoch 22, Loss: 0.0400, Accuracy: 98.24%
    Epoch 23, Loss: 0.0464, Accuracy: 98.05%
    Epoch 24, Loss: 0.0421, Accuracy: 98.16%
    Epoch 25, Loss: 0.0386, Accuracy: 98.54%
    Epoch 26, Loss: 0.0332, Accuracy: 98.25%
    Epoch 27, Loss: 0.0414, Accuracy: 98.19%
    Epoch 28, Loss: 0.0410, Accuracy: 98.35%
    Epoch 29, Loss: 0.0315, Accuracy: 97.99%
    Epoch 30, Loss: 0.0366, Accuracy: 98.21%
    Epoch 31, Loss: 0.0352, Accuracy: 98.40%
    Epoch 32, Loss: 0.0387, Accuracy: 98.40%
    Epoch 33, Loss: 0.0390, Accuracy: 98.21%
    Epoch 34, Loss: 0.0362, Accuracy: 98.40%
    Epoch 35, Loss: 0.0526, Accuracy: 98.31%
    Epoch 36, Loss: 0.0293, Accuracy: 98.38%
    Epoch 37, Loss: 0.0280, Accuracy: 98.34%
    Epoch 38, Loss: 0.0328, Accuracy: 98.37%
    Epoch 39, Loss: 0.0327, Accuracy: 98.35%
    Epoch 40, Loss: 0.0347, Accuracy: 98.35%
    Epoch 41, Loss: 0.0303, Accuracy: 98.16%
    Epoch 42, Loss: 0.0326, Accuracy: 98.39%
    Epoch 43, Loss: 0.0291, Accuracy: 98.49%
    Epoch 44, Loss: 0.0332, Accuracy: 98.32%
    Epoch 45, Loss: 0.0381, Accuracy: 98.42%
    Epoch 46, Loss: 0.0335, Accuracy: 98.49%
    Epoch 47, Loss: 0.0339, Accuracy: 98.30%
    Epoch 48, Loss: 0.0320, Accuracy: 98.47%
    Epoch 49, Loss: 0.0225, Accuracy: 98.54%
    Epoch 50, Loss: 0.0311, Accuracy: 98.44%

    Baseline model parameters: 5,012,490

    Baseline model Saved:

    Testing compression with D=10
    D=10:
    Parameters: 115,430
    Compression ratio: 43.42x
    Accuracy: 27.92%

    Testing compression with D=20
    D=20:
    Parameters: 215,480
    Compression ratio: 23.26x
    Accuracy: 71.90%

    Testing compression with D=50
    D=50:
    Parameters: 515,630
    Compression ratio: 9.72x
    Accuracy: 93.44%

    Testing compression with D=100
    D=100:
    Parameters: 1,015,880
    Compression ratio: 4.93x
    Accuracy: 97.88%

    Testing compression with D=200
    D=200:
    Parameters: 2,016,380
    Compression ratio: 2.49x
    Accuracy: 98.40%

    Testing compression with D=1024
    D=1024:
    Parameters: 10,260,500
    Compression ratio: 0.49x
    Accuracy: 98.44%

Question 2

## Question 2: Model Compression via Low-Rank Factorization

**Objective:** Compress the trained baseline model using low-rank
factorization (W ≈ UV^T) and evaluate accuracy vs. compression
trade-offs.

**Implementation:** Factorizes fully-connected layers into U, V matrices
with rank D. Tests compression levels (D=10, 20, 50, 100, 200, 1024) and
analyzes parameter count reduction vs. accuracy. Initializes compressed
weights from SVD decomposition of original weights.

In \[14\]:

    import torch.nn.functional as F
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    baseline_model = BaselineMNISTClassifier().to(device)
    # map_location remaps the saved tensors onto the current device, so a
    # checkpoint written on a GPU machine can still be restored on a CPU-only one.
    baseline_model.load_state_dict(
        torch.load("mnist_baseline.pth", map_location=device, weights_only=True)
    )

Out\[14\]:

    <All keys matched successfully>

In \[ \]:

    class FactorizedLinear(nn.Module):
        """Linear layer whose weight is the low-rank product ``U @ V.T``.

        Args:
            in_features: size of each input row.
            out_features: size of each output row.
            D: inner dimension of the factorization (number of columns of U and V).
        """

        def __init__(self, in_features, out_features, D=20):
            super(FactorizedLinear, self).__init__()
            self.in_features = in_features
            self.out_features = out_features
            self.D = D

            # initialize U and V matrices
            self.U = nn.Parameter(torch.randn(out_features, D))
            self.V = nn.Parameter(torch.randn(in_features, D))
            self.bias = nn.Parameter(torch.zeros(out_features))

        def forward(self, x):
            """Return ``x @ V @ U.T + bias`` without building the dense weight."""
            # Projecting to D dimensions first costs O(D * (in + out)) per row
            # instead of forming the (out x in) matrix on every call.
            return F.linear(F.linear(x, self.V.t()), self.U, self.bias)

        def initialize_from_svd(self, U_svd, S_svd, V_svd):
            """Load the leading singular triplets of ``W = U_svd @ diag(S_svd) @ V_svd.T``.

            The singular values are folded into ``V``. When ``D`` exceeds the
            number of singular values supplied, the surplus columns of ``U`` and
            ``V`` are set to zero, so the product still equals the truncated matrix.
            """
            k = min(self.D, S_svd.numel())
            with torch.no_grad():
                self.U.zero_()
                self.V.zero_()
                self.U[:, :k].copy_(U_svd[:, :k])
                # Column-wise scaling is the same as multiplying by diag(S).
                self.V[:, :k].copy_(V_svd[:, :k] * S_svd[:k])


    class FactorizedMNISTClassifier(nn.Module):
        """MNIST classifier whose five hidden layers are rank-D ``FactorizedLinear`` layers.

        The 10-way output layer stays a dense ``nn.Linear``; dropout (p=0.5)
        follows every hidden ReLU.
        """

        def __init__(self, D=20):
            super().__init__()
            width = 1024
            self.fc1 = FactorizedLinear(28 * 28, width, D)
            self.fc2 = FactorizedLinear(width, width, D)
            self.fc3 = FactorizedLinear(width, width, D)
            self.fc4 = FactorizedLinear(width, width, D)
            self.fc5 = FactorizedLinear(width, width, D)
            self.out = nn.Linear(width, 10)  # Last layer remains unfactorized
            self.dropout = nn.Dropout(0.5)

        def forward(self, x):
            """Flatten ``x`` to rows of 784 values and return class logits."""
            hidden = x.view(-1, 28 * 28)
            for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
                hidden = self.dropout(torch.relu(layer(hidden)))
            return self.out(hidden)

        def initialize_from_baseline(self, baseline_model):
            """Seed every layer from ``baseline_model``.

            Hidden layers receive the SVD of the matching baseline weight plus a
            copy of its bias; the output layer is copied verbatim.
            """
            for name in ("fc1", "fc2", "fc3", "fc4", "fc5"):
                source = getattr(baseline_model, name)
                target = getattr(self, name)
                U, S, V = torch.svd(source.weight.data)
                target.initialize_from_svd(U, S, V)
                target.bias.data.copy_(source.bias.data)

            self.out.weight.data.copy_(baseline_model.out.weight.data)
            self.out.bias.data.copy_(baseline_model.out.bias.data)

In \[16\]:

    def train_epoch(model, train_loader, criterion, optimizer, device):
        """Run one optimisation pass over ``train_loader``.

        Returns:
            ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses and the
            training accuracy in percent, both measured on the predictions made
            before each parameter update.
        """
        model.train()
        loss_sum = 0.0
        hits = 0
        seen = 0
        for batch, targets in train_loader:
            batch = batch.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            logits = model(batch)
            batch_loss = criterion(logits, targets)
            batch_loss.backward()
            optimizer.step()

            loss_sum += batch_loss.item()
            guesses = torch.max(logits.data, 1)[1]
            seen += targets.size(0)
            hits += (guesses == targets).sum().item()

        return loss_sum / len(train_loader), 100 * hits / seen

    def evaluate(model, test_loader, device):
        """Return the accuracy of ``model`` on ``test_loader`` in percent.

        The model is switched to eval mode and no gradients are recorded.
        """
        model.eval()
        hits, seen = 0, 0

        with torch.no_grad():
            for batch, targets in test_loader:
                batch = batch.to(device)
                targets = targets.to(device)
                guesses = torch.max(model(batch).data, 1)[1]
                seen += targets.size(0)
                hits += (guesses == targets).sum().item()

        return 100 * hits / seen

In \[ \]:

    import torch.optim as optim
    from torchvision import datasets, transforms
    from torch.utils.data import DataLoader
    import torch.nn.functional as F

In \[19\]:

    # Preprocessing: convert each image to a tensor, then normalise it with the
    # fixed mean/std constants below.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    # Only the training split passes download=True.
    train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
    test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)
    # Training batches are reshuffled every epoch; test batches keep dataset order.
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In \[21\]:

    def main():
        """Build the rank-20 factorized model from the baseline and fine-tune it.

        Prints the test accuracy straight after SVD initialisation, then the
        train loss, train accuracy and test accuracy of each of 20 fine-tuning
        epochs.
        """
        num_finetune_epochs = 20

        factorized_model = FactorizedMNISTClassifier(D=20).to(device)
        factorized_model.initialize_from_baseline(baseline_model)

        # Accuracy of the truncated weights before any further training
        initial_accuracy = evaluate(factorized_model, test_loader, device)
        print(f"Initial test accuracy (before finetuning) for D=20: {initial_accuracy:.2f}%")

        # Fine-tuning uses a learning rate of 1e-4
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(factorized_model.parameters(), lr=1e-4)

        print("\nFinetuning factorized model 1...2...3... Go...")
        for epoch in range(num_finetune_epochs):
            loss, acc = train_epoch(factorized_model, train_loader, criterion, optimizer, device)
            test_acc = evaluate(factorized_model, test_loader, device)
            print(f"Finetuning - Epoch {epoch+1}: Train Loss = {loss:.4f}, Train Acc = {acc:.2f}%, Test Acc = {test_acc:.2f}%")

    # Script entry point
    if __name__ == "__main__":
        main()

    Initial test accuracy (before finetuning) for D=20: 71.90%

    Finetuning factorized model 1...2...3... Go...
    Finetuning - Epoch 1: Train Loss = 0.3414, Train Acc = 92.51%, Test Acc = 97.22%
    Finetuning - Epoch 2: Train Loss = 0.2323, Train Acc = 95.13%, Test Acc = 97.45%
    Finetuning - Epoch 3: Train Loss = 0.1976, Train Acc = 95.55%, Test Acc = 97.59%
    Finetuning - Epoch 4: Train Loss = 0.1702, Train Acc = 96.07%, Test Acc = 97.54%
    Finetuning - Epoch 5: Train Loss = 0.1595, Train Acc = 96.27%, Test Acc = 97.63%
    Finetuning - Epoch 6: Train Loss = 0.1558, Train Acc = 96.51%, Test Acc = 97.70%
    Finetuning - Epoch 7: Train Loss = 0.1403, Train Acc = 96.66%, Test Acc = 97.88%
    Finetuning - Epoch 8: Train Loss = 0.1331, Train Acc = 96.94%, Test Acc = 97.76%
    Finetuning - Epoch 9: Train Loss = 0.1307, Train Acc = 96.82%, Test Acc = 97.73%
    Finetuning - Epoch 10: Train Loss = 0.1663, Train Acc = 97.07%, Test Acc = 97.81%
    Finetuning - Epoch 11: Train Loss = 0.1189, Train Acc = 97.11%, Test Acc = 97.93%
    Finetuning - Epoch 12: Train Loss = 0.1170, Train Acc = 97.16%, Test Acc = 97.85%
    Finetuning - Epoch 13: Train Loss = 0.1131, Train Acc = 97.24%, Test Acc = 97.98%
    Finetuning - Epoch 14: Train Loss = 0.1088, Train Acc = 97.30%, Test Acc = 97.98%
    Finetuning - Epoch 15: Train Loss = 0.1086, Train Acc = 97.29%, Test Acc = 98.00%
    Finetuning - Epoch 16: Train Loss = 0.1072, Train Acc = 97.36%, Test Acc = 97.91%
    Finetuning - Epoch 17: Train Loss = 0.1105, Train Acc = 97.30%, Test Acc = 97.91%
    Finetuning - Epoch 18: Train Loss = 0.1002, Train Acc = 97.60%, Test Acc = 97.95%
    Finetuning - Epoch 19: Train Loss = 0.0993, Train Acc = 97.51%, Test Acc = 97.94%
    Finetuning - Epoch 20: Train Loss = 0.0977, Train Acc = 97.62%, Test Acc = 97.93%

In \[ \]:

    import matplotlib.pyplot as plt
    # Hard-coded per-epoch values, one entry for each of the 20 epochs on the x axis.
    # NOTE(review): these appear to be transcribed from the fine-tuning log
    # printed above; they must be re-entered by hand if that run is repeated.
    epochs = list(range(1, 21))
    train_loss = [0.3414, 0.2323, 0.1976, 0.1702, 0.1595, 0.1558, 0.1403, 0.1331, 0.1307, 0.1663, 
                  0.1189, 0.1170, 0.1131, 0.1088, 0.1086, 0.1072, 0.1105, 0.1002, 0.0993, 0.0977]
    test_accuracy = [97.22, 97.45, 97.59, 97.54, 97.63, 97.70, 97.88, 97.76, 97.73, 97.81, 
                     97.93, 97.85, 97.98, 97.98, 98.00, 97.91, 97.91, 97.95, 97.94, 97.93]

    # One figure, two side-by-side panels.
    plt.figure(figsize=(14, 6))
    # Left panel: training loss per epoch.
    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_loss, label='Training Loss', marker='o', color='blue')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training Loss Over Epochs')
    plt.grid(True)
    plt.legend()
    # Right panel: test accuracy per epoch.
    plt.subplot(1, 2, 2)

    plt.plot(epochs, test_accuracy, label='Test Accuracy', marker='o', color='red')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.title('Test Accuracy Over Epochs')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

## Question 3: Training Compressed Models from Scratch

**Objective:** Train compressed models (using SVD factorization) from
scratch rather than initializing from pre-trained weights, and compare
training dynamics and final accuracy.

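**Implementation:** Creates the SVDCompressedNet architecture, which
truncates every hidden-layer weight to rank D on each forward pass during
training. Trains models with different compression ranks (D) using
dropout (0.5), comparing convergence behavior and accuracy against
compressed models initialized from the Q1 baseline. Note that the run
shown below first loads the Q1 baseline checkpoint into SVDCompressedNet
(D=20) and then trains for 10 epochs, rather than starting from random
weights.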

In \[ \]:

    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torchvision import datasets, transforms
    from torch.utils.data import DataLoader

    # Preprocessing: tensor conversion followed by normalisation with fixed mean/std constants.
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST(root='./data', train=False, transform=transform)
    # Training batches of 128 are reshuffled each epoch; the test set is read in
    # order, 1000 images per batch.
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
    # Use the GPU when one is available, otherwise fall back to the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In \[ \]:

    class BaselineMNISTClassifier(nn.Module):
        """Dense 784->1024x5->10 classifier with ReLU and dropout (p=0.5) after each hidden layer.

        The layer attribute names (``fc1`` .. ``fc5``, ``out``) are the
        state-dict keys used when a checkpoint is loaded into this class.
        """

        def __init__(self):
            super().__init__()
            hidden = 1024
            self.fc1 = nn.Linear(28 * 28, hidden)
            self.fc2 = nn.Linear(hidden, hidden)
            self.fc3 = nn.Linear(hidden, hidden)
            self.fc4 = nn.Linear(hidden, hidden)
            self.fc5 = nn.Linear(hidden, hidden)
            self.out = nn.Linear(hidden, 10)
            self.dropout = nn.Dropout(0.5)

        def forward(self, x):
            """Return class logits for a batch of images flattened to 784 values each."""
            activations = x.view(-1, 28 * 28)
            hidden_layers = [self.fc1, self.fc2, self.fc3, self.fc4, self.fc5]
            for fc in hidden_layers:
                activations = torch.relu(fc(activations))
                activations = self.dropout(activations)
            return self.out(activations)

    class SVDCompressedNet(nn.Module):
        """Dense MNIST network whose hidden weights are truncated to rank D on the fly.

        The module stores full ``nn.Linear`` layers (so a baseline state dict
        loads directly), but every forward pass replaces each hidden weight by
        its best rank-``D`` approximation before applying it. The output layer
        is used uncompressed.

        Args:
            D: number of singular values kept for each hidden-layer weight.
        """

        def __init__(self, D=20):
            super(SVDCompressedNet, self).__init__()
            self.D = D
            self.fc1 = nn.Linear(28*28, 1024)
            self.fc2 = nn.Linear(1024, 1024)
            self.fc3 = nn.Linear(1024, 1024)
            self.fc4 = nn.Linear(1024, 1024)
            self.fc5 = nn.Linear(1024, 1024)
            self.out = nn.Linear(1024, 10)
            self.dropout = nn.Dropout(0.5)

        def svd_compress(self, weight):
            """Return the rank-``D`` truncation of ``weight``, with the same shape."""
            # torch.linalg.svd replaces the deprecated torch.svd; it returns Vh
            # (V already transposed), and the reduced form is all that is needed.
            U, S, Vh = torch.linalg.svd(weight, full_matrices=False)
            return U[:, :self.D] @ torch.diag(S[:self.D]) @ Vh[:self.D, :]

        def forward(self, x):
            """Return class logits for a batch of images flattened to 784 values each."""
            x = x.view(-1, 28*28)
            # do svd for every forward step, so the truncation tracks the current weights
            for layer in (self.fc1, self.fc2, self.fc3, self.fc4, self.fc5):
                low_rank_weight = self.svd_compress(layer.weight)
                x = self.dropout(torch.relu(nn.functional.linear(x, low_rank_weight, layer.bias)))
            x = self.out(x)  # last layer remains uncompressed
            return x

In \[ \]:

    model = SVDCompressedNet().to(device)
    baseline_model = BaselineMNISTClassifier()
    # baseline_model stays on the CPU, so the checkpoint is mapped there too; this
    # also lets a checkpoint saved on a GPU machine load on a CPU-only one.
    baseline_model.load_state_dict(
        torch.load('mnist_baseline.pth', map_location='cpu', weights_only=True)
    )
    # Copy the baseline weights into the compressed network.
    model.load_state_dict(baseline_model.state_dict())

    <All keys matched successfully>

In \[ \]:

    # Training function
    def train(model, train_loader, optimizer, criterion, device):
        """Run one optimisation pass over ``train_loader``; returns nothing."""
        model.train()
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), labels)
            batch_loss.backward()
            optimizer.step()

    def test(model, test_loader, device):
        model.eval()
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                pred = output.argmax(dim=1, keepdim=True)
                correct += pred.eq(target.view_as(pred)).sum().item()
        return correct / len(test_loader.dataset)

    # loop
    # Cross-entropy on the raw logits returned by the model.
    criterion = nn.CrossEntropyLoss()
    # Adam over every parameter of `model`, the network that was initialised from
    # the baseline checkpoint above.
    optimizer = optim.Adam(model.parameters(), lr=0.0001)  # Using a small learning rate here

In \[ \]:

    #train - Took me forever this
    num_epochs = 10
    for epoch in range(num_epochs):
        # One pass over the training set, then a full test-set evaluation.
        # Each hidden layer recomputes an SVD of its weight on every forward call.
        train(model, train_loader, optimizer, criterion, device)
        accuracy = test(model, test_loader, device)
        print(f"Epoch {epoch+1}/{num_epochs}, Test Accuracy: {accuracy:.4f}")

    # Final evaluation: re-runs the test pass on the weights left by the last epoch.
    final_accuracy = test(model, test_loader, device)
    print(f"Final Test Accuracy: {final_accuracy:.4f}")

    # Calculate compression ratio
    def count_parameters(model):
        """Return the number of scalar entries in the model's trainable parameters."""
        total = 0
        for param in model.parameters():
            if param.requires_grad:
                total += param.numel()
        return total

    baseline_params = count_parameters(baseline_model)
    # SVDCompressedNet keeps dense nn.Linear weights and truncates them to rank D
    # on the fly, so counting model.parameters() would just repeat the baseline
    # figure. Count what a rank-D copy has to store instead: U (out x d), S (d)
    # and V (in x d) per hidden layer, every hidden bias, and the dense output layer.
    compressed_params = count_parameters(model.out)
    for layer in (model.fc1, model.fc2, model.fc3, model.fc4, model.fc5):
        rank = min(model.D, layer.in_features, layer.out_features)
        compressed_params += rank * (layer.out_features + layer.in_features + 1)
        compressed_params += layer.bias.numel()

    Epoch 1/10, Test Accuracy: 0.9720
    Epoch 2/10, Test Accuracy: 0.9750
    Epoch 3/10, Test Accuracy: 0.9764
    Epoch 4/10, Test Accuracy: 0.9777
    Epoch 5/10, Test Accuracy: 0.9774
    Epoch 6/10, Test Accuracy: 0.9771
    Epoch 7/10, Test Accuracy: 0.9776
    Epoch 8/10, Test Accuracy: 0.9789
    Epoch 9/10, Test Accuracy: 0.9787
    Epoch 10/10, Test Accuracy: 0.9795
    Final Test Accuracy: 0.9795

In \[ \]:

    import matplotlib.pyplot as plt
    epochs = list(range(1, 11))
    test_accuracy = [0.9720, 0.9750, 0.9764, 0.9777, 0.9774, 0.9771, 0.9776, 0.9789, 0.9787, 0.9795] #have taken the values from above
    test_accuracy = [acc * 100 for acc in test_accuracy] #for better interpretibility
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, test_accuracy, marker='o', linestyle='-', color='b')
    plt.xlabel('Epochs')
    # The values are percentages after the rescaling above, so say so on the axis.
    plt.ylabel('Test Accuracy (%)')
    plt.title('Test Accuracy Over Epochs')
    plt.xticks(epochs)
    plt.grid(True)
    # The label offset is in data units (percent): 0.2 moves the text clear of
    # the last marker, whereas 0.002 left it sitting on top of the point.
    plt.annotate(f'Final: {test_accuracy[-1]:.2f}%',
                 xy=(epochs[-1], test_accuracy[-1]),
                 xytext=(epochs[-1] - 1, test_accuracy[-1] - 0.2),
                 arrowprops=dict(facecolor='black', arrowstyle='->'),
                 fontsize=10)
    plt.show()

## Question 4: Speaker Recognition with Siamese Networks

**Objective:** Implement speaker recognition using a Siamese network
architecture with GRU encoder and cosine similarity-based verification.

**Implementation:** SpeakerEncoder with a bidirectional GRU
(input_size=513, hidden_size=256, 2 layers) processes audio
spectrograms and emits L2-normalized embeddings. SiameseNetwork takes the
dot product of the two embeddings (their cosine similarity) and passes it
through a sigmoid. Trained on a speaker verification task (same/different
speaker classification) using binary cross-entropy loss (`nn.BCELoss`).

In \[ \]:

    import torch
    import torch.nn as nn
    import torch.optim as optim
    import numpy as np
    import pickle
    import librosa
    from torch.utils.data import Dataset, DataLoader
    import itertools
    import random
    import matplotlib.pyplot as plt

In \[ \]:

    class SpeakerEncoder(nn.Module):
        """Bidirectional GRU that maps a feature sequence to a unit-length embedding.

        Args:
            input_size: number of features per time step.
            hidden_size: GRU hidden size, also the size of the returned embedding.
            num_layers: number of stacked GRU layers.
        """

        def __init__(self, input_size=513, hidden_size=256, num_layers=2):
            super().__init__()
            self.gru = nn.GRU(
                input_size,
                hidden_size,
                num_layers,
                batch_first=True,
                bidirectional=True,
            )
            # Projects the two concatenated final states back to hidden_size.
            self.fc = nn.Linear(2 * hidden_size, hidden_size)

        def forward(self, x):
            """Return one L2-normalised embedding row per input sequence."""
            _, h_n = self.gru(x)
            # The last two entries of h_n are concatenated along the feature axis.
            final_state = torch.cat((h_n[-2, :, :], h_n[-1, :, :]), dim=1)
            projected = self.fc(final_state)
            return torch.nn.functional.normalize(projected, p=2, dim=1)

    #siamese network
    class SiameseNetwork(nn.Module):
        """Scores a pair of inputs by the sigmoid of their embeddings' dot product.

        Both inputs go through the same ``encoder``, so the weights are shared.
        """

        def __init__(self, encoder):
            super().__init__()
            self.encoder = encoder

        def forward(self, x1, x2):
            """Return a ``(batch, 1)`` tensor of scores in (0, 1)."""
            left = self.encoder(x1)
            right = self.encoder(x2)
            # Row-wise dot product; keepdim leaves a trailing axis of size 1.
            score = (left * right).sum(dim=1, keepdim=True)
            return torch.sigmoid(score)

In \[ \]:

    class SpeakerPairDataset(Dataset):
        """Same-speaker / different-speaker utterance pairs for one anchor speaker.

        ``data`` is indexed as ten consecutive utterances per speaker. Positive
        pairs (label 1) combine two of the anchor's utterances; negative pairs
        (label 0) combine an anchor utterance with one from another speaker.

        Args:
            data: sequence of utterances, speaker ``s`` occupying ``data[10*s:10*s+10]``.
            L: number of negative pairs, and the cap on the number of positive pairs.
            speaker_idx: index of the anchor speaker.
            mode: ``'train'`` draws negatives from 50 speakers, anything else from 20.
        """

        def __init__(self, data, L, speaker_idx, mode='train'):
            self.data = data
            self.L = L
            self.speaker_idx = speaker_idx
            self.mode = mode
            self.pairs, self.labels = self._create_pairs()

        def _create_pairs(self):
            """Build the pair list and the matching 1/0 label list."""
            first = self.speaker_idx * 10
            own = self.data[first:first + 10]

            # Positives: all 45 index combinations, subsampled to L when L is smaller.
            index_pairs = list(itertools.combinations(range(10), 2))
            if self.L < len(index_pairs):
                index_pairs = random.sample(index_pairs, self.L)
            pairs = [(own[a], own[b]) for a, b in index_pairs]
            labels = [1] * len(pairs)

            # Negatives: L random (anchor utterance, other speaker's utterance) draws.
            speaker_count = 50 if self.mode == 'train' else 20
            rivals = list(range(speaker_count))
            rivals.remove(self.speaker_idx)
            for _ in range(self.L):
                own_pick = random.randint(0, 9)
                rival = random.choice(rivals)
                rival_pick = random.randint(0, 9)
                pairs.append((own[own_pick], self.data[rival * 10 + rival_pick]))
                labels.append(0)
            return pairs, labels

        def __len__(self):
            return len(self.pairs)

        def __getitem__(self, idx):
            """Return ``(spec1, spec2, label)`` for pair ``idx``.

            Each spectrogram is a log-magnitude STFT standardised by its own mean
            and standard deviation, transposed so that time is the first axis.
            """
            utterance1, utterance2 = self.pairs[idx]
            n_fft = 1024
            hop_length = n_fft // 4
            magnitudes = [
                np.abs(librosa.stft(wave, n_fft=n_fft, hop_length=hop_length))
                for wave in (utterance1, utterance2)
            ]
            assert magnitudes[0].shape[0] == 513, f"Expected 513 frequency bins, got {magnitudes[0].shape[0]}"

            spectrograms = []
            for magnitude in magnitudes:
                log_mag = np.log1p(magnitude)
                # The small epsilon guards against a zero standard deviation.
                log_mag = (log_mag - log_mag.mean()) / (log_mag.std() + 1e-8)
                spectrograms.append(torch.FloatTensor(log_mag.T))

            label = torch.FloatTensor([self.labels[idx]]).reshape(1)
            return spectrograms[0], spectrograms[1], label

In \[ \]:

    def train_model(train_data, test_data, L=10, num_epochs=50):
        """Train the Siamese speaker-verification model and evaluate it every epoch.

        A fresh ``SpeakerPairDataset`` is built per speaker in every epoch, for
        the training and the test speakers alike, so each epoch's metrics are
        measured on newly constructed pair sets.

        Args:
            train_data: Utterance collection handed to ``SpeakerPairDataset``
                for the 50 training speakers.
            test_data: Utterance collection handed to ``SpeakerPairDataset``
                (``mode='test'``) for the 20 test speakers.
            L: Pair-count argument forwarded to ``SpeakerPairDataset``.
            num_epochs: Number of passes over all speakers.

        Returns:
            Tuple ``(model, train_accuracies, test_accuracies, train_losses,
            test_losses)``. Each list has one entry per epoch; accuracies are
            percentages, losses are mean per-pair binary cross-entropy.
        """
        num_train_speakers = 50
        num_test_speakers = 20

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {device}")
        encoder = SpeakerEncoder().to(device)
        model = SiameseNetwork(encoder).to(device)
        criterion = nn.BCELoss()
        optimizer = optim.Adam(model.parameters())

        # Per-epoch metric histories
        train_losses = []
        test_losses = []
        train_accuracies = []
        test_accuracies = []

        for epoch in range(num_epochs):
            model.train()
            total_loss = 0.0
            correct_train = 0
            total_train = 0

            # Training loop
            for speaker_idx in range(num_train_speakers):
                dataset = SpeakerPairDataset(train_data, L, speaker_idx)
                dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

                for spec1, spec2, labels in dataloader:
                    spec1, spec2, labels = spec1.to(device), spec2.to(device), labels.to(device)

                    optimizer.zero_grad()
                    outputs = model(spec1, spec2)
                    loss = criterion(outputs, labels)

                    loss.backward()
                    optimizer.step()

                    # BCELoss returns the batch mean; weight it by the batch size
                    # so the epoch average is per pair regardless of how many
                    # batches each speaker produces.
                    batch_size = labels.size(0)
                    total_loss += loss.item() * batch_size

                    # Calculate training accuracy
                    predicted = (outputs > 0.5).float()
                    correct_train += (predicted == labels).sum().item()
                    total_train += batch_size

            # Average training loss and accuracy for the epoch
            # (max(..., 1) avoids a ZeroDivisionError when no pairs were produced)
            avg_train_loss = total_loss / max(total_train, 1)
            train_accuracy = 100 * correct_train / max(total_train, 1)
            train_losses.append(avg_train_loss)
            train_accuracies.append(train_accuracy)

            print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%')

            model.eval()
            total_test_loss = 0.0
            correct_test = 0
            total_test = 0
            with torch.no_grad():
                for speaker_idx in range(num_test_speakers):
                    dataset = SpeakerPairDataset(test_data, L, speaker_idx, mode='test')
                    dataloader = DataLoader(dataset, batch_size=32)

                    for spec1, spec2, labels in dataloader:
                        spec1, spec2, labels = spec1.to(device), spec2.to(device), labels.to(device)
                        outputs = model(spec1, spec2)
                        loss = criterion(outputs, labels)

                        batch_size = labels.size(0)
                        total_test_loss += loss.item() * batch_size
                        predicted = (outputs > 0.5).float()
                        correct_test += (predicted == labels).sum().item()
                        total_test += batch_size

            avg_test_loss = total_test_loss / max(total_test, 1)
            test_accuracy = 100 * correct_test / max(total_test, 1)
            test_losses.append(avg_test_loss)
            test_accuracies.append(test_accuracy)

            print(f'Epoch {epoch+1}/{num_epochs}, Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%')

        _plot_training_curves(train_losses, test_losses, train_accuracies, test_accuracies)

        return model, train_accuracies, test_accuracies, train_losses, test_losses


    def _plot_training_curves(train_losses, test_losses, train_accuracies, test_accuracies):
        """Show train/test loss and accuracy per epoch in a two-panel figure."""
        epochs = range(1, len(train_losses) + 1)

        plt.figure(figsize=(14, 6))

        plt.subplot(1, 2, 1)
        plt.plot(epochs, train_losses, label='Training Loss', marker='o')
        plt.plot(epochs, test_losses, label='Testing Loss', marker='o')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title('Training and Testing Loss Over Epochs')
        plt.legend()
        plt.grid(True)

        plt.subplot(1, 2, 2)
        plt.plot(epochs, train_accuracies, label='Training Accuracy', marker='o')
        plt.plot(epochs, test_accuracies, label='Testing Accuracy', marker='o')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy (%)')
        plt.title('Training and Testing Accuracy Over Epochs')
        plt.legend()
        plt.grid(True)

        plt.tight_layout()
        plt.show()

In \[ \]:

    if __name__ == "__main__":
        def _load_pickle(path):
            """Read one pickled object from ``path``."""
            with open(path, 'rb') as handle:
                return pickle.load(handle)

        # Load the training and test sets, then train with L=10.
        train_data = _load_pickle('trs.pkl')
        test_data = _load_pickle('tes.pkl')
        training_result = train_model(train_data, test_data, L=10)
        model, train_accuracies, test_accuracies, train_losses, test_losses = training_result

    Using device: cuda
    Epoch 1/50, Training Loss: 0.7358, Training Accuracy: 51.00%
    Epoch 1/50, Test Loss: 0.7345, Test Accuracy: 52.50%
    Epoch 2/50, Training Loss: 0.7142, Training Accuracy: 53.60%
    Epoch 2/50, Test Loss: 0.7420, Test Accuracy: 50.25%
    Epoch 3/50, Training Loss: 0.6997, Training Accuracy: 54.60%
    Epoch 3/50, Test Loss: 0.7025, Test Accuracy: 53.25%
    Epoch 4/50, Training Loss: 0.6855, Training Accuracy: 56.70%
    Epoch 4/50, Test Loss: 0.6601, Test Accuracy: 60.75%
    Epoch 5/50, Training Loss: 0.6589, Training Accuracy: 61.50%
    Epoch 5/50, Test Loss: 0.6914, Test Accuracy: 60.25%
    Epoch 6/50, Training Loss: 0.6954, Training Accuracy: 59.20%
    Epoch 6/50, Test Loss: 0.6111, Test Accuracy: 70.00%
    Epoch 7/50, Training Loss: 0.6151, Training Accuracy: 67.00%
    Epoch 7/50, Test Loss: 0.6430, Test Accuracy: 63.75%
    Epoch 8/50, Training Loss: 0.6238, Training Accuracy: 66.70%
    Epoch 8/50, Test Loss: 0.6036, Test Accuracy: 70.00%
    Epoch 9/50, Training Loss: 0.6123, Training Accuracy: 68.40%
    Epoch 9/50, Test Loss: 0.6548, Test Accuracy: 61.75%
    Epoch 10/50, Training Loss: 0.6143, Training Accuracy: 68.80%
    Epoch 10/50, Test Loss: 0.6003, Test Accuracy: 71.00%
    Epoch 11/50, Training Loss: 0.5913, Training Accuracy: 70.40%
    Epoch 11/50, Test Loss: 0.6024, Test Accuracy: 69.50%
    Epoch 12/50, Training Loss: 0.5699, Training Accuracy: 73.30%
    Epoch 12/50, Test Loss: 0.5951, Test Accuracy: 69.75%
    Epoch 13/50, Training Loss: 0.5702, Training Accuracy: 73.20%
    Epoch 13/50, Test Loss: 0.6089, Test Accuracy: 68.75%
    Epoch 14/50, Training Loss: 0.5680, Training Accuracy: 74.50%
    Epoch 14/50, Test Loss: 0.5932, Test Accuracy: 73.25%
    Epoch 15/50, Training Loss: 0.5630, Training Accuracy: 75.60%
    Epoch 15/50, Test Loss: 0.6261, Test Accuracy: 66.00%
    Epoch 16/50, Training Loss: 0.5672, Training Accuracy: 74.20%
    Epoch 16/50, Test Loss: 0.5805, Test Accuracy: 72.75%
    Epoch 17/50, Training Loss: 0.5589, Training Accuracy: 74.90%
    Epoch 17/50, Test Loss: 0.6141, Test Accuracy: 70.50%
    Epoch 18/50, Training Loss: 0.5776, Training Accuracy: 73.80%
    Epoch 18/50, Test Loss: 0.6029, Test Accuracy: 70.00%
    Epoch 19/50, Training Loss: 0.5746, Training Accuracy: 73.60%
    Epoch 19/50, Test Loss: 0.5800, Test Accuracy: 73.50%
    Epoch 20/50, Training Loss: 0.5756, Training Accuracy: 73.90%
    Epoch 20/50, Test Loss: 0.5792, Test Accuracy: 73.25%
    Epoch 21/50, Training Loss: 0.5561, Training Accuracy: 76.00%
    Epoch 21/50, Test Loss: 0.6012, Test Accuracy: 70.00%
    Epoch 22/50, Training Loss: 0.5643, Training Accuracy: 74.60%
    Epoch 22/50, Test Loss: 0.5641, Test Accuracy: 75.00%
    Epoch 23/50, Training Loss: 0.5527, Training Accuracy: 77.20%
    Epoch 23/50, Test Loss: 0.6027, Test Accuracy: 70.00%
    Epoch 24/50, Training Loss: 0.5652, Training Accuracy: 75.30%
    Epoch 24/50, Test Loss: 0.5641, Test Accuracy: 76.00%
    Epoch 25/50, Training Loss: 0.5320, Training Accuracy: 79.80%
    Epoch 25/50, Test Loss: 0.6259, Test Accuracy: 69.25%
    Epoch 26/50, Training Loss: 0.5503, Training Accuracy: 77.50%
    Epoch 26/50, Test Loss: 0.6041, Test Accuracy: 70.00%
    Epoch 27/50, Training Loss: 0.5523, Training Accuracy: 78.70%
    Epoch 27/50, Test Loss: 0.5785, Test Accuracy: 74.00%
    Epoch 28/50, Training Loss: 0.5410, Training Accuracy: 79.70%
    Epoch 28/50, Test Loss: 0.6024, Test Accuracy: 72.25%
    Epoch 29/50, Training Loss: 0.5496, Training Accuracy: 77.80%
    Epoch 29/50, Test Loss: 0.6040, Test Accuracy: 70.50%
    Epoch 30/50, Training Loss: 0.5495, Training Accuracy: 77.50%
    Epoch 30/50, Test Loss: 0.5893, Test Accuracy: 71.25%
    Epoch 31/50, Training Loss: 0.5339, Training Accuracy: 80.00%
    Epoch 31/50, Test Loss: 0.5750, Test Accuracy: 74.25%
    Epoch 32/50, Training Loss: 0.5465, Training Accuracy: 78.70%
    Epoch 32/50, Test Loss: 0.5714, Test Accuracy: 75.00%
    Epoch 33/50, Training Loss: 0.5503, Training Accuracy: 77.70%
    Epoch 33/50, Test Loss: 0.6066, Test Accuracy: 70.00%
    Epoch 34/50, Training Loss: 0.5529, Training Accuracy: 75.80%
    Epoch 34/50, Test Loss: 0.5981, Test Accuracy: 72.50%
    Epoch 35/50, Training Loss: 0.5348, Training Accuracy: 79.20%
    Epoch 35/50, Test Loss: 0.6024, Test Accuracy: 68.50%
    Epoch 36/50, Training Loss: 0.5235, Training Accuracy: 81.70%
    Epoch 36/50, Test Loss: 0.5996, Test Accuracy: 72.00%
    Epoch 37/50, Training Loss: 0.5443, Training Accuracy: 78.70%
    Epoch 37/50, Test Loss: 0.6026, Test Accuracy: 71.50%
    Epoch 38/50, Training Loss: 0.5518, Training Accuracy: 77.60%
    Epoch 38/50, Test Loss: 0.5673, Test Accuracy: 75.75%
    Epoch 39/50, Training Loss: 0.5464, Training Accuracy: 77.90%
    Epoch 39/50, Test Loss: 0.5883, Test Accuracy: 74.50%
    Epoch 40/50, Training Loss: 0.5337, Training Accuracy: 79.30%
    Epoch 40/50, Test Loss: 0.5773, Test Accuracy: 73.00%
    Epoch 41/50, Training Loss: 0.5502, Training Accuracy: 77.20%
    Epoch 41/50, Test Loss: 0.5856, Test Accuracy: 72.25%
    Epoch 42/50, Training Loss: 0.5360, Training Accuracy: 79.50%
    Epoch 42/50, Test Loss: 0.5727, Test Accuracy: 73.75%
    Epoch 43/50, Training Loss: 0.5299, Training Accuracy: 80.00%
    Epoch 43/50, Test Loss: 0.5732, Test Accuracy: 73.50%
    Epoch 44/50, Training Loss: 0.5314, Training Accuracy: 80.70%
    Epoch 44/50, Test Loss: 0.6073, Test Accuracy: 70.75%
    Epoch 45/50, Training Loss: 0.5350, Training Accuracy: 81.30%
    Epoch 45/50, Test Loss: 0.5803, Test Accuracy: 73.50%
    Epoch 46/50, Training Loss: 0.5285, Training Accuracy: 82.30%
    Epoch 46/50, Test Loss: 0.5846, Test Accuracy: 71.75%
    Epoch 47/50, Training Loss: 0.5326, Training Accuracy: 80.40%
    Epoch 47/50, Test Loss: 0.5836, Test Accuracy: 73.50%
    Epoch 48/50, Training Loss: 0.5504, Training Accuracy: 78.20%
    Epoch 48/50, Test Loss: 0.5765, Test Accuracy: 74.75%
    Epoch 49/50, Training Loss: 0.5570, Training Accuracy: 77.50%
    Epoch 49/50, Test Loss: 0.6557, Test Accuracy: 64.50%
    Epoch 50/50, Training Loss: 0.5338, Training Accuracy: 80.10%
    Epoch 50/50, Test Loss: 0.6034, Test Accuracy: 70.00%

## Question 5: Audio Denoising with Spectral Processing

**Objective:** Develop deep learning models for audio denoising using
spectral-domain processing (STFT/ISTFT) and neural network-based
filtering.

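**Implementation:** `SpeechDenoisingDataset` loads noisy/clean/noise audio
triplets and converts them to magnitude spectrograms using librosa's STFT
(n_fft=1024, hop_length=512). A bidirectional GRU processes the noisy
spectrogram and predicts a time-frequency mask, trained with binary
cross-entropy against the ideal binary mask (IBM); applying the mask to
the noisy magnitudes gives the estimate of the clean signal. The denoised
audio is reconstructed via ISTFT using the phase of the noisy input. The
model is evaluated using Signal-to-Noise Ratio (SNR) metrics, and the
best model (lowest validation loss) is saved to handle overfitting.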

In \[ \]:

    import torch
    import torch.nn as nn
    import torch.optim as optim
    import librosa
    import numpy as np
    from torch.utils.data import Dataset, DataLoader
    import os

    class SpeechDenoisingDataset(Dataset):
        """Training set of (noisy magnitude spectrogram, ideal binary mask) pairs.

        Item ``i`` is built from the files ``trx{i:04d}.wav`` (noisy mixture),
        ``trs{i:04d}.wav`` (clean speech) and ``trn{i:04d}.wav`` (noise).

        Args:
            noisy_folder: Directory containing the ``trx*.wav`` files.
            clean_folder: Directory containing the ``trs*.wav`` files.
            noise_folder: Directory containing the ``trn*.wav`` files.
            max_seq_length: If truthy, every item is padded/truncated to this
                many STFT frames; otherwise items keep their native length.
            pad_value: Value used to fill padded frames.
            num_files: Number of consecutive file indices (starting at 0) that
                make up the dataset.
        """

        def __init__(self, noisy_folder, clean_folder, noise_folder, max_seq_length=None, pad_value=0,
                     num_files=1200):
            self.noisy_folder = noisy_folder
            self.clean_folder = clean_folder
            self.noise_folder = noise_folder
            self.max_seq_length = max_seq_length
            self.pad_value = pad_value
            self.num_files = num_files
            self.file_paths = self._load_file_paths()

        def _load_file_paths(self):
            """Return the ``(noisy, clean, noise)`` path triplet for every file index."""
            # Files are numbered 0000 .. num_files - 1
            return [
                (
                    os.path.join(self.noisy_folder, f'trx{i:04d}.wav'),
                    os.path.join(self.clean_folder, f'trs{i:04d}.wav'),
                    os.path.join(self.noise_folder, f'trn{i:04d}.wav'),
                )
                for i in range(self.num_files)
            ]

        def __len__(self):
            return len(self.file_paths)

        def __getitem__(self, idx):
            """Return ``(noisy_mag, ibm)`` as float32 tensors laid out (frames, bins)."""
            noisy_file, clean_file, noise_file = self.file_paths[idx]

            # sr=None keeps each file's native sampling rate
            y_noisy, _ = librosa.load(noisy_file, sr=None)
            y_clean, _ = librosa.load(clean_file, sr=None)
            y_noise, _ = librosa.load(noise_file, sr=None)

            noisy_mag = np.abs(librosa.stft(y_noisy, n_fft=1024, hop_length=512, win_length=1024))
            clean_mag = np.abs(librosa.stft(y_clean, n_fft=1024, hop_length=512, win_length=1024))
            noise_mag = np.abs(librosa.stft(y_noise, n_fft=1024, hop_length=512, win_length=1024))

            # Ideal binary mask: 1 where speech dominates the noise, else 0
            ibm = (clean_mag > noise_mag).astype(np.float32)

            if self.max_seq_length:
                noisy_mag = self._pad_sequence(noisy_mag, self.max_seq_length)
                ibm = self._pad_sequence(ibm, self.max_seq_length)
            return torch.tensor(noisy_mag.T, dtype=torch.float32), torch.tensor(ibm.T, dtype=torch.float32)

        def _pad_sequence(self, sequence, max_length):
            """Pad or truncate ``sequence`` along axis 1 (time) to ``max_length`` frames.

            Padding uses ``self.pad_value`` and keeps the dtype of ``sequence``.
            """
            seq_len = sequence.shape[1]
            if seq_len >= max_length:
                return sequence[:, :max_length]
            # Match the input dtype so float32 spectrograms are not promoted to float64
            padding = np.full((sequence.shape[0], max_length - seq_len), self.pad_value, dtype=sequence.dtype)
            return np.concatenate((sequence, padding), axis=1)

In \[ \]:

    # View the spectrogram of one training file.
    # (The `trn` prefix is the one SpeechDenoisingDataset uses for noise recordings.)

    import librosa.display
    import matplotlib.pyplot as plt
    file_path = './homework3/tr/trn0789.wav'
    y, sr = librosa.load(file_path, sr=None) 
    # Same STFT settings as the dataset classes, so the printed shape matches the model input.
    D = librosa.stft(y, n_fft=1024, hop_length=512, win_length=1024)
    S = np.abs(D)
    plt.figure(figsize=(10, 6))
    # Show magnitudes in dB relative to the loudest bin, on a log-frequency axis.
    librosa.display.specshow(librosa.amplitude_to_db(S, ref=np.max), y_axis='log', x_axis='time', sr=sr)
    plt.colorbar(format='%+2.0f dB')
    plt.title('Spectrogram')
    plt.show()
    # S is laid out as (frequency bins, frames).
    print("Spectrogram dimensions:", S.shape)

    Spectrogram dimensions: (513, 70)

In \[ \]:

    import torch.nn as nn
    import torch.optim as optim

    class DenoisingGRU(nn.Module):
        """Bidirectional GRU that predicts a per-bin mask in (0, 1) for every frame.

        Args:
            input_dim: Number of features per input frame.
            hidden_dim: GRU hidden size per direction.
            output_dim: Number of mask values produced per frame.
            num_layers: Number of stacked GRU layers.
            dropout: Dropout probability used between GRU layers and before
                the output projection.
        """

        def __init__(self, input_dim, hidden_dim, output_dim, num_layers, dropout):
            super().__init__()
            # Forward and backward states are concatenated, hence 2 * hidden_dim.
            recurrent_width = 2 * hidden_dim
            self.gru = nn.GRU(
                input_size=input_dim,
                hidden_size=hidden_dim,
                num_layers=num_layers,
                batch_first=True,
                dropout=dropout,
                bidirectional=True,
            )
            self.layer_norm = nn.LayerNorm(recurrent_width)
            self.fc = nn.Linear(recurrent_width, output_dim)
            self.fc_dropout = nn.Dropout(dropout)

        def forward(self, x):
            """Map a batch-first sequence ``x`` to sigmoid-activated mask values."""
            sequence_features, _ = self.gru(x)
            regularized = self.fc_dropout(self.layer_norm(sequence_features))
            return torch.sigmoid(self.fc(regularized))

In \[ \]:

    def compute_snr(noisy_signal, clean_signal, denoised_signal):
        """Return the SNR (in dB) of ``denoised_signal`` against ``clean_signal``.

        SNR = 10 * log10( sum(clean^2) / sum((clean - denoised)^2) )

        The error term is the residual between the clean reference and the
        estimate, so a better estimate always scores higher and a perfect one
        scores ``+inf``. Measuring against ``noisy - denoised`` instead would
        reward leaving the input untouched.

        Args:
            noisy_signal: Unused; kept so existing call sites keep working.
            clean_signal: Reference signal (array-like).
            denoised_signal: Estimate to be scored, same shape as ``clean_signal``.

        Returns:
            SNR in decibels. ``inf`` when the estimate equals the reference,
            ``-inf`` when the reference has zero energy but the estimate does not.
        """
        # float64 accumulation keeps the power sums accurate for float32 inputs
        clean = np.asarray(clean_signal, dtype=np.float64)
        estimate = np.asarray(denoised_signal, dtype=np.float64)
        signal_power = np.sum(clean ** 2)
        residual_power = np.sum((clean - estimate) ** 2)
        # Handle the degenerate cases explicitly instead of dividing by zero / log10(0)
        if residual_power == 0:
            return float('inf')
        if signal_power == 0:
            return float('-inf')
        return 10 * np.log10(signal_power / residual_power)

In \[ \]:

    class SpeechDenoisingValidationDataset(Dataset):
        """Validation set of (noisy magnitude spectrogram, ideal binary mask) pairs.

        Item ``i`` is built from ``vx{i:04d}.wav`` (noisy mixture),
        ``vs{i:04d}.wav`` (clean speech) and ``vn{i:04d}.wav`` (noise). This
        follows the training set's suffix convention (``x`` = mixture,
        ``s`` = speech, ``n`` = noise).

        Args:
            noisy_folder: Directory containing the ``vx*.wav`` files.
            clean_folder: Directory containing the ``vs*.wav`` files.
            noise_folder: Directory containing the ``vn*.wav`` files.
            max_seq_length: If truthy, every item is padded/truncated to this
                many STFT frames; otherwise items keep their native length.
            pad_value: Value used to fill padded frames.
            num_files: Number of consecutive file indices (starting at 0) that
                make up the dataset.
        """

        def __init__(self, noisy_folder, clean_folder, noise_folder, max_seq_length=None, pad_value=0,
                     num_files=120):
            self.noisy_folder = noisy_folder
            self.clean_folder = clean_folder
            self.noise_folder = noise_folder
            self.max_seq_length = max_seq_length
            self.pad_value = pad_value
            self.num_files = num_files
            self.file_paths = self._load_file_paths()

        def _load_file_paths(self):
            """Return the ``(noisy, clean, noise)`` path triplet for every file index."""
            # Files are numbered 0000 .. num_files - 1. The mixture is `vx` and
            # the noise is `vn`; swapping them would feed pure noise to the model.
            return [
                (
                    os.path.join(self.noisy_folder, f'vx{i:04d}.wav'),
                    os.path.join(self.clean_folder, f'vs{i:04d}.wav'),
                    os.path.join(self.noise_folder, f'vn{i:04d}.wav'),
                )
                for i in range(self.num_files)
            ]

        def __len__(self):
            return len(self.file_paths)

        def __getitem__(self, idx):
            """Return ``(noisy_mag, ibm)`` as float32 tensors laid out (frames, bins)."""
            noisy_file, clean_file, noise_file = self.file_paths[idx]
            # sr=None keeps each file's native sampling rate
            y_noisy, _ = librosa.load(noisy_file, sr=None)
            y_clean, _ = librosa.load(clean_file, sr=None)
            y_noise, _ = librosa.load(noise_file, sr=None)
            noisy_mag = np.abs(librosa.stft(y_noisy, n_fft=1024, hop_length=512, win_length=1024))
            clean_mag = np.abs(librosa.stft(y_clean, n_fft=1024, hop_length=512, win_length=1024))
            noise_mag = np.abs(librosa.stft(y_noise, n_fft=1024, hop_length=512, win_length=1024))
            # Ideal binary mask: 1 where speech dominates the noise, else 0
            ibm = (clean_mag > noise_mag).astype(np.float32)
            if self.max_seq_length:
                noisy_mag = self._pad_sequence(noisy_mag, self.max_seq_length)
                ibm = self._pad_sequence(ibm, self.max_seq_length)
            return torch.tensor(noisy_mag.T, dtype=torch.float32), torch.tensor(ibm.T, dtype=torch.float32)

        def _pad_sequence(self, sequence, max_length):
            """Pad or truncate ``sequence`` along axis 1 (time) to ``max_length`` frames.

            Padding uses ``self.pad_value`` and keeps the dtype of ``sequence``.
            """
            seq_len = sequence.shape[1]
            if seq_len >= max_length:
                return sequence[:, :max_length]
            # Match the input dtype so float32 spectrograms are not promoted to float64
            padding = np.full((sequence.shape[0], max_length - seq_len), self.pad_value, dtype=sequence.dtype)
            return np.concatenate((sequence, padding), axis=1)

In \[ \]:

    # Forward slashes work on both Windows and POSIX; the backslash form only resolves on Windows.
    val_folder = './homework3/v'
    # Noisy, clean and noise validation files all live in the same directory.
    noisy_folder_val = val_folder
    clean_folder_val = val_folder
    noise_folder_val = val_folder
    # Every item is padded/truncated to 150 STFT frames so items can be stacked into batches.
    validation_dataset = SpeechDenoisingValidationDataset(noisy_folder_val, clean_folder_val, noise_folder_val, max_seq_length=150)
    validation_loader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

In \[ \]:

    from torch.utils.data import DataLoader
    from torch.optim.lr_scheduler import ReduceLROnPlateau
    batch_size = 120  # Batch size for training
    learning_rate = 0.001  # Learning rate for the optimizer
    # Forward slashes work on both Windows and POSIX; the backslash form only resolves on Windows.
    train_folder = './homework3/tr'
    # Noisy, clean and noise training files all live in the same directory.
    noisy_folder = train_folder
    clean_folder = train_folder
    noise_folder = train_folder
    # Every item is padded/truncated to 150 STFT frames so items can be stacked into batches.
    train_dataset = SpeechDenoisingDataset(noisy_folder, clean_folder, noise_folder, max_seq_length=150)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

In \[ \]:

    # Initialize model, loss function, and optimizer
    # 513 features in and out: one value per STFT frequency bin (n_fft=1024).
    model = DenoisingGRU(input_dim=513, hidden_dim=128, output_dim=513, num_layers=2, dropout=0.2)
    # Binary cross-entropy between the predicted mask and the ideal binary mask.
    loss_fn = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)  # Add L2 regularization
    # NOTE(review): patience=30 exceeds num_epochs=20 below, so this scheduler
    # presumably cannot reduce the learning rate within one run -- confirm intent.
    scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=30)
    best_val_loss = float('inf')  # lowest validation loss seen so far
    num_epochs = 20

In \[ \]:

    # Initialize lists to track metrics
    epoch_max_snr_list = []  # per epoch: mean over validation batches of the best per-sample SNR
    all_snr_values = []  # every per-sample validation SNR, accumulated across all epochs
    train_loss_list = []  # mean training loss per epoch
    val_loss_list = []  # mean validation loss per epoch (Python floats)
    for epoch in range(num_epochs):
        model.train()  # Set model to training mode
        train_loss = 0.0
        for noisy_inputs, ibm_targets in train_loader:
            optimizer.zero_grad()
            outputs = model(noisy_inputs)
            loss = loss_fn(outputs, ibm_targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        avg_train_loss = train_loss / len(train_loader)
        train_loss_list.append(avg_train_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {avg_train_loss:.4f}")

        # Validation Loop
        model.eval()
        val_loss = 0.0
        max_snr_list = []
        with torch.no_grad():
            for val_noisy_inputs, val_ibm_targets in validation_loader:
                val_outputs = model(val_noisy_inputs)
                # .item() keeps the running total a Python float, matching the
                # training loop, so the stored/plotted history holds no tensors.
                val_loss += loss_fn(val_outputs, val_ibm_targets).item()
                batch_max_snr = float('-inf')
                for i in range(val_noisy_inputs.size(0)):
                    # NOTE(review): these are a magnitude spectrogram, the target
                    # mask and the predicted mask -- not time-domain signals -- so
                    # the value is a spectrogram/mask-domain score; confirm that
                    # this is the intended metric.
                    noisy_mag = val_noisy_inputs[i].numpy()
                    target_mask = val_ibm_targets[i].numpy()
                    predicted_mask = val_outputs[i].numpy()
                    snr = compute_snr(noisy_mag, target_mask, predicted_mask)
                    all_snr_values.append(snr)
                    batch_max_snr = max(batch_max_snr, snr)
                max_snr_list.append(batch_max_snr)
        avg_val_loss = val_loss / len(validation_loader)
        avg_max_snr = np.mean(max_snr_list)
        epoch_max_snr_list.append(avg_max_snr)
        val_loss_list.append(avg_val_loss)
        print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {avg_val_loss:.4f}, Max Avg SNR: {avg_max_snr:.4f} dB")
        # ReduceLROnPlateau only acts when it is fed the monitored metric each epoch.
        scheduler.step(avg_val_loss)
        # Save the best model
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_denoising_model.pth')

    Epoch 1/20, Training Loss: 0.6017
    Epoch 1/20, Validation Loss: 0.4400, Max Avg SNR: 8.2505 dB
    Epoch 2/20, Training Loss: 0.4675
    Epoch 2/20, Validation Loss: 0.4147, Max Avg SNR: 9.2368 dB
    Epoch 3/20, Training Loss: 0.4148
    Epoch 3/20, Validation Loss: 0.3998, Max Avg SNR: 10.5524 dB
    Epoch 4/20, Training Loss: 0.3870
    Epoch 4/20, Validation Loss: 0.4515, Max Avg SNR: 10.2967 dB
    Epoch 5/20, Training Loss: 0.3714
    Epoch 5/20, Validation Loss: 0.4552, Max Avg SNR: 10.5448 dB
    Epoch 6/20, Training Loss: 0.3572
    Epoch 6/20, Validation Loss: 0.4367, Max Avg SNR: 10.5045 dB
    Epoch 7/20, Training Loss: 0.3489
    Epoch 7/20, Validation Loss: 0.4099, Max Avg SNR: 10.5280 dB
    Epoch 8/20, Training Loss: 0.3429
    Epoch 8/20, Validation Loss: 0.4449, Max Avg SNR: 10.3200 dB
    Epoch 9/20, Training Loss: 0.3351
    Epoch 9/20, Validation Loss: 0.4166, Max Avg SNR: 10.4459 dB
    Epoch 10/20, Training Loss: 0.3293
    Epoch 10/20, Validation Loss: 0.4088, Max Avg SNR: 10.4244 dB
    Epoch 11/20, Training Loss: 0.3252
    Epoch 11/20, Validation Loss: 0.4121, Max Avg SNR: 10.3916 dB
    Epoch 12/20, Training Loss: 0.3199
    Epoch 12/20, Validation Loss: 0.3908, Max Avg SNR: 10.2997 dB
    Epoch 13/20, Training Loss: 0.3158
    Epoch 13/20, Validation Loss: 0.3920, Max Avg SNR: 10.2864 dB
    Epoch 14/20, Training Loss: 0.3130
    Epoch 14/20, Validation Loss: 0.4087, Max Avg SNR: 9.4413 dB
    Epoch 15/20, Training Loss: 0.3135
    Epoch 15/20, Validation Loss: 0.3807, Max Avg SNR: 10.1854 dB
    Epoch 16/20, Training Loss: 0.3083
    Epoch 16/20, Validation Loss: 0.3977, Max Avg SNR: 10.4265 dB
    Epoch 17/20, Training Loss: 0.3037
    Epoch 17/20, Validation Loss: 0.3844, Max Avg SNR: 10.0120 dB
    Epoch 18/20, Training Loss: 0.3002
    Epoch 18/20, Validation Loss: 0.3889, Max Avg SNR: 9.5695 dB
    Epoch 19/20, Training Loss: 0.2977
    Epoch 19/20, Validation Loss: 0.3830, Max Avg SNR: 9.8321 dB
    Epoch 20/20, Training Loss: 0.2965
    Epoch 20/20, Validation Loss: 0.3918, Max Avg SNR: 9.5528 dB

In \[ \]:

    import matplotlib.pyplot as plt

    # Average Max SNR per Epoch
    plt.figure(figsize=(10, 6))
    epochs = list(range(1, num_epochs + 1))
    plt.plot(epochs, epoch_max_snr_list, marker='o', color='b', label='Average Max SNR per Epoch', linestyle='-', markersize=5)
    plt.xlabel('Epochs')
    plt.ylabel('SNR (dB)')
    plt.title('Average Max SNR per Epoch')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Plotting All SNR Values
    # all_snr_values holds one entry per validation sample per epoch, so the
    # x-axis is a running sample index rather than a batch index.
    plt.figure(figsize=(10, 6))
    plt.plot(all_snr_values, marker='o', color='r', label='All SNR values', linestyle='-', markersize=5)
    plt.xlabel('Validation Sample Index (all epochs)')
    plt.ylabel('SNR (dB)')
    plt.title('All Validation SNR Values Across Epochs')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

    # Plotting Training and Validation Loss
    # float() accepts both Python floats and 0-d tensors, so plotting works
    # however the validation losses were accumulated.
    val_loss_values = [float(v) for v in val_loss_list]
    plt.figure(figsize=(10, 6))
    plt.plot(epochs, train_loss_list, marker='x', color='g', label='Training Loss', linestyle='-', markersize=5)
    plt.plot(epochs, val_loss_values, marker='o', color='m', label='Validation Loss', linestyle='-', markersize=5)
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.grid(True)
    plt.legend()
    plt.tight_layout()
    plt.show()

##### Note: Best model saved during training for inference to handle overfitting.

In \[ \]:

    import os
    import librosa
    import torch
    import soundfile as sf
    def load_and_preprocess(filename):
        """Load a wav file and return its magnitude spectrogram as model input.

        Returns:
            ``(mag_tensor, sr)``: a float32 tensor with a leading batch axis of
            size 1 and frames before frequency bins, plus the file's native
            sampling rate.
        """
        y, sr = librosa.load(filename, sr=None)
        spec = librosa.stft(y, n_fft=1024, hop_length=512, win_length=1024)
        mag = np.abs(spec)
        mag_tensor = torch.tensor(mag.T, dtype=torch.float32).unsqueeze(0)
        return mag_tensor, sr
    results_folder = './results'
    os.makedirs(results_folder, exist_ok=True)
    # Use the weights with the lowest validation loss (saved by the training
    # loop) rather than the last-epoch weights still held in memory.
    best_checkpoint = 'best_denoising_model.pth'
    if os.path.exists(best_checkpoint):
        model.load_state_dict(torch.load(best_checkpoint, map_location='cpu'))
    model.eval()
    test_folder = './homework3/te'
    for filename in os.listdir(test_folder):
        if not filename.endswith('.wav'):
            continue
        filepath = os.path.join(test_folder, filename)
        noisy_spec, sr = load_and_preprocess(filepath)
        # The complex STFT is needed as well: its phase is reused for reconstruction.
        y, _ = librosa.load(filepath, sr=None)
        original_spec = librosa.stft(y, n_fft=1024, hop_length=512, win_length=1024)
        phase = np.angle(original_spec)
        with torch.no_grad():
            denoised_spec = model(noisy_spec)
        # The model output is a mask; apply it to the noisy magnitudes.
        denoised_mag = denoised_spec.squeeze(0).T.numpy() * np.abs(original_spec)

        denoised_complex_spec = denoised_mag * np.exp(1j * phase)
        denoised_audio = librosa.istft(denoised_complex_spec, hop_length=512, length=len(y))
        # Peak-normalise, but leave an all-zero output untouched instead of
        # dividing by zero and writing NaNs.
        peak = np.max(np.abs(denoised_audio))
        if peak > 0:
            denoised_audio = denoised_audio / peak
        output_filename = os.path.join(results_folder, f"{filename}")
        sf.write(output_filename, denoised_audio, sr)

    print("All the files have been processed")

    All the files have been processed