## Chuẩn bị dữ liệu SER với bộ ViSEC

Notebook này thực hiện các bước tiền xử lý cho bài toán Nhận diện cảm xúc trong giọng nói (SER) sử dụng bộ dữ liệu `hustep-lab/ViSEC`:

1. Tải bộ dữ liệu ViSEC từ Hugging Face.
2. Chuẩn hóa nhãn cảm xúc từ 4 lớp gốc (`happy`, `neutral`, `sad`, `angry`) về 3 lớp:
   - `positive`: happy
   - `neutral`: neutral
   - `negative`: sad, angry
3. Phân tích phân bố nhãn cảm xúc, accent, giới tính.
4. Chia dữ liệu thành 3 tập:
   - `train`: 70%
   - `validation`: 15%
   - `test`: 15%
   với ràng buộc stratify theo nhãn cảm xúc.
5. Chuẩn hóa cột `audio` về dạng `Audio(sampling_rate=16000)`.
6. Lưu lại các tập dữ liệu dưới dạng Hugging Face Dataset bằng `save_to_disk` tại thư mục `./data/ser_visec`.
7. Ghi thống kê cơ bản ra file `stats.json` để phục vụ báo cáo và huấn luyện sau này.


In [None]:
# Cài đặt thư viện cần thiết (chạy một lần trong môi trường mới)
# Nếu đã có rồi thì có thể bỏ qua cell này
%pip install -q datasets soundfile librosa tqdm torchcodec

from datasets import load_dataset, Dataset, Audio
from pathlib import Path
from sklearn.model_selection import train_test_split
import pandas as pd
import json

# Thư mục lưu dataset SER đã xử lý
OUTPUT_DIR = Path("./data/ser_visec")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print("Thư mục lưu dữ liệu:", OUTPUT_DIR.resolve())

# 1. Tải dataset ViSEC từ Hugging Face
print("\n[1/5] Đang tải dataset ViSEC từ Hugging Face...")
visec_raw = load_dataset("hustep-lab/ViSEC")
print("Các splits có sẵn:", list(visec_raw.keys()))
print("Số mẫu tổng (train):", len(visec_raw["train"]))

# 2. Map 4 labels cảm xúc gốc → 3 labels (positive, negative, neutral)
print("\n[2/5] Đang chuẩn hóa nhãn cảm xúc từ 4 lớp về 3 lớp...")

def map_emotion_to_3_labels(emotion: str) -> str:
    """Map 4 nhãn ViSEC (happy, neutral, sad, angry) → 3 nhãn (positive, negative, neutral)."""
    emotion = str(emotion).lower().strip()
    if emotion == "happy":
        return "positive"
    if emotion == "neutral":
        return "neutral"
    if emotion in ["sad", "angry"]:
        return "negative"
    return "neutral"

visec_data = []
for ex in visec_raw["train"]:
    mapped_emotion = map_emotion_to_3_labels(ex["emotion"])
    visec_data.append(
        {
            "audio": ex["audio"],
            "emotion": mapped_emotion,
            "emotion_original": ex["emotion"],
            "duration": float(ex["duration"]),
            "accent": ex["accent"],
            "gender": ex["gender"],
        }
    )

visec_df = pd.DataFrame(visec_data)

print("Phân bố nhãn cảm xúc sau khi map:")
print(visec_df["emotion"].value_counts())

print("\nPhân bố accent:")
print(visec_df["accent"].value_counts())

print("\nPhân bố gender:")
print(visec_df["gender"].value_counts())

# 3. Chia train / validation / test với stratify theo nhãn cảm xúc
print("\n[3/5] Đang chia train / validation / test (70% / 15% / 15%) với stratify theo emotion...")

train_df, temp_df = train_test_split(
    visec_df,
    test_size=0.3,
    stratify=visec_df["emotion"],
    random_state=42,
)

val_df, test_df = train_test_split(
    temp_df,
    test_size=0.5,
    stratify=temp_df["emotion"],
    random_state=42,
)

