In [94]:
import numpy as np

# Re-create the legacy NumPy scalar aliases by pointing each one at the
# corresponding Python builtin (monkey patch applied to the numpy module).
np.float, np.int, np.object, np.bool = float, int, object, bool

In [95]:
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import matplotlib.pyplot as plt
import gunshot_utils as utils
import importlib
import ast
import re
import os
import pickle
from glob import glob
import librosa.display
import IPython.display as ipd

import torch as th
import torchaudio
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from pydub import AudioSegment
from pydub.playback import play

# Re-execute the already imported gunshot_utils module so that edits made to it
# are picked up without restarting the kernel.
importlib.reload(utils)

In [96]:
class GunshotDetectionCNN(nn.Module):
    """Small 2-conv-layer CNN that maps a spectrogram excerpt to one probability.

    Architecture: Conv2d(3x7) -> ReLU -> MaxPool(3x1) -> Conv2d(3x3) -> ReLU ->
    MaxPool(3x1) -> flatten -> Linear(256) -> ReLU -> Dropout(0.5) -> Linear(1)
    -> Sigmoid.

    Args:
        num_frames: Width (time axis) of the input excerpts.
        num_mels: Height (frequency axis) of the input excerpts. Defaults to 80,
            the value that used to be hard-coded.
        in_channels: Number of input channels. Defaults to 3, the value that
            used to be hard-coded.

    Input shape:  (batch, in_channels, num_mels, num_frames)
    Output shape: (batch, 1), values in [0, 1] (sigmoid output, so pair it with
    ``nn.BCELoss`` rather than a logits-based loss).
    """

    def __init__(self, num_frames, num_mels=80, in_channels=3):
        super().__init__()
        # NOTE: parameterised layers are created in the same order as before
        # (conv1, conv2, fc1, fc2) so seeded weight initialisation is unchanged.
        self.conv1 = nn.Conv2d(in_channels, 10, kernel_size=(3, 7))
        self.pool1 = nn.MaxPool2d(kernel_size=(3, 1))
        self.conv2 = nn.Conv2d(10, 20, kernel_size=(3, 3))
        self.pool2 = nn.MaxPool2d(kernel_size=(3, 1))

        # Push a zero tensor through the convolutional stack to discover how many
        # features reach the first fully connected layer. No gradients are needed
        # for this shape probe.
        with th.no_grad():
            probe = th.zeros(1, in_channels, num_mels, num_frames)
            flat_features = self._conv_features(probe).numel()

        self.fc1 = nn.Linear(flat_features, 256)
        self.fc2 = nn.Linear(256, 1)
        self.dropout = nn.Dropout(0.5)
        self.sigmoid = nn.Sigmoid()

    def _conv_features(self, x):
        """Run the conv/pool stack shared by the shape probe and ``forward``."""
        x = self.pool1(F.relu(self.conv1(x)))
        return self.pool2(F.relu(self.conv2(x)))

    def forward(self, x):
        """Return the per-sample probability, shape (batch, 1)."""
        x = self._conv_features(x)

        # Flatten everything except the batch dimension.
        x = x.flatten(start_dim=1)

        x = self.dropout(F.relu(self.fc1(x)))
        return self.sigmoid(self.fc2(x))

# Instantiate the network. This `model` object is the one that gets trained and
# used for inference in the cells further down, not just a usage example.
model = GunshotDetectionCNN(num_frames=utils.NUM_FRAMES)

