In [31]:
import torch
from torch.utils.data import DataLoader, random_split, Dataset
from torchvision import transforms
from pathlib import Path
from PIL import Image

# 数据集类定义
class CatAndDogDataset(torch.utils.data.Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        self.image_paths = []
        self.labels = []
        
        for file_name in self.data_dir.iterdir():
            if file_name.is_file() and file_name.suffix.lower() in ['.jpg', '.jpeg', '.png']:
                self.image_paths.append(file_name)
                if 'cat' in file_name.stem.lower():
                    self.labels.append(0)
                elif 'dog' in file_name.stem.lower():
                    self.labels.append(1)
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx]).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# 定义训练和验证的转换
def get_transforms():
    train_transform = transforms.Compose([
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(
            brightness=0.2, 
            contrast=0.2, 
            saturation=0.2, 
            hue=0.1
        ),
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    val_transform = transforms.Compose([
        transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    return train_transform, val_transform


In [32]:
# def calculate_mean_std_fast(data_dir, sample_ratio=0.1, batch_size=128, num_workers=0):
#     """
#     使用采样法快速计算数据集的均值和标准差
#     适用于大规模数据集（如12500个文件）
    
#     参数:
#         data_dir: 数据集目录路径
#         sample_ratio: 采样比例（0.1表示10%）
#         batch_size: 批次大小
#         num_workers: 数据加载的工作进程数（在Jupyter环境中建议设为0）
    
#     返回:
#         mean: 均值数组 [R, G, B]
#         std: 标准差数组 [R, G, B]
#     """
#     # 定义基础转换（仅调整大小和转张量）
#     basic_transform = transforms.Compose([
#         transforms.Resize((224, 224)),  # 统一大小
#         transforms.ToTensor(),          # 转张量并归一化到[0,1]
#     ])
    
#     # 创建完整数据集
#     full_dataset = CatAndDogDataset(
#         data_dir=data_dir,
#         transform=basic_transform,
#         has_label=True
#     )
    
#     # 采样部分数据
#     dataset_size = len(full_dataset)
#     sample_size = max(1, int(dataset_size * sample_ratio))  # 确保至少有1个样本
#     sample_indices = random.sample(range(dataset_size), sample_size)
#     sampled_dataset = Subset(full_dataset, sample_indices)
    
#     # 创建数据加载器（禁用多进程以避免序列化问题）
#     dataloader = DataLoader(
#         sampled_dataset,
#         batch_size=batch_size,
#         shuffle=False,
#         num_workers=num_workers,  # 在Jupyter环境中设置为0
#         pin_memory=False,         # 单进程模式下不需要
#         drop_last=False
#     )
    
#     # 使用CPU计算（简化实现）
#     device = torch.device("cpu")
#     print(f"使用设备: {device}")
#     print(f"数据集大小: {dataset_size}")
#     print(f"采样数量: {sample_size}")
    
#     # 初始化均值计算
#     mean = torch.zeros(3, device=device)
#     total_pixels = 0
    
#     print(f"计算均值中 (使用{sample_size}个样本)...")
#     for images, _ in tqdm(dataloader):
#         images = images.to(device)
#         batch_samples = images.size(0)
        
#         # 计算当前批次的均值
#         batch_mean = torch.mean(images, dim=[0, 2, 3])
#         mean += batch_mean * batch_samples
#         total_pixels += batch_samples
    
#     # 计算最终均值
#     mean /= total_pixels
    
#     # 初始化方差计算
#     var = torch.zeros(3, device=device)
    
#     print("计算标准差中...")
#     for images, _ in tqdm(dataloader):
#         images = images.to(device)
#         batch_samples = images.size(0)
        
#         # 减去均值
#         for i in range(3):
#             images[:, i, :, :] -= mean[i]
        
#         # 计算方差
#         batch_var = torch.mean(images ** 2, dim=[0, 2, 3])
#         var += batch_var * batch_samples
    
#     # 计算最终标准差
#     std = torch.sqrt(var / total_pixels)
    
#     # 转回CPU并转为列表
#     mean = mean.cpu().tolist()
#     std = std.cpu().tolist()
    
#     return mean, std

# # 简单的单进程计算方法（适合调试）
# def calculate_mean_std_simple(data_dir, sample_ratio=0.1):
#     """
#     简单的单进程方法计算均值和标准差，适合调试
    
#     参数:
#         data_dir: 数据集目录路径
#         sample_ratio: 采样比例（0.1表示10%）
    
#     返回:
#         mean: 均值数组 [R, G, B]
#         std: 标准差数组 [R, G, B]
#     """
#     # 定义基础转换
#     basic_transform = transforms.Compose([
#         transforms.Resize((224, 224)),
#         transforms.ToTensor(),
#     ])
    
#     # 创建数据集
#     dataset = CatAndDogDataset(
#         data_dir=data_dir,
#         transform=basic_transform,
#         has_label=True
#     )
    
#     # 采样数据
#     dataset_size = len(dataset)
#     sample_size = max(1, int(dataset_size * sample_ratio))
#     sample_indices = random.sample(range(dataset_size), sample_size)
    
#     # 初始化均值和标准差计算
#     mean = torch.zeros(3)
#     std = torch.zeros(3)
    
#     print(f"使用简单方法计算均值和标准差 (样本数: {sample_size})...")
    
#     # 计算均值
#     for idx in tqdm(sample_indices):
#         image, _ = dataset[idx]
#         mean += torch.mean(image, dim=[1, 2])
#     mean /= sample_size
    
#     # 计算标准差
#     for idx in tqdm(sample_indices):
#         image, _ = dataset[idx]
#         for i in range(3):
#             image[i] -= mean[i]
#         std += torch.mean(image ** 2, dim=[1, 2])
#     std = torch.sqrt(std / sample_size)
    
#     # 转为列表
#     mean = mean.tolist()
#     std = std.tolist()
    
#     return mean, std

# # 使用示例
# if __name__ == "__main__":
#     data_dir = '/Users/sunchangxing/Documents/project/ML-study/torch/kaggle/cat_and_dog/data/train'
    
#     try:
#         # 方法1: 修复后的采样法（适合大数据集）
#         print("=== 使用修复后的采样法计算均值和标准差 ===")
#         # 注意：设置num_workers=0避免多进程问题
#         mean, std = calculate_mean_std_fast(data_dir, sample_ratio=0.15, batch_size=128, num_workers=0)
#         print(f"均值: {mean}")
#         print(f"标准差: {std}")
#         print(f"可直接用于transforms.Normalize: transforms.Normalize(mean={mean}, std={std})")
        
#         # 方法2: 简单单进程方法（适合调试）
#         # print("\n=== 使用简单单进程方法计算均值和标准差 ===")
#         # mean, std = calculate_mean_std_simple(data_dir, sample_ratio=0.05)
#         # print(f"均值: {mean}")
#         # print(f"标准差: {std}")
        
#     except Exception as e:
#         print(f"发生错误: {e}")
#         print("请检查数据集路径是否正确，以及数据格式是否符合要求")

In [33]:
train_transform, val_transform = get_transforms()

dataset = CatAndDogDataset("data/train", transform=train_transform)

batch_size = 32
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(
    dataset, 
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42)  # 设置随机种子以确保结果可复现
)
train_loader = DataLoader(
    train_dataset, 
    batch_size=batch_size, 
    shuffle=True, 
    pin_memory=True
)
test_loader = DataLoader(
    test_dataset, 
    batch_size=batch_size, 
    shuffle=False, 
    pin_memory=True
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [34]:
from torch import nn
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2,2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2,2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2,2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2,2)
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256*14*14, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(0.2),
            nn.Linear(128, 2),
            # nn.Sigmoid()
        )
    def forward(self, x):
        batch_size = x.size(0)
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = x.view(batch_size, -1)
        x = self.fc(x)
        return x

In [35]:
def train(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * images.size(0)
    
    epoch_loss = running_loss / len(train_loader.dataset)
    return epoch_loss

def test(model, test_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            loss = criterion(outputs, labels)
            
            running_loss += loss.item() * images.size(0)
            
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    epoch_loss = running_loss / len(test_loader.dataset)
    epoch_acc = correct / total
    return epoch_loss, epoch_acc

In [36]:
from torch import optim
model = Net()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
epoch_num = 10
for epoch in range(epoch_num):
    train_loss = train(model, train_loader, criterion, optimizer, device)
    test_loss, test_acc = test(model, test_loader, criterion, device)
    print(f"Epoch {epoch+1}/{epoch_num}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}")



Epoch 1/10, Train Loss: 0.8222, Test Loss: 0.6308, Test Acc: 0.6414
Epoch 2/10, Train Loss: 0.5851, Test Loss: 0.5449, Test Acc: 0.7160
Epoch 3/10, Train Loss: 0.5217, Test Loss: 0.4978, Test Acc: 0.7250


KeyboardInterrupt: 