# **Feature Extraction Methods: Imbalanced Data Without Annotations**

- *Key Features*: [MFCCs, Mel-Spectrograms, Chroma Frequencies, RMS Power]
- *Key Manipulations*: [Varying Window Sizes, Normalization, Average Pooling (Compression), Filtering]
- *Process Assistence*: [Converting them to numpy arrays now, easy label access across features]
- *Conversion*: [To numpy arrays and pkl files]


In [22]:
# Standard libraries
import numpy as np
import pandas as pd
import os
import time

# Libraries for audio
from IPython.display import Audio
import librosa
import librosa.display

# Training and Testing Split
from sklearn.model_selection import train_test_split

# for normalization & avgpooling features
import tensorflow as tf
# for preprocessing
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import OneHotEncoder

# Operational
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import scipy.ndimage
import pygame
import time
from scipy.signal import butter, filtfilt
import random
import IPython.display as ipd
from functools import partial

In [48]:
# Variabels to be reused
path = 'C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/audio_files' 
npy_path = 'C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/train_audio_npy/' 
train_csv = 'C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/train-not-annotated.csv' 
annotated_train_csv = 'C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/train-annotated.csv'
not_annotated_splt = 'C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/trainval-split/trainval.csv'
sr = 22050

In [93]:
trainval_data = pd.read_csv(not_annotated_splt)
train_data = trainval_data[trainval_data['set'] == 'tr']
val_data = trainval_data[trainval_data['set'] == 'val']

# **Creating a class to do the extraction**

In [107]:
class Extraction:
  def __init__(self, train_df, val_df, window_size, overlap=0.5, npy_path=npy_path, sr=sr, n_mels=60, n_mfcc=20, n_chroma=12, features=['mfcc'], normalize=True, avgpool=False):
    """
    Instantiate the Extraction class to extract features.

    Parameters:
      sr (int): Sample rate of the audio files.
      n_mfccs (int): Number of MFCCs to extract.
      n_mels (int): Number of Mel bands to extract.
      n_chroma (int): Number of chroma bins to use.
      features (list): List of features to extract.
        accepted features: 'mfcc', 'chroma', 'rms', 'melspectrogram'.
      normalize (bool): Whether to normalize the features.
      maxpool (bool): Whether to maxpool the features.
    """
    
    self.train_df = train_df
    self.val_df = val_df
    self.npy_path = npy_path
    self.window_size = window_size
    self.overlap = overlap
    self.sr = sr
    self.n_mels = n_mels
    self.n_mfcc = n_mfcc
    self.n_chroma = n_chroma

     # confirm features have been specified
    assert len(features) != 0, "Must Specify At Least One Feature In The Form Of A List."
    self.features = features

    self.accepted_feature = ['mfcc', 'chroma', 'rms', 'melspectrogram']
    for feature in self.features:
      assert feature in self.accepted_feature, f"{feature} is not an accepted feature, only 'mfcc', 'chroma', 'rms', 'melspectrogram' are accepted features."

    self.normalize = normalize
    self.avgpool = avgpool

    # extract train and val labels and features
    self.train_y, self.train_features = self.feature_extraction(train_df)
    self.val_y, self.val_features = self.feature_extraction(val_df)

    # process the features by average pooling
    self.train_features, self.val_features = self.process_features(self.train_features, self.val_features)

    
  

  def normalize_audio(self, audio):
    return (audio - np.min(audio)) / (np.max(audio) - np.min(audio))
  
  def bandpass_filter(self, audio, lowcut=800, highcut=8000, order=4):
    nyquist = 0.5 * self.sr  # Nyquist frequency
    low = lowcut / nyquist
    high = highcut / nyquist

    b, a = butter(order, [low, high], btype='band')
    filtered_audio = filtfilt(b, a, audio)
    return filtered_audio
  
  def generate_pink_noise(self, num_samples):
    white_noise = np.random.randn(num_samples)
    
    # Apply a filter to convert white noise into pink noise (1/f noise)
    X = np.fft.rfft(white_noise)
    S = np.arange(1, len(X) + 1)  # Frequency scaling
    pink_noise = np.fft.irfft(X / S)
    
    return self.normalize_audio(pink_noise)
  
  def pad_with_noise(self, audio_data, window_length):
    current_length = librosa.get_duration(y=audio_data, sr=self.sr)
    if current_length >= window_length:
        return audio_data
    
    target_length_samples = int(window_length * sr) + 1
    current_length_samples = len(audio_data)
    padding_length_samples = target_length_samples - current_length_samples
    
    # Generate pink noise to pad with
    pink_noise = self.generate_pink_noise(padding_length_samples)
    padded_audio = np.concatenate([audio_data, pink_noise])
    
    return padded_audio
  
  def avg_pooling_keras(self, feature):
    # Clear the previous Keras session
    tf.keras.backend.clear_session()

    # Define the input shape based on features
    input_shape = feature.shape[1:]  # (n_mels, time_steps)

    # Create the Keras model for average pooling
    inputs = tf.keras.layers.Input(shape=input_shape)
    pooled = tf.keras.layers.GlobalAveragePooling1D()(inputs)
    pooling_model = tf.keras.models.Model(inputs=inputs, outputs=pooled)

    # Perform pooling using the model
    pooled_features = pooling_model.predict(feature)

    return pooled_features

