In [None]:
import os
import csv
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torch import nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, models

# 定义数据集类
class AgeDataset(Dataset):
    def __init__(self, image_dir, csv_file, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.image_age_mapping = {}

        # 读取 CSV 文件，构建图片名称到年龄的映射
        with open(csv_file, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row in reader:
                image_name = row['name']
                age = int(row['age'])
                self.image_age_mapping[image_name] = age

        # 获取所有图片的路径
        self.image_paths = []
        for filename in os.listdir(image_dir):
            if filename.endswith(('.png', '.jpg', '.jpeg')):
                self.image_paths.append(os.path.join(image_dir, filename))

        print(f"图片已载入，共 {len(self.image_paths)} 张图片")  # 提示图片已载入

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image_path = self.image_paths[idx]
        image_name = os.path.basename(image_path)

        # 打开图像
        image = Image.open(image_path)

        # 获取对应的年龄标签
        if image_name in self.image_age_mapping:
            age = self.image_age_mapping[image_name]
        else:
            raise ValueError(f"未找到图片 {image_name} 的年龄标签信息")

        # 应用图像变换
        if self.transform:
            image = self.transform(image)

        return image, age


# 定义图像变换
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # 将图像统一调整为224x224大小
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.ToTensor(),  # 转换为张量
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # 标准化
])

# 创建训练集实例
train_image_dir = r'C:\Users\1\Desktop\age\trainset'
train_csv_file = r'C:\Users\1\Desktop\age\train.csv'
train_dataset = AgeDataset(train_image_dir, train_csv_file, transform=transform)

# 创建验证集实例
val_image_dir = r'C:\Users\1\Desktop\age\valset'
val_csv_file = r'C:\Users\1\Desktop\age\val.csv'
val_dataset = AgeDataset(val_image_dir, val_csv_file, transform=transform)

# 加载预训练的 ResNet10（通过 ResNet18 代替）
model = models.resnet18(pretrained=True)  # 替换为 ResNet18
num_ftrs = model.fc.in_features
# 假设年龄预测是一个回归问题
model.fc = nn.Linear(num_ftrs, 1)

# 解冻更多的层（可以解冻整个网络，或者解冻部分层）
for param in model.parameters():
    param.requires_grad = True

# 定义损失函数和优化器
criterion = nn.MSELoss()
mae_criterion = nn.L1Loss()  # MAE 损失函数
optimizer = optim.Adam(model.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# 训练设备
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)

# 训练模型
num_epochs = 8
batch_size = 16  # 增加 batch_size
for epoch in range(num_epochs):
    print(f"\n--- Epoch {epoch + 1}/{num_epochs} 开始 ---")  # 输出周期开始提示
    # 创建训练数据加载器，优化数据加载
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)

    model.train()
    running_loss = 0.0
    running_mae = 0.0
    all_errors = []
    for images, ages in train_dataloader:
        images, ages = images.to(device), ages.to(device).float().unsqueeze(1)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, ages)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        mae = mae_criterion(outputs, ages)
        running_mae += mae.item()

        # 计算每个样本的绝对误差
        errors = torch.abs(outputs - ages).squeeze().cpu().tolist()
        all_errors.extend(errors)

    train_loss = running_loss / len(train_dataloader)
    train_mae = running_mae / len(train_dataloader)

    # 每 4 个 epoch 执行一次缩减
    if (epoch + 1) % 4 == 0:
        print(f"--- 对数据集进行缩减 ---")
        num_to_remove = int(len(all_errors) * 0.05)
        sorted_indices = sorted(range(len(all_errors)), key=lambda i: all_errors[i], reverse=True)
        indices_to_remove = sorted_indices[:num_to_remove]
        remaining_indices = [i for i in range(len(train_dataset)) if i not in indices_to_remove]
        train_dataset = Subset(train_dataset, remaining_indices)  # 更新 train_dataset

    # 验证模型
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0, pin_memory=True)
    model.eval()
    val_loss = 0.0
    val_mae = 0.0
    with torch.no_grad():
        for images, ages in val_dataloader:
            images, ages = images.to(device), ages.to(device).float().unsqueeze(1)
            outputs = model(images)
            loss = criterion(outputs, ages)
            val_loss += loss.item()

            mae = mae_criterion(outputs, ages)
            val_mae += mae.item()

    val_loss /= len(val_dataloader)
    val_mae /= len(val_dataloader)

    print(f'Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, '
          f'Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}')

    # 调整学习率
    scheduler.step()

    # 保存模型
    if (epoch + 1) % 3 == 0:
        torch.save(model.state_dict(), f'model_epoch_{epoch + 1}.pth')

    print(f"--- Epoch {epoch + 1}/{num_epochs} 结束 ---")  # 输出周期结束提示


