导入必要的库

In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision.models import resnet18
from torch.utils.tensorboard import SummaryWriter
import numpy as np
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import pickle
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.nn.functional as F
from torchvision.transforms.functional import to_pil_image, to_tensor

下载并使用STL-10数据集，用于自监督学习算法SimCLR在ResNet-18上的训练

In [2]:
# 下载并加载STL-10数据集
transform = transforms.Compose([  
        transforms.RandomResizedCrop(96),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ])


stl10_train_dataset = datasets.STL10(root='./data', split='unlabeled', download=True, transform=transform)
stl10_train_loader = torch.utils.data.DataLoader(stl10_train_dataset, batch_size=256, shuffle=True, num_workers=4)

Files already downloaded and verified


确保使用的torch和torchvision的版本对应,以及接下来要使用mps进行调用GPU加速

In [3]:
device = torch.device('mps')
print(f"Using device: {device}")
print(torch.__version__)
print(torchvision.__version__)


Using device: mps
2.3.1
0.18.1


定义并训练SimCLR模型

In [4]:
# NT-Xent损失函数定义
class NTXentLoss(nn.Module):
    def __init__(self, batch_size, temperature, device):
        super(NTXentLoss, self).__init__()
        self.batch_size = batch_size
        self.temperature = temperature
        self.device = device
        self.criterion = nn.CrossEntropyLoss(reduction="sum")

    def similarity_function(self, x, y):
        return torch.mm(x, y.t()) / self.temperature

    def forward(self, z_i, z_j):
        # 获取当前实际批次大小
        batch_size = z_i.size(0)
        N = 2 * batch_size
        z = torch.cat((z_i, z_j), dim=0)
        
        sim = self.similarity_function(z, z)
        
        mask = torch.eye(N, dtype=torch.bool).to(self.device)
        sim = sim.masked_fill(mask, -float('inf'))
        
        pos = torch.cat([torch.diag(sim, batch_size), torch.diag(sim, -batch_size)])
        pos = pos.view(N, 1)
        
        neg = sim.masked_fill(mask, -float('inf')).flatten()
        logits = torch.cat((pos, neg.view(N, -1)), dim=1)
        
        labels = torch.zeros(N, dtype=torch.long).to(self.device)
        loss = self.criterion(logits, labels)
        loss /= N
        return loss


# SimCLR 模型定义
class SimCLR(nn.Module):
    def __init__(self, base_model, out_dim=256):
        super(SimCLR, self).__init__()
        self.encoder = base_model
        self.projector = nn.Sequential(
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, out_dim)
        )

    def forward(self, x):
        h = self.encoder(x)
        z = self.projector(h)
        return h, z

# 自定义数据集类定义
class CustomBinaryDataset(Dataset):
    def __init__(self, data_dict, transform=None):
        self.filenames = data_dict['filenames']
        self.images = data_dict['data']
        self.labels = data_dict['fine_labels']
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        # 将图像数据从一维数组转换为 32x32x3 的形状
        image = image.reshape(3, 32, 32).transpose(1, 2, 0)
        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)

        return image, label

# 加强的数据增强策略
def augment(x):
    return x + torch.randn_like(x) * 0.1

# 计算准确率函数
def calculate_accuracy(z1, z2):
    similarity = torch.matmul(z1, z2.T)
    preds = similarity.argmax(dim=1)
    correct = preds.eq(torch.arange(z1.size(0), device=z1.device)).sum().item()
    return correct / z1.size(0)


# 训练SimCLR模型的函数
def train_simclr(model, loader, optimizer, criterion, device, epoch):
    model.train()
    total_loss = 0
    total_correct = 0
    num_samples = 0
    for i, (x, _) in enumerate(loader):
        x = x.to(device)
        
        # 数据增强
        x1, x2 = augment(x), augment(x)
        
        # 获取投影后的特征向量
        _, z1 = model(x1)
        _, z2 = model(x2)
        
        # 计算损失
        loss = criterion(z1, z2)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_correct += calculate_accuracy(z1, z2) * x.size(0)
        num_samples += x.size(0)

    avg_loss = total_loss / len(loader)
    accuracy = total_correct / num_samples
    print(f'Epoch [{epoch+1}], Loss: {avg_loss:.6f}, Accuracy: {accuracy:.6f}')
    return avg_loss, accuracy

