In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, random_split, Dataset
from PIL import Image
import matplotlib.pyplot as plt

# ===========================
# 1. Configuration
# ===========================
# Central settings read by the data pipeline, model builder, training loop
# and inference cell.
CONFIG = dict(
    seed=42,
    batch_size=64,
    num_epochs=30,  # continue training for another 30 epochs
    # When resuming, the LR can be lowered a little (3e-4 would also work);
    # 1e-4 is the more conservative choice used here.
    learning_rate=1e-4,
    input_size=224,
    num_classes=6,
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    data_dir='/kaggle/input/neu-image-emotion-classification/fer_data/fer_data/train',
    test_dir='/kaggle/input/neu-image-emotion-classification/fer_data/fer_data/test',
)

def set_seed(seed):
    """Seed every random number generator this script relies on.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA devices),
    then configures cuDNN for repeatable kernel selection.

    Args:
        seed: Integer seed shared by all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode autotunes convolution algorithms per input shape and may
    # pick different kernels between runs, so switch it off explicitly rather
    # than relying on it not having been enabled elsewhere.
    torch.backends.cudnn.benchmark = False

# Seed first: random_split below draws from the global torch generator.
set_seed(CONFIG['seed'])

# ===========================
# 2. Data processing (unchanged)
# ===========================
data_transforms = dict(
    # Training pipeline: 3-channel grayscale, resize, geometric and
    # photometric augmentation, ImageNet normalisation, random erasing.
    train=transforms.Compose([
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize(size=(CONFIG['input_size'], CONFIG['input_size'])),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=20),
        transforms.RandomAffine(degrees=0, translate=(0.2, 0.2), scale=(0.8, 1.2)),
        transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        transforms.RandomErasing(p=0.5, scale=(0.02, 0.1)),
    ]),
    # Validation pipeline: same preprocessing without any augmentation.
    val=transforms.Compose([
        transforms.Grayscale(num_output_channels=3),
        transforms.Resize(size=(CONFIG['input_size'], CONFIG['input_size'])),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

# The folder dataset is created without a transform; the two splits get
# their own transform later through a wrapper dataset.
full_dataset = datasets.ImageFolder(CONFIG['data_dir'])
# 85 % of the samples go to training, the remainder to validation.
train_size = int(0.85 * len(full_dataset))
val_size = len(full_dataset) - train_size
train_dataset, val_dataset = random_split(
    full_dataset,
    lengths=[train_size, val_size],
)

class TransformedDataset(Dataset):
    """Apply a transform to the samples of an existing indexable dataset.

    ``subset`` must support ``len()`` and integer indexing that yields
    ``(sample, label)`` pairs. When ``transform`` is truthy it is called on
    the sample each time an item is fetched; the label is passed through.
    """

    def __init__(self, subset, transform=None):
        self.subset, self.transform = subset, transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        sample, label = self.subset[index]
        if not self.transform:
            return sample, label
        return self.transform(sample), label

# Attach the split-specific transform to each subset.
train_set = TransformedDataset(subset=train_dataset, transform=data_transforms['train'])
val_set = TransformedDataset(subset=val_dataset, transform=data_transforms['val'])

# Training batches are reshuffled every epoch; validation order is fixed.
train_loader = DataLoader(
    dataset=train_set,
    batch_size=CONFIG['batch_size'],
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)
val_loader = DataLoader(
    dataset=val_set,
    batch_size=CONFIG['batch_size'],
    shuffle=False,
    num_workers=2,
    pin_memory=True,
)

# ===========================
# 3. Mixup 工具函数
# ===========================
def mixup_data(x, y, alpha=1.0, use_cuda=True):
    """Blend every sample of a batch with a randomly chosen partner (mixup).

    Args:
        x: Batch tensor whose first dimension indexes samples.
        y: Labels aligned with ``x`` along the first dimension.
        alpha: Concentration of the Beta(alpha, alpha) distribution the
            mixing coefficient is drawn from. A non-positive value disables
            mixing (the coefficient is fixed to 1).
        use_cuda: Retained for backward compatibility and no longer
            consulted; the permutation index always follows ``x.device``,
            so the function works on CPU-only hosts as well.

    Returns:
        Tuple ``(mixed_x, y_a, y_b, lam)``: the blended batch, the original
        labels, the partner labels, and the mixing coefficient.
    """
    lam = np.random.beta(alpha, alpha) if alpha > 0 else 1
    batch_size = x.size(0)
    # Draw the permutation on the CPU generator (same random stream as
    # before), then move it to wherever the batch lives.
    index = torch.randperm(batch_size).to(x.device)
    # Index along the first dimension only, so 1-D input works too.
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """Return the mixup loss: a ``lam``-weighted blend of two target losses.

    ``criterion`` is evaluated once against ``y_a`` and once against ``y_b``;
    the results are combined as ``lam * loss_a + (1 - lam) * loss_b``.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