图片已载入，共 20000 张图片
图片已载入，共 3000 张图片





--- Epoch 1/8 开始 ---


KeyboardInterrupt: 

In [2]:
import os
import csv
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader, Subset
from torch import nn, optim
from torchvision import transforms, models
from torch.cuda.amp import GradScaler, autocast
import numpy as np

# ------------------- 数据集类 -------------------
class AgeDataset(Dataset):
    def __init__(self, image_dir, csv_file, transform=None, age_bins=None):
        self.image_dir = image_dir
        self.transform = transform
        self.age_bins = np.array(age_bins)  # 使用numpy加速计算
        self.image_info = []

        # 读取CSV并过滤无效数据
        with open(csv_file, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                img_name = row['name']
                img_path = os.path.join(image_dir, img_name)
                if os.path.exists(img_path):
                    age = int(row['age'])
                    self.image_info.append((img_path, age))

        print(f"成功载入 {len(self.image_info)} 张有效图片")

    def __len__(self):
        return len(self.image_info)

    def __getitem__(self, idx):
        img_path, age = self.image_info[idx]
        image = Image.open(img_path).convert('RGB')
        
        # 自动计算年龄类别（利用numpy向量化加速）
        age_class = np.digitize(age, self.age_bins, right=False) - 1
        age_class = max(0, min(age_class, len(self.age_bins)-2))  # 确保不越界

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(age, dtype=torch.float32), torch.tensor(age_class)

# ------------------- 模型定义 -------------------
class AgeResNet50(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # 加载预训练ResNet50
        self.base = models.resnet50(pretrained=True)
        
        # 冻结底层参数（可选）
        # for param in self.base.parameters():
        #     param.requires_grad = False
        
        # 替换全连接层
        in_features = self.base.fc.in_features
        self.base.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, num_classes)
        )

    def forward(self, x):
        return self.base(x)

# ------------------- 训练工具函数 -------------------
def train_epoch(model, loader, criterion, optimizer, device, scaler, age_bins):
    model.train()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0

    for images, ages, classes in loader:
        images = images.to(device, non_blocking=True)
        classes = classes.to(device, non_blocking=True)
        batch_size = images.size(0)

        # 混合精度训练
        with autocast():
            outputs = model(images)
            loss = criterion(outputs, classes)

        # 反向传播
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad(set_to_none=True)

        # 计算指标
        total_loss += loss.item() * batch_size
        preds = outputs.argmax(dim=1)
        total_correct += (preds == classes).sum().item()

        # 向量化MAE计算
        pred_ages = torch.tensor([age_bins[pred] for pred in preds.cpu().numpy()]).to(device)
        total_mae += torch.abs(pred_ages - ages.to(device)).sum().item()
        total_samples += batch_size

    return {
        'loss': total_loss / total_samples,
        'acc': total_correct / total_samples,
        'mae': total_mae / total_samples
    }

def validate(model, loader, criterion, device, age_bins):
    model.eval()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0

    with torch.no_grad():
        for images, ages, classes in loader:
            images = images.to(device, non_blocking=True)
            classes = classes.to(device, non_blocking=True)
            batch_size = images.size(0)

            outputs = model(images)
            loss = criterion(outputs, classes)

            total_loss += loss.item() * batch_size
            preds = outputs.argmax(dim=1)
            total_correct += (preds == classes).sum().item()

            # 向量化MAE计算
            pred_ages = torch.tensor([age_bins[pred] for pred in preds.cpu().numpy()]).to(device)
            total_mae += torch.abs(pred_ages - ages.to(device)).sum().item()
            total_samples += batch_size

    return {
        'loss': total_loss / total_samples,
        'acc': total_correct / total_samples,
        'mae': total_mae / total_samples
    }