print("Số mẫu:")
print("  Train:", len(train_df))
print("  Val:  ", len(val_df))
print("  Test: ", len(test_df))

print("\nPhân bố emotion trong từng split:")
print("Train:")
print(train_df["emotion"].value_counts())
print("\nVal:")
print(val_df["emotion"].value_counts())
print("\nTest:")
print(test_df["emotion"].value_counts())

# 4. Chuyển về Hugging Face Dataset và cast cột audio về Audio(16000)
print("\n[4/5] Đang chuyển về Hugging Face Dataset và chuẩn hóa cột audio...")

# Hàm helper để giữ lại đúng các cột cần thiết
COLUMNS = ["audio", "emotion", "emotion_original", "duration", "accent", "gender"]

train_ds = Dataset.from_pandas(train_df[COLUMNS]).cast_column("audio", Audio(sampling_rate=16000))
val_ds = Dataset.from_pandas(val_df[COLUMNS]).cast_column("audio", Audio(sampling_rate=16000))
test_ds = Dataset.from_pandas(test_df[COLUMNS]).cast_column("audio", Audio(sampling_rate=16000))

print("Ví dụ 1 mẫu train:")
print({k: train_ds[0][k] for k in ["emotion", "emotion_original", "duration", "accent", "gender"]})

# 5. Lưu các split xuống đĩa và ghi thống kê
print("\n[5/5] Đang lưu các split đã xử lý và ghi thống kê...")

train_path = OUTPUT_DIR / "train"
val_path = OUTPUT_DIR / "val"
test_path = OUTPUT_DIR / "test"

train_ds.save_to_disk(str(train_path))
val_ds.save_to_disk(str(val_path))
test_ds.save_to_disk(str(test_path))

print("Đã lưu:")
print("  - Train:", train_path.resolve())
print("  - Val:  ", val_path.resolve())
print("  - Test: ", test_path.resolve())

train_durations = [ex["duration"] for ex in train_ds]

stats = {
    "train_samples": len(train_ds),
    "val_samples": len(val_ds),
    "test_samples": len(test_ds),
    "total_samples": len(train_ds) + len(val_ds) + len(test_ds),
    "emotion_distribution": visec_df["emotion"].value_counts().to_dict(),
    "avg_duration": float(sum(train_durations) / len(train_durations)),
    "min_duration": float(min(train_durations)),
    "max_duration": float(max(train_durations)),
}

with open(OUTPUT_DIR / "stats.json", "w", encoding="utf-8") as f:
    json.dump(stats, f, ensure_ascii=False, indent=2)

print("\nHoàn thành chuẩn bị dữ liệu SER cho bộ ViSEC.")
print("Thống kê chính:")
for k, v in stats.items():
    print(f"  {k}: {v}")





Note: you may need to restart the kernel to use updated packages.


  from .autonotebook import tqdm as notebook_tqdm


Thư mục lưu dữ liệu: D:\Downloads\CS529_ASR_SER\data\ser_visec

[1/5] Đang tải dataset ViSEC từ Hugging Face...


To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
Generating train split: 100%|██████████| 5280/5280 [00:04<00:00, 1189.26 examples/s]


Các splits có sẵn: ['train']
Số mẫu tổng (train): 5280

[2/5] Đang chuẩn hóa nhãn cảm xúc từ 4 lớp về 3 lớp...


ImportError: To support decoding audio data, please install 'torchcodec'.

## Phiên bản đầy đủ tiền xử lý SER (ViSEC) theo 6 bước

Phần dưới đây hiện thực đầy đủ pipeline tiền xử lý cho SER trên bộ `hustep-lab/ViSEC` đúng theo Nội dung 2 trong thuyết minh:

