In [None]:
# Install dependencies (if not already installed)
!pip install ftfy regex tqdm
!pip install git+https://github.com/openai/CLIP.git
!pip install opencv-python
!pip install gradio
!pip install transformers
!apt-get install -y ffmpeg

In [None]:
import os
import subprocess
import uuid

import clip
import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import torch
from PIL import Image

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the ViT-B/32 CLIP checkpoint once at module level; `preprocess` is the
# matching image transform returned alongside the model.
model, preprocess = clip.load("ViT-B/32", device=device)

class _VideoProcessingError(Exception):
    """Expected, user-facing failure; its message is returned to the UI verbatim."""


def _sample_frame_features(video_path, text_feature, frame_stride):
    """Encode every ``frame_stride``-th frame with CLIP and score it against the query.

    Returns ``(timestamps, similarities, features, fps)`` where the first three are
    NumPy arrays with one entry (row) per sampled frame. Raises
    ``_VideoProcessingError`` when the video cannot be opened or reports no usable FPS.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            raise _VideoProcessingError("Error: Unable to open video.")
        fps = cap.get(cv2.CAP_PROP_FPS)
        # `not fps > 0` rejects 0, negative values and NaN in a single test.
        if not fps > 0:
            raise _VideoProcessingError("Error: Unable to retrieve FPS.")

        timestamps, similarities, features = [], [], []
        frame_index = 0
        # grab() advances to the next frame without decoding it; only the frames
        # that are actually sampled pay the decode cost via retrieve().
        while cap.grab():
            if frame_index % frame_stride == 0:
                ok, frame = cap.retrieve()
                if not ok:
                    break
                image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                image_input = preprocess(image).unsqueeze(0).to(device)
                with torch.no_grad():
                    image_feature = model.encode_image(image_input)
                    image_feature = image_feature / image_feature.norm(dim=-1, keepdim=True)
                timestamps.append(frame_index / fps)
                similarities.append((image_feature @ text_feature.T).item())
                # Cast to float32 so window averaging below is not done on
                # half-precision values when the model runs in fp16.
                features.append(image_feature.float().cpu().numpy()[0])
            frame_index += 1
    finally:
        # Always free the capture handle, including on early exit or error.
        cap.release()

    return np.array(timestamps), np.array(similarities), np.array(features), fps


def _score_windows(timestamps, features, text_vector, window_duration):
    """Score every window of at most ``window_duration`` seconds against the query.

    One window starts at each sampled frame. Its score is the dot product of the
    L2-normalised mean frame feature with ``text_vector``. Returns a list of
    ``{'start', 'end', 'score'}`` dicts sorted by descending score.
    """
    n = len(timestamps)
    candidates = []
    end = 0
    for start in range(n):
        # Timestamps increase monotonically, so the window end never moves
        # backwards as the start advances (two-pointer sweep).
        end = max(end, start)
        while end + 1 < n and timestamps[end + 1] - timestamps[start] <= window_duration:
            end += 1
        mean_feature = features[start:end + 1].mean(axis=0)
        norm = np.linalg.norm(mean_feature)
        score = float(np.dot(mean_feature, text_vector) / norm) if norm > 0 else 0.0
        candidates.append({
            'start': float(timestamps[start]),
            'end': float(timestamps[end]),
            'score': score
        })
    candidates.sort(key=lambda cand: cand['score'], reverse=True)
    return candidates


def _cut_clip(video_path, start, duration, output_path):
    """Copy ``duration`` seconds starting at ``start`` into ``output_path`` with ffmpeg.

    Returns True only when ffmpeg exits successfully and the output file exists.
    """
    # Arguments are passed as a list (no shell), so paths containing quotes,
    # spaces or shell metacharacters cannot break or hijack the command.
    # NOTE: `-c copy` does not re-encode, so the cut may land on a nearby
    # keyframe rather than on the exact requested timestamp.
    cmd = [
        "ffmpeg", "-y",
        "-ss", f"{start:.3f}",
        "-i", video_path,
        "-t", f"{duration:.3f}",
        "-c", "copy",
        output_path,
    ]
    try:
        result = subprocess.run(cmd, check=False)
    except OSError:
        # ffmpeg is not installed or not executable.
        return False
    return result.returncode == 0 and os.path.exists(output_path)


def _save_similarity_plot(timestamps, similarities, segments, frame_stride, filename):
    """Plot per-frame similarity over time, highlight ``segments``, save to ``filename``."""
    fig, ax = plt.subplots(figsize=(10, 4))
    try:
        ax.plot(timestamps, similarities, label="Cosine Similarity", marker="o")
        ax.set_xlabel("Time (s)")
        ax.set_ylabel("Cosine Similarity")
        ax.set_title(f"Similarity vs Time (sampled every {frame_stride} frames)")
        label_height = similarities.max() * 0.9
        for seg in segments:
            ax.axvspan(seg['start'], seg['end'], color='red', alpha=0.3)
            ax.text((seg['start'] + seg['end']) / 2, label_height, f"{seg['score']:.2f}",
                    ha='center', va='center', color='black')
        ax.legend()
        fig.tight_layout()
        fig.savefig(filename)
    finally:
        # Close the figure even if plotting fails so figures do not accumulate
        # across requests.
        plt.close(fig)


def process_video(video_file, query, frame_stride=5, window_duration=2.0, top_k=3):
    """Find the video segments that best match a text query using CLIP.

    Args:
        video_file: Path to the video, or an object whose ``.name`` is the path.
        query: Free-text description of the moment to look for.
        frame_stride: Encode one frame out of every ``frame_stride`` frames.
        window_duration: Maximum length, in seconds, of a candidate segment.
        top_k: Number of best-scoring candidate segments to report and plot.

    Returns:
        A ``(text, plot_path, video_path)`` tuple: a summary of the top candidates,
        the path of a saved similarity-vs-time PNG, and the path of an MP4 holding
        the best candidate. On failure the text carries an error message and both
        paths are ``None``; if only the ffmpeg cut fails, the text gains a warning
        line and ``video_path`` alone is ``None``.
    """
    # Validate inputs up front so the user gets a clear message instead of an
    # attribute error raised deep inside the pipeline.
    if not video_file:
        return "Error: no video file provided.", None, None
    if not isinstance(query, str) or not query.strip():
        return "Error: query must not be empty.", None, None

    try:
        frame_stride = max(1, int(frame_stride))
        top_k = max(1, int(top_k))
        video_path = video_file if isinstance(video_file, str) else video_file.name

        text_input = clip.tokenize([query]).to(device)
        with torch.no_grad():
            text_feature = model.encode_text(text_input)
            text_feature = text_feature / text_feature.norm(dim=-1, keepdim=True)

        timestamps, similarities, features, fps = _sample_frame_features(
            video_path, text_feature, frame_stride
        )
        if len(similarities) == 0:
            return "No features extracted from the video.", None, None

        # Convert the query embedding once instead of once per window.
        text_vector = text_feature.float().cpu().numpy()[0]
        candidates = _score_windows(timestamps, features, text_vector, window_duration)
        top = candidates[:top_k]

        output_text = f"Top {top_k} candidate segments:\n"
        for idx, cand in enumerate(top):
            output_text += f"Candidate {idx+1}: Start = {cand['start']:.2f}s, End = {cand['end']:.2f}s, Score = {cand['score']:.4f}\n"

        best = top[0]
        # A window holding a single sample has start == end; cut at least one
        # sampling interval so ffmpeg is never asked for a zero-length clip.
        clip_duration = max(best['end'] - best['start'], frame_stride / fps)
        output_video = f"best_candidate_{uuid.uuid4().hex[:6]}.mp4"
        if not _cut_clip(video_path, best['start'], clip_duration, output_video):
            output_text += "Warning: ffmpeg could not extract the best segment.\n"
            output_video = None

        plot_filename = f"similarity_plot_{uuid.uuid4().hex[:6]}.png"
        _save_similarity_plot(timestamps, similarities, top, frame_stride, plot_filename)

        return output_text, plot_filename, output_video
    except _VideoProcessingError as exc:
        return str(exc), None, None
    except Exception as e:
        # Deliberate catch-all: surface the failure in the UI rather than
        # crashing the Gradio worker.
        return f"Error occurred: {str(e)}", None, None

# Gradio front end: a video upload plus a free-text query go in; the candidate
# summary text, the similarity plot and the best-matching clip come out, in the
# same order as the tuple returned by process_video.
iface = gr.Interface(
    title="CLIP Zero-Shot Video Candidate Extraction",
    description=(
        "Upload a video and enter a text query. "
        "The system uses CLIP's text encoder to encode the query and CLIP's image encoder "
        "to extract features from video frames (every 5 frames). "
        "It computes cosine similarity, finds the top 3 candidate segments using a sliding "
        "2-second window, and displays the candidate scores along with the best candidate "
        "video segment."
    ),
    fn=process_video,
    inputs=[
        gr.Video(label="Upload Video"),
        gr.Textbox(label="Query", placeholder="e.g., Walking on the beach"),
    ],
    outputs=[
        gr.Textbox(label="Candidate Scores"),
        gr.Image(label="Similarity Plot"),
        gr.Video(label="Best Candidate Segment"),
    ],
)




In [None]:
# Start the Gradio app built from `iface`.
# NOTE(review): debug=True is forwarded to Gradio; per its docs this keeps the
# cell running and prints errors to the cell output — confirm for the installed version.
iface.launch(debug=True)