# 定义训练函数
def train_SimCLR(params, train_loader):
    # 设置设备
    device = torch.device(params['device'])

    # 定义ResNet-18基础模型，并去除分类层
    resnet_model = resnet18(weights=params['base_model_weights'])
    resnet_model.fc = nn.Identity()

    # 初始化SimCLR模型
    model = SimCLR(resnet_model, out_dim=params['out_dim']).to(device)

    # 定义优化器和损失函数
    optimizer = optim.Adam(model.parameters(), lr=params['learning_rate'], weight_decay=1e-6)  # 加入权重衰减
    criterion = NTXentLoss(batch_size=params['batch_size'], temperature=0.5, device=device)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # 学习率调度器
    writer = SummaryWriter()

    # 训练模型
    for epoch in range(params['num_epochs']):
        avg_loss, accuracy = train_simclr(model, train_loader, optimizer, criterion, device, epoch)
        writer.add_scalar('Loss/train', avg_loss, epoch)
        writer.add_scalar('Accuracy/train', accuracy, epoch)
        scheduler.step()  # 更新学习率

    # 保存模型权重
    torch.save(model.state_dict(), params['model_save_path'])
    writer.close()

In [5]:
# 定义参数字典
params_stl10 = {
    'device': 'mps',  # 计算设备
    'base_model_weights': 'DEFAULT',  # 预训练模型的权重，
    'out_dim': 256,  # SimCLR 模型的投影输出维度
    'learning_rate': 1e-4,  # 学习率
    'num_epochs': 15,  # 训练的轮数
    'batch_size': 256,  # 批量大小
    'model_save_path': 'simclr_stl10.pth',  # 保存训练好的模型权重的路径
}


In [8]:
# 调用训练函数
train_SimCLR(params_stl10,stl10_train_loader)

Epoch [1], Loss: 0.825628, Accuracy: 0.985410
Epoch [2], Loss: 0.695386, Accuracy: 0.999550
Epoch [3], Loss: 0.694338, Accuracy: 0.999810
Epoch [4], Loss: 0.693898, Accuracy: 0.999870
Epoch [5], Loss: 0.693753, Accuracy: 0.999890
Epoch [6], Loss: 0.693457, Accuracy: 0.999960
Epoch [7], Loss: 0.693385, Accuracy: 0.999930
Epoch [8], Loss: 0.693391, Accuracy: 0.999920
Epoch [9], Loss: 0.693299, Accuracy: 0.999970
Epoch [10], Loss: 0.693268, Accuracy: 0.999980
Epoch [11], Loss: 0.693258, Accuracy: 1.000000
Epoch [12], Loss: 0.693260, Accuracy: 0.999970
Epoch [13], Loss: 0.693238, Accuracy: 0.999980
Epoch [14], Loss: 0.693264, Accuracy: 0.999970
Epoch [15], Loss: 0.693254, Accuracy: 0.999990


加载并解析CIFAR数据集

In [6]:
# 读取并解析二进制文件
train_path = '/Users/fuchenxu/Desktop/computer_vision/final_project/task_1/SIFAR-100/cifar-100-python/train'
test_path = '/Users/fuchenxu/Desktop/computer_vision/final_project/task_1/SIFAR-100/cifar-100-python/test'

with open(train_path, 'rb') as f:
    train_data = pickle.load(f, encoding='latin1')
with open(test_path, 'rb') as f:
    test_data = pickle.load(f, encoding='latin1')

# 检查 train_data 的内容
print(type(train_data),type(test_data))
print(len(train_data),len(test_data))
print(train_data.keys() if isinstance(train_data, dict) else "Not a dictionary")
print(test_data.keys() if isinstance(test_data, dict) else "Not a dictionary")


<class 'dict'> <class 'dict'>
5 5
dict_keys(['filenames', 'batch_label', 'fine_labels', 'coarse_labels', 'data'])
dict_keys(['filenames', 'batch_label', 'fine_labels', 'coarse_labels', 'data'])


In [7]:
# 自定义数据集类定义
class CustomBinaryDataset(Dataset):
    def __init__(self, data_dict, transform=None):
        self.filenames = data_dict['filenames']
        self.images = data_dict['data']
        self.labels = data_dict['fine_labels']
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        # 将图像数据从一维数组转换为 32x32x3 的形状
        image = image.reshape(3, 32, 32).transpose(1, 2, 0)
        image = Image.fromarray(image)

        if self.transform:
            image = self.transform(image)

        return image, label

