# SEARCH IN VIDEO ENGINE

In [None]:
# Install Required Libraries

!pip install moviepy pydub SpeechRecognition indic-transliteration yt-dlp
!pip install deep-translator

In [None]:
# Import Libraries

from pydub import AudioSegment
import speech_recognition as sr
from moviepy.editor import VideoFileClip
from indic_transliteration import sanscript
from indic_transliteration.sanscript import transliterate
from difflib import get_close_matches
import yt_dlp
import os
from urllib.parse import urlparse, parse_qs
from deep_translator import GoogleTranslator

In [None]:
# YouTube Video Download Functions

def _get_video_id_from_url(url: str):
    parsed = urlparse(url)
    qs = parse_qs(parsed.query)
    if 'v' in qs and qs['v']:
        return qs['v'][0]
    host = parsed.netloc.lower().replace('www.', '')
    if host == 'youtu.be':
        return parsed.path.lstrip('/').split('/')[0]
    if parsed.path and '/shorts/' in parsed.path:
        try:
            return parsed.path.split('/shorts/')[1].split('/')[0]
        except IndexError:
            return None
    frag_qs = parse_qs(parsed.fragment)
    if 'v' in frag_qs and frag_qs['v']:
        return frag_qs['v'][0]
    return None

def _normalize_to_watch_url(url: str):
    """Rewrite *url* to the canonical ``watch?v=`` form when an id is found.

    Returns:
        A ``(url, video_id)`` tuple. When no video id can be extracted the
        input URL is returned unchanged and ``video_id`` is ``None``.
    """
    video_id = _get_video_id_from_url(url)
    if not video_id:
        return url, None
    return f"https://www.youtube.com/watch?v={video_id}", video_id

def download_youtube_video(url, workdir="downloads"):
    """Download the video at *url* into *workdir* using yt-dlp.

    The URL is first normalized to a plain ``watch?v=`` link when a video id
    can be extracted, which strips playlist and tracking parameters.

    Args:
        url: Video URL to download.
        workdir: Directory that receives the download; created if missing.

    Returns:
        A ``(downloaded_path, info)`` tuple, where ``info`` is the yt-dlp info
        dict of the video that was selected.

    Raises:
        FileNotFoundError: If yt-dlp returned no usable video entry, or no
            downloaded file can be located in *workdir*.
    """
    print("Start downloading Youtube video...")
    os.makedirs(workdir, exist_ok=True)
    clean_url, requested_vid = _normalize_to_watch_url(url)
    print(f"Normalized URL: {clean_url}  (requested id: {requested_vid})")

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': os.path.join(workdir, '%(id)s.%(ext)s'),
        'merge_output_format': 'mp4',
        'noplaylist': True,
        'quiet': True,
        'no_warnings': True,
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(clean_url, download=True)

        chosen_info = info
        if isinstance(info, dict) and info.get('entries'):
            # Playlist-like result: prefer the entry the caller asked for,
            # otherwise fall back to the first usable one.
            entries = [entry for entry in info['entries'] if entry]
            if not entries:
                raise FileNotFoundError(
                    "No downloadable video entries were returned for this URL."
                )
            chosen_info = next(
                (
                    entry for entry in entries
                    if requested_vid and entry.get('id') == requested_vid
                ),
                entries[0],
            )

        if not chosen_info:
            raise FileNotFoundError("No video information was returned for this URL.")

        prepared = ydl.prepare_filename(chosen_info)
        expected_mp4 = os.path.splitext(prepared)[0] + '.mp4'

        if os.path.exists(expected_mp4):
            downloaded_path = expected_mp4
        elif os.path.exists(prepared):
            # No merge/remux happened, so the file kept its native extension.
            downloaded_path = prepared
        else:
            candidates = [
                os.path.join(workdir, name)
                for name in os.listdir(workdir)
                if os.path.isfile(os.path.join(workdir, name))
            ]
            if not candidates:
                raise FileNotFoundError("Downloaded file not found in working directory.")
            # Files are named "<id>.<ext>", so prefer ones belonging to this
            # video over stale downloads that happen to be larger.
            video_id = chosen_info.get('id')
            matching = [
                path for path in candidates
                if video_id and os.path.basename(path).startswith(video_id)
            ]
            downloaded_path = max(matching or candidates, key=os.path.getsize)

    print(f"Downloaded file: {downloaded_path}")
    return downloaded_path, chosen_info


