### 1.原始数据分类
1. 导入需要的包

In [1]:
import os
import shutil
import xml.etree.ElementTree as ET
import random

2. 根据.xml文件对原始数据集'PMID2019/JPEGImages/'进行分类

In [2]:
# 定义路径
print(os.getcwd())
xml_folder = '../OriginalData/PMID2019/Annotations'
image_folder = '../OriginalData/PMID2019/JPEGImages'
test_set_path = '../dataset/test'
train_set_path = '../dataset/train/'

c:\Users\86159\OneDrive - bjfu.edu.cn\大创打工人\CVAI\code


In [3]:
def create_folders_and_move_images(xml_folder, image_folder, train_set_path):
    # 清空目标目录
    if os.path.exists(test_set_path):
        shutil.rmtree(test_set_path)
    if os.path.exists(train_set_path):
        shutil.rmtree(train_set_path)

    # 遍历XML文件夹
    for xml_file in os.listdir(xml_folder):
        if not xml_file.endswith('.xml'):
            continue

        # 解析XML文件
        tree = ET.parse(os.path.join(xml_folder, xml_file))
        root = tree.getroot()

        # 提取"name"字段值
        name = root.find('.//name').text

        # 创建对应的文件夹
        train_class_folder = os.path.join(train_set_path, name)

        os.makedirs(train_class_folder, exist_ok=True)

        # 获取与当前XML文件对应的图片文件
        image_file = xml_file.replace('.xml', '.jpg')

        # 如果对应的图片文件存在，则将其移动到训练集文件夹
        if os.path.exists(os.path.join(image_folder, image_file)):
            shutil.copy(os.path.join(image_folder, image_file), os.path.join(train_class_folder, image_file))
            print(f"'{image_file}' Succeeded.")
        else:
            print(f"Image file '{image_file}' not found for XML file '{xml_file}'.")
                


# 创建文件夹并移动图片
# create_folders_and_move_images(xml_folder, image_folder, train_set_path)


3. 从```'dataset/train/'```中按比例(```split_ratio=0.2```)划分测试集

In [4]:
def move_files_to_test_set(train_set_path, test_set_path, classes, split_ratio=0.2):
    """
    将训练集中的部分文件移动到测试集中
    
    参数：
    train_set_path: 训练集文件夹路径
    test_set_path: 测试集文件夹路径
    classes: 类别列表
    split_ratio: 测试集占比，默认为0.2
    """
    # 清空目标目录
    if os.path.exists(test_set_path):
        shutil.rmtree(test_set_path)

    # 遍历每个类别
    for class_name in classes:
        class_train_path = os.path.join(train_set_path, class_name)
        class_test_path = os.path.join(test_set_path, class_name)

        

        os.makedirs(class_test_path, exist_ok=True)

        # 获取类别下的所有文件
        files = [file for file in os.listdir(class_train_path) if file.endswith('.jpg')]

        # 计算要移动到测试集的文件数量
        num_files_to_move = int(len(files) * split_ratio)

        # 随机选择要移动的文件
        files_to_move = random.sample(files, num_files_to_move)

        # 移动文件到测试集中
        for file_to_move in files_to_move:
            shutil.move(os.path.join(class_train_path, file_to_move), os.path.join(class_test_path, file_to_move))

# move_files_to_test_set(train_set_path, test_set_path, os.listdir(train_set_path))

### 2.根据划分后的文件夹构建数据集
#### 1. 导入需要的包

In [5]:
import os
import torch
from PIL import Image
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

#### 2. 构造dataset类

In [6]:
class CustomDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.classes = sorted(os.listdir(root_dir))
        self.class_to_idx = {cls_name: idx for idx, cls_name in enumerate(self.classes)}
        self.image_paths = []
        self.labels = []

        for cls_name in self.classes:
            cls_folder = os.path.join(root_dir, cls_name)
            for img_name in os.listdir(cls_folder):
                img_path = os.path.join(cls_folder, img_name)
                self.image_paths.append(img_path)
                self.labels.append(self.class_to_idx[cls_name])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label


#### 3. 第一次定义transform、创建DataLoader对象

