In [2]:
import clip
import torch
from torch import nn
from PIL import Image
#ab8d0971276394e3d1d6dda698f2272810be2374  
device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "ViT-B/32"
model, preprocess = clip.load(model_name, device=device)


In [3]:
print(model)

CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): Sequential(
        (0): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          

In [4]:
from pathlib import Path
import pandas as pd

# 确认正确的路径
dataset_path = Path("/media/liulianhang/C20EE8FD0EE8EB7D/h-and-m-personalized-fashion-recommendations")

# 读取数据
articles = pd.read_csv(dataset_path / "articles.csv")
customers = pd.read_csv(dataset_path / "customers.csv")
transactions = pd.read_csv(dataset_path / "transactions_train.csv")

In [5]:
class LoRALinear(nn.Module):
    def __init__(self, original_linear, r=4, lora_alpha=1.0):
        super().__init__()
        self.in_features = original_linear.in_features
        self.out_features = original_linear.out_features
        self.r = r
        self.lora_alpha = lora_alpha

        # Original weights and biases
        self.weight = original_linear.weight
        self.bias = original_linear.bias

        # Freeze original weights and biases
        self.weight.requires_grad = False
        if self.bias is not None:
            self.bias.requires_grad = False

        # LoRA parameters
        self.lora_A = nn.Parameter(torch.zeros((r, self.in_features)))
        self.lora_B = nn.Parameter(torch.zeros((self.out_features, r)))

        # Initialize LoRA parameters
        nn.init.kaiming_uniform_(self.lora_A, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B)

        # Scaling factor
        self.scaling = self.lora_alpha / self.r

    def forward(self, x):
        # Original output
        result = nn.functional.linear(x, self.weight, self.bias)
        # LoRA update
        lora_update = (x @ self.lora_A.T) @ self.lora_B.T * self.scaling
        return result + lora_update

In [6]:
import math

def replace_linear_with_lora(module, r=4, lora_alpha=1.0):
    for name, child in module.named_children():
        if isinstance(child, nn.Linear):
            setattr(module, name, LoRALinear(child, r=r, lora_alpha=lora_alpha))
        else:
            replace_linear_with_lora(child, r=r, lora_alpha=lora_alpha)


In [7]:
replace_linear_with_lora(model.transformer)

In [8]:
# 假设您已经有 articles DataFrame 和 image_path_pool
# 我们需要创建一个数据集和数据加载器
from torch.utils.data import Dataset, DataLoader
import os
class CustomDataset(Dataset):
    def __init__(self, image_paths, labels, preprocess):
        self.image_paths = image_paths
        self.labels = labels
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = self.preprocess(Image.open(self.image_paths[idx])).to(device)
        label = self.labels[idx]
        return image, label
# #Loop in all the images folders, choose the first three images in each folder, Make separate predictions for all the classes, using loop,and calculate the accuracy
# image_folder_root = "/media/liulianhang/C20EE8FD0EE8EB7D/h-and-m-personalized-fashion-recommendations/images"
# image_folder_3num = os.listdir(image_folder_root)
# image_path_pool = []
# #in each image folder, choose the first three images
# for image_folder in image_folder_3num:
#     image_folder_path = os.path.join(image_folder_root, image_folder)
#     image_folder_images = os.listdir(image_folder_path)
#     image_folder_images = image_folder_images[:2]
#     for image_name in image_folder_images:
#         image_path = os.path.join(image_folder_path, image_name)
#         image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
#         image_path_pool.append(image_path)

# # 准备 image_paths 和 labels
# image_paths = image_path_pool  # 您之前收集的图像路径列表
# labels = []  # 对应的标签列表，需要根据 image_paths 生成
product_group_name = articles['product_group_name'].unique()
# for image_path in image_paths:
#     article_id = int(image_path.split('/')[-1].split('.')[0][1:])
#     label = articles[articles['article_id'] == article_id]['product_group_name'].values[0]
#     labels.append(label)

# dataset = CustomDataset(image_paths, labels, preprocess)
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)