In [None]:
# Audio Extraction & Splitting

def extract_audio_from_video(video_file, audio_file):
    """Write the audio track of *video_file* to *audio_file*.

    Args:
        video_file: Path of the source video.
        audio_file: Destination path for the extracted audio.

    Raises:
        ValueError: If the video has no audio track.
    """
    video = VideoFileClip(video_file)
    try:
        if video.audio is None:
            raise ValueError(f"No audio track found in video: {video_file}")
        video.audio.write_audiofile(audio_file)
    finally:
        # Always release the underlying reader, even when extraction fails.
        video.close()

def get_chunk_length(video_duration):
    """Return the audio chunk length in milliseconds.

    The length is currently fixed at 15 seconds; *video_duration* is accepted
    but does not influence the result.
    """
    fifteen_seconds_ms = 15 * 1000
    return fifteen_seconds_ms

def split_audio(audio_file, chunk_length_ms, overlap_ms=3000):
    """Split a WAV file into overlapping chunks.

    Consecutive chunks start ``chunk_length_ms - overlap_ms`` milliseconds
    apart, so each chunk repeats the last ``overlap_ms`` of the previous one.

    Args:
        audio_file: Path of the WAV file to split.
        chunk_length_ms: Length of each chunk in milliseconds.
        overlap_ms: Overlap between consecutive chunks in milliseconds.

    Returns:
        A list of audio segments in playback order; the last one may be
        shorter than ``chunk_length_ms``.

    Raises:
        ValueError: If ``overlap_ms`` is negative or not smaller than
            ``chunk_length_ms`` (the split would never advance).
    """
    if overlap_ms < 0:
        raise ValueError("overlap_ms must not be negative.")
    step_ms = chunk_length_ms - overlap_ms
    if step_ms <= 0:
        # A non-positive step would make the loop below spin forever.
        raise ValueError("chunk_length_ms must be greater than overlap_ms.")

    audio = AudioSegment.from_wav(audio_file)
    total_ms = len(audio)
    chunks = []
    start = 0
    while start < total_ms:
        chunks.append(audio[start:start + chunk_length_ms])
        start += step_ms
    return chunks


In [None]:
# Speech Recognition Functions

def transcribe_audio_chunk(chunk, recognizer, language='en-US'):
    """Transcribe one audio chunk with Google speech recognition.

    The chunk is exported as WAV, read through ``sr.AudioFile`` and sent to
    ``recognizer.recognize_google``.

    Returns:
        The recognized text, or an empty string when the speech could not be
        understood or the recognition request failed (the request error is
        printed).
    """
    with chunk.export(format="wav") as wav_stream, sr.AudioFile(wav_stream) as audio_source:
        recorded = recognizer.record(audio_source)
        try:
            text = recognizer.recognize_google(recorded, language=language)
        except sr.UnknownValueError:
            text = ""
        except sr.RequestError as e:
            print(f"Request Error: {e}")
            text = ""
        return text

def milliseconds_to_hms(milliseconds):
    """Format a duration in milliseconds as ``HH:MM:SS``.

    Fractions of a second are discarded; each field is zero-padded to at
    least two digits.
    """
    whole_seconds = milliseconds // 1000
    whole_minutes, secs = divmod(whole_seconds, 60)
    hrs, mins = divmod(whole_minutes, 60)
    return ":".join(f"{int(part):02}" for part in (hrs, mins, secs))


In [None]:
# Search and Transliteration Functions

