# 1. Load Dependencies

In [None]:
#Install necessary libraries
!pip install audiomentations
!pip install cylimiter

In [5]:
import librosa
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, confusion_matrix, fbeta_score, make_scorer
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Activation, Dropout
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten,MaxPool1D, Conv1D
from sklearn.preprocessing import StandardScaler
import seaborn as sns
import os
import scipy
from sklearn.feature_selection import mutual_info_classif
from config import Config
from audiomentations import Compose, Limiter

In [2]:
#Initialize config
# `conf` is the shared settings object read by the rest of the notebook
# (attributes used below: sr, nfft, hop, win_size, win_hop, nn).
conf = Config()

# 2. Signal-Labels

## 2.1 Load audio signals and labels

In [8]:
# Load audio files

signal_names = ['clips_lakers', 'lakers_okc', 'okc_denver', 'portland_gsw', 'rockets_knicks', 'sixers_lakers', 'memphis_okc', 'bucks_toronto', 'gsw_cavs', 'portland_lakers']

# Map every recording name to its waveform, loaded at the configured sample rate
signals = {}
for sig in signal_names:
    audio_path = os.path.join('/wavfiles', sig + '.wav')
    signals[sig], sr = librosa.load(audio_path, sr=conf.sr)

In [9]:
#Load CSV files with labels in a dictionary
df_dict = {}
for audio in signals:
  annotations = pd.read_csv('/csv_files/'+audio+'.csv')
  # Labels are compared against string class names later on, so force them to str here
  annotations['LABEL'] = annotations['LABEL'].astype(str)
  df_dict[audio] = annotations

## 2.2 Keep only annotated audio

In [10]:
# Create a new dictionary to store the filtered audio data:
# for every recording, only the samples covered by an annotation row are kept,
# concatenated in annotation order.
filtered_signals = {}

# Iterate through each audio and its corresponding DataFrame
for audio_name, signal in signals.items():
    # Get the DataFrame for the current audio
    df = df_dict[audio_name]

    # Collect the annotated segments as array slices. Extending a Python list
    # with the samples themselves would box every sample into its own object,
    # which is extremely slow and memory-hungry for full-length recordings.
    annotated_segments = []

    # Iterate through each row in the DataFrame
    for index, row in df.iterrows():
        # Convert the annotation's start time and duration (seconds) to sample indices
        start_sample = int(librosa.time_to_samples(row['TIME'], sr=conf.sr))
        end_sample = start_sample + int(librosa.time_to_samples(row['DURATION'], sr=conf.sr))

        # Keep the samples corresponding to the annotated time
        annotated_segments.append(signal[start_sample:end_sample])

    # Join all segments once; a recording without annotations yields an empty array
    if annotated_segments:
        filtered_signals[audio_name] = np.concatenate(annotated_segments)
    else:
        filtered_signals[audio_name] = np.array([])


# 3. Model Preparation

## 3.1 Data Augmentation (Dynamic Range Compression)

In [128]:
def aug(sig_dict):
  """Run every signal in ``sig_dict`` through an audiomentations ``Limiter``.

  Args:
    sig_dict: mapping of recording name -> 1-D sample array.

  Returns:
    A new dict with the same keys whose values are the processed signals
    as numPy arrays.
  """
  # NOTE(review): Limiter() is created with the library defaults; confirm
  # whether its default application probability means that some signals are
  # returned unchanged.
  augment = Compose([Limiter()])

  return {
      audio: np.array(augment(samples=samples, sample_rate = conf.sr))
      for audio, samples in sig_dict.items()
  }

## 3.2 Feature Extraction

### 3.2.1 Feature extraction (except mel spectrograms)

