In [1]:
import os
import numpy as np
import torch
import clip
from PIL import Image
import cv2
import whisper
import re
import requests
from pytubefix import YouTube
from moviepy import VideoFileClip
import subprocess
import argparse
#from pydub import AudioSegment
from sentence_transformers import SentenceTransformer, util
import time
from moviepy import VideoFileClip
import shutil
from langchain_ollama import OllamaLLM
from langchain_core.prompts import ChatPromptTemplate

  from pkg_resources import packaging
  from .autonotebook import tqdm as notebook_tqdm


In [3]:
def download_video_from_youtube(url, output_path="input_video.mp4"):
    """Download the highest-resolution progressive MP4 stream of a YouTube video.

    Args:
        url (str): URL of the YouTube video.
        output_path (str): File name the stream is saved under.

    Returns:
        str: ``output_path`` once the download has finished.

    Raises:
        FileNotFoundError: If no progressive MP4 stream matches the filter, or
            if the download raised ``RuntimeError``.
    """
    try:
        yt = YouTube(url)
        stream = yt.streams.filter(progressive=True, file_extension='mp4').get_highest_resolution()
        if stream is None:
            # Nothing matched the filter; fail with the function's documented
            # error type instead of an AttributeError on None.
            raise FileNotFoundError(f"No progressive MP4 stream available for: {url}")
        stream.download(filename=output_path)
        return output_path
    except RuntimeError as e:
        # Keep the exception type callers may rely on, but say what actually
        # failed and preserve the original cause.
        raise FileNotFoundError(f"Could not download YouTube video {url}: {e}") from e

