In [15]:
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import pandas as pd
import numpy as np
from pathlib import Path
import torchvision.transforms as transforms

class Nutrition5kDataset(Dataset):
    """
    Nutrition5K 数据集加载器
    
    Args:
        csv_file: CSV文件路径，包含ID和Value列
        data_root: 数据根目录
        split: 'train' 或 'test'
        transform: 图像变换
        use_depth: 是否使用深度图（默认False，baseline只用RGB）
    """
    def __init__(self, csv_file, data_root, split='train', 
                 transform=None, use_depth=False):
        self.df = pd.read_csv(csv_file)
        self.data_root = Path(data_root)
        self.split = split
        self.transform = transform
        self.use_depth = use_depth
        
        # 构建图像路径
        self.rgb_dir = self.data_root / split / 'color'
        if use_depth:
            self.depth_dir = self.data_root / split / 'depth_raw'
        
        print(f"加载 {split} 数据集: {len(self.df)} 个样本")
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        # 获取样本信息
        row = self.df.iloc[idx]
        dish_id = row['ID']
        
        # 加载RGB图像
        rgb_path = self.rgb_dir / dish_id / 'rgb.png'
        rgb = Image.open(rgb_path).convert('RGB')
        
        # 应用变换
        if self.transform:
            rgb = self.transform(rgb)
        
        # 准备返回值
        sample = {
            'image': rgb,
            'dish_id': dish_id
        }
        
        # 如果是训练集，添加标签
        if 'Value' in row:
            sample['calories'] = torch.tensor(row['Value'], dtype=torch.float32)
        
        # 如果使用深度图（baseline暂时不用）
        if self.use_depth:
            depth_path = self.depth_dir / dish_id / 'depth_raw.png'
            depth = Image.open(depth_path)
            depth = np.array(depth, dtype=np.float32) / 10000.0  # 转为米
            depth = torch.from_numpy(depth).unsqueeze(0)  # (1, H, W)
            sample['depth'] = depth
        
        return sample


def get_transforms(split='train', image_size=224):
    """
    获取数据变换
    
    Args:
        split: 'train' 或 'val'/'test'
        image_size: 目标图像尺寸
    """
    if split == 'train':
        # 训练集：数据增强
        return transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.RandomHorizontalFlip(p=0.5),  # 50%概率水平翻转
            transforms.RandomRotation(degrees=15),    # ±15度旋转
            transforms.ColorJitter(                   # 色彩抖动
                brightness=0.2,
                contrast=0.2,
                saturation=0.2,
                hue=0.1
            ),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])
    else:
        # 验证集/测试集：只做基本变换
        return transforms.Compose([
            transforms.Resize((image_size, image_size)),
            transforms.ToTensor(),
            transforms.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225]
            )
        ])


def create_dataloaders(data_root, train_csv, batch_size=32, 
                       val_split=0.2, num_workers=0, image_size=224):
    """
    创建训练集和验证集的DataLoader
    
    Args:
        data_root: 数据根目录
        train_csv: 训练CSV文件路径
        batch_size: 批次大小
        val_split: 验证集比例（0.2 = 20%）
        num_workers: 数据加载线程数
        image_size: 图像尺寸
    
    Returns:
        train_loader, val_loader
    """
    # 读取完整训练集
    full_df = pd.read_csv(train_csv)
    
    # 划分训练集和验证集
    from sklearn.model_selection import train_test_split
    train_df, val_df = train_test_split(
        full_df, 
        test_size=val_split, 
        random_state=42,
        shuffle=True
    )
    
    # 保存临时CSV
    train_csv_path = Path(train_csv).parent / 'train_split.csv'
    val_csv_path = Path(train_csv).parent / 'val_split.csv'
    train_df.to_csv(train_csv_path, index=False)
    val_df.to_csv(val_csv_path, index=False)
    
    print(f"数据集划分: 训练={len(train_df)}, 验证={len(val_df)}")
    
    # 创建数据集
    train_dataset = Nutrition5kDataset(
        csv_file=train_csv_path,
        data_root=data_root,
        split='train',
        transform=get_transforms('train', image_size),
        use_depth=False  # baseline不用深度图
    )
    
    val_dataset = Nutrition5kDataset(
        csv_file=val_csv_path,
        data_root=data_root,
        split='train',  # 注意：验证集也来自train目录
        transform=get_transforms('val', image_size),
        use_depth=False
    )
    
    # 创建DataLoader
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True  # 加速GPU传输
    )
    
    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    
    return train_loader, val_loader


