In [61]:
import cv2
import mediapipe as mp
import numpy as np
import pandas as pd
import os
import joblib
import time

# PyTorch Imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

# Sklearn Imports
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.utils import compute_class_weight
import matplotlib.pyplot as plt
import seaborn as sns

In [62]:
# --- CẤU HÌNH ---
EMOTION_LABELS = ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
TRAIN_CSV = "dataset_csv/fer2013_train.csv"
TEST_CSV  = "dataset_csv/fer2013_test.csv"
IMG_SIZE = (48, 48)
BATCH_SIZE = 64
EPOCHS = 40
LEARNING_RATE = 0.001
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f">>> Đang sử dụng thiết bị: {DEVICE}")

>>> Đang sử dụng thiết bị: cuda


In [63]:
# --- CÁC HÀM XỬ LÝ HÌNH HỌC (GIỮ NGUYÊN) ---
def center_scale_landmarks(X):
    X_cs = []
    if X.ndim == 1: X = X.reshape(1, -1)
    for s in X:
        xs, ys = s[::2], s[1::2]
        mean_x, mean_y = xs.mean(), ys.mean()
        scale = max(xs.max()-xs.min(), ys.max()-ys.min())
        if scale == 0: scale = 1
        X_cs.append(np.column_stack([(xs-mean_x)/scale, (ys-mean_y)/scale]).flatten())
    return np.array(X_cs)


In [64]:
def warp_face_mp(img, src_lms_norm, mean_lms_norm, size=(48,48)):
    src_pts = src_lms_norm.reshape(-1, 2) * size[0]
    dst_pts = mean_lms_norm.reshape(-1, 2) * size[0]
    
    if len(img.shape) == 3: img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    else: img_gray = img
        
    M, _ = cv2.estimateAffinePartial2D(src_pts, dst_pts)
    if M is not None:
        warped = cv2.warpAffine(img_gray, M, size)
    else:
        warped = cv2.resize(img_gray, size)
    
    # Chuẩn hóa về [0, 1]
    warped = warped.astype('float32') / 255.0
    return warped


In [65]:
# --- 1. DATASET CLASS (QUAN TRỌNG) ---
class FERDataset(Dataset):
    def __init__(self, csv_path, pca, scaler, mean_shape, is_train=False):
        self.df = pd.read_csv(csv_path)
        self.paths = self.df.iloc[:, 0].values
        self.labels = self.df.iloc[:, 2].values
        self.lms = self.df.iloc[:, 4:].values.astype(np.float32)
        
        self.pca = pca
        self.scaler = scaler
        self.mean_shape = mean_shape
        self.is_train = is_train

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        # 1. Đọc ảnh
        img_path = self.paths[idx]
        img = cv2.imread(img_path)
        if img is None:
            # Nếu lỗi đọc ảnh, trả về ảnh đen (để tránh crash code)
            img = np.zeros((48, 48, 3), dtype=np.uint8)
            
        label = self.labels[idx]
        lm_raw = self.lms[idx]

        # 2. Augmentation (Chỉ chỉnh sáng, KHÔNG FLIP để tránh lỗi landmark)
        if self.is_train:
            # Brightness / Contrast
            alpha = np.random.uniform(0.8, 1.2)
            beta = np.random.randint(-20, 20)
            img = cv2.convertScaleAbs(img, alpha=alpha, beta=beta)
            
            # Noise
            if np.random.rand() > 0.7:
                noise = np.random.normal(0, 5, img.shape).astype(np.uint8)
                img = cv2.add(img, noise)

        # 3. Xử lý Landmark & Warp
        lm_norm = lm_raw / 48.0
        warped_img = warp_face_mp(img, lm_norm, self.mean_shape, IMG_SIZE)
        
        # 4. Xử lý Shape Vector
        lm_cs = center_scale_landmarks(lm_raw.reshape(1, -1))
        shape_vec = self.scaler.transform(self.pca.transform(lm_cs)).flatten()

        # 5. Chuyển sang Tensor
        # Ảnh: (H, W) -> (1, H, W) cho PyTorch Conv2d
        img_tensor = torch.tensor(warped_img, dtype=torch.float32).unsqueeze(0)
        shape_tensor = torch.tensor(shape_vec, dtype=torch.float32)
        label_tensor = torch.tensor(label, dtype=torch.long)

        return img_tensor, shape_tensor, label_tensor