In [129]:
def feat_extr(sig, lp_order=12, n_mfccs=13):
    """Compute the per-frame feature matrix of every signal in ``sig``.

    For each signal the columns are, in order: MFCCs, zero-crossing rate,
    spectral centroid, chroma, spectral flatness, spectral rolloff, RMS,
    spectral contrast, spectral bandwidth, the LPC coefficients and the
    LPCC coefficients. All features use frames of ``conf.nfft`` samples
    with a hop of ``conf.hop`` and no centering.

    Args:
        sig: mapping of recording name -> 1-D sample array.
        lp_order: order passed to ``librosa.lpc``; also the number of LPCC
            values computed per frame.
        n_mfccs: number of MFCCs.

    Returns:
        dict of recording name -> 2-D array with one row per frame.
    """

    def lpcc_from_lpc(lpc):
        # Recursive formula for LPCC computation from one frame's LPC vector
        lpcc = np.zeros(lp_order)
        # The first coefficient is the same constant for every frame
        lpcc[0] = np.log(lp_order-1)
        for m in range(1, lp_order):
            lpcc[m] = lpc[m]
            for k in range(1, m):
                lpcc[m] += (k / m) * lpc[m - k] * lpcc[k]
        return lpcc

    feat_all = {}

    for audio in sig:
        samples = sig[audio]

        mfcc = librosa.feature.mfcc(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False, n_mfcc=n_mfccs)
        zcr = librosa.feature.zero_crossing_rate(y=samples, frame_length=conf.nfft, hop_length=conf.hop, center=False)
        cent = librosa.feature.spectral_centroid(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False)
        chromagram = librosa.feature.chroma_stft(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False)
        flt = librosa.feature.spectral_flatness(y=samples, n_fft=conf.nfft, hop_length=conf.hop, center=False)
        roll = librosa.feature.spectral_rolloff(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False)
        root_mean_square = librosa.feature.rms(y=samples, frame_length=conf.nfft, hop_length=conf.hop, center=False)
        contr = librosa.feature.spectral_contrast(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False)
        spec_band = librosa.feature.spectral_bandwidth(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False)

        # Stack the librosa features vertically, then transpose so that each row is a frame
        spectral_feats = np.concatenate(
            (mfcc, zcr, cent, chromagram, flt, roll, root_mean_square, contr, spec_band),
            axis=0,
        ).T

        # Linear Predictive Coding (LPC): one coefficient vector per frame
        frames = librosa.util.frame(samples, frame_length=conf.nfft, hop_length=conf.hop).T
        lpc_arr = np.array([librosa.lpc(frame, order=lp_order) for frame in frames])

        # Linear Prediction Cepstral Coefficients (LPCC) derived from each LPC vector
        lpcc_coeffs = np.array([lpcc_from_lpc(lpc) for lpc in lpc_arr])

        # LPC and LPCC side by side, appended to the right of the other features
        lp_all = np.concatenate((lpc_arr, lpcc_coeffs), axis=1)
        feat_all[audio] = np.concatenate((spectral_feats, lp_all), axis=1)

    return feat_all

### 3.2.2 Mel spectrogram feature extraction

In [130]:
#Mel spectrograms computation
def generate_mel_spec(sig, mels=128):
  """Compute a mel spectrogram for every signal in ``sig``.

  Args:
    sig: mapping of recording name -> 1-D sample array.
    mels: number of mel bands.

  Returns:
    dict of recording name -> transposed mel spectrogram (one row per frame,
    one column per mel band), computed with ``conf.nfft``/``conf.hop`` and
    no centering.
  """
  return {
      audio: librosa.feature.melspectrogram(y=samples, sr=conf.sr, n_fft=conf.nfft, hop_length=conf.hop, center=False, n_mels=mels).T
      for audio, samples in sig.items()
  }

## 3.3 Frame labelling

In [132]:
def new_frame_labelling(features_dict, df_dict):
  """Assign a numeric class id to every feature frame of every recording.

  Annotations are laid end to end: each row covers ``DURATION`` seconds
  starting where the previous row ended, beginning at 0 for each recording.

  Args:
    features_dict: recording name -> 2-D feature array (rows are frames).
    df_dict: recording name -> DataFrame with 'DURATION' and 'LABEL' columns.

  Returns:
    dict of recording name -> 1-D float array of class ids, one per frame.
    Frames not covered by a recognised label keep the initial value 0,
    which is also the id of 'cheering'.
  """
  class_ids = {'cheering': 0, 'whistle': 1, 'air_horn': 2, 'speech': 3, 'boos': 4, 'other': 5}

  labels = {}
  for audio in features_dict:
    frame_ids = np.zeros(features_dict[audio].shape[0])
    labels[audio] = frame_ids

    #Running end of the previous annotation, restarted for each audio
    elapsed = 0
    for _, row in df_dict[audio].iterrows():
      segment_end = elapsed + row['DURATION']

      # Convert start and end times from seconds to frame indices.
      first_frame = int(elapsed * conf.sr/conf.hop)
      last_frame = int(segment_end * conf.sr/conf.hop)

      #Assign labels to frames; unrecognised labels leave the frames untouched
      class_id = class_ids.get(row["LABEL"])
      if class_id is not None:
        frame_ids[first_frame:last_frame] = class_id

      #The end of this annotation is the start of the next one
      elapsed = segment_end

  return labels

