In [None]:
# Fine-tuning the BLIP model
import json
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration

# Initialize the processor and model for BLIP.
# Both are loaded from the same pretrained checkpoint name. The processor is
# later called with both `images=` and `text=` to build model inputs.
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
# The model starts on the CPU; the fine-tuning code below moves it to `device`.
model_img_captioning = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to("cpu")

# Define the dataset class
class AnnotatedFrameDataset(Dataset):
    """Dataset of (frame image, caption) pairs read from an annotations JSON file.

    The JSON file is expected to have a top-level ``"videos"`` list. Each video
    holds a ``"frames"`` list, and each frame holds a ``"frame_file"`` path and
    an ``"annotations"`` mapping with a ``"description"`` string.

    Each item is a dict with the keys ``pixel_values``, ``input_ids``,
    ``attention_mask`` and ``labels``, so the default ``DataLoader`` collation
    produces a batch that can be passed to the BLIP model as keyword arguments.
    """

    def __init__(self, annotations_file, processor):
        """Load the annotations and flatten all frames into one list.

        Args:
            annotations_file: Path to the annotations JSON file.
            processor: BLIP processor used to encode each image/caption pair.
        """
        with open(annotations_file, 'r', encoding="utf-8") as f:
            self.annotations = json.load(f)["videos"]
        self.processor = processor
        # Flatten once so that indexing is O(1) and negative / out-of-range
        # indices get normal list semantics (IndexError when out of range).
        self.frames = [
            frame for video in self.annotations for frame in video["frames"]
        ]

    def __len__(self):
        """Return the total number of annotated frames across all videos."""
        return len(self.frames)

    def __getitem__(self, idx):
        """Encode the frame at ``idx`` and its caption for training.

        Returns:
            A dict of tensors without a batch dimension: ``pixel_values``,
            ``input_ids``, ``attention_mask`` and ``labels``.

        Raises:
            IndexError: If ``idx`` is out of range.
        """
        frame_info = self.frames[idx]
        # Use a context manager so the underlying file handle is closed;
        # convert() returns an independent, fully loaded image.
        with Image.open(frame_info["frame_file"]) as img:
            image = img.convert("RGB")
        description = frame_info["annotations"]["description"]
        inputs = self.processor(images=image, text=description, return_tensors="pt", padding="max_length", truncation=True)

        # Drop only the batch dimension added by return_tensors="pt".
        input_ids = inputs.input_ids.squeeze(0)
        attention_mask = inputs.attention_mask.squeeze(0)

        # The processor does not produce labels. For captioning the targets
        # are the caption tokens themselves. Padding positions are set to
        # -100, the default ignore_index of the cross-entropy loss, so they
        # do not contribute to the loss.
        labels = input_ids.clone()
        labels[attention_mask == 0] = -100

        return {
            "pixel_values": inputs.pixel_values.squeeze(0),
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }

# Load the dataset
dataset = AnnotatedFrameDataset("annotations.json", processor)
dataloader = DataLoader(dataset, batch_size=4, shuffle=True)

# Fine-tuning loop
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_img_captioning.to(device)
optimizer = torch.optim.AdamW(model_img_captioning.parameters(), lr=5e-5)

model_img_captioning.train()
for epoch in range(3):  # Adjust the number of epochs as needed
    for batch in dataloader:
        # The default collate function stacks each dict entry into a batch.
        batch = {name: tensor.to(device) for name, tensor in batch.items()}
        # pixel_values must be supplied: without the image the model has
        # nothing to condition the caption on.
        outputs = model_img_captioning(
            pixel_values=batch["pixel_values"],
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["labels"],
        )
        loss = outputs.loss
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

print("Fine-tuning complete.")

# Save the fine-tuned model
# The model and the processor are written to the same directory, which is the
# path the caption-generation cell later passes to from_pretrained() for both.
# NOTE(review): "path_to_save_fine_tuned_model" looks like a placeholder path
# -- confirm the intended output directory.
model_img_captioning.save_pretrained("path_to_save_fine_tuned_model")
processor.save_pretrained("path_to_save_fine_tuned_model")


In [None]:
# Generating captions and summaries for unseen videos
import cv2
import os
from PIL import Image
from transformers import BlipProcessor, BlipForConditionalGeneration, T5Tokenizer, T5ForConditionalGeneration

