In [None]:
# Colab only: mount Google Drive so it is reachable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install datasets
!pip install evaluate

Collecting datasets
  Downloading datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl.metadata (7.2 kB)
Collecting fsspec<=2024.9.0,>=2023.1.0 (from fsspec[http]<=2024.9.0,>=2023.1.0->datasets)
  Downloading fsspec-2024.9.0-py3-none-any.whl.metadata (11 kB)
Downloading datasets-3.1.0-py3-none-any.whl (480 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m480.6/480.6 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m7.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading fsspec-2024.9.0-py3-none-any.whl (

In [None]:
#library import
import torch
from torch.utils.data import DataLoader, random_split
from transformers import BlipProcessor, BlipForConditionalGeneration
from datasets import load_dataset
import numpy as np
from tqdm import tqdm
import os
import zipfile
from PIL import Image
from transformers import CLIPProcessor, CLIPModel
from evaluate import load
import torch.nn.functional as F
import torchvision.transforms as transforms


In [None]:
# Hyper-parameters and run-time settings
MODEL_NAME = "Salesforce/blip-image-captioning-base"  # checkpoint identifier
EPOCHS = 20            # number of iterations of the training loop
BATCH_SIZE = 16        # batch size used by the data loaders
LEARNING_RATE = 1e-4   # learning rate handed to the optimizer

# Use the GPU when PyTorch can see one, otherwise stay on the CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"


In [None]:
# Location of the dataset archive and of the directory it is unpacked into
zip_path = '/content/drive/MyDrive/가야 딥러닝2 NLP/가야_final/flickr8k.zip'  # archive on Google Drive
extract_path = '/content/flickr8k/'  # extraction target

# Paths the rest of the notebook reads from
image_folder = os.path.join(extract_path, 'Images')  # folder containing the image data
caption_file = os.path.join(extract_path, 'captions.txt')  # caption data file

# Extract whenever the expected contents are missing. Checking only for the
# target directory would skip extraction forever after an interrupted or
# failed first attempt, because the directory is created before unpacking.
if not (os.path.isdir(image_folder) and os.path.isfile(caption_file)):
    if not os.path.isfile(zip_path):
        raise FileNotFoundError(f"Dataset archive not found: {zip_path}")
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        print("Extracting zip file...")
        zip_ref.extractall(extract_path)
        print("Extraction complete!")

# Fail early if the archive did not contain the expected layout.
assert os.path.exists(image_folder), "Image folder not found!"
assert os.path.exists(caption_file), "Caption file not found!"

Extracting zip file...
Extraction complete!


In [None]:
# caption file load and parsing
def load_captions(caption_file):
    """Parse a caption file into a mapping of image name -> list of captions.

    The first line is treated as a header and skipped. Every other non-blank
    line is split at the FIRST comma only, so commas inside a caption are kept.
    Blank lines are ignored instead of aborting the whole load.

    Args:
        caption_file: Path of the text file to read (decoded as UTF-8).

    Returns:
        dict mapping each image name to its captions, in file order.

    Raises:
        ValueError: If a non-blank line contains no comma.
    """
    captions_dict = {}
    with open(caption_file, 'r', encoding='utf-8') as f:
        next(f, None)  # first line is a header
        # Stream the file line by line; numbering starts at 2 because of the header.
        for line_number, line in enumerate(f, start=2):
            line = line.strip()
            if not line:
                continue
            image_name, separator, caption = line.partition(',')
            if not separator:
                raise ValueError(
                    f"{caption_file}:{line_number}: expected '<image>,<caption>', got {line!r}"
                )
            captions_dict.setdefault(image_name, []).append(caption.strip())
    return captions_dict

# Parse the caption file once; `captions` maps image name -> list of captions.
captions = load_captions(caption_file)

In [None]:
# Sanity check: look at the first three entries of the caption mapping
first_3_captions = dict(list(captions.items())[:3])

print(first_3_captions)

{'1000268201_693b08cb0e.jpg': ['A child in a pink dress is climbing up a set of stairs in an entry way .', 'A girl going into a wooden building .', 'A little girl climbing into a wooden playhouse .', 'A little girl climbing the stairs to her playhouse .', 'A little girl in a pink dress going into a wooden cabin .'], '1001773457_577c3a7d70.jpg': ['A black dog and a spotted dog are fighting', 'A black dog and a tri-colored dog playing with each other on the road .', 'A black dog and a white dog with brown spots are staring at each other in the street .', 'Two dogs of different breeds looking at each other on the road .', 'Two dogs on pavement moving toward each other .'], '1002674143_1b742ab4b8.jpg': ['A little girl covered in paint sits in front of a painted rainbow with her hands in a bowl .', 'A little girl is sitting in front of a large painted rainbow .', 'A small girl in the grass plays with fingerpaints in front of a white canvas with a rainbow on it .', 'There is a girl with pigt

In [None]:
# Custom dataset for train
class Flickr8kDataset:
    """Map-style dataset that pairs each image with one of its captions.

    Args:
        image_folder: Directory containing the image files.
        captions: Mapping of image file name -> list of caption strings.
        processor: Callable invoked with ``images=`` and ``text=`` (plus padding,
            truncation and size options). It must return a mapping with
            ``pixel_values``, ``input_ids`` and ``attention_mask`` entries.
        max_length: Length every caption is padded / truncated to.
        image_size: ``size`` argument forwarded to the processor.
        caption_index: Which caption of each image to use (0 = the first one).

    NOTE(review): the evaluation cell feeds these ``pixel_values`` to CLIP as
    well, while the single-image inference cell calls the processor without
    ``size`` — confirm that 224x224 is the intended training resolution.
    """

    def __init__(self, image_folder, captions, processor, max_length=32, image_size=(224, 224), caption_index=0):
        self.image_folder = image_folder
        self.captions = captions
        self.processor = processor
        self.max_length = max_length
        self.image_size = image_size
        self.caption_index = caption_index
        # Fix the iteration order once so that an index always maps to the same image.
        self.image_names = list(captions.keys())

    def __len__(self):
        """Number of images (one sample per image)."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Load image ``idx`` and its selected caption and run them through the processor.

        Returns:
            dict with ``pixel_values``, ``input_ids``, ``attention_mask`` and
            ``labels``, each with the leading batch dimension removed.
        """
        image_name = self.image_names[idx]
        image_path = os.path.join(self.image_folder, image_name)
        # Close the underlying file as soon as the pixel data has been converted.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
        caption = self.captions[image_name][self.caption_index]

        inputs = self.processor(
            images=image,
            text=caption,
            return_tensors="pt",
            padding="max_length",
            max_length=self.max_length,
            truncation=True,
            size=self.image_size,
        )
        input_ids = inputs["input_ids"].squeeze(0)
        return {
            "pixel_values": inputs["pixel_values"].squeeze(0),
            "input_ids": input_ids,
            "attention_mask": inputs["attention_mask"].squeeze(0),
            # Independent copy so that editing labels can never alter input_ids.
            # Padding positions are kept as-is here (not replaced by an ignore
            # index) because the evaluation cell decodes the labels back to text.
            "labels": input_ids.clone(),
        }


In [None]:
# Processor and Dataset (the checkpoint id comes from the configuration cell)
processor = BlipProcessor.from_pretrained(MODEL_NAME)
dataset = Flickr8kDataset(image_folder=image_folder, captions=captions, processor=processor)

# Train-Test-Validation Split: 75% train, 12.5% validation, remainder test
train_size = int(0.75 * len(dataset))
valid_size = int(0.125 * len(dataset))
test_size = len(dataset) - train_size - valid_size

# A seeded generator keeps the split identical from run to run.
train_dataset, valid_dataset, test_dataset = random_split(dataset, [train_size, valid_size, test_size], generator=torch.Generator().manual_seed(42))

In [None]:
# DataLoader (only the training split is shuffled)
batch_size = BATCH_SIZE
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

# model: device and checkpoint id are taken from the configuration cell
device = torch.device(DEVICE)
model = BlipForConditionalGeneration.from_pretrained(MODEL_NAME).to(device)

config.json:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/990M [00:00<?, ?B/s]

In [None]:
# shared forward pass for training and evaluation
def _batch_loss(model, batch):
    """Move one batch to `device`, run the model and return its loss tensor.

    Padded positions (attention_mask == 0) are replaced by -100 in the labels,
    the default `ignore_index` of torch.nn.CrossEntropyLoss, so that padding
    does not contribute to the loss. The batch itself is left unmodified.
    """
    pixel_values = batch["pixel_values"].to(device)
    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)
    labels = batch["labels"].to(device)

    # masked_fill returns a new tensor, so batch["labels"] keeps its padding ids.
    labels = labels.masked_fill(attention_mask == 0, -100)

    outputs = model(pixel_values=pixel_values, labels=labels, input_ids=input_ids, attention_mask=attention_mask)
    return outputs.loss


# train one epoch
def train_one_epoch(model, dataloader, optimizer):
    """Run one optimisation pass over `dataloader`.

    Args:
        model: Module that is called with pixel_values / input_ids /
            attention_mask / labels and returns an object with a `.loss`.
        dataloader: Iterable of batches (dicts with those four keys).
        optimizer: Optimizer over the model's parameters.

    Returns:
        Mean of the per-batch losses.
    """
    model.train()
    total_loss = 0
    for batch in tqdm(dataloader, desc="Training"):
        optimizer.zero_grad()
        loss = _batch_loss(model, batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

# evaluate
def evaluate(model, dataloader):
    """Compute the mean per-batch loss over `dataloader` without updating the model.

    The model is switched to eval mode and gradients are disabled.

    Returns:
        Mean of the per-batch losses.
    """
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            total_loss += _batch_loss(model, batch).item()
    return total_loss / len(dataloader)

# Optimizer: AdamW over all model parameters, using the configured learning rate
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

In [None]:

best_valid_loss = float("inf")  # smallest validation loss seen so far
best_weights_path = "best_model_weights.pt"
patience = 3  # stop after this many consecutive epochs without improvement
epochs_without_improvement = 0
checkpoint_saved = False  # becomes True once this run has written the checkpoint

# train
for epoch in range(EPOCHS):
    train_loss = train_one_epoch(model, train_loader, optimizer)
    valid_loss = evaluate(model, valid_loader)

    # save weights whenever the validation loss reaches a new minimum
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        epochs_without_improvement = 0
        torch.save(model.state_dict(), best_weights_path)
        checkpoint_saved = True
        print(f"Model weights saved at epoch {epoch+1} with valid loss: {valid_loss:.4f}")
    else:
        epochs_without_improvement += 1

    print(f"Epoch {epoch+1}/{EPOCHS}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}")

    # Early stopping: a validation loss that keeps rising means further epochs
    # only overfit, so stop instead of relying on a manual interrupt.
    if epochs_without_improvement >= patience:
        print(f"Early stopping at epoch {epoch+1}: no validation improvement for {patience} epochs.")
        break

# evaluate test data with the best checkpoint, not with the last-epoch weights
if checkpoint_saved:
    model.load_state_dict(torch.load(best_weights_path, map_location=device))
test_loss = evaluate(model, test_loader)
print(f"Test Loss: {test_loss:.4f}")


Training: 100%|██████████| 380/380 [07:08<00:00,  1.13s/it]
Evaluating: 100%|██████████| 64/64 [00:28<00:00,  2.25it/s]


Model weights saved at epoch 1 with valid loss: 1.0184
Epoch 1/20, Train Loss: 0.7959, Valid Loss: 1.0184


Training: 100%|██████████| 380/380 [07:07<00:00,  1.13s/it]
Evaluating: 100%|██████████| 64/64 [00:28<00:00,  2.21it/s]


Epoch 2/20, Train Loss: 0.5814, Valid Loss: 1.0773


Training: 100%|██████████| 380/380 [07:08<00:00,  1.13s/it]
Evaluating: 100%|██████████| 64/64 [00:27<00:00,  2.29it/s]


Epoch 3/20, Train Loss: 0.4213, Valid Loss: 1.1570


Training: 100%|██████████| 380/380 [07:07<00:00,  1.12s/it]
Evaluating: 100%|██████████| 64/64 [00:28<00:00,  2.28it/s]


Epoch 4/20, Train Loss: 0.2908, Valid Loss: 1.2488


Training:   3%|▎         | 12/380 [00:14<07:28,  1.22s/it]


KeyboardInterrupt: 

In [None]:
# Training was stopped manually because the validation loss rose with every epoch
# while the training loss kept falling, i.e. the model was overfitting.
# The first epoch gave the lowest validation loss,
# so the fine-tuned weights saved after the first epoch are used below.

In [None]:
# Initialize BLIP model and load the fine-tuned weights
blip_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base")
weights_path = "/content/best_model_weights.pt"
# map_location lets a checkpoint written on GPU load on a CPU-only runtime;
# weights_only restricts unpickling to tensor data (a state dict needs nothing else).
blip_model.load_state_dict(torch.load(weights_path, map_location=device, weights_only=True))
blip_model = blip_model.to(device)
blip_model.eval()  # Set model to evaluation mode

# Initialize CLIP model and processor
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(device)
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

# Initialize BLIP processor
blip_processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")

# Initialize test DataLoader (one sample per batch, fixed order)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Initialize metrics
total_caption_length = 0
unique_words = set()
generated_captions = []
true_captions = []
total_clip_score = 0
total_perplexity = 0
image_features_list = []
text_features_list = []

# Loop-invariant: token-level cross entropy that skips padding targets
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=blip_processor.tokenizer.pad_token_id)

# Evaluation loop
with torch.no_grad():
    for batch in tqdm(test_loader, desc="Evaluating"):
        # Prepare inputs
        pixel_values = batch["pixel_values"].to(device)
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        # Generate the caption once and reuse it for every metric below
        outputs = blip_model.generate(pixel_values=pixel_values, max_length=32)
        generated_caption = blip_processor.decode(outputs[0], skip_special_tokens=True)
        # The dataset builds "labels" from the same token ids as "input_ids",
        # so the reference caption is decoded from input_ids directly.
        true_caption = blip_processor.decode(input_ids[0], skip_special_tokens=True)

        # Calculate Caption Length
        generated_words = generated_caption.split()
        total_caption_length += len(generated_words)

        # Collect unique words for Lexical Diversity
        unique_words.update(generated_words)

        # Store captions for later evaluation
        generated_captions.append(generated_caption)
        true_captions.append(true_caption)

        # Compute image features using CLIP
        # NOTE(review): this feeds BLIP-preprocessed pixel_values to CLIP — confirm
        # that the two preprocessing pipelines are compatible.
        image_features = clip_model.get_image_features(pixel_values)

        # Compute text features for generated and true captions
        generated_text_inputs = clip_processor(text=[generated_caption], return_tensors="pt", padding=True).to(device)
        true_text_inputs = clip_processor(text=[true_caption], return_tensors="pt", padding=True).to(device)

        generated_text_features = clip_model.get_text_features(**generated_text_inputs)
        true_text_features = clip_model.get_text_features(**true_text_inputs)

        # Compute cosine similarity
        generated_score = F.cosine_similarity(image_features, generated_text_features).mean().item()
        true_score = F.cosine_similarity(image_features, true_text_features).mean().item()

        # Average the generated and true scores as the final CLIPScore
        # NOTE(review): this mixes in the reference caption's score; confirm that
        # this is the metric definition that is meant to be reported.
        clip_score = (generated_score + true_score) / 2
        total_clip_score += clip_score

        # Compute Perplexity of the reference caption.
        # The logit at position t scores the token at position t + 1, so logits and
        # targets are shifted against each other before the cross entropy is taken.
        logits = blip_model(pixel_values=pixel_values, input_ids=input_ids, attention_mask=attention_mask).logits
        shift_logits = logits[:, :-1, :].contiguous()
        shift_targets = input_ids[:, 1:].contiguous()
        loss = loss_fn(shift_logits.view(-1, shift_logits.size(-1)), shift_targets.view(-1))
        perplexity = torch.exp(loss)
        total_perplexity += perplexity.item()

        # Keep the embeddings for CLIP self-retrieval (already computed above)
        image_features_list.append(image_features)
        text_features_list.append(generated_text_features)

# Combine embeddings; L2-normalise so the dot product below is a cosine similarity
image_features_tensor = F.normalize(torch.cat(image_features_list, dim=0), dim=-1)
text_features_tensor = F.normalize(torch.cat(text_features_list, dim=0), dim=-1)

# Compute Recall@1 and Recall@5
num_samples = len(test_loader)  # batch_size=1, so one batch per sample
similarity_scores = torch.matmul(image_features_tensor, text_features_tensor.T)  # Cosine similarity
top_k = min(5, num_samples)  # topk raises when k exceeds the number of candidates
top_k_indices = similarity_scores.topk(top_k, dim=1).indices

# Row i is a hit when caption i (generated from image i) is among its top-k captions.
expected_indices = torch.arange(num_samples, device=top_k_indices.device).unsqueeze(1)
hits = top_k_indices == expected_indices
r1_count = int(hits[:, :1].any(dim=1).sum().item())  # Recall@1
r5_count = int(hits.any(dim=1).sum().item())  # Recall@5




  blip_model.load_state_dict(torch.load(weights_path))  # Load pre-trained weights
Evaluating:   0%|          | 0/1012 [00:00<?, ?it/s]We strongly recommend passing in an `attention_mask` since your input_ids may be padded. See https://huggingface.co/docs/transformers/troubleshooting#incorrect-output-when-padding-tokens-arent-masked.
Evaluating: 100%|██████████| 1012/1012 [10:12<00:00,  1.65it/s]


Caption Length (Cap. Len.): 11.24
Lexical Diversity (Lex. Div.): 0.06
Recall@1 (R@1): 0.10
Recall@5 (R@5): 0.29
CLIPScore: 0.30
Perplexity (PPL): 24792.13


In [None]:
# Turn the running totals into averages and percentages
cap_len = total_caption_length / num_samples
lexical_diversity = len(unique_words) / total_caption_length
recall_at_1, recall_at_5 = (count / num_samples * 100 for count in (r1_count, r5_count))
average_clip_score = total_clip_score / num_samples * 100
average_ppl = total_perplexity / num_samples

# Report every metric with two decimals, one per line
metric_report = (
    ("Caption Length (Cap. Len.)", cap_len),
    ("Lexical Diversity (Lex. Div.)", lexical_diversity),
    ("Recall@1 (R@1)", recall_at_1),
    ("Recall@5 (R@5)", recall_at_5),
    ("CLIPScore", average_clip_score),
    ("Perplexity (PPL)", average_ppl),
)
for metric_label, metric_value in metric_report:
    print(f"{metric_label}: {metric_value:.2f}")

Caption Length (Cap. Len.): 11.24
Lexical Diversity (Lex. Div.): 0.06
Recall@1 (R@1): 10.18
Recall@5 (R@5): 28.75
CLIPScore: 30.27
Perplexity (PPL): 24792.13


In [None]:
# Caption a single image from disk

# Read the picture and force three-channel RGB
image_path = "/content/test_image.jpg"
image = Image.open(image_path).convert("RGB")

# Convert the picture into model inputs on the active device
inputs = blip_processor(images=image, return_tensors="pt").to(device)

# Beam-search generation (num_beams=5, max_length=32) without gradient tracking
with torch.no_grad():
    outputs = blip_model.generate(**inputs, max_length=32, num_beams=5)

# Token ids -> text, with special tokens dropped
generated_caption = blip_processor.decode(outputs[0], skip_special_tokens=True)

# Show the caption
print(f"Generated Caption: {generated_caption}")

Generated Caption: a little girl in a pink shirt holds a rope in front of her face.
