In [None]:
import os
import random
from glob import glob
from collections import Counter
import numpy as np
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from sklearn.metrics import confusion_matrix, accuracy_score
from torch import amp
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR
from tqdm import tqdm

In [None]:
import os
import pandas as pd
import numpy as np
from glob import glob
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from sklearn.model_selection import train_test_split
from collections import Counter
from tqdm import tqdm

# ===== LABEL NORMALISATION (mapping for AffectNet or FER+) =====
# Maps raw dataset label names to the unified class names used in this notebook.
EMOTION_MAP = {
    "anger": "angry",
    "disgust": "angry",      # merge disgust into angry
    "contempt": "angry",     # merge contempt into angry
    "fear": "fear",
    "happiness": "happy",
    "sadness": "sad",
    "surprise": "surprise",
    "neutral": "neutral"
}

# ===== VALID CLASSES (merged 6-class set) =====
# The list order defines the integer class index assigned by EmotionDataset.label_to_idx.
VALID_CLASSES = ["angry", "fear", "happy", "neutral", "sad", "surprise"]

# ===== LÀM SẠCH NHÃN =====
def clean_labels(df):
    """
    Normalise and merge emotion labels for FER+ and AffectNet.

    Steps:
    - lowercase and strip whitespace from the ``label`` column
    - fix the common ``suprise`` misspelling and map long names to short ones
    - merge ``disgust`` and ``contempt`` into ``angry``
    - drop every row whose label is not in ``VALID_CLASSES``

    Args:
        df (pd.DataFrame): frame with a string ``label`` column.

    Returns:
        pd.DataFrame: a new frame with cleaned labels and a fresh 0..n-1 index.
        The input frame is left untouched.
    """
    label_fixes = {
        'suprise': 'surprise',    # spelling fix
        'happiness': 'happy',
        'sadness': 'sad',
        'anger': 'angry',
        'disgust': 'angry',
        'contempt': 'angry'
    }
    normalised = df['label'].str.lower().str.strip().replace(label_fixes)
    # assign() builds a new frame so the caller's DataFrame is never mutated.
    cleaned = df.assign(label=normalised)
    return cleaned[cleaned['label'].isin(VALID_CLASSES)].reset_index(drop=True)


# ===== LOAD DATASET THỐNG NHẤT =====
def load_dataset_unified(base_dir, exts=("jpg", "jpeg", "png"), validate=True):
    """
    Scan an image-folder dataset and return one row per image file.

    Supported layouts:
    - base_dir/train/<label>/*.<ext> and/or base_dir/test/<label>/*.<ext>
    - base_dir/<label>/*.<ext>

    Directory entries and matched files are visited in sorted order so the
    resulting row order is deterministic across machines and runs (the seeded
    shuffle and split downstream depend on it).

    Args:
        base_dir (str): dataset root directory.
        exts (tuple[str]): file extensions (without dot) to collect.
        validate (bool): accepted for API compatibility; currently ignored
            (no image validation is performed).

    Returns:
        pd.DataFrame: columns ``image_path`` and ``label``, plus ``subset``
        ("train"/"test") when the split layout is detected. The frame is empty
        (but still has these columns) when no image is found.
    """
    def _scan_label_dirs(root, extra=None):
        """Collect rows for every <root>/<label>/*.<ext> image, in sorted order."""
        found = []
        for label in sorted(os.listdir(root)):
            label_dir = os.path.join(root, label)
            if not os.path.isdir(label_dir):
                continue
            for ext in exts:
                for path in sorted(glob(os.path.join(label_dir, f"*.{ext}"))):
                    row = {"image_path": path, "label": label}
                    if extra:
                        row.update(extra)
                    found.append(row)
        return found

    entries = set(os.listdir(base_dir))
    has_subset = bool(entries & {"train", "test"})

    if has_subset:
        columns = ["image_path", "label", "subset"]
        rows = []
        for subset in ("train", "test"):
            subset_dir = os.path.join(base_dir, subset)
            if not os.path.isdir(subset_dir):
                continue
            rows.extend(_scan_label_dirs(subset_dir, {"subset": subset}))
    else:
        columns = ["image_path", "label"]
        rows = _scan_label_dirs(base_dir)

    # Passing columns explicitly keeps the schema stable even when rows is empty.
    return pd.DataFrame(rows, columns=columns)

