# In [None]:
import os
import cv2
import shutil
import numpy as np
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from sklearn.metrics import classification_report, confusion_matrix
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.optimizers import Adam

# Name of the CONFIG entry the script runs with.
TARGET_DATASET = 'CK+'

# Per-dataset settings, keyed by dataset name.
CONFIG = {
    # CK+: standard model; the script splits train/test itself.
    'CK+': dict(
        path='CK+48',  # dataset folder
        img_size=48,
        batch_size=32,
        epochs=20,
        model_type='standard',
        test_split=0.2,
    ),
    # JAFFE: little data, so a small batch, longer training and the
    # compact model.
    'JAFFE': dict(
        path='jaffe',
        img_size=48,
        batch_size=8,
        epochs=60,
        model_type='compact',
        test_split=0.2,
    ),
    # FER2013: lots of data, so a large batch; the dataset already ships
    # with separate train/test folders.
    'FER2013': dict(
        path='fer2013',
        img_size=48,
        batch_size=64,
        epochs=30,
        model_type='standard',
        has_subfolders=True,
    ),
}

# Keep 95% of the information (explained variance) when fitting PCA.
PCA_COMPONENTS = 0.95

def prepare_jaffe_sorting(base_path):
    """Copy JAFFE images into one sub-folder per emotion, if not done yet.

    The emotion code is the first two characters of the second dot-separated
    field of the file name (e.g. ``KA.AN1.39.tiff`` -> ``AN``). Files that are
    not ``.tif``/``.tiff`` (any letter case) or whose code is unknown are
    ignored.

    Args:
        base_path: Folder holding the raw images, either directly or inside
            a nested ``jaffe`` sub-folder.

    Returns:
        Path of the sorted folder (``base_path + '_sorted'``). If that folder
        already exists it is returned as-is and nothing is copied.

    Raises:
        OSError: If the source folder cannot be listed (e.g. it is missing).
    """
    sorted_dir = base_path + '_sorted'
    if os.path.exists(sorted_dir):
        return sorted_dir

    print(f"‚ö° [JAFFE] ƒêang s·∫Øp x·∫øp d·ªØ li·ªáu t·ª´ {base_path}...")
    emotion_mapping = {'AN': 'angry', 'DI': 'disgust', 'FE': 'fear',
                       'HA': 'happy', 'NE': 'neutral', 'SA': 'sad', 'SU': 'surprise'}

    search_path = os.path.join(base_path, 'jaffe')
    if not os.path.exists(search_path):
        search_path = base_path

    # List the source BEFORE creating the output folder: if the source is
    # missing we must not leave an empty `_sorted` folder behind, because the
    # early return above would then treat it as already sorted on every
    # later run.
    filenames = os.listdir(search_path)
    os.makedirs(sorted_dir, exist_ok=True)

    count = 0
    for filename in filenames:
        if not filename.lower().endswith(('.tiff', '.tif')):
            continue
        # A name ending in .tif/.tiff always contains a dot, so index 1 exists.
        code = filename.split('.')[1][:2]
        if code not in emotion_mapping:
            continue
        target = os.path.join(sorted_dir, emotion_mapping[code])
        try:
            os.makedirs(target, exist_ok=True)
            shutil.copy(os.path.join(search_path, filename), os.path.join(target, filename))
        except OSError as err:
            # Best effort: report the file that failed and keep going.
            print(f"[WARN] Could not copy {filename}: {err}")
            continue
        count += 1
    print(f"‚úÖ ƒê√£ s·∫Øp x·∫øp {count} ·∫£nh v√†o {sorted_dir}")
    return sorted_dir