In [9]:
for name, param in model.named_parameters():
    if 'lora' in name:
        param.requires_grad = True
    else:
        param.requires_grad = False
optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4)
criterion = nn.CrossEntropyLoss()


In [10]:
# for name, param in model.named_parameters():
#     print(f"{name}: requires_grad={param.requires_grad}")


In [60]:
# 获取所有图像路径和对应的标签
def load_image_paths_and_labels(image_folder, articles_df):
    image_paths = []
    labels = []
    for subdir, dirs, files in os.walk(image_folder):
        for file in files:
            if file.endswith('.jpg'):
                # 获取图像路径
                image_path = os.path.join(subdir, file)
                image_paths.append(image_path)
                
                # 从文件名提取 article_id
                article_id = int(file.split('.')[0][1:])  # 假设文件名是 "0108775015.jpg"
                # 获取标签
                label = articles_df[articles_df['article_id'] == article_id]['product_group_name'].values[0]
                labels.append(label)
    return image_paths, labels

In [59]:
from sklearn.model_selection import train_test_split
# 加载数据
image_folder = '/media/liulianhang/C20EE8FD0EE8EB7D/h-and-m-personalized-fashion-recommendations/images'
image_paths, labels = load_image_paths_and_labels(image_folder, articles)

# 划分训练集、验证集和测试集
train_image_paths, temp_image_paths, train_labels, temp_labels = train_test_split(
    image_paths, labels, test_size=0.3, random_state=42)

val_image_paths, test_image_paths, val_labels, test_labels = train_test_split(
    temp_image_paths, temp_labels, test_size=0.5, random_state=42)


train_dataset = CustomDataset(train_image_paths, train_labels, preprocess)
val_dataset = CustomDataset(val_image_paths, val_labels, preprocess)
test_dataset = CustomDataset(test_image_paths, test_labels, preprocess)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=4)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

In [58]:
# 定义训练和验证函数
import wandb
def train_one_epoch(model, train_loader, optimizer, criterion, device,text_tokens, epoch):
    total_loss = 0
    total_batches = len(train_loader)
    for batch_idx, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)

        # 前向传播
        image_features = model.encode_image(images)
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)

        
        text_features = model.encode_text(text_tokens)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # 计算相似度
        logits_per_image = image_features @ text_features.T

        # 计算损失
        loss = criterion(logits_per_image, label_indices)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        print('loss:',loss.item())
        #want to use wandb to log the loss
        wandb.log({'train_everystep_loss': loss.item()}, step=epoch * total_batches + batch_idx)
    avg_loss = total_loss / len(train_loader)
    return avg_loss

def validate(model, val_loader, criterion, device,text_tokens):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images = images.to(device)
            label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)

            # 前向传播
            image_features = model.encode_image(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

            
            text_features = model.encode_text(text_tokens)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)

            # 计算相似度
            logits_per_image = image_features @ text_features.T

            # 计算损失
            loss = criterion(logits_per_image, label_indices)

            total_loss += loss.item()
            #wandb.log({'val_loss': loss.item()}, step=epoch * total_batches + batch_idx)

    avg_loss = total_loss / len(val_loader)
    return avg_loss


In [66]:
# from matplotlib import pyplot as plt

# label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)
# for images, labels in train_loader:
#         images = images.to(device)
#         label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)
#         print(len(label_indices))

#         # 前向传播
#         image_features = model.encode_image(images)
#         print(image_features.shape)
#         image_features = image_features / image_features.norm(dim=-1, keepdim=True)
#         print(image_features.shape)

#         text_inputs = clip.tokenize([f"a photo of a {label}" for label in labels]).to(device)
#         print(text_inputs.shape)
#         text_features = model.encode_text(text_inputs)
#         print(text_features.shape)
#         text_features = text_features / text_features.norm(dim=-1, keepdim=True)
#         print(text_features.shape)

#         # 计算相似度
#         logits_per_image = image_features @ text_features.T
#         print(logits_per_image.shape )
#         print('finish')