# ===== MERGE MULTIPLE DATASETS =====
def merge_datasets(dataset_configs, test_size=0.15, random_state=42, use_oversample=False):
    """
    Merge several datasets (FER+, AffectNet, ...) and split them into train/val.

    Args:
        dataset_configs (list[dict]): one config per dataset. Keys read here:
            ``path`` (required), ``validate``, ``label_map`` (raw -> unified
            label mapping) and ``has_subset`` (keep only the ``train`` subset).
        test_size (float): validation fraction.
        random_state (int): seed used for shuffling, splitting and oversampling.
        use_oversample (bool):
            True  -> duplicate minority-class rows until every class has as
                     many rows as the largest class.
            False -> keep the natural distribution (use sample/class weights
                     during training instead).

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: ``(train_df, val_df)``. ``train_df``
        carries a boolean ``is_augmented`` column that is True only for rows
        added by oversampling.

    Raises:
        ValueError: when none of the configured datasets yields any sample.
    """
    all_data = []

    # --- Load and clean each dataset ---
    for config in dataset_configs:
        print(f"\nLoading {config['path']}...")
        df = load_dataset_unified(config["path"], validate=config.get("validate", True))
        if df.empty:
            print(f"No data found in {config['path']}")
            continue

        # Normalise and map labels
        df['label'] = df['label'].str.lower().str.strip()
        if "label_map" in config:
            df['label'] = df['label'].map(config["label_map"]).fillna(df['label'])
        df = clean_labels(df)

        # Keep only the train subset when the dataset ships its own split
        if "subset" in df.columns and config.get("has_subset", False):
            df = df[df["subset"] == "train"].drop("subset", axis=1)

        print(f"Loaded {len(df)} clean samples from {config['path']}")
        all_data.append(df)

    if not all_data:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError("No samples were loaded from any dataset; check the configured paths.")

    # --- Merge everything ---
    df_merged = pd.concat(all_data, ignore_index=True)
    df_merged = df_merged.sample(frac=1, random_state=random_state).reset_index(drop=True)

    print("\nClass distribution (before balancing):")
    print(df_merged['label'].value_counts().sort_index())

    # --- Train/val split (done BEFORE oversampling so duplicates never leak into val) ---
    train_df, val_df = train_test_split(
        df_merged,
        test_size=test_size,
        stratify=df_merged['label'],
        random_state=random_state
    )

    # --- Without oversampling ---
    if not use_oversample:
        train_df = train_df.copy()
        train_df["is_augmented"] = False
        print("\nOversampling disabled (using sample weights instead).")
        print(f"Train size: {len(train_df)} | Val size: {len(val_df)}")
        print("\nTrain class distribution:")
        print(train_df['label'].value_counts().sort_index())
        return train_df.reset_index(drop=True), val_df.reset_index(drop=True)

    # --- With oversampling ---
    print("\nPerforming oversampling to balance classes...")
    max_count = train_df['label'].value_counts().max()
    print(f"Largest class has {max_count} samples")

    balanced_parts = []
    for label, group in train_df.groupby('label'):
        # Every original row is always kept exactly once ...
        balanced_parts.append(group.assign(is_augmented=False))
        n_extra = max_count - len(group)
        if n_extra > 0:
            # ... and only the rows drawn on top of them are flagged as duplicates.
            extras = group.sample(n=n_extra, replace=True, random_state=random_state)
            balanced_parts.append(extras.assign(is_augmented=True))

    train_balanced = pd.concat(balanced_parts, ignore_index=True)
    train_balanced = train_balanced.sample(frac=1, random_state=random_state).reset_index(drop=True)

    print("\nTrain class distribution (after oversampling):")
    print(train_balanced['label'].value_counts().sort_index())
    print(f"\nFinal Train size: {len(train_balanced)} | Val size: {len(val_df)}")

    return train_balanced.reset_index(drop=True), val_df.reset_index(drop=True)

# ===== TRANSFORMS =====
from torchvision import transforms