1. **Chuẩn hóa định dạng**: audio về 16 kHz, mono, chuẩn hóa biên độ (tránh clipping).
2. **Lọc nhiễu, cắt im lặng, loại bỏ mẫu lỗi**: loại bỏ mẫu quá ngắn/dài, quá nhiều im lặng, clipping, RMS quá thấp.
3. **Cân bằng dữ liệu giữa các nhãn cảm xúc**: dùng chiến lược `undersample` (hoặc `oversample`).
4. **Tăng cường dữ liệu (data augmentation)**: time-stretch, pitch-shift, thêm noise, thay đổi volume.
5. **Chia dữ liệu**: `train` / `val` / `test` với stratify theo nhãn cảm xúc.
6. **Lưu kết quả**: lưu thành Hugging Face Dataset tại `./data/ser_visec/{train,val,test}` và ghi thống kê `stats.json`.

Sau khi chạy xong cell code bên dưới, bạn có thể sử dụng trực tiếp các thư mục này trong notebook `finetune_SER_VISEC.ipynb`.


In [None]:
# PIPELINE TIỀN XỬ LÝ ĐẦY ĐỦ CHO SER (VISEC)

%pip install -q datasets librosa soundfile tqdm noisereduce scikit-learn torchcodec

import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import librosa
import soundfile as sf
import noisereduce as nr

from scipy import signal
from pathlib import Path
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.utils import resample

from datasets import load_dataset, Dataset, Audio
import json
import random
import librosa.effects as effects

# ==========================
# 1. CHUẨN HÓA + TẠO normalized_data
# ==========================

print("=" * 60)
print("1. CHUẨN HÓA ĐỊNH DẠNG VÀ TẠO NORMALIZED_DATA CHO VISEC")
print("=" * 60)

visec_raw = load_dataset("hustep-lab/ViSEC")
print("Splits:", list(visec_raw.keys()))
print("Số mẫu (train):", len(visec_raw["train"]))


def map_emotion_to_3_labels(emotion: str) -> str:
    emotion = str(emotion).lower().strip()
    if emotion == "happy":
        return "positive"
    if emotion == "neutral":
        return "neutral"
    if emotion in ["sad", "angry"]:
        return "negative"
    return "neutral"


normalized_data = []

target_sr = 16000

for idx, ex in enumerate(tqdm(visec_raw["train"], desc="Normalizing ViSEC")):
    audio = ex["audio"]  # {'array', 'sampling_rate'}
    y = audio["array"]
    sr = audio["sampling_rate"]

    # Đảm bảo mono
    if y.ndim > 1:
        y = librosa.to_mono(y)

    # Resample về 16 kHz nếu cần
    if sr != target_sr:
        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    # Normalize amplitude (tránh clipping)
    max_val = np.abs(y).max()
    if max_val > 0.95:
        y = y / max_val * 0.95

    normalized_data.append(
        {
            "audio_array": y,
            "sampling_rate": sr,
            "emotion": map_emotion_to_3_labels(ex["emotion"]),
            "emotion_original": ex["emotion"],
            "duration": len(y) / sr,
            "accent": ex["accent"],
            "gender": ex["gender"],
        }
    )

print(f"\nĐã chuẩn hóa {len(normalized_data)} samples từ ViSEC.")

# ==========================
# 2. LỌC NHIỄU, CẮT IM LẶNG, LOẠI MẪU LỖI
# ==========================

print("=" * 60)
print("2. LỌC NHIỄU, CẮT IM LẶNG, LOẠI MẪU LỖI")
print("=" * 60)


def detect_silence(audio, sr, threshold_db=-40):
    rms = librosa.feature.rms(y=audio)[0]
    rms_db = librosa.power_to_db(rms**2)
    silence_mask = rms_db < threshold_db
    silence_ratio = np.sum(silence_mask) / len(silence_mask)
    return silence_ratio


def remove_silence(audio, sr, frame_length=2048, hop_length=512, top_db=20):
    audio_trimmed, _ = librosa.effects.trim(
        audio,
        top_db=top_db,
        frame_length=frame_length,
        hop_length=hop_length,
    )
    return audio_trimmed