# ------------------- 主训练流程 -------------------
def main():
    # 配置参数
    # 定义年龄区间，每 12 岁一个区间，覆盖 [0, 192) 区间
    age_bins = list(range(0, 193, 12))  # [0, 12), [12, 24), ..., [180, 192)
    batch_size = 128  # ResNet50需要减小batch size
    num_epochs = 15
    lr = 1e-4

    # 数据增强
    train_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.2, 0.2, 0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    # 数据集
    train_dataset = AgeDataset(r'C:\Users\1\Desktop\age\trainset', 'train.csv', train_transform, age_bins)
    val_dataset = AgeDataset(r'C:\Users\1\Desktop\age\valset', 'val.csv', val_transform, age_bins)

    # 数据加载器
    train_loader = DataLoader(train_dataset, batch_size=batch_size, 
                             shuffle=True, num_workers=0, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size,
                           num_workers=0, pin_memory=True)

    # 模型初始化
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AgeResNet50(num_classes=len(age_bins)-1).to(device)  # 输出类别数要少 1，因为区间是左闭右开
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)
    scaler = GradScaler()

    # 训练循环
    best_mae = float('inf')
    for epoch in range(num_epochs):
        # 训练阶段
        train_metrics = train_epoch(model, train_loader, criterion, optimizer, device, scaler, age_bins)
        
        # 验证阶段
        val_metrics = validate(model, val_loader, criterion, device, age_bins)
        scheduler.step(val_metrics['loss'])

        # 打印结果
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print(f"Train >> Loss: {train_metrics['loss']:.4f} | Acc: {train_metrics['acc']:.4f} | MAE: {train_metrics['mae']:.2f}")
        print(f"Valid >> Loss: {val_metrics['loss']:.4f} | Acc: {val_metrics['acc']:.4f} | MAE: {val_metrics['mae']:.2f}")

        # 保存最佳模型
        if val_metrics['mae'] < best_mae:
            best_mae = val_metrics['mae']
            torch.save(model.state_dict(), f'best_model_mae{best_mae:.2f}.pth')

        # 每4个epoch缩减数据集
        if (epoch+1) % 4 == 0:
            print("Updating training dataset...")
            # 这里添加你的数据集更新逻辑

if __name__ == "__main__":
    main()


成功载入 20000 张有效图片
成功载入 3000 张有效图片


  scaler = GradScaler()
  with autocast():



Epoch 1/15
Train >> Loss: 2.3407 | Acc: 0.1888 | MAE: 31.39
Valid >> Loss: 2.1174 | Acc: 0.2413 | MAE: 26.80

Epoch 2/15
Train >> Loss: 2.1985 | Acc: 0.2235 | MAE: 27.60
Valid >> Loss: 2.0871 | Acc: 0.2320 | MAE: 24.28

Epoch 3/15
Train >> Loss: 2.1307 | Acc: 0.2460 | MAE: 26.11
Valid >> Loss: 2.1093 | Acc: 0.2300 | MAE: 22.89

Epoch 4/15
Train >> Loss: 2.0664 | Acc: 0.2651 | MAE: 24.86
Valid >> Loss: 2.0807 | Acc: 0.2333 | MAE: 23.45
Updating training dataset...

Epoch 5/15
Train >> Loss: 1.9927 | Acc: 0.2893 | MAE: 23.84
Valid >> Loss: 2.1754 | Acc: 0.2200 | MAE: 23.63

Epoch 6/15
Train >> Loss: 1.9178 | Acc: 0.3214 | MAE: 22.83
Valid >> Loss: 2.2162 | Acc: 0.2350 | MAE: 23.17

Epoch 7/15
Train >> Loss: 1.8096 | Acc: 0.3584 | MAE: 21.42
Valid >> Loss: 2.2217 | Acc: 0.2267 | MAE: 24.82

Epoch 8/15
Train >> Loss: 1.5799 | Acc: 0.4582 | MAE: 18.73
Valid >> Loss: 2.2108 | Acc: 0.2343 | MAE: 23.30
Updating training dataset...

Epoch 9/15
Train >> Loss: 1.4654 | Acc: 0.5031 | MAE: 17.63
V

In [1]:
import os
import csv
import torch
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from torchvision import transforms, models
from torch.cuda.amp import GradScaler, autocast
import numpy as np

