In [None]:
from datasets import load_dataset
import torchvision.transforms as transforms
import torchvision
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch
from PIL import Image


In [None]:
class SimCLR(nn.Module):
    def __init__(self, base_encoder, projection_dim=128):
        super(SimCLR, self).__init__()
        # 使用ResNet作为基础编码器
        self.encoder = base_encoder
        # 定义投影头
        self.projection_head = nn.Sequential(
            nn.Linear(512, 512),  # 512是ResNet输出的特征维度
            nn.ReLU(),
            nn.Linear(512, projection_dim)
        )
    def forward(self, x):
        x = self.encoder(x)  # 输出图像的特征
        x = F.relu(x)         # 激活函数
        x = self.projection_head(x)  # 输出投影空间的特征
        return x


In [None]:
def contrastive_loss(x_i, x_j, temperature=0.5):
    """
    对比损失函数：基于余弦相似度
    x_i, x_j: 输入的特征向量（来自不同的数据增强视图）
    """
    # 计算余弦相似度
    similarity = F.cosine_similarity(x_i, x_j, dim=1)
    # 对比损失
    loss = -torch.log(torch.exp(similarity / temperature) / torch.sum(torch.exp(similarity / temperature), dim=1))
    return loss.mean()

In [None]:
transform = transforms.Compose([
    transforms.RandomResizedCrop(224),  # 随机裁剪到224x224
    transforms.RandomHorizontalFlip(),  # 随机水平翻转
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

In [None]:
dataset = load_dataset("cifar10", split='train')

In [None]:
test_image = dataset['img']
print(test_image)

In [7]:
import numpy as np
def transform_dataset(example):
    # 将图像从 list 转为 NumPy 数组
    # image_array = np.array(example['img'], dtype=np.uint8)  # 确保数据类型为 uint8
    # print(type(image_array))
    # image = Image.fromarray(image_array)  # 转为 PIL 图像
    example['pixel_values'] = transform(image)  # 对图像应用数据增强
    return example

In [8]:
dataset = dataset.map(transform_dataset)

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.ndarray'>
<class 'numpy.nd

KeyboardInterrupt: 

In [None]:
train_loader = DataLoader(dataset=dataset, batch_size=256, shuffle=True)

In [None]:
print(train_loader)

In [None]:
base_encoder = torchvision.models.resnet18(weights=None)

In [93]:
base_encoder = nn.Sequential(*list(base_encoder.children())[:-1])  # 去掉最后的全连接层

In [94]:
model = SimCLR(base_encoder)

In [95]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [96]:
epochs = 10
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for batch in train_loader:
        images = batch['pixel_values']

        view1 = images
        view2 = images

        optimizer.zero_grad()

        features1 = model(view1)
        features2 = model(view2)

        loss = contrastive_loss(features1, features2)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
   
    print(f"Epoch [{epoch+1}/{epoch}], Loss: {total_loss/len(train_loader)}")

AttributeError: 'list' object has no attribute '__array_interface__'