## 3.4 Windows

### 3.4.1 Feature Stacking

In [133]:
#ANN input shape : (#windows, #features * #frames_per_window)
#CNN input shape : (#windows, #features, #frames_per_window, depth)

def create_win(features_dict):
  """Group consecutive frames of every recording into overlapping windows.

  Windows contain ``conf.win_size`` frames and start every ``conf.win_hop``
  frames.

  Args:
    features_dict: recording name -> 2-D array of shape (frames, features).

  Returns:
    dict of recording name -> array of shape
    (windows, features, conf.win_size, 1); when ``conf.nn == 'ann'`` each
    window is flattened, giving shape (windows, features * conf.win_size).
  """
  windows = {}

  for audio, frame_feats in features_dict.items():
    n_frames, n_feats = frame_feats.shape
    n_windows = (n_frames - conf.win_size) // conf.win_hop + 1

    #Allocate with a trailing depth axis to fit the input shape of the Convolutional Neural Network
    stacked = np.zeros((n_windows, n_feats, conf.win_size, 1))

    #Copy the frames of each window, transposed so that features come first
    for w in range(n_windows):
      first = w * conf.win_hop
      stacked[w, :, :, 0] = frame_feats[first:first + conf.win_size, :].T

    #The ANN takes one flat vector per window
    if conf.nn == 'ann':
      stacked = stacked.reshape(stacked.shape[0], -1)

    windows[audio] = stacked

  return windows

### 3.4.2 Window Stats (mean, variance, median)

In [134]:
def win_stats(win):
    """Replace every window by the mean, variance and median of each feature.

    Args:
        win: window array as produced by ``create_win`` (flat per window when
            ``conf.nn == 'ann'``, otherwise with separate feature/frame axes).

    Returns:
        Array whose second axis holds the per-feature means, then variances,
        then medians of each window; it has a trailing axis of size 1 when
        ``conf.nn == 'cnn'`` and is 2-D when ``conf.nn == 'ann'``.
    """
    #Flat ANN windows are unfolded back to (windows, features, frames) to compute the stats
    if conf.nn == 'ann':
      win = win.reshape(win.shape[0], -1, conf.win_size)

    means, variances, medians = [], [], []

    #Statistics are taken along the frame axis of each window
    for window in (win[i, :, :] for i in range(win.shape[0])):
        means.append(np.mean(window, axis=1))
        variances.append(np.var(window, axis=1))
        medians.append(np.median(window, axis=1))

    #Concatenate stats horizontally
    ovr = np.concatenate((means, variances, medians), axis=1)

    #Shape the result for the type of Neural Network in use
    if conf.nn == 'cnn':
        reshaped_win = ovr.reshape(ovr.shape[0], ovr.shape[1], 1)
    elif conf.nn == 'ann':
        reshaped_win = ovr.reshape(ovr.shape[0], -1)

    return reshaped_win

## 3.5 Windows labelling

### 3.5.1 Majority Voting

In [135]:
def majVot_win_lab(windows, frame_labels):
  """Label every window with the most frequent class among its frames.

  Args:
    windows: recording name -> window array (first axis indexes windows).
    frame_labels: recording name -> 1-D array of per-frame class ids (0-5).

  Returns:
    dict of recording name -> 1-D float array with one class id per window.
    When several classes tie, the lowest class id wins.
  """
  class_keys = ('0', '1', '2', '3', '4', '5')
  labels = {}

  for audio in windows:
      n_windows = windows[audio].shape[0]
      voted = np.zeros(n_windows)
      labels[audio] = voted

      for w in range(n_windows):
          first = w * conf.win_hop

          #Count occurrences of each class among the frames of this window
          tally = dict.fromkeys(class_keys, 0)
          for frame_idx in range(first, first + conf.win_size):
              tally[str(int(frame_labels[audio][frame_idx]))] += 1

          #max() returns the first key with the highest count
          voted[w] = int(max(tally, key=tally.get))

  return labels