# ------------------- 数据集类 -------------------
class AgeDataset(Dataset):
    def __init__(self, image_dir, csv_file, transform=None, age_bins=None):
        self.image_dir = image_dir
        self.transform = transform
        self.age_bins = np.array(age_bins)
        
        # 使用pandas读取CSV
        self.image_info = []
        df = pd.read_csv(csv_file)
        for _, row in df.iterrows():
            img_name = row['name']
            img_path = os.path.join(image_dir, img_name)
            if os.path.exists(img_path):
                age = int(row['age'])
                self.image_info.append((img_path, age))
        
        print(f"成功载入 {len(self.image_info)} 张有效图片")

    def __len__(self):
        return len(self.image_info)

    def __getitem__(self, idx):
        img_path, age = self.image_info[idx]
        image = Image.open(img_path).convert('RGB')
        
        # 自动计算年龄类别
        age_class = np.digitize(age, self.age_bins, right=False) - 1
        age_class = max(0, min(age_class, len(self.age_bins)-2)) 

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(age, dtype=torch.float32), torch.tensor(age_class)

# ------------------- 模型定义 -------------------
class AgeResNet50(nn.Module):
    def __init__(self, num_classes, is_regression=False):
        super().__init__()
        # 加载预训练ResNet50
        self.base = models.resnet50(pretrained=True)
        
        # 冻结底层参数（可选）
        # for param in self.base.parameters():
        #     param.requires_grad = False
        
        # 替换全连接层（用于分类）
        in_features = self.base.fc.in_features
        self.base.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, num_classes)  # 分类到不同的年龄区间
        )
        
        # 回归任务的输出层
        self.regression_fc = nn.Linear(in_features, 1)  # 输出具体年龄

    def forward(self, x):
        # 通过ResNet50提取特征
        x = self.base.conv1(x)
        x = self.base.bn1(x)
        x = self.base.relu(x)
        x = self.base.maxpool(x)
        
        x = self.base.layer1(x)
        x = self.base.layer2(x)
        x = self.base.layer3(x)
        x = self.base.layer4(x)
        
        # 得到分类的结果 (x的shape是(batch_size, 2048))
        x_classification = self.base.avgpool(x)
        x_classification = torch.flatten(x_classification, 1)  # 将特征展平
        
        # 分类任务输出
        classification_output = self.base.fc(x_classification)
        
        # 回归部分
        regression_output = self.regression_fc(x_classification)
        
        return classification_output, regression_output

# ------------------- 训练工具函数 -------------------
def train_epoch(model, loader, criterion_class, criterion_regression, optimizer, device, scaler, age_bins):
    model.train()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0

    for images, ages, classes in loader:
        images = images.to(device, non_blocking=True)
        classes = classes.to(device, non_blocking=True)
        ages = ages.to(device, non_blocking=True)
        batch_size = images.size(0)

        # 混合精度训练
        with autocast():
            # 分类和回归的输出
            class_outputs, reg_outputs = model(images)
            
            # 分类任务损失
            class_loss = criterion_class(class_outputs, classes)
            
            # 回归任务损失
            reg_loss = criterion_regression(reg_outputs.squeeze(), ages)
            
            # 总损失
            loss = class_loss + reg_loss

        # 反向传播
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad(set_to_none=True)

        total_loss += loss.item() * batch_size
        preds = class_outputs.argmax(dim=1)
        total_correct += (preds == classes).sum().item()

        total_mae += torch.abs(reg_outputs.squeeze() - ages).sum().item()
        total_samples += batch_size

    accuracy = total_correct / total_samples
    mae = total_mae / total_samples

    return {
        'loss': total_loss / total_samples,
        'acc': accuracy,
        'mae': mae
    }

def validate(model, loader, criterion_class, criterion_regression, device, age_bins):
    model.eval()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0

    with torch.no_grad():
        for images, ages, classes in loader:
            images = images.to(device, non_blocking=True)
            classes = classes.to(device, non_blocking=True)
            ages = ages.to(device, non_blocking=True)
            batch_size = images.size(0)

            # 分类和回归的输出
            class_outputs, reg_outputs = model(images)
            
            class_loss = criterion_class(class_outputs, classes)
            reg_loss = criterion_regression(reg_outputs.squeeze(), ages)
            loss = class_loss + reg_loss

            total_loss += loss.item() * batch_size
            preds = class_outputs.argmax(dim=1)
            total_correct += (preds == classes).sum().item()

            total_mae += torch.abs(reg_outputs.squeeze() - ages).sum().item()
            total_samples += batch_size

    accuracy = total_correct / total_samples
    mae = total_mae / total_samples

    return {
        'loss': total_loss / total_samples,
        'acc': accuracy,
        'mae': mae
    }

