In [3]:
import json
import librosa
from sklearn.model_selection import train_test_split
import torch
import librosa.display
from torch import nn
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import os
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

In [15]:
class AudioDataset(Dataset):
    def __init__(self, audio_files, annotations, sr=22050, n_mels=16, hop_length=256, max_len=4000, root_path = 'audios'):
        self.audio_files = audio_files
        self.annotations = annotations
        self.sr = sr
        self.n_mels = n_mels
        self.hop_length = hop_length
        self.max_len = max_len
        self.root_path = root_path

    def __len__(self):
        return len(self.audio_files)

    def __getitem__(self, idx):
        audio_file = self.audio_files[idx]
        audio_path = os.path.join(self.root_path, audio_file)
        y, _ = librosa.load(audio_path, sr=self.sr)
        mel_spec = librosa.feature.melspectrogram(y=y, sr=self.sr, n_mels=self.n_mels, hop_length=self.hop_length)
        mel_spec = librosa.power_to_db(mel_spec, ref=np.max)

        mel_spec = (mel_spec - mel_spec.mean()) / mel_spec.std()

        timestamps = self.annotations[audio_file]
        labels = np.zeros(mel_spec.shape[1])
        for start, end in timestamps:
            start_idx = int(start * self.sr / self.hop_length)
            end_idx = int(end * self.sr / self.hop_length)
            labels[start_idx:end_idx] = 1

        if mel_spec.shape[1] > self.max_len:
            mel_spec = mel_spec[:, :self.max_len]
            labels = labels[:self.max_len]
        else:
            pad_width = self.max_len - mel_spec.shape[1]
            mel_spec = np.pad(mel_spec, ((0, 0), (0, pad_width)), mode='constant')
            labels = np.pad(labels, (0, pad_width), mode='constant')

        return torch.tensor(mel_spec, dtype=torch.float32), torch.tensor(labels, dtype=torch.float32)


In [28]:
class CNNTransformerSoundClassifier(nn.Module):
    def __init__(self, input_size, num_heads, transformer_dim, cnn_filters=16, num_transformer_layers=2, output_size=1):
        super(CNNTransformerSoundClassifier, self).__init__()
        
        # 1D CNN for feature extraction
        self.cnn = nn.Sequential(
            nn.Conv1d(input_size, cnn_filters, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(cnn_filters),
            # nn.MaxPool1d(kernel_size=2),
        )
        
        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=cnn_filters,
            nhead=num_heads,
            dim_feedforward=transformer_dim,
            dropout=0.1,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_transformer_layers)

        # Fully Connected Layer for classification
        self.fc = nn.Linear(cnn_filters, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # Input shape: (batch_size, input_size, sequence_length)
        x = self.cnn(x)  # Output shape: (batch_size, cnn_filters, reduced_sequence_length)
        x = x.permute(0, 2, 1)  # Shape for Transformer: (batch_size, reduced_sequence_length, cnn_filters)
        x = self.transformer(x)  # Shape: (batch_size, reduced_sequence_length, cnn_filters)
        x = self.fc(x)  # Shape: (batch_size, reduced_sequence_length, output_size)
        return self.sigmoid(x)  # Shape: (batch_size, reduced_sequence_length, output_size)

In [29]:
def extract_timestamps(predictions, hop_length, sr, merge_threshold = 2, duration_threshold = 1):
    timestamps = []
    start = None
    for i, pred in enumerate(predictions):
        if pred > 0.5 and start is None:
            start = i
        elif pred <= 0.5 and start is not None:
            end = i
            timestamps.append((start * hop_length / sr, end * hop_length / sr))
            start = None
    if start is not None:
        timestamps.append((start * hop_length / sr, len(predictions) * hop_length / sr))
    
    merged_timestamps = []
    for ts in timestamps:
        if not merged_timestamps or ts[0] - merged_timestamps[-1][1] > merge_threshold:
            merged_timestamps.append(ts)
        else:
            merged_timestamps[-1] = (merged_timestamps[-1][0], ts[1])
    
    for ts in merged_timestamps:
        if ts[1] - ts[0] < duration_threshold:
            merged_timestamps.remove(ts)
    return merged_timestamps

In [30]:
def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    model.train()

    for epoch in range(epochs):
        total_train_loss = 0
        train_correct_predictions = 0
        train_total_samples = 0

        # with tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{epochs} - Training") as pbar:
        for mel_spec, labels in train_loader:
            # mel_spec, labels = mel_spec.cuda(), labels.cuda()

            optimizer.zero_grad()
            outputs = model(mel_spec)
            loss = criterion(outputs.squeeze(), labels.squeeze())
            loss.backward()
            optimizer.step() 

            total_train_loss += loss.item()

            predictions = (outputs.squeeze() > 0.5).float()
            train_correct_predictions += (predictions == labels).sum().item()
            train_total_samples += labels.numel()

            # Update the progress bar
            # pbar.set_postfix(loss=f"{loss.item():.4f}")
            # pbar.update(1)

        avg_train_loss = total_train_loss / len(train_loader)
        train_accuracy = train_correct_predictions / train_total_samples

        model.eval() 
        total_val_loss = 0
        val_correct_predictions = 0
        val_total_samples = 0

        with torch.no_grad():
            for mel_spec, labels in val_loader:
                # mel_spec, labels = mel_spec.cuda(), labels.cuda()

                outputs = model(mel_spec)
                loss = criterion(outputs.squeeze(), labels.squeeze())
                total_val_loss += loss.item()

                predictions = (outputs.squeeze() > 0.5).float()
                val_correct_predictions += (predictions == labels).sum().item()
                val_total_samples += labels.numel()

        avg_val_loss = total_val_loss / len(val_loader)
        val_accuracy = val_correct_predictions / val_total_samples

        print(f"Epoch {epoch+1}/{epochs} -> "
              f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4%}, "
              f"Val Loss: {avg_val_loss:.4f}, Val Accuracy: {val_accuracy:.4%}")