def search_word_in_transcript(transcript, search_words, chunk_start_time, chunk_length_ms, fuzzy_match, cutoff):
    """Locate search words inside one chunk's transcript.

    Each transcript word is assigned an estimated time by spreading the words
    evenly across ``chunk_length_ms``, offset by ``chunk_start_time``.
    Matching is case-insensitive; when ``fuzzy_match`` is true, a word that
    does not match exactly is compared against the search words with
    ``difflib.get_close_matches`` using ``cutoff``.

    Returns:
        A tuple ``(timestamps, concatenated_words, concatenated_times)``:

        * ``timestamps`` maps each matched search word to a list of
          ``HH:MM:SS`` strings.
        * ``concatenated_words`` lists phrases built from distinct matched
          words whose first occurrences lie within one second of the
          previous word in the phrase.
        * ``concatenated_times`` holds the ``HH:MM:SS`` time of the earliest
          word of each phrase, index-aligned with ``concatenated_words``.
    """
    tokens = transcript.lower().split()
    targets = [term.lower() for term in search_words]
    token_count = len(tokens)

    timestamps = {}
    word_times = {}
    for position, token in enumerate(tokens):
        # Decide which search word (if any) this token counts as.
        if token in targets:
            hit = token
        elif fuzzy_match:
            nearest = get_close_matches(token, targets, n=1, cutoff=cutoff)
            if not nearest:
                continue
            hit = nearest[0]
        else:
            continue

        offset_ms = chunk_start_time + (position / token_count) * chunk_length_ms
        timestamps.setdefault(hit, []).append(milliseconds_to_hms(offset_ms))
        word_times.setdefault(hit, []).append(offset_ms)

    # Order the matched words by the first time each one was heard.
    first_seen = {term: min(times) for term, times in word_times.items()}
    ordered = sorted(word_times, key=first_seen.__getitem__)

    concatenated_words = []
    concatenated_times = []
    idx = 0
    while idx < len(ordered) - 1:
        phrase_parts = [ordered[idx]]
        previous_first = first_seen[ordered[idx]]
        earliest = previous_first

        nxt = idx + 1
        while nxt < len(ordered):
            candidate_first = first_seen[ordered[nxt]]
            # Stop as soon as the next word starts more than a second away.
            if abs(candidate_first - previous_first) > 1000:
                break
            phrase_parts.append(ordered[nxt])
            previous_first = candidate_first
            earliest = min(earliest, candidate_first)
            nxt += 1

        if len(phrase_parts) > 1:
            concatenated_words.append(" ".join(phrase_parts))
            concatenated_times.append(milliseconds_to_hms(earliest))

        idx = nxt

    return timestamps, concatenated_words, concatenated_times

def transliterate_search_words(words):
    """Translate English search words into Hindi via ``GoogleTranslator``.

    Translation is best-effort: when a word cannot be translated (the call
    raises, or yields an empty result) the original word is kept, so the
    returned list always has one string per input word.

    Args:
        words: Iterable of English words.

    Returns:
        A list of translated words, index-aligned with *words*.
    """
    translated_words = []
    translator = None
    for word in words:
        try:
            # Build the translator lazily and reuse it for every word.
            if translator is None:
                translator = GoogleTranslator(source='en', target='hi')
            hindi_script = translator.translate(word)
        except Exception as e:
            # Deliberately broad: any translation failure must not abort the
            # search, so report it and fall back to the untranslated word.
            print(f"Translation error: {e}")
            hindi_script = None
        # An empty/None result would break later string handling; keep the
        # original word instead.
        translated_words.append(hindi_script or word)
    return translated_words


In [None]:
# Main Function