def _stream_response_to_file(url, output_path, error_message):
    """Stream the body of a GET request for ``url`` into ``output_path``.

    Raises ``ValueError(error_message)`` on any non-200 status, before the
    output file is opened, so an error page is never saved as the video.
    """
    # The context manager releases the connection even if writing fails; the
    # timeout keeps a stalled server from hanging the notebook forever.
    with requests.get(url, stream=True, timeout=30) as r:
        if r.status_code != 200:
            raise ValueError(error_message)
        with open(output_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
    return output_path


def download_video_from_url(url, output_path="input_video.mp4"):
    """Download a video from a YouTube, Google Drive or direct URL.

    Args:
        url (str): Source URL. URLs containing "youtube.com"/"youtu.be" are
            delegated to ``download_video_from_youtube``; URLs containing
            "drive.google.com" must contain a ``/d/<file_id>/`` part; anything
            else is fetched as a direct link.
        output_path (str): File the downloaded bytes are written to.

    Returns:
        str: ``output_path``.

    Raises:
        ValueError: If a Drive link has no ``/d/<file_id>`` part, or if the
            server answers with a status other than 200.
    """
    if "youtube.com" in url or "youtu.be" in url:
        return download_video_from_youtube(url, output_path)

    if "drive.google.com" in url:
        try:
            file_id = url.split("/d/")[1].split("/")[0]
        except IndexError:
            raise ValueError("Google Drive link format incorrect.")
        d_url = f"https://drive.google.com/uc?export=download&id={file_id}"
        # NOTE(review): Drive may answer with an HTML confirmation page and
        # status 200 for some files; that case is not detected here - verify.
        return _stream_response_to_file(
            d_url, output_path, "Could not download video from Google Drive."
        )

    return _stream_response_to_file(
        url, output_path, "Could not download video from direct link."
    )

def convert_to_mp4(input_path, output_path="input_video.mp4"):
    """Re-encode ``input_path`` to an H.264 MP4 written to ``output_path``.

    Args:
        input_path (str): Path of the source video.
        output_path (str): Path of the MP4 file to write.

    Raises:
        RuntimeError: If opening, encoding or closing the clip fails; the
            original exception is attached as the cause.
    """
    try:
        # Named video_clip (not clip) so the imported `clip` module is not shadowed.
        video_clip = VideoFileClip(input_path)
        try:
            video_clip.write_videofile(output_path, codec='libx264')
        finally:
            # Release the reader even when encoding fails.
            video_clip.close()
    except Exception as e:
        raise RuntimeError(f"Video conversion failed: {e}") from e

def get_video_input(output_path="input_video.mp4"):
    """Interactively obtain a video and store it at ``output_path``.

    Prompts on stdin for either a local file (converted to MP4 with
    ``convert_to_mp4``) or a URL (fetched with ``download_video_from_url``).

    Args:
        output_path (str): Where the resulting MP4 is written.

    Returns:
        str: Path of the prepared video file.

    Raises:
        FileNotFoundError: If the given local file does not exist.
        ValueError: If the menu choice is neither "1" nor "2".
    """
    print("Choose input method:")
    print("1. Upload local video file")
    print("2. Provide video URL (YouTube / Drive / MP4)")
    choice = input("Enter 1 or 2: ").strip()

    if choice == "1":
        file_path = input("Enter full path to your local video file: ").strip()
        if not os.path.exists(file_path):
            raise FileNotFoundError("File does not exist.")
        print("Converting to MP4...")
        convert_to_mp4(file_path, output_path)
        print(f"Converted and saved to: {output_path}")
        return output_path

    if choice == "2":
        url = input("Enter video URL: ").strip()
        # Forward output_path so the URL branch honours the caller's choice,
        # exactly like the local-file branch does.
        return download_video_from_url(url, output_path)

    raise ValueError("Invalid choice. Please enter 1 or 2.")

In [4]:
# Interactively obtain the input video; `video` holds the path returned by get_video_input().
video=get_video_input()

Choose input method:
1. Upload local video file
2. Provide video URL (YouTube / Drive / MP4)
Converting to MP4...
MoviePy - Building video input_video.mp4.
MoviePy - Writing audio in input_videoTEMP_MPY_wvf_snd.mp3


                                                                      

MoviePy - Done.
MoviePy - Writing video input_video.mp4



                                                                           

MoviePy - Done !
MoviePy - video ready input_video.mp4
Converted and saved to: input_video.mp4


In [5]:
# Location of the ffmpeg executable used by extract_audio / extract_subtitles.
# Set the FFMPEG_PATH environment variable to override the machine-specific
# default below without editing the notebook.
ffmpeg_path = os.environ.get(
    "FFMPEG_PATH",
    '/Users/mohalsahai/Desktop/Video Clipping/Python Packages/ffmpeg',
)

def extract_audio(video_path, audio_path="audio.wav"):
    """Run ffmpeg to write the audio of ``video_path`` to ``audio_path``.

    The command drops the video stream (``-vn``) and encodes the audio as
    ``pcm_s16le`` at a 16000 Hz sample rate (``-ar 16000``).

    Args:
        video_path (str): Input video file.
        audio_path (str): Output audio file.

    Returns:
        str: ``audio_path``.

    Raises:
        ValueError: If ffmpeg exits with a non-zero status.
    """
    print(f"Extracting audio from: {video_path}")
    ffmpeg_command = [
        ffmpeg_path, "-y",
        "-i", video_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        audio_path,
    ]
    try:
        subprocess.run(ffmpeg_command, check=True)
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Audio extraction failed: {e}")
    print(f"Audio saved to: {audio_path}")
    return audio_path

def extract_subtitles(video_path, subtitle_path="subtitles.srt"):
    """Try to dump the first subtitle stream of ``video_path`` with ffmpeg.

    Args:
        video_path (str): Input video file.
        subtitle_path (str): Output subtitle file.

    Returns:
        str | None: ``subtitle_path`` when ffmpeg succeeded and the file
        exists, otherwise ``None``.
    """
    print("Trying to extract subtitles...")
    result = subprocess.run(
        [ffmpeg_path, "-y", "-i", video_path, "-map", "0:s:0", subtitle_path],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    # Require a zero exit status as well: a run can print "Stream mapping:"
    # and still fail afterwards, leaving a partial or stale file behind.
    extracted = (
        result.returncode == 0
        and "Stream mapping:" in result.stderr
        and os.path.exists(subtitle_path)
    )
    if extracted:
        print(f"Subtitles saved to: {subtitle_path}")
        return subtitle_path

    print("No subtitles found in the video.")
    return None

In [6]:
# Extract the audio track of the prepared video, then try to dump its first
# subtitle stream; `subtitles` is None when extract_subtitles finds none.
audio=extract_audio(video)
subtitles=extract_subtitles(video)

Extracting audio from: input_video.mp4
Audio saved to: audio.wav
Trying to extract subtitles...
No subtitles found in the video.


ffmpeg version N-119686-gae0f71a387-tessus  https://evermeet.cx/ffmpeg/  Copyright (c) 2000-2025 the FFmpeg developers
  built with Apple clang version 17.0.0 (clang-1700.0.13.3)
  configuration: --cc=/usr/bin/clang --prefix=/opt/ffmpeg --extra-version=tessus --enable-avisynth --enable-fontconfig --enable-gpl --enable-libaom --enable-libass --enable-libbluray --enable-libdav1d --enable-libfreetype --enable-libgsm --enable-libharfbuzz --enable-libmodplug --enable-libmp3lame --enable-libmysofa --enable-libopencore-amrnb --enable-libopencore-amrwb --enable-libopenh264 --enable-libopenjpeg --enable-libopus --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvmaf --enable-libvo-amrwbenc --enable-libvorbis --enable-libvpx --enable-libwebp --enable-libx264 --enable-libx265 --enable-libxavs --enable-libxml2 --enable-libxvid --enable-libzimg --enable-libzmq --enable-libzvbi --enable

In [7]:
import os

# Replace this with your actual path to ffmpeg
# NOTE(review): "/path/to/ffmpeg_folder" is still the template placeholder, so
# this line only appends that literal directory to PATH, and it appends it
# again every time the cell is re-run.
os.environ["PATH"] += os.pathsep + "/path/to/ffmpeg_folder"

In [8]:
import os
import whisper
import subprocess

# --- Custom FFmpeg Path ---
# The FFMPEG_PATH environment variable overrides the machine-specific default.
ffmpeg_path = os.environ.get(
    "FFMPEG_PATH",
    '/Users/mohalsahai/Desktop/Video Clipping/Python Packages/ffmpeg',
)

# --- Add to PATH so whisper can find it ---
# Append only when missing, so re-running this cell does not keep growing PATH.
_ffmpeg_dir = os.path.dirname(ffmpeg_path)
if _ffmpeg_dir and _ffmpeg_dir not in os.environ["PATH"].split(os.pathsep):
    os.environ["PATH"] += os.pathsep + _ffmpeg_dir

# --- Whisper Transcription ---
model = whisper.load_model("medium")
result = model.transcribe("audio.wav", verbose=True, task='transcribe', language='en', fp16=False)

# --- Output ---
# The language is passed explicitly to transcribe() above (language='en').
print("Detected language:", result['language'])
print("Full transcription:\n", result['text'])

# One "[start - end] text" line per segment, times in seconds with two decimals.
_transcript_lines = []
for segment in result["segments"]:
    _line = f"[{segment['start']:.2f} - {segment['end']:.2f}] {segment['text']}"
    _transcript_lines.append(_line)
    print(_line)
timestamped_transcript = "".join(f"{_line}\n" for _line in _transcript_lines)

# --- Save Full Transcript ---
with open("transcript.txt", "w", encoding="utf-8") as f:
    f.write(result["text"])
# --- Save SRT File ---
def save_srt(segments, path="transcript.srt"):
    """Write ``segments`` to ``path`` as numbered SRT cues.

    Args:
        segments: Iterable of dicts with 'start', 'end' and 'text' keys; the
            times are rendered with ``format_time``.
        path (str): Destination file, written as UTF-8.
    """
    with open(path, "w", encoding="utf-8") as srt_file:
        for cue_number, cue in enumerate(segments, start=1):
            time_range = f"{format_time(cue['start'])} --> {format_time(cue['end'])}"
            caption = cue['text'].strip()
            # A cue is: index line, time range line, caption, blank separator.
            srt_file.write(f"{cue_number}\n{time_range}\n{caption}\n\n")

# --- Format Time for SRT ---
def format_time(seconds):
    """Format a duration in seconds as an SRT timestamp ``HH:MM:SS,mmm``.

    The value is rounded to whole milliseconds before it is split into
    fields, so rounding carries into the next unit instead of producing an
    invalid field such as ``60,000`` seconds. Negative inputs are clamped
    to zero.

    Args:
        seconds (float): Time offset in seconds.

    Returns:
        str: The formatted timestamp.
    """
    total_ms = max(0, int(round(seconds * 1000)))
    hours, remainder = divmod(total_ms, 3_600_000)
    minutes, remainder = divmod(remainder, 60_000)
    secs, millis = divmod(remainder, 1000)
    return f"{hours:02}:{minutes:02}:{secs:02},{millis:03}"

# Write the transcribed segments to the default SRT path ("transcript.srt").
save_srt(result['segments'])


[00:00.000 --> 00:10.000]  Have you ever wondered how to effectively qualify leads in bank assurance?
[00:10.000 --> 00:13.000]  Let's explore some real-life scenarios together.
[00:16.000 --> 00:19.000]  First, consider the importance of building rapport.
[00:19.000 --> 00:28.000]  According to a study by HubSpot, 70% of buyers say that they feel more connected to a salesperson who listens to their needs.
[00:31.000 --> 00:35.000]  You can qualify leads list using NOPP criteria.
[00:35.000 --> 00:37.000]  N is need for insurance.
[00:37.000 --> 00:44.000]  The customer has the need for insurance for child education, protection, retirement, savings.
[00:44.000 --> 00:46.000]  O is opportunity to meet.
[00:46.000 --> 00:51.000]  There is an opportunity to meet with the customer and get undivided attention for 30 minutes.
[00:51.000 --> 00:53.000]  P is physically fit.
[00:53.000 --> 00:59.000]  Evaluate the physical fitness of the customer so that correct insurance product can be offere

In [9]:
# Sentence-embedding model shared by the similarity helpers in this notebook
# (transcript-segment search and topic matching in the chat UI).
embedder = SentenceTransformer('all-MiniLM-L6-v2')

def find_similar_transcript_segments(text_query, transcript_segments, top_k=5):
    """Return the transcript segments most similar to ``text_query``.

    Args:
        text_query (str): Query text to embed.
        transcript_segments: Sequence of dicts with "text", "start" and "end".
        top_k (int): Maximum number of segments to return.

    Returns:
        list[dict]: Up to ``top_k`` dicts with "text", "start", "end" and
        "similarity" (cosine similarity as a float), best match first. Empty
        when there are no segments.
    """
    segments = list(transcript_segments)
    if not segments:
        # Nothing to rank; skip the encoder instead of passing it an empty batch.
        return []

    query_embedding = embedder.encode(text_query, convert_to_tensor=True)
    texts = [seg["text"] for seg in segments]
    text_embeddings = embedder.encode(texts, convert_to_tensor=True, batch_size=32)

    # Convert once to plain floats so the sort compares numbers, not tensors.
    scores = util.cos_sim(query_embedding, text_embeddings)[0].tolist()
    ranked = sorted(zip(segments, scores), key=lambda pair: pair[1], reverse=True)

    return [
        {
            "text": segment["text"],
            "start": segment["start"],
            "end": segment["end"],
            "similarity": float(score),
        }
        for segment, score in ranked[:top_k]
    ]

Chatbot with UI

In [10]:
import tkinter as tk
from tkinter import scrolledtext
import threading
import queue
import re
from langchain_ollama import OllamaLLM
from langchain_core.prompts import ChatPromptTemplate
import os
import subprocess

# LLM used by both chains below, served through Ollama.
llm = OllamaLLM(model='gemma3')

# Template for general queries
# Placeholders: {context} (conversation history), {transcript}, {question}.
general_template = """
You are Vivi, an expert and friendly video assistant chatbot.

You are having an ongoing conversation with the user. You have access to a full transcript of a video. If the user’s question is about the video, answer helpfully and refer to timestamps if relevant.

If the question is general and not related to the video, just respond helpfully like a normal assistant. You can use your general knowledge to help the user even if it’s unrelated to the transcript.

---
Conversation History:
{context}

---
Full Transcript of the Video:
{transcript}

---
User:
{question}

---
Vivi:
"""

# Template for topic extraction
# Placeholder: {transcript}. The reply is parsed line by line in start_chat_ui,
# which looks for the "Topic:" and "Timestamps:" lines requested here.
topic_template = """
You are an expert video analysis assistant. Given the transcript of a video with timestamps, identify the major topics discussed and provide their corresponding timestamp ranges. Return the result as a list of topics, each with a brief description and its start and end timestamps.

Transcript:
{transcript}

Return the result in the following format:
- Topic: [Brief description]
  Timestamps: [start_time] - [end_time]
"""

# Build one prompt -> LLM pipeline per template.
prompt_general = ChatPromptTemplate.from_template(general_template)
prompt_topic = ChatPromptTemplate.from_template(topic_template)
chain_general = prompt_general | llm
chain_topic = prompt_topic | llm

# FFmpeg-based video clipping function
def clip_video(video_path, start_time, end_time, output_path=None):
    """
    Clip a video from start_time to end_time using FFmpeg.

    Errors are reported through the returned string rather than raised, so
    the chat UI can display them directly.

    Args:
        video_path (str): Path to the input video.
        start_time (float): Start time in seconds; must be >= 0.
        end_time (float): End time in seconds; must be greater than start_time.
        output_path (str): Path to save the clipped video. If None, a name of
            the form "<input base>_clip_<start>_<end>.mp4" is generated.

    Returns:
        str: Success message with output path or error message.
    """
    # The FFMPEG_PATH environment variable overrides the machine-specific default.
    ffmpeg_path = os.environ.get(
        "FFMPEG_PATH",
        '/Users/mohalsahai/Desktop/Video Clipping/Python Packages/ffmpeg',
    )
    try:
        if not os.path.exists(video_path):
            return f"Error: Video file {video_path} does not exist."

        # Reject impossible ranges up front instead of letting FFmpeg fail
        # on a zero or negative duration.
        duration = end_time - start_time
        if start_time < 0 or duration <= 0:
            return (
                f"Error: Invalid clip range {start_time} - {end_time}; "
                "start must be >= 0 and end must be greater than start."
            )

        if output_path is None:
            base, _ = os.path.splitext(video_path)
            output_path = f"{base}_clip_{int(start_time)}_{int(end_time)}.mp4"

        # FFmpeg command: -ss for start, -t for duration, -c copy for stream copying
        command = [
            ffmpeg_path, "-y",  # Overwrite output if exists
            "-i", video_path,   # Input file
            "-ss", str(start_time),  # Start time
            "-t", str(duration),     # Duration
            "-c", "copy",       # Copy streams without re-encoding
            output_path         # Output file
        ]

        subprocess.run(command, capture_output=True, text=True, check=True)
        return f"Video clip saved to {output_path}"

    except subprocess.CalledProcessError as e:
        return f"Error clipping video: FFmpeg failed with {e.stderr}"
    except Exception as e:
        return f"Error clipping video: {str(e)}"

def start_chat_ui(transcript_segments, full_transcript_text):
    """Open the Tkinter chat window for the Vivi assistant; blocks until closed.

    Messages starting with "Video Clipping:" ask the topic chain for the
    video's major topics, pick the topic most similar to the query and cut
    that time range out of the video with ``clip_video``. Every other message
    is answered by the general chain and shown together with the three most
    similar transcript segments.

    Args:
        transcript_segments (list[dict]): Segments with "text", "start" and
            "end" keys, used for the "Relevant segments" list.
        full_transcript_text (str): Transcript text handed to both LLM chains.

    Relies on the module-level names ``video``, ``embedder``, ``util``,
    ``chain_general``, ``chain_topic``, ``clip_video`` and
    ``find_similar_transcript_segments``.
    """
    context = ""
    response_queue = queue.Queue()
    workers = []  # background threads started by this window
    video_path = video  # path produced by the video-input cell

    # Lenient matchers for the topic chain's reply: tolerate different
    # indentation, list bullets and markdown emphasis around the labels.
    topic_line = re.compile(r"^[\s\-*]*topic\s*:\s*(.*)$", re.IGNORECASE)
    timestamps_line = re.compile(r"^[\s\-*]*timestamps?\s*:\s*(.*)$", re.IGNORECASE)
    # A timestamp is plain seconds ("12.5") or clock style ("00:01:05,500").
    timestamp_token = re.compile(r"\d+(?::\d+)*(?:[.,]\d+)?")

    def is_video_clipping_query(query):
        return query.strip().lower().startswith("video clipping:")

    def extract_query(query):
        # \s must be a regex whitespace class; with "\\s" in a raw string the
        # pattern required a literal backslash and never matched.
        match = re.match(r"\s*Video Clipping:\s*(.+)", query, re.IGNORECASE)
        return match.group(1).strip() if match else query

    def to_seconds(token):
        """Convert "SS(.fff)" or "(HH:)MM:SS(.fff)" into seconds."""
        seconds = 0.0
        for part in token.replace(",", ".").split(":"):
            seconds = seconds * 60 + float(part)
        return seconds

    def parse_topics(response_text):
        """Extract {"description", "start", "end"} dicts from the LLM reply."""
        topics = []
        pending_description = None
        for line in response_text.splitlines():
            topic_match = topic_line.match(line)
            if topic_match:
                pending_description = topic_match.group(1).strip(" *")
                continue
            time_match = timestamps_line.match(line)
            if time_match and pending_description is not None:
                stamps = timestamp_token.findall(time_match.group(1))
                if len(stamps) >= 2:
                    topics.append({
                        "description": pending_description,
                        "start": to_seconds(stamps[0]),
                        "end": to_seconds(stamps[1]),
                    })
                    pending_description = None
        return topics

    def get_major_topics():
        # Failures propagate to run_bot, which reports them; no in-band
        # "Error ..." topic that a real topic description could imitate.
        response = chain_topic.invoke({"transcript": full_transcript_text})
        return parse_topics(response)

    def find_most_similar_topic(query, topics):
        # Always return a pair so callers can unpack the result safely.
        if not topics:
            return None, 0.0
        query_embedding = embedder.encode(query)
        topic_descriptions = [topic["description"] for topic in topics]
        topic_embeddings = embedder.encode(topic_descriptions)
        similarities = util.cos_sim(query_embedding, topic_embeddings)[0]
        best_index = int(similarities.argmax())
        return topics[best_index], float(similarities[best_index])

    def run_bot(user_input, history):
        """Worker-thread body: compute the reply and queue it for the UI thread."""
        try:
            if is_video_clipping_query(user_input):
                query = extract_query(user_input)
                try:
                    topics = get_major_topics()
                except Exception as e:
                    response_queue.put((user_input, f"Error extracting topics: {str(e)}", []))
                    return
                if not topics:
                    response_queue.put((
                        user_input,
                        "No topics with timestamps could be extracted from the transcript.",
                        [],
                    ))
                    return

                best_topic, similarity = find_most_similar_topic(query, topics)
                if best_topic is None or similarity < 0.1:
                    response_queue.put((user_input, "No relevant topic found for the video.", []))
                    return

                result = clip_video(
                    video_path,
                    best_topic["start"],
                    best_topic["end"],
                    output_path=f"clipped_{int(best_topic['start'])}_{int(best_topic['end'])}.mp4"
                )
                response = f"{result}\nTopic: {best_topic['description']}\nTimestamps: {best_topic['start']:.2f} - {best_topic['end']:.2f}\nSimilarity: {similarity:.2f}"
                response_queue.put((user_input, response, []))
            else:
                top_segments = find_similar_transcript_segments(user_input, transcript_segments, top_k=3)
                response = chain_general.invoke({
                    "context": history,
                    "question": user_input,
                    "transcript": full_transcript_text,
                })
                response_queue.put((user_input, response, top_segments))
        except Exception as e:
            response_queue.put((user_input, f"Error: {str(e)}", []))

    def check_queue():
        """UI-thread poller: show one queued reply, or re-arm itself if none yet."""
        nonlocal context
        try:
            question, response, top_segments = response_queue.get_nowait()
        except queue.Empty:
            root.after(100, check_queue)
            return
        chat_display.insert(tk.END, f"Vivi: {response}\n")
        if top_segments:
            chat_display.insert(tk.END, "Relevant segments:\n" + "\n".join(
                [f"[{seg['start']:.2f} - {seg['end']:.2f}]: {seg['text']}" for seg in top_segments]) + "\n")
        chat_display.yview(tk.END)
        # The question travels with the reply, so history stays correctly
        # paired even when several requests are in flight.
        context += f"\nUser: {question}\nAI: {response}\n"

    def send_message():
        user_input = user_entry.get()
        if not user_input.strip():
            return
        if user_input.strip().lower() == "exit":
            root.destroy()
            return

        chat_display.insert(tk.END, f"User: {user_input}\n")
        user_entry.delete(0, tk.END)

        # Snapshot the history for the worker; only the UI thread mutates it.
        worker = threading.Thread(target=run_bot, args=(user_input, context), daemon=True)
        workers.append(worker)
        worker.start()
        root.after(100, check_queue)

    def on_closing():
        # Wait briefly only for this window's own workers, not for every
        # thread in the process.
        for worker in workers:
            if worker.is_alive():
                worker.join(timeout=1.0)
        root.destroy()

    root = tk.Tk()
    root.title("Vivi Video Chatbot")
    chat_display = scrolledtext.ScrolledText(root, wrap=tk.WORD, width=80, height=30, font=("Arial", 12))
    chat_display.pack(padx=10, pady=10)
    user_entry = tk.Entry(root, font=("Arial", 12))
    user_entry.pack(fill=tk.X, padx=10, pady=(0, 10))
    user_entry.bind("<Return>", lambda e: send_message())
    send_btn = tk.Button(root, text="Send", font=("Arial", 12), command=send_message)
    send_btn.pack(pady=(0, 10))
    chat_display.insert(tk.END, "Welcome to the Vivi Video Chatbot! Type 'exit' to quit.\nFor video clipping, use 'Video Clipping: {Query}'.\n")
    chat_display.yview(tk.END)
    root.protocol("WM_DELETE_WINDOW", on_closing)
    root.mainloop()

In [11]:
def format_transcript_with_timestamps(segments):
    """Render each segment as a "[start - end] text" line and join with newlines.

    Start and end are formatted with ``format_time``; the text is stripped of
    surrounding whitespace.
    """
    return "\n".join(
        f"[{format_time(entry['start'])} - {format_time(entry['end'])}] {entry['text'].strip()}"
        for entry in segments
    )

In [None]:
# Launch the chat window with the Whisper segments and the seconds-based
# timestamped transcript built in the transcription cell.
if __name__ == "__main__":
    start_chat_ui(result["segments"], timestamped_transcript)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


In [1]:
# Requires the transcription cell, which defines `timestamped_transcript`,
# to have been run in the current kernel first.
print(timestamped_transcript)

NameError: name 'timestamped_transcript' is not defined