def reduce_noise(audio, sr, stationary=False):
    try:
        audio_clean = nr.reduce_noise(
            y=audio,
            sr=sr,
            stationary=stationary,
        )
        return audio_clean
    except Exception:
        # fallback: high-pass filter
        b, a = signal.butter(3, 80, "hp", fs=sr)
        audio_clean = signal.filtfilt(b, a, audio)
        return audio_clean


def check_audio_quality(audio, sr, min_duration=0.5, max_duration=30):
    duration = len(audio) / sr

    # 1. Độ dài
    if duration < min_duration or duration > max_duration:
        return False, f"Invalid duration: {duration:.2f}s"

    # 2. Tỷ lệ im lặng
    silence_ratio = detect_silence(audio, sr)
    if silence_ratio > 0.8:
        return False, f"Too much silence: {silence_ratio:.2%}"

    # 3. Clipping
    max_val = np.abs(audio).max()
    if max_val > 0.99:
        return False, f"Audio clipping detected: {max_val:.3f}"

    # 4. RMS energy
    rms = librosa.feature.rms(y=audio)[0].mean()
    if rms < 0.01:
        return False, f"RMS too low: {rms:.4f}"

    return True, "OK"


def preprocess_audio_pipeline(audio_array, sr, reduce_noise_flag=True):
    # 1. Remove silence
    audio_processed = remove_silence(audio_array, sr)

    # 2. Reduce noise (optional)
    if reduce_noise_flag:
        audio_processed = reduce_noise(audio_processed, sr)

    # 3. Normalize
    max_val = np.abs(audio_processed).max()
    if max_val > 0:
        audio_processed = audio_processed / max_val * 0.95

    return audio_processed


cleaned_data = []
removed_samples = []

for idx, sample in enumerate(tqdm(normalized_data, desc="Cleaning audio")):
    audio_array = sample["audio_array"]
    sr = sample["sampling_rate"]

    is_valid, reason = check_audio_quality(audio_array, sr)

    if not is_valid:
        removed_samples.append(
            {
                "idx": idx,
                "reason": reason,
                "emotion": sample["emotion"],
            }
        )
        continue

    try:
        audio_cleaned = preprocess_audio_pipeline(audio_array, sr)
        is_valid_after, reason_after = check_audio_quality(audio_cleaned, sr)

        if is_valid_after:
            sample["audio_array"] = audio_cleaned
            sample["duration"] = len(audio_cleaned) / sr
            cleaned_data.append(sample)
        else:
            removed_samples.append(
                {
                    "idx": idx,
                    "reason": f"After processing: {reason_after}",
                    "emotion": sample["emotion"],
                }
            )
    except Exception as e:
        removed_samples.append(
            {
                "idx": idx,
                "reason": f"Processing error: {str(e)}",
                "emotion": sample["emotion"],
            }
        )

print(f"\nĐã làm sạch {len(cleaned_data)} samples")
print(f"Đã loại bỏ {len(removed_samples)} samples")

if removed_samples:
    print("\nMột số lý do loại bỏ phổ biến:")
    reason_counts = pd.Series([r["reason"] for r in removed_samples]).value_counts()
    print(reason_counts.head(10))

# ==========================
# 3. CÂN BẰNG DỮ LIỆU GIỮA CÁC NHÃN CẢM XÚC
# ==========================

print("=" * 60)
print("3. CÂN BẰNG DỮ LIỆU GIỮA CÁC NHÃN CẢM XÚC")
print("=" * 60)


def balance_emotion_labels(data_list, target_count=None, strategy="undersample"):
    df = pd.DataFrame(data_list)
    emotion_counts = df["emotion"].value_counts()
    print("Phân bố ban đầu:")
    print(emotion_counts)

    if target_count is None:
        target_count = emotion_counts.min()

    balanced_data = []

    for emotion in emotion_counts.index:
        emotion_data = df[df["emotion"] == emotion].to_dict("records")

        if len(emotion_data) > target_count:
            if strategy == "undersample":
                emotion_data = resample(
                    emotion_data,
                    n_samples=target_count,
                    random_state=42,
                    replace=False,
                )
        elif len(emotion_data) < target_count:
            if strategy == "oversample":
                emotion_data = resample(
                    emotion_data,
                    n_samples=target_count,
                    random_state=42,
                    replace=True,
                )

        balanced_data.extend(emotion_data)

    np.random.seed(42)
    np.random.shuffle(balanced_data)

    balanced_df = pd.DataFrame(balanced_data)
    print("\nPhân bố sau khi cân bằng:")
    print(balanced_df["emotion"].value_counts())

    return balanced_data