# ===========================
# 4. 模型构建与权重加载
# ===========================
# Per-class sample counts (hard-coded), keyed by class folder name.
class_counts = dict(
    Angry=3963,
    Fear=4097,
    Happy=7192,
    Neutral=4959,
    Sad=4862,
    Surprise=3202,
)
# Order the counts like the dataset's class indices.
class_names = full_dataset.classes
counts = [class_counts[label] for label in class_names]
# Inverse-frequency weights: the largest class gets 1.0, rarer classes more.
weights = [max(counts) / count for count in counts]
class_weights = torch.tensor(weights, dtype=torch.float32).to(CONFIG['device'])

def build_model():
    """Create a ResNet-34 with a dropout + linear classification head.

    The backbone is requested with pretrained weights and its final ``fc``
    layer is replaced by ``Dropout(0.5)`` followed by a linear layer with
    ``CONFIG['num_classes']`` outputs. The model is returned on
    ``CONFIG['device']``.
    """
    # NOTE(review): newer torchvision releases deprecate `pretrained=` in
    # favour of `weights=` -- confirm the installed version before switching.
    net = models.resnet34(pretrained=True)
    head = nn.Sequential(
        nn.Dropout(p=0.5),
        nn.Linear(net.fc.in_features, CONFIG['num_classes']),
    )
    net.fc = head
    return net.to(CONFIG['device'])

model = build_model()

# --- [Key change] Load the previous best weights to resume training ---
checkpoint_path = 'best_model_fer.pth'
if os.path.exists(checkpoint_path):
    print(f"Found checkpoint '{checkpoint_path}'. Loading weights to resume training...")
    # map_location remaps the stored tensors onto the active device, so a
    # checkpoint written on a GPU still loads when running on CPU only.
    model.load_state_dict(torch.load(checkpoint_path, map_location=CONFIG['device']))
else:
    print(f"No checkpoint found at '{checkpoint_path}'. Starting training from scratch (Pretrained ImageNet).")

# Class-weighted cross entropy with label smoothing.
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)
optimizer = optim.AdamW(model.parameters(), lr=CONFIG['learning_rate'], weight_decay=1e-2)
# Cosine decay from the configured LR down to 1e-6 over the planned epochs.
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG['num_epochs'], eta_min=1e-6)

# ===========================
# 5. Training loop (with richer logging)
# ===========================

def _evaluate(net, loader, loss_fn=None):
    """Score ``net`` on every batch of ``loader`` without tracking gradients.

    Args:
        net: Model to evaluate; it is switched to eval mode.
        loader: DataLoader yielding ``(inputs, labels)`` batches.
        loss_fn: Optional criterion. When given, the per-batch loss is
            weighted by batch size and averaged over ``len(loader.dataset)``.

    Returns:
        ``(mean_loss, accuracy)`` as plain Python numbers; ``mean_loss`` is
        ``None`` when ``loss_fn`` is omitted. An empty dataset yields an
        accuracy of 0.0 instead of raising.
    """
    net.eval()
    loss_sum = 0.0
    hits = 0
    with torch.no_grad():
        for batch_inputs, batch_labels in loader:
            batch_inputs = batch_inputs.to(CONFIG['device'])
            batch_labels = batch_labels.to(CONFIG['device'])
            batch_outputs = net(batch_inputs)
            if loss_fn is not None:
                loss_sum += loss_fn(batch_outputs, batch_labels).item() * batch_inputs.size(0)
            _, batch_preds = torch.max(batch_outputs, 1)
            hits += (batch_preds == batch_labels).sum().item()
    total = len(loader.dataset)
    if total == 0:
        return (None if loss_fn is None else 0.0), 0.0
    mean_loss = None if loss_fn is None else loss_sum / total
    return mean_loss, hits / total


# Before training, measure the current model on the validation set; this
# becomes the baseline best_acc a new checkpoint has to beat.
print("Evaluating initial baseline accuracy...")
_, best_acc = _evaluate(model, val_loader)
print(f"Initial Baseline Val Acc: {best_acc:.4f}")

print("\nStart Resumed Training...")
print(f"{'Epoch':^10} | {'Train Loss':^12} | {'Train Acc':^12} | {'Val Loss':^12} | {'Val Acc':^12}")
print("-" * 70)

# mixup_data defaults to use_cuda=True; pass the real device kind so a
# CPU-only run does not attempt a .cuda() transfer.
mixup_on_cuda = CONFIG['device'].type == 'cuda'

