In [132]:
import os
from torch.utils.data import Dataset
import torch
import torchaudio
torchaudio.set_audio_backend("soundfile")
from torch import nn
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchaudio.transforms import MFCC

  torchaudio.set_audio_backend("soundfile")


In [133]:
import os
print(os.getcwd())

/Users/ramupadhyay/Documents


In [134]:
BATCH_SIZE=128
EPOCHS=15
LEARNING_RATE=0.0001
SAMPLE_RATE=22050
NUM_SAMPLES=22050*4
N_MFCC=40

In [153]:
import os

class AudioDataset(Dataset):
    def __init__(self, audio_dir, transformation, target_sample_rate, num_samples, device):
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        
        # list all files with class labels
        self.audio_files = []
        self.class_mapping = sorted(
    [d for d in os.listdir(audio_dir) if not d.startswith(".")]
)

        for label_name in self.class_mapping:
            class_folder = os.path.join(audio_dir, label_name)
            for file in os.listdir(class_folder):
                if file.endswith(".wav"):
                    self.audio_files.append((os.path.join(class_folder, file), label_name))

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, index):
        
        audio_path, label_name = self.audio_files[index]
        signal, sr = torchaudio.load(audio_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        label = self.class_mapping.index(label_name)
        
        return signal, label

    # helper functions
    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal

In [154]:
def create_data_loader(train_data,batch_size):
    return DataLoader(train_data,batch_size=BATCH_SIZE)

In [155]:
def train_single_epoch(model, data_loader, optimiser, loss_fn, device):
    model.train()
    total_loss = 0

    for input, target in data_loader:
        input, target = input.to(device), target.to(device)
        prediction = model(input)
        loss = loss_fn(prediction, target)
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        total_loss += loss.item()
    
    avg_loss = total_loss / len(data_loader)
    scheduler.step(avg_val_loss)
    print(f"Average Loss: {avg_loss:.4f}")
  

In [156]:
def evaluate(model, dataloader, loss_fn, device):
    """Calculates loss and accuracy on a given dataloader."""
    model.eval() # Set model to evaluation mode (turns off Dropout, etc.)
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad(): # Do not calculate gradients during evaluation
        for input, target in dataloader:
            input, target = input.to(device), target.to(device)
            prediction = model(input)
            
            # Calculate Loss
            total_loss += loss_fn(prediction, target).item()
            
            # Calculate Accuracy
            _, predicted_class = torch.max(prediction, 1)
            total += target.size(0)
            correct += (predicted_class == target).sum().item()

    avg_loss = total_loss / len(dataloader)
    accuracy = 100 * correct / total
    model.train() # Set model back to training mode
    return avg_loss, accuracy
def train(model, train_dataloader, validation_dataloader, loss_fn, optimizer, device, epochs):
    best_validation_loss = float('inf')
    
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}")
        
        # --- 1. Training Step ---
        current_train_loss = 0
        model.train()
        for input, target in train_dataloader:
            input, target = input.to(device), target.to(device)
            
            # Reset gradients
            optimizer.zero_grad()
            
            # Forward pass and calculate loss
            prediction = model(input)
            loss = loss_fn(prediction, target)
            current_train_loss += loss.item()
            
            # Backpropagation and optimization
            loss.backward()
            optimizer.step()
            
        avg_train_loss = current_train_loss / len(train_dataloader)

        # --- 2. Validation Step (Crucial for Early Stopping) ---
        avg_val_loss, val_accuracy = evaluate(model, validation_dataloader, loss_fn, device)

        print(f"  Training Loss: {avg_train_loss:.4f}")
        print(f"  Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")
        print("---------------------------------")
        
        # --- 3. Early Stopping / Model Saving ---
        if avg_val_loss < best_validation_loss:
            best_validation_loss = avg_val_loss
            # Save the model state dict if it achieves the lowest validation loss yet
            torch.save(model.state_dict(), "fest_model.pth")
            print("  âœ… Saved Model - New Best Validation Loss!")
            
    print("FINISHED TRAINING!")