In [66]:
# --- 2. MODEL DEFINITION (PYTORCH) ---
class HybridModel(nn.Module):
    def __init__(self, shape_input_dim, num_classes=7):
        super(HybridModel, self).__init__()
        
        # Nhánh 1: CNN (Texture)
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # 24x24
            nn.Dropout(0.2),
            
            # Block 2
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # 12x12
            nn.Dropout(0.3),
            
            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # 6x6
            nn.Dropout(0.4)
        )
        
        # Nhánh 2: Shape (Dense)
        self.shape_fc = nn.Sequential(
            nn.Linear(shape_input_dim, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, 128),
            nn.ReLU()
        )
        
        # Kết hợp
        # CNN Output: 256 channels * 6 * 6 = 9216
        self.classifier = nn.Sequential(
            nn.Linear(256 * 6 * 6 + 128, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x_img, x_shape):
        x1 = self.features(x_img)
        x1 = x1.view(x1.size(0), -1) # Flatten
        
        x2 = self.shape_fc(x_shape)
        
        combined = torch.cat((x1, x2), dim=1)
        out = self.classifier(combined)
        return out


In [67]:
from torch.utils.data import WeightedRandomSampler

def train_pipeline():
    print("\n>>> CHUẨN BỊ DỮ LIỆU (CHIẾN LƯỢC: OVERSAMPLING)...")
    
    # 1. PCA & Scaler (Giữ nguyên)
    print(" - Đang fit PCA & Scaler...")
    df_train = pd.read_csv(TRAIN_CSV)
    X_lms_train = df_train.iloc[:, 4:].values.astype(np.float32)
    
    # Lấy danh sách nhãn để tính toán sampler
    y_train_raw = df_train.iloc[:, 2].values
    
    X_cs = center_scale_landmarks(X_lms_train)
    pca = PCA(n_components=0.99)
    X_pca = pca.fit_transform(X_cs)
    scaler = StandardScaler()
    scaler.fit(X_pca)
    
    mean_shape_raw = X_cs.mean(axis=0)
    mean_shape = (mean_shape_raw - mean_shape_raw.min()) / (mean_shape_raw.max() - mean_shape_raw.min())
    
    if not os.path.exists("model_mp"): os.makedirs("model_mp")
    joblib.dump(pca, "model_mp/shape_pca.pkl")
    joblib.dump(scaler, "model_mp/shape_scaler.pkl")
    joblib.dump(mean_shape, "model_mp/mean_shape.pkl")

    # 2. Tạo Dataset
    train_dataset = FERDataset(TRAIN_CSV, pca, scaler, mean_shape, is_train=True)
    test_dataset  = FERDataset(TEST_CSV, pca, scaler, mean_shape, is_train=False)
    
    # --- [QUAN TRỌNG] TẠO SAMPLER ĐỂ CÂN BẰNG DỮ LIỆU ---
    print(" - Đang cấu hình WeightedRandomSampler...")
    
    # Đếm số lượng mẫu của từng lớp
    class_counts = np.bincount(y_train_raw)
    
    # Tính trọng số: Lớp càng ít mẫu, trọng số càng cao
    # (1.0 / count)
    class_weights = 1.0 / class_counts
    
    # Gán trọng số cho từng mẫu dữ liệu trong tập train
    sample_weights = [class_weights[label] for label in y_train_raw]
    sample_weights = torch.FloatTensor(sample_weights)
    
    # Tạo Sampler: Bốc mẫu dựa trên trọng số
    sampler = WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(sample_weights),
        replacement=True # Cho phép bốc lặp lại (quan trọng)
    )
    
    # Lưu ý: Khi dùng sampler, shuffle phải là False
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, sampler=sampler, num_workers=0)
    test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)

    # 3. Model & Loss
    # Vì đã dùng Sampler cân bằng rồi, ta KHÔNG cần class_weight trong Loss nữa (tránh chỉnh sửa kép)
    # Tuy nhiên, ta dùng Label Smoothing để giảm việc model quá tự tin vào Happy
    shape_dim = X_pca.shape[1]
    model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
    
    # label_smoothing=0.1 giúp model bớt "cứng đầu", giảm overfitting
    criterion = nn.CrossEntropyLoss(label_smoothing=0.1) 
    
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4) # Thêm weight_decay
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=4, verbose=True)

    print("\n>>> BẮT ĐẦU TRAINING (CÂN BẰNG TUYỆT ĐỐI)...")
    best_acc = 0.0
    
    for epoch in range(EPOCHS):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        
        start_time = time.time()
        
        for i, (imgs, shapes, labels) in enumerate(train_loader):
            imgs, shapes, labels = imgs.to(DEVICE), shapes.to(DEVICE), labels.to(DEVICE)
            
            optimizer.zero_grad()
            outputs = model(imgs, shapes)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
        train_acc = 100 * correct / total
        avg_loss = running_loss / len(train_loader)
        
        # Validation
        val_acc, val_loss = evaluate_model(model, test_loader, criterion)
        scheduler.step(val_loss)
        
        end_time = time.time()
        print(f"Epoch [{epoch+1}/{EPOCHS}] | Time: {end_time-start_time:.1f}s | "
              f"Loss: {avg_loss:.4f} | Acc: {train_acc:.2f}% | "
              f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")
        
        # Lưu model nếu tốt hơn
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), "model_mp/hybrid_model_pytorch.pth")
            print("   -> Đã lưu model tốt nhất!")

