<a href="https://colab.research.google.com/github/Mihirirj/Audio_PRE-FE/blob/main/Audio.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive into the Colab runtime at /content/drive.
# Colab-only: `google.colab` is not importable outside a Colab kernel.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Standard library: path handling and file discovery.
import os
import glob
# Audio loading, resampling and feature extraction.
import librosa
import librosa.display
# Numerics; pandas is not used in the cells shown here
# (presumably needed later to write the feature table -- TODO confirm).
import numpy as np
import pandas as pd
# Notebook progress bar used by the main processing loop.
from tqdm.notebook import tqdm

# Simple marker that the import cell ran to completion.
print("Libraries imported.")

Libraries imported.


In [3]:
# All inputs and outputs live under the mounted Google Drive root.
DRIVE_ROOT = '/content/drive/My Drive'

# Folder that is scanned for the CREMA-D .wav files.
CREMA_D_FOLDER = f'{DRIVE_ROOT}/AudioWAV'
# Destination path for the extracted feature table.
OUTPUT_CSV_PATH = f'{DRIVE_ROOT}/crema_d_features.csv'

In [4]:
# ---- Audio preprocessing settings ----
TARGET_SR = 16000            # Sample rate (Hz) every clip is resampled to
SEGMENT_DURATION_S = 3.0     # Length of each fixed-size segment, in seconds
PREEMPHASIS_COEFF = 0.97     # Coefficient of the pre-emphasis filter
# Segment length expressed in samples at the target rate.
SEGMENT_SAMPLES = int(TARGET_SR * SEGMENT_DURATION_S)

# ---- Feature extraction settings ----
N_MFCC = 13                  # Number of MFCC coefficients
HOP_LENGTH = 512             # Samples between successive analysis frames
WIN_LENGTH = 1024            # Window size passed to the MFCC computation
PITCH_FMIN = librosa.note_to_hz('C2')  # Lower pitch-search bound (~65 Hz)
PITCH_FMAX = librosa.note_to_hz('C7')  # Upper pitch-search bound (~2093 Hz)
SILENCE_THRESHOLD_DB = 40    # Frames this many dB below the loudest frame count as silent

# ---- Labels ----
# Emotion code embedded in a CREMA-D file name -> human-readable label.
EMOTION_MAP = dict(
    HAP='happy',
    SAD='sad',
    NEU='neutral',
    FEA='fear',
    ANG='angry',
    DIS='disgust',
)
# The six emotion codes this notebook keeps.
VALID_EMOTIONS = list(EMOTION_MAP)

# Echo the key settings so they are recorded next to the results.
print(
    f"Target SR: {TARGET_SR}",
    f"Segment Duration: {SEGMENT_DURATION_S}s ({SEGMENT_SAMPLES} samples)",
    f"Output CSV: {OUTPUT_CSV_PATH}",
    sep="\n",
)

Target SR: 16000
Segment Duration: 3.0s (48000 samples)
Output CSV: /content/drive/My Drive/crema_d_features.csv


In [5]:
# Collect every .wav file directly inside the dataset folder.
# glob.glob() returns matches in arbitrary (filesystem-dependent) order, so the
# list is sorted to make the processing order -- and therefore the order of the
# extracted feature records -- identical on every run.
audio_files = sorted(glob.glob(os.path.join(CREMA_D_FOLDER, '*.wav')))

if not audio_files:
    # Report the problem but keep going, so the notebook still runs end to end.
    print(f"!!! ERROR: No .wav files found in {CREMA_D_FOLDER}")
    print("!!! Please double-check the CREMA_D_FOLDER path.")
else:
    print(f"Found {len(audio_files)} audio files.")

Found 7442 audio files.


In [6]:
# Define Helper Functions

