# Segment feature extraction

In [3]:
import librosa
import librosa.display
import IPython.display as ipd
import matplotlib.pyplot as plt
import numpy as np
import csv
import pandas as pd
import os

from tqdm import tqdm
from datasets import load_dataset

In [4]:
# Convert stereo to mono (two or more channels to sigle channel)
def pre_process(audio_file):
    # conert to single hz
    target_sr = 22050 
    # Load the data with resampling
    signal, sr = librosa.load(audio_file, sr=target_sr, mono=False, res_type='kaiser_best')
    if signal.ndim > 1:  # Check if the signal is stereo
        print(True)
        # Convert stereo to mono by averaging the channels
        signal = np.mean(signal, axis=0)
    return signal, target_sr

In [5]:
def extract_MFCC(signal, sr):
    signal.shape # The output is 1 dimensional array with the following value
    # Sample rate
    # print("Signal: ", signal)
    # print("Sample Rate: ", sr)

    # Extract MFCC
    mfccs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=13)
    # print(mfccs.shape) # First value is the number of rows, second value is the number of columns or frames
    # print(mfccs)

    # First Derivative -  Capture the temporal dynamics of the speech signal, providing information about how the MFCCs are changing.
    delta_mfccs = librosa.feature.delta(mfccs)
    # Second Derivative - Capture the dynamics of the delta features, providing additional information about the speech signal’s temporal characteristics.
    delta2_mfccs = librosa.feature.delta(mfccs, order=2)

    comprehensive_mfcc = np.concatenate((mfccs, delta_mfccs, delta2_mfccs))
    # print(comprehensive_mfcc)
    return comprehensive_mfcc

In [6]:
def extract_features(audio, sr, target_duration = 2, target_sr = 22050):
    # exctract RMS, ZCR, SC, SB
    FRAME_LENGTH = 1024
    HOP_LENGTH = 512

    # Extract RMS from the cropped audio
    rms_cropped = librosa.feature.rms(y=audio, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
    # Extract ZCR from the cropped audio
    zcr_cropped = librosa.feature.zero_crossing_rate(y=audio, frame_length=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
    # Extract spectral centroid from the cropped audio
    sc_cropped = librosa.feature.spectral_centroid(y=audio, sr=sr, n_fft=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
    # Extract spectral bandwidth from the cropped audio
    sb_cropped = librosa.feature.spectral_bandwidth(y=audio, sr=sr, n_fft=FRAME_LENGTH, hop_length=HOP_LENGTH)[0]
    
    return rms_cropped, zcr_cropped, sc_cropped, sb_cropped

In [11]:
def process_audio_segments(folder_path, output_file_name, segment_duration=2):
    mfcc_features = []
    rms_values = []
    zcr_values = []
    sc_values = []
    sb_values = []
    file_names = []  # List to store file names

    for root, dirs, files in tqdm(os.walk(folder_path)):
        for file in files:
            if file.endswith('.mp3') or file.endswith('.wav') or file.endswith('.flac'):
                file_path = os.path.join(root, file)
                print(f"Processing {file}")
                
                signal, sr = pre_process(file_path)
                segment_length = int(segment_duration * sr)
                total_length = len(signal)

                # Loop over the audio signal in segments of `segment_length`
                for start in range(0, total_length, segment_length):
                    end = start + segment_length
                    if end > total_length:
                        break  # Stop if there's less than a full segment left
                    
                    # Get the segment
                    segment = signal[start:end]
            
                    # Extract MFCC features for the segment
                    mfcc = extract_MFCC(segment, sr)
                    mfcc_mean = np.mean(mfcc.T, axis=0)
                    mfcc_features.append(mfcc_mean)
                    
                    # Extract RMS, ZCR, SC, SB features for the segment
                    rms, zcr, sc, sb = extract_features(segment, sr)
                    rms_values.append(rms)
                    zcr_values.append(zcr)
                    sc_values.append(sc)
                    sb_values.append(sb)
    
                    file_names.append(file)

    print("Number of processed files: ", len(rms_values))

    # Create dataframes for MFCC and other features
    mfcc_df = pd.DataFrame(mfcc_features, columns=[f'mfcc_feature{i+1}' for i in range(mfcc_features[0].shape[0])])
    rms_df = pd.DataFrame(rms_values, columns=[f'RMS{i+1}' for i in range(len(rms_values[0]))])
    zcr_df = pd.DataFrame(zcr_values, columns=[f'ZCR{i+1}' for i in range(len(zcr_values[0]))])
    sc_df = pd.DataFrame(sc_values, columns=[f'SpectralCentroid{i+1}' for i in range(len(sc_values[0]))])
    sb_df = pd.DataFrame(sb_values, columns=[f'SpectralBandwidth{i+1}' for i in range(len(sb_values[0]))])

    # Combine all features into a single DataFrame
    combined_df = pd.concat([pd.DataFrame(file_names, columns=['file_name']), mfcc_df, rms_df, zcr_df, sc_df, sb_df], axis=1) # DF to feed the system
    # print(combined_df)
    create_csv(combined_df, output_file_name)

In [12]:
# Create a annotated dataset
def create_csv(combined_df, output_file_name):
    # Save to CSV
    combined_df.to_csv(f"result/{output_file_name}", index=False)
    print(f"Successfully saved combined features to {output_file_name}")

In [10]:
process_audio_segments("AudioData/test/voice_recognition_test/", "voice_recognition/vr_full_segment_same.csv")

0it [00:00, ?it/s]

Processing Testspeaker_notonDS3.mp3


1it [00:04,  4.26s/it]

True


1it [00:04,  4.26s/it]

Number of processed files:  3
Successfully saved combined features to voice_recognition/vr_full_segment_same.csv





# Sample only

In [16]:
def sample_audio(file_path):
    segment_duration = 2
    signal, sr = pre_process(file_path)
    segment_length = int(segment_duration * sr)
    total_length = len(signal)

    # Loop over the audio signal in segments of `segment_length`
    for start in range(0, total_length, segment_length):
        end = start + segment_length
        if end > total_length:
            segment = signal[start:]  # Take the remaining part if it's less than a full segment
        else:
            segment = signal[start:end]
        
        print(f"Playing segment {start // segment_length + 1}")
        display(ipd.Audio(segment, rate=sr))  # Play the current segment

# Call the function to hear each segment of the audio
sample_audio("AudioData/speaker_audio/speaker_audio/Recording1.mp3")

True
Playing segment 1


Playing segment 2


Playing segment 3


Playing segment 4


Playing segment 5