### 3.5.2 Weighted Voting

In [136]:
def weigVot_win_lab(windows_dict, frame_labels_dict):
    """Label every window by a weighted vote over the classes of its frames.

    Each class count is multiplied by its weight (class 1 counts double,
    all others once) and the class with the highest weighted count wins.

    Args:
        windows_dict: recording name -> window array (first axis indexes windows).
        frame_labels_dict: recording name -> 1-D array of per-frame class ids.

    Returns:
        dict of recording name -> 1-D float array with one class id per window.
    """
    #Weight of each class id, indexed by class id
    label_weights = np.array([1, 2, 1, 1, 1, 1])
    n_classes = len(label_weights)

    labels = {}
    for audio in windows_dict:
      n_windows = windows_dict[audio].shape[0]
      voted = np.zeros(n_windows)

      for w in range(n_windows):
          first = w * conf.win_hop
          #Class ids of the frames that make up this window
          in_window = frame_labels_dict[audio][first : first + conf.win_size].astype(int)
          #Occurrences per class, scaled by the class weights
          scores = np.bincount(in_window, minlength=n_classes) * label_weights
          voted[w] = int(np.argmax(scores))

      labels[audio] = voted

    return labels

## 3.6 Feature Selection

### 3.6.1 Pearson Correlation

In [137]:
def pearsonr(X, correlation_threshold=0.8):
    """Find pairs of feature columns that are highly correlated.

    Args:
        X: array with one row per observation; arrays with more than two
            dimensions are flattened to (observations, features).
        correlation_threshold: a pair is reported when the absolute value of
            its Pearson correlation is strictly greater than this value.

    Returns:
        Integer array of shape (n_pairs, 2) holding the column indices
        ``(i, j)`` with ``i < j`` of every reported pair, ordered by ``i``
        then ``j``. The shape is (0, 2) when no pair qualifies.
    """
    if X.ndim > 2:
      X = X.reshape(X.shape[0], -1)

    #Pairwise correlation between columns, as a plain numPy array
    corr_mat = pd.DataFrame(X).corr().values
    n_features = corr_mat.shape[0]

    # Only the upper triangle is scanned, so each pair is reported once
    correlated_pairs = [
        (i, j)
        for i in range(n_features)
        for j in range(i + 1, n_features)
        if abs(corr_mat[i, j]) > correlation_threshold
    ]

    # The explicit dtype and reshape keep the result a 2-column integer array
    # even when the list is empty.
    return np.array(correlated_pairs, dtype=int).reshape(-1, 2)

### 3.6.2 Mutual Information

In [138]:
def mutInfo(X, y):
    """Estimate the mutual information between every feature and the labels.

    Args:
        X: array with one row per observation; arrays with more than two
            dimensions are flattened to (observations, features).
        y: class label of every observation.

    Returns:
        pandas Series with one score per feature column, indexed by column
        position.
    """
    # NOTE(review): no random_state is passed to mutual_info_classif; confirm
    # whether run-to-run reproducibility of the scores is required.
    flat_X = X.reshape(X.shape[0], -1) if X.ndim > 2 else X

    return pd.Series(mutual_info_classif(flat_X, y))

### 3.6.3 Feature selection based on pearson corr. and mutual info

In [139]:
#Feature selection based on Pearson Correlation and Information Gain
def feat_selection(features_dict, frame_labels_dict):
    """Drop the less informative feature of every highly correlated pair.

    The frames of all recordings are pooled; ``pearsonr`` reports the
    correlated column pairs and ``mutInfo`` scores every column against the
    frame labels. From each pair the column with the lower score is dropped
    (the second column when the first is not strictly lower).

    Args:
        features_dict: recording name -> 2-D array (frames, features).
        frame_labels_dict: recording name -> 1-D array of frame labels.

    Returns:
        Tuple ``(features_dict_feat_sel, cols_to_drop)``: a new dict with the
        selected columns removed from every recording, and the sorted integer
        array of removed column indices (empty when nothing is correlated).
    """
    #Convert dictionaries to numPy arrays of consecutive frames to do feature selection
    X = np.concatenate(list(features_dict.values()))
    y = np.concatenate(list(frame_labels_dict.values()))

    corr_pairs = pearsonr(X)
    feat_importances = mutInfo(X, y)

    #A set removes columns that lose in more than one pair
    weaker_cols = {
        first if feat_importances[first] < feat_importances[second] else second
        for first, second in corr_pairs
    }

    #Ascending integer indices. The dtype is explicit because an empty array
    #would otherwise be float64, which np.delete rejects as an index.
    cols_to_drop = np.array(sorted(weaker_cols), dtype=int)

    #Drop selected features
    features_dict_feat_sel = {
        audio: np.delete(feats, cols_to_drop, axis=1)
        for audio, feats in features_dict.items()
    }

    return features_dict_feat_sel, cols_to_drop

