### 学号

In [1]:
student_id = '22211360121'
#输入学号，不输入或者输错则没有成绩
subdir = ''

### 库

In [2]:
from __future__ import print_function, division
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import datetime
import os
import copy
import glob
import pandas as pd
from PIL import Image
from torch.nn import functional as F  #包含常用的函数包

## 数据处理

In [3]:
data_transforms = {
    #训练集数据增强和归一化
    'train':
    transforms.Compose([
        transforms.Resize(256),
        transforms.RandomResizedCrop(224),
        transforms.RandomVerticalFlip(p=0.3),
        transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.3),  # 随机锐化
        transforms.RandomAutocontrast(p=0.2),                        # 自动对比度
        transforms.RandomGrayscale(p=0.1),
        transforms.RandomHorizontalFlip(30),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        #该均值和方差是在imagenet(包含很多种类自然图像的数据集)的一个统计特征
    ]),
    #在验证集上仅需要归一化
    'val':
    transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
}

In [4]:
data_dir = 'data'
image_datasets = {
    x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
    for x in ['train', 'val']
}
dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x],
                                   batch_size=16,
                                   shuffle=True,
                                   num_workers=4,
                                   pin_memory=True)
    for x in ['train', 'val']
}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes
print(class_names)

#如果有gpu使用gpu训练，没有则用cpu
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

['cats', 'dogs']


### 测试数据导入

In [5]:
class dataset(torch.utils.data.Dataset):

    def __init__(self, file_list, transform=None):
        self.file_list = file_list
        self.transform = transform

    def __len__(self):
        return len(self.file_list)

    def __getitem__(self, idx):
        img_path = self.file_list[idx]
        img = Image.open(img_path)
        img_transformed = self.transform(img)
        # 对于测试集，返回文件 ID（文件名无扩展名）而不是标签
        fileid = os.path.basename(img_path).split('.')[0]  # 如 '1'
        return img_transformed, fileid

In [6]:
test1_dir = os.path.join('data', 'testB')
test_list = glob.glob(os.path.join(test1_dir, '*.jpg'))
trans = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
test_data = dataset(test_list, transform=trans)
test_loader = torch.utils.data.DataLoader(
    dataset=test_data,
    batch_size=16,
    shuffle=False,  # 预测时通常不需要打乱
    num_workers=0   # 先设置为 0，避免多进程问题
)

### 可视化图像


In [7]:
# def imshow(inp, title=None):
#     """Imshow for Tensor."""
#     inp = inp.numpy().transpose((1, 2, 0))
#     mean = np.array([0.485, 0.456, 0.406])
#     std = np.array([0.229, 0.224, 0.225])
#     inp = std * inp + mean
#     inp = np.clip(inp, 0, 1)
#     plt.imshow(inp)
#     if title is not None:
#         plt.title(title)
#     plt.pause(0.001)  # 暂停一下，以便更新情节


# # 获取一批训练数据
# inputs, classes = next(iter(dataloaders['train']))

# # 批量制作网格
# out = torchvision.utils.make_grid(inputs)

# imshow(out, title=[class_names[x] for x in classes])

### 网络结构

### 训练模型方法

In [8]:
import time
import copy
import torch
import torch.nn as nn
import torch.optim as optim
from torch.cuda.amp import autocast, GradScaler
from torch.optim import lr_scheduler

# 定义训练函数
def train_model(model, criterion, optimizer, scheduler, dataloaders, dataset_sizes, device, num_epochs=5):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    scaler = GradScaler()  # 初始化GradScaler

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # 每个epoch都有一个训练和验证阶段
        for phase in ['train', 'val']:
            running_loss = 0.0
            running_corrects = 0

            # 迭代数据
            for inputs, labels in dataloaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                # 零参数梯度
                optimizer.zero_grad()

                # 前向传播
                with autocast():  # 启用混合精度
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                # 反向传播 + 仅在训练阶段进行优化
                if phase == 'train':
                    scaler.scale(loss).backward()  # 使用Scaler进行反向传播
                    scaler.step(optimizer)         # 更新参数
                    scaler.update()                # 更新Scaler

                running_loss += loss.item() * inputs.size(0)
                _, preds = torch.max(outputs, 1)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # 深度复制model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

            if phase == 'train':
                scheduler.step()
                model.train()  # 训练模型
            else:
                model.eval()   # 测试模型

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60}m {time_elapsed % 60}s')
    print(f'Best val Acc: {best_acc:.4f}')

    # 加载最佳模型权重
    model.load_state_dict(best_model_wts)
    return model

### 模型结果可视化

