In [None]:
import os
import librosa
import soundfile as sf
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.utils import resample
from google.colab import drive


# Mount Google Drive; every path below lives under the mount point.
print("Connect Drive")
drive.mount('/content/drive')

# Project root on Drive and the folders holding the two raw corpora.
BASE_DIR = "/content/drive/MyDrive/Speech_Emotion_Recognition"
INPUT_DIR = os.path.join(BASE_DIR, "input")
RAVDESS_DIR, TESS_DIR = (
    os.path.join(INPUT_DIR, corpus_folder)
    for corpus_folder in (
        "ravdess-emotional-speech-audio",
        "TESS Toronto emotional speech set data",
    )
)

# Destination for the processed audio and the split CSV files.
OUTPUT_DIR = os.path.join(BASE_DIR, "working/processed")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Emotion mapping

# RAVDESS: keyed by the two-digit code taken from the third '-'-separated
# field of the file name.
# NOTE(review): '01' and '02' both map to 'neutral' — presumably a deliberate
# merge of two source classes; confirm against the corpus documentation.
RAVDESS_MAP = {
    '01': 'neutral',
    '02': 'neutral',
    '03': 'happy',
    '04': 'sad',
    '05': 'angry',
    '06': 'fearful',
    '07': 'disgust',
    '08': 'surprised',
}

# TESS: keyed by the last '_'-separated token of the lower-cased file stem.
# Only 'fear' and 'ps' are renamed; the other labels pass through unchanged.
TESS_MAP = {
    **{name: name for name in ('neutral', 'happy', 'sad', 'angry')},
    'fear': 'fearful',
    'disgust': 'disgust',
    'ps': 'surprised',
}

# Integer class id for each unified emotion name (position in the tuple).
EMOTION_TO_ID = {
    emotion: idx
    for idx, emotion in enumerate(
        ('neutral', 'happy', 'sad', 'angry', 'fearful', 'disgust', 'surprised')
    )
}


def preprocess_audio(file_path, target_sr=16000):
    """Load an audio file as mono and resample it to ``target_sr``.

    Args:
        file_path: Path of the audio file to read.
        target_sr: Sampling rate of the returned signal.

    Returns:
        ``(samples, target_sr)`` on success, or ``(None, None)`` when the file
        could not be loaded or resampled. A warning naming the file is
        printed in the failure case.
    """
    try:
        # sr=None keeps the file's native rate so the resampling step below
        # is explicit; mono=True asks librosa to down-mix multi-channel audio.
        y, sr = librosa.load(file_path, sr=None, mono=True)
        y_resampled = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
    except Exception as e:
        # Best-effort on purpose: one unreadable file must not abort the whole
        # run, but the failure is reported instead of silently swallowed.
        print(f"[WARN] Could not preprocess {file_path}: {type(e).__name__}: {e}")
        return None, None
    return y_resampled, target_sr

def _iter_wav_files(top_dir):
    """Yield ``(directory, filename)`` for each .wav file under ``top_dir``.

    Directories and files are visited in sorted order so the resulting row
    order (and therefore the seeded splits made from it) does not depend on
    the order in which the filesystem lists entries.
    """
    for root, dirs, files in os.walk(top_dir):
        dirs.sort()  # in-place sort controls the order os.walk descends in
        for f in sorted(files):
            # Case-insensitive so '.WAV' files are not skipped.
            if f.lower().endswith(".wav"):
                yield root, f


def _ravdess_emotion(filename):
    """Return the unified emotion name for a RAVDESS file name, or None."""
    parts = filename.split('-')
    # The emotion code is the third '-'-separated field of the name.
    return RAVDESS_MAP.get(parts[2]) if len(parts) >= 3 else None


def _tess_emotion(filename):
    """Return the unified emotion name for a TESS file name, or None."""
    # The emotion is the last '_'-separated token of the lower-cased stem.
    return TESS_MAP.get(filename.lower().split('.')[0].split('_')[-1])


# NOTE(review): if a corpus folder contains a nested duplicate copy of itself,
# the same recording would be listed twice and could end up in both the train
# and the test split — confirm the folder layout on Drive.
all_data = []

# RAVDESS first, then TESS; files whose label cannot be parsed are skipped.
for dataset_dir, emotion_of in (
    (RAVDESS_DIR, _ravdess_emotion),
    (TESS_DIR, _tess_emotion),
):
    for root, f in _iter_wav_files(dataset_dir):
        emotion_name = emotion_of(f)
        if emotion_name:
            all_data.append({
                'raw_path': os.path.join(root, f),
                'label_id': EMOTION_TO_ID[emotion_name]
            })