# 测试代码
if __name__ == '__main__':
    # 测试数据加载
    data_root = Path('Nutrition5K/Nutrition5K')
    train_csv = data_root / 'nutrition5k_train.csv'
    
    train_loader, val_loader = create_dataloaders(
        data_root=data_root,
        train_csv=train_csv,
        batch_size=16,
        val_split=0.2
    )
    
    # 获取一个batch查看
    batch = next(iter(train_loader))
    print(f"\nBatch信息:")
    print(f"  图像shape: {batch['image'].shape}")  # (B, 3, 224, 224)
    print(f"  卡路里shape: {batch['calories'].shape}")  # (B,)
    print(f"  卡路里值: {batch['calories'][:5]}")

数据集划分: 训练=2640, 验证=661
加载 train 数据集: 2640 个样本
加载 train 数据集: 661 个样本

Batch信息:
  图像shape: torch.Size([16, 3, 224, 224])
  卡路里shape: torch.Size([16])
  卡路里值: tensor([113.9100, 321.5300,   0.0000, 504.8209, 298.1194])


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class BaselineCNN(nn.Module):
    """
    Baseline CNN 用于卡路里预测
    
    架构：5个卷积块 + 2个全连接层
    输入：(B, 3, 224, 224) RGB图像
    输出：(B, 1) 卡路里预测值
    """
    def __init__(self, dropout_rate=0.5):
        super(BaselineCNN, self).__init__()
        
        # 卷积块 1: 3 -> 32
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)  # 224 -> 112
        )
        
        # 卷积块 2: 32 -> 64
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)  # 112 -> 56
        )
        
        # 卷积块 3: 64 -> 128
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)  # 56 -> 28
        )
        
        # 卷积块 4: 128 -> 256
        self.conv4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)  # 28 -> 14
        )
        
        # 卷积块 5: 256 -> 512
        self.conv5 = nn.Sequential(
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2)  # 14 -> 7
        )
        
        # 全局平均池化
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        
        # 全连接层
        self.fc = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(512, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout_rate),
            nn.Linear(256, 1)  # 输出1个值
        )
        
        # 权重初始化
        self._initialize_weights()
    
    def _initialize_weights(self):
        """使用He初始化"""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        """
        前向传播
        
        Args:
            x: (B, 3, 224, 224) RGB图像
        
        Returns:
            (B, 1) 卡路里预测值
        """
        x = self.conv1(x)   # (B, 32, 112, 112)
        x = self.conv2(x)   # (B, 64, 56, 56)
        x = self.conv3(x)   # (B, 128, 28, 28)
        x = self.conv4(x)   # (B, 256, 14, 14)
        x = self.conv5(x)   # (B, 512, 7, 7)
        
        # 全局平均池化
        x = self.global_avg_pool(x)  # (B, 512, 1, 1)
        x = x.view(x.size(0), -1)     # (B, 512)
        
        # 全连接层
        x = self.fc(x)  # (B, 1)
        
        # 确保输出非负（卡路里不能为负）
        x = F.relu(x)
        
        return x


def count_parameters(model):
    """计算模型参数量"""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


# 测试代码
if __name__ == '__main__':
    # 创建模型
    model = BaselineCNN(dropout_rate=0.5)
    
    # 统计参数
    num_params = count_parameters(model)
    print(f"模型参数量: {num_params:,}")
    
    # 测试前向传播
    batch_size = 4
    dummy_input = torch.randn(batch_size, 3, 224, 224)
    
    print(f"\n输入shape: {dummy_input.shape}")
    
    # 前向传播
    output = model(dummy_input)
    print(f"输出shape: {output.shape}")
    print(f"输出值: {output.squeeze()}")
    
    # 检查梯度流
    print(f"\n模型可训练: {model.training}")
    print(f"第一层权重requires_grad: {model.conv1[0].weight.requires_grad}")

模型参数量: 4,847,777

输入shape: torch.Size([4, 3, 224, 224])
输出shape: torch.Size([4, 1])
输出值: tensor([0.0047, 0.0000, 0.0000, 0.0000], grad_fn=<SqueezeBackward0>)

模型可训练: True
第一层权重requires_grad: True