In [97]:
class GunshotDataset(th.utils.data.Dataset):
    """In-memory dataset of normalised spectrogram frames and their labels.

    Every spectrogram/target pair is cut into frames by ``utils.make_frames``.
    Mean and standard deviation are then computed over dims 0 and 2 of the
    concatenated frames and stored as ``self.mean`` / ``self.std`` (with a
    trailing singleton axis added). The frames kept in ``self.X`` are ALREADY
    normalised with these statistics, so consumers must not normalise them again.
    """

    def __init__(self, spectograms, sample_rates, targets):
        frames = []
        frame_labels = []

        # sample_rates only takes part in the zip (it can shorten the iteration);
        # its values are not used.
        for spec, _rate, target in zip(spectograms, sample_rates, targets):
            spec_frames, spec_labels = utils.make_frames(spec, target)
            frames.extend(spec_frames)
            frame_labels.extend(spec_labels)

        self.y = frame_labels

        # Statistics over all frames at once; the temporary is dropped afterwards.
        stacked = th.cat(frames)
        self.mean = stacked.mean(dim=(0, 2)).unsqueeze(1)
        self.std = stacked.std(dim=(0, 2)).unsqueeze(1)
        del stacked

        self.X = [(frame - self.mean) / self.std for frame in frames]

    def __len__(self):
        """Number of frames held by the dataset."""
        return len(self.X)

    def __getitem__(self, i):
        """Return the (normalised frame, label) pair at index ``i``."""
        return self.X[i], self.y[i]

In [98]:
# Root folders scanned by generate_file_dataframe: the first for gunshot clips
# (.wav), the second for gunshot-free clips (.mp3).
# NOTE(review): absolute, machine-specific paths — change them (or make them
# configurable) before running this notebook on another machine.
gunshot_data_paths = '/Users/borosabel/Documents/Uni/Thesis/PopMIR/Data/Audio/Gunshots/csvfree'
no_gunshot_data_paths = '/Users/borosabel/Documents/Uni/Thesis/PopMIR/Data/Audio/Gunshots/Combined'

def generate_file_dataframe(gunshot_data_paths, no_gunshot_data_paths):
    """Build a balanced index of gunshot / no-gunshot audio files.

    Both folders are walked recursively. Every ``.wav`` file under
    ``gunshot_data_paths`` becomes a positive row (label 1, one gunshot assumed
    at 0.5 s). ``.mp3`` files under ``no_gunshot_data_paths`` whose name contains
    ``'without_gunshot'`` become negative rows (label 0, no timestamps), capped
    at the number of positive rows so the classes are balanced.

    Directories and files are visited in sorted order, so the selected negative
    files and the row order are reproducible across machines and file systems
    (plain ``os.walk`` order depends on the file system).

    Args:
        gunshot_data_paths: Root folder containing gunshot ``.wav`` files.
        no_gunshot_data_paths: Root folder containing gunshot-free ``.mp3`` files.

    Returns:
        pandas.DataFrame with columns ``filename``,
        ``gunshot_location_in_seconds``, ``num_gunshots`` and ``label``;
        positive rows first, then negative rows. Empty (but with these columns)
        when no matching files are found.
    """
    records = []

    # Positive class: every .wav file in the gunshot tree.
    for root, dirs, files in os.walk(gunshot_data_paths):
        dirs.sort()  # in-place sort steers os.walk into a deterministic traversal
        for filename in sorted(files):
            if filename.endswith('.wav'):
                file_path = os.path.join(root, filename)
                # [0.5] = assumed gunshot onset in seconds; flag/label are both 1.
                records.append([file_path, [0.5], 1, 1])
    gunshot_count = len(records)

    # Negative class: matching .mp3 files, at most as many as positives.
    no_gunshot_count = 0
    for root, dirs, files in os.walk(no_gunshot_data_paths):
        if no_gunshot_count >= gunshot_count:
            break  # quota reached: stop walking the rest of the tree
        dirs.sort()
        for filename in sorted(files):
            if no_gunshot_count >= gunshot_count:
                break
            if filename.endswith('.mp3') and 'without_gunshot' in filename:
                file_path = os.path.join(root, filename)
                # Empty timestamp list = no gunshot; flag/label are both 0.
                records.append([file_path, [], 0, 0])
                no_gunshot_count += 1

    return pd.DataFrame(
        records,
        columns=['filename', 'gunshot_location_in_seconds', 'num_gunshots', 'label'],
    )

# Build the file index: one row per audio file with its path, gunshot
# timestamps, gunshot count and binary label.
df = generate_file_dataframe(gunshot_data_paths, no_gunshot_data_paths)

In [99]:
# Display the file index as a visual sanity check.
df