# ------------------- 主训练流程 -------------------
def main():
    age_bins = list(range(0, 193, 12))  # 0-192岁，每12岁一个区间
    batch_size = 128
    num_epochs = 20
    lr = 1e-4

    # 数据增强
    train_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.2, 0.2, 0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    train_dataset = AgeDataset(r'C:\Users\1\Desktop\age\trainset', 'train.csv', train_transform, age_bins)
    val_dataset = AgeDataset(r'C:\Users\1\Desktop\age\valset', 'val.csv', val_transform, age_bins)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0, pin_memory=True)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AgeResNet50(num_classes=len(age_bins)).to(device)
    
    criterion_class = nn.CrossEntropyLoss()
    criterion_regression = nn.MSELoss()
    
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)
    scaler = GradScaler()

    best_mae = float('inf')
    for epoch in range(num_epochs):
        # 训练阶段
        train_metrics = train_epoch(model, train_loader, criterion_class, criterion_regression, optimizer, device, scaler, age_bins)
        
        # 验证阶段
        val_metrics = validate(model, val_loader, criterion_class, criterion_regression, device, age_bins)
        scheduler.step(val_metrics['loss'])

        # 打印结果
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print(f"Train >> Loss: {train_metrics['loss']:.4f} | Acc: {train_metrics['acc']:.4f} | MAE: {train_metrics['mae']:.2f}")
        print(f"Valid >> Loss: {val_metrics['loss']:.4f} | Acc: {val_metrics['acc']:.4f} | MAE: {val_metrics['mae']:.2f}")

        # 保存最佳模型
        if val_metrics['mae'] < best_mae:
            best_mae = val_metrics['mae']
            torch.save(model.state_dict(), f'best_model_mae{best_mae:.2f}.pth')

if __name__ == "__main__":
    main()


成功载入 20000 张有效图片
成功载入 3000 张有效图片


  scaler = GradScaler()
  with autocast():



Epoch 1/20
Train >> Loss: 3500.2870 | Acc: 0.1570 | MAE: 46.41
Valid >> Loss: 2311.0773 | Acc: 0.2297 | MAE: 36.27

Epoch 2/20
Train >> Loss: 1858.4375 | Acc: 0.1872 | MAE: 30.97
Valid >> Loss: 1220.3385 | Acc: 0.2230 | MAE: 23.93

Epoch 3/20
Train >> Loss: 1190.0335 | Acc: 0.1888 | MAE: 24.49
Valid >> Loss: 1028.6966 | Acc: 0.1870 | MAE: 24.55

Epoch 4/20
Train >> Loss: 1019.0925 | Acc: 0.1965 | MAE: 23.12
Valid >> Loss: 915.0897 | Acc: 0.2337 | MAE: 22.24

Epoch 5/20
Train >> Loss: 959.5208 | Acc: 0.1950 | MAE: 22.64
Valid >> Loss: 901.9238 | Acc: 0.2410 | MAE: 21.62

Epoch 6/20
Train >> Loss: 911.9181 | Acc: 0.2057 | MAE: 22.11
Valid >> Loss: 922.8781 | Acc: 0.2177 | MAE: 22.31

Epoch 7/20
Train >> Loss: 854.9123 | Acc: 0.2041 | MAE: 21.35
Valid >> Loss: 958.0239 | Acc: 0.2260 | MAE: 23.35

Epoch 8/20
Train >> Loss: 793.7935 | Acc: 0.2135 | MAE: 20.64
Valid >> Loss: 963.1448 | Acc: 0.2403 | MAE: 22.30

Epoch 9/20
Train >> Loss: 680.1397 | Acc: 0.2246 | MAE: 19.01
Valid >> Loss: 904

In [None]:
import os
import csv
import torch
import pandas as pd
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torch import nn, optim
from torchvision import transforms, models
from torch.cuda.amp import GradScaler, autocast
import numpy as np