# 4. Artificial NN

In [140]:
#Define ANN model

def ArtNet(X_train):
    """Build and compile the fully connected classifier.

    The network has one hidden layer of 100 ReLU units, dropout of 0.2 and a
    6-way softmax output. Its summary is printed as a side effect.

    Args:
        X_train: 2-D training array; only ``X_train.shape[1]`` (the input
            width) is read.

    Returns:
        The compiled Keras model.
    """
    n_inputs = X_train.shape[1]

    model = Sequential(
        [
            Dense(100, input_shape = (n_inputs,), activation = 'relu', name='Hidden_Layer_1'),
            Dropout(0.2, name='Dropout_Layer_1'),
            Dense(6, activation = 'softmax', name='Output_Layer'),
        ],
        name = 'ANN',
    )

    model.summary()

    # The output layer already applies softmax, so the loss receives probabilities
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    model.compile(optimizer = 'adam', loss=loss_fn, metrics=['accuracy'])

    return model

# 5. CNN

In [141]:
def convNet2D(X_train, kernel_size=4, pool_size=2, strides=(1, 1), dropout=0.2, dense_1=128, dense_2=64):
    """Build and compile the 2-D convolutional classifier.

    Three Conv2D -> MaxPool2D -> Dropout stages (16, 32 and 64 filters) are
    followed by two dense ReLU layers and a 6-way softmax output. The model
    summary is printed as a side effect.

    Args:
        X_train: 4-D training array; only its last three dimensions are read,
            as the input shape of the first layer.
        kernel_size: kernel size of every convolution.
        pool_size: pool size of every max-pooling layer.
        strides: strides of every convolution.
        dropout: dropout rate used after each pooling layer.
        dense_1: units of the first dense layer.
        dense_2: units of the second dense layer.

    Returns:
        The compiled Keras model.
    """
    model = Sequential(name = 'CNN2D')
    #First set of convolutional, MaxPooling and Dropout layers
    model.add(Conv2D(16, kernel_size, activation='relu',
                     strides=strides, padding ='same', input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3]),
                     name='Conv_layer_1'))
    model.add(MaxPool2D(pool_size=pool_size,padding='same', name='MaxPool_layer_1'))
    model.add(Dropout(dropout, name='Dropout_layer_1'))
    #Second set of convolutional, MaxPooling and Dropout layers
    model.add(Conv2D(32, kernel_size, activation='relu',
                     strides=strides, padding ='same', name='Conv_layer_2'))
    model.add(MaxPool2D(pool_size=pool_size,padding='same', name='MaxPool_layer_2'))
    model.add(Dropout(dropout, name='Dropout_layer_2'))
    #Final set of convolutional, MaxPooling and Dropout layers
    model.add(Conv2D(64, kernel_size, activation='relu',
                     strides=strides, padding='same', name='Conv_layer_3'))
    model.add(MaxPool2D(pool_size=pool_size,padding='same', name='MaxPool_layer_3'))
    model.add(Dropout(dropout, name='Dropout_layer_3'))
    #Flatten the feature maps into one vector per window
    model.add(Flatten(name='Flatten_layer'))
    #Fully Connected Dense Layers
    model.add(Dense(dense_1, activation='relu', name='Hidden_layer_1'))
    model.add(Dense(dense_2, activation='relu', name='Hidden_layer_2'))
    model.add(Dense(6, activation='softmax', name='Output_layer'))

    model.summary()

    #Compile the model. The output layer already applies softmax, so the loss
    #must be told it receives probabilities (from_logits=False); treating them
    #as logits would apply softmax a second time.
    model.compile(optimizer='adam',
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False),
            metrics=['accuracy'])

    return model

# 6. Load data and model

