# Imports

In [1]:
import numpy as np
import pandas as pd

import psutil
from pathlib import Path
from tqdm import tqdm

from modules.preprocessing import AudioPreprocessor
from modules.feature_extraction import FeatureExtractor
from modules.pipelines import ModelPipeline
from modules.evaluate import PerformanceAnalyzer

from concurrent.futures import ThreadPoolExecutor, as_completed

import warnings
warnings.filterwarnings("ignore")

# Config

In [2]:
from config import run_config, NUM_WORKERS, DATA_DIR, AUDIO_PATH

run_config()

# Load Dataset

In [3]:
df = pd.read_csv(DATA_DIR / "filtered_data_labeled.tsv", sep='\t')
df.head(1)

Unnamed: 0,client_id,path,sentence,up_votes,down_votes,age,gender,accent,label
0,5001d9a0d3f8f5aae6f386f70713b2d5d046edc7ba0068...,common_voice_en_19687170.mp3,He associated with the Formists.,2,1,fifties,female,us,3


In [4]:
df['path'] = df['path'].apply(lambda x: AUDIO_PATH / x)
df.head(1)

Unnamed: 0,client_id,path,sentence,up_votes,down_votes,age,gender,accent,label
0,5001d9a0d3f8f5aae6f386f70713b2d5d046edc7ba0068...,data\audios\common_voice_en_19687170.mp3,He associated with the Formists.,2,1,fifties,female,us,3


# Features

In [5]:
from typing import Optional, Tuple, List

# === Parallel Processing ===
def process_sample(
    row: pd.Series, 
    idx: int, 
    mode: str, 
    preprocessor: AudioPreprocessor, 
    extractor: FeatureExtractor, 
    force_update: bool
) -> Optional[Tuple[int, np.ndarray, str]]:

    y_proc: Optional[np.ndarray] = preprocessor.load_cached_preprocessed(idx) if not force_update else None
    
    # Load and preprocess audio if not cached or force_update is True
    if y_proc is None or force_update:
        y_raw: Optional[np.ndarray] = preprocessor.load_audio(row['path'])
        if y_raw is None:
            # print(f"Failed to load audio for index {idx}.")
            return None
        y_proc = preprocessor.preprocess(y_raw)
        preprocessor.cache_preprocessed(idx, y_proc, force_update)
    
    # Extract features
    feat = extractor.extract(y_proc, sr=16000, mode=mode)
    return idx, feat, row['label']


def process_batch(
    batch_df: pd.DataFrame, 
    mode: str, 
    preprocessor: AudioPreprocessor, 
    extractor: FeatureExtractor, 
    force_update: bool, 
    offset: int = 0
) -> List[Tuple[int, np.ndarray, str]]:
    results: List[Tuple[int, np.ndarray, str]] = []
    
    for i, row in tqdm(batch_df.iterrows(), total=len(batch_df), desc=f"Batch {offset}", leave=False):
        result: Optional[Tuple[int, np.ndarray, str]] = process_sample(row, i, mode, preprocessor, extractor, force_update)
        if result: results.append(result)
    
    return results

In [6]:
def prepare_features_parallel(df, mode="traditional", force_update_preprocessing=False, force_update_features=False, batch_size=None):
    print(f"🔄 Preparing features in {mode} mode...")
    extractor = FeatureExtractor()
    X_cached, y_cached = extractor.load_cached_features(mode)
    if X_cached is not None and not force_update_features:
        return X_cached, y_cached
    
    print("🔄 Loading and preprocessing audio...")
    preprocessor = AudioPreprocessor()
    features_dict, labels_dict = {}, {}

    # Auto-select batch size based on available memor
    total_memory_gb = psutil.virtual_memory().total / (1024 ** 3)
    est_mem_per_sample = 0.01 if mode == "traditional" else 0.2
    est_batch_size = max(10, int((total_memory_gb * 0.4) / est_mem_per_sample))
    batch_size = batch_size or min(est_batch_size, len(df) // NUM_WORKERS)
    if total_memory_gb < 2:
        print("⚠️ Warning: Low memory detected. Reducing batch size to avoid OOM errors.")
        batch_size = min(batch_size, 10)
    print(f"🧠 Auto-selected batch size: {batch_size} (Estimated memory per sample: {est_mem_per_sample:.2f} GB, Total RAM: {total_memory_gb:.2f} GB)")

    batches = [df.iloc[i:i + batch_size] for i in range(0, len(df), batch_size)]
    print(f"🔄 Total batches: {len(batches)}")

    print("📦 Processing batches:")
    with ThreadPoolExecutor(max_workers=NUM_WORKERS) as executor:
        futures = {
            executor.submit(process_batch, batch, mode, preprocessor, extractor, force_update_preprocessing, i): i
            for i, batch in enumerate(batches)
            }
        for future in tqdm(as_completed(futures), total=len(futures), desc="📊 Batches Done"):
            batch_results = future.result()
            if batch_results:
                for idx, feat, label in batch_results:
                    features_dict[idx] = feat
                    labels_dict[idx] = label

    print("🔄 Finished processing batches.")
    # for i in range(len(batches)): extractor.remove_cached_features(mode, index=i)

    sorted_indices = sorted(features_dict.keys())
    X = np.array([features_dict[i] for i in sorted_indices])
    y = np.array([labels_dict[i] for i in sorted_indices])
    extractor.cache_features(X, y, mode=mode, force_update=force_update_features)
    return X, y

In [10]:
X, y = prepare_features_parallel(df, mode="traditional", force_update_features=False, force_update_preprocessing=False) # , batch_size=250

🔄 Preparing features in traditional mode...


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-base-960h and are newly initialized: ['masked_spec_embed']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [12]:
X.shape, y.shape

((172158, 147), (172158,))

# Test Inference

In [9]:
# === Batch Inference Utility ===
def run_batch_inference(model, input_folder, output_path, sr=16000, feature_mode="traditional"):
    extractor = FeatureExtractor()
    preprocessor = AudioPreprocessor()
    results = []

    for file in Path(input_folder).rglob("*.wav"):
        y = preprocessor.preprocess(preprocessor.load_audio(str(file), sr=sr))
        if y is not None:
            x = extractor.extract(y, sr=sr, mode=feature_mode).reshape(1, -1)
            pred = model.predict(x)[0]
            results.append({"file": file.name, "prediction": pred})

    df = pd.DataFrame(results)
    df.to_csv(output_path, index=False)
    print(f"✅ Batch inference saved to {output_path}")