def main(video_file_or_url, search_string, language='en'):
    """Search a video's speech for the given words and print their timestamps.

    The video is downloaded first when *video_file_or_url* is an http(s) URL.
    Its audio is extracted to ``audio.wav``, split into overlapping chunks,
    transcribed chunk by chunk, and each transcript is searched for the words
    of *search_string*. The user is asked interactively whether to enable
    fuzzy matching and, if so, for the similarity cutoff.

    Args:
        video_file_or_url: Local video path or an http(s) video URL.
        search_string: Whitespace-separated words to look for.
        language: ``'hi'`` to translate the search words to Hindi and
            transcribe as ``hi-IN``; any other value transcribes as ``en-US``.
    """
    audio_file = "audio.wav"
    # Must equal the overlap split_audio() applies between consecutive
    # chunks, otherwise the computed chunk start times drift.
    overlap_ms = 3000

    if video_file_or_url.startswith(("http://", "https://")):
        video_file, _ = download_youtube_video(video_file_or_url)
    else:
        video_file = video_file_or_url

    extract_audio_from_video(video_file, audio_file)

    # Only the duration is needed; close the clip right away to release it.
    video = VideoFileClip(video_file)
    try:
        video_duration = video.duration
    finally:
        video.close()
    chunk_length_ms = get_chunk_length(video_duration)

    search_words = search_string.split()
    if language == 'hi':
        search_words = transliterate_search_words(search_words)

    print(f"Searching for the following words: {search_words}")

    fuzzy_match_input = input("Do you want to enable fuzzy matching for similar words? (yes/no): ").strip().lower()
    fuzzy_match = fuzzy_match_input == 'yes'

    cutoff = 0.8
    if fuzzy_match:
        cutoff_input = input("Enter the cutoff value for fuzzy matching between 0 and 1 [1 = exact match only, 0 = very loose match (almost any similar word will match]: ")

        try:
            cutoff = float(cutoff_input)
        except ValueError:
            print("Invalid input. Using default cutoff of 0.8.")
            cutoff = 0.8
        else:
            if not (0 <= cutoff <= 1):
                print("Invalid cutoff value. Using default cutoff of 0.8.")
                cutoff = 0.8

    print(f"The search word(s) to be used: {', '.join(search_words)}")

    chunks = split_audio(audio_file, chunk_length_ms)
    recognizer = sr.Recognizer()
    recognition_language = 'hi-IN' if language == 'hi' else 'en-US'
    all_timestamps = {}
    all_concatenated_words = []
    all_concatenated_times = []

    for i, chunk in enumerate(chunks):
        chunk_start_time = i * (chunk_length_ms - overlap_ms)
        print(f"Processing chunk {i + 1}/{len(chunks)}...")
        text = transcribe_audio_chunk(chunk, recognizer, language=recognition_language)
        print(f"Transcription of chunk {i + 1}: {text}")

        timestamps, concatenated_words, concatenated_times = search_word_in_transcript(
            text, search_words, chunk_start_time, chunk_length_ms, fuzzy_match, cutoff
        )

        for word, times in timestamps.items():
            all_timestamps.setdefault(word, []).extend(times)

        all_concatenated_words.extend(concatenated_words)
        all_concatenated_times.extend(concatenated_times)

    if all_timestamps:
        print("Unique timestamps for searched words:")
        for word, times in all_timestamps.items():
            # Sort so the de-duplicated timestamps print chronologically
            # instead of in arbitrary set order.
            print(f"{word}: {', '.join(sorted(set(times)))}")

    if all_concatenated_words:
        print("Concatenated words with unique timestamps:")
        unique_concatenated = {}
        for word, time in zip(all_concatenated_words, all_concatenated_times):
            unique_concatenated.setdefault(word, set()).add(time)

        for word, times in unique_concatenated.items():
            print(f"{word}: {', '.join(sorted(times))}")

    if not all_timestamps:
        print("No word(s) found.")
    if not all_concatenated_words and len(search_string.split()) > 1:
        print("No concatenated word(s) found.")


In [None]:
# Run the Program

# Collect the three inputs interactively, then hand them to main().
video_file_or_url = input("Enter the path of the video OR YouTube URL: ")
search_string = input("Enter sentence/words to search for in the video: ")
language = input("Enter the language of the video (en/hi): ")
language = language.strip().lower()  # tolerate stray spaces and capitals

main(
    video_file_or_url=video_file_or_url,
    search_string=search_string,
    language=language,
)