def load_images_from_folder(path, img_size):
    """Load a folder-per-class image dataset as normalised grayscale arrays.

    Every immediate sub-folder of ``path`` is one class; classes are sorted
    by name and a class's position in that list is its label. Only files
    directly inside a class folder are read (no deeper recursion), and files
    OpenCV cannot decode are skipped.

    Args:
        path: Root folder containing one sub-folder per class.
        img_size: Side length every image is resized to (square).

    Returns:
        A ``(data, labels, classes)`` tuple:
        ``data`` is float32 with shape ``(n, img_size, img_size, 1)`` scaled
        by 1/255, ``labels`` is the one-hot encoding of the class indices and
        ``classes`` is the sorted list of class names. When ``path`` does not
        exist or no image could be loaded, ``data`` and ``labels`` are empty
        arrays (``classes`` is ``[]`` only when ``path`` is missing).
    """
    print(f"[INFO] ƒêang qu√©t folder: {path}")
    if not os.path.exists(path):
        print(f"‚ùå Kh√¥ng t√¨m th·∫•y ƒë∆∞·ªùng d·∫´n: {path}")
        return np.array([]), np.array([]), []

    classes = sorted(d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d)))
    print(f"   -> Classes: {classes}")

    data, labels = [], []
    for idx, class_name in enumerate(classes):
        class_path = os.path.join(path, class_name)
        # os.listdir order is arbitrary; sort so the sample order (and hence
        # any seeded train/test split downstream) is reproducible.
        for f in sorted(os.listdir(class_path)):
            file_path = os.path.join(class_path, f)
            try:
                img = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
                if img is None:
                    # Not a decodable image (or a nested folder): skip quietly.
                    continue
                img = cv2.resize(img, (img_size, img_size))
            except cv2.error as err:
                print(f"[WARN] Skipping {file_path}: {err}")
                continue
            data.append(img)
            labels.append(idx)

    if not data:
        # Same "nothing loaded" signal as the missing-path case; also avoids
        # calling to_categorical with num_classes=0 when there are no classes.
        return np.array([]), np.array([]), classes

    data = np.array(data, dtype="float32") / 255.0
    data = np.expand_dims(data, axis=-1)
    labels = to_categorical(np.array(labels), num_classes=len(classes))
    return data, labels, classes

def get_dataset(name):
    """Load the dataset configured under ``name`` and split it three ways.

    Args:
        name: Key into ``CONFIG``. ``'JAFFE'`` and ``'FER2013'`` get dedicated
            handling; any other key is loaded as a single folder-per-class
            dataset and split here.

    Returns:
        ``((X_train, y_train), (X_val, y_val), (X_test, y_test), classes)``.
        If no images could be loaded, every array is empty so the caller can
        detect the failure with ``len(X_train) == 0``.

    Raises:
        KeyError: If ``name`` is not a key of ``CONFIG``.
    """
    cfg = CONFIG[name]
    path = cfg['path']

    # 1. JAFFE: the raw files must be sorted into class folders first.
    if name == 'JAFFE':
        path = prepare_jaffe_sorting(path)
        X, y, classes = load_images_from_folder(path, cfg['img_size'])
        if len(X) == 0:
            return (X, y), (X, y), (X, y), classes
        # JAFFE has no train/test folders, so split manually (70/15/15).
        # NOTE(review): cfg['test_split'] is not used here; the 0.3 hold-out
        # is hard-coded. Confirm which value is intended.
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, stratify=y, random_state=42)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, stratify=y_temp, random_state=42)
        return (X_train, y_train), (X_val, y_val), (X_test, y_test), classes

    # 2. FER2013: train/ and test/ folders already exist.
    elif name == 'FER2013':
        train_path = os.path.join(path, 'train')
        test_path = os.path.join(path, 'test')

        print("‚è≥ ƒêang t·∫£i t·∫≠p TRAIN...")
        X_train_full, y_train_full, classes = load_images_from_folder(train_path, cfg['img_size'])
        print("‚è≥ ƒêang t·∫£i t·∫≠p TEST...")
        X_test, y_test, test_classes = load_images_from_folder(test_path, cfg['img_size'])

        if len(X_train_full) == 0 or len(X_test) == 0:
            empty = np.array([])
            return (empty, empty), (empty, empty), (empty, empty), classes
        if test_classes != classes:
            # Labels are positions in each folder's own class list, so a
            # mismatch means train and test indices do not line up.
            print(f"[WARN] Test classes {test_classes} differ from train classes {classes}")

        # Carve the validation set out of train (90/10).
        X_train, X_val, y_train, y_val = train_test_split(X_train_full, y_train_full, test_size=0.1, random_state=42)
        return (X_train, y_train), (X_val, y_val), (X_test, y_test), classes

    # 3. CK+ (and anything else): load everything, then split.
    else:
        X, y, classes = load_images_from_folder(path, cfg['img_size'])
        if len(X) == 0:
            return (X, y), (X, y), (X, y), classes
        # Hold out cfg['test_split'] of the data, then halve it into val/test.
        X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=cfg['test_split'], random_state=42)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
        return (X_train, y_train), (X_val, y_val), (X_test, y_test), classes