In [100]:
# Separate the index into model inputs (path plus gunshot metadata) and the
# target column. `labels` stays a one-column DataFrame.
files = df[['filename', 'num_gunshots', 'gunshot_location_in_seconds']]
labels = df[['label']]

In [101]:
# Hold out 30% of the files for evaluation; the fixed random_state makes the
# split repeatable for a given row order.
# NOTE(review): the split is not stratified by label — confirm that is intended.
X_train_paths, X_test_paths, y_train_paths, y_test_paths = train_test_split(files, labels, test_size=0.3, random_state=42)

In [102]:
# Display the training part of the file index.
X_train_paths

In [124]:
# Convert the file index rows into spectrograms, sample rates and labels using
# gunshot_utils. The same helper (preprocess_audio_train) is used for both the
# training and the held-out split, only with a different max_non_gunshot_samples.
# NOTE(review): the exact meaning of max_non_gunshot_samples is defined in
# gunshot_utils — presumably a cap on negative excerpts per file; confirm.
spectrograms_train, sample_rates_train, labels_train = utils.preprocess_audio_train(X_train_paths, max_non_gunshot_samples=5)
spectrograms_test, sample_rates_test, labels_test = utils.preprocess_audio_train(X_test_paths, max_non_gunshot_samples=1)

In [125]:
# Expected: 3 channels of mel-spectrograms with 80 mel bands and 15 frames
# (the frame count presumably equals utils.NUM_FRAMES — confirm).
spectrograms_train[0].shape

In [126]:
# Build the training dataset (frames are normalised inside GunshotDataset) and
# a shuffling loader that yields batches of up to 256 frames.
dataset = GunshotDataset(spectrograms_train, sample_rates_train, labels_train)
dataloader = DataLoader(dataset, batch_size=256, shuffle=True)

In [127]:
# Persist the training-set normalisation statistics so inference can reuse
# them. The pickle file is the one read back later in this notebook.
mean_std = {'mean': dataset.mean, 'std': dataset.std}
with open('mean_std.pkl', 'wb') as f:
    pickle.dump(mean_std, f)

# Save the same statistics a second time in torch's own format.
th.save({'mean': dataset.mean, 'std': dataset.std}, 'mean_std.pth')

In [128]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
# NOTE(review): `device` is re-assigned as a th.device object in a later cell;
# this string version only matters until that cell runs.
device = 'cuda' if th.cuda.is_available() else 'cpu'
print(device)

In [129]:
import torch as th
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

# Module-level device read by train_model, the evaluation helpers and
# manual_evaluate_test (they use this global rather than taking a parameter).
device = th.device("cuda" if th.cuda.is_available() else "cpu")