def get_transforms(img_size=112, use_pretrained=True):
    """
    Build the train/validation preprocessing pipelines for FERPlus + AffectNet.

    Args:
        img_size (int): side length of the square output image.
        use_pretrained (bool):
            True  -> 3-channel output normalised with ImageNet mean/std
                     (for a pretrained RGB backbone).
            False -> 1-channel grayscale output normalised with mean=std=0.5
                     (for a custom, non-pretrained model).

    Returns:
        tuple: ``(train_transform, val_transform)`` as ``transforms.Compose`` objects.
    """
    if use_pretrained:
        # Pretrained ResNet: grayscale replicated to 3 channels, ImageNet statistics.
        channels = 3
        mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
        jitter = transforms.ColorJitter(
            brightness=0.2, contrast=0.2, saturation=0.1, hue=0.02
        )
        erasing = transforms.RandomErasing(
            p=0.25, scale=(0.02, 0.15), value="random"
        )
    else:
        # Custom model: single grayscale channel.
        channels = 1
        mean, std = [0.5], [0.5]
        jitter = transforms.ColorJitter(brightness=0.2, contrast=0.2)
        erasing = transforms.RandomErasing(p=0.3, scale=(0.02, 0.15))

    # Training resizes slightly larger than the target so the random crop has slack.
    padded = img_size + 8
    train_steps = [
        transforms.Grayscale(num_output_channels=channels),
        transforms.Resize((padded, padded)),
        transforms.RandomResizedCrop(img_size, scale=(0.85, 1.0)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(10),
        jitter,
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
        erasing,  # operates on tensors, so it must come after ToTensor/Normalize
    ]
    val_steps = [
        transforms.Grayscale(num_output_channels=channels),
        transforms.Resize((img_size, img_size)),
        transforms.CenterCrop(img_size),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]

    return transforms.Compose(train_steps), transforms.Compose(val_steps)



# ===== DATASET CLASS =====
class EmotionDataset(Dataset):
    """
    Image dataset backed by a DataFrame with ``image_path`` and ``label`` columns.

    Labels are encoded with the fixed order of ``VALID_CLASSES`` so that class
    indices are identical for every split.

    Args:
        df (pd.DataFrame): rows to serve; the index is reset internally.
        transform (callable | None): applied to the RGB PIL image. When None
            the PIL image is returned unchanged.
        preload (bool): when True, every image is decoded once up front and
            served from RAM afterwards.
    """

    def __init__(self, df, transform=None, preload=False):
        self.df = df.reset_index(drop=True)
        self.transform = transform
        self.preload = preload

        # Label encoding (consistent order given by VALID_CLASSES)
        labels = VALID_CLASSES
        self.label_to_idx = {lbl: i for i, lbl in enumerate(labels)}
        self.idx_to_label = {i: lbl for lbl, i in self.label_to_idx.items()}
        self.num_classes = len(labels)

        # Optional in-memory cache of decoded images
        self.images = None
        if preload:
            print("Preloading images to RAM...")
            self.images = [self._load_image(p) for p in tqdm(self.df['image_path'])]

    @staticmethod
    def _load_image(path):
        """Decode ``path`` as an RGB PIL image and close the underlying file."""
        with Image.open(path) as img:
            return img.convert('RGB')

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label_index_tensor)`` for row ``idx``."""
        row = self.df.iloc[idx]
        if self.images is not None:
            # Serve from the preloaded cache instead of hitting the disk again.
            img = self.images[idx]
        else:
            img = self._load_image(row['image_path'])

        if self.transform is not None:
            img = self.transform(img)

        label = self.label_to_idx[row['label']]
        return img, torch.tensor(label, dtype=torch.long)

    def get_class_weights(self):
        """
        Inverse-frequency class weights (``total / count``) as a float tensor.

        Classes that do not occur in this split get weight 0.0; they never
        appear as a target, so the value does not affect the loss.
        """
        counts = Counter(self.label_to_idx[lbl] for lbl in self.df['label'])
        total = sum(counts.values())
        weights = [total / counts[i] if counts[i] else 0.0 for i in range(self.num_classes)]
        return torch.tensor(weights, dtype=torch.float)

    def get_sampler(self):
        """Return a WeightedRandomSampler that draws each class with equal probability."""
        labels = [self.label_to_idx[lbl] for lbl in self.df['label']]
        class_counts = Counter(labels)
        weights = [1.0 / class_counts[lbl] for lbl in labels]
        return WeightedRandomSampler(weights, len(weights), replacement=True)

# ===== MAIN =====
if __name__ == "__main__":
    # Dataset sources: FER+ ships its own train/test folders, AffectNet needs label mapping.
    ferplus_cfg = {
        "path": "/kaggle/input/ferplus",
        "has_subset": True,
        "validate": True
    }
    affectnet_cfg = {
        "path": "/kaggle/input/young-affectnet-hq",
        "has_subset": False,
        "label_map": EMOTION_MAP,
        "validate": True
    }
    configs = [ferplus_cfg, affectnet_cfg]

    # Load, merge and split
    train_df, val_df = merge_datasets(configs, test_size=0.10, random_state=42)

    # Transforms and datasets
    train_transform, val_transform = get_transforms(img_size=112, use_pretrained=True)
    train_dataset = EmotionDataset(train_df, transform=train_transform, preload=False)
    val_dataset = EmotionDataset(val_df, transform=val_transform, preload=False)

    # Loader settings shared by both splits; only the shuffling differs.
    loader_kwargs = dict(batch_size=512, num_workers=4, pin_memory=True)
    train_loader = DataLoader(train_dataset, sampler=None, shuffle=True, **loader_kwargs)
    val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

    print(f"\nTrain: {len(train_dataset)} | Val: {len(val_dataset)}")
    class_weights = train_dataset.get_class_weights()
    print(f"Class weights: {class_weights}")


In [None]:
# Check the label distribution after mapping
label_counts = train_df['label'].value_counts()
print(label_counts)

# Check for images shared between train and val (expected: 0)
shared_paths = set(train_df['image_path']).intersection(val_df['image_path'])
print(len(shared_paths))

# Check the label -> index mapping
print(train_dataset.label_to_idx)

# Check the shape of one input batch
first_batch = next(iter(train_loader))
imgs, _ = first_batch
print(imgs.shape)


In [None]:
import matplotlib.pyplot as plt

def plot_distribution(df, title):
    """Show a bar chart with the number of rows per emotion label in ``df``."""
    label_counts = df['label'].value_counts().sort_index()
    fig, ax = plt.subplots(figsize=(8, 4))
    label_counts.plot(kind='bar', color='skyblue', edgecolor='black', ax=ax)
    ax.set_title(title)
    ax.set_xlabel("Emotion label")
    ax.set_ylabel("Count")
    ax.grid(axis='y', linestyle='--', alpha=0.6)
    plt.show()

for split_df, split_title in (
    (train_df, "Train Set Class Distribution"),
    (val_df, "Validation Set Class Distribution"),
):
    plot_distribution(split_df, split_title)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# ===================================
# OPTIMIZED SE BLOCK
# ===================================
class SEBlock(nn.Module):
    """
    Squeeze-and-Excitation block.

    Global-average-pools each channel, passes the channel vector through a
    two-layer bottleneck MLP (``nn.Linear`` rather than 1x1 convolutions) and
    rescales the input channels with the resulting sigmoid gates.

    Args:
        channels (int): number of input/output channels.
        reduction (int): bottleneck reduction ratio. The hidden width is
            ``channels // reduction`` but never less than 1, so the gate stays
            learnable even when ``channels < reduction``.
    """
    def __init__(self, channels, reduction=16):
        super().__init__()
        # A zero-width bottleneck would collapse the gate to a constant 0.5.
        hidden = max(1, channels // reduction)
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled per channel; expects a 4-D (N, C, H, W) tensor."""
        b, c, _, _ = x.size()
        # Squeeze: (N, C, H, W) -> (N, C)
        y = self.avg_pool(x).view(b, c)
        # Excitation: per-channel gate in (0, 1), reshaped for broadcasting
        y = self.fc(y).view(b, c, 1, 1)
        # Scale
        return x * y