In [160]:
class CNNNetwork(nn.Module):
    def __init__(self, input_shape=(1, 40, 44)): # Expects (Channels, MFCC_Bands, Time_Steps)
        super().__init__()
        # 1st convolutional block (16 channels)
        self.Conv1=nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        # 2nd convolutional block (32 channels)
        self.Conv2=nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        # 3rd convolutional block (64 channels)
        self.Conv3=nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        # 4th convolutional block (128 channels)
        self.Conv4=nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.4) 
        
        # ðŸš¨ NEW: Calculate the size of the feature map dynamically
        with torch.no_grad():
            dummy_input = torch.rand(1, *input_shape)
            x = self.Conv1(dummy_input)
            x = self.Conv2(x)
            x = self.Conv3(x)
            x = self.Conv4(x)
            flattened_size = self.flatten(x).shape[1]
            
        print(f"INFO: Dynamically calculated flattened size is: {flattened_size}")
        
        # Define linear layer using the calculated size
        self.linear = nn.Linear(6144, 5) 

    def forward(self, input_data):
        x = self.Conv1(input_data)
        x = self.Conv2(x)
        x = self.Conv3(x)
        x = self.Conv4(x)
        x = self.flatten(x)
        x = self.dropout(x)
        logits = self.linear(x)
        return logits

In [161]:
if __name__=='__main__':
    print("Using CPU")
    device="cpu"
    mfcc_transformation = MFCC(
    sample_rate=SAMPLE_RATE,
    n_mfcc=N_MFCC,  
    melkwargs={
        "n_fft": 1024,
        "hop_length": 512,
        "n_mels": 64, 
    }
    )
    dataset = AudioDataset(
    audio_dir="/Users/ramupadhyay/Desktop/train",
    transformation=mfcc_transformation,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device="cpu"
    )
    train_dataloader=create_data_loader(dataset,BATCH_SIZE)

    TRAIN_RATIO = 0.85 
    VALIDATION_RATIO = 0.15


    dataset_size = len(dataset)
    train_size = int(TRAIN_RATIO * dataset_size)
    validation_size = dataset_size - train_size

# 3. Use random_split to create two new datasets
    train_dataset, validation_dataset = torch.utils.data.random_split(
    dataset, 
    [train_size, validation_size],
    generator=torch.Generator().manual_seed(42) # Set seed for reproducibility
    )

# 4. Create separate DataLoaders for each set
    train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    validation_dataloader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)
    cnn=CNNNetwork().to(device)
    print(cnn)