# ------------------- 数据集类 -------------------
class AgeDataset(Dataset):
    def __init__(self, image_dir, csv_file, transform=None, age_bins=None):
        self.image_dir = image_dir
        self.transform = transform
        self.age_bins = np.array(age_bins)
        
        # 使用pandas读取CSV
        self.image_info = []
        df = pd.read_csv(csv_file)
        for _, row in df.iterrows():
            img_name = row['name']
            img_path = os.path.join(image_dir, img_name)
            if os.path.exists(img_path):
                age = int(row['age'])
                self.image_info.append((img_path, age))
        
        print(f"成功载入 {len(self.image_info)} 张有效图片")

    def __len__(self):
        return len(self.image_info)

    def __getitem__(self, idx):
        img_path, age = self.image_info[idx]
        image = Image.open(img_path).convert('RGB')
        
        # 自动计算年龄类别
        age_class = np.digitize(age, self.age_bins, right=False) - 1
        age_class = max(0, min(age_class, len(self.age_bins)-2))  # 防止越界

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(age, dtype=torch.float32), torch.tensor(age_class)

    def remove_sample(self, idx):
        """ Remove a sample from dataset by index """
        self.image_info.pop(idx)

# ------------------- 模型定义 -------------------
class AgeResNet50(nn.Module):
    def __init__(self, num_classes, is_regression=False):
        super().__init__()
        # 加载预训练ResNet152
        self.base = models.resnet152(pretrained=True)
        
        # 替换全连接层（用于分类）
        in_features = self.base.fc.in_features
        self.base.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, num_classes)  # 分类到不同的年龄区间
        )
        
        # 回归任务的输出层
        self.regression_fc = nn.Linear(in_features, 1)  # 输出具体年龄

    def forward(self, x):
        # 通过ResNet152提取特征
        x = self.base.conv1(x)
        x = self.base.bn1(x)
        x = self.base.relu(x)
        x = self.base.maxpool(x)
        
        x = self.base.layer1(x)
        x = self.base.layer2(x)
        x = self.base.layer3(x)
        x = self.base.layer4(x)
        
        # 得到分类的结果 (x的shape是(batch_size, 2048))
        x_classification = self.base.avgpool(x)
        x_classification = torch.flatten(x_classification, 1)  # 将特征展平
        
        # 分类任务输出
        classification_output = self.base.fc(x_classification)
        
        # 回归部分
        regression_output = self.regression_fc(x_classification)
        
        return classification_output, regression_output

# ------------------- 训练工具函数 -------------------
def train_epoch(model, loader, criterion_class, criterion_regression, optimizer, device, scaler, age_bins, epoch, remove_invalid_samples=False):
    model.train()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0
    mae_differences = []  # 用于存储所有样本的 MAE 差值
    sample_indices = []  # 用于存储样本索引，方便删除样本时对应索引

    for idx, (images, ages, classes) in enumerate(loader):
        images = images.to(device, non_blocking=True)
        classes = classes.to(device, non_blocking=True)
        ages = ages.to(device, non_blocking=True)
        batch_size = images.size(0)

        # 混合精度训练
        with autocast():
            # 分类和回归的输出
            class_outputs, reg_outputs = model(images)
            
            # 分类任务损失
            class_loss = criterion_class(class_outputs, classes)
            
            # 回归任务损失
            reg_loss = criterion_regression(reg_outputs.squeeze(), ages)
            
            # 总损失
            loss = class_loss + reg_loss

        # 反向传播
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        optimizer.zero_grad(set_to_none=True)

        total_loss += loss.item() * batch_size
        preds = class_outputs.argmax(dim=1)
        total_correct += (preds == classes).sum().item()

        total_mae += torch.abs(reg_outputs.squeeze() - ages).sum().item()
        total_samples += batch_size

        # 计算MAE差值并存储（确保不计算梯度）
        mae_differences.extend(torch.abs(reg_outputs.squeeze() - ages).detach().cpu().numpy())
        sample_indices.extend([idx] * batch_size)  # 记录当前批次样本的索引

    accuracy = total_correct / total_samples
    mae = total_mae / total_samples

    # 每5个周期进行检查并删除MAE差值最大的5%的样本
    if epoch % 5 == 4 and remove_invalid_samples:
        # 计算前3% MAE差值最大的样本的索引
        threshold = int(0.04 * len(mae_differences))  # 删除最大3%的样本
        sorted_indices = np.argsort(mae_differences)[::-1]  # 按照MAE差值降序排序
        remove_indices = sorted_indices[:threshold]  # 获取最大的3%样本的索引

        print(f"删除 {len(remove_indices)} 个 MAE 差值最大的样本")
        
        # 删除样本
        for idx in sorted(remove_indices, reverse=True):  # 反向排序避免索引问题
            loader.dataset.remove_sample(idx)

        # 重新创建DataLoader，以更新数据集
        global train_loader
        train_loader = DataLoader(loader.dataset, batch_size=32, shuffle=True, num_workers=0, pin_memory=False)

    return {
        'loss': total_loss / total_samples,
        'acc': accuracy,
        'mae': mae
    }

