In [None]:
!pip install git+https://github.com/openai/whisper.git

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-w8vsghsz
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-w8vsghsz
  Resolved https://github.com/openai/whisper.git to commit dd4d010d2c585bc70aeddd166cd3e26b0bb62f31
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting tiktoken (from openai-whisper==20240930)
  Downloading tiktoken-0.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting triton>=2 (from openai-whisper==20240930)
  Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.3 kB)
Downloading triton-3.1.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (209.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m209.

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import librosa
import speech_recognition as sr
import moviepy.editor as mp
from transformers import AutoTokenizer, AutoModel
import re

class AdvancedTokenCleaner:
    """Stateless helper that reduces raw tokenizer pieces to plain word fragments."""

    @staticmethod
    def clean_tokens(tokens):
        """
        Filter raw tokenizer output and strip non-word characters.

        A token is kept only if, once any '##' markers are ignored (this is
        done only for tokens that start with '##'), it is longer than one
        character, is not pure whitespace, does not start with '[' or end
        with ']', and is not one of '<pad>', '<s>' or '</s>'.

        Every kept token then has all characters outside a-z, A-Z, 0-9 and
        the U+0900-U+097F block removed; tokens left empty are discarded.

        Args:
            tokens (list): Raw tokens from tokenizer

        Returns:
            list: Cleaned meaningful tokens, in their original order
        """
        reserved = ('<pad>', '<s>', '</s>')

        def passes_filter(piece):
            # The '##' markers are ignored for the checks only; the original
            # piece (markers included) is what gets regex-cleaned below.
            probe = piece.replace('##', '') if piece.startswith('##') else piece

            if len(probe) <= 1 or probe.isspace():
                return False
            if probe.startswith('[') or probe.endswith(']'):
                return False
            return probe not in reserved

        stripped = (
            re.sub(r'[^a-zA-Z0-9\u0900-\u097F]', '', piece)
            for piece in tokens
            if passes_filter(piece)
        )

        # Drop pieces that consisted solely of stripped characters.
        return [piece for piece in stripped if piece]

class MultilingualTokenizer:
    """Wraps a Hugging Face tokenizer/model pair and cleans the produced tokens."""

    def __init__(self, model_name="google/mt5-base"):
        """
        Initialize multilingual tokenizer with advanced cleaning

        Loading is best-effort: if anything fails, the error is printed and
        the instance is left in a disabled state where ``tokenizer``,
        ``model`` and ``token_cleaner`` are all ``None``.

        Args:
            model_name (str): Name passed to ``from_pretrained`` for both the
                tokenizer and the model.
        """
        # Define every attribute up front so a failed load never leaves the
        # instance with missing attributes (AttributeError on later access).
        self.tokenizer = None
        self.model = None
        self.token_cleaner = None

        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModel.from_pretrained(model_name)
            token_cleaner = AdvancedTokenCleaner()
        except Exception as e:
            print(f"Tokenizer loading error: {e}")
            return

        # Publish only after everything loaded, so the instance is either
        # fully usable or fully disabled.
        self.tokenizer = tokenizer
        self.model = model
        self.token_cleaner = token_cleaner

    def tokenize_text(self, text):
        """
        Advanced text tokenization with intelligent cleaning

        Args:
            text (str): Input text to tokenize

        Returns:
            dict: ``input_ids`` and ``attention_mask`` as returned by the
            tokenizer, plus ``raw_tokens`` and ``cleaned_tokens`` lists.
            ``None`` if the tokenizer failed to load or ``text`` is empty.
        """
        if not self.tokenizer or not text:
            return None

        # Preprocess text (collapse runs of whitespace)
        text = re.sub(r'\s+', ' ', text).strip()

        # Tokenize text
        tokens = self.tokenizer(
            text,
            return_tensors="pt",
            add_special_tokens=True
        )

        # Map the first (only) sequence of ids back to token strings
        raw_tokens = self.tokenizer.convert_ids_to_tokens(tokens['input_ids'][0])

        # Clean tokens
        cleaned_tokens = self.token_cleaner.clean_tokens(raw_tokens)

        return {
            'input_ids': tokens['input_ids'],
            'attention_mask': tokens['attention_mask'],
            'raw_tokens': raw_tokens,
            'cleaned_tokens': cleaned_tokens
        }

class SpeechToTextExtractor:
    """Transcribes an audio file, trying several recognition engines in turn."""

    def __init__(self):
        """
        Initialize speech recognition with advanced processing
        """
        self.recognizer = sr.Recognizer()

        # Configure recognizer for better accuracy
        self.recognizer.dynamic_energy_threshold = True
        self.recognizer.pause_threshold = 0.5

    def extract_speech_from_audio(self, audio_path, language='en-IN'):
        """
        Extract speech text with multiple recognition attempts

        The whole file is read into memory, then Google Web Speech is tried
        first and CMU Sphinx second. The first non-empty transcript wins.

        Args:
            audio_path (str): Path to audio file
            language (str): Language for speech recognition (only passed to
                the Google recognizer)

        Returns:
            str: Extracted speech text, or None if the file could not be read
            or every recognizer failed / returned nothing.
        """
        try:
            with sr.AudioFile(audio_path) as source:
                # Deliberately no adjust_for_ambient_noise() here: on a file
                # source it reads (and thereby discards) the first second of
                # audio, and record() does not use the energy threshold, so
                # the call only caused the opening speech to be lost.
                audio = self.recognizer.record(source)
        except Exception as e:
            print(f"Speech recognition error: {e}")
            return None

        # Ordered by preference; each entry is (label for messages, callable).
        recognition_methods = (
            ("Google Web Speech",
             lambda: self.recognizer.recognize_google(audio, language=language)),
            ("CMU Sphinx",
             lambda: self.recognizer.recognize_sphinx(audio)),
        )

        for engine_name, recognize in recognition_methods:
            try:
                text = recognize()
            except Exception as e:
                # Best-effort fallback chain: report the failure instead of
                # hiding it, then try the next engine.
                print(f"{engine_name} recognition failed: {e!r}")
                continue
            if text:
                return text

        return None

class AudioPreprocessor:
    """Pulls the audio track out of a video and returns samples plus transcript."""

    def __init__(
        self,
        sample_rate=16000,
        max_duration=60
    ):
        """
        Args:
            sample_rate (int): Rate the audio is resampled to when loaded.
            max_duration (float): Maximum number of seconds of audio loaded
                as sample data (the transcript uses the full track).
        """
        self.sample_rate = sample_rate
        self.max_duration = max_duration
        self.speech_extractor = SpeechToTextExtractor()

    def extract_audio(self, video_path):
        """
        Extract audio from video with comprehensive processing

        The audio track is written to a uniquely named temporary WAV file,
        transcribed, loaded as sample data, and the temporary file is always
        removed afterwards, on failure too.

        Args:
            video_path (str): Path to input video

        Returns:
            tuple: Audio data and extracted text; ``(None, None)`` if any
            step raised (the error is printed).
        """
        import tempfile

        temp_audio_path = None
        try:
            # Unique name instead of a fixed "temp_audio.wav" in the working
            # directory, so concurrent or interrupted runs cannot collide.
            fd, temp_audio_path = tempfile.mkstemp(suffix=".wav")
            os.close(fd)

            # Context manager releases the reader processes/handles held by
            # the clip even when writing the audio fails.
            with mp.VideoFileClip(video_path) as video:
                if video.audio is None:
                    raise ValueError("Video has no audio track")
                video.audio.write_audiofile(temp_audio_path)

            # Extract speech text
            extracted_text = self.speech_extractor.extract_speech_from_audio(temp_audio_path)

            # Load audio data
            audio_data, _ = librosa.load(
                temp_audio_path,
                sr=self.sample_rate,
                duration=self.max_duration
            )

            return audio_data, extracted_text

        except Exception as e:
            print(f"Audio extraction error: {e}")
            return None, None

        finally:
            # Clean up temporary file on every path, not just on success.
            if temp_audio_path is not None:
                try:
                    os.remove(temp_audio_path)
                except OSError:
                    pass

# Transcripts collected by AudioTokenizationPipeline.tokenize_video(), one per
# successfully processed video; read by video_selection(). An existing list
# (e.g. defined in another notebook cell) is reused rather than reset.
video_transcripts = globals().get("video_transcripts", [])


class AudioTokenizationPipeline:
    """Runs audio extraction, speech-to-text and text tokenization for a video."""

    def __init__(self):
        """
        Initialize comprehensive tokenization pipeline
        """
        self.preprocessor = AudioPreprocessor()
        self.text_tokenizer = MultilingualTokenizer()

    def tokenize_video(self, video_path):
        """
        Comprehensive video tokenization

        On success the transcript is appended to the module-level
        ``video_transcripts`` list and the results are printed.

        Args:
            video_path (str): Path to input video

        Returns:
            dict: ``audio_data``, ``extracted_text`` and ``text_tokens``
            (the latter may be None if tokenization was unavailable), or
            None if audio or speech could not be extracted.
        """
        # Extract audio and speech text
        audio_data, extracted_text = self.preprocessor.extract_audio(video_path)

        if audio_data is None or extracted_text is None:
            print("Failed to extract audio or speech")
            return None

        # Tokenize extracted text
        text_tokens = self.text_tokenizer.tokenize_text(extracted_text)

        # Record the transcript for later retrieval, then print results
        print("\n--- Video Speech Extraction ---")
        video_transcripts.append(extracted_text)
        print("Extracted Text:", extracted_text)

        if text_tokens:
            print("\n--- Token Details ---")
            print("Raw Tokens:", text_tokens['raw_tokens'])
            print("Cleaned Tokens:", text_tokens['cleaned_tokens'])
            print("Token IDs Shape:", text_tokens['input_ids'].shape)

        return {
            'audio_data': audio_data,
            'extracted_text': extracted_text,
            'text_tokens': text_tokens
        }

#video selection
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def video_selection(question, top_k=2, transcripts=None):
    """
    Rank video transcripts by semantic similarity to a question.

    Transcripts and the question are embedded with
    ``sentence-transformers/all-MiniLM-L6-v2`` (mean pooling over real
    tokens), compared by cosine similarity, and the best matches are printed.

    Args:
        question (str): The user question to match against.
        top_k (int): Number of videos to select.
        transcripts (list[str] | None): Transcripts to rank. Defaults to the
            module-level ``video_transcripts`` list.

    Returns:
        list[str]: The selected transcripts, most similar first. Empty if
        there are no transcripts or ``top_k`` is not positive.
    """
    if transcripts is None:
        transcripts = video_transcripts
    transcripts = list(transcripts)

    selected_videos = []

    # Skip model loading entirely when there is nothing to rank.
    if transcripts and top_k > 0:
        # Step 1: Load pre-trained model and tokenizer for embeddings
        model_name = "sentence-transformers/all-MiniLM-L6-v2"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name)
        model.eval()

        def get_embeddings(texts):
            """
            Generate embeddings for a list of texts.
            """
            inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
            # Inference only: no autograd graph needed.
            with torch.no_grad():
                outputs = model(**inputs)

            hidden = outputs.last_hidden_state
            # Average over real tokens only; padding positions would
            # otherwise skew the embedding of shorter texts in a batch.
            mask = inputs["attention_mask"].unsqueeze(-1).to(hidden.dtype)
            pooled = (hidden * mask).sum(dim=1) / mask.sum(dim=1).clamp(min=1e-9)
            return pooled.numpy()

        # Step 2: Embed video transcripts
        video_embeddings = get_embeddings(transcripts)

        # Step 3: Embed user question
        question_embedding = get_embeddings([question])

        # Step 4: Compute relevance scores
        similarities = cosine_similarity(question_embedding, video_embeddings)

        # Step 5: Select top-K videos (highest similarity first)
        top_k_indices = np.argsort(similarities[0])[::-1][:top_k]
        selected_videos = [transcripts[i] for i in top_k_indices]

    # Output the selected videos
    print("Selected Videos:")
    for i, video in enumerate(selected_videos, 1):
        print(f"{i}. {video}")

    return selected_videos