def train_model(model, optimizer, criterion, train_loader, valid_features, valid_labels, epochs=10, thresholds=None, mean=None, std=None, patience=3):
    """Train ``model`` with early stopping on the validation F1 score.

    After every epoch the model is evaluated on the validation samples for each
    candidate threshold. The weights of the best-scoring epoch are snapshotted
    in memory and restored before returning, so the returned threshold and the
    model weights belong to the same epoch.

    Args:
        model: Network returning one probability per sample, shape (batch, 1).
        optimizer: Optimizer already bound to ``model``'s parameters.
        criterion: Loss taking (probabilities, float labels), e.g. ``nn.BCELoss``.
        train_loader: Iterable of (features, labels) batches. The features must
            ALREADY be normalised (``GunshotDataset`` normalises its frames when
            it is built); they are not normalised again here. Normalising a
            second time made training inputs differ from validation/inference
            inputs, which are normalised exactly once.
        valid_features: Iterable of raw (un-normalised) validation samples.
        valid_labels: Labels matching ``valid_features``.
        epochs: Maximum number of epochs.
        thresholds: Candidate decision thresholds; defaults to
            ``np.arange(0.1, 1.0, 0.05)``.
        mean, std: Normalisation statistics applied to the validation samples.
            Both are required.
        patience: Number of consecutive epochs without F1 improvement after
            which training stops.

    Returns:
        (best_threshold, best_score): threshold and validation F1 of the best
        epoch. Both are 0.0 if no epoch achieved an F1 above 0.

    Raises:
        ValueError: if ``mean`` or ``std`` is missing.

    Side effects: moves ``model`` to the global ``device``, prints progress and
    shows the validation confusion matrix.
    """
    import copy  # local import: only needed to snapshot the best weights

    if thresholds is None:
        thresholds = np.arange(0.1, 1.0, 0.05)  # Define a range of thresholds to test

    if mean is None or std is None:
        raise ValueError("Mean and std must be provided for normalization.")

    mean = mean.to(device)
    std = std.to(device)

    model = model.to(device)
    best_threshold = 0.0
    best_score = 0.0
    best_state = None
    epochs_since_improvement = 0

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0

        # Training phase
        for features, labels in train_loader:
            features, labels = features.to(device), labels.to(device).float()

            optimizer.zero_grad()

            # Features from train_loader are already normalised (see docstring).
            outputs = model(features)
            # Drop only the trailing singleton axis: a plain squeeze() would turn
            # a final batch of size 1 into a 0-d tensor and break the loss.
            outputs = outputs.squeeze(-1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Weight by batch size so the epoch average is per sample.
            running_loss += loss.item() * features.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)

        print(f"Epoch [{epoch+1}/{epochs}], Loss: {epoch_loss:.4f}")

        model.eval()
        best_epoch_threshold, current_score = evaluate_model(model, valid_features, valid_labels, thresholds, mean, std)

        if current_score > best_score:
            best_score = current_score
            best_threshold = best_epoch_threshold
            epochs_since_improvement = 0
            # Keep an in-memory copy of the best weights; restored after the loop.
            best_state = copy.deepcopy(model.state_dict())
            print(f"New best F1 score: {best_score:.4f}, model saved.")
        else:
            epochs_since_improvement += 1

        print(f"TEST best_f1: {best_score} with best threshold: {best_threshold}")

        # Check for early stopping
        if epochs_since_improvement >= patience:
            print(f"No improvement in F1 score for {patience} epochs. Stopping training.")
            break

    # Return to the weights that produced best_threshold / best_score.
    if best_state is not None:
        model.load_state_dict(best_state)

    # Compute and display the confusion matrix on the validation set
    cm = compute_confusion_matrix(model, valid_features, valid_labels, best_threshold, mean, std)
    display_confusion_matrix(cm)

    return best_threshold, best_score

def evaluate_model(model, features, labels, thresholds, mean, std):
    """Find the decision threshold with the highest F1 score.

    The model is run once per sample (in eval mode, without gradients); the
    resulting scores are then thresholded for every candidate. Previously the
    forward pass was repeated for every threshold although its result does not
    depend on the threshold.

    Args:
        model: Network returning a single probability for a one-sample batch.
        features: Iterable of raw (un-normalised) samples; each gets a batch
            dimension added before being passed to the model.
        labels: Iterable of scalar labels aligned with ``features``.
        thresholds: Iterable of candidate thresholds.
        mean, std: Normalisation statistics, already on the global ``device``.

    Returns:
        (best_threshold, best_f1_score). Both are 0.0 when no threshold reaches
        an F1 above 0 or when there are no samples.

    Side effects: switches ``model`` to eval mode.
    """
    best_threshold = 0.0
    best_f1_score = 0.0

    scores = []
    targets = []

    model.eval()  # same as compute_confusion_matrix: dropout must be inactive
    with th.no_grad():
        for feature, label in zip(features, labels):
            # Normalize feature
            feature = (feature.to(device) - mean) / std

            # One forward pass per sample, with a batch dimension added.
            scores.append(model(feature.unsqueeze(0)).squeeze().item())
            # as_tensor avoids the copy warning th.tensor() emits for tensor labels.
            targets.append(th.as_tensor(label).float().item())

    if not scores:
        return best_threshold, best_f1_score

    # float64 so the comparison with each threshold is done in double precision.
    scores = np.asarray(scores, dtype=np.float64)

    for threshold in thresholds:
        predictions = (scores >= threshold).astype(float)

        # Calculate F1 score
        avg_f1_score = f1_score(targets, predictions)

        if avg_f1_score > best_f1_score:
            best_f1_score = avg_f1_score
            best_threshold = threshold

    return best_threshold, best_f1_score