In [35]:
FEATURES = 32
HOP_LENGTH=256
MAX_LEN=4000
SAMPLE_RATE = 16000
EPOCHS = 10

In [None]:
with open("annotations.json", "r") as f:
    annotations = json.load(f)

audio_files = list(annotations.keys())

train_files, val_files = train_test_split(audio_files, test_size=0.2, random_state=42)
train_dataset = AudioDataset(train_files, annotations, n_mels=FEATURES, hop_length=HOP_LENGTH, max_len = MAX_LEN, sr = SAMPLE_RATE)
val_dataset = AudioDataset(val_files, annotations, n_mels=FEATURES, hop_length=HOP_LENGTH, max_len = MAX_LEN, sr = SAMPLE_RATE)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=False)

# Initialize the model
model = CNNTransformerSoundClassifier(
    input_size=FEATURES,          # Number of mel spectrogram features (match n_mels in AudioDataset)
    num_heads=4,            # Number of attention heads
    transformer_dim=32,     # Feedforward dimension in the transformer
    cnn_filters=16,         # Number of filters in the CNN layer
    num_transformer_layers=1,  # Number of transformer layers
    output_size=1           # Output 1 for binary classification
)

criterion = nn.BCELoss() 
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-3)

train_model(model, train_loader, val_loader, criterion, optimizer, epochs=20)

Epoch 1/20 -> Train Loss: 0.6266, Train Accuracy: 65.3542%, Val Loss: 0.6433, Val Accuracy: 49.1042%
Epoch 2/20 -> Train Loss: 0.5708, Train Accuracy: 76.1906%, Val Loss: 0.5098, Val Accuracy: 85.6333%
Epoch 3/20 -> Train Loss: 0.4838, Train Accuracy: 83.1250%, Val Loss: 0.4280, Val Accuracy: 85.5417%
Epoch 4/20 -> Train Loss: 0.4270, Train Accuracy: 84.7740%, Val Loss: 0.3731, Val Accuracy: 85.6958%
Epoch 5/20 -> Train Loss: 0.3965, Train Accuracy: 84.8375%, Val Loss: 0.3359, Val Accuracy: 85.6875%
Epoch 6/20 -> Train Loss: 0.3761, Train Accuracy: 84.9531%, Val Loss: 0.3176, Val Accuracy: 85.6792%
Epoch 7/20 -> Train Loss: 0.3628, Train Accuracy: 85.4375%, Val Loss: 0.3136, Val Accuracy: 85.6417%
Epoch 8/20 -> Train Loss: 0.3550, Train Accuracy: 85.7292%, Val Loss: 0.3103, Val Accuracy: 85.5375%
Epoch 9/20 -> Train Loss: 0.3492, Train Accuracy: 85.7365%, Val Loss: 0.3010, Val Accuracy: 85.5000%
Epoch 10/20 -> Train Loss: 0.3436, Train Accuracy: 85.9021%, Val Loss: 0.2997, Val Accuracy

KeyboardInterrupt: 

In [None]:
# class LSTMSoundClassifier(nn.Module):
#     def __init__(self, input_size, hidden_size, num_layers, output_size):
#         super(LSTMSoundClassifier, self).__init__()
#         # self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
#         self.fc = nn.Linear(input_size, hidden_size)
#         self.relu = nn.ReLU()
#         self.fc2 = nn.Linear(hidden_size, output_size)
#         self.sigmoid = nn.Sigmoid()

#     def forward(self, x):
#         x = x.permute(0, 2, 1)
#         out = self.relu(self.fc(x))
#         out = self.relu(self.fc2(out))
#         return self.sigmoid(out)