In [154]:
def _scaled_features(sig, mel_specs_comp):
  """Extract the per-frame features of every signal and standardise them per recording."""
  if(mel_specs_comp):
    features_dict = generate_mel_spec(sig)
  else:
    features_dict = feat_extr(sig, lp_order=12)

  # Standardise the features of each game separately (zero mean, unit variance per column)
  for audio in features_dict:
    features_dict[audio] = StandardScaler().fit_transform(features_dict[audio])

  return features_dict


#Function to get final data for training and testing
def get_dt(sig, augment: bool=True, mel_specs_comp: bool=False, feat_sel: bool=False,  stats: bool=False):
  """Turn raw signals into windowed training and testing arrays.

  Pipeline: feature extraction -> per-recording standardisation -> frame
  labelling -> optional feature selection -> windowing -> majority-vote
  window labels -> train/test split by recording -> optional augmentation of
  the training recordings -> optional window statistics.

  Args:
    sig: mapping of recording name -> 1-D sample array. Frame labels are
      read from the module-level ``df_dict`` under the same names.
    augment: when True, the training recordings are also passed through
      ``aug`` and the resulting windows are appended to the training set.
    mel_specs_comp: use mel spectrograms instead of the ``feat_extr`` features.
    feat_sel: drop correlated features with ``feat_selection``; the same
      columns are dropped from the augmented features.
    stats: replace every window by its statistics (``win_stats``).

  Returns:
    Tuple ``(win_train, labels_train, win_test, labels_test)``.
  """
  features_dict = _scaled_features(sig, mel_specs_comp)

  #Dictionary with labels of each frame for every game
  frame_labels_dict = new_frame_labelling(features_dict, df_dict)

  # Feature selection
  cols_to_drop = None
  if(feat_sel):
    features_dict, cols_to_drop = feat_selection(features_dict, frame_labels_dict)

  #Create window dictionary containing windows for each game
  windows_dict = create_win(features_dict)

  #Create labels for each window of each game
  windows_labels_dict = majVot_win_lab(windows_dict, frame_labels_dict)

  #Choose signals for training and testing.
  #NOTE(review): the training list used to contain 'portland_gsw' twice and no
  #'portland_lakers', which silently sent 'portland_lakers' to the test set;
  #the duplicate is assumed to be a typo for 'portland_lakers' - confirm.
  signals_train = ['clips_lakers', 'okc_denver', 'portland_gsw', 'rockets_knicks', 'memphis_okc', 'bucks_toronto', 'portland_lakers']
  signals_test = ['sixers_lakers', 'lakers_okc', 'gsw_cavs']

  win_train_list = []
  win_test_list = []
  labels_train_list = []
  labels_test_list = []

  #Every recording that is not listed for training is held out for testing
  #(signals_test documents the intended held-out recordings).
  for audio in sig:
    if audio in signals_train:
      win_train_list.append(windows_dict[audio])
      labels_train_list.append(windows_labels_dict[audio])
    else:
      win_test_list.append(windows_dict[audio])
      labels_test_list.append(windows_labels_dict[audio])

  win_test = np.concatenate(win_test_list, axis=0)
  labels_test = np.concatenate(labels_test_list, axis=0)

  # Augmentation check
  if(augment):
    #Augment the signals that were passed in (not a notebook global), and only
    #the training ones: augmented test recordings would never be used.
    train_sig = {audio: sig[audio] for audio in sig if audio in signals_train}
    aug_signals = aug(train_sig)

    aug_feat_dict = _scaled_features(aug_signals, mel_specs_comp)
    aug_frame_labels_dict = new_frame_labelling(aug_feat_dict, df_dict)

    if(feat_sel):
      #Integer dtype so that an empty selection is still a valid index for np.delete
      drop_idx = np.asarray(cols_to_drop, dtype=int)
      for audio in aug_feat_dict:
        aug_feat_dict[audio] = np.delete(aug_feat_dict[audio], drop_idx, axis=1)

    aug_windows_dict = create_win(aug_feat_dict)

    #Create labels for each window of each augmented game
    aug_windows_labels_dict = majVot_win_lab(aug_windows_dict, aug_frame_labels_dict)

    for audio in aug_feat_dict:
      win_train_list.append(aug_windows_dict[audio])
      labels_train_list.append(aug_windows_labels_dict[audio])

  #Concatenate signals and labels for training (original plus augmented, if any)
  win_train = np.concatenate(win_train_list, axis=0)
  labels_train = np.concatenate(labels_train_list, axis=0)

  #if stats==True, use stats of windows on training and testing
  if(stats):
    win_train = win_stats(win_train)
    win_test = win_stats(win_test)

  return win_train,  labels_train, win_test, labels_test

