In [None]:
import os
import torch
import librosa
import numpy as np
from torch import nn, optim
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from cp_resnet import get_model
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from tqdm import tqdm

n_mels = 64
segment_length = 3  # seconds
sample_rate = 16000

root_dir = "browser_recordings/browser_recordings/"
test_dir = "real_world_traffic/"

def compute_mel(audio, sr=16000, n_mels=64):
    mel = librosa.feature.melspectrogram(
        y=audio,
        sr=sr,
        n_mels=n_mels,
        hop_length=256,
        n_fft=1024,
        power=2.0
    )
    mel_db = librosa.power_to_db(mel, ref=np.max)
    mel_db = (mel_db - np.mean(mel_db)) / (np.std(mel_db) + 1e-6)
    mel_tensor = torch.tensor(mel_db, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
    return mel_tensor

In [2]:
class AudioDataset(Dataset):
    def __init__(self, root_dir, process_fn, ignored_folder, transform=None):
        """
        Args:
            root_dir (str): Path to main folder containing subfolders.
            process_fn (callable): Function to process raw audio to desired numpy array.
            ignored_folder (str): Name of subfolder to ignore.
            transform (callable, optional): Optional transform to apply to processed data.
        """
        self.root_dir = root_dir
        self.process_fn = process_fn
        self.transform = transform
        self.ignored_folder = ignored_folder
        
        self.file_paths = []
        self.labels = []
        self.label_map = {}  # map folder names to numeric labels
        self._prepare_dataset()

    def _prepare_dataset(self):
        # List subfolders
        subfolders = [d for d in os.listdir(self.root_dir)
                      if os.path.isdir(os.path.join(self.root_dir, d)) and d != self.ignored_folder]
        
        # Create label map
        self.label_map = {name: idx for idx, name in enumerate(subfolders)}
        
        for label_name in subfolders:
            folder_path = os.path.join(self.root_dir, label_name)
            for fname in os.listdir(folder_path):
                if fname.endswith('.npy'):  # Adjust extension if needed
                    self.file_paths.append(os.path.join(folder_path, fname))
                    self.labels.append(self.label_map[label_name])

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        audio_path = self.file_paths[idx]
        label = self.labels[idx]
        
        # Load audio
        audio_data = np.load(audio_path)[:48000]  # (samples,) or (samples, channels)
        # Process audio
        processed = self.process_fn(audio_data)
        processed = processed.squeeze(0)
        # Optional transform (e.g. normalization, tensor conversion)
        if self.transform:
            processed = self.transform(processed)
        return processed, label


In [3]:
audio_data = AudioDataset(root_dir, compute_mel, 'uncertain')
dataloader = DataLoader(audio_data, batch_size=32, shuffle=True)

test_a_data = AudioDataset(test_dir, compute_mel, 'uncertain')
test_loader = DataLoader(test_a_data, batch_size=32, shuffle=True)

In [4]:
model = get_model(n_classes=1, in_channels=1)


0 0.08333333333333333 0.5
1 0.08333333333333333 0.5
2 0.05892556509887896 0.5
3 0.04914731871829904 0.5


In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Binary cross entropy with logits (useful if model outputs logits)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Number of epochs
num_epochs = 30

for epoch in range(num_epochs):
    model.train()
    epoch_loss = 0.0
    
    loop = tqdm(dataloader, desc=f"Epoch [{epoch+1}/{num_epochs}]", leave=False, dynamic_ncols=True)
    
    for batch_data, batch_labels in loop:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device).float().unsqueeze(1)  # (N,) -> (N,1)
        
        # Forward pass
        outputs = model(batch_data)
        
        # Compute loss
        loss = criterion(outputs, batch_labels)
        
        # Backprop and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Update epoch loss
        epoch_loss += loss.item() * batch_data.size(0)
        
        # Update tqdm description
        loop.set_postfix(loss=loss.item())

    avg_loss = epoch_loss / len(dataloader.dataset)
    if epoch+1 % 5 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}] - Average Loss: {avg_loss:.4f}")


Epoch [1/30]:   0%|          | 0/18 [00:00<?, ?it/s]

x: torch.Size([32, 1, 64, 188])
in_c: torch.Size([32, 32, 31, 93])
stage1: torch.Size([32, 32, 3, 46])
stage2: torch.Size([32, 64, 3, 46])
stage3: torch.Size([32, 92, 3, 46])
feed_forward: torch.Size([32, 1, 1, 1])
logit: torch.Size([32, 1])


                                                                          

In [6]:
model.eval()
all_labels = []
all_preds = []

with torch.no_grad():
    for batch_data, batch_labels in dataloader:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device).float().unsqueeze(1)
        
        outputs = model(batch_data)  # [batch_size, 1]
        
        # Apply sigmoid since model gives logits
        probs = torch.sigmoid(outputs)
        
        # Convert to 0/1 predictions
        preds = (probs >= 0.5).int()
        
        all_labels.append(batch_labels.cpu())
        all_preds.append(preds.cpu())

# Concatenate all batches
all_labels = torch.cat(all_labels).numpy()
all_preds = torch.cat(all_preds).numpy()

# Flatten in case of shape (N,1)
all_labels = all_labels.flatten()
all_preds = all_preds.flatten()

# Calculate metrics
acc = accuracy_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)
cm = confusion_matrix(all_labels, all_preds)

print(f"Accuracy: {acc:.4f}")
print(f"F1 Score: {f1:.4f}")
print("Confusion Matrix:")
print(cm)


Accuracy: 0.9781
F1 Score: 0.9718
Confusion Matrix:
[[329   4]
 [  8 207]]


In [7]:
def test_model(model, test_loader, device):
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch_data, batch_labels in test_loader:
            batch_data = batch_data.to(device)
            batch_labels = batch_labels.to(device).float().unsqueeze(1)

            outputs = model(batch_data)
            probs = torch.sigmoid(outputs)
            preds = (probs >= 0.5).int()

            all_labels.append(batch_labels.cpu())
            all_preds.append(preds.cpu())

    # Concatenate results
    all_labels = torch.cat(all_labels).numpy().flatten()
    all_preds = torch.cat(all_preds).numpy().flatten()

    # Compute metrics
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    cm = confusion_matrix(all_labels, all_preds)

    # Print or return
    print(f"Test Accuracy: {acc:.4f}")
    print(f"Test F1 Score: {f1:.4f}")
    print("Test Confusion Matrix:")
    print(cm)

    return acc, f1, cm


In [8]:
result = test_model(model, test_loader, device)

Test Accuracy: 0.8770
Test F1 Score: 0.8340
Test Confusion Matrix:
[[180  17]
 [ 22  98]]


In [9]:
torch.save(model.state_dict(), 'speech_recog.pth')