# CLIP Fine tuning
- change text to "a photo of {label} in {scientific name}"
- final version

In [1]:
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git

Collecting ftfy
  Downloading ftfy-6.1.1-py3-none-any.whl (53 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/53.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.1/53.1 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ftfy
Successfully installed ftfy-6.1.1
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-p4_0140x
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-p4_0140x
  Resolved https://github.com/openai/CLIP.git to commit a1d071733d7111c9c014f024669f959182114e33
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: clip
  Building wheel for clip (setup.py) ... [?25l[?25hdone
  Created wheel for clip: filename=clip-1.0-py3-none-any.whl size=1369497 sha256=22a2d1496dc3795ddde50bfd549de22f6d3f3a5c76c4c4ac6ede

In [16]:
import os
import clip
import torch
from torch import nn, optim
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.datasets import ImageFolder

%matplotlib inline
BATCH_SIZE = 32
EPOCH = 10

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


# Prepare the Model and Data

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
print(device)

cuda:0


In [47]:
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)
model = model.to(torch.float32)

torch.manual_seed(42)

<torch._C.Generator at 0x7815717bd4b0>

In [6]:
# Creating image path, text list
import pandas as pd


data_folder = '/content/drive/MyDrive/mushroom_data_new'
csv_file_path = '/content/drive/MyDrive/labeled.csv'

df = pd.read_csv(csv_file_path)

image_paths = []
text_descriptions = []

# 각 클래스 이름과 학명 정보를 사용하여 이미지 경로 생성
for index, row in df.iterrows():
    class_name = row['Common']
    scientific_name = row['Scientific']

    # 이미지 경로 생성
    class_folder_path = os.path.join(data_folder, class_name)
    image_files = os.listdir(class_folder_path)

    for image_file in image_files:
        image_path = os.path.join(class_folder_path, image_file)
        image_paths.append(image_path)

        text_description = f"a photo of {class_name.replace('_', ' ')} in {scientific_name.replace('_', ' ')}" #and
        text_descriptions.append(clip.tokenize(text_description))

len(text_descriptions)


12740

In [7]:
class MyDataset(Dataset):
    def __init__(self, image_paths, text_descriptions, preprocess):
        self.image_paths = image_paths
        self.text_descriptions = text_descriptions
        self.preprocess = preprocess

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = Image.open(self.image_paths[idx])
        image = self.preprocess(image)
        text = self.text_descriptions[idx]
        return image, text

# 데이터 전처리 및 데이터 로더 생성
data_folder = '/content/drive/MyDrive/mushroom_data_new'
dataset = MyDataset(image_paths, text_descriptions, preprocess)

# 데이터 분할 (train, validation, test)
total_size = len(dataset)
train_size = int(0.7 * total_size)
val_size = int(0.2 * total_size)
test_size = total_size - train_size - val_size

train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# 데이터 로더 생성
trainloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
valloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
testloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)


In [8]:
from numpy.lib import shape_base
for batch in trainloader:
    images, texts = batch
    # Print the first batch
    print("Image Path:", images[0].shape)
    print("Text Description:", texts[0].shape)
    break

Image Path: torch.Size([3, 224, 224])
Text Description: torch.Size([1, 77])


# Training

In [48]:
def convert_models_to_fp32(model):
    for p in model.parameters():
        p.data = p.data.float()
        p.grad.data = p.grad.data.float()

loss_img = nn.CrossEntropyLoss()
loss_txt = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, len(trainloader)*EPOCH)

In [14]:
from tqdm.notebook import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

def compute_accuracy(logits, ground_truth):
    _, predicted = logits.max(1)
    total = ground_truth.size(0)
    correct = (predicted == ground_truth).sum().item()
    return correct / total

for epoch in range(EPOCH):
    print(f"Epoch: {epoch+1}")

    # Training loop
    model.train()
    train_total, train_correct = 0, 0
    pbar = tqdm(trainloader, total=len(trainloader))
    for batch in pbar:
        optimizer.zero_grad()

        images, texts = batch
        texts = texts.squeeze(1)
        images = images.to(device)
        texts = texts.to(device)

        logits_per_image, logits_per_text = model(images, texts)

        # Compute loss
        # ground_truth = torch.arange(BATCH_SIZE).to(device)
        actual_batch_size = images.size(0)
        ground_truth = torch.arange(actual_batch_size).to(device)
        total_loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2

        # Compute train accuracy
        train_correct += (logits_per_image.argmax(dim=1) == ground_truth).float().sum().item()
        train_total += images.size(0)

        total_loss.backward()

        if device == "cpu":
            optimizer.step()
        else :
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)

        train_accuracy = 100 * train_correct / train_total
        pbar.set_description(f"Epoch {epoch+1}/{EPOCH}, Loss: {total_loss.item():.4f}, Train Acc: {train_accuracy:.2f}%")

    # Validation loop
    model.eval()
    val_total, val_correct = 0, 0
    with torch.no_grad():
        for batch in valloader:
            images, texts = batch
            texts = texts.squeeze(1)
            images = images.to(device)
            texts = texts.to(device)

            logits_per_image, _ = model(images, texts)

            actual_batch_size = logits_per_image.size(0)
            ground_truth = torch.arange(actual_batch_size).to(device)
            # ground_truth = torch.arange(BATCH_SIZE).to(device)

            val_correct += (logits_per_image.argmax(dim=1) == ground_truth).float().sum().item()
            val_total += images.size(0)

    val_accuracy = 100 * val_correct / val_total
    print(f"Validation Accuracy: {val_accuracy:.2f}%")