In [9]:
def visualize_model(model, dataloaders, class_names, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure(figsize=(10, num_images))

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):  # ✅ 正确引用验证集 dataloader
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size(0)):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return

    model.train(mode=was_training)


### Res

In [10]:
import torch
import torch.nn as nn
import torchvision.models as models

# 加载预训练的ResNet50
resnet50 = models.resnet50(pretrained=True)
# 修改全连接层，假设是二分类任务
resnet50.fc = nn.Linear(resnet50.fc.in_features, 2)
# 将模型移动到GPU（如果可用）
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet50 = resnet50.to(device)



### Eff

In [11]:
from efficientnet_pytorch import EfficientNet

# 加载预训练的EfficientNet-B0（可根据需要选择B1、B2等）
efficientnet = EfficientNet.from_pretrained('efficientnet-b0')
# 修改全连接层，适应二分类任务
efficientnet._fc = nn.Linear(efficientnet._fc.in_features, 2)
# 将模型移动到GPU
efficientnet = efficientnet.to(device)

Loaded pretrained weights for efficientnet-b0


### 分别训练

In [12]:
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer_resnet = torch.optim.SGD(resnet50.parameters(), lr=0.001, momentum=0.9)

# 学习率调度器
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_resnet, step_size=2, gamma=0.1)

# 训练ResNet50（假设你有train_model函数）
resnet50 = train_model(resnet50,
    criterion,
    optimizer_resnet,
    exp_lr_scheduler,
    dataloaders,
    dataset_sizes,
    device,
    num_epochs=40)

Epoch 0/39
----------


  scaler = GradScaler()  # 初始化GradScaler
  with autocast():  # 启用混合精度


train Loss: 0.2317 Acc: 0.8988
val Loss: 0.0790 Acc: 0.9725

Epoch 1/39
----------
train Loss: 0.2560 Acc: 0.8820
val Loss: 0.0749 Acc: 0.9677

Epoch 2/39
----------
train Loss: 0.1015 Acc: 0.9587
val Loss: 0.0704 Acc: 0.9718

Epoch 3/39
----------
train Loss: 0.1034 Acc: 0.9560
val Loss: 0.0776 Acc: 0.9710

Epoch 4/39
----------
train Loss: 0.1178 Acc: 0.9523
val Loss: 0.0630 Acc: 0.9748

Epoch 5/39
----------
train Loss: 0.1141 Acc: 0.9537
val Loss: 0.0667 Acc: 0.9745

Epoch 6/39
----------
train Loss: 0.1203 Acc: 0.9516
val Loss: 0.0704 Acc: 0.9723

Epoch 7/39
----------
train Loss: 0.1313 Acc: 0.9486
val Loss: 0.0740 Acc: 0.9708

Epoch 8/39
----------
train Loss: 0.1363 Acc: 0.9459
val Loss: 0.0665 Acc: 0.9753

Epoch 9/39
----------
train Loss: 0.1347 Acc: 0.9460
val Loss: 0.0678 Acc: 0.9718

Epoch 10/39
----------
train Loss: 0.1322 Acc: 0.9513
val Loss: 0.0679 Acc: 0.9733

Epoch 11/39
----------
train Loss: 0.1263 Acc: 0.9520
val Loss: 0.0715 Acc: 0.9713

Epoch 12/39
----------
t

In [13]:
# 定义优化器（EfficientNet对Adam优化器效果更好）
optimizer_efficientnet = torch.optim.Adam(efficientnet.parameters(), lr=0.0001)

# 训练EfficientNet
efficientnet = train_model(efficientnet,
    criterion,
    optimizer_efficientnet,  # 修正为 optimizer_efficientnet
    exp_lr_scheduler,
    dataloaders,
    dataset_sizes,
    device,
    num_epochs=40)

Epoch 0/39
----------


  scaler = GradScaler()  # 初始化GradScaler
  with autocast():  # 启用混合精度


train Loss: 0.2475 Acc: 0.8913
val Loss: 0.1038 Acc: 0.9607

Epoch 1/39
----------
train Loss: 0.1374 Acc: 0.9412
val Loss: 0.0966 Acc: 0.9643

Epoch 2/39
----------
train Loss: 0.1166 Acc: 0.9498
val Loss: 0.0978 Acc: 0.9615

Epoch 3/39
----------
train Loss: 0.1058 Acc: 0.9575
val Loss: 0.1014 Acc: 0.9600

Epoch 4/39
----------
train Loss: 0.0926 Acc: 0.9606
val Loss: 0.0976 Acc: 0.9643

Epoch 5/39
----------
train Loss: 0.0951 Acc: 0.9595
val Loss: 0.1050 Acc: 0.9617

Epoch 6/39
----------
train Loss: 0.0919 Acc: 0.9604
val Loss: 0.0926 Acc: 0.9650