#         # 计算损失
#         loss = criterion(logits_per_image, label_indices)
# 准备所有可能的标签文本
# label_texts = [f"a photo of a {label}" for label in product_group_name]
# text_tokens = clip.tokenize(label_texts).to(device)
# print(text_tokens.shape)
# for images, labels in train_loader:
#         print(len(train_loader))
#         images = images.to(device)
#         # 获取对应标签在 product_group_name 中的索引
#         label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)
#         print(len(label_indices))

#         # 前向传播 - 图像特征
#         image_features = model.encode_image(images)
#         image_features = image_features / image_features.norm(dim=-1, keepdim=True)  # 避免就地操作
#         print(image_features.shape)

#         # 前向传播 - 文本特征
#         # 在每次训练中计算文本特征，这样它们的计算图不会被detach
#         text_features = model.encode_text(text_tokens)
#         text_features = text_features / text_features.norm(dim=-1, keepdim=True)  # 避免就地操作
#         print(text_features.shape)
#         # 计算相似度
#         logits_per_image = image_features @ text_features.T
#         print(logits_per_image.shape)
#         # 计算损失
#         loss = criterion(logits_per_image, label_indices)
#         print(loss)
#         print(f"loss: {loss}, requires_grad: {loss.requires_grad}, grad_fn: {loss.grad_fn}")

#         # 反向传播和优化
#         optimizer.zero_grad()
#         loss.backward()
#         optimizer.step()

#         total_loss += loss.item()


In [65]:
import wandb
wandb.init(project='clip_training', name='CLIP_Model_Run')
num_epochs = 50
best_val_loss = float('inf')
label_texts = [f"a photo of a {label}" for label in product_group_name]
text_tokens = clip.tokenize(label_texts).to(device)
# for epoch in range(num_epochs):
#     train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device,text_tokens, epoch)
#     val_loss = validate(model, val_loader, criterion, device,text_tokens)
#     print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f}")
#     # 使用 WandB 记录训练和验证损失
#     wandb.log({
#         'epoch': epoch + 1,
#         'train_loss': train_loss,
#         'val_loss': val_loss
#     })
#     # 保存最佳模型
#     if val_loss < best_val_loss:
#         best_val_loss = val_loss
#         torch.save(model.state_dict(), 'best_clip_model.pth')
#         print("Saved Best Model")
#         wandb.save('best_clip_model.pth')

# 测试模型性能
def test(model, test_loader, criterion, device,text_tokens):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        #want the loop is ten times not the whole test_loader

        for i, (images, labels) in enumerate(test_loader):
            if i >= 10:
                break  # 退出循环
            images = images.to(device)
            label_indices = torch.tensor([list(product_group_name).index(label) for label in labels]).to(device)

            # 前向传播
            image_features = model.encode_image(images)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)

            #text_inputs = clip.tokenize([f"a photo of a {label}" for label in labels]).to(device)
            text_features = model.encode_text(text_tokens)
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)

            # 计算相似度
            logits_per_image = image_features @ text_features.T

            # 计算损失
            loss = criterion(logits_per_image, label_indices)

            total_loss += loss.item()
            _, predicted = logits_per_image.max(1)
            total += label_indices.size(0)
            correct += (predicted == label_indices).sum().item()

    avg_loss = total_loss / 10
    accuracy = 100.0 * correct / total
    return avg_loss, accuracy

# 加载最佳模型并测试
model.load_state_dict(torch.load('best_clip_model.pth'))
test_loss, test_accuracy = test(model, test_loader, criterion, device,text_tokens)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")
# 记录测试结果到 WandB
wandb.log({
    'test_loss': test_loss,
    'test_accuracy': test_accuracy
})

# 结束 WandB 运行
wandb.finish()

[34m[1mwandb[0m: Currently logged in as: [33mliulianhang[0m ([33mliulianhang-kth-royal-institute-of-technology[0m). Use [1m`wandb login --relogin`[0m to force relogin


  model.load_state_dict(torch.load('best_clip_model.pth'))


Test Loss: 2.0810, Test Accuracy: 45.62%


0,1
test_accuracy,▁
test_loss,▁

0,1
test_accuracy,45.625
test_loss,2.08102