In [155]:
#Function to load final model
def load_model(X_train):
  """Build the network that matches ``conf.nn`` and the rank of ``X_train``.

  Args:
    X_train: training array; 2-D for the ANN, 3-D or 4-D for the CNNs.

  Returns:
    The compiled model returned by the selected builder.

  Raises:
    Exception: when ``conf.nn`` and the number of dimensions of ``X_train``
      do not match any supported combination. This includes a CNN input
      that is neither 3-D nor 4-D, which previously failed with an
      UnboundLocalError instead.
  """
  if conf.nn == 'cnn' and X_train.ndim == 3:
    # NOTE(review): convNet1D is not defined in the visible part of this
    # notebook; confirm it exists before using 3-D CNN input.
    return convNet1D(X_train)

  if conf.nn == 'cnn' and X_train.ndim == 4:
    return convNet2D(X_train)

  if conf.nn == 'ann' and X_train.ndim == 2:
    return ArtNet(X_train)

  raise Exception("Invalid data or Neural Network architecture")

# 7. Main

In [161]:
def main():
  """Train the configured model on the annotated audio and evaluate it.

  Builds the data with ``get_dt(filtered_signals)``, fits the model for 10
  epochs, plots the training loss and accuracy, prints precision, recall,
  accuracy and macro F1 on the test set and shows the confusion matrix.

  Returns:
    Tuple ``(model, history)``: the fitted model and the Keras training history.
  """
  X_train, y_train, X_test, y_test = get_dt(filtered_signals)

  model = load_model(X_train)

  print('\n\n\n')
  print('----------------------------------TRAINING PROCESS----------------------------------')

  history = model.fit(X_train, y_train, epochs=10, batch_size=256, shuffle=True)

  #Plot training loss
  plt.figure(figsize=(12, 4))
  plt.subplot(1, 2, 1)
  plt.plot(history.history['loss'], label='Training Loss')
  plt.title('Training Loss')
  plt.xlabel('Epochs')
  plt.ylabel('Loss')
  plt.legend()

  # Plot training accuracy
  plt.subplot(1, 2, 2)
  plt.plot(history.history['accuracy'], label='Training Accuracy')
  plt.title('Training Accuracy')
  plt.xlabel('Epochs')
  plt.ylabel('Accuracy')
  plt.legend()

  plt.tight_layout()
  plt.show()


  #Print confusion matrix to see values distribution on each class
  print('\n\n\n')
  print('----------------------------------TESTING PROCESS----------------------------------')
  y_pred_proba = model.predict(X_test)
  y_pred = np.argmax(y_pred_proba, axis=1) #convert probability for each class into class prediction
  acc = accuracy_score(y_test, y_pred)
  precision = precision_score(y_test, y_pred, average='macro')
  recall = recall_score(y_test, y_pred, average=None) #one recall value per class
  f1_macro = fbeta_score(y_test, y_pred, beta=1, average='macro')
  print("Precision score: " +str(precision)+ "\n Recall score: "+str(recall)+ "\n Accuracy score:"+str(acc)+"\n F1_macro score: "+str(f1_macro))

  #Tick labels in class-id order (0-5)
  class_labels = ['cheering', 'whistle', 'air_horn', 'speech', 'booing', 'other']

  #Pass every class id explicitly so that the matrix is always 6x6 and lines up
  #with class_labels, even when a class is absent from both y_test and y_pred.
  confusion_mat = confusion_matrix(y_test, y_pred, labels=list(range(len(class_labels))))

  plt.figure(figsize=(8,6))
  sns.heatmap(confusion_mat, annot=True, fmt='d', xticklabels=class_labels, yticklabels=class_labels, cmap='YlGnBu')
  plt.title('Confusion Matrix')
  plt.xlabel('Predicted Labels')
  plt.ylabel('True Labels')
  plt.show()

  return model, history

In [None]:
# Run the whole train/evaluate pipeline and keep the fitted model and its
# training history available to later cells.
if __name__ == '__main__':
  model, history = main()