for epoch in range(CONFIG['num_epochs']):
    # --- Training Phase ---
    model.train()
    running_loss = 0.0
    train_corrects = 0.0  # accumulates the mixup-weighted training accuracy

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(CONFIG['device']), labels.to(CONFIG['device'])

        optimizer.zero_grad()

        # Mixup (tensors are used directly; the deprecated
        # torch.autograd.Variable wrapper is a no-op and was dropped).
        inputs, targets_a, targets_b, lam = mixup_data(
            inputs, labels, alpha=0.4, use_cuda=mixup_on_cuda
        )

        outputs = model(inputs)
        loss = mixup_criterion(criterion, outputs, targets_a, targets_b, lam)

        loss.backward()
        optimizer.step()

        running_loss += loss.item() * inputs.size(0)

        # Soft accuracy under mixup: a prediction matching targets_a scores
        # lam, one matching targets_b scores 1 - lam.
        _, preds = torch.max(outputs, 1)
        part_a = (preds == targets_a).float() * lam
        part_b = (preds == targets_b).float() * (1 - lam)
        train_corrects += (part_a + part_b).sum().item()

    scheduler.step()

    epoch_train_loss = running_loss / len(train_set)
    epoch_train_acc = train_corrects / len(train_set)

    # --- Validation Phase ---
    epoch_val_loss, epoch_val_acc = _evaluate(model, val_loader, criterion)

    # --- Print Stats ---
    print(f"Epoch {epoch+1:02d}/{CONFIG['num_epochs']} | {epoch_train_loss:.4f}       | {epoch_train_acc:.4f}       | {epoch_val_loss:.4f}       | {epoch_val_acc:.4f}")

    # --- Save Best Model ---
    if epoch_val_acc > best_acc:
        best_acc = epoch_val_acc
        torch.save(model.state_dict(), 'best_model_fer.pth')
        print(f"  >>> New Best Model Saved! Acc: {best_acc:.4f}")

print(f"\nTraining Complete. Best Validation Accuracy: {best_acc:.4f}")

In [None]:
# ===========================
# 提交代码 (Inference & Submission) - 包含标签映射修正
# ===========================
import pandas as pd
import torch.nn.functional as F

# 1. 定义测试集 Dataset
class TestDataset(Dataset):
    """Unlabelled image folder yielding ``(image, file_name)`` pairs.

    Args:
        img_dir: Directory that holds the test images.
        transform: Optional callable applied to each RGB image on access.
    """

    def __init__(self, img_dir, transform=None):
        # Sorted so file names and predictions line up deterministically.
        # Sub-directories (e.g. a stray `.ipynb_checkpoints`) are skipped
        # because they cannot be opened as images.
        self.img_names = sorted(
            name
            for name in os.listdir(img_dir)
            if os.path.isfile(os.path.join(img_dir, name))
        )
        self.img_dir = img_dir
        self.transform = transform

    def __len__(self):
        return len(self.img_names)

    def __getitem__(self, idx):
        img_name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, img_name)
        # convert() returns a fully loaded copy, so the underlying file can
        # be closed right away instead of waiting for garbage collection.
        with Image.open(img_path) as handle:
            image = handle.convert('RGB')  # always hand out an RGB image
        if self.transform:
            image = self.transform(image)
        return image, img_name

# 2. Load the model
# build_model() must produce the same architecture that was trained (ResNet34).
model = build_model()

# Load the weights saved by the training cell.
model_path = 'best_model_fer.pth'
if not os.path.exists(model_path):
    raise FileNotFoundError(f"Error: Model file {model_path} not found!")

# map_location remaps the stored tensors onto the active device, so a
# checkpoint written on a GPU still loads when running on CPU only.
model.load_state_dict(torch.load(model_path, map_location=CONFIG['device']))
print(f"Loaded model weights from {model_path}")

model.eval()

# 3. Test-time transform (kept identical to the validation transform)
test_transform = transforms.Compose([
    # Same 3-channel grayscale step the train/val pipelines apply. It leaves
    # images that are already gray unchanged and keeps colour inputs
    # consistent with what the network saw during training.
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((CONFIG['input_size'], CONFIG['input_size'])),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# 4. Build the DataLoader (order must stay fixed so names match predictions)
test_dataset = TestDataset(CONFIG['test_dir'], transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=CONFIG['batch_size'], shuffle=False, num_workers=2)

# ===========================
# [Core fix] Label-index remapping
# ===========================
# The model was trained with ImageFolder's default alphabetical class order:
#   0:Angry, 1:Fear, 2:Happy, 3:Neutral, 4:Sad, 5:Surprise
#
# The competition expects this order in the submission:
#   0:Angry, 1:Fear, 2:Happy, 3:Sad, 4:Surprise, 5:Neutral
#
# idx_map translates a model output index into the required submission
# index by looking the class name up in the submission order.
idx_map = {
    model_idx: ('Angry', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral').index(label)
    for model_idx, label in enumerate(
        ('Angry', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise')
    )
}

# Remapped class index and file name per test image, in loader order.
predictions = []
filenames = []

print("Starting Inference with Label Remapping...")

with torch.no_grad():
    for images, names in test_loader:
        images = images.to(CONFIG['device'])

        # --- TTA (test-time augmentation) ---
        # Average the outputs for the original batch and for the same batch
        # flipped along dimension 3.
        flipped = torch.flip(images, dims=[3])
        avg_output = (model(images) + model(flipped)) / 2.0

        # Highest-scoring class index (0-5) for every image.
        preds = torch.max(avg_output, dim=1).indices

        # --- [Key step] apply the label remapping ---
        # Translate each model index into the index the competition expects.
        predictions.extend(idx_map[p] for p in preds.tolist())
        filenames.extend(names)

# 5. Build the submission file
# Columns must be named ID and Emotion.
df_sub = pd.DataFrame({'ID': filenames, 'Emotion': predictions})

print(df_sub.head())

# Write the CSV without the index column.
df_sub.to_csv('submission.csv', index=False)
print("Submission saved successfully! (Label mapping applied)")