In [17]:
# 模型训练
import os
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split

# Global hyper-parameters
modellr = 1e-4
BATCH_SIZE = 64
EPOCHS = 50
SEED = 42  # fixes the train/test partition so runs can be compared with each other
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Preprocessing: resize, collapse to one grayscale channel, convert to a tensor
# and normalise with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])

# Load samples and labels from the image folder
data_set = datasets.ImageFolder('../Dataset/photos', transform)
print(data_set.class_to_idx)

# Random 80/20 train/test split. The explicit generator makes the partition
# reproducible; without it every execution of this cell evaluates on a
# different held-out set.
train_ratio = 0.8
train_size = int(train_ratio * len(data_set))
test_size = len(data_set) - train_size
train_set, test_set = random_split(
    data_set,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SEED),
)

# Batch loaders (only the training data is shuffled)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Instantiate a ResNet-50 with pretrained weights
model = models.resnet50(pretrained=True)

# Replace the stem convolution with a single-channel one. A freshly built
# Conv2d is randomly initialised, which would discard the pretrained stem
# filters, so the new kernel is set to the pretrained RGB kernels summed over
# the input-channel axis. For identical input values this equals the response
# of the original layer to a gray image replicated across three channels.
pretrained_conv1 = model.conv1
model.conv1 = nn.Conv2d(1, pretrained_conv1.out_channels, kernel_size=pretrained_conv1.kernel_size,
                        stride=pretrained_conv1.stride, padding=pretrained_conv1.padding, bias=False)
with torch.no_grad():
    model.conv1.weight.copy_(pretrained_conv1.weight.sum(dim=1, keepdim=True))

# No layer is frozen in this cell: the whole network is fine-tuned.

# Replace the classification head so it outputs one logit per dataset class
num_ftrs = model.fc.in_features
num_classes = len(data_set.classes)
model.fc = nn.Linear(num_ftrs, num_classes)
model = model.to(DEVICE)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=modellr)  # Adam with a deliberately small learning rate