Epoch: 1


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 68.49%
Epoch: 2


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 73.12%
Epoch: 3


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 70.53%


In [52]:
torch.save(model.state_dict(), 'CLIP_with_scientific_5.pth')

In [42]:
loaded_model, preprocess = clip.load("ViT-B/32", device=device, jit=False)
loaded_model = loaded_model.to(torch.float32)
loaded_model.load_state_dict(torch.load('CLIP_with_scientific_5.pth'))
loaded_model.to(device)

CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): Sequential(
        (0): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          

In [34]:
from google.colab import drive
drive.mount('/content/drive')

file_path = '/content/drive/MyDrive/CLIP_with_scientific_5.pth'
torch.save(model[0].state_dict(), file_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [49]:
from tqdm.notebook import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

def compute_accuracy(logits, ground_truth):
    _, predicted = logits.max(1)
    total = ground_truth.size(0)
    correct = (predicted == ground_truth).sum().item()
    return correct / total


add_epoch = 15

for epoch in range(add_epoch):
    print(f"Epoch: {epoch+1}")

    # Training loop
    model.train()
    train_total, train_correct = 0, 0
    pbar = tqdm(trainloader, total=len(trainloader))
    for batch in pbar:
        optimizer.zero_grad()

        images, texts = batch
        texts = texts.squeeze(1)
        images = images.to(device)
        texts = texts.to(device)

        logits_per_image, logits_per_text = model(images, texts)

        # Compute loss
        # ground_truth = torch.arange(BATCH_SIZE).to(device)
        actual_batch_size = images.size(0)
        ground_truth = torch.arange(actual_batch_size).to(device)
        total_loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2

        # Compute train accuracy
        train_correct += (logits_per_image.argmax(dim=1) == ground_truth).float().sum().item()
        train_total += images.size(0)

        total_loss.backward()

        if device == "cpu":
            optimizer.step()
        else :
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)

        train_accuracy = 100 * train_correct / train_total
        pbar.set_description(f"Epoch {epoch+1}/{add_epoch}, Loss: {total_loss.item():.4f}, Train Acc: {train_accuracy:.2f}%")

    # Validation loop
    model.eval()
    val_total, val_correct = 0, 0
    with torch.no_grad():
        for batch in valloader:
            images, texts = batch
            texts = texts.squeeze(1)
            images = images.to(device)
            texts = texts.to(device)

            logits_per_image, _ = model(images, texts)

            actual_batch_size = logits_per_image.size(0)
            ground_truth = torch.arange(actual_batch_size).to(device)
            # ground_truth = torch.arange(BATCH_SIZE).to(device)

            val_correct += (logits_per_image.argmax(dim=1) == ground_truth).float().sum().item()
            val_total += images.size(0)

    val_accuracy = 100 * val_correct / val_total
    print(f"Validation Accuracy: {val_accuracy:.2f}%")


torch.save(model[0].state_dict(), 'CLIP_with_scientific_15.pth')

Epoch: 1


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 70.96%
Epoch: 2


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 72.84%
Epoch: 3


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 72.61%
Epoch: 4


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 74.10%
Epoch: 5


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 75.12%
Epoch: 6


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 75.90%
Epoch: 7


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 76.69%
Epoch: 8


  0%|          | 0/279 [00:00<?, ?it/s]

Validation Accuracy: 76.14%
Epoch: 9


  0%|          | 0/279 [00:00<?, ?it/s]

OSError: ignored

In [51]:
torch.save(model.state_dict(), 'CLIP_with_scientific_15.pth')

In [53]:
loaded_model, preprocess = clip.load("ViT-B/32", device=device, jit=False)
loaded_model = loaded_model.to(torch.float32)
loaded_model.load_state_dict(torch.load('CLIP_with_scientific_15.pth'))
loaded_model.to(device)

CLIP(
  (visual): VisionTransformer(
    (conv1): Conv2d(3, 768, kernel_size=(32, 32), stride=(32, 32), bias=False)
    (ln_pre): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
    (transformer): Transformer(
      (resblocks): Sequential(
        (0): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          )
          (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
          (mlp): Sequential(
            (c_fc): Linear(in_features=768, out_features=3072, bias=True)
            (gelu): QuickGELU()
            (c_proj): Linear(in_features=3072, out_features=768, bias=True)
          )
          (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        )
        (1): ResidualAttentionBlock(
          (attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=768, out_features=768, bias=True)
          

In [54]:

loaded_model.eval()

test_total, test_correct = 0, 0
predicted_labels = []  # To store the predicted labels

with torch.no_grad():
    for batch in testloader:
        images, texts = batch
        texts = texts.squeeze(1)
        images = images.to(device)
        texts = texts.to(device)

        logits_per_image, _ = model(images, texts)

        actual_batch_size = logits_per_image.size(0)
        ground_truth = torch.arange(actual_batch_size).to(device)

        # Compute the predicted labels
        predicted = logits_per_image.argmax(dim=1).cpu().numpy()
        predicted_labels.extend(predicted)

        test_correct += (predicted == ground_truth.cpu().numpy()).sum()
        test_total += images.size(0)

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Calculate F1 score
# f1 = f1_score(true_labels, predicted_labels, average='weighted')
# print(f"F1 Score: {f1:.4f}")

Test Accuracy: 74.57%