def compute_confusion_matrix(model, features, labels, threshold, mean, std):
    """Compute the binary confusion matrix of ``model`` at ``threshold``.

    Args:
        model: Network returning a single probability for a one-sample batch.
        features: Iterable of raw (un-normalised) samples.
        labels: Iterable of scalar 0/1 labels aligned with ``features``.
        threshold: Scores greater than or equal to this value count as class 1.
        mean, std: Normalisation statistics, already on the global ``device``.

    Returns:
        The confusion matrix over the classes [0, 1]. The class list is passed
        explicitly so both classes are always represented, even when only one
        of them occurs in the data — ``display_confusion_matrix`` expects
        exactly the two labels 0 and 1.

    Side effects: switches ``model`` to eval mode.
    """
    all_predictions = []
    all_labels = []

    model.eval()
    with th.no_grad():
        for feature, label in zip(features, labels):
            # Normalize feature
            feature = (feature.to(device) - mean) / std

            # Get model prediction for this sample (batch dimension added).
            score = model(feature.unsqueeze(0)).squeeze().item()

            # Apply threshold; store plain ints so they match labels=[0, 1].
            all_predictions.append(int(score >= threshold))
            # as_tensor avoids the copy warning th.tensor() emits for tensor labels.
            all_labels.append(int(th.as_tensor(label).float().item()))

    cm = confusion_matrix(all_labels, all_predictions, labels=[0, 1])
    return cm

def display_confusion_matrix(cm):
    """Render confusion matrix ``cm`` with class labels 0 and 1 ('magma' colours)."""
    ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0, 1]).plot(cmap='magma')
    plt.show()

def evaluate_model_accuracy(model, features, labels, thresholds, mean, std):
    """Find the decision threshold with the highest accuracy.

    Mirrors ``evaluate_model`` but scores by accuracy instead of F1. Each
    sample gets a batch dimension before it is passed to the model (this was
    missing here although the sibling functions add it), and the model is run
    once per sample rather than once per sample per threshold.

    Args:
        model: Network returning a single probability for a one-sample batch.
        features: Iterable of raw (un-normalised) samples.
        labels: Iterable of scalar labels aligned with ``features``.
        thresholds: Iterable of candidate thresholds.
        mean, std: Normalisation statistics, already on the global ``device``.

    Returns:
        (best_threshold, best_accuracy). Both are 0.0 when no threshold reaches
        an accuracy above 0 or when there are no samples.

    Side effects: switches ``model`` to eval mode.
    """
    best_threshold = 0.0
    best_accuracy = 0.0

    scores = []
    targets = []

    model.eval()  # dropout must be inactive for deterministic scores
    with th.no_grad():
        for feature, label in zip(features, labels):
            # Normalize feature
            feature = (feature.to(device) - mean) / std

            # Add the batch dimension the model expects, then read the scalar score.
            scores.append(model(feature.unsqueeze(0)).squeeze().item())
            targets.append(th.as_tensor(label).float().item())

    if not scores:
        return best_threshold, best_accuracy

    # float64 so the comparison with each threshold is done in double precision.
    scores = np.asarray(scores, dtype=np.float64)
    targets = np.asarray(targets, dtype=np.float64)

    for threshold in thresholds:
        predictions = (scores >= threshold).astype(float)

        # Fraction of samples whose thresholded prediction equals the label.
        avg_accuracy = np.mean(predictions == targets)
        if avg_accuracy > best_accuracy:
            best_accuracy = avg_accuracy
            best_threshold = threshold

    return best_threshold, best_accuracy

In [130]:
# Training hyper-parameters. These names are now the ones passed on below; the
# values are the ones that were effectively used before (25 epochs, lr 1e-4),
# replacing constants that were defined here but never read.
epochs = 25
lr = 1e-4

# The model ends in a sigmoid, so plain binary cross-entropy on probabilities.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)

# NOTE: `model` is the instance created earlier in the notebook; re-running this
# cell continues training that same instance rather than starting from scratch.
best_threshold, best_f1 = train_model(model, optimizer, criterion, dataloader, spectrograms_test, labels_test, epochs=epochs, thresholds=None, mean=dataset.mean, std=dataset.std)