#-------------------------Feature Extraction---------------------------------------
  def extract_mfcc(self, window):
    mfcc = librosa.feature.mfcc(y=window, sr=self.sr, n_mfcc=self.n_mfcc)
    if self.normalize:
      return librosa.util.normalize(mfcc)
    else:
      return mfcc


  def extract_chroma(self, window):
    chroma = librosa.feature.chroma_stft(y=window, sr=self.sr, n_chroma=self.n_chroma)
    if self.normalize:
      return librosa.util.normalize(chroma)
    else:
      return chroma
   

  def extract_rms(self, window):
    return librosa.feature.rms(y=window)

  def extract_melspectrogram(self, window):
    mel = librosa.feature.melspectrogram(y=window, sr=self.sr, n_mels=self.n_mels)
    if self.normalize:
      return librosa.util.normalize(mel)
    else:
      return mel
    
  def avgpooling(self, train_X, val_X, n_time, n_features):
    """
    Average pooling the train and val features.

    Parameters:
      train_X (npy): Training feature array of shape (batch_size, n_features, n_time)
      val_X (npy): Validation feature array of shape (batch_size, n_features, n_time)
      n_time (int): Time axis
      n_features (int): Feature axis

    Returns:
      train_X (npy): Avgpooled training feature array of shape (batch_size, n_features)
      val_X (npy): Avgpooled validation feature array of shape (batch_size, n_features)
    """
    # Clear the Keras session
    tf.keras.backend.clear_session()
    
    # Create the Keras input layer with shape (n_features, n_time)
    input_layer = tf.keras.layers.Input(shape=(n_features, n_time))
    
    # Apply average pooling over the time axis (axis=-1) to reduce n_time
    avg_pool = tf.keras.layers.Lambda(lambda x: tf.reduce_mean(x, axis=-1))(input_layer)
    
    # Build the model
    pooling_model = tf.keras.models.Model(inputs=input_layer, outputs=avg_pool)

    # Use the model to apply average pooling on the training and validation features
    train_X = pooling_model.predict(train_X)
    val_X = pooling_model.predict(val_X)

    return train_X, val_X

    
  def process_features(self, train_features_dict, val_features_dict):
    for each in train_features_dict.keys():
      
      if each == 'mfcc':
        n_features=self.n_mfcc
      elif each == 'chroma':
        n_features=self.n_chroma
      elif each == 'rms':
        n_features=1
      elif each == 'melspectrogram':
        n_features=self.n_mels
      
      train_feature = train_features_dict[each]
      val_feature = val_features_dict[each]

      if self.avgpool:
        train_features_dict[each], val_features_dict[each] = self.avgpooling(train_feature, val_feature, n_time=train_feature.shape[2], n_features=n_features)
      else:
        train_features_dict[each], val_features_dict[each] = train_features_dict[each], val_features_dict[each]
    
    return train_features_dict, val_features_dict
      

  def feature_extraction(self, dataframe, window_size=6, filter=True):
    y = [] # To hold the labels
    features_dict = {item: [] for item in self.features} # Create a key for each feature listed

    for _, row in dataframe.iterrows():
          label = row['species']
          file_path = self.npy_path + row['filename_npy']

          
          audio = np.load(file_path)
          audio = self.normalize_audio(audio)
          audio = self.pad_with_noise(audio, window_length=window_size)
          if filter:
                  audio = self.bandpass_filter(audio)

          window_samples = int(window_size * self.sr)
          hop_samples = int(window_samples * (1 - self.overlap))  # For overlapping

          # Break the audio into windows with the specified overlap
          audio_windows = librosa.util.frame(audio, frame_length=window_samples, hop_length=hop_samples).T

          for _, window in enumerate(audio_windows):
              
              y.append(label)

              if len(window) < window_samples:
                  window = self.pad_with_noise(window, window_length=window_size)
              
              # Feature Extraction FR --------------------------------------------------------------------
              # dynatically call the extract_x function to extract the listed features
              for feature in self.features:
                extract = f"extract_{feature}"
                if hasattr(self, extract) and callable(func := getattr(self, extract)):
                  features_dict[feature].append(func(window))

          # cast lists to np arrays
          for each in features_dict.keys():
            features_dict[each] = np.array(features_dict[each])

          y = np.array(y)

          # If not using average pooling, return resized features
          return y, features_dict