In [68]:
# # --- 3. TRAINING PIPELINE ---
# def train_pipeline():
#     print("\n>>> CHUẨN BỊ DỮ LIỆU...")
    
#     # 1. Load toàn bộ landmark Train để fit PCA trước
#     print(" - Đang fit PCA & Scaler...")
#     df_train = pd.read_csv(TRAIN_CSV)
#     X_lms_train = df_train.iloc[:, 4:].values.astype(np.float32)
#     y_train_raw = df_train.iloc[:, 2].values
    
#     # PCA & Scaler Setup
#     X_cs = center_scale_landmarks(X_lms_train)
#     pca = PCA(n_components=0.99)
#     X_pca = pca.fit_transform(X_cs)
#     scaler = StandardScaler()
#     scaler.fit(X_pca)
    
#     # Mean Shape
#     mean_shape_raw = X_cs.mean(axis=0)
#     mean_shape = (mean_shape_raw - mean_shape_raw.min()) / (mean_shape_raw.max() - mean_shape_raw.min())
    
#     # Lưu resources
#     if not os.path.exists("model_mp"): os.makedirs("model_mp")
#     joblib.dump(pca, "model_mp/shape_pca.pkl")
#     joblib.dump(scaler, "model_mp/shape_scaler.pkl")
#     joblib.dump(mean_shape, "model_mp/mean_shape.pkl")

#     # 2. Tạo Dataset & DataLoader
#     train_dataset = FERDataset(TRAIN_CSV, pca, scaler, mean_shape, is_train=True)
#     test_dataset  = FERDataset(TEST_CSV, pca, scaler, mean_shape, is_train=False)
    
#     train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=0)
#     test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=0)
    
#     # 3. Tính Class Weights
#     class_weights = compute_class_weight('balanced', classes=np.unique(y_train_raw), y=y_train_raw)
#     weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(DEVICE)
#     print(f" - Class Weights: {class_weights}")

