The following two code blocks load different pretrained models to train classifiers for a classification task. The first code block loads a model from vision_transformer, and the second code block loads a resnet50 model. Before running the code, it is necessary to update the path settings in the code based on the file locations.

The third code block uses the already trained classifier model to perform some evaluations, calculates the confusion matrix, and saves the result images.

In [3]:
import os
import pandas as pd
from PIL import Image
import torch
from torch import nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.models import vision_transformer
from torchvision.models.vision_transformer import vit_b_16
from sklearn.metrics import confusion_matrix, precision_score, recall_score
import wandb

# 初始化 wandb
wandb.init(
    project="my-ViT-project",
    config={
        "learning_rate": 0.001,
        "epochs": 1,
        "batch_size": 8,
    }
)



# 设定设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 数据转换，适用于彩色图像
transform = transforms.Compose([
    transforms.Resize((224, 224)),   # 调整图像大小以匹配模型输入
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.RandomRotation(10),  # 随机旋转
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.0439, 0.0438, 0.0438], std=[0.0942, 0.0941, 0.0941]),
])

# 定义自定义数据集类
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.annotations.iloc[idx, 0])
        image = Image.open(img_name).convert('RGB')
        label = self.annotations.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        return image, label

# 加载数据集
train_csv = '/root/autodl-tmp/xin/datasets/MNIST/updated_train_labels_with_colors.csv'
test_csv = '/root/autodl-tmp/xin/datasets/MNIST/updated_test_labels_with_colors.csv'
dataset_path = '/root/autodl-tmp/xin/datasets/MNIST/colored-train-images'
testdatapath=  "/root/autodl-tmp/xin/datasets/MNIST/colored-test-images"
train_dataset = CustomDataset(csv_file=train_csv, root_dir=dataset_path, transform=transform)
test_dataset = CustomDataset(csv_file=test_csv, root_dir=testdatapath, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

# 加载预训练模型
model = vit_b_16(weights=vision_transformer.ViT_B_16_Weights.DEFAULT).to(device)

num_features = model.heads[-1].in_features  # 获取最后一个线性层的输入特征数量
model.heads[-1] = nn.Linear(num_features, 10).to(device)  # 替换为新的线性层，适配10类

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练函数
def train(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
       
        loss.backward()
        optimizer.step()


        # Logging to wandb
        wandb.log({"Train Loss": loss.item()})
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
        torch.cuda.empty_cache()  # 尝试在这里清理CUDA缓存

# 测试函数
def test():
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # 将一批的损失加和
            pred = output.argmax(dim=1, keepdim=True)  # 获取概率最高的索引
            correct += pred.eq(target.view_as(pred)).sum().item()
            all_preds.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    # 计算混淆矩阵及其它统计指标
    cm = confusion_matrix(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')

    test_loss /= len(test_loader.dataset)

    accuracy = 100. * correct / len(test_loader.dataset)
    
    # Logging to wandb
    wandb.log({"Test Loss": test_loss, "Accuracy": accuracy, "Precision": precision, "Recall": recall})
    


    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n')
    print(f'Precision: {precision:.4f}, Recall: {recall:.4f}')
    print('Confusion Matrix:\n', cm)

# 保存模型函数
def save_model(model, model_name, epoch, path):
    torch.save(model.state_dict(), os.path.join(path, f"{model_name}_epoch{epoch}.pth"))

model_save_path = '/root/autodl-tmp/xin/Classify/modelweight'

# 训练和测试模型
for epoch in range(1, 2):
    train(epoch)
    test()
    save_model(model, "vision_transformer", epoch, model_save_path)


Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33m983501138[0m ([33mxinliang1001[0m). Use [1m`wandb login --relogin`[0m to force relogin


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011112405308004883, max=1.0…



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))



Test set: Average loss: 0.2476, Accuracy: 2440/9796 (25%)

Precision: 0.2456, Recall: 0.2486
Confusion Matrix:
 [[709   0  26   0  18  33   5   0 189   0]
 [  3 775   1   0  80   0   0 141   2   0]
 [506   1  20   0  77  92   5  26 275   0]
 [481   0  20   0  54 135   7  51 253   1]
 [147   9  78   0 309 129  21  88 198   3]
 [362   0  24   0  54 167   7  38 238   2]
 [353   3  58   0 181  90  12  33 228   0]
 [ 80  14  64   0 322 132  20 185 182   3]
 [559   0  28   0  48  70   8   2 259   0]
 [193   6  70   0 262 165  14  60 228   4]]


In [1]:
import os
import pandas as pd
from PIL import Image
import torch
from torch import nn
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet50
from sklearn.metrics import confusion_matrix, precision_score, recall_score
import wandb

# 初始化 wandb
wandb.init(
    project="my-ResNet50-project",
    config={
        "learning_rate": 0.001,
        "epochs": 1,
        "batch_size": 8,
    }
)

# 设定设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 数据转换，适用于彩色图像
transform = transforms.Compose([
    transforms.Resize((224, 224)),   # 调整图像大小以匹配模型输入
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.RandomRotation(10),  # 随机旋转
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.2197, 0.2214, 0.2221], std=[0.0745, 0.0771, 0.0801]),
])


# 定义自定义数据集类
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.annotations.iloc[idx, 0])
        image = Image.open(img_name).convert('RGB')
        label = self.annotations.iloc[idx, 1]
        if self.transform:
            image = self.transform(image)
        return image, label

# 加载数据集
train_csv = '/root/autodl-tmp/xin/GAN/CGAN/data/mixed_images82secondgen.csv'
test_csv = '/root/autodl-tmp/xin/datasets/MNIST/updated_test_labels_with_colors.csv'
dataset_path = '/root/autodl-tmp/xin/GAN/CGAN/data/mixed_images82secondgen'
testdatapath = "/root/autodl-tmp/xin/datasets/MNIST/colored-test-images"
train_dataset = CustomDataset(csv_file=train_csv, root_dir=dataset_path, transform=transform)
test_dataset = CustomDataset(csv_file=test_csv, root_dir=testdatapath, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, num_workers=4)

# 加载预训练模型
model = resnet50(weights='IMAGENET1K_V1').to(device)

num_features = model.fc.in_features  # 获取最后一个线性层的输入特征数量
model.fc = nn.Linear(num_features, 10).to(device)  # 替换为新的线性层，适配10类

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 训练函数
def train(epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Logging to wandb
        wandb.log({"Train Loss": loss.item()})
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
        torch.cuda.empty_cache()  # 尝试在这里清理CUDA缓存

# 测试函数
def test():
    model.eval()
    test_loss = 0
    correct = 0
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # 将一批的损失加和
            pred = output.argmax(dim=1, keepdim=True)  # 获取概率最高的索引
            correct += pred.eq(target.view_as(pred)).sum().item()
            all_preds.extend(pred.cpu().numpy())
            all_targets.extend(target.cpu().numpy())

    # 计算混淆矩阵及其它统计指标
    cm = confusion_matrix(all_targets, all_preds)
    precision = precision_score(all_targets, all_preds, average='macro')
    recall = recall_score(all_targets, all_preds, average='macro')

    test_loss /= len(test_loader.dataset)

    accuracy = 100. * correct / len(test_loader.dataset)
    
    # Logging to wandb
    wandb.log({"Test Loss": test_loss, "Accuracy": accuracy, "Precision": precision, "Recall": recall})
    
    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({accuracy:.0f}%)\n')
    print(f'Precision: {precision:.4f}, Recall: {recall:.4f}')
    print('Confusion Matrix:\n', cm)

# 保存模型函数
def save_model(model, model_name, epoch, path):
    torch.save(model.state_dict(), os.path.join(path, f"{model_name}_epoch{epoch}.pth"))

model_save_path = '/root/autodl-tmp/xin/Classify/modelweight'

# 训练和测试模型
for epoch in range(1, wandb.config.epochs + 1):
    train(epoch)
    test()
    save_model(model, "resnet50secondhun10%", epoch, model_save_path)


Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33m983501138[0m ([33mxinliang1001[0m). Use [1m`wandb login --relogin`[0m to force relogin


VBox(children=(Label(value='Waiting for wandb.init()...\r'), FloatProgress(value=0.011112927955885728, max=1.0…


Test set: Average loss: 0.0432, Accuracy: 8713/9796 (89%)

Precision: 0.8980, Recall: 0.8879
Confusion Matrix:
 [[943   0   3  15   1   0   9   1   8   0]
 [  1 935   5   1   4   1  10   1  44   0]
 [  5   0 861  69   7  25  29   0   6   0]
 [  4   0   9 944   0  29   4   7   4   1]
 [  0   1   3   0 925   3   8   0  13  29]
 [  3   1  15 225   0 636   2   8   2   0]
 [ 13   3  23  16   9   9 878   0   7   0]
 [  2   6  13  17   2  26   0 921  13   2]
 [  3   0   5  43   0  22  15   1 884   1]
 [  7   1   2  35  19  19   1  57  75 786]]


In [3]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models import resnet50
from sklearn.metrics import confusion_matrix
import seaborn as sns
from torch import nn


# 定义自定义数据集类
class CustomDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.annotations.iloc[idx, 0])
        image = Image.open(img_name).convert('RGB')
        label = self.annotations.iloc[idx, 1]
        color = self.annotations.iloc[idx, 2]
        if self.transform:
            image = self.transform(image)
        return image, label, color

# 数据转换，适用于彩色图像
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # transforms.Normalize(mean=[0.0439, 0.0438, 0.0438], std=[0.0942, 0.0941, 0.0941]),原数据集
    # transforms.Normalize(mean=[0.1362, 0.1372, 0.1376], std=[0.0834, 0.0848, 0.0867]),第一次混合后的数据集
    transforms.Normalize(mean=[0.2197, 0.2214, 0.2221], std=[0.0745, 0.0771, 0.0801]), 
    
])

# 加载数据集
test_csv = '/root/autodl-tmp/xin/datasets/MNIST/updated_test_labels_with_colors.csv'
testdatapath = "/root/autodl-tmp/xin/datasets/MNIST/colored-test-images"
test_dataset = CustomDataset(csv_file=test_csv, root_dir=testdatapath, transform=transform)

# 调整批量大小
batch_size = 4  # 调整为更小的批量大小以减少内存使用
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2)

# 加载预训练模型
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = resnet50(weights='IMAGENET1K_V1').to(device)
num_features = model.fc.in_features
model.fc = nn.Linear(num_features, 10).to(device)

# 加载已经训练好的模型权重
# model_weight_name = 'resnet50hun10%_epoch1.pth'
model_weight_name = "resnet50secondhun10%_epoch1.pth"
model_save_path = f'/root/autodl-tmp/xin/Classify/modelweight/{model_weight_name}'
 
model.load_state_dict(torch.load(model_save_path))
model.eval()

# 获取预测结果和实际标签
all_preds = []
all_targets = []
all_colors = []

with torch.no_grad():
    for data, target, color in test_loader:
        data, target, color = data.to(device), target.to(device), color.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True).view(-1)
        all_preds.extend(pred.cpu().numpy())
        all_targets.extend(target.cpu().numpy())
        all_colors.extend(color.cpu().numpy())

# 转换为NumPy数组
all_preds = np.array(all_preds)
all_targets = np.array(all_targets)
all_colors = np.array(all_colors)

# 为每种颜色生成和保存混淆矩阵
color_map = {0: 'Red', 1: 'Green', 2: 'Blue'}
result_save_path = '/root/autodl-tmp/xin/Classify/results'
os.makedirs(result_save_path, exist_ok=True)

for color, color_name in color_map.items():
    mask = (all_colors == color)
    preds_color = all_preds[mask]
    targets_color = all_targets[mask]
    cm = confusion_matrix(targets_color, preds_color)
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap=plt.cm.Blues)
    plt.title(f'Confusion Matrix for {color_name}')
    plt.ylabel('Actual Label')
    plt.xlabel('Predicted Label')
    plt.savefig(os.path.join(result_save_path, f'confusion_matrix_{color_name}_{model_weight_name}.png'))
    plt.close()

print("All confusion matrices saved.")



# 计算每个数字在每种颜色下的分类成功率
unique_labels = np.unique(all_targets)
unique_colors = np.unique(all_colors)

# 创建结果保存目录
result_save_path = '/root/autodl-tmp/xin/Classify/results'
os.makedirs(result_save_path, exist_ok=True)

# 存储结果的DataFrame
results = []

for label in unique_labels:
    for color in unique_colors:
        # 过滤出当前数字和颜色的样本
        mask = (all_targets == label) & (all_colors == color)
        if np.sum(mask) == 0:
            continue
        true_positive = np.sum((all_preds[mask] == label))
       
        accuracy = true_positive / np.sum(mask)
        results.append({"Digit": label, "Color": color, "Accuracy": accuracy})

results_df = pd.DataFrame(results)

# 保存结果到CSV文件
results_csv_path = os.path.join(result_save_path, f'classification_accuracy_by_color_82{model_weight_name}.csv')
results_df.to_csv(results_csv_path, index=False)
print(f"Results saved to {results_csv_path}")

# 可视化结果并保存图像
plt.figure(figsize=(10, 8))
for label in unique_labels:
    subset = results_df[results_df["Digit"] == label]
    plt.plot(subset["Color"], subset["Accuracy"], marker='o', label=f"Digit {label}")

plt.xlabel("Color")
plt.ylabel("Accuracy")
plt.title("Classification Accuracy for Each Digit by Color")
plt.legend()
plt.grid(True)

# 保存图像到文件
accuracy_plot_path = os.path.join(result_save_path, f'classification_accuracy_by_color_{model_weight_name}.png')
plt.savefig(accuracy_plot_path)
print(f"Plot saved to {accuracy_plot_path}")
plt.close()  # 关闭绘图以释放内存

# 绘制并保存混淆矩阵
cm = confusion_matrix(all_targets, all_preds)

def plot_confusion_matrix(cm, labels, title='Confusion Matrix', cmap=plt.cm.Blues):
    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap=cmap, xticklabels=labels, yticklabels=labels)
    plt.title(title)
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    plt.savefig(os.path.join(result_save_path, f'confusion_matrix_{model_weight_name}.png'))
    plt.close()

labels = [str(i) for i in range(10)]  # 假设标签为0到9
plot_confusion_matrix(cm, labels)


All confusion matrices saved.
Results saved to /root/autodl-tmp/xin/Classify/results/classification_accuracy_by_color_82resnet50secondhun10%_epoch1.pth.csv
Plot saved to /root/autodl-tmp/xin/Classify/results/classification_accuracy_by_color_resnet50secondhun10%_epoch1.pth.png