# 数据增强和标准化
transform = transforms.Compose([
    transforms.RandomResizedCrop(32),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# 加载训练数据集
train_dataset = CustomBinaryDataset(data_dict=train_data, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)

#加载测试集数据
test_dataset = CustomBinaryDataset(data_dict=test_data, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

# 验证数据加载器是否正常工作
for images, labels in train_loader:
    print(images.shape, labels.shape)
    break
for images, labels in test_loader:
    print(images.shape, labels.shape)
    break

torch.Size([256, 3, 32, 32]) torch.Size([256])
torch.Size([256, 3, 32, 32]) torch.Size([256])


定义线性分类器并进行训练和评估

In [22]:
# 定义线性分类器
class LinearClassifier(nn.Module):
    def __init__(self, base_model, num_classes=100):
        super(LinearClassifier, self).__init__()
        self.encoder = base_model
        self.fc1 = nn.Linear(512, 256)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        with torch.no_grad():
            h = self.encoder(x)
        x = self.fc1(h)
        x = self.relu(x)
        x = self.fc2(x)
        return x

# 训练线性分类器的函数
def train_linear_classifier(model, loader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    total_correct = 0
    for x, y in loader:
        x, y = x.to(device), y.to(device)
        outputs = model(x)
        loss = criterion(outputs, y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_correct += (outputs.argmax(1) == y).sum().item()
    avg_loss = total_loss / len(loader)
    accuracy = total_correct / len(loader.dataset)
    return avg_loss, accuracy

# 评估线性分类器的函数
def evaluate_linear_classifier(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    total_correct = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            loss = criterion(outputs, y)
            total_loss += loss.item()
            total_correct += (outputs.argmax(1) == y).sum().item()
    avg_loss = total_loss / len(loader)
    accuracy = total_correct / len(loader.dataset)
    return avg_loss, accuracy
    
# 训练和测试函数
def train_and_evaluate_simclr(params, train_loader, test_loader):
    # 设置设备
    device = torch.device(params['device'])

    # 加载预训练的SimCLR模型并选择性的冻结其权重
    resnet_model = resnet18(weights=params['base_model_weights'])
    resnet_model.fc = nn.Identity()
    simclr_model = SimCLR(resnet_model, out_dim=256).to(device)
    simclr_model.load_state_dict(torch.load(params['simclr_model_path']))

    if params['freeze_pretrained']:
        # 冻结预训练模型参数
        for param in simclr_model.parameters():
            param.requires_grad = False

    # 创建线性分类器
    linear_classifier = LinearClassifier(simclr_model.encoder, num_classes=params['num_classes']).to(device)

    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(linear_classifier.parameters(), lr=params['learning_rate'])
    
    # 学习率调度器
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=3)

    writer = SummaryWriter()
    for epoch in range(params['num_epochs']):
        train_loss, train_accuracy = train_linear_classifier(linear_classifier, train_loader, optimizer, criterion, device)
        test_loss, test_accuracy = evaluate_linear_classifier(linear_classifier, test_loader, criterion, device)
        print(f'Epoch [{epoch+1}], Train Loss: {train_loss:.6f}, Train Accuracy: {train_accuracy:.6f}')
        print(f'Epoch [{epoch+1}], Test Loss: {test_loss:.6f}, Test Accuracy: {test_accuracy:.6f}')
        writer.add_scalars('Loss', {'train': train_loss, 'test': test_loss}, epoch)
        writer.add_scalars('Accuracy', {'train': train_accuracy, 'test': test_accuracy}, epoch)
        
        # 调整学习率
        scheduler.step(test_loss)

    # 保存模型权重
    torch.save(linear_classifier.state_dict(), params['model_save_path'])
    writer.close()

In [12]:
# 定义参数字典
params_simclr_frozen = {
    'device': 'mps',  # 设备选择
    'base_model_weights': 'DEFAULT',
    'simclr_model_path': 'simclr_stl10.pth',
    'learning_rate': 1e-4,  # 学习率
    'batch_size': 256,
    'num_epochs': 15,  # 训练的轮数
    'model_save_path': 'linear_classifier_frozen.pth',  # 保存模型权重的路径
    'num_classes': 100,
    'freeze_pretrained': True,  # 是否冻结预训练模型权重
}

params_simclr_unfrozen = {
    'device': 'mps',  # 设备选择
    'base_model_weights': 'DEFAULT',
    'simclr_model_path': 'simclr_stl10.pth',
    'learning_rate': 1e-4,  # 学习率
    'batch_size': 256,
    'num_epochs': 15,  # 训练的轮数
    'model_save_path': 'linear_classifier_unfrozen.pth',  # 保存模型权重的路径
    'num_classes': 100,
    'freeze_pretrained': False,  # 是否冻结预训练模型权重
}

In [None]:
# 调用训练和测试函数
train_and_evaluate_simclr(params_simclr_unfrozen, train_loader, test_loader)

使用在ImageNet数据集上预训练的ResNet-18模型进行线性分类评估

In [15]:
# 训练和测试函数
def train_and_evaluate_imagenet(params, train_loader, test_loader):
    # 设置设备
    device = torch.device(params['device'])

    # 加载ImageNet预训练的ResNet-18模型并冻结其权重
    resnet_model = resnet18(weights=params['base_model_weights'])
    resnet_model.fc = nn.Identity()

    for param in resnet_model.parameters():
        param.requires_grad = False

    # 创建线性分类器
    linear_classifier = LinearClassifier(resnet_model, num_classes=params['num_classes']).to(device)

    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(linear_classifier.parameters(), lr=params['learning_rate'])

    writer = SummaryWriter()
    for epoch in range(params['num_epochs']):
        train_loss, train_accuracy = train_linear_classifier(linear_classifier, train_loader, optimizer, criterion, device)
        test_loss, test_accuracy = evaluate_linear_classifier(linear_classifier, test_loader, criterion, device)
        print(f'Epoch [{epoch+1}], Train Loss: {train_loss:.6f}, Train Accuracy: {train_accuracy:.6f}')
        print(f'Epoch [{epoch+1}], Test Loss: {test_loss:.6f}, Test Accuracy: {test_accuracy:.6f}')
        writer.add_scalars('Loss', {'train': train_loss, 'test': test_loss}, epoch)
        writer.add_scalars('Accuracy', {'train': train_accuracy, 'test': test_accuracy}, epoch)
    
    # 保存模型权重
    torch.save(linear_classifier.state_dict(), params['model_save_path'])
    writer.close()

In [17]:
# 定义参数字典
params_imagenet = {
    'device': 'cuda' if torch.cuda.is_available() else 'mps',  # 设备选择
    'base_model_weights': 'IMAGENET1K_V1',  # 预训练模型的权重
    'learning_rate': 1e-4,  # 学习率
    'num_epochs': 15,  # 训练的轮数
    'model_save_path': 'imagenet_classifier_cifar100.pth',  # 保存模型权重的路径
    'num_classes': 100,  # 分类器的类别数量
}


In [18]:
# 调用训练和测试函数
train_and_evaluate_imagenet(params_imagenet, train_loader, test_loader)

Epoch [1], Train Loss: 4.585081, Train Accuracy: 0.224000
Epoch [1], Test Loss: 4.525494, Test Accuracy: 0.380000
Epoch [2], Train Loss: 4.474433, Train Accuracy: 0.427200
Epoch [2], Test Loss: 4.409912, Test Accuracy: 0.524000
Epoch [3], Train Loss: 4.357847, Train Accuracy: 0.576400
Epoch [3], Test Loss: 4.304858, Test Accuracy: 0.674000
Epoch [4], Train Loss: 4.280235, Train Accuracy: 0.664800
Epoch [4], Test Loss: 4.248499, Test Accuracy: 0.708000
Epoch [5], Train Loss: 4.237301, Train Accuracy: 0.723600
Epoch [5], Test Loss: 4.219965, Test Accuracy: 0.778000
Epoch [6], Train Loss: 4.203393, Train Accuracy: 0.763000
Epoch [6], Test Loss: 4.184556, Test Accuracy: 0.841000
Epoch [7], Train Loss: 4.182959, Train Accuracy: 0.790000
Epoch [7], Test Loss: 4.173709, Test Accuracy: 0.827000
Epoch [8], Train Loss: 4.163268, Train Accuracy: 0.812400
Epoch [8], Test Loss: 4.129063, Test Accuracy: 0.896000
Epoch [9], Train Loss: 4.144882, Train Accuracy: 0.852800
Epoch [9], Test Loss: 4.126649

在CIFAR-100数据集上从零开始训练ResNet-18

In [19]:
# 训练和测试函数
def train_and_evaluate_cifar_resnet(params, train_loader, test_loader):
    # 设置设备
    device = torch.device(params['device'])

    # 定义ResNet-18模型
    cifar_resnet_model = resnet18(weights=None)
    cifar_resnet_model.fc = nn.Linear(512, 100)  # 适配CIFAR-100
    cifar_resnet_model = cifar_resnet_model.to(device)  # 将模型加载到设备上

    # 定义损失函数和优化器
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(cifar_resnet_model.parameters(), lr=params['learning_rate'])

    writer = SummaryWriter()
    for epoch in range(params['num_epochs']):
        train_loss, train_accuracy = train_linear_classifier(cifar_resnet_model, train_loader, optimizer, criterion, device)
        test_loss, test_accuracy = evaluate_linear_classifier(cifar_resnet_model, test_loader, criterion, device)
        print(f'Epoch [{epoch+1}], Train Loss: {train_loss:.6f}, Train Accuracy: {train_accuracy:.6f}')
        print(f'Epoch [{epoch+1}], Test Loss: {test_loss:.6f}, Test Accuracy: {test_accuracy:.6f}')
        writer.add_scalars('Loss', {'train': train_loss, 'test': test_loss}, epoch)
        writer.add_scalars('Accuracy', {'train': train_accuracy, 'test': test_accuracy}, epoch)
    
    # 保存模型权重
    torch.save(cifar_resnet_model.state_dict(), params['model_save_path'])
    writer.close()


In [20]:
# 定义参数字典
params_cifar = {
    'device': 'mps',  # 设备选择
    'learning_rate': 1e-4,  # 学习率
    'num_epochs': 15,  # 训练的轮数
    'model_save_path': 'cifar_resnet_cifar100.pth',  # 保存模型权重的路径
}


In [23]:
# 调用训练和测试函数
train_and_evaluate_cifar_resnet(params_cifar, train_loader, test_loader)

Epoch [1], Train Loss: 4.575025, Train Accuracy: 0.078960
Epoch [1], Test Loss: 4.448085, Test Accuracy: 0.117000
Epoch [2], Train Loss: 4.374245, Train Accuracy: 0.131580
Epoch [2], Test Loss: 4.319279, Test Accuracy: 0.159300
Epoch [3], Train Loss: 4.271965, Train Accuracy: 0.166800
Epoch [3], Test Loss: 4.236952, Test Accuracy: 0.189000
Epoch [4], Train Loss: 4.194817, Train Accuracy: 0.200940
Epoch [4], Test Loss: 4.161407, Test Accuracy: 0.229800
Epoch [5], Train Loss: 4.127726, Train Accuracy: 0.229680
Epoch [5], Test Loss: 4.128075, Test Accuracy: 0.242700
Epoch [6], Train Loss: 4.082789, Train Accuracy: 0.250560
Epoch [6], Test Loss: 4.083178, Test Accuracy: 0.262500
Epoch [7], Train Loss: 4.035984, Train Accuracy: 0.269760
Epoch [7], Test Loss: 4.031852, Test Accuracy: 0.278400
Epoch [8], Train Loss: 3.993523, Train Accuracy: 0.295380
Epoch [8], Test Loss: 4.024248, Test Accuracy: 0.284400
Epoch [9], Train Loss: 3.953982, Train Accuracy: 0.305940
Epoch [9], Test Loss: 3.966776

用Tensorboard可视化的训练过程中的loss曲线变化以及Linear classification过程中accuracy的变化

In [25]:
!tensorboard --logdir="/Users/fuchenxu/Desktop/computer_vision/final_project/task_1"

TensorFlow installation not found - running with reduced feature set.
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.14.0 at http://localhost:6006/ (Press CTRL+C to quit)
E0615 12:11:55.861474 6230732800 _internal.py:97] Error on request:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/final/lib/python3.8/site-packages/werkzeug/serving.py", line 363, in run_wsgi
    execute(self.server.app)
  File "/opt/anaconda3/envs/final/lib/python3.8/site-packages/werkzeug/serving.py", line 324, in execute
    application_iter = app(environ, start_response)
  File "/opt/anaconda3/envs/final/lib/python3.8/site-packages/tensorboard/backend/application.py", line 528, in __call__
    return self._app(environ, start_response)
  File "/opt/anaconda3/envs/final/lib/python3.8/site-packages/tensorboard/backend/application.py", line 569, in wrapper
    return wsgi_app(environ, start_response)
  File "/opt/anaconda3/envs/final/lib/python