# **No Average Pooling**

## **Window Size = 6s**

### **['melspectrogram']**

In [118]:
features_list = ['melspectrogram']

In [None]:
features = Extraction(train_data,
                      val_data,
                      window_size=6,
                      features=features_list,
                      avgpool=False
                      )

In [None]:
train_y = features.train_y
display(train_y.shape)

train_features = features.train_features
for key in train_features.keys():
  display(key)
  display(train_features[key].shape)

In [None]:
val_y = features.val_y
display(train_y.shape)

val_features = features.val_features
for key in val_features.keys():
  display(key)
  display(val_features[key].shape)

### Encode Classes

In [None]:
label_encoder = LabelEncoder().fit(train_y)
train_y_encoded = label_encoder.transform(train_y)
val_y_encoded = label_encoder.transform(val_y)

classes = list(label_encoder.inverse_transform([0, 1, 2]))
print("Encoded classes for [0, 1, 2]:", classes)
print("Encoded training labels:", train_y_encoded)
print("Encoded validation labels:", val_y_encoded)

In [None]:
display(len(train_y))
display(train_y[:10])

display(len(val_y))
display(val_y[:10])

In [None]:
train_features['label'] = train_y
val_features['label'] = val_y

In [None]:
merged_dict = {'train': train_features, 'val': val_features}
merged_dict

### Save the merged dictionary to a pkl

In [None]:
with open('C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/feature-extraction/NotAnnotated/NotAveragePooled/split_features_6s_mel.pkl', 'wb') as file:
  pickle.dump(merged_dict, file)

### **['melspectrogram', 'mfcc']**

In [None]:
features_list = ['melspectrogram', 'mfcc']

In [None]:
features = Extraction(train_data,
                      val_data,
                      window_size=6,
                      features=features_list,
                      avgpool=False
                      )

In [None]:
train_y = features.train_y
display(train_y.shape)

train_features = features.train_features
for key in train_features.keys():
  display(key)
  display(train_features[key].shape)

In [None]:
val_y = features.val_y
display(train_y.shape)

val_features = features.val_features
for key in val_features.keys():
  display(key)
  display(val_features[key].shape)

### Encode Classes

In [None]:
label_encoder = LabelEncoder().fit(train_y)
train_y_encoded = label_encoder.transform(train_y)
val_y_encoded = label_encoder.transform(val_y)

