In [54]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets, models
import torch.nn.functional as F
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split, DataLoader
from PIL import Image
import pandas as pd
import os
import torch.optim.lr_scheduler as lr_scheduler  # 导入学习率调整器

# 检查CUDA设备是否可用，选择设备
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# 数据路径
train_dir = "F:\\python代码\\机器学习\\期末作业\\train"
pred_dir = "F:\\python代码\\机器学习\\期末作业\\pred"
csv_dir = "F:\\python代码\\机器学习\\期末作业\\pre_data.csv"
# 标签字典
label_dict = {0: 'cavallo', 1: 'mucca', 2: 'scoiattolo'}



Using device: cuda:0


In [55]:

# 数据预处理
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),  # 添加水平翻转
    transforms.RandomRotation(15),  # 添加随机旋转
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 加载数据集
train_data = datasets.ImageFolder(train_dir, transform=transform)
# 划分数据集
train_size = int(0.7 * len(train_data))
val_size = int(0.15 * len(train_data))
test_size = len(train_data) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(train_data, [train_size, val_size, test_size])

# 数据加载器
batch_size = 16  # 增加批量大小
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [56]:

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()

        # 保存残差
        self.residual_function = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels * BasicBlock.expansion, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels * BasicBlock.expansion)
        )

        # 匹配维度
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != BasicBlock.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BasicBlock.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BasicBlock.expansion)
            )

    def forward(self, x):
        return nn.ReLU(inplace=True)(self.residual_function(x) + self.shortcut(x))

class ResNet(nn.Module):
    def __init__(self, block, num_block, num_classes=3):
        super(ResNet, self).__init__()

        self.in_channels = 64

        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True)
        )
        self.pool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # ResNet layers
        self.layer1 = self._make_layer(block, 64, num_block[0], 1)
        self.layer2 = self._make_layer(block, 128, num_block[1], 2)
        self.layer3 = self._make_layer(block, 256, num_block[2], 2)
        self.layer4 = self._make_layer(block, 512, num_block[3], 2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        output = self.conv1(x)
        output = self.pool(output)
        output = self.layer1(output)
        output = self.layer2(output)
        output = self.layer3(output)
        output = self.layer4(output)
        output = self.avg_pool(output)
        output = output.view(output.size(0), -1)
        output = self.fc(output)

        return output

# 定义ResNet-18
def resnet18(num_classes):
    return ResNet(BasicBlock, [2, 2, 2, 2], num_classes)

model = resnet18(num_classes=3).to(device) 

In [57]:

# 实例化新的深度CNN模型，损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=1e-4)  # 添加L2正则化


# 添加学习率调整策略
scheduler = lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)  # 每隔5个周期学习率衰减为原来的0.5倍

# 训练模型
num_epochs = 50  # 增加训练周期
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader)}")
    
    # 学习率衰减
    scheduler.step()

Epoch 1/50, Loss: 0.8953643655933147
Epoch 2/50, Loss: 0.7059951136336056
Epoch 3/50, Loss: 0.6056588400139038
Epoch 4/50, Loss: 0.5344120837975798
Epoch 5/50, Loss: 0.4912891692795087
Epoch 6/50, Loss: 0.40729551934779473
Epoch 7/50, Loss: 0.3760570705945575
Epoch 8/50, Loss: 0.34196278586267903
Epoch 9/50, Loss: 0.3228056306857209
Epoch 10/50, Loss: 0.315894587819493
Epoch 11/50, Loss: 0.2472639680071429
Epoch 12/50, Loss: 0.2424026611850772
Epoch 13/50, Loss: 0.23355200907874316
Epoch 14/50, Loss: 0.21449939837459653
Epoch 15/50, Loss: 0.22004599034721153
Epoch 16/50, Loss: 0.1691095204820539
Epoch 17/50, Loss: 0.15346614145328125
Epoch 18/50, Loss: 0.15269593896921804
Epoch 19/50, Loss: 0.15346768879083567
Epoch 20/50, Loss: 0.1404649853202063
Epoch 21/50, Loss: 0.13099062175990192
Epoch 22/50, Loss: 0.11937306304441919
Epoch 23/50, Loss: 0.11797667066790327
Epoch 24/50, Loss: 0.1097952130832201
Epoch 25/50, Loss: 0.10541121829626472
Epoch 26/50, Loss: 0.10855534116790415
Epoch 27/

In [58]:

# 验证模型
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in val_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # 数据也要转移到GPU
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Validation Accuracy: {100 * correct // total} %')


Validation Accuracy: 89 %


In [59]:

# 测试模型
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)  # 数据也要转移到GPU
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f'Test Accuracy: {100 * correct // total} %')


Test Accuracy: 88 %


In [60]:
model_dir="model_weights.pth"
torch.save(model.state_dict(), model_dir)
print(f"Model saved at {model_dir}")


Model saved at model_weights.pth


In [61]:
import os
from torchvision import transforms
from PIL import Image
import csv
# 输出结果到 pred.csv 文件
output_file = "pred.csv"

# 创建用于预测的数据集
class PredictionDataset(torch.utils.data.Dataset):
    def __init__(self, folder_path, transform=None):
        self.folder_path = folder_path
        self.transform = transform
        self.images = os.listdir(folder_path)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = os.path.join(self.folder_path, self.images[idx])
        image = Image.open(img_name).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, self.images[idx]

# 预测数据集的数据预处理
pred_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


# 创建用于预测的数据集
pred_dataset = PredictionDataset(folder_path=pred_dir, transform=pred_transform)
pred_loader = DataLoader(pred_dataset, batch_size=1, shuffle=False)

# 预测并输出结果
model.eval()
predictions = []

with torch.no_grad():
    for inputs, filenames in pred_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        predictions.append((filenames[0], predicted.item()))

# 输出结果
print("Filename\tPrediction")
for filename, prediction in predictions:
    print(f"{filename}\t\t{prediction}")



with open(output_file, mode='w', newline='') as csvfile:
    fieldnames = ['Filename', 'Prediction']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for filename, prediction in predictions:
        writer.writerow({'Filename': filename, 'Prediction': prediction})

print(f"Predictions have been written to {output_file}.")

Filename	Prediction
image_1.jpeg		2
image_10.jpeg		2
image_100.jpeg		1
image_101.jpeg		2
image_102.jpeg		0
image_103.jpeg		1
image_104.jpeg		2
image_105.jpeg		0
image_106.jpeg		2
image_107.jpeg		2
image_108.jpeg		2
image_109.jpeg		2
image_11.jpeg		1
image_110.jpeg		2
image_111.jpeg		2
image_112.jpeg		1
image_113.jpeg		1
image_114.jpeg		1
image_115.jpeg		2
image_116.jpeg		1
image_117.jpeg		0
image_118.jpeg		1
image_119.jpeg		0
image_12.jpeg		2
image_120.jpeg		0
image_13.jpeg		0
image_14.jpeg		0
image_15.jpeg		1
image_16.jpeg		1
image_17.jpeg		0
image_18.jpeg		1
image_19.jpeg		2
image_2.jpeg		0
image_20.jpeg		1
image_21.jpeg		1
image_22.jpeg		0
image_23.jpeg		0
image_24.jpeg		1
image_25.jpeg		2
image_26.jpeg		2
image_27.jpeg		0
image_28.jpeg		1
image_29.jpeg		2
image_3.jpeg		1
image_30.jpeg		2
image_31.jpeg		2
image_32.jpeg		0
image_33.jpeg		1
image_34.jpeg		0
image_35.jpeg		0
image_36.jpeg		0
image_37.jpeg		1
image_38.jpeg		0
image_39.jpeg		0
image_4.jpeg		0
image_40.jpeg		0
image_41.jp

In [62]:
import os
from torchvision import transforms
from PIL import Image
import csv
# 输出结果到 pred.csv 文件
output_file = "pred.csv"

#修改这里验收位置
pred_dir = "F:\\python代码\\机器学习\\期末作业\\pred"
# 创建用于预测的数据集
class PredictionDataset(torch.utils.data.Dataset):
    def __init__(self, folder_path, transform=None):
        self.folder_path = folder_path
        self.transform = transform
        self.images = os.listdir(folder_path)

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        img_name = os.path.join(self.folder_path, self.images[idx])
        image = Image.open(img_name).convert("RGB")

        if self.transform:
            image = self.transform(image)

        return image, self.images[idx]

# 预测数据集的数据预处理
pred_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# 创建用于预测的数据集
pred_dataset = PredictionDataset(folder_path=pred_dir, transform=pred_transform)
pred_loader = DataLoader(pred_dataset, batch_size=1, shuffle=False)



# 加载保存的权重
loaded_model = resnet18(num_classes=3).to(device)
loaded_model.load_state_dict(torch.load(model_dir))
loaded_model.eval()

# 预测并输出结果
loaded_predictions = []

with torch.no_grad():
    for inputs, filenames in pred_loader:
        inputs = inputs.to(device)
        outputs = loaded_model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        loaded_predictions.append((filenames[0], predicted.item()))

# 输出结果到 pred.csv 文件
output_file_loaded = "pred_loaded.csv"

with open(output_file_loaded, mode='w', newline='') as csvfile:
    fieldnames = ['Filename', 'Prediction']
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

    writer.writeheader()
    for filename, prediction in loaded_predictions:
        writer.writerow({'Filename': filename, 'Prediction': prediction})

print(f"Predictions from loaded model have been written to {output_file_loaded}.")



Predictions from loaded model have been written to pred_loaded.csv.