def extract_labels(filename, emotion_map=None):
    """Extract the speaker ID and emotion label from a CREMA-D file name.

    The base name is split on underscores; the first token is taken as the
    speaker ID and the third token as the emotion code.

    Args:
        filename: Path or bare file name of the audio clip.
        emotion_map: Optional mapping from emotion code to label. Defaults to
            the notebook-level ``EMOTION_MAP``.

    Returns:
        A dict with keys ``'speaker_id'``, ``'emotion'`` and ``'filename'``
        (the base name, extension included), or ``None`` when the name has
        fewer than three tokens or its emotion code is not in the mapping.
    """
    lookup = EMOTION_MAP if emotion_map is None else emotion_map

    basename = os.path.basename(filename)
    parts = basename.split('_')
    # Only the last token can carry the file extension. Strip it so a
    # three-token name such as "1001_DFA_ANG.wav" yields the code "ANG"
    # instead of "ANG.wav" (which would never match the mapping).
    parts[-1] = os.path.splitext(parts[-1])[0]

    if len(parts) < 3:
        return None  # Unexpected file name format

    speaker_id, emotion_code = parts[0], parts[2]
    if emotion_code not in lookup:
        return None  # Emotion code is not one of the ones we keep

    return {
        'speaker_id': speaker_id,
        'emotion': lookup[emotion_code],
        'filename': basename,
    }

def preprocess_and_segment(file_path, target_sr, segment_samples, preemph_coeff):
    """Load one audio file, condition it, and cut it into equal-length chunks.

    Steps: load as mono at the file's native rate, resample to ``target_sr``
    when the rates differ, apply pre-emphasis, peak-normalize, then slice into
    consecutive chunks of ``segment_samples`` samples. A short final chunk is
    zero-padded at the end up to the full length.

    Args:
        file_path: Path of the audio file to read.
        target_sr: Sample rate (Hz) the signal is brought to.
        segment_samples: Number of samples in every returned chunk.
        preemph_coeff: Coefficient handed to the pre-emphasis filter.

    Returns:
        ``(segments, original_sr)`` where ``segments`` is a list of 1-D arrays
        and ``original_sr`` is the file's native sample rate. If anything
        fails, the error is printed and ``([], original_sr)`` is returned;
        ``original_sr`` is 0 when the failure happened before the file loaded.
    """
    original_sr = 0
    chunks = []
    try:
        # sr=None keeps the native rate so it can be reported to the caller.
        signal, native_sr = librosa.load(file_path, sr=None, mono=True)
        original_sr = native_sr

        if native_sr != target_sr:
            signal = librosa.resample(y=signal, orig_sr=native_sr, target_sr=target_sr)

        # Pre-emphasis first, then scale so the peak magnitude is 1.
        signal = librosa.util.normalize(
            librosa.effects.preemphasis(signal, coef=preemph_coeff)
        )

        # Round up so a trailing partial chunk is kept rather than dropped.
        chunk_count = int(np.ceil(len(signal) / segment_samples))
        for start in (idx * segment_samples for idx in range(chunk_count)):
            chunk = signal[start:start + segment_samples]
            shortfall = segment_samples - len(chunk)
            if shortfall > 0:
                # Only the final chunk can be short; fill the tail with zeros.
                chunk = np.pad(chunk, (0, shortfall), 'constant')
            chunks.append(chunk)

    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return [], original_sr  # Partial results are discarded on error

    return chunks, original_sr