import os
import numpy as np
import torch
import json
import warnings
from typing import List, Dict, Union, Optional
from transformers import pipeline
import whisper
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from moviepy.editor import VideoFileClip
import logging
from datetime import datetime
import soundfile as sf
import librosa

class FixedVideoChatbot:
    """Answers questions about videos from their (cached) Whisper transcripts.

    Each video is transcribed in chunks, the segments most similar to the
    query are selected by sentence embeddings, and an extractive QA model
    picks the answer from that context.
    """

    # Sample rate (Hz) audio is resampled to; used for chunk sizing and
    # timestamp conversion.
    SAMPLE_RATE = 16000

    def __init__(self, cache_dir: str = "video_cache", log_file: str = "chatbot.log"):
        """Set up logging, the on-disk cache directory and load all models.

        Args:
            cache_dir: Directory where transcription JSON files are cached.
            log_file: Path of the log file.

        Raises:
            Exception: Whatever model loading raised, after logging it.
        """
        self.setup_logging(log_file)
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)

        try:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            self.logger.info(f"Using device: {self.device}")

            self.transcription_model = whisper.load_model("base", device=self.device)
            self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
            self.qa_pipeline = pipeline(
                "question-answering",
                model="deepset/roberta-base-squad2",
                device=0 if torch.cuda.is_available() else -1
            )
            self.logger.info("Models loaded successfully")
        except Exception as e:
            self.logger.error(f"Error loading models: {str(e)}")
            raise

        # In-memory transcription cache keyed by video path.
        self.cached_data = {}

    def setup_logging(self, log_file: str):
        """Attach a file handler for ``log_file`` to the 'VideoChatbot' logger.

        The logger is shared process-wide, so a handler for the same file is
        attached only once; otherwise every new instance would duplicate
        each log line.
        """
        self.logger = logging.getLogger('VideoChatbot')
        self.logger.setLevel(logging.INFO)

        target = os.path.abspath(log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in self.logger.handlers
        )
        if already_attached:
            return

        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler)

    def extract_audio(self, video_path: str) -> np.ndarray:
        """Return the video's audio as mono float32 samples at SAMPLE_RATE.

        librosa is tried first; if it raises, moviepy is used as a fallback.

        Raises:
            Exception: The fallback's error if both methods fail.
        """
        try:
            # Extract audio using librosa
            self.logger.info(f"Extracting audio from {video_path}")
            # The returned rate equals the requested one, so it is discarded
            # (and no longer shadows the module-level `sr` alias).
            audio_array, _ = librosa.load(video_path, sr=self.SAMPLE_RATE, mono=True)

            # Ensure audio is the correct format for whisper
            if not isinstance(audio_array, np.ndarray):
                raise ValueError("Audio extraction failed")

            if audio_array.ndim != 1:
                # Defensive only (mono=True should already give 1-D):
                # librosa lays multi-channel audio out as (channels, samples).
                audio_array = audio_array.mean(axis=0)

            return audio_array.astype(np.float32)

        except Exception as e:
            self.logger.error(f"Error in audio extraction: {str(e)}")
            try:
                # Fallback method using moviepy
                self.logger.info("Attempting fallback audio extraction")
                with VideoFileClip(video_path) as video:
                    if video.audio is None:
                        raise ValueError("Video has no audio track")
                    audio_array = video.audio.to_soundarray(fps=self.SAMPLE_RATE)
                    # Averaged over axis 1, i.e. treated as (samples, channels).
                    if len(audio_array.shape) > 1:
                        audio_array = audio_array.mean(axis=1)
                    return audio_array.astype(np.float32)
            except Exception as fallback_error:
                self.logger.error(f"Fallback audio extraction failed: {str(fallback_error)}")
                raise

    def process_chunks(self, audio_array: np.ndarray, chunk_duration: int = 30) -> List[Dict]:
        """Transcribe audio in fixed-length chunks to handle memory constraints.

        Args:
            audio_array: Samples at SAMPLE_RATE.
            chunk_duration: Chunk length in seconds.

        Returns:
            All transcription segments, with ``start``/``end`` shifted so
            they are relative to the beginning of ``audio_array``.
        """
        chunk_size = chunk_duration * self.SAMPLE_RATE
        chunks = []

        for offset in range(0, len(audio_array), chunk_size):
            chunk = audio_array[offset:offset + chunk_size]
            if len(chunk) < 100:  # Skip very small chunks
                continue

            # Ensure chunk is 1D array
            if len(chunk.shape) > 1:
                chunk = chunk.mean(axis=-1)

            result = self.transcription_model.transcribe(chunk)

            # Segment times are relative to the chunk; shift by its offset.
            offset_seconds = offset / self.SAMPLE_RATE
            for segment in result["segments"]:
                segment["start"] += offset_seconds
                segment["end"] += offset_seconds
                chunks.append(segment)

        return chunks

    def process_video(self, video_path: str) -> Dict:
        """Transcribe a video, using the JSON cache in ``cache_dir`` if present.

        The cache file is named after the video's base name.

        Returns:
            Dict with ``full_text`` (all segment texts joined by spaces) and
            ``segments``.

        Raises:
            Exception: Any extraction/transcription/IO error, after logging.
        """
        try:
            video_id = os.path.basename(video_path)
            cache_file = os.path.join(self.cache_dir, f"{video_id}_transcription.json")

            # Check cache
            if os.path.exists(cache_file):
                with open(cache_file, 'r', encoding='utf-8') as f:
                    return json.load(f)

            # Extract and process audio
            audio_array = self.extract_audio(video_path)

            # Process in chunks
            transcription_segments = self.process_chunks(audio_array)

            # Combine results
            transcription_data = {
                "full_text": " ".join(seg["text"] for seg in transcription_segments),
                "segments": transcription_segments
            }

            # Cache results
            with open(cache_file, 'w', encoding='utf-8') as f:
                json.dump(transcription_data, f)

            return transcription_data

        except Exception as e:
            self.logger.error(f"Error processing video {video_path}: {str(e)}")
            raise

    def find_relevant_context(self, query: str, transcription_data: Dict) -> str:
        """Return the text of the segments most similar to ``query``.

        Up to three segments are chosen by cosine similarity of sentence
        embeddings and joined with spaces, least similar of the chosen first.

        Returns:
            The combined context, or "" when there are no segments or an
            error occurred (the error is logged).
        """
        try:
            if not transcription_data["segments"]:
                return ""

            # Convert query to embedding
            query_embedding = self.embedding_model.encode([query])

            # Get segment texts and embeddings
            segment_texts = [s["text"] for s in transcription_data["segments"]]
            if not segment_texts:
                return ""

            segment_embeddings = self.embedding_model.encode(segment_texts)

            # cosine_similarity needs 2-D inputs
            if len(query_embedding.shape) == 1:
                query_embedding = query_embedding.reshape(1, -1)
            if len(segment_embeddings.shape) == 1:
                segment_embeddings = segment_embeddings.reshape(1, -1)

            # Calculate similarities
            similarities = cosine_similarity(query_embedding, segment_embeddings)[0]

            # Get top segments (argsort is ascending, so take the tail)
            top_k = min(3, len(similarities))
            top_indices = np.argsort(similarities)[-top_k:]

            # Combine context
            context = " ".join(segment_texts[i] for i in top_indices)
            return context

        except Exception as e:
            self.logger.error(f"Error finding context: {str(e)}")
            return ""

    def get_response(self, query: str, video_paths: Union[str, List[str]]) -> Dict:
        """Answer ``query`` from one or more videos.

        Args:
            query: The question.
            video_paths: A single path or a list of paths.

        Returns:
            Dict with ``answer``, ``confidence`` and ``context``; on failure
            a dict with ``error``, a fallback ``answer`` and ``confidence``
            0.0 (errors are logged, not raised).
        """
        try:
            if isinstance(video_paths, str):
                video_paths = [video_paths]

            all_contexts = []
            for video_path in video_paths:
                # Transcribe each video at most once per instance.
                if video_path not in self.cached_data:
                    self.cached_data[video_path] = self.process_video(video_path)

                context = self.find_relevant_context(query, self.cached_data[video_path])
                if context:
                    all_contexts.append(context)

            if not all_contexts:
                return {
                    "answer": "No relevant information found in the videos.",
                    "confidence": 0.0,
                    "context": ""
                }

            combined_context = " ".join(all_contexts)

            # Generate answer
            answer = self.qa_pipeline(
                question=query,
                context=combined_context,
                max_answer_length=100
            )

            return {
                "answer": answer["answer"],
                "confidence": answer["score"],
                "context": combined_context
            }

        except Exception as e:
            self.logger.error(f"Error generating response: {str(e)}")
            return {
                "error": str(e),
                "answer": "Sorry, I encountered an error processing your question.",
                "confidence": 0.0
            }