def adjust_learning_rate(optimizer, epoch, initial_lr, decay_rate=0.1, step_size=30):
    """Apply a step-decay learning rate to every parameter group of ``optimizer``.

    The rate written is ``initial_lr * decay_rate ** (epoch // step_size)``.
    With the default arguments this is the initial LR decayed by 10 every
    30 epochs. Nothing in this notebook calls the function yet; invoke it at
    the start of each epoch to activate the schedule.

    Args:
        optimizer: Object exposing ``param_groups``, a list of dicts that each
            hold an ``'lr'`` entry.
        epoch: Zero-based index of the current epoch.
        initial_lr: Learning rate used before the first decay.
        decay_rate: Multiplicative factor applied once per ``step_size`` epochs.
        step_size: Number of epochs between two decays; must be positive.

    Returns:
        The learning rate that was assigned to the parameter groups.

    Raises:
        ValueError: If ``step_size`` is not positive.
    """
    if step_size <= 0:
        raise ValueError(f"step_size must be positive, got {step_size}")
    lr = initial_lr * (decay_rate ** (epoch // step_size))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    return lr


def train_model(model, criterion, optimizer, train_loader, test_loader, epochs, device, model_save_path,
                checkpoint_prefix="model_final3"):
    """Train ``model`` for ``epochs`` epochs, evaluating and checkpointing after each one.

    Args:
        model: Network to optimise; it is called on each input batch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        epochs: Number of passes over ``train_loader``.
        device: Device every batch is moved to before the forward pass.
        model_save_path: Directory receiving one ``state_dict`` file per epoch;
            it is created when missing.
        checkpoint_prefix: Leading part of each checkpoint file name
            (``<prefix>_epoch_<n>.pth``).

    Returns:
        A list with one ``(train_loss, test_accuracy)`` tuple per epoch. An
        entry is ``nan`` when the corresponding loader produced no samples.
    """
    # torch.save does not create directories, so make sure the target exists
    # before spending an epoch of compute.
    os.makedirs(model_save_path, exist_ok=True)

    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        # Batches are counted by hand so an empty loader cannot cause a
        # division by zero.
        train_loss = running_loss / num_batches if num_batches else float("nan")
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss}")

        # Evaluate on the held-out data
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total if total else float("nan")
        print(f"Epoch {epoch+1}/{epochs}, Test Accuracy: {accuracy:.2f}%")

        # Save the model's state_dict for this epoch
        torch.save(model.state_dict(),
                   os.path.join(model_save_path, f"{checkpoint_prefix}_epoch_{epoch+1}.pth"))
        history.append((train_loss, accuracy))

    print("Training and testing complete")
    return history

# Train, then store the final weights next to the per-epoch checkpoints
model_save_path = "../model"
os.makedirs(model_save_path, exist_ok=True)
train_model(model, criterion, optimizer, train_loader, test_loader, EPOCHS, DEVICE, model_save_path)
# torch.save expects a file path; handing it the directory itself fails once
# that folder exists, so the final weights get their own file inside it.
torch.save(model.state_dict(), os.path.join(model_save_path, "model_final3.pth"))



{'17485': 0, '2456': 1, '2730': 2, '3001': 3, '3002': 4, '3003': 5, '3004': 6, '3006': 7, '3007': 8, '3009': 9, '3010': 10, '30414': 11, '3622': 12, '45176': 13, '4600': 14, '55981': 15, '56145': 16, '56891': 17, '57520': 18, '6111': 19, '70695': 20, '87079': 21}


KeyboardInterrupt: 

In [13]:
# 模型迁移学习
import os
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split

# Global hyper-parameters
modellr = 1e-4
BATCH_SIZE = 64
EPOCHS = 20
SEED = 42  # fixes the train/test partition so runs can be compared with each other
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Checkpoint used as the starting point for fine-tuning
model_path = '../model/model_epoch_3.pth' 
# NOTE: image_dir and output_dir are not read anywhere in this cell.
image_dir = '../Dataset/test/7'
output_dir = '../Dataset/test_result' 

# Preprocessing: resize, collapse to one grayscale channel, convert to a tensor
# and normalise with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])

# Load samples and labels from the image folder
data_set = datasets.ImageFolder('../Dataset/photos', transform)
print(data_set.class_to_idx)

# Random 80/20 train/test split. The explicit generator makes the partition
# reproducible; an unseeded split changes on every execution, so a checkpoint
# being fine-tuned could be scored on images it was previously trained on.
train_ratio = 0.8
train_size = int(train_ratio * len(data_set))
test_size = len(data_set) - train_size
train_set, test_set = random_split(
    data_set,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SEED),
)

# Batch loaders (only the training data is shuffled)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Build the ResNet-50 architecture only; its weights come from the checkpoint below
model = models.resnet50(pretrained=False)
# Replace the first layer so it accepts single-channel images
model.conv1 = nn.Conv2d(1, model.conv1.out_channels, kernel_size=model.conv1.kernel_size, 
                        stride=model.conv1.stride, padding=model.conv1.padding, bias=False)

# New classification head sized for the current dataset
num_ftrs = model.fc.in_features
num_classes = len(data_set.classes)
model.fc = nn.Linear(num_ftrs, num_classes)

# Load the checkpoint after dropping its classifier weights, so the head above
# keeps its fresh initialisation. map_location lets a checkpoint written on a
# GPU be opened on a CPU-only machine as well.
state_dict = torch.load(model_path, map_location=DEVICE)
state_dict.pop('fc.weight', None)
state_dict.pop('fc.bias', None)
load_result = model.load_state_dict(state_dict, strict=False)

# strict=False is needed because the fc entries were removed, but it would also
# hide a checkpoint that does not fit the backbone at all; report anything
# other than the expected missing head.
missing_backbone_keys = [key for key in load_result.missing_keys if not key.startswith("fc.")]
if missing_backbone_keys or load_result.unexpected_keys:
    print("Warning: checkpoint does not fully match the model "
          f"(missing: {missing_backbone_keys}, unexpected: {list(load_result.unexpected_keys)})")