# Load the fine-tuned processor and model
# Both are read from the directory the fine-tuning cell saved them to; this
# rebinds the names `processor` and `model_img_captioning` used by that cell.
processor = BlipProcessor.from_pretrained("path_to_save_fine_tuned_model")
model_img_captioning = BlipForConditionalGeneration.from_pretrained("path_to_save_fine_tuned_model").to("cpu")

# Initialize the tokenizer and model for summarization
# The "t5-small" checkpoint is used as-is (it is not fine-tuned in this file)
# and is kept on the CPU, like the captioning model.
tokenizer_t5 = T5Tokenizer.from_pretrained("t5-small")
model_t5 = T5ForConditionalGeneration.from_pretrained("t5-small").to("cpu")

def generate_caption(pil_image):
    """Return a caption for ``pil_image`` produced by the BLIP captioning model.

    The image is preprocessed with the module-level ``processor``, every
    resulting tensor is placed on the CPU, and the first generated sequence
    is decoded with special tokens removed.
    """
    encoded = processor(images=pil_image, return_tensors="pt")
    cpu_inputs = {}
    for name, tensor in encoded.items():
        cpu_inputs[name] = tensor.to("cpu")
    generated = model_img_captioning.generate(**cpu_inputs)
    return processor.decode(generated[0], skip_special_tokens=True)

# Define paths
# NOTE(review): both paths are hard-coded absolute paths on one user's
# machine -- adjust them before running elsewhere.
# video_folder_path: directory scanned for input videos.
video_folder_path = '/Users/kristinakuznetsova/Downloads/untitledfolder'
# frames_folder: directory where sampled frames are written as JPEG files.
frames_folder = "/Users/kristinakuznetsova/Downloads/frames"
# Create the output directory if needed; an existing directory is not an error.
os.makedirs(frames_folder, exist_ok=True)

# Process each video
# Match on the real file extension, case-insensitively, so "clip.MP4" is
# included and a name that merely ends in the letters "mp4" is not. Sorting
# makes the processing order (and the printed output) deterministic.
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mkv')
video_files = sorted(
    f for f in os.listdir(video_folder_path)
    if os.path.splitext(f)[1].lower() in VIDEO_EXTENSIONS
)
all_video_captions = []

for video_file in video_files:
    video_path = os.path.join(video_folder_path, video_file)
    cap = cv2.VideoCapture(video_path)
    video_captions = []

    # try/finally guarantees the capture is released even if captioning or
    # frame writing raises part-way through a video.
    try:
        if not cap.isOpened():
            print(f"Could not open {video_file}; skipping.")
            continue

        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        # Sample roughly one frame per second. The reported FPS can be 0,
        # NaN or below 1, which would make the interval 0 (division by zero
        # in the modulo below) or make the conversion to int fail; in those
        # cases fall back to an interval of 1, i.e. use every frame.
        if 0 < frame_rate < float("inf"):
            sample_interval = max(1, int(round(frame_rate)))
        else:
            sample_interval = 1
        frame_count = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            if frame_count % sample_interval == 0:
                # OpenCV decodes frames as BGR; PIL and the model expect RGB.
                pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                caption = generate_caption(pil_image)
                video_captions.append(caption)

                # Save the frame
                frame_filename = f"{os.path.splitext(video_file)[0]}_{frame_count}.jpg"
                frame_path = os.path.join(frames_folder, frame_filename)
                cv2.imwrite(frame_path, frame)

            frame_count += 1
    finally:
        cap.release()

    if video_captions:
        # Summarize the concatenated per-frame captions with T5. The input is
        # truncated to 512 tokens, so captions from long videos may be cut.
        input_text = "summarize: " + " ".join(video_captions)
        inputs = tokenizer_t5(input_text, return_tensors="pt", max_length=512, truncation=True)
        summary_ids = model_t5.generate(
            inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_length=150,
            min_length=40,
            length_penalty=2.0,
            num_beams=4,
            early_stopping=True,
        )
        summary = tokenizer_t5.decode(summary_ids[0], skip_special_tokens=True)
        all_video_captions.append(f"{video_file}: {summary}")

print("\nAll video summaries:")
for video_summary in all_video_captions:
    print(video_summary)