def validate(model, loader, criterion_class, criterion_regression, device, age_bins):
    model.eval()
    total_loss, total_correct, total_mae = 0, 0, 0
    total_samples = 0

    with torch.no_grad():
        for images, ages, classes in loader:
            images = images.to(device, non_blocking=True)
            classes = classes.to(device, non_blocking=True)
            ages = ages.to(device, non_blocking=True)
            batch_size = images.size(0)

            # 分类和回归的输出
            class_outputs, reg_outputs = model(images)
            
            class_loss = criterion_class(class_outputs, classes)
            reg_loss = criterion_regression(reg_outputs.squeeze(), ages)
            loss = class_loss + reg_loss

            total_loss += loss.item() * batch_size
            preds = class_outputs.argmax(dim=1)
            total_correct += (preds == classes).sum().item()

            total_mae += torch.abs(reg_outputs.squeeze() - ages).sum().item()
            total_samples += batch_size

    accuracy = total_correct / total_samples
    mae = total_mae / total_samples

    return {
        'loss': total_loss / total_samples,
        'acc': accuracy,
        'mae': mae
    }

# ------------------- 主训练流程 -------------------
def main():
    age_bins = list(range(0, 193, 12))  # 0-192岁，每12岁一个区间
    batch_size = 128
    num_epochs = 20
    lr = 1e-4

    # 数据增强
    train_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(0.2, 0.2, 0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    val_transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

    train_dataset = AgeDataset(r'C:\Users\1\Desktop\age\trainset', 'train.csv', train_transform, age_bins)
    val_dataset = AgeDataset(r'C:\Users\1\Desktop\age\valset', 'val.csv', val_transform, age_bins)

    global train_loader
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0, pin_memory=False)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=0, pin_memory=False)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AgeResNet50(num_classes=len(age_bins)).to(device)
    
    criterion_class = nn.CrossEntropyLoss()
    criterion_regression = nn.MSELoss()
    
    optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=2)
    scaler = GradScaler()

    best_mae = float('inf')
    for epoch in range(num_epochs):
        # 训练阶段
        train_metrics = train_epoch(model, train_loader, criterion_class, criterion_regression, optimizer, device, scaler, age_bins, epoch, remove_invalid_samples=True)
        
        # 验证阶段
        val_metrics = validate(model, val_loader, criterion_class, criterion_regression, device, age_bins)
        scheduler.step(val_metrics['loss'])

        # 打印结果，使用 f-string 格式化输出
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print(f"Train >> Loss: {train_metrics['loss']:.4f} | Acc: {train_metrics['acc']:.4f} | MAE: {train_metrics['mae']:.2f}")
        print(f"Valid >> Loss: {val_metrics['loss']:.4f} | Acc: {val_metrics['acc']:.4f} | MAE: {val_metrics['mae']:.2f}")

        # 保存最佳模型
        if val_metrics['mae'] < best_mae:
            best_mae = val_metrics['mae']
            torch.save(model.state_dict(), f'best_model_mae{best_mae:.2f}.pth')

if __name__ == "__main__":
    main()


成功载入 20000 张有效图片
成功载入 3000 张有效图片


  scaler = GradScaler()
  with autocast():



Epoch 1/20
Train >> Loss: 3381.5045 | Acc: 0.1580 | MAE: 45.50
Valid >> Loss: 2090.8492 | Acc: 0.1990 | MAE: 33.75

Epoch 2/20
Train >> Loss: 1846.4062 | Acc: 0.1757 | MAE: 30.91
Valid >> Loss: 1111.4431 | Acc: 0.1770 | MAE: 24.87

Epoch 3/20
Train >> Loss: 1253.9637 | Acc: 0.1787 | MAE: 25.39
Valid >> Loss: 1012.7089 | Acc: 0.2103 | MAE: 23.73

Epoch 4/20
Train >> Loss: 1103.1221 | Acc: 0.1890 | MAE: 24.30
Valid >> Loss: 1010.7983 | Acc: 0.2173 | MAE: 23.67
删除 800 个 MAE 差值最大的样本

