In [1]:
import os
import json
import torch
import clip
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from nltk.translate.bleu_score import sentence_bleu
from rouge_score import rouge_scorer
import logging
from pathlib import Path
from pycocotools.coco import COCO
import requests
import zipfile
import shutil

# Setup logging
logging.basicConfig(filename='captioning_log.txt', level=logging.INFO)

# Directories
IMG_FOLDER = "img_folder"
METADATA_FOLDER = "metadata_folder"
OUTPUT_FOLDER = "output_folder"
DATASET_FOLDER = "coco_dataset"
os.makedirs(IMG_FOLDER, exist_ok=True)
os.makedirs(METADATA_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs(DATASET_FOLDER, exist_ok=True)

# Load CLIP model and processor
device = "cuda" if torch.cuda.is_available() else "cpu"
model = CLIPModel.from_pretrained("openai/clip-vit-large-patch14-336").to(device)
processor = CLIPProcessor.from_pretrained("openai/clip-vit-large-patch14-336")

# Download and prepare COCO dataset
def download_coco_dataset():
    logging.info("Downloading COCO 2017 dataset...")
    # Download a small subset of COCO images and annotations
    img_url = "http://images.cocodataset.org/train2017/0000000000*.jpg"  # Example subset
    ann_url = "http://images.cocodataset.org/annotations/annotations_trainval2017.zip"
    
    # Download annotations
    ann_path = os.path.join(DATASET_FOLDER, "annotations.zip")
    if not os.path.exists(ann_path):
        response = requests.get(ann_url)
        with open(ann_path, 'wb') as f:
            f.write(response.content)
        with zipfile.ZipFile(ann_path, 'r') as zip_ref:
            zip_ref.extractall(DATASET_FOLDER)
    
    # Simulate downloading a few images (replace with actual download logic)
    # For demo, assume images are in DATASET_FOLDER/train2017/
    os.makedirs(os.path.join(DATASET_FOLDER, "train2017"), exist_ok=True)
    # Placeholder: Copy a few sample images to img_folder for testing
    # In practice, download images using img_url or use existing images

# Fine-tune CLIP model
def fine_tune_model(model, processor, dataset_path=DATASET_FOLDER):
    logging.info("Fine-tuning CLIP model with COCO dataset...")
    coco = COCO(os.path.join(dataset_path, "annotations/instances_train2017.json"))
    caption_file = os.path.join(dataset_path, "annotations/captions_train2017.json")
    coco_caps = COCO(caption_file)
    
    # Get image IDs
    img_ids = coco.getImgIds()
    train_data = []
    
    for img_id in img_ids[:1000]:  # Limit to 1000 images for demo
        img_info = coco.loadImgs(img_id)[0]
        img_path = os.path.join(dataset_path, "train2017", img_info['file_name'])
        ann_ids = coco_caps.getAnnIds(imgIds=img_id)
        captions = [ann['caption'] for ann in coco_caps.loadAnns(ann_ids)]
        if os.path.exists(img_path) and captions:
            train_data.append((img_path, captions[0]))  # Use first caption
    
    # Fine-tuning loop
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
    for epoch in range(2):  # Limited epochs for demo
        for img_path, caption in train_data:
            try:
                image = Image.open(img_path).convert("RGB")
                inputs = processor(text=[caption], images=image, return_tensors="pt", padding=True).to(device)
                outputs = model(**inputs)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                optimizer.zero_grad()
            except Exception as e:
                logging.error(f"Error in fine-tuning for {img_path}: {e}")
    
    model.eval()
    logging.info("Fine-tuning completed.")
    return model, processor

# Preprocess image
def preprocess_image(image_path):
    try:
        image = Image.open(image_path).convert("RGB")
        image = image.resize((336, 336))
        image_np = np.array(image).astype(np.float32) / 255.0
        return Image.fromarray((image_np * 255).astype(np.uint8))
    except Exception as e:
        logging.error(f"Error preprocessing image {image_path}: {e}")
        return None

# Read metadata
def read_metadata(metadata_path):
    metadata = {
        "section_header": None,
        "above_text": None,
        "caption": None,
        "picture_id": None,
        "footnote": None,
        "below_text": None
    }
    try:
        with open(metadata_path, 'r') as f:
            lines = f.readlines()
            for line in lines:
                key, value = line.strip().split(": ", 1)
                metadata[key] = value if value != "null" else None
        return metadata
    except Exception as e:
        logging.error(f"Error reading metadata {metadata_path}: {e}")
        return metadata

# Generate captions
def generate_captions(image, metadata, model, processor):
    text_inputs = []
    if metadata["section_header"]:
        text_inputs.append(f"Section: {metadata['section_header']}")
    if metadata["above_text"]:
        text_inputs.append(f"Above: {metadata['above_text']}")
    if metadata["below_text"]:
        text_inputs.append(f"Below: {metadata['below_text']}")
    if metadata["footnote"]:
        text_inputs.append(f"Footnote: {metadata['footnote']}")
    context = " ".join(text_inputs)
    
    inputs = processor(text=[context], images=image, return_tensors="pt", padding=True).to(device)
    
    with torch.no_grad():
        outputs = model(**inputs)
        image_embeds = outputs.image_embeds
        text_embeds = outputs.text_embeds
    
    # Simulated caption generation (using context)
    concise_caption = f"Summary of {context or 'image content'}."
    detailed_caption = f"Detailed description: {context or 'image content'}."
    
    # Confidence scores (based on cosine similarity)
    cosine_sim = torch.cosine_similarity(image_embeds, text_embeds).item()
    concise_confidence = min(cosine_sim, 0.95)
    detailed_confidence = min(cosine_sim * 0.9, 0.90)
    
    # Check consistency
    if metadata["section_header"] and metadata["section_header"].lower() not in detailed_caption.lower():
        detailed_confidence *= 0.5
        logging.warning(f"Low confidence for detailed caption due to metadata mismatch: {metadata['section_header']}")
    
    return {
        "concise": {"caption": concise_caption, "confidence": concise_confidence},
        "detailed": {"caption": detailed_caption, "confidence": detailed_confidence}
    }

# Overlay captions on image
def overlay_captions(image, captions):
    draw = ImageDraw.Draw(image)
    try:
        font = ImageFont.truetype("arial.ttf", 20)
    except:
        font = ImageFont.load_default()
    
    y_concise = image.height - 60
    y_detailed = image.height - 30
    
    concise_text = captions["concise"]["caption"]
    concise_confidence = captions["concise"]["confidence"]
    draw.text((10, y_concise), concise_text, fill="blue", font=font)
    if concise_confidence < 0.7:
        draw.line((10, y_concise + 20, 10 + len(concise_text) * 10, y_concise + 20), fill="blue")
    
    detailed_text = captions["detailed"]["caption"]
    detailed_confidence = captions["detailed"]["confidence"]
    draw.text((10, y_detailed), detailed_text, fill="red", font=font)
    if detailed_confidence < 0.7:
        draw.line((10, y_detailed + 20, 10 + len(detailed_text) * 10, y_detailed + 20), fill="red")
    
    return image

# Evaluate captions
def evaluate_captions(generated, reference):
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rougeL'], use_stemmer=True)
    bleu_score = sentence_bleu([reference.split()], generated.split())
    rouge_scores = scorer.score(reference, generated)
    return {"bleu": bleu_score, "rouge1": rouge_scores['rouge1'].fmeasure, "rougeL": rouge_scores['rougeL'].fmeasure}

# Main processing loop
def process_images():
    captions_output = {}
    
    for img_file in os.listdir(IMG_FOLDER):
        if img_file.endswith(('.png', '.jpg', '.jpeg')):
            img_path = os.path.join(IMG_FOLDER, img_file)
            metadata_path = os.path.join(METADATA_FOLDER, img_file.rsplit('.', 1)[0] + '.txt')
            
            image = preprocess_image(img_path)
            if not image:
                continue
                
            metadata = read_metadata(metadata_path)
            
            captions = generate_captions(image, metadata, model, processor)
            
            annotated_image = overlay_captions(image.copy(), captions)
            
            output_img_path = os.path.join(OUTPUT_FOLDER, f"annotated_{img_file}")
            annotated_image.save(output_img_path)
            
            eval_scores = {}
            if metadata["caption"]:
                eval_scores["concise"] = evaluate_captions(captions["concise"]["caption"], metadata["caption"])
                eval_scores["detailed"] = evaluate_captions(captions["detailed"]["caption"], metadata["caption"])
            
            captions_output[img_file] = {
                "concise_caption": captions["concise"],
                "detailed_caption": captions["detailed"],
                "evaluation": eval_scores
            }
    
    with open(os.path.join(OUTPUT_FOLDER, "captions.json"), 'w') as f:
        json.dump(captions_output, f, indent=4)
    
    logging.info("Processing completed successfully.")

if __name__ == "__main__":
    # Download and prepare dataset
    download_coco_dataset()
    
    # Fine-tune model
    model, processor = fine_tune_model(model, processor)
    
    # Process images
    process_images()



loading annotations into memory...
Done (t=11.99s)
creating index...
index created!
loading annotations into memory...
Done (t=0.74s)
creating index...
index created!


The hypothesis contains 0 counts of 2-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 3-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
The hypothesis contains 0 counts of 4-gram overlaps.
Therefore the BLEU score evaluates to 0, independently of
how many N-gram overlaps of lower order it contains.
Consider using lower n-gram order or use SmoothingFunction()
