In [6]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

这段代码定义了一个数据预处理流水线，使用了 `torchvision.transforms` 模块中的多个变换操作。以下是对每个变换操作的解释：

```python
transform = transforms.Compose([
    transforms.RandomResizedCrop(size=32),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
```

1. **`transforms.RandomResizedCrop(size=32)`**:
   - 随机裁剪图像，并将其调整为指定大小（32x32）。
   - 这个操作有助于模型学习不同尺度和位置的特征。

2. **`transforms.RandomHorizontalFlip()`**:
   - 以0.5的概率随机水平翻转图像。
   - 这个操作有助于模型学习图像的对称性。

3. **`transforms.ColorJitter()`**:
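   - 随机改变图像的亮度、对比度、饱和度和色调；但要注意，不带参数的 `ColorJitter()` 四项幅度的默认值都是 0，实际上不会产生任何颜色扰动，必须显式传入幅度（例如 `ColorJitter(0.4, 0.4, 0.4)`）才会生效。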
   - 这个操作有助于模型学习对颜色变化的鲁棒性。

4. **`transforms.RandomGrayscale(p=0.2)`**:
   - 以0.2的概率将图像转换为灰度图像。
   - 这个操作有助于模型学习对颜色信息的鲁棒性。

5. **`transforms.ToTensor()`**:
   - 将PIL图像或numpy数组转换为torch张量，并将像素值从[0, 255]范围缩放到[0, 1]范围。
   - 这个操作是必需的，因为PyTorch模型期望输入是张量格式。

6. **`transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))`**:
   - 按通道对图像张量进行标准化：`output = (input - mean) / std`，使整个数据集在每个通道上的均值约为0、标准差约为1（单张图像本身不一定满足）。
   - 这里的参数是针对CIFAR-10数据集的典型均值和标准差。
   - 标准化操作有助于模型更快地收敛。

这个预处理流水线可以应用于数据加载器中，以便在训练过程中对图像数据进行预处理。例如：

```python
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# 定义数据预处理流水线
transform = transforms.Compose([
    transforms.RandomResizedCrop(size=32),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])

# 加载数据集并应用预处理
train_dataset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
```

这样，`train_loader` 将会在每次迭代时自动应用定义的预处理流水线。

In [8]:
# Hyperparameters
batch_size = 128
memory_bank_size = 50_000  # CIFAR-10 has 50,000 training images
feature_dim = 128          # dimensionality of each feature vector
negative_samples = 1024    # number of negative samples to draw

# Data augmentation pipeline.
# ColorJitter() called without arguments leaves brightness, contrast,
# saturation and hue at 0, which makes it a no-op, so the jitter strengths
# are passed explicitly. Hue is deliberately left at its default of 0: the
# reference configuration kept below (which also jitters hue) raised
# OverflowError here -- presumably from the hue adjustment; TODO confirm
# against the installed torchvision/numpy versions.
transform = transforms.Compose([
    transforms.RandomResizedCrop(size=32),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
])
# Reference configuration from GitHub (raises OverflowError in this environment):
# transform = transforms.Compose([
#     transforms.RandomResizedCrop(size=32, scale=(0.2,1.)),
#     transforms.ColorJitter(0.4, 0.4, 0.4, 0.4),
#     transforms.RandomGrayscale(p=0.2),
#     # transforms.RandomHorizontalFlip(),
#     transforms.ToTensor(),
#     transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
# ])

# CIFAR-10 dataset
train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform, download=True)

# Keep the class labels around; they are not used for training but are
# needed for any later evaluation.
train_labels = list(train_dataset.targets)

# The training loop uses the second element of every batch as the row of the
# memory bank that belongs to each image. With the stock dataset that element
# is the class label (0-9), so only ten rows would ever be touched. Replacing
# the targets with the sample positions makes every item come out as
# (image, sample_index), giving each image its own memory-bank slot.
train_dataset.targets = list(range(len(train_dataset)))

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)

Files already downloaded and verified


In [10]:
# Smoke test: print the shapes of the two tensors in the first batch only.
for first_inputs, first_targets in train_loader:
    print(first_inputs.shape, first_targets.shape)
    break

torch.Size([128, 3, 32, 32]) torch.Size([128])


In [None]:
# Encoder network
class Encoder(nn.Module):
    """ResNet-18 (no pretrained weights) whose last layer outputs ``feature_dim`` values."""

    def __init__(self, feature_dim=128):
        super().__init__()
        backbone = torchvision.models.resnet18(pretrained=False, num_classes=feature_dim)
        # Replace the final fully connected layer with a fresh linear head.
        head_inputs = backbone.fc.in_features
        backbone.fc = nn.Sequential(nn.Linear(head_inputs, feature_dim))
        self.encoder = backbone

    def forward(self, x):
        """Run ``x`` through the backbone and return its output."""
        embeddings = self.encoder(x)
        return embeddings

In [None]:
# Encoder = Encoder()
# print(Encoder)

In [None]:
# Memory bank
class MemoryBank:
    """Table of L2-normalised feature vectors, one row per slot.

    Args:
        size: number of rows in the table.
        dim: length of each feature vector.
        device: device that holds the table. ``None`` (the default) selects
            CUDA when it is available and falls back to the CPU otherwise.
    """

    def __init__(self, size, dim, device=None):
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.size = size
        self.dim = dim
        # Rows start as random unit vectors. They are sampled on the CPU and
        # then moved, so the values come from the CPU random generator.
        self.memory = nn.functional.normalize(torch.randn(size, dim), dim=1).to(device)

    def update(self, indices, features):
        """Overwrite the rows at ``indices`` with the L2-normalised ``features``."""
        device = self.memory.device
        if torch.is_tensor(indices):
            indices = indices.to(device)
        # detach() keeps the table out of the autograd graph even if the
        # caller forgets to detach the features.
        self.memory[indices] = nn.functional.normalize(features.detach().to(device), dim=1)

    def get_negatives(self, batch_size, num_negatives=None):
        """Return random row indices of shape ``(batch_size, num_negatives)``.

        When ``num_negatives`` is omitted, the notebook-level
        ``negative_samples`` hyperparameter is used.
        """
        if num_negatives is None:
            num_negatives = negative_samples
        # Sample directly on the table's device to avoid a host-to-device copy.
        return torch.randint(0, self.size, (batch_size, num_negatives), device=self.memory.device)

In [None]:
# Instance-discrimination loss (NCE)
class NCELoss(nn.Module):
    """Cross-entropy over one positive and several negative similarities.

    Args:
        temperature: divisor applied to all similarity logits.
    """

    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature
        self.criterion = nn.CrossEntropyLoss()

    def forward(self, features, positives, negatives):
        """Compute the loss for a batch.

        Args:
            features: ``(B, D)`` embeddings; L2-normalised inside this method.
            positives: ``(B, D)`` positive vector for each sample.
            negatives: either ``(B, K, D)`` (a separate set of K negatives for
                every sample) or ``(K, D)`` (one set shared by the batch).

        Returns:
            Scalar loss tensor.

        Raises:
            ValueError: if ``negatives`` is neither 2-D nor 3-D.
        """
        features = nn.functional.normalize(features, dim=1)

        # Similarity with the positive: (B, 1)
        positive_logits = torch.sum(features * positives, dim=1, keepdim=True)

        # Similarity with the negatives: (B, K)
        if negatives.dim() == 3:
            # Per-sample negatives need a batched dot product; a plain
            # matmul against ``negatives.T`` does not match these shapes.
            negative_logits = torch.einsum("bd,bkd->bk", features, negatives)
        elif negatives.dim() == 2:
            negative_logits = features @ negatives.t()
        else:
            raise ValueError(
                f"negatives must have shape (B, K, D) or (K, D), got {tuple(negatives.shape)}"
            )

        # Column 0 holds the positive, so the target class is always 0.
        logits = torch.cat([positive_logits, negative_logits], dim=1) / self.temperature
        labels = torch.zeros(logits.shape[0], dtype=torch.long, device=logits.device)
        return self.criterion(logits, labels)

In [None]:
# Build the encoder, the memory bank, the loss and the optimizer.
model = Encoder(feature_dim=feature_dim)
model = model.cuda()
memory_bank = MemoryBank(memory_bank_size, feature_dim)
criterion = NCELoss()
criterion = criterion.cuda()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Training loop
def train(model, train_loader, memory_bank, criterion, optimizer, epochs=200):
    """Train ``model`` against the memory bank and print the mean loss per epoch.

    Args:
        model: network mapping an image batch to ``(B, D)`` features.
        train_loader: iterable yielding ``(images, indices)`` pairs, where
            ``indices`` select the memory-bank rows belonging to the images.
        memory_bank: object exposing ``memory``, ``get_negatives(n)`` and
            ``update(indices, features)``.
        criterion: callable ``(features, positives, negatives) -> loss``.
        optimizer: optimizer over the model parameters.
        epochs: number of passes over ``train_loader``.
    """
    model.train()

    # Run on whatever device the model already lives on rather than forcing
    # CUDA; fall back to the memory bank's device for parameter-free models.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else memory_bank.memory.device

    for epoch in range(epochs):
        total_loss = 0.0
        num_batches = 0
        for images, indices in train_loader:
            images = images.to(device)
            indices = indices.to(device)

            # Extract features
            features = model(images)

            # Fetch positives and negatives from the memory bank. The number
            # of negative sets follows the actual batch, because the last
            # batch of an epoch can be smaller than the nominal batch size.
            positives = memory_bank.memory[indices]
            negatives_indices = memory_bank.get_negatives(features.shape[0])
            negatives = memory_bank.memory[negatives_indices]

            # Compute the loss
            loss = criterion(features, positives, negatives)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_batches += 1

            # Refresh the memory-bank rows of the samples just seen
            memory_bank.update(indices, features.detach())

        # Batches are counted by hand so loaders without __len__ work and an
        # empty loader does not divide by zero.
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {total_loss/max(num_batches, 1):.4f}')

In [None]:
# Start training
train(
    model=model,
    train_loader=train_loader,
    memory_bank=memory_bank,
    criterion=criterion,
    optimizer=optimizer,
    epochs=200,
)