In [131]:
def manual_evaluate_test(model, feature, threshold, frame_size=utils.NUM_FRAMES, sampling_rate=utils.SAMPLING_RATE, mean=None, std=None):
    """Scan one spectrogram in non-overlapping windows and report detections.

    The spectrogram is normalised with ``mean``/``std`` and split along its
    third axis into consecutive windows of ``frame_size`` frames (a trailing
    remainder shorter than ``frame_size`` is not scored). Each window is scored
    by ``model``; windows scoring at least ``threshold`` are reported by the
    start time of the window.

    Args:
        model: Network returning a single probability for a one-window batch.
        feature: Spectrogram tensor indexed as [:, :, time].
        threshold: Minimum score for a window to count as a detection.
        frame_size: Window length in spectrogram frames.
        sampling_rate: Audio sampling rate used to convert frames to seconds.
        mean, std: Normalisation statistics. Both are required.

    Returns:
        List of (minutes, seconds) tuples, one per detected window, in time order.

    Raises:
        ValueError: if ``mean`` or ``std`` is missing.
    """
    if mean is None or std is None:
        raise ValueError("Mean and std must be provided for normalization.")

    mean, std = mean.to(device), std.to(device)
    model = model.to(device)
    model.eval()

    # Normalise the whole spectrogram once, then slice windows out of it.
    normalized = (feature.to(device) - mean) / std
    total_frames = normalized.shape[2]

    window_scores = []
    with th.no_grad():
        for start in range(0, total_frames - frame_size + 1, frame_size):
            window = normalized[:, :, start:start + frame_size].unsqueeze(0).float()
            window_scores.append(model(window).squeeze().item())

    detections = []
    for window_idx, score in enumerate(window_scores):
        if score >= threshold:
            # window index -> frame offset -> sample offset -> seconds
            offset_seconds = window_idx * frame_size * utils.HOP_LENGTH / sampling_rate
            detections.append((int(offset_seconds // 60), offset_seconds % 60))

    return detections

In [132]:
# Compute spectrograms for a full song that the detector will be run on.
# NOTE(review): absolute, machine-specific path.
spectrograms, sample_rates = utils.preprocess_audio(['/Users/borosabel/Documents/Uni/Thesis/PopMIR/M.I.A. - Paper Planes.mp3'])

In [133]:
# Read the normalisation statistics back from the pickle written earlier in
# this notebook. Only unpickle files you created yourself: pickle can execute
# arbitrary code while loading.
with open('./mean_std.pkl', 'rb') as f:
    data = pickle.load(f)

mean, std = data['mean'], data['std']

In [134]:
# Run the sliding-window detector on each spectrogram.
# NOTE(review): predicted_times is overwritten on every iteration, so only the
# result for the last spectrogram is kept (fine while a single file is passed).
for spectogram in spectrograms:
    predicted_times = manual_evaluate_test(model, spectogram, threshold=best_threshold, mean=mean, std=std)

In [135]:
# Show the decision threshold selected during training.
print(best_threshold)

In [136]:
# Each entry is a (minutes, seconds) tuple marking the start of a detected window.
for time in predicted_times:
    print(f"Prediction at {time[0]} minutes and {time[1]} seconds")

In [137]:
# Same pipeline as for the previous song: preprocess, detect, print.
# NOTE(review): absolute, machine-specific path; this cell repeats the three
# preceding steps verbatim and is a candidate for a small helper function.
spectrograms, sample_rates = utils.preprocess_audio(['/Users/borosabel/Documents/Uni/Thesis/PopMIR/50 Cent - Many Men (Wish Death) (Dirty Version).mp3'])

# Only the result for the last spectrogram is kept (predicted_times is overwritten).
for spectogram in spectrograms:
    predicted_times = manual_evaluate_test(model, spectogram, threshold=best_threshold, mean=mean, std=std)

# Each entry is a (minutes, seconds) tuple marking the start of a detected window.
for time in predicted_times:
    print(f"Prediction at {time[0]} minutes and {time[1]} seconds")