In [62]:
import csv
import os
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from natsort import natsorted
from torch.utils.data import Dataset, DataLoader
import clip
import torch
import torch.nn as nn
from tqdm import tqdm
import torch.optim as optim
from torch.utils.data import random_split
from torchvision import transforms


In [63]:
# 初始化空列表來存儲動作標籤
action_labels = []

# 開啟 CSV 檔案
with open('/media/1TB_SSD/yancenli/DL_final/train_set.csv', mode='r', encoding='utf-8') as file:
    reader = csv.reader(file)
    
    # 跳過表頭（如果有）
    next(reader)
    
    # 遍歷每一行，提取 label 資料
    for row in reader:
        label = row[1]  # 取得第二列（label）
        action_labels.append(label)  # 加入列表

# 使用 set 來取得所有不重複的標籤
unique_labels = list(set(action_labels))

# 印出結果
print(f"共 {len(action_labels)} 張圖片的標籤")
print(f"動作標籤種類共 {len(unique_labels)} 種：{unique_labels}")



共 10100 張圖片的標籤
動作標籤種類共 15 種：['hugging', 'cycling', 'drinking', 'sleeping', 'listening_to_music', 'laughing', 'eating', 'running', 'texting', 'dancing', 'sitting', 'calling', 'clapping', 'fighting', 'using_laptop']


In [64]:
# 設定圖片資料夾路徑
image_folder = "/media/1TB_SSD/yancenli/DL_final/train_images"
#image_folder = "/media/1TB_SSD/yancenli/DL_final/test_images"

# 初始化空列表儲存圖片的 NumPy 陣列
image_arrays = []


# 取得經過自然排序的檔案名稱
image_files = natsorted([file for file in os.listdir(image_folder) if file.endswith(".jpg")])

# 遍歷資料夾中的圖片
for file in image_files:
    img_path = os.path.join(image_folder, file)  # 獲取完整路徑
    img = Image.open(img_path)  # 使用 Pillow 開啟圖片
    img_array = np.array(img)  # 轉為 NumPy 陣列
    image_arrays.append(img_array)  # 加入列表

# 檢查結果
print(f"共讀取 {len(image_arrays)} 張圖片")
print(f"第一張圖片的形狀：{image_arrays[0].shape}")

共讀取 10100 張圖片
第一張圖片的形狀：(183, 275, 3)


In [65]:
model, preprocess = clip.load("ViT-B/32", jit=False)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): Sequential(
        (0): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          

In [66]:
class CustomClipDataset(Dataset):
    def __init__(self, images, labels, preprocess, unique_labels):
        self.images = images
        self.labels = labels
        self.preprocess = preprocess
        self.unique_labels = unique_labels  # 儲存 unique_labels

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.fromarray(self.images[idx])  # 將 NumPy 陣列轉為 PIL 圖片
        raw_label = self.labels[idx]  # 原始標籤（字串）
        
        # 將 raw_label 轉換為 unique_labels 中的索引
        label = list(self.unique_labels).index(raw_label)  # 獲取 raw_label 在 unique_labels 中的索引
        
        image = self.preprocess(image)  # 預處理圖片
        return image, label



# 使用 CustomClipDataset 並創建 DataLoader
dataset = CustomClipDataset(image_arrays, action_labels, preprocess, unique_labels)
# 訓練集和驗證集的比例
train_size = int(0.8 * len(dataset))  # 80% 用於訓練
val_size = len(dataset) - train_size  # 剩下的 20% 用於驗證

# 隨機切分數據集
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# 創建訓練集和驗證集的 DataLoader
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False)

# 檢查 train_loader 和 val_loader
for images, labels in train_loader:
    print(f'Train batch - Images shape: {images.shape}, Labels shape: {labels.shape}')
    break  # 查看第一批次

for images, labels in val_loader:
    print(f'Validation batch - Images shape: {images.shape}, Labels shape: {labels.shape}')
    break  # 查看第一批次


Train batch - Images shape: torch.Size([256, 3, 224, 224]), Labels shape: torch.Size([256])
Validation batch - Images shape: torch.Size([256, 3, 224, 224]), Labels shape: torch.Size([256])


In [67]:
class CLIPFineTuner(nn.Module):
    def __init__(self, model, num_classes):
        super(CLIPFineTuner, self).__init__()
        self.model = model
        self.classifier = nn.Linear(model.visual.output_dim, num_classes)
    
    def forward(self, x):
        with torch.no_grad():
            features = self.model.encode_image(x).float()  # Convert to float32
        return self.classifier(features)

In [68]:
num_classes = len(unique_labels)
model_ft = CLIPFineTuner(model, num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_ft.classifier.parameters(), lr=1e-4)

In [70]:
# Number of epochs for training
num_epochs = 50

# Training loop
for epoch in range(num_epochs):
    model_ft.train()  # Set the model to training mode
    running_loss = 0.0  # Initialize running loss for the current epoch
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}, Loss: 0.0000")  # Initialize progress bar
    
    for images, labels in pbar:
        images, labels = images.to(device), labels.to(device)  # Move images and labels to the device (GPU or CPU)
        optimizer.zero_grad()  # Clear the gradients of all optimized variables
        outputs = model_ft(images)  # Forward pass: compute predicted outputs by passing inputs to the model
        loss = criterion(outputs, labels)  # Calculate the loss
        loss.backward()  # Backward pass: compute gradient of the loss with respect to model parameters
        optimizer.step()  # Perform a single optimization step (parameter update)
        
        running_loss += loss.item()  # Update running loss
        pbar.set_description(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}")  # Update progress bar with current loss

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')  # Print average loss for the epoch

    # Validation
    model_ft.eval()  # Set the model to evaluation mode
    correct = 0  # Initialize correct predictions counter
    total = 0  # Initialize total samples counter
    
    with torch.no_grad():  # Disable gradient calculation for validation
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)  # Move images and labels to the device
            outputs = model_ft(images)  # Forward pass: compute predicted outputs by passing inputs to the model
            _, predicted = torch.max(outputs.data, 1)  # Get the class label with the highest probability
            total += labels.size(0)  # Update total samples
            correct += (predicted == labels).sum().item()  # Update correct predictions

    print(f'Validation Accuracy: {100 * correct / total}%')  # Print validation accuracy for the epoch

# Save the fine-tuned model
torch.save(model_ft.state_dict(), 'clip_finetuned.pth')  # Save the model's state dictionary

Epoch 1/50, Loss: 2.0573: 100%|██████████| 32/32 [00:11<00:00,  2.88it/s]


Epoch [1/50], Loss: 2.0573


Epoch 2/50, Loss: 1.9511: 100%|██████████| 32/32 [00:11<00:00,  2.88it/s]


Epoch [2/50], Loss: 1.9511


Epoch 3/50, Loss: 1.8523: 100%|██████████| 32/32 [00:11<00:00,  2.87it/s]


Epoch [3/50], Loss: 1.8523


Epoch 4/50, Loss: 1.3828:  78%|███████▊  | 25/32 [00:09<00:02,  2.77it/s]


KeyboardInterrupt: 