# ===================================
# BASIC RESIDUAL BLOCK WITH SE
# ===================================
class BasicBlockSE(nn.Module):
    """
    ResNet-style basic residual block (two 3x3 convolutions) with an SE gate.

    Order of operations in the main path:
    conv1 -> bn1 -> ReLU -> conv2 -> bn2 -> (channel dropout) -> SE,
    followed by the residual addition and a final ReLU.
    """
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, reduction=16, drop_prob=0.0):
        super().__init__()

        # Main path
        self.conv1 = nn.Conv2d(in_channels, out_channels, 3, stride, 1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, 3, 1, 1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Channel attention
        self.se = SEBlock(out_channels, reduction)

        # Dropout probability, applied after conv2/bn2 and before SE
        self.drop_prob = drop_prob

        # Shortcut: identity unless the spatial size or channel count changes,
        # in which case a strided 1x1 projection matches the shapes.
        needs_projection = not (stride == 1 and in_channels == out_channels)
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, 1, stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        residual = self.shortcut(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out, inplace=True)

        out = self.conv2(out)
        out = self.bn2(out)

        # Channel dropout is only active while training
        use_dropout = self.training and self.drop_prob > 0
        if use_dropout:
            out = F.dropout2d(out, p=self.drop_prob, training=True)

        out = self.se(out)

        out += residual
        return F.relu(out, inplace=True)

from torchvision.models import resnet18
import torch.nn as nn

class EmotionResNet18_SE(nn.Module):
    """
    ResNet18 feature extractor followed by one SE block and a linear classifier.

    Args:
        num_classes (int): number of output logits.
        reduction (int): reduction ratio of the SE block on the 512-channel map.
        dropout (float): dropout probability in front of the classifier.
        drop_block (float): accepted for API compatibility; currently unused.
        pretrained (bool): load the torchvision ``IMAGENET1K_V1`` weights when
            True, otherwise start from random initialisation.
    """
    def __init__(self, num_classes=8, reduction=8, dropout=0.3, drop_block=0.05, pretrained=True):
        super().__init__()

        # Backbone; the log line reflects what was actually loaded.
        base_model = resnet18(weights="IMAGENET1K_V1" if pretrained else None)
        if pretrained:
            print("Loaded pretrained ResNet18 backbone (RGB, ImageNet)")
        else:
            print("Initialized ResNet18 backbone without pretrained weights")

        # Every layer except the original global pooling and fc head
        self.features = nn.Sequential(*list(base_model.children())[:-2])

        # SE block on the final 512-channel feature map
        self.se = SEBlock(512, reduction=reduction)

        # Global pooling + classifier
        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Sequential(
            nn.Dropout(dropout),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Map a (N, 3, H, W) batch to (N, num_classes) logits."""
        x = self.features(x)
        x = self.se(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# Shape smoke test: push one random 112x112 RGB image through every stage.
model = EmotionResNet18_SE()
x = torch.randn(1, 3, 112, 112)
shape_stages = (
    ("features", model.features),
    ("se", model.se),
    ("avgpool", model.avgpool),
    ("flatten", lambda t: torch.flatten(t, 1)),
    ("classifier", model.classifier),
)
with torch.no_grad():
    for stage_name, stage in shape_stages:
        x = stage(x)
        print(f"{stage_name}:", x.shape)



In [None]:
import os
import json
import torch
import torch.nn as nn
import torch.optim as optim
import torch.amp as amp
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from torch.utils.data import DataLoader, WeightedRandomSampler
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.metrics import classification_report

import torch.nn.functional as F


# ---------------------------
# Metrics
# ---------------------------
def compute_metrics(y_true, y_pred, labels=None):
    """
    Return ``(accuracy, confusion_matrix)`` for the given predictions.

    ``labels`` is forwarded to ``confusion_matrix`` to fix the row/column order.
    """
    accuracy = accuracy_score(y_true, y_pred)
    conf_mat = confusion_matrix(y_true, y_pred, labels=labels)
    return accuracy, conf_mat


# ---------------------------
# Training & Evaluation
# ---------------------------
def _unwrap_model(model):
    """Return the underlying network when `model` is wrapped in nn.DataParallel."""
    return model.module if isinstance(model, nn.DataParallel) else model


def _strip_module_prefix(state_dict):
    """Remove the 'module.' prefix that nn.DataParallel adds to state-dict keys."""
    prefix = "module."
    return {(k[len(prefix):] if k.startswith(prefix) else k): v for k, v in state_dict.items()}


def _make_checkpoint(epoch, model, optimizer, scheduler, label_to_idx):
    """Bundle the state needed to resume training or to run inference."""
    return {
        "epoch": epoch,
        # Always store un-prefixed keys so the file loads into a plain model.
        "model_state": _unwrap_model(model).state_dict(),
        "optimizer_state": optimizer.state_dict(),
        "scheduler_state": scheduler.state_dict(),
        "label_to_idx": label_to_idx
    }


def train_and_evaluate(
                       out_dir="checkpoints",
                       img_size=48,
                       batch_size=256,
                       epochs=30,
                       lr=1e-3,
                       weight_decay=1e-4,
                       num_workers=4,
                       device=None,
                       label_smoothing=0.05,
                       early_stop_patience=20,
                       resume_dir="/kaggle/input/resnet-emotion/pytorch/default/1"):
    """
    Train EmotionResNet18_SE and evaluate it after every epoch.

    The function reads the notebook-level ``train_dataset``, ``val_dataset``,
    ``train_loader`` and ``val_loader`` objects created in an earlier cell.

    Args:
        out_dir (str): directory for the best/final checkpoints and history.json.
        img_size, batch_size, num_workers: accepted for API compatibility;
            currently unused because the data loaders are built outside.
        epochs (int): total number of epochs (including already-resumed ones).
        lr (float): peak AdamW learning rate.
        weight_decay (float): AdamW weight decay.
        device (torch.device | str | None): training device; defaults to CUDA
            when available, otherwise CPU.
        label_smoothing (float): label smoothing of the cross-entropy loss.
        early_stop_patience (int): stop after this many epochs without a new
            best validation accuracy.
        resume_dir (str): directory searched for ``best_emotion_resnet18_se.pth``
            and ``history.json`` to resume from; ignored when the checkpoint
            does not exist.

    Returns:
        tuple: ``(model, history, idx_to_label)`` where ``history`` holds the
        per-epoch ``train_loss``/``val_loss``/``train_acc``/``val_acc`` lists
        (accuracies in percent).
    """
    os.makedirs(out_dir, exist_ok=True)
    ckpt_path = os.path.join(out_dir, "best_emotion_resnet18_se.pth")
    final_ckpt_path = os.path.join(out_dir, "final_emotion_resnet18_se.pth")
    history_path = os.path.join(out_dir, "history.json")

    # Honour an explicitly requested device; fall back to auto-detection.
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    else:
        device = torch.device(device)
    use_amp = device.type == "cuda"
    print(f"Using device: {device}")
    if device.type == "cuda":
        print(f"GPU Name: {torch.cuda.get_device_name(0)}")
        print(f"Memory Allocated: {torch.cuda.memory_allocated() / 1024**2:.1f} MB")

    # === Model & optimizer
    model = EmotionResNet18_SE(num_classes=len(train_dataset.label_to_idx))
    # --- Multi-GPU support ---
    if device.type == "cuda" and torch.cuda.device_count() > 1:
        print(f"Using {torch.cuda.device_count()} GPUs via DataParallel")
        model = nn.DataParallel(model)

    model = model.to(device)

    class_weights = train_dataset.get_class_weights().to(device)
    criterion = nn.CrossEntropyLoss(label_smoothing=label_smoothing, weight=class_weights)
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Linear warm-up for ~5% of the epochs, cosine decay afterwards.
    warmup_epochs = max(1, int(epochs * 0.05))
    warmup = LinearLR(optimizer, start_factor=0.2, total_iters=warmup_epochs)
    cosine = CosineAnnealingLR(optimizer, T_max=max(1, epochs - warmup_epochs))
    scheduler = SequentialLR(optimizer, schedulers=[warmup, cosine], milestones=[warmup_epochs])
    scaler = amp.GradScaler(enabled=use_amp)

    # === Resume when a previous checkpoint exists
    best_val, start_epoch, best_epoch, no_improve = 0.0, 0, -1, 0
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    resume_ckpt = os.path.join(resume_dir, "best_emotion_resnet18_se.pth")
    resume_history = os.path.join(resume_dir, "history.json")
    if os.path.exists(resume_ckpt):
        ckpt = torch.load(resume_ckpt, map_location=device)
        # Load into the bare network so checkpoints work with or without DataParallel.
        _unwrap_model(model).load_state_dict(_strip_module_prefix(ckpt["model_state"]))
        if "optimizer_state" in ckpt:
            optimizer.load_state_dict(ckpt["optimizer_state"])
        if "scheduler_state" in ckpt:
            scheduler.load_state_dict(ckpt["scheduler_state"])
        print(f"[INFO] Loaded existing model from {resume_ckpt}")

        if os.path.exists(resume_history):
            with open(resume_history, "r") as f:
                history = json.load(f)
            if history.get("val_acc"):
                best_val_pct = max(history["val_acc"])
                best_val = best_val_pct / 100
                start_epoch = len(history["val_acc"])
                # The best epoch is where the maximum occurred, not the last one.
                best_epoch = history["val_acc"].index(best_val_pct)
                print(f"[INFO]Resuming training from epoch {start_epoch}, best val acc = {best_val*100:.2f}%")

    label_list = [train_dataset.idx_to_label[i] for i in range(len(train_dataset.idx_to_label))]
    label_ids = list(range(len(label_list)))

    # Train only the classifier head for the first epochs of a (nearly) fresh run.
    freeze_epochs = 5
    backbone_frozen = start_epoch < freeze_epochs
    if backbone_frozen:
        for name, param in model.named_parameters():
            if "classifier" not in name:
                param.requires_grad = False
        print(f"[INFO] Freezing backbone for first {freeze_epochs} epochs")

    last_epoch = start_epoch - 1

    # ---------------- TRAINING LOOP ----------------
    for epoch in range(start_epoch, epochs):
        last_epoch = epoch

        # === Unfreeze the backbone once the warm-up phase is over ===
        if backbone_frozen and epoch >= freeze_epochs:
            print(f"[INFO] Unfreezing backbone at epoch {epoch+1} for fine-tuning")
            for param in model.parameters():
                param.requires_grad = True
            backbone_frozen = False

        model.train()
        running_loss = 0.0
        y_true_train, y_pred_train = [], []
        pbar = tqdm(train_loader, desc=f"Train Epoch {epoch+1}/{epochs}", leave=False)

        for imgs, labels in pbar:
            imgs, labels = imgs.to(device), labels.to(device)
            optimizer.zero_grad(set_to_none=True)

            with amp.autocast(device_type=device.type, enabled=use_amp):
                outputs = model(imgs)
                loss = criterion(outputs, labels)

            scaler.scale(loss).backward()
            # Unscale first so the clipping threshold applies to the true gradients.
            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=5.0)
            scaler.step(optimizer)
            scaler.update()

            running_loss += loss.item() * imgs.size(0)
            preds = outputs.argmax(dim=1).detach().cpu().numpy()
            y_pred_train.extend(preds.tolist())
            y_true_train.extend(labels.detach().cpu().numpy().tolist())

        # Average over the samples actually seen this epoch.
        train_loss = running_loss / max(1, len(y_true_train))
        train_acc, _ = compute_metrics(y_true_train, y_pred_train)
        history["train_loss"].append(train_loss)
        history["train_acc"].append(train_acc * 100)

        # Validation
        model.eval()
        running_val_loss = 0.0
        y_true, y_pred = [], []
        with torch.no_grad(), amp.autocast(device_type=device.type, enabled=use_amp):
            for imgs, labels in tqdm(val_loader, desc=f"Val Epoch {epoch+1}/{epochs}", leave=False):
                imgs, labels = imgs.to(device), labels.to(device)
                outputs = model(imgs)
                loss = criterion(outputs, labels)
                running_val_loss += loss.item() * imgs.size(0)
                preds = outputs.argmax(dim=1).detach().cpu().numpy()
                y_pred.extend(preds.tolist())
                y_true.extend(labels.detach().cpu().numpy().tolist())

        val_loss = running_val_loss / max(1, len(y_true))
        val_acc, cm = compute_metrics(y_true, y_pred, labels=label_ids)
        history["val_loss"].append(val_loss)
        history["val_acc"].append(val_acc * 100)

        if epoch % 5 == 0:
            # --- Detailed log to spot a bias towards particular classes ---
            print("Class distribution check:")
            y_pred_arr, y_true_arr = np.array(y_pred), np.array(y_true)
            for i, lbl in enumerate(label_list):
                n_pred = (y_pred_arr == i).sum()
                n_true = (y_true_arr == i).sum()
                ratio = n_pred / max(1, n_true)
                print(f"  {lbl:<15} pred={n_pred:<6} true={n_true:<6} ratio={ratio:.2f}x")

            # --- Classification report ---
            # labels= keeps target_names aligned even if a class is absent this epoch.
            report = classification_report(
                y_true, y_pred, labels=label_ids, target_names=label_list,
                digits=3, zero_division=0
            )
            print("\nPer-class performance:")
            print(report)

        scheduler.step()

        current_lr = scheduler.get_last_lr()[0]

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"LR: {current_lr:.2e}")
        print("=" * 70)

        print("Results:")
        print(f"  Train: loss={train_loss:.4f}, acc={train_acc*100:.2f}%")
        print(f"  Val:   loss={val_loss:.4f}, acc={val_acc*100:.2f}%")
        print()

        # Persist the history every epoch, before any early-stop break.
        with open(history_path, "w") as f:
            json.dump(history, f, indent=2)

        # Checkpoint
        if val_acc > best_val + 1e-6:
            best_val = val_acc
            best_epoch = epoch
            torch.save(
                _make_checkpoint(epoch, model, optimizer, scheduler, train_dataset.label_to_idx),
                ckpt_path
            )
            print(f"[INFO] Saved best model (val_acc={val_acc*100:.2f}%)")
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= early_stop_patience:
                print(f"[INFO] Early stopping at epoch {epoch+1}, best epoch {best_epoch+1}")
                break

    # --- Save the final state (also after early stopping) ---
    if last_epoch >= start_epoch:
        torch.save(
            _make_checkpoint(last_epoch, model, optimizer, scheduler, train_dataset.label_to_idx),
            final_ckpt_path
        )

    # --- Plot ---
    plt.figure(figsize=(10,4))
    plt.subplot(1,2,1)
    plt.plot(history["train_loss"], label="train_loss")
    plt.plot(history["val_loss"], label="val_loss")
    plt.legend(); plt.title("Loss")
    plt.subplot(1,2,2)
    plt.plot(history["train_acc"], label="train_acc")
    plt.plot(history["val_acc"], label="val_acc")
    plt.legend(); plt.title("Accuracy")
    plt.tight_layout()
    plt.show()

    print(f"Best val acc: {best_val*100:.2f}% at epoch {best_epoch+1}")
    return model, history, train_dataset.idx_to_label


In [None]:
if __name__ == "__main__":
    # img_size was 48 in version 1 of this experiment.
    run_config = dict(
        out_dir="checkpoints",
        img_size=112,
        batch_size=512,
        epochs=200,
        lr=3e-4,
    )
    model, history, idx2label = train_and_evaluate(**run_config)


In [None]:
# NOTE(review): `kkk` is not defined anywhere visible in this notebook, so this
# cell raises NameError. Presumably a deliberate stopper to halt "Run All" after
# training — confirm, then delete the cell or replace it with a documented stop.
kkk

In [None]:
# Reload the saved training history (when present) and redraw the curves.
history_file = "/kaggle/working/checkpoints/history.json"
if os.path.exists(history_file):
    with open(history_file, "r") as f:
        history = json.load(f)
    if history["val_acc"]:
        best_val_pct = max(history["val_acc"])
        best_val = best_val_pct / 100
        start_epoch = len(history["val_acc"])
        # Index of the best validation accuracy, not simply the last epoch.
        best_epoch = history["val_acc"].index(best_val_pct)

# --- Plot ---
plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(history["train_loss"], label="train_loss")
plt.plot(history["val_loss"], label="val_loss")
plt.legend(); plt.title("Loss")
plt.subplot(1,2,2)
plt.plot(history["train_acc"], label="train_acc")
plt.plot(history["val_acc"], label="val_acc")
plt.legend(); plt.title("Accuracy")
plt.tight_layout()
plt.show()

In [None]:
from PIL import Image, UnidentifiedImageError
import requests
from io import BytesIO
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt

def _download_image(url, timeout=10):
    """Fetch `url` and decode the response body as an RGB PIL image."""
    headers = {"User-Agent": "Mozilla/5.0"}
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()

    # Reject responses that do not declare an image payload
    content_type = response.headers.get("Content-Type", "")
    if "image" not in content_type:
        raise ValueError(f"URL không trả về ảnh. Content-Type: {content_type}")

    return Image.open(BytesIO(response.content)).convert("RGB")


def predict_from_url(model, url, transform, idx_to_label, device=None, show_image=True, top_k=None):
    """
    Predict the emotion shown in the image at ``url``.

    Prints the top predictions and draws a bar chart with the probability of
    every class.

    Args:
        model: classifier returning logits of shape (1, num_classes).
        url (str): image URL.
        transform (callable): preprocessing applied to the RGB PIL image.
        idx_to_label: mapping from class index to label name.
        device: device for the input tensor; defaults to the device the model
            parameters live on (CUDA/CPU auto-detection if it has none).
        show_image (bool): display the downloaded image first.
        top_k (int | None): number of predictions to print; None or 0 prints
            all classes. Values above the class count are clamped.

    Returns:
        tuple: ``(label, probability)`` of the best class, or ``(None, None)``
        when the image cannot be downloaded or decoded.
    """
    if device is None:
        # Follow the model so input and weights are always on the same device.
        try:
            device = next(model.parameters()).device
        except StopIteration:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    try:
        image = _download_image(url)
    except (UnidentifiedImageError, ValueError) as e:
        print(f"[ERROR] Không thể mở ảnh từ URL: {url}\nLý do: {e}")
        return None, None
    except Exception as e:
        # Best-effort helper: report network/decoding failures instead of raising.
        print(f"[ERROR] Lỗi khi tải ảnh: {e}")
        return None, None

    # --- Show the input image ---
    if show_image:
        plt.imshow(image)
        plt.axis("off")
        plt.title("Input Image")
        plt.show()

    # --- Preprocess ---
    img_tensor = transform(image).unsqueeze(0).to(device)

    # --- Predict in eval mode (no dropout, BatchNorm uses running statistics) ---
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(img_tensor)
            probs = F.softmax(outputs, dim=1).cpu().numpy()[0]
    finally:
        # Restore whatever mode the caller had set.
        model.train(was_training)

    # --- Rank the classes ---
    emotions = [idx_to_label[i] for i in range(len(probs))]
    sorted_idx = probs.argsort()[::-1]  # descending probability
    top_k = min(top_k or len(emotions), len(emotions))

    print("Top dự đoán:")
    for i in range(top_k):
        lbl = emotions[sorted_idx[i]]
        conf = probs[sorted_idx[i]] * 100
        print(f"  {i+1}. {lbl:10s} : {conf:.2f}%")

    # --- Probability chart (most likely class at the top) ---
    plt.figure(figsize=(8, 4))
    plt.barh([emotions[i] for i in sorted_idx[::-1]],
             [probs[i]*100 for i in sorted_idx[::-1]],
             color="skyblue")
    plt.xlabel("Probability (%)")
    plt.title("Emotion Probabilities")
    plt.tight_layout()
    plt.show()

    # --- Best class ---
    pred_label = emotions[sorted_idx[0]]
    pred_conf = probs[sorted_idx[0]]
    return pred_label, pred_conf


In [None]:
import torch
import torch.nn as nn

def load_model_for_inference(ckpt_path, device=None):
    """
    Rebuild EmotionResNet18_SE from a training checkpoint, ready for inference.

    Args:
        ckpt_path (str): checkpoint file containing ``model_state`` and
            ``label_to_idx``.
        device: target device; defaults to CUDA when available, otherwise CPU.

    Returns:
        tuple: ``(model, idx_to_label)`` with the model on ``device`` in eval mode.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu") if device is None else device
    ckpt = torch.load(ckpt_path, map_location=device)
    label_to_idx = ckpt["label_to_idx"]
    idx_to_label = {v: k for k, v in label_to_idx.items()}

    # Checkpoints written from an nn.DataParallel model prefix every key with "module.".
    prefix = "module."
    state_dict = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in ckpt["model_state"].items()
    }

    # The checkpoint overwrites every weight, so skip the ImageNet download
    # (which would also fail in an offline kernel).
    model = EmotionResNet18_SE(num_classes=len(label_to_idx), pretrained=False)
    model.load_state_dict(state_dict)
    model.to(device)
    model.eval()

    print(f"[INFO]Model loaded for inference from: {ckpt_path}")
    return model, idx_to_label

if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Checkpoint written by the training cell.
    # Alternative (uploaded model): "/kaggle/input/resnet-emotion/pytorch/default/1/best_emotion_resnet18_se.pth"
    best_ckpt = "/kaggle/working/checkpoints/best_emotion_resnet18_se.pth"
    model, idx_to_label = load_model_for_inference(best_ckpt, device)

    # Validation transform (must match the preprocessing used during training)
    _, val_t = get_transforms(img_size=112)

    # Online test image
    url = "https://t4.ftcdn.net/jpg/00/68/69/59/360_F_68695981_GuWIHWfB0l5wJ2al8rv4xZRUqUtwIo2P.jpg"

    # Predict
    predict_from_url(model, url, val_t, idx_to_label, device)


In [None]:
# List the files written to the checkpoint directory (shown as the cell output).
os.listdir("/kaggle/working/checkpoints")

In [None]:
# Archive the checkpoint directory recursively into /kaggle/working/checkpoints.zip.
# NOTE(review): the download_file cell below zips the same directory again — confirm both are needed.
!zip -r /kaggle/working/checkpoints.zip /kaggle/working/checkpoints


In [None]:
import os
import subprocess
from IPython.display import FileLink, display

def download_file(path, download_file_name):
    """
    Zip ``path`` into /kaggle/working/<download_file_name>.zip and show a download link.

    Args:
        path (str): file or directory to archive (directories are added recursively).
        download_file_name (str): archive name without the ``.zip`` extension.

    Prints the error output and returns without a link when ``zip`` is missing
    or fails.
    """
    # FileLink below is relative, so the working directory must hold the archive.
    os.chdir('/kaggle/working/')
    zip_name = f"/kaggle/working/{download_file_name}.zip"
    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters are passed through literally.
    command = ["zip", "-r", zip_name, path]
    try:
        result = subprocess.run(command, capture_output=True, text=True)
    except FileNotFoundError as e:
        print("Unable to run zip command!")
        print(e)
        return
    if result.returncode != 0:
        print("Unable to run zip command!")
        print(result.stderr)
        return
    display(FileLink(f'{download_file_name}.zip'))

download_file('/kaggle/working/checkpoints', 'checkpoints')