# Freeze every parameter, then unfreeze layer4 and the classifier head
for param in model.parameters():
    param.requires_grad = False
for name, param in model.named_parameters():
    if "layer4" in name or "fc" in name:
        param.requires_grad = True

# Move the model to the target device
model = model.to(DEVICE)
criterion = nn.CrossEntropyLoss()
# Only parameters that still require gradients are handed to the optimizer
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = optim.Adam(trainable_params, lr=modellr)  # Adam with a deliberately small learning rate

def adjust_learning_rate(optimizer, epoch, initial_lr, decay_rate=0.1, step_size=30):
    """Apply a step-decay learning rate to every parameter group of ``optimizer``.

    The rate written is ``initial_lr * decay_rate ** (epoch // step_size)``.
    With the default arguments this is the initial LR decayed by 10 every
    30 epochs. Nothing in this notebook calls the function yet; invoke it at
    the start of each epoch to activate the schedule.

    Args:
        optimizer: Object exposing ``param_groups``, a list of dicts that each
            hold an ``'lr'`` entry.
        epoch: Zero-based index of the current epoch.
        initial_lr: Learning rate used before the first decay.
        decay_rate: Multiplicative factor applied once per ``step_size`` epochs.
        step_size: Number of epochs between two decays; must be positive.

    Returns:
        The learning rate that was assigned to the parameter groups.

    Raises:
        ValueError: If ``step_size`` is not positive.
    """
    if step_size <= 0:
        raise ValueError(f"step_size must be positive, got {step_size}")
    lr = initial_lr * (decay_rate ** (epoch // step_size))
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    return lr


def train_model(model, criterion, optimizer, train_loader, test_loader, epochs, device, model_save_path,
                checkpoint_prefix="model_final2"):
    """Train ``model`` for ``epochs`` epochs, evaluating and checkpointing after each one.

    Args:
        model: Network to optimise; it is called on each input batch.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        epochs: Number of passes over ``train_loader``.
        device: Device every batch is moved to before the forward pass.
        model_save_path: Directory receiving one ``state_dict`` file per epoch;
            it is created when missing.
        checkpoint_prefix: Leading part of each checkpoint file name
            (``<prefix>_epoch_<n>.pth``).

    Returns:
        A list with one ``(train_loss, test_accuracy)`` tuple per epoch. An
        entry is ``nan`` when the corresponding loader produced no samples.
    """
    # torch.save does not create directories, so make sure the target exists
    # before spending an epoch of compute.
    os.makedirs(model_save_path, exist_ok=True)

    history = []
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1

        # Batches are counted by hand so an empty loader cannot cause a
        # division by zero.
        train_loss = running_loss / num_batches if num_batches else float("nan")
        print(f"Epoch {epoch+1}/{epochs}, Training Loss: {train_loss}")

        # Evaluate on the held-out data
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                predicted = outputs.argmax(dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total if total else float("nan")
        print(f"Epoch {epoch+1}/{epochs}, Test Accuracy: {accuracy:.2f}%")

        # Save the model's state_dict for this epoch
        torch.save(model.state_dict(),
                   os.path.join(model_save_path, f"{checkpoint_prefix}_epoch_{epoch+1}.pth"))
        history.append((train_loss, accuracy))

    print("Training and testing complete")
    return history

# Fine-tune, then store the final weights next to the per-epoch checkpoints
model_save_path = "../model"
os.makedirs(model_save_path, exist_ok=True)
train_model(model, criterion, optimizer, train_loader, test_loader, EPOCHS, DEVICE, model_save_path)
# torch.save expects a file path; handing it the directory itself fails once
# that folder exists, so the final weights get their own file inside it.
torch.save(model.state_dict(), os.path.join(model_save_path, "model_final2.pth"))


{'17485': 0, '2456': 1, '2730': 2, '3001': 3, '3002': 4, '3003': 5, '3004': 6, '3006': 7, '3007': 8, '3009': 9, '3010': 10, '30414': 11, '3622': 12, '45176': 13, '4600': 14, '55981': 15, '56145': 16, '56891': 17, '57520': 18, '6111': 19, '70695': 20, '87079': 21}


KeyboardInterrupt: 

# 模型预测使用
**注意：输入彩色图像即可，下面的程序会自动将其转换为灰度图；该模型采用灰度图进行训练和预测。**

- 将 `image_dir`、`output_dir` 都设置为有效的目录。
- 将 `model_path` 指定为模型的 `.pth` 文件（其中保存了模型的 `state_dict`）。

In [16]:
# 模型预测效果
from PIL import Image, ImageDraw
import os
import torch
import torch.nn as nn
from torchvision import transforms, models

# Paths and class count
model_path = '../model/model_final2_epoch_1.pth' 
image_dir = '../Dataset/test/'
output_dir = '../Dataset/test_result' 
num_classes = 22  # must equal the number of classes the checkpoint was trained with

# Build the architecture only (pretrained=False); every weight is taken from
# the checkpoint loaded below.
model = models.resnet50(pretrained=False)
# Replace the first layer so it accepts single-channel images
model.conv1 = nn.Conv2d(1, model.conv1.out_channels, kernel_size=model.conv1.kernel_size, 
                        stride=model.conv1.stride, padding=model.conv1.padding, bias=False)

num_ftrs = model.fc.in_features
model.fc = torch.nn.Linear(num_ftrs, num_classes)

# Load the model's state_dict. The model is never moved off the CPU in this
# cell, so the tensors are mapped to the CPU; without map_location a checkpoint
# written on a GPU cannot be opened on a machine that has no CUDA device.
model.load_state_dict(torch.load(model_path, map_location="cpu"))

model.eval()

# Preprocessing: resize, collapse to one grayscale channel, convert to a tensor
# and normalise with mean 0.5 / std 0.5.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]),
])

# Load an image and preprocess it
def load_and_preprocess_image(image_path):
    """Read the image at ``image_path`` and return it as a one-image batch.

    The module-level ``transform`` pipeline is applied and a leading batch
    dimension is added with ``unsqueeze(0)``.

    Args:
        image_path: Path of the image file to open.

    Returns:
        The transformed image with an extra leading dimension of size 1.
    """
    # The context manager closes the underlying file as soon as the tensor
    # has been produced instead of leaving that to garbage collection.
    with Image.open(image_path) as image:
        return transform(image).unsqueeze(0)

# Prediction helper
def predict(model, image_tensor):
    """Return the index of the highest-scoring class for a single-image batch.

    Args:
        model: Callable mapping ``image_tensor`` to a 2-D tensor of class
            scores (one row per image).
        image_tensor: Input batch; it must contain exactly one image because
            the result is converted with ``.item()``.

    Returns:
        The predicted class index as a Python ``int``.
    """
    # Inference needs no gradients; disabling autograd avoids building a graph
    # for every image.
    with torch.no_grad():
        outputs = model(image_tensor)
    return outputs.argmax(dim=1).item()

# Predict every image in image_dir and write the predicted class index onto a copy
os.makedirs(output_dir, exist_ok=True)  # saving fails if the target folder is missing

# sorted() keeps the processing order deterministic across platforms
for filename in sorted(os.listdir(image_dir)):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    image_path = os.path.join(image_dir, filename)
    image_tensor = load_and_preprocess_image(image_path)
    predicted_class_idx = predict(model, image_tensor)

    # Draw on an in-memory copy so the source file can be closed right away.
    # NOTE(review): single-channel (grayscale) images may reject the RGB fill
    # tuple used below, so anything that is not RGB/RGBA is converted first.
    with Image.open(image_path) as source_image:
        if source_image.mode in ("RGB", "RGBA"):
            output_image = source_image.copy()
        else:
            output_image = source_image.convert("RGB")

    draw = ImageDraw.Draw(output_image)
    draw.text((10, 30), f'Pre: {predicted_class_idx}', fill=(255, 255, 255))

    # Save the annotated image under the same file name
    output_path = os.path.join(output_dir, filename)
    output_image.save(output_path)

print("All images processed and saved in the output directory.")

All images processed and saved in the output directory.