classes = list(label_encoder.inverse_transform([0, 1, 2]))
print("Encoded classes for [0, 1, 2]:", classes)
print("Encoded training labels:", train_y_encoded)
print("Encoded validation labels:", val_y_encoded)

In [None]:
display(len(train_y))
display(train_y[:10])

display(len(val_y))
display(val_y[:10])

In [None]:
train_features['label'] = train_y
val_features['label'] = val_y

In [None]:
merged_dict = {'train': train_features, 'val': val_features}
merged_dict

### Save the merged dictionary to a pkl

In [None]:
with open('C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/feature-extraction/NotAnnotated/NotAveragePooled/split_features_6s_mel_mfcc.pkl', 'wb') as file:
  pickle.dump(merged_dict, file)

### **['melspectrogram', 'mfcc', 'chroma']**

In [None]:
features_list = ['melspectrogram', 'mfcc', 'chroma']

In [None]:
features = Extraction(train_data,
                      val_data,
                      window_size=6,
                      features=features_list,
                      avgpool=False
                      )

In [None]:
train_y = features.train_y
display(train_y.shape)

train_features = features.train_features
for key in train_features.keys():
  display(key)
  display(train_features[key].shape)

In [None]:
val_y = features.val_y
display(train_y.shape)

val_features = features.val_features
for key in val_features.keys():
  display(key)
  display(val_features[key].shape)

### Encode Classes

In [None]:
label_encoder = LabelEncoder().fit(train_y)
train_y_encoded = label_encoder.transform(train_y)
val_y_encoded = label_encoder.transform(val_y)

classes = list(label_encoder.inverse_transform([0, 1, 2]))
print("Encoded classes for [0, 1, 2]:", classes)
print("Encoded training labels:", train_y_encoded)
print("Encoded validation labels:", val_y_encoded)

In [None]:
display(len(train_y))
display(train_y[:10])

display(len(val_y))
display(val_y[:10])

In [None]:
train_features['label'] = train_y
val_features['label'] = val_y

In [None]:
merged_dict = {'train': train_features, 'val': val_features}
merged_dict

### Save the merged dictionary to a pkl

In [None]:
with open('C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/feature-extraction/NotAnnotated/NotAveragePooled/split_features_6s_mel_mfcc_chroma.pkl', 'wb') as file:
  pickle.dump(merged_dict, file)

### **['melspectrogram', 'mfcc', 'chroma', 'rms']**

In [None]:
features_list = ['melspectrogram', 'mfcc', 'chroma', 'rms']

In [None]:
features = Extraction(train_data,
                      val_data,
                      window_size=6,
                      features=features_list,
                      avgpool=False
                      )

In [None]:
train_y = features.train_y
display(train_y.shape)

train_features = features.train_features
for key in train_features.keys():
  display(key)
  display(train_features[key].shape)

In [None]:
val_y = features.val_y
display(train_y.shape)

val_features = features.val_features
for key in val_features.keys():
  display(key)
  display(val_features[key].shape)

### Encode Classes

In [None]:
label_encoder = LabelEncoder().fit(train_y)
train_y_encoded = label_encoder.transform(train_y)
val_y_encoded = label_encoder.transform(val_y)

classes = list(label_encoder.inverse_transform([0, 1, 2]))
print("Encoded classes for [0, 1, 2]:", classes)
print("Encoded training labels:", train_y_encoded)
print("Encoded validation labels:", val_y_encoded)

In [None]:
display(len(train_y))
display(train_y[:10])

display(len(val_y))
display(val_y[:10])

In [None]:
train_features['label'] = train_y
val_features['label'] = val_y

In [None]:
merged_dict = {'train': train_features, 'val': val_features}
merged_dict

### Save the merged dictionary to a pkl

In [None]:
with open('C:/Users/thato/Documents/Final-Year-Project/Dataset/Project-V4/feature-extraction/NotAnnotated/NotAveragePooled/split_features_6s_all.pkl', 'wb') as file:
  pickle.dump(merged_dict, file)