Using CPU
INFO: Dynamically calculated flattened size is: 2048
CNNNetwork(
  (Conv1): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (Conv2): Sequential(
    (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (Conv3): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (Conv4): Sequential(
    (0): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (dropout): Dropout(p=0.4, inplace=False)
  (linear): Linear(in_featur

In [162]:
loss_fn=nn.CrossEntropyLoss()
optimiser=torch.optim.Adam(cnn.parameters(),lr=LEARNING_RATE)
scheduler = ReduceLROnPlateau(
    optimiser, 
    mode='min', 
    factor=0.5, # Reduce LR by half
    patience=3 # If validation loss doesn't improve for 3 epochs
)
train(cnn, train_dataloader, validation_dataloader, loss_fn, optimiser, device, EPOCHS)

Epoch 1
  Training Loss: 1.5545
  Validation Loss: 1.2620, Validation Accuracy: 50.77%
---------------------------------
  âœ… Saved Model - New Best Validation Loss!
Epoch 2
  Training Loss: 1.2051
  Validation Loss: 1.1044, Validation Accuracy: 59.07%
---------------------------------
  âœ… Saved Model - New Best Validation Loss!
Epoch 3
  Training Loss: 1.0505
  Validation Loss: 1.0226, Validation Accuracy: 64.86%
---------------------------------
  âœ… Saved Model - New Best Validation Loss!
Epoch 4
  Training Loss: 0.9154
  Validation Loss: 0.8706, Validation Accuracy: 70.27%
---------------------------------
  âœ… Saved Model - New Best Validation Loss!
Epoch 5
  Training Loss: 0.8087
  Validation Loss: 0.7701, Validation Accuracy: 72.78%
---------------------------------
  âœ… Saved Model - New Best Validation Loss!
Epoch 6
  Training Loss: 0.7383
  Validation Loss: 0.6819, Validation Accuracy: 76.45%
---------------------------------
  âœ… Saved Model - New Best Validation Loss

In [163]:
class_mapping=[
    "dog_bark",
    "drilling",
    "engine_idling",
    "siren",
    "street_music"
]
def predict(model, input, target, class_mapping):
    model.eval()
    with torch.no_grad():
        predictions = model(input)
        predicted_indices = torch.argmax(predictions, dim=1)
        predicted_labels = [class_mapping[i.item()] for i in predicted_indices]

        if target is not None:
            expected = [class_mapping[t] for t in target]
            return predicted_labels, expected
        else:
            return predicted_labels


class TestDataset(Dataset):
    def __init__(self, audio_dir, transformation, target_sample_rate, num_samples, device):
        self.audio_dir = audio_dir
        self.device = device
        self.transformation = transformation.to(self.device)
        self.target_sample_rate = target_sample_rate
        self.num_samples = num_samples
        self.files = [f for f in os.listdir(audio_dir) if f.endswith(".wav")]

    def __len__(self):
        return len(self.files)

    def __getitem__(self, index):
        file_name = self.files[index]
        file_path = os.path.join(self.audio_dir, file_name)
        signal, sr = torchaudio.load(file_path)
        signal = signal.to(self.device)
        signal = self._resample_if_necessary(signal, sr)
        signal = self._mix_down_if_necessary(signal)
        signal = self._cut_if_necessary(signal)
        signal = self._right_pad_if_necessary(signal)
        signal = self.transformation(signal)
        return signal, file_name

    # helper functions (same as before)
    def _cut_if_necessary(self, signal):
        if signal.shape[1] > self.num_samples:
            signal = signal[:, :self.num_samples]
        return signal

    def _right_pad_if_necessary(self, signal):
        length_signal = signal.shape[1]
        if length_signal < self.num_samples:
            num_missing_samples = self.num_samples - length_signal
            last_dim_padding = (0, num_missing_samples)
            signal = torch.nn.functional.pad(signal, last_dim_padding)
        return signal

    def _resample_if_necessary(self, signal, sr):
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            signal = resampler(signal)
        return signal

    def _mix_down_if_necessary(self, signal):
        if signal.shape[0] > 1:
            signal = torch.mean(signal, dim=0, keepdim=True)
        return signal


In [170]:
test_dataset = TestDataset(
    audio_dir="/Users/ramupadhyay/Desktop/test",
    transformation=mfcc_transformation,
    target_sample_rate=SAMPLE_RATE,
    num_samples=NUM_SAMPLES,
    device=device
)

test_loader = DataLoader(test_dataset, batch_size=128)

In [171]:
import pandas as pd

cnn.eval()
results = []

with torch.no_grad():
    for inputs, file_names in test_loader:
        inputs = inputs.to(device)
        predictions = cnn(inputs)
        predicted_indices = predictions.argmax(dim=1)
        for fname, pred_idx in zip(file_names, predicted_indices):
            label = class_mapping[pred_idx.item()]
            results.append((fname, label))


# create submission DataFrame
submission_df = pd.DataFrame(results, columns=["ID", "Class"])
submission_df.to_csv("ram12_submission.csv", index=False)
print("ram12_submission.csv created!")


ram12_submission.csv created!


In [172]:
!ls /Users/ramupadhyay/Desktop/train


[30m[43mdog_bark[m[m      [30m[43mdrilling[m[m      [30m[43mengine_idling[m[m [30m[43msiren[m[m         [30m[43mstreet_music[m[m
