In [12]:
import os
import json
import pandas as pd

# 📁 Root directory of the Thai dataset
DATA_DIR = "C:\\Users\\Phattarapol\\Desktop\\Senior Project\\Thai Dataset"

# ✅ Load the emotion-label file; the loop below iterates it as
#    {audio filename: list of annotation dicts}
with open("emotion_label.json", mode="r", encoding="utf-8") as label_file:
    emo_data = json.load(label_file)

# ✅ Helper: resolve the real on-disk path of an audio file
def find_file_path(filename, data_dir=None, studio_ids=range(1, 81)):
    """Locate ``filename`` inside the per-studio ``con`` folders.

    The file is searched for at ``<data_dir>/studioNNN/con/<filename>`` for
    every id in ``studio_ids`` (NNN is the id zero-padded to three digits),
    in iteration order.

    Args:
        filename: Base name of the audio file to look for.
        data_dir: Dataset root directory. ``None`` (the default) falls back to
            the module-level ``DATA_DIR``, which is the original behaviour.
        studio_ids: Iterable of integer studio ids to probe. Defaults to
            1..80, i.e. ``studio001`` through ``studio080``.

    Returns:
        The first existing path as a string, or ``None`` when the file is not
        present under any of the probed studio folders.
    """
    root = DATA_DIR if data_dir is None else data_dir
    for studio_id in studio_ids:
        candidate = os.path.join(root, f"studio{studio_id:03d}", "con", filename)
        if os.path.exists(candidate):
            return candidate
    return None  # ❌ not found in any studio folder

# ✅ Build the DataFrame from the label entries whose audio file exists on disk
records = []

for filename, label in emo_data.items():
    # Only the first annotation entry is consulted; its majority emotion is
    # lower-cased so that labels compare consistently later on.
    majority_emo = label[0]["majority_emo"].lower()
    path = find_file_path(filename)
    if path is None:
        continue  # no matching audio file was found; skip this entry
    records.append({"File": filename, "Emotions": majority_emo, "Path": path})

# ✅ Assemble all rows in one go
df = pd.DataFrame(records)

# ✅ Report the number of files found and the class distribution
print(f"✅ พบไฟล์ทั้งหมด: {len(df)}")
print(df["Emotions"].value_counts())
df.head()


✅ พบไฟล์ทั้งหมด: 21850
Emotions
frustrated    6161
neutral       5863
happy         3047
angry         2475
sad           2230
none          2072
other            2
Name: count, dtype: int64


Unnamed: 0,File,Emotions,Path
0,s001_con_actor001_impro1_1.flac,neutral,C:\Users\Phattarapol\Desktop\Senior Project\Th...
1,s001_con_actor001_impro1_10.flac,neutral,C:\Users\Phattarapol\Desktop\Senior Project\Th...
2,s001_con_actor001_impro1_11.flac,neutral,C:\Users\Phattarapol\Desktop\Senior Project\Th...
3,s001_con_actor001_impro1_12.flac,neutral,C:\Users\Phattarapol\Desktop\Senior Project\Th...
4,s001_con_actor001_impro1_13.flac,neutral,C:\Users\Phattarapol\Desktop\Senior Project\Th...


In [13]:
# Restrict the dataset to four emotion classes; rows with any other label are dropped.
target_emotions = ['neutral', 'happy', 'sad', 'angry']
is_target = df['Emotions'].isin(target_emotions)
df = df[is_target].reset_index(drop=True)


In [14]:
# Sanity check: per-class counts and the distinct labels that remain in df
emotion_col = df["Emotions"]
print(emotion_col.value_counts())
print(emotion_col.unique())

Emotions
neutral    5863
happy      3047
angry      2475
sad        2230
Name: count, dtype: int64
['neutral' 'angry' 'happy' 'sad']


In [15]:
# ✅ Step 1: Library
import time
import pandas as pd
import torch
import torchaudio
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelEncoder
from transformers import WhisperProcessor, WhisperModel, WhisperForConditionalGeneration
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
from sklearn.utils import resample
import random
import torchaudio.transforms as T

# Folder that receives the augmented .wav files. It is derived from DATA_DIR
# (defined in the data-loading cell) instead of repeating the absolute path,
# so the dataset root only has to be changed in one place. With the current
# DATA_DIR on Windows this yields the same "...\Thai Dataset\DataAugment" path.
AUGMENTED_DIR = os.path.join(DATA_DIR, "DataAugment")
os.makedirs(AUGMENTED_DIR, exist_ok=True)