#     # 4. Khởi tạo Model
#     shape_dim = X_pca.shape[1]
#     model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
    
#     criterion = nn.CrossEntropyLoss(weight=weights_tensor)
#     optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
#     scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

#     print("\n>>> BẮT ĐẦU TRAINING (PyTorch)...")
#     best_acc = 0.0
    
#     for epoch in range(EPOCHS):
#         model.train()
#         running_loss = 0.0
#         correct = 0
#         total = 0
        
#         start_time = time.time()
        
#         for i, (imgs, shapes, labels) in enumerate(train_loader):
#             imgs, shapes, labels = imgs.to(DEVICE), shapes.to(DEVICE), labels.to(DEVICE)
            
#             optimizer.zero_grad()
#             outputs = model(imgs, shapes)
#             loss = criterion(outputs, labels)
#             loss.backward()
#             optimizer.step()
            
#             running_loss += loss.item()
#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()
            
#         train_acc = 100 * correct / total
#         avg_loss = running_loss / len(train_loader)
        
#         # Validation
#         val_acc, val_loss = evaluate_model(model, test_loader, criterion)
#         scheduler.step(val_loss)
        
#         end_time = time.time()
#         print(f"Epoch [{epoch+1}/{EPOCHS}] | Time: {end_time-start_time:.1f}s | "
#               f"Loss: {avg_loss:.4f} | Acc: {train_acc:.2f}% | "
#               f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2f}%")
        
#         if val_acc > best_acc:
#             best_acc = val_acc
#             torch.save(model.state_dict(), "model_mp/hybrid_model_pytorch.pth")
#             print("   -> Đã lưu model tốt nhất!")

In [69]:
def evaluate_model(model, loader, criterion=None):
    model.eval()
    correct = 0
    total = 0
    running_loss = 0.0
    with torch.no_grad():
        for imgs, shapes, labels in loader:
            imgs, shapes, labels = imgs.to(DEVICE), shapes.to(DEVICE), labels.to(DEVICE)
            outputs = model(imgs, shapes)
            if criterion:
                loss = criterion(outputs, labels)
                running_loss += loss.item()
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    acc = 100 * correct / total
    loss = running_loss / len(loader) if criterion else 0
    return acc, loss

In [70]:
def evaluate_performance():
    if not os.path.exists("model_mp_main_tot_nhat/hybrid_model_pytorch.pth"):
        print("Chưa có model PyTorch!")
        return

    # Load resources
    pca = joblib.load("model_mp_main_tot_nhat/shape_pca.pkl")
    scaler = joblib.load("model_mp_main_tot_nhat/shape_scaler.pkl")
    mean_shape = joblib.load("model_mp_main_tot_nhat/mean_shape.pkl")
    
    # Load Model
    # Cần biết shape dim. Hack: lấy từ PCA
    shape_dim = pca.n_components_
    model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
    model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))
    model.eval()

    test_dataset = FERDataset(TEST_CSV, pca, scaler, mean_shape, is_train=False)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
    
    all_preds = []
    all_labels = []
    
    print(">>> Đang đánh giá...")
    with torch.no_grad():
        for imgs, shapes, labels in test_loader:
            imgs, shapes = imgs.to(DEVICE), shapes.to(DEVICE)
            outputs = model(imgs, shapes)
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.numpy())
            
    print(classification_report(all_labels, all_preds, target_names=EMOTION_LABELS))
    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(8,6))
    sns.heatmap(cm, annot=True, fmt='d', xticklabels=EMOTION_LABELS, yticklabels=EMOTION_LABELS)
    plt.show()


## ĐIỂM ẢNH