balanced_data = balance_emotion_labels(cleaned_data, strategy="undersample")

# ==========================
# 4. DATA AUGMENTATION
# ==========================

print("=" * 60)
print("4. TĂNG CƯỜNG DỮ LIỆU (DATA AUGMENTATION)")
print("=" * 60)


class AudioAugmentation:
    @staticmethod
    def time_stretch(audio, sr, rate_range=(0.8, 1.2)):
        rate = random.uniform(*rate_range)
        return effects.time_stretch(audio, rate=rate)

    @staticmethod
    def pitch_shift(audio, sr, n_steps_range=(-2, 2)):
        n_steps = random.randint(*n_steps_range)
        return effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

    @staticmethod
    def add_gaussian_noise(audio, snr_db_range=(10, 30)):
        snr_db = random.uniform(*snr_db_range)
        signal_power = np.mean(audio**2)
        noise_power = signal_power / (10 ** (snr_db / 10))
        noise = np.random.normal(0, np.sqrt(noise_power), len(audio))
        return audio + noise

    @staticmethod
    def volume_scale(audio, scale_range=(0.7, 1.3)):
        scale = random.uniform(*scale_range)
        return audio * scale

    @staticmethod
    def apply_all(audio, sr, prob=0.5):
        augmented = audio.copy()

        if random.random() < prob:
            augmented = AudioAugmentation.time_stretch(augmented, sr)

        if random.random() < prob:
            augmented = AudioAugmentation.pitch_shift(augmented, sr)

        if random.random() < prob:
            augmented = AudioAugmentation.add_gaussian_noise(augmented)

        if random.random() < prob:
            augmented = AudioAugmentation.volume_scale(augmented)

        max_val = np.abs(augmented).max()
        if max_val > 0:
            augmented = augmented / max_val * 0.95

        return augmented


def augment_dataset(data_list, augmentation_factor=2, apply_to_minority_only=True):
    df = pd.DataFrame(data_list)
    emotion_counts = df["emotion"].value_counts()
    min_count = emotion_counts.min()
    max_count = emotion_counts.max()

    augmented_data = []

    for emotion in emotion_counts.index:
        emotion_data = df[df["emotion"] == emotion].to_dict("records")
        augmented_data.extend(emotion_data)

        should_augment = True
        if apply_to_minority_only:
            should_augment = len(emotion_data) < max_count

        if should_augment:
            target_count = int(min_count * augmentation_factor)
            num_to_augment = target_count - len(emotion_data)

            if num_to_augment > 0:
                print(f"\nAugmenting {emotion}: {len(emotion_data)} → {target_count}")
                for _ in range(num_to_augment):
                    sample = random.choice(emotion_data)
                    audio_array = sample["audio_array"]
                    sr = sample["sampling_rate"]

                    aug_audio = AudioAugmentation.apply_all(audio_array, sr)

                    new_sample = sample.copy()
                    new_sample["audio_array"] = aug_audio
                    new_sample["duration"] = len(aug_audio) / sr
                    new_sample["is_augmented"] = True
                    augmented_data.append(new_sample)

    return augmented_data


augmented_data = augment_dataset(
    balanced_data,
    augmentation_factor=2,
    apply_to_minority_only=True,
)

print(f"\nĐã tăng cường: {len(balanced_data)} → {len(augmented_data)} samples")

# ==========================
# 5. CHIA TRAIN/VAL/TEST VÀ LƯU DATASET
# ==========================