def augment_and_save(waveform, sr, idx, method="noise"):
    """Apply one augmentation to ``waveform`` and write it to ``AUGMENTED_DIR``.

    Args:
        waveform: Audio tensor to augment (passed straight to the torchaudio
            transforms and to ``torchaudio.save``).
        sr: Sample rate of ``waveform``; also the rate the file is saved with.
        idx: Value embedded in the output file name to keep names distinct.
        method: ``"noise"`` adds Gaussian noise scaled by 0.005, ``"pitch"``
            shifts the pitch by +2 steps, ``"speed"`` resamples by a random
            factor in [0.9, 1.1]. Any other value saves the waveform unchanged.

    Returns:
        Path of the written file, ``<AUGMENTED_DIR>/aug_<idx>_<method>.wav``.
    """
    if method == "noise":
        augmented = waveform + 0.005 * torch.randn_like(waveform)
    elif method == "pitch":
        # The former T.Resample(sr, sr) step was dropped: resampling to the
        # same rate returns the input unchanged, so it only added overhead.
        augmented = T.PitchShift(sr, n_steps=2)(waveform)
    elif method == "speed":
        speed_factor = random.uniform(0.9, 1.1)
        # Resample to a scaled rate but save at the unchanged `sr` below, so
        # the clip's duration scales by about `speed_factor` on playback.
        augmented = T.Resample(sr, int(sr * speed_factor))(waveform)
    else:
        augmented = waveform

    # Detach so the tensor handed to torchaudio.save carries no autograd graph.
    augmented = augmented.detach()
    path = os.path.join(AUGMENTED_DIR, f"aug_{idx}_{method}.wav")
    torchaudio.save(path, augmented, sr)
    return path

def balance_dataset_with_real_augmentation(df, target_per_class=5863):
    """Return a copy of ``df`` with exactly ``target_per_class`` rows per emotion.

    * Classes with fewer rows keep all originals (``augmented=False``) and are
      topped up with augmented audio files written by ``augment_and_save``
      (``augmented=True``). Those extra rows only carry ``Path``, ``Emotions``
      and ``augmented``; other columns of ``df`` are NaN for them.
    * Classes with more rows are randomly down-sampled without replacement.
    * Classes that already match the target are passed through unchanged.

    Args:
        df: DataFrame with at least the columns ``Path`` and ``Emotions``.
        target_per_class: Desired number of rows for every emotion.

    Returns:
        A new DataFrame with a fresh 0..n-1 index and an extra boolean
        ``augmented`` column. An input without rows yields an empty frame that
        still has the ``augmented`` column.

    Note:
        Row sampling is seeded (``random_state=42``), but the augmentation
        method is picked with ``random.choice`` and no seed is set in this
        function, so the written audio is not reproducible run to run.
    """
    balanced_rows = []
    idx_counter = 0  # running index that keeps augmented file names distinct

    for emotion in df["Emotions"].unique():
        df_emotion = df[df["Emotions"] == emotion]
        n_samples = len(df_emotion)

        if n_samples > target_per_class:
            # Over-represented class: keep ONLY the random subset. Previously
            # the full class was appended first and the subset on top of it,
            # leaving n_samples + target_per_class rows with duplicates.
            trimmed = df_emotion.sample(n=target_per_class, random_state=42)
            balanced_rows.append(trimmed.assign(augmented=False))
            continue

        # Keep every original row of this class.
        balanced_rows.append(df_emotion.assign(augmented=False))

        n_needed = target_per_class - n_samples
        if n_needed == 0:
            continue

        # Under-represented class: draw source clips with replacement and
        # write one augmented file per draw.
        sampled = resample(df_emotion, replace=True, n_samples=n_needed, random_state=42)

        augmented_rows = []
        for _, row in sampled.iterrows():
            waveform, sr = torchaudio.load(row["Path"])
            waveform = torchaudio.functional.resample(waveform, sr, 16000)
            waveform = waveform.mean(dim=0, keepdim=True)  # down-mix to one channel

            method = random.choice(["noise", "pitch", "speed"])
            aug_path = augment_and_save(waveform, 16000, idx_counter, method)
            idx_counter += 1

            augmented_rows.append({
                "Path": aug_path,
                "Emotions": row["Emotions"],
                "augmented": True
            })

        balanced_rows.append(pd.DataFrame(augmented_rows))

    if not balanced_rows:
        # No classes at all: pd.concat([]) would raise, so return an empty frame.
        return df.assign(augmented=False).reset_index(drop=True)

    balanced_df = pd.concat(balanced_rows).reset_index(drop=True)
    return balanced_df