def extract_features(y_segment, sr, n_mfcc, hop_length, win_length, pitch_fmin, pitch_fmax, silence_threshold_db):
    """Compute summary features for a single audio segment.

    Args:
        y_segment: 1-D audio signal of the segment.
        sr: Sample rate of ``y_segment`` in Hz.
        n_mfcc: Number of MFCC coefficients to compute.
        hop_length: Samples between successive frames (MFCC, pitch and RMS).
        win_length: Window size; forwarded to the MFCC computation only.
        pitch_fmin: Lower bound (Hz) of the pitch search.
        pitch_fmax: Upper bound (Hz) of the pitch search.
        silence_threshold_db: A frame is silent when its RMS is more than this
            many dB below the segment's loudest frame.

    Returns:
        A dict with ``'mfcc_mean'``, ``'mfcc_delta_mean'`` and
        ``'mfcc_delta2_mean'`` (per-coefficient averages over frames),
        ``'pitch_std_dev'`` and ``'silence_fraction'``; or ``None`` if any
        step raises (the error is printed).
    """
    try:
        # 1. MFCCs plus first- and second-order deltas, averaged over time.
        mfcc_matrix = librosa.feature.mfcc(
            y=y_segment,
            sr=sr,
            n_mfcc=n_mfcc,
            hop_length=hop_length,
            win_length=win_length,
        )
        mfcc_d1 = librosa.feature.delta(mfcc_matrix)
        mfcc_d2 = librosa.feature.delta(mfcc_matrix, order=2)

        summary = {
            'mfcc_mean': np.mean(mfcc_matrix, axis=1),
            'mfcc_delta_mean': np.mean(mfcc_d1, axis=1),
            'mfcc_delta2_mean': np.mean(mfcc_d2, axis=1),
        }

        # 2. Pitch variability: standard deviation of F0 over voiced frames,
        #    with F0 estimated by probabilistic YIN (librosa.pyin).
        f0, voiced_flag, _voiced_probs = librosa.pyin(
            y=y_segment,
            fmin=pitch_fmin,
            fmax=pitch_fmax,
            sr=sr,
            hop_length=hop_length,
        )
        voiced_f0 = f0[voiced_flag]
        # With zero or one voiced frame there is no spread to measure.
        summary['pitch_std_dev'] = np.std(voiced_f0) if len(voiced_f0) > 1 else 0.0

        # 3. Pause measure: fraction of frames whose RMS energy falls below
        #    the loudest frame by more than the silence threshold.
        rms = librosa.feature.rms(y=y_segment, hop_length=hop_length)[0]
        has_energy = len(rms) > 0 and np.max(rms) > 0  # guards against log(0)
        if not has_energy:
            # No measurable energy: fully silent unless the segment is empty.
            summary['silence_fraction'] = 1.0 if len(y_segment) > 0 else 0.0
        else:
            cutoff_db = librosa.amplitude_to_db(np.max(rms)) - silence_threshold_db
            cutoff_amplitude = librosa.db_to_amplitude(cutoff_db)
            is_silent = rms < cutoff_amplitude
            summary['silence_fraction'] = np.sum(is_silent) / len(rms)

        # Jitter and shimmer are not computed here (they would need a more
        # specialized library such as parselmouth).

    except Exception as e:
        print(f"Error extracting features: {e}")
        return None  # Signal failure to the caller

    return summary


In [7]:
# Main processing loop: label each file, preprocess and segment it, extract
# features per segment, and collect one flat record per segment.
all_features_list = []
processed_files = 0
skipped_files = 0

print("\nStarting preprocessing and feature extraction...")

for file_path in tqdm(audio_files, desc="Processing Files"):
    # Labels come from the file name; skip anything that cannot be labelled.
    labels = extract_labels(file_path)
    if labels is None or labels['emotion'] not in EMOTION_MAP.values():
        skipped_files += 1
        continue

    # Load, condition and cut the audio; an empty list signals a failure.
    segments, original_sr = preprocess_and_segment(
        file_path, TARGET_SR, SEGMENT_SAMPLES, PREEMPHASIS_COEFF
    )
    if not segments:
        skipped_files += 1
        continue

    for i, segment in enumerate(segments):
        segment_features = extract_features(
            segment, TARGET_SR, N_MFCC, HOP_LENGTH, WIN_LENGTH,
            PITCH_FMIN, PITCH_FMAX, SILENCE_THRESHOLD_DB,
        )
        if not segment_features:
            continue  # Feature extraction failed for this segment

        # Identifying columns and scalar features first...
        record = {
            'filename': labels['filename'],
            'speaker_id': labels['speaker_id'],
            'emotion': labels['emotion'],
            'segment_id': i,
            'original_sr': original_sr,
            'pitch_std_dev': segment_features['pitch_std_dev'],
            'silence_fraction': segment_features['silence_fraction'],
        }
        # ...then each vector feature expanded to one column per coefficient,
        # named "<feature>_<index>".
        for vector_key in ('mfcc_mean', 'mfcc_delta_mean', 'mfcc_delta2_mean'):
            for j, val in enumerate(segment_features[vector_key]):
                record[f'{vector_key}_{j}'] = val

        all_features_list.append(record)

    # Counted once the file has been segmented, whether or not every segment
    # produced features.
    processed_files += 1

print(f"\nProcessed {processed_files} files.")
print(f"Skipped {skipped_files} files (invalid format, non-core emotion, or processing error).")
print(f"Extracted features for {len(all_features_list)} segments.")



Starting preprocessing and feature extraction...


Processing Files:   0%|          | 0/7442 [00:00<?, ?it/s]


Processed 7442 files.
Skipped 0 files (invalid format, non-core emotion, or processing error).
Extracted features for 8694 segments.