In [7]:
# 定义图像转换（只转换为Tensor，不做其他变换）
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # 调整大小
    transforms.ToTensor(),
])

# 创建自定义数据集实例
# dataset = CustomDataset(root_dir='../dataset/train', transform=transform)
# dataloader = DataLoader(dataset, batch_size=10, shuffle=False, num_workers=2)


#### 4. 计算图像各通道均值和标准差

In [8]:
import torch

def calculate_channel_mean_std(dataloader):
    """
    计算图像数据集的各通道均值和标准差

    Args:
        dataloader: 包含图像数据的dataloader

    Returns:
        各通道均值和标准差的元组，形如(channel_mean, channel_std)
    """
    num_channels = 3  # 假设通道数为3

    # 初始化通道总和和像素总数
    channel_sum = torch.zeros(num_channels)
    channel_sum_sq = torch.zeros(num_channels)
    pixel_count = 0

     # 遍历dataloader计算累加均值和标准差
    for images, _ in dataloader:
        print(channel_sum)
        # 累加每个通道的总和
        channel_sum += torch.sum(images, dim=(0, 2, 3))

        # 累加每个通道的平方和
        channel_sum_sq += torch.sum(images ** 2, dim=(0, 2, 3))

        # 计算像素总数
        pixel_count += images.size(0) * images.size(2) * images.size(3)

    # 计算均值
    channel_mean = channel_sum / pixel_count

    # 计算标准差
    channel_std = torch.sqrt(channel_sum_sq / pixel_count - channel_mean ** 2)

    return channel_mean, channel_std

# 示例用法
# channel_mean, channel_std = calculate_channel_mean_std(dataloader)
# print("各通道均值：", channel_mean)
# print("各通道标准差：", channel_std)


5. 使用计算出的结果再次构建数据集

In [9]:
# 假设计算得到的均值和标准差如下
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# 定义图像转换
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.ToTensor(),  # 转换为Tensor
    # transforms.Normalize(mean, std)  # 使用计算得到的均值和标准差进行标准化
    transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))
])

# 创建自定义数据集实例
train_dataset = CustomDataset(root_dir='../dataset/train', transform=transform)
test_dataset = CustomDataset(root_dir='../dataset/test', transform=transform)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4, pin_memory=True)

### 3.构建网络

In [10]:
import torch.nn as nn
import torch.nn.functional as F

class MyModel(nn.Module):
    def __init__(self, num_classes=24):
        super(MyModel, self).__init__()
        
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1)
        
        # 计算经过两次卷积和一次池化后的特征图尺寸
        # 输入尺寸: 64*64
        # 第一次卷积: (64 - 3 + 2 * 1) / 1 + 1 = 64
        # 池化后: 64 / 2 = 32
        # 第二次卷积: (32 - 3 + 2 * 1) / 1 + 1 = 32
        # 池化后: 32 / 2 = 16
        
        self.fc1 = nn.Linear(16 * 16 * 16, 128)  # 注意：这里输入的尺寸是根据输入图像的分辨率计算得到的
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)  # num_classes 是数据集中的类别数量
    
    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 16 * 16)  # 展平特征图以输入全连接层
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# 示例用法
model = MyModel(num_classes=24)
print(model)


MyModel(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (fc1): Linear(in_features=4096, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=24, bias=True)
)


### 4.训练网络

In [11]:
import torch.optim as optim
from torch.cuda.amp import GradScaler, autocast

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')
print(device)
# model.to(device)
model.conv1.to(device)
model.pool.to(device)
model.conv2.to(device)
model.fc1.to(device)
model.fc2.to(device)
model.fc3.to(device)

# 初始化 GradScaler
scaler = GradScaler()

# 训练模型
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        # 前向传播和反向传播使用 autocast 进行混合精度计算
        with autocast():
            outputs = model(images)
            loss = criterion(outputs, labels)
        
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        running_loss += loss.item()
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')
    
    # 验证模型
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            with autocast():
                outputs = model(images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
    
    print(f'Validation Accuracy: {100 * correct / total}%')

cpu


### 5.测试网络


In [None]:
torch.save(model.state_dict(), 'model.pth')