# ✅ Step 2: Dataset Class
class SERDataset(Dataset):
    """Torch dataset that turns rows of an emotion DataFrame into model inputs.

    Each item is a dict with:
      * ``input_features``: the processor's ``input_features`` for the clip,
        with the leading batch dimension removed;
      * ``label``: the encoded emotion id as a ``torch.long`` tensor.

    Args:
        dataframe: DataFrame with ``Path`` (audio file) and ``Emotions`` columns.
        processor: Callable feature extractor; it is called with a 1-D numpy
            waveform and ``sampling_rate=16000`` and must return an object
            exposing ``input_features``.
        label_encoder: Fitted encoder whose ``transform`` maps emotion strings
            to integer ids.
        max_len: Stored on the instance; not used by ``__getitem__``.
    """

    def __init__(self, dataframe, processor, label_encoder, max_len=30):
        self.df = dataframe
        self.processor = processor
        self.label_encoder = label_encoder
        self.max_len = max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # One positional row lookup instead of one per column.
        row = self.df.iloc[idx]
        path, label = row['Path'], row['Emotions']

        waveform, sr = torchaudio.load(path)
        waveform = torchaudio.functional.resample(waveform, sr, 16000)

        # Down-mix multi-channel audio to a single channel.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        inputs = self.processor(
            # Drop only the channel axis -> shape [time]. A bare squeeze()
            # would also drop the time axis for a one-sample clip.
            waveform.squeeze(0).numpy(),
            sampling_rate=16000,
            return_tensors="pt",
            padding="max_length",
            truncation=True
        )

        label_id = self.label_encoder.transform([str(label)])[0]

        return {
            "input_features": inputs.input_features.squeeze(0),
            "label": torch.tensor(label_id, dtype=torch.long)
        }

# ✅ Step 3: Model with classifier head
class WhisperEmotionClassifier(nn.Module):
    """Whisper encoder followed by a two-layer classification head.

    Args:
        num_classes: Number of output logits (one per emotion class).
        model_name: Hugging Face checkpoint whose encoder is used as the
            backbone. Defaults to ``"openai/whisper-tiny"``; a larger
            checkpoint such as ``"openai/whisper-large-v3"`` can be passed.
        projection_dim: Width of the hidden layer between the encoder output
            and the classifier. Keep the default (256) when loading state
            dicts saved from the original architecture.
    """

    def __init__(self, num_classes, model_name="openai/whisper-tiny", projection_dim=256):
        super().__init__()
        self.backbone = WhisperModel.from_pretrained(model_name).encoder

        # Read the encoder width from its config so the head matches whichever
        # checkpoint was loaded.
        hidden_size = self.backbone.config.hidden_size
        self.projector = nn.Linear(hidden_size, projection_dim)
        self.classifier = nn.Linear(projection_dim, num_classes)

    def forward(self, input_features):
        """Return class logits for a batch of ``input_features``."""
        outputs = self.backbone(input_features=input_features)
        # Average the encoder states over dim 1 (presumably the frame/time
        # axis) to get one vector per example.
        hidden = outputs.last_hidden_state.mean(dim=1)
        x = F.relu(self.projector(hidden))
        logits = self.classifier(x)
        return logits

In [16]:
# # Encode emotions
# label_encoder = LabelEncoder()
# df["Emotions"] = label_encoder.fit_transform(df["Emotions"])

# # Run augmentation and balance the dataset only once (the result is saved to balanced_Thai_data.csv)
# balanced_df = balance_dataset_with_real_augmentation(df, target_per_class=5863)
# balanced_df.to_csv("balanced_Thai_data.csv", index=False)

In [17]:
# 🔁 1. Load the balanced dataset CSV that was saved earlier
balanced_df = pd.read_csv("balanced_Thai_data.csv")

# 🔁 2. Map integer label ids back to emotion names if the CSV stores ids
id2label = {0: 'angry', 1: 'happy', 2: 'neutral', 3: 'sad'}
# is_numeric_dtype is used instead of `dtype in [int, float]`: that comparison
# depends on what the builtin `int` maps to in NumPy (int32 on Windows with
# NumPy < 2), so an int64 column could silently skip the conversion.
if pd.api.types.is_numeric_dtype(balanced_df["Emotions"]):
    balanced_df["Emotions"] = balanced_df["Emotions"].map(lambda x: id2label[int(x)])

# 🔁 3. Fit the label encoder on the emotion names
label_encoder = LabelEncoder()
label_encoder.fit(balanced_df["Emotions"])

# 🔁 4. Stratified 80/20 split. random_state is fixed so the held-out set is
#       the same on every run (training below resumes from saved weights).
# NOTE(review): the CSV appears to already contain augmented rows, so an
# augmented copy and its source clip can end up on different sides of the
# split — consider splitting before augmenting.
from sklearn.model_selection import train_test_split
train_df, test_df = train_test_split(
    balanced_df,
    test_size=0.2,
    stratify=balanced_df["Emotions"],
    random_state=42,
)