In [71]:
def run_webcam():
    # 1. Load các tài nguyên (Model, PCA, Scaler)
    if not os.path.exists("model_mp_main_tot_nhat/hybrid_model_pytorch.pth"): 
        print("LỖI: Chưa có file model PyTorch.")
        return
    
    pca = joblib.load("model_mp_main_tot_nhat/shape_pca.pkl")
    scaler = joblib.load("model_mp_main_tot_nhat/shape_scaler.pkl")
    mean_shape = joblib.load("model_mp_main_tot_nhat/mean_shape.pkl")
    shape_dim = pca.n_components_
    
    # Load Model lên GPU/CPU
    model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
    model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))
    model.eval()
    
    print("\n>>> ĐANG CHẠY WEBCAM (Nhấn ESC để thoát)...")
    
    # 2. Khởi tạo MediaPipe
    mp_face_mesh = mp.solutions.face_mesh
    cap = cv2.VideoCapture(0)
    
    with mp_face_mesh.FaceMesh(
        max_num_faces=1, 
        refine_landmarks=True,
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5
    ) as face_mesh:
        
        while True:
            ret, frame = cap.read()
            if not ret: break
            h, w, _ = frame.shape
            
            # MediaPipe cần ảnh RGB
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            res = face_mesh.process(rgb)
            
            if res.multi_face_landmarks:
                # Lấy khuôn mặt đầu tiên
                face_landmarks = res.multi_face_landmarks[0]
                
                lm_list = []
                x_list = []
                y_list = []

                # --- VÒNG LẶP XỬ LÝ ĐIỂM ---
                for i, lm in enumerate(face_landmarks.landmark):
                    # Chỉ lấy 468 điểm chính (bỏ qua phần mống mắt nếu có)
                    if i >= 468: break
                    
                    # 1. Lưu tọa độ để dự đoán
                    lm_list.extend([lm.x, lm.y])
                    
                    # 2. Tính tọa độ pixel để vẽ
                    cx, cy = int(lm.x * w), int(lm.y * h)
                    x_list.append(cx)
                    y_list.append(cy)
                    
                    # 3. VẼ ĐIỂM (LANDMARK)
                    # Vẽ chấm nhỏ màu vàng (BGR: 0, 255, 255), kích thước 1
                    cv2.circle(frame, (cx, cy), 1, (0, 255, 255), -1)

                # --- TÍNH TOÁN DỰ ĐOÁN ---
                lm_raw = np.array(lm_list, dtype=np.float32)
                
                # A. Feature Shape
                lm_cs = center_scale_landmarks(lm_raw)
                shape_vec = scaler.transform(pca.transform(lm_cs)).flatten()
                shape_tensor = torch.tensor(shape_vec, dtype=torch.float32).unsqueeze(0).to(DEVICE)
                
                # B. Feature Image (Warp Face)
                # Lưu ý: lm_raw/48.0 vì hàm warp của mình mong đợi input normalized [0-1] nhưng scale theo size 48
                # (Logic này phải khớp với lúc train, ở đây ta giả lập lại logic đó)
                # Tuy nhiên, cách chuẩn nhất là dùng lm_list (0-1) trực tiếp. 
                # Sửa lại đoạn này cho khớp logic warp_face_mp:
                lm_norm_warp = np.array(lm_list, dtype=np.float32) # Đây là 0.0 - 1.0
                warp = warp_face_mp(frame, lm_norm_warp, mean_shape, IMG_SIZE) 
                
                img_tensor = torch.tensor(warp, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(DEVICE)
                
                # C. Dự đoán
                with torch.no_grad():
                    out = model(img_tensor, shape_tensor)
                    prob = F.softmax(out, dim=1)
                    val, idx = torch.max(prob, 1)
                    
                emotion = EMOTION_LABELS[idx.item()]
                score = val.item()
                
                # --- VẼ GIAO DIỆN (UI) ---
                # Vẽ khung chữ nhật quanh mặt
                x_min, x_max = min(x_list), max(x_list)
                y_min, y_max = min(y_list), max(y_list)
                
                # Chọn màu theo cảm xúc (Vui/Bth: Xanh, Khác: Đỏ)
                color = (0, 255, 0) if emotion in ['happy', 'neutral'] else (0, 0, 255)
                
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
                
                # Vẽ nhãn cảm xúc + thanh phần trăm
                text = f"{emotion.upper()}"
                cv2.putText(frame, text, (x_min, y_min - 10), 
                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            cv2.imshow("PyTorch FER - MediaPipe", frame)
            if cv2.waitKey(1) == 27: break # ESC để thoát
            
    cap.release()
    cv2.destroyAllWindows()

## LƯỚI TAM GIÁC

In [72]:
# def run_webcam():
#     # 1. Load các tài nguyên
#     if not os.path.exists("model_mp_main_tot_nhat/hybrid_model_pytorch.pth"): 
#         print("LỖI: Chưa có file model PyTorch.")
#         return
    
#     pca = joblib.load("model_mp_main_tot_nhat/shape_pca.pkl")
#     scaler = joblib.load("model_mp_main_tot_nhat/shape_scaler.pkl")
#     mean_shape = joblib.load("model_mp_main_tot_nhat/mean_shape.pkl")
#     shape_dim = pca.n_components_
    
#     # Load Model
#     model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
#     model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))
#     model.eval()
    
#     print("\n>>> ĐANG CHẠY WEBCAM (Nhấn ESC để thoát)...")
    
#     # --- CẤU HÌNH VẼ MEDIAPIPE ---
#     mp_face_mesh = mp.solutions.face_mesh
#     mp_drawing = mp.solutions.drawing_utils
#     mp_drawing_styles = mp.solutions.drawing_styles
    
#     cap = cv2.VideoCapture(0)
    
#     with mp_face_mesh.FaceMesh(
#         max_num_faces=1, 
#         refine_landmarks=True,
#         min_detection_confidence=0.5,
#         min_tracking_confidence=0.5
#     ) as face_mesh:
        
#         while True:
#             ret, frame = cap.read()
#             if not ret: break
#             h, w, _ = frame.shape
            
#             rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#             res = face_mesh.process(rgb)
            
#             if res.multi_face_landmarks:
#                 face_landmarks = res.multi_face_landmarks[0]
                
#                 # --- A. THU THẬP DỮ LIỆU ĐỂ DỰ ĐOÁN (Giữ nguyên logic cũ) ---
#                 lm_list = []
#                 x_list = []
#                 y_list = []

#                 for i, lm in enumerate(face_landmarks.landmark):
#                     if i >= 468: break
#                     lm_list.extend([lm.x, lm.y])
                    
#                     # Lưu tọa độ để vẽ khung chữ nhật bao quanh mặt
#                     cx, cy = int(lm.x * w), int(lm.y * h)
#                     x_list.append(cx)
#                     y_list.append(cy)
                    
#                     # KHÔNG VẼ cv2.circle Ở ĐÂY NỮA

#                 # --- B. VẼ LƯỚI TAM GIÁC (MESH) ---
#                 # Vẽ lưới tam giác (Tesselation)
#                 mp_drawing.draw_landmarks(
#                     image=frame,
#                     landmark_list=face_landmarks,
#                     connections=mp_face_mesh.FACEMESH_TESSELATION,
#                     landmark_drawing_spec=None,
#                     connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_tesselation_style()
#                 )
                
#                 # Vẽ đường bao (Mắt, môi, khuôn mặt) cho rõ nét hơn
#                 mp_drawing.draw_landmarks(
#                     image=frame,
#                     landmark_list=face_landmarks,
#                     connections=mp_face_mesh.FACEMESH_CONTOURS,
#                     landmark_drawing_spec=None,
#                     connection_drawing_spec=mp_drawing_styles.get_default_face_mesh_contours_style()
#                 )

#                 # --- C. TÍNH TOÁN DỰ ĐOÁN (MODEL AI) ---
#                 lm_raw = np.array(lm_list, dtype=np.float32)
                
#                 # Feature Shape
#                 lm_cs = center_scale_landmarks(lm_raw)
#                 shape_vec = scaler.transform(pca.transform(lm_cs)).flatten()
#                 shape_tensor = torch.tensor(shape_vec, dtype=torch.float32).unsqueeze(0).to(DEVICE)
                
#                 # Feature Image (Warp Face)
#                 lm_norm_warp = np.array(lm_list, dtype=np.float32)
#                 warp = warp_face_mp(frame, lm_norm_warp, mean_shape, IMG_SIZE) 
#                 img_tensor = torch.tensor(warp, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(DEVICE)
                
#                 # Inference
#                 with torch.no_grad():
#                     out = model(img_tensor, shape_tensor)
#                     prob = F.softmax(out, dim=1)
#                     val, idx = torch.max(prob, 1)
                    
#                 emotion = EMOTION_LABELS[idx.item()]
#                 score = val.item()
                
#                 # --- D. VẼ UI KẾT QUẢ ---
#                 x_min, x_max = min(x_list), max(x_list)
#                 y_min, y_max = min(y_list), max(y_list)
                
#                 color = (0, 255, 0) if emotion in ['happy', 'neutral'] else (0, 0, 255)
                
#                 # Vẽ khung bao quanh mặt
#                 cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), color, 2)
                
#                 # Vẽ nhãn nền đen chữ màu
#                 cv2.rectangle(frame, (x_min, y_min - 40), (x_max, y_min), color, -1)
#                 text = f"{emotion.upper()}"
#                 cv2.putText(frame, text, (x_min + 5, y_min - 10), 
#                             cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

#             cv2.imshow("PyTorch FER - Face Mesh", frame)
#             if cv2.waitKey(1) == 27: break # ESC
            
#     cap.release()
#     cv2.destroyAllWindows()

In [73]:
def predict_single_image(image_path):
    """
    Hàm nhận vào đường dẫn ảnh, dự đoán cảm xúc và hiển thị kết quả.
    """
    # 1. KIỂM TRA TÀI NGUYÊN
    required_files = [
        "model_mp_main_tot_nhat/hybrid_model_pytorch.pth",
        "model_mp_main_tot_nhat/shape_pca.pkl",
        "model_mp_main_tot_nhat/shape_scaler.pkl",
        "model_mp_main_tot_nhat/mean_shape.pkl"
    ]
    
    for f in required_files:
        if not os.path.exists(f):
            print(f"LỖI: Không tìm thấy file {f}. Hãy train model trước!")
            return

    # 2. LOAD TÀI NGUYÊN
    print(">>> Đang load model và tài nguyên...")
    pca = joblib.load("model_mp_main_tot_nhat/shape_pca.pkl")
    scaler = joblib.load("model_mp_main_tot_nhat/shape_scaler.pkl")
    mean_shape = joblib.load("model_mp_main_tot_nhat/mean_shape.pkl")
    
    # Load Model Structure & Weights
    shape_dim = pca.n_components_
    model = HybridModel(shape_input_dim=shape_dim).to(DEVICE)
    model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))
    model.eval()

    # 3. ĐỌC ẢNH & MEDIAPIPE
    img = cv2.imread(image_path)
    if img is None:
        print("LỖI: Không đọc được ảnh. Kiểm tra lại đường dẫn.")
        return

    # Resize ảnh nếu quá lớn để xử lý nhanh hơn
    if img.shape[1] > 1000:
        scale_percent = 50 
        width = int(img.shape[1] * scale_percent / 100)
        height = int(img.shape[0] * scale_percent / 100)
        img = cv2.resize(img, (width, height))

    mp_face_mesh = mp.solutions.face_mesh
    
    with mp_face_mesh.FaceMesh(
        static_image_mode=True, # Chế độ ảnh tĩnh (chính xác hơn webcam)
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5
    ) as face_mesh:
        
        rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_img)
        
        if not results.multi_face_landmarks:
            print("KHÔNG TÌM THẤY KHUÔN MẶT TRONG ẢNH!")
            cv2.imshow("Result", img)
            cv2.waitKey(0)
            return

        # Lấy khuôn mặt đầu tiên
        face_landmarks = results.multi_face_landmarks[0]
        h, w, _ = img.shape
        
        # Trích xuất 468 điểm
        lm_list = []      # Dùng cho tính toán (0.0 - 1.0)
        draw_points = []  # Dùng để vẽ (pixel)
        
        for i, lm in enumerate(face_landmarks.landmark):
            if i >= 468: break
            lm_list.extend([lm.x, lm.y])
            draw_points.append((int(lm.x * w), int(lm.y * h)))

        # 4. TIỀN XỬ LÝ (PREPROCESSING)
        lm_raw = np.array(lm_list, dtype=np.float32)

        # A. Shape Feature
        lm_cs = center_scale_landmarks(lm_raw)
        shape_vec = scaler.transform(pca.transform(lm_cs)).flatten()
        shape_tensor = torch.tensor(shape_vec, dtype=torch.float32).unsqueeze(0).to(DEVICE)

        # B. Image Feature (Warp)
        # Lưu ý: warp_face_mp cần input là (0-1) nếu mean_shape đã chuẩn hóa
        warp = warp_face_mp(img, lm_raw, mean_shape, IMG_SIZE)
        img_tensor = torch.tensor(warp, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(DEVICE)

        # 5. DỰ ĐOÁN (INFERENCE)
        with torch.no_grad():
            outputs = model(img_tensor, shape_tensor)
            probs = torch.nn.functional.softmax(outputs, dim=1)
            score, predicted_idx = torch.max(probs, 1)
            
        emotion = EMOTION_LABELS[predicted_idx.item()]
        confidence = score.item() * 100

        # 6. HIỂN THỊ KẾT QUẢ
        print(f"\n>>> KẾT QUẢ: {emotion.upper()} ({confidence:.2f}%)")
        print("    (Nhấn phím bất kỳ trên cửa sổ ảnh để đóng)")

        # Vẽ Landmarks
        for (cx, cy) in draw_points:
            cv2.circle(img, (cx, cy), 1, (0, 255, 255), -1)

        # Vẽ Text & Hộp thông tin
        cv2.rectangle(img, (0, 0), (300, 60), (0, 0, 0), -1)
        cv2.putText(img, f"Pred: {emotion.upper()}", (10, 30), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
        cv2.putText(img, f"Conf: {confidence:.2f}%", (10, 55), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (200, 200, 200), 1)

        cv2.imshow("Prediction Result", img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()

In [None]:
if __name__ == "__main__":
    while True:
        print("\n--- PYTORCH FER HYBRID ---")
        print("1. Train Model")
        print("2. Evaluate on Test Set")
        print("3. Predict Single Image (File)")  # <--- Mới
        print("4. Webcam Demo")
        
        choice = input("Chọn chức năng: ")
        
        if choice == '1': 
            train_pipeline()
        elif choice == '2': 
            evaluate_performance()
        elif choice == '3':
            path = input("Nhập đường dẫn ảnh (VD: test.jpg): ")
            # Xóa dấu ngoặc kép nếu người dùng copy path có dấu ""
            path = path.strip('"') 
            predict_single_image(path)
        elif choice == '4': 
            run_webcam()
        else: 
            break


--- PYTORCH FER HYBRID ---
1. Train Model
2. Evaluate on Test Set
3. Predict Single Image (File)
4. Webcam Demo


  model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))



>>> ĐANG CHẠY WEBCAM (Nhấn ESC để thoát)...

--- PYTORCH FER HYBRID ---
1. Train Model
2. Evaluate on Test Set
3. Predict Single Image (File)
4. Webcam Demo


  model.load_state_dict(torch.load("model_mp_main_tot_nhat/hybrid_model_pytorch.pth", map_location=DEVICE))



>>> ĐANG CHẠY WEBCAM (Nhấn ESC để thoát)...