def build_model(input_shape, num_classes, model_type='standard'):
    """Build and compile a three-stage CNN classifier.

    Args:
        input_shape: Shape of one input sample, given to the first Conv2D.
        num_classes: Number of units in the softmax output layer.
        model_type: ``'compact'`` selects the narrower network (used for
            JAFFE); any other value selects the standard one (CK+, FER2013).

    Returns:
        A compiled ``Sequential`` model named ``CNN_<model_type>`` using Adam,
        categorical cross-entropy and the accuracy metric.
    """
    # Both variants share one layout; only the widths and learning rate differ.
    if model_type == 'compact':
        conv_filters, dense_units, lr = (16, 32, 64), 64, 0.0005
    else:
        conv_filters, dense_units, lr = (32, 64, 128), 128, 0.001

    model = Sequential(name=f"CNN_{model_type}")

    for position, n_filters in enumerate(conv_filters):
        # Only the very first layer declares the input shape.
        first_layer_args = {'input_shape': input_shape} if position == 0 else {}
        model.add(Conv2D(n_filters, (3,3), activation='relu', padding='same', **first_layer_args))
        model.add(MaxPooling2D((2,2)))

    model.add(Flatten())
    model.add(Dense(dense_units, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(optimizer=Adam(lr), loss='categorical_crossentropy', metrics=['accuracy'])
    return model

def plot_history_comparison(hist_A, hist_B, dataset_name):
    """Show validation accuracy and loss of two training runs side by side.

    Args:
        hist_A: Object whose ``history`` mapping has ``'val_accuracy'`` and
            ``'val_loss'`` sequences (pipeline A, original images).
        hist_B: Same, for pipeline B (PCA).
        dataset_name: Name inserted into both panel titles.
    """
    # One row per panel:
    # (history key, line style A, line style B, label A, label B, title, y-axis label)
    panels = (
        ('val_accuracy', 'r-o', 'b-s', 'Lu·ªìng A (G·ªëc)', 'Lu·ªìng B (PCA)',
         f'So s√°nh Accuracy ({dataset_name})', 'Validation Accuracy'),
        ('val_loss', 'r--', 'b--', 'Lu·ªìng A Loss', 'Lu·ªìng B Loss',
         f'So s√°nh Loss ({dataset_name})', 'Validation Loss'),
    )

    plt.figure(figsize=(12, 5))
    for column, (metric, style_a, style_b, label_a, label_b, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, column)
        plt.plot(hist_A.history[metric], style_a, label=label_a)
        plt.plot(hist_B.history[metric], style_b, label=label_b)
        plt.title(title)
        plt.xlabel('Epoch')
        plt.ylabel(y_label)
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()

# Script entry point: train one CNN on the original images (pipeline A) and
# one on PCA-reconstructed images (pipeline B), compare them on the test set,
# save both models plus the fitted PCA, and plot the validation curves.
if __name__ == "__main__":
    print(f"üî∞ ƒêANG KH·ªûI CH·∫†Y CH·∫æ ƒê·ªò: {TARGET_DATASET}")
    cfg = CONFIG[TARGET_DATASET]

    # 1. Load the data
    (X_train, y_train), (X_val, y_val), (X_test, y_test), class_names = get_dataset(TARGET_DATASET)

    if len(X_train) > 0:
        print(f"\nüìä D·ªØ li·ªáu s·∫µn s√†ng: Train={len(X_train)}, Val={len(X_val)}, Test={len(X_test)}")

        # Per-sample shape taken from the loaded data instead of a hard-coded
        # (48, 48, 1), so changing cfg['img_size'] keeps everything consistent.
        input_shape = tuple(X_train.shape[1:])

        # ---------------------------------------------------------
        # PIPELINE A: ORIGINAL IMAGES
        # ---------------------------------------------------------
        print(f"\nüöÄ [LU·ªíNG A] Train CNN tr√™n ·∫£nh g·ªëc ({cfg['epochs']} epochs)...")
        model_A = build_model(input_shape, len(class_names), cfg['model_type'])
        hist_A = model_A.fit(X_train, y_train, epochs=cfg['epochs'], batch_size=cfg['batch_size'],
                             validation_data=(X_val, y_val), verbose=1)

        # ---------------------------------------------------------
        # PIPELINE B: PCA + CNN
        # ---------------------------------------------------------
        print(f"\nüöÄ [LU·ªíNG B] √Åp d·ª•ng PCA v√† Train...")

        # Fit PCA on the flattened training images only, so validation and
        # test data do not influence the learned components.
        pca = PCA(n_components=PCA_COMPONENTS)

        print("   -> ƒêang fit PCA...")
        X_train_flat = X_train.reshape(X_train.shape[0], -1)
        pca.fit(X_train_flat)

        print("   -> ƒêang transform d·ªØ li·ªáu...")

        def pca_process(X_in, pca_model):
            """Compress images with PCA and reconstruct them at their original shape."""
            shape = X_in.shape
            flat = X_in.reshape(shape[0], -1)
            compressed = pca_model.transform(flat)
            reconstructed = pca_model.inverse_transform(compressed)
            return reconstructed.reshape(shape)

        X_train_pca = pca_process(X_train, pca)
        X_val_pca = pca_process(X_val, pca)
        X_test_pca = pca_process(X_test, pca)

        model_B = build_model(input_shape, len(class_names), cfg['model_type'])
        hist_B = model_B.fit(X_train_pca, y_train, epochs=cfg['epochs'], batch_size=cfg['batch_size'],
                             validation_data=(X_val_pca, y_val), verbose=1)

        # ---------------------------------------------------------
        # EVALUATE AND SAVE
        # ---------------------------------------------------------
        print("\nüîé K·∫æT QU·∫¢ SO S√ÅNH TR√äN T·∫¨P TEST:")
        # evaluate() returns [loss, accuracy]; index 1 is the accuracy metric.
        acc_A = model_A.evaluate(X_test, y_test, verbose=0)[1]
        acc_B = model_B.evaluate(X_test_pca, y_test, verbose=0)[1]

        print(f"‚úÖ Accuracy Lu·ªìng A (G·ªëc): {acc_A:.2%}")
        print(f"‚úÖ Accuracy Lu·ªìng B (PCA): {acc_B:.2%}")

        # Save both models and the fitted PCA
        model_A.save(f'model_A_{TARGET_DATASET}.h5')
        model_B.save(f'model_B_{TARGET_DATASET}.h5')
        with open(f'pca_{TARGET_DATASET}.pkl', 'wb') as f:
            pickle.dump(pca, f)

        # Plot the comparison charts
        plot_history_comparison(hist_A, hist_B, TARGET_DATASET)

    else:
        print("‚ùå L·ªói: Kh√¥ng t·∫£i ƒë∆∞·ª£c d·ªØ li·ªáu.")