Epoch 5/20
Train >> Loss: 1041.8170 | Acc: 0.1963 | MAE: 23.77
Valid >> Loss: 993.8233 | Acc: 0.2297 | MAE: 23.02

Epoch 6/20
Train >> Loss: 1160.8722 | Acc: 0.1760 | MAE: 25.41
Valid >> Loss: 1007.6815 | Acc: 0.2223 | MAE: 24.29

Epoch 7/20
Train >> Loss: 1076.5542 | Acc: 0.1890 | MAE: 24.41
Valid >> Loss: 958.3063 | Acc: 0.2093 | MAE: 22.55

Epoch 8/20
Train >> Loss: 1040.3639 | Acc: 0.1974 | MAE: 23.90
Valid >> Loss: 965.1147 | Acc: 0.2213 | MAE: 22.75

Epoch 9/20
Train >> Loss: 1019.7236 | Acc: 0.1965 | M

In [2]:
import os
import torch
from PIL import Image
from torchvision import transforms, models
import torch.nn as nn

# ------------------- 模型定义 -------------------
class AgeResNet152(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        # 使用ResNet152（确保训练和预测时使用一致的网络结构）
        self.base = models.resnet152(pretrained=False)
        in_features = self.base.fc.in_features
        self.base.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(in_features, num_classes)  # 分类分支
        )
        # 回归分支输出具体年龄
        self.regression_fc = nn.Linear(in_features, 1)

    def forward(self, x):
        x = self.base.conv1(x)
        x = self.base.bn1(x)
        x = self.base.relu(x)
        x = self.base.maxpool(x)
        
        x = self.base.layer1(x)
        x = self.base.layer2(x)
        x = self.base.layer3(x)
        x = self.base.layer4(x)
        
        x_classification = self.base.avgpool(x)
        x_classification = torch.flatten(x_classification, 1)
        
        classification_output = self.base.fc(x_classification)
        regression_output = self.regression_fc(x_classification)
        
        return classification_output, regression_output

# ------------------- 预测函数 -------------------
def predict_ages(model, device, transform, test_dir, output_txt):
    # 获取 test_dir 下所有图片文件（支持 .png, .jpg, .jpeg 格式）
    image_files = [f for f in os.listdir(test_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg'))]
    image_files.sort()  # 排序，确保输出顺序一致

    results = []
    model.eval()
    with torch.no_grad():
        for img_file in image_files:
            img_path = os.path.join(test_dir, img_file)
            try:
                image = Image.open(img_path).convert('RGB')
            except Exception as e:
                print(f"读取图片 {img_file} 失败：{e}")
                continue
            image_tensor = transform(image).unsqueeze(0).to(device)
            _, reg_output = model(image_tensor)
            predicted_age = reg_output.item()
            # 四舍五入取整
            predicted_age = int(round(predicted_age))
            # 使用制表符分隔，符合格式要求
            results.append(f"{img_file}\t{predicted_age}")

    # 将预测结果写入文本文件
    with open(output_txt, 'w') as f:
        for line in results:
            f.write(line + "\n")

    print(f"预测结果已保存到 {output_txt}")

# ------------------- 主函数 -------------------
def main():
    test_dir = r"C:\Users\1\Desktop\age\valset"      # 测试图片目录
    output_txt = "pred_result.txt"                     # 输出结果文件名

    # 定义测试时的数据预处理（与验证时一致）
    transform = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406],
                             [0.229, 0.224, 0.225])
    ])

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # 使用训练时的年龄分箱，假设训练时使用的是 list(range(0, 193, 12))
    age_bins = list(range(0, 193, 12))
    num_classes = len(age_bins)

    # 构造模型并加载权重（确保模型架构和训练时一致，此处使用ResNet152）
    model = AgeResNet152(num_classes=num_classes).to(device)
    # 修改下方 pth_path 为你保存的模型文件路径
    pth_path = r"C:\Users\1\Desktop\age\best_model_mae21.47.pth"
    state_dict = torch.load(pth_path, map_location=device)
    model.load_state_dict(state_dict)

    # 执行预测
    predict_ages(model, device, transform, test_dir, output_txt)

if __name__ == "__main__":
    main()


  state_dict = torch.load(pth_path, map_location=device)


预测结果已保存到 pred_result.txt