# Fail with an actionable message instead of a KeyError on 'label_id' below.
if not all_data:
    raise FileNotFoundError(
        f"No labelled .wav files found under {RAVDESS_DIR!r} or {TESS_DIR!r}; "
        "check that Drive is mounted and the dataset folders exist."
    )

df_raw = pd.DataFrame(all_data)
print(f"Total: {len(df_raw)} ")
print("Data distribution\n", df_raw['label_id'].value_counts())

# Split train/test set: hold out 20% of all rows, stratified by label so each
# split keeps the overall class proportions.
df_train_full, df_test = train_test_split(
    df_raw,
    test_size=0.2,
    stratify=df_raw['label_id'],
    random_state=42,
)

# Split valid: take 15% of the remaining rows (0.15 * 0.8 = 12% of the total)
# as the validation set, again stratified by label.
df_train, df_val = train_test_split(
    df_train_full,
    test_size=0.15,
    stratify=df_train_full['label_id'],
    random_state=42,
)

#Oversampling
# Bring every class of the TRAIN split up to the size of the largest class.
# All original rows are kept and only the shortfall is drawn with replacement.
# Bootstrapping the whole class (including the largest one) would randomly
# drop original recordings and replace them with duplicates.
print("\nBlancing Train Set...")
max_size = df_train['label_id'].value_counts().max()
lst_balanced = []

for class_id in df_train['label_id'].unique():
    df_class = df_train[df_train['label_id'] == class_id]
    lst_balanced.append(df_class)

    n_missing = int(max_size - len(df_class))
    if n_missing > 0:
        # Duplicate randomly chosen rows of this class to fill the gap.
        df_class_extra = resample(df_class,
                                  replace=True,
                                  n_samples=n_missing,
                                  random_state=42)
        lst_balanced.append(df_class_extra)

df_train_balanced = pd.concat(lst_balanced)
print("Data distribution after balancing\n", df_train_balanced['label_id'].value_counts())

#preprocessing
def process_and_save_dataset(df: pd.DataFrame, folder_name: str) -> pd.DataFrame:
    """Preprocess every distinct audio file of a split and write it to disk.

    Each unique ``raw_path`` in ``df`` is passed through ``preprocess_audio``
    once and written as ``<OUTPUT_DIR>/<folder_name>/<folder_name>_<i>.wav``,
    where ``i`` is the file's position among the unique paths. Duplicate rows
    (e.g. from oversampling) share one output file.

    Args:
        df: Split to process; must contain a ``raw_path`` column. It is not
            modified.
        folder_name: Sub-folder of ``OUTPUT_DIR`` and file-name prefix.

    Returns:
        A new DataFrame with an added ``file_path`` column holding the path
        relative to ``OUTPUT_DIR``. Rows whose audio could not be
        preprocessed are dropped.
    """
    print(f"Working on {folder_name}...")

    save_path = os.path.join(OUTPUT_DIR, folder_name)
    os.makedirs(save_path, exist_ok=True)

    path_map = {}
    n_failed = 0
    for i, raw_f in enumerate(df['raw_path'].unique()):
        y, sr = preprocess_audio(raw_f)
        if y is None:
            n_failed += 1
        else:
            new_name = f"{folder_name}_{i}.wav"
            sf.write(os.path.join(save_path, new_name), y, sr)
            path_map[raw_f] = os.path.join(folder_name, new_name)

        if (i+1) % 400 == 0: print(f"  Saved {i+1} file...")

    # Report dropped files so a shrinking dataset does not go unnoticed.
    if n_failed:
        print(f"  Skipped {n_failed} file(s) that could not be preprocessed")

    # assign() builds a new frame instead of writing into the caller's one,
    # which may be a slice produced by train_test_split.
    result = df.assign(file_path=df['raw_path'].map(path_map))
    return result.dropna(subset=['file_path'])

# Preprocess the audio of each split; only the train split is the balanced one.
df_train_final = process_and_save_dataset(df_train_balanced, "train")
df_val_final = process_and_save_dataset(df_val, "val")
df_test_final = process_and_save_dataset(df_test, "test")

# Write one CSV per split with the processed file path and its class id.
cols = ['file_path', 'label_id']
for split_df, csv_name in (
    (df_train_final, "train_final.csv"),
    (df_val_final, "val_final.csv"),
    (df_test_final, "test_final.csv"),
):
    split_df[cols].to_csv(os.path.join(OUTPUT_DIR, csv_name), index=False)

# Final summary of the row counts per split.
banner = "=" * 40
print("\n" + banner)
print("Done!")
print("Final rows:")
for split_label, split_df in (
    ("Train (Balanced)", df_train_final),
    ("Val", df_val_final),
    ("Test", df_test_final),
):
    print(f"   - {split_label}: {len(split_df)}")
print(banner)