# 🔁 5. Load the processor and build the datasets
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny")
train_dataset = SERDataset(train_df, processor, label_encoder)
test_dataset = SERDataset(test_df, processor, label_encoder)

# 🔁 6. Data loaders (only the training loader is shuffled)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16)

In [18]:
# Inspect the un-balanced frame: distinct labels and the column dtype
raw_emotions = df["Emotions"]
print(raw_emotions.unique())
print(raw_emotions.dtype)

['neutral' 'angry' 'happy' 'sad']
object


In [19]:
# Inspect the balanced frame: per-class counts, distinct labels, column dtype
balanced_emotions = balanced_df["Emotions"]
print(balanced_emotions.value_counts())
print(balanced_emotions.unique())
print(balanced_emotions.dtype)

Emotions
neutral    5863
angry      5863
happy      5863
sad        5863
Name: count, dtype: int64
['neutral' 'angry' 'happy' 'sad']
object


In [20]:
print(label_encoder.classes_)  # check which class values the encoder was fitted on


['angry' 'happy' 'neutral' 'sad']


In [21]:
def freeze_backbone(model):
    """Turn off gradient tracking for every parameter of ``model.backbone``.

    Prints a confirmation line and returns ``None``.
    """
    for weight in model.backbone.parameters():
        weight.requires_grad = False
    print("🔒 Backbone frozen.")

def unfreeze_backbone(model):
    """Turn gradient tracking back on for every parameter of ``model.backbone``.

    Prints a confirmation line and returns ``None``.
    """
    for weight in model.backbone.parameters():
        weight.requires_grad = True
    print("🔓 Backbone unfrozen.")

# Freeze Training

In [22]:
# ✅ Step 5: Training
# Use the GPU when one is available, otherwise fall back to the CPU
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

# One output logit per class known to the fitted label encoder
num_emotions = len(label_encoder.classes_)
model = WhisperEmotionClassifier(num_classes=num_emotions)
model = model.to(device)

In [None]:


# Resume from previously saved weights. map_location makes a checkpoint that
# was saved on a GPU loadable on a CPU-only machine as well.
model.load_state_dict(torch.load("best_model2.pth", map_location=device))

freeze_backbone(model)  # 🔒 freeze the backbone; only the head is optimised
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=2e-5)
loss_fn = nn.CrossEntropyLoss()

# Per-epoch history
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

best_acc = 0     # best validation accuracy seen so far
patience = 3     # epochs without validation improvement before stopping
no_improve = 0

for epoch in range(50):
    # Pause for 60 s on every third epoch (including the first) — presumably a
    # cool-down for the machine; kept as in the original run.
    if epoch % 3 == 0:
        time.sleep(60)

    # ---- Train ----
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for batch in tqdm(train_loader):
        inputs = batch['input_features'].to(device)
        labels = batch['label'].to(device)

        outputs = model(inputs)
        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = outputs.argmax(dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    avg_loss = total_loss / len(train_loader)
    acc = correct / total
    train_losses.append(avg_loss)
    train_accuracies.append(acc)
    # Report the mean batch loss (was the summed loss), matching the
    # validation line below and the value stored in train_losses.
    print(f"[Freeze Phase] Epoch {epoch+1}: Loss={avg_loss:.4f}, Acc={acc:.4f}")

    # ---- Evaluate on the held-out loader (used here as the validation set) ----
    model.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for batch in test_loader:
            inputs = batch['input_features'].to(device)
            labels = batch['label'].to(device)

            outputs = model(inputs)
            loss = loss_fn(outputs, labels)

            val_loss += loss.item()
            preds = outputs.argmax(dim=1)
            val_correct += (preds == labels).sum().item()
            val_total += labels.size(0)

    val_avg_loss = val_loss / len(test_loader)
    val_acc = val_correct / val_total

    val_losses.append(val_avg_loss)
    val_accuracies.append(val_acc)

    print(f"Epoch {epoch+1} | Val Loss: {val_avg_loss:.4f} | Val Acc: {val_acc:.4f}")

    # Checkpointing and early stopping follow VALIDATION accuracy. The earlier
    # code compared training accuracy, which keeps rising while the model
    # overfits and therefore never selects or stops on generalisation.
    if val_acc > best_acc:
        best_acc = val_acc
        no_improve = 0
        torch.save(model.state_dict(), "best_model_thai.pth")  # keep the best weights
        print("✅ New best model saved.")
    else:
        no_improve += 1
        print(f"⚠️ No improvement for {no_improve} epoch(s).")

        if no_improve >= patience:
            print("🛑 Early stopping triggered.")
            break


🔒 Backbone frozen.


 11%|█         | 124/1173 [03:45<31:27,  1.80s/it]