Epoch 7/39
----------
train Loss: 0.0838 Acc: 0.9641
val Loss: 0.1011 Acc: 0.9620

Epoch 8/39
----------
train Loss: 0.0841 Acc: 0.9647
val Loss: 0.0959 Acc: 0.9650

Epoch 9/39
----------
train Loss: 0.0803 Acc: 0.9669
val Loss: 0.1136 Acc: 0.9593

Epoch 10/39
----------
train Loss: 0.0817 Acc: 0.9667
val Loss: 0.0987 Acc: 0.9580

Epoch 11/39
----------
train Loss: 0.0787 Acc: 0.9661
val Loss: 0.0982 Acc: 0.9603

Epoch 12/39
----------
t

In [14]:
torch.save(resnet50.state_dict(), "resnet50_best.pth")
torch.save(efficientnet.state_dict(), "efficientnet_best.pth")

### 平均

In [15]:
import torch

# 修改ensemble_predict函数，接受模型和device参数
def ensemble_predict(inputs, model_resnet, model_efficientnet, device):
    model_resnet.eval()
    model_efficientnet.eval()
    with torch.no_grad():
        outputs_resnet = model_resnet(inputs)
        outputs_efficientnet = model_efficientnet(inputs)
        probs_resnet = torch.softmax(outputs_resnet, dim=1)
        probs_efficientnet = torch.softmax(outputs_efficientnet, dim=1)
        avg_probs = (probs_resnet + probs_efficientnet) / 2
        _, preds = torch.max(avg_probs, 1)
    return preds

# 定义评估集成模型的函数
def evaluate_ensemble(model_resnet, model_efficientnet, dataloader, device):
    model_resnet.eval()
    model_efficientnet.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            preds = ensemble_predict(inputs, model_resnet, model_efficientnet, device)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    accuracy = correct / total
    return accuracy

In [16]:
from torchvision import models
from efficientnet_pytorch import EfficientNet

# 定义设备
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Running on: {device}")

# 加载ResNet50
resnet50 = models.resnet50(pretrained=False)  # 不加载预训练权重，因为我们要用你保存的权重
resnet50.fc = nn.Linear(resnet50.fc.in_features, 2)  # 输出层改为2类
resnet50.load_state_dict(torch.load("resnet50_best.pth"))
resnet50 = resnet50.to(device)

# 加载EfficientNet-B0
efficientnet = EfficientNet.from_name('efficientnet-b0')
efficientnet._fc = nn.Linear(efficientnet._fc.in_features, 2)  # 输出层改为2类
efficientnet.load_state_dict(torch.load("efficientnet_best.pth"))
efficientnet = efficientnet.to(device)

Running on: cuda:0




In [17]:
# 假设你的验证集dataloader是dataloaders['val']
val_accuracy = evaluate_ensemble(resnet50, efficientnet, dataloaders['val'], device)
print(f'集成模型在验证集上的准确度: {val_accuracy:.4f}')

集成模型在验证集上的准确度: 0.9880


In [18]:
from tqdm import tqdm

# 测试
dog_probs = []
with torch.no_grad():
    # 使用 tqdm 包装 test_loader 以显示进度条
    for inputs, fileid in tqdm(test_loader, desc="Predicting", total=len(test_loader)):
        inputs = inputs.to(device)
        # 正确调用 ensemble_predict，传递模型和设备
        preds = ensemble_predict(inputs, resnet50, efficientnet, device)
        dog_probs += list(zip(list(fileid), preds.cpu().numpy()))

Predicting: 100%|██████████| 274/274 [00:24<00:00, 10.96it/s]


### 训练

### 模型评估效果可视化

## 提交结果

In [19]:
print(f"Number of test images: {len(test_list)}")
print(f"Number of batches: {len(test_loader)}")

Number of test images: 4371
Number of batches: 274


In [20]:
dog_probs.sort(key = lambda x : int(x[0]))
# dog_probs

In [21]:
idx = list(map(lambda x: x[0],dog_probs))
prob = list(map(lambda x: x[1],dog_probs))

In [22]:
yhat = [1 if y >= 0.5 else 0 for y in prob]

In [23]:
submission = pd.DataFrame({'id':idx,'label':yhat})
submission

Unnamed: 0,id,label
0,0,1
1,1,1
2,2,0
3,3,0
4,4,1
...,...,...
4366,4366,0
4367,4367,0
4368,4368,0
4369,4369,1


In [24]:
submission.to_csv(subdir + student_id + 'submission_{}.csv'.format(
    datetime.datetime.now().strftime('%Y%m%d_%H%M%S')),
                  index=False)