print("=" * 60)
print("5. CHIA TRAIN/VAL/TEST VÀ LƯU DATASET CHO SER")
print("=" * 60)


def split_dataset(
    data_list,
    train_ratio=0.7,
    val_ratio=0.15,
    test_ratio=0.15,
    stratify_column="emotion",
    random_state=42,
):
    assert abs(train_ratio + val_ratio + test_ratio - 1.0) < 1e-6

    df = pd.DataFrame(data_list)

    train_df, temp_df = train_test_split(
        df,
        test_size=(val_ratio + test_ratio),
        stratify=df[stratify_column] if stratify_column else None,
        random_state=random_state,
    )

    val_size = val_ratio / (val_ratio + test_ratio)
    val_df, test_df = train_test_split(
        temp_df,
        test_size=(1 - val_size),
        stratify=temp_df[stratify_column] if stratify_column else None,
        random_state=random_state,
    )

    print(f"\nTổng số samples: {len(df)}")
    print(f"  Train: {len(train_df)}")
    print(f"  Val:   {len(val_df)}")
    print(f"  Test:  {len(test_df)}")

    print("\nPhân bố emotion (train):")
    print(train_df["emotion"].value_counts())
    print("\nPhân bố emotion (val):")
    print(val_df["emotion"].value_counts())
    print("\nPhân bố emotion (test):")
    print(test_df["emotion"].value_counts())

    return {
        "train": train_df.to_dict("records"),
        "val": val_df.to_dict("records"),
        "test": test_df.to_dict("records"),
    }


splits = split_dataset(augmented_data)

OUTPUT_DIR = Path("./data/ser_visec")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def records_to_dataset(records):
    audio_arrays = [r["audio_array"] for r in records]
    srs = [r["sampling_rate"] for r in records]
    emotions = [r["emotion"] for r in records]
    emotions_orig = [r["emotion_original"] for r in records]
    durations = [r["duration"] for r in records]
    accents = [r["accent"] for r in records]
    genders = [r["gender"] for r in records]

    ds = Dataset.from_dict(
        {
            "audio": audio_arrays,
            "sampling_rate": srs,
            "emotion": emotions,
            "emotion_original": emotions_orig,
            "duration": durations,
            "accent": accents,
            "gender": genders,
        }
    )
    # cast cột audio: dùng sampling_rate từ field "sampling_rate" không trực tiếp được,
    # nhưng vì ta đã resample về 16000 nên có thể dùng Audio(16000).
    ds = ds.remove_columns(["sampling_rate"]).cast_column("audio", Audio(sampling_rate=16000))
    return ds


train_ds = records_to_dataset(splits["train"])
val_ds = records_to_dataset(splits["val"])
test_ds = records_to_dataset(splits["test"])

train_ds.save_to_disk(str(OUTPUT_DIR / "train"))
val_ds.save_to_disk(str(OUTPUT_DIR / "val"))
test_ds.save_to_disk(str(OUTPUT_DIR / "test"))

print("\nĐã lưu dữ liệu SER đã tiền xử lý đầy đủ vào ./data/ser_visec")

# Thống kê
train_durations = [ex["duration"] for ex in train_ds]

stats = {
    "train_samples": len(train_ds),
    "val_samples": len(val_ds),
    "test_samples": len(test_ds),
    "total_samples": len(train_ds) + len(val_ds) + len(test_ds),
    "emotion_distribution_train": pd.Series(
        [ex["emotion"] for ex in train_ds]
    ).value_counts().to_dict(),
    "avg_duration": float(sum(train_durations) / len(train_durations)),
    "min_duration": float(min(train_durations)),
    "max_duration": float(max(train_durations)),
}

with open(OUTPUT_DIR / "stats.json", "w", encoding="utf-8") as f:
    json.dump(stats, f, ensure_ascii=False, indent=2)

print("\nThống kê chính:")
for k, v in stats.items():
    print(f"  {k}: {v}")