def main(video_paths=None, questions=None):
    """
    Transcribe/tokenize each video, then select relevant videos per question.

    Args:
        video_paths (list[str] | None): Videos to process. Defaults to the
            three hard-coded /content paths.
        questions (list[str] | None): Questions to run video selection for.
            Defaults to a module-level ``questions`` list if one is defined,
            otherwise to no questions.
    """
    if video_paths is None:
        video_paths = [
            "/content/How to control sound waves.mp4",
            "/content/videoplayback (2).mp4",
            "/content/videoplayback.mp4",
        ]
    if questions is None:
        # Nothing in this file defines `questions`; fall back to a global of
        # that name when present instead of failing with NameError.
        questions = globals().get("questions", [])

    if video_paths:
        # Build the pipeline once: constructing it loads the tokenizer and
        # model, which is wasteful to repeat for every video.
        tokenization_pipeline = AudioTokenizationPipeline()

        for video_path in video_paths:
            try:
                # Tokenize video (results are printed and the transcript is
                # recorded by the pipeline itself)
                tokenization_pipeline.tokenize_video(video_path)
            except Exception as e:
                print(f"Tokenization failed: {e}")

    for question in questions:
        video_selection(question)


if __name__ == "__main__":
    main()

Selected Videos:
1. 2 hour lecture so I'm headed to my sorority house to get some lunch as we know I have been sick so I need to make sure I'm eating with my medicine I think today is meatball subs but I'm just going to get meatballs cuz those are my favorite parts let's go together we made it the decorations are gorgeous thank you guys later bye guys
2. Learning machine learning is need of todays time, it is no more skill now it is necessity.

Chatbot Response:
I found 2 relevant videos for your question:
1. 2 hour lecture so I'm headed to my sorority house to get some lunch as we know I have been sick so I need to make sure I'm eating with my medicine I think today is meatball subs but I'm just going to get meatballs cuz those are my favorite parts let's go together we made it the decorations are gorgeous thank you guys later bye guys
2. Learning machine learning is need of todays time, it is no more skill now it is necessity.

Here’s a brief answer based on these videos:
2 hour lect