# In [1]:
import cv2
import os
from transformers import BlipProcessor, BlipForConditionalGeneration, pipeline
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
from PIL import Image
from difflib import SequenceMatcher
import re

# Captioning runs on the GPU when CUDA is available, otherwise on the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# BLIP processor and captioning model are loaded from the same checkpoint
_BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"
blip_processor = BlipProcessor.from_pretrained(_BLIP_CHECKPOINT)
blip_model = BlipForConditionalGeneration.from_pretrained(_BLIP_CHECKPOINT)
blip_model = blip_model.to(device)

# Sentence Transformer used to embed the generated captions
embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

# Hugging Face summarization pipeline used to condense caption groups
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")

# Function to extract frames from video
def extract_frames_from_video(video_path, output_folder, frame_rate=1):
    """Save one frame per ``frame_rate`` seconds of a video as JPEG files.

    Frames are written to ``output_folder`` as ``frame_<index>.jpg``, where
    ``<index>`` is the zero-based position of the frame in the video.

    Args:
        video_path: Path of the video file to read.
        output_folder: Directory that receives the JPEG files.
        frame_rate: Sampling interval in seconds; must be positive.

    Returns:
        None. Prints how many frames were written, or an error message when
        the reported frame rate of the video is 0.

    Raises:
        ValueError: If ``frame_rate`` is not positive.
    """
    if frame_rate <= 0:
        raise ValueError(f"frame_rate must be positive, got {frame_rate}")

    cap = cv2.VideoCapture(video_path)
    try:
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        # Check if fps is zero and handle it
        if fps == 0:
            print("Error: Video has 0 fps. Cannot extract frames.")
            return

        # Number of frames between two saved frames. Rounded to a whole
        # number (and at least 1) so the modulo test below also works for
        # fractional sampling intervals.
        step = max(1, round(fps * frame_rate))

        count = 0
        saved = 0
        while True:
            success, frame = cap.read()
            if not success:
                break
            if count % step == 0:
                frame_path = os.path.join(output_folder, f"frame_{count}.jpg")
                # imwrite reports failure through its return value
                if cv2.imwrite(frame_path, frame):
                    saved += 1
                else:
                    print(f"Warning: could not write {frame_path}")
            count += 1
    finally:
        # Release the capture on every exit path, including the early return
        cap.release()

    # Report the frames actually written, not an estimate from the read count
    print(f"Extracted {saved} frames from {video_path}")

# Generate captions for each frame
def generate_caption(image_path):
    """Return the BLIP caption for the image stored at ``image_path``.

    The image is opened, converted to RGB, passed through the BLIP processor
    and model on ``device``, and the first generated sequence is decoded with
    special tokens skipped.
    """
    rgb_frame = Image.open(image_path).convert("RGB")
    model_inputs = blip_processor(rgb_frame, return_tensors="pt").to(device)
    generated_ids = blip_model.generate(**model_inputs)
    return blip_processor.decode(generated_ids[0], skip_special_tokens=True)

# Compute cosine similarity for captions
def compute_similarity(captions):
    """Embed ``captions`` and return their pairwise cosine similarity matrix."""
    return cosine_similarity(embedding_model.encode(captions))

# Group captions based on cosine similarity threshold
def group_captions(captions, similarity_matrix, threshold=0.8):
    """Greedily cluster captions whose similarity to a seed caption is high.

    Captions are scanned in order. Each caption not yet assigned starts a new
    group and pulls in every later, still unassigned caption whose similarity
    to it (``similarity_matrix[seed, other]``) is at least ``threshold``.

    Returns:
        A list of groups, each a list of captions in their original order.
    """
    total = len(captions)
    assigned = set()
    clusters = []

    for seed in range(total):
        if seed in assigned:
            continue
        assigned.add(seed)

        # Later captions close enough to the seed that no group has claimed yet
        members = [
            other
            for other in range(seed + 1, total)
            if other not in assigned and similarity_matrix[seed, other] >= threshold
        ]
        assigned.update(members)

        clusters.append([captions[seed]] + [captions[other] for other in members])

    return clusters

# Function to remove redundant captions based on similarity
def remove_redundant_captions(captions, threshold=0.7):
    """Drop captions that closely resemble one that was already kept.

    A caption is kept only when its ``SequenceMatcher`` ratio against every
    previously kept caption is at most ``threshold``. Order is preserved.
    """
    kept = []
    for candidate in captions:
        for previous in kept:
            if SequenceMatcher(None, candidate, previous).ratio() > threshold:
                # Too close to something already kept; discard the candidate
                break
        else:
            kept.append(candidate)
    return kept

