In [None]:
'''
垃圾分类项目
使用了深度学习中的卷积神经网络(CNN)来提取图像特征
结合机器学习中的KNN和SVM算法进行分类和预测

训练模型所使用设备：
CPU: i5-11300H 
GPU: NVIDIA GeForce MX450   
CUDA: 11.8

代码编辑器: VSCode

十次epoch大概用时20min
程序得出测试集预测结果大概用时5h
'''

# 加载库
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
import numpy as np

# 数据预处理
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),   
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  
])

# 自定义数据集类 --> 读取和整理训练集的图片和标签
class GarbageDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform   
        self.image_paths = []
        self.labels = []
        
        # 遍历每个类别的目录，并将图像路径和标签添加到列表中
        for label in range(40):
            label_dir = os.path.join(self.root_dir, str(label))   # 找出每种垃圾的文件夹
            if not os.path.isdir(label_dir):
                print(f"未找到目录: {label_dir}")
                continue
            for img_name in os.listdir(label_dir):   # 遍历文件夹里的所有图片
                img_path = os.path.join(label_dir, img_name)   # 图片的完整路径
                if os.path.isfile(img_path):  
                    self.image_paths.append(img_path)
                    self.labels.append(label)
        
        print(f"训练集图片总数: {len(self.image_paths)}")
        
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):   # 拿出一张特定的图片和它的标签
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label   # 返回处理后的图片和它的标签

# 自定义测试数据集类  --> 读取测试数据，检查模型效果
class TestDataset(Dataset):
    def __init__(self, root_dir, file_path, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        with open(file_path, 'r') as f:
            self.image_paths = [line.strip() for line in f]
            print(f"测试集图片总数: {len(self.image_paths)}")
    
    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.image_paths[idx])
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, self.image_paths[idx]    # 返回处理后的照片和它的路径

# 加载训练数据
print("加载训练数据...")
train_dataset = GarbageDataset(root_dir='train/train', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)   # 训练数据加载器 

'卷积神经网络(CNN)模型定义和训练'
# 定义模型
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        # 卷积层
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        # 全连接层
        self.fc1 = nn.Linear(128*16*16, 512)
        self.fc2 = nn.Linear(512, 40)

    # 前向传播过程
    def forward(self, x):
        x = nn.ReLU()(self.conv1(x))    # 通过第一个卷积层，然后应用ReLU激活函数
        x = nn.MaxPool2d(2, 2)(x)   # 应用2x2的最大池化层
        x = nn.ReLU()(self.conv2(x))
        x = nn.MaxPool2d(2, 2)(x)
        x = nn.ReLU()(self.conv3(x))
        x = nn.MaxPool2d(2, 2)(x)
        x = x.view(-1, 128*16*16)   # 将特征展平为一维张量
        x = nn.ReLU()(self.fc1(x))   # 通过第一个全连接层，然后应用ReLU激活函数
        x = self.fc2(x)
        return x   # 返回最终的输出

     # 提取特征过程，不经过全连接层
    def extract_features(self, x):
        x = nn.ReLU()(self.conv1(x))
        x = nn.MaxPool2d(2, 2)(x)
        x = nn.ReLU()(self.conv2(x))
        x = nn.MaxPool2d(2, 2)(x)
        x = nn.ReLU()(self.conv3(x))
        x = nn.MaxPool2d(2, 2)(x)
        x = x.view(-1, 128*16*16)
        return x   # 返回特征张量

# 实例化模型、损失函数和优化器
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'当前使用的设备: {torch.cuda.get_device_name(0)}')
model = SimpleCNN().to(device)
criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数 
optimizer = optim.Adam(model.parameters(), lr=0.001)   # 优化器

'模型训练'
num_epochs = 10  # 将 epoch 数设置为 10
print("开始训练...")
for epoch in range(num_epochs):
    model.train()  # 将CNN模型设置为训练模式
    # 初始化运行中的损失、正确预测数量和总样本数
    running_loss = 0.0  
    correct = 0   
    total = 0  
    # 遍历训练数据加载器 对于每个batch（批次）中的inputs（输入图像）和labels（标签）
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)   # 使用定义的损失函数计算损失
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        
        _, predicted = torch.max(outputs.data, 1)   # 从输出中选择概率最高的类别
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total
 
    print(f'epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f},  Accuracy: {epoch_acc:.2f}%')

# 保存模型
print("保存模型...")
torch.save(model.state_dict(), 'best_model.pth')

'KNN和SVM分类器训练与预测'
# 提取训练集特征
print("提取训练数据特征...")
train_features = []
train_labels = []
with torch.no_grad():
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        features = model.extract_features(inputs).cpu().numpy()   # 提取输入图像的特征
        train_features.append(features)
        train_labels.append(labels.cpu().numpy())
# 将train_features和train_labels列表中的所有数组连接起来,形成单个NumPy数组。
train_features = np.concatenate(train_features)
train_labels = np.concatenate(train_labels)

# 定义 KNN 和 SVM 分类器
print("训练 KNN 和 SVM 分类器...")
knn_classifier = KNeighborsClassifier(n_neighbors=5)   # 邻居数设置为5
svm_classifier = SVC(kernel='linear')   # 使用线性核函数
# 训练 KNN 和 SVM 分类器
knn_classifier.fit(train_features, train_labels)
svm_classifier.fit(train_features, train_labels)

# 加载测试数据
print("加载测试数据...")
test_dataset = TestDataset(root_dir='test/test', file_path='test/testpath.txt', transform=transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)   # 测试数据加载器

# 加载训练好的模型
print("加载训练好的模型...")
model.load_state_dict(torch.load('best_model.pth'))   # 从文件'best_model.pth'加载模型的预训练参数
model.eval()   # 将模型设置为评估模式

# 提取测试集特征并进行预测
print("提取测试数据特征...")
test_features = []
test_image_paths = []
with torch.no_grad():
    for inputs, image_paths in test_loader:
        inputs = inputs.to(device)
        features = model.extract_features(inputs).cpu().numpy()
        test_features.append(features)
        test_image_paths.extend(image_paths)
test_features = np.concatenate(test_features)

# 使用 KNN 进行预测
print("使用 KNN 进行预测...")
knn_predictions = knn_classifier.predict(test_features)

# 使用 SVM 进行预测
print("使用 SVM 进行预测...")
svm_predictions = svm_classifier.predict(test_features)

# 保存预测结果
print("保存预测结果...")
with open('knn.csv', 'w') as f:
    for label in knn_predictions:
        f.write(f'{label}\n')

with open('svm.csv', 'w') as f:
    for label in svm_predictions:
        f.write(f'{label}\n')

print("测试集图片分类完成。")