def clean_caption(caption):
    """Remove boilerplate phrases and stuttered words from a caption.

    Steps, in order:
      1. Delete every occurrence (case-insensitive) of the unwanted phrases.
      2. Collapse a run of the same word repeated back to back
         ("the the the cat" -> "the cat"); the comparison is case-sensitive.
      3. Collapse runs of whitespace, including gaps left by step 1, to a
         single space.
      4. Strip leading/trailing spaces and periods.

    Args:
        caption: Caption text to clean.

    Returns:
        The cleaned caption, possibly an empty string.
    """
    # Remove any unwanted phrases or website mentions.
    # The dot is escaped so only the literal "CNN.com" is removed.
    unwanted_patterns = [r"CNN\.com", r"iReporter", r"gallery", r"next week", r"next Wednesday"]
    for pattern in unwanted_patterns:
        caption = re.sub(pattern, '', caption, flags=re.IGNORECASE)

    # Collapse a whole run of an immediately repeated word in one pass
    caption = re.sub(r"\b(\w+)(?:\s+\1\b)+", r"\1", caption)

    # Normalise internal whitespace (phrase removal can leave double spaces)
    caption = re.sub(r"\s+", " ", caption)

    # Strip extra whitespace and periods at both ends
    return caption.strip(". ")

# Summarize groups of captions using Hugging Face summarization model
def summarize_groups(groups):
    """Produce one summary string per group of captions.

    For each group, near-duplicate captions are removed, the remaining ones
    are cleaned, and the result is joined into a single text that is passed
    to the summarization pipeline (max_length=20, min_length=10, no sampling).

    Args:
        groups: Iterable of caption lists.

    Returns:
        A list with one entry per group, in the same order. A group whose
        captions are all empty after cleaning yields an empty string and is
        not sent to the summarizer.
    """
    summaries = []

    for group in groups:
        # Remove redundant captions within the group
        distinct_group = remove_redundant_captions(group)
        # Clean each caption before summarizing; drop captions that cleaned
        # down to nothing so they add no stray spaces to the joined text
        cleaned_group = [clean_caption(caption) for caption in distinct_group]
        cleaned_group = [caption for caption in cleaned_group if caption]
        # Prepare text to summarize
        text_to_summarize = " ".join(cleaned_group)

        if not text_to_summarize:
            # Nothing to summarize; keep the one-entry-per-group alignment
            summaries.append("")
            continue

        # Use the Hugging Face summarizer
        summary = summarizer(text_to_summarize, max_length=20, min_length=10, do_sample=False)[0]['summary_text']
        summaries.append(summary)

    return summaries

# Main function to process video
def process_video(video_path, frame_output_folder, frame_rate=1, similarity_threshold=0.8):
    """Caption a video: extract frames, caption them, group and summarize.

    Args:
        video_path: Path of the video file to process.
        frame_output_folder: Directory for the extracted frames; created if
            it does not exist.
        frame_rate: Sampling interval in seconds passed to the frame extractor.
        similarity_threshold: Minimum cosine similarity for two captions to
            land in the same group.

    Returns:
        A list of summary strings, one per caption group; an empty list when
        no frames are available.
    """
    # Step 1: Extract frames from the video
    os.makedirs(frame_output_folder, exist_ok=True)
    extract_frames_from_video(video_path, frame_output_folder, frame_rate)

    # Step 2: Generate captions for each frame.
    # Only files named like the extractor's output (frame_<index>.jpg) are
    # used, ordered by numeric index so captions follow the video timeline
    # (os.listdir order is arbitrary).
    # NOTE(review): frames left in the folder by an earlier run still match
    # this pattern; use a fresh folder per video.
    indexed_frames = []
    for file_name in os.listdir(frame_output_folder):
        match = re.fullmatch(r"frame_(\d+)\.jpg", file_name)
        if match:
            indexed_frames.append((int(match.group(1)), file_name))
    indexed_frames.sort()

    frame_paths = [os.path.join(frame_output_folder, file_name) for _, file_name in indexed_frames]
    captions = [generate_caption(frame) for frame in frame_paths]

    # Nothing was extracted (e.g. unreadable video): there is nothing to
    # embed, group, or summarize
    if not captions:
        return []

    # Step 3: Compute similarity matrix
    similarity_matrix = compute_similarity(captions)

    # Step 4: Group captions based on similarity threshold
    groups = group_captions(captions, similarity_matrix, threshold=similarity_threshold)

    # Step 5: Summarize each group of captions
    final_summaries = summarize_groups(groups)

    return final_summaries

# Example usage: caption one video and print the numbered summaries
video_path = 'E:\\ufc_crime\\captioning_with_share_gtp\\motion-detection-computer-room-door-1920x1080 (1).mp4'
frame_output_folder = '/content'
final_captions = process_video(
    video_path,
    frame_output_folder,
    frame_rate=1,
    similarity_threshold=0.8,
)

# Summaries are numbered starting at 1
print("Final Video Captions:")
for number, text in enumerate(final_captions, start=1):
    print(f"Caption {number}: {text}")


# Notebook cell output (truncated warning text):
#   from .autonotebook import tqdm as notebook_tqdm