<a href="https://colab.research.google.com/github/adamanz/AdamWebsite/blob/master/tbpnv9.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#!/usr/bin/env python3
# TBPN Guest Detector
# A tool for automatically detecting podcast guests in TBPN streams

import os
import re
import time
import subprocess
import json
import requests
import base64
from typing import Optional, Dict, Any, List, Tuple
import torch

# Install dependencies
def install_dependencies():
    """Install the Python and system packages used by the detector.

    Runs, in order: a pip install of the Python packages, an ``apt-get``
    install of ffmpeg, and a pip upgrade of yt-dlp. Each step is best-effort:
    a failing step is reported but does not stop the remaining steps. The
    success message is printed only when every step exited with status 0.

    After the install steps, ``WhisperModel`` (from ``faster_whisper``) and
    ``drive`` (from ``google.colab``) are imported into module globals.

    Raises:
        ImportError: if ``faster_whisper`` or ``google.colab`` cannot be
            imported after the install steps.
    """
    # Local import: keeps this block self-contained without touching the
    # file-level import list.
    import sys

    global WhisperModel, drive

    print("Installing dependencies...")

    # Use the running interpreter's pip so packages land in the environment
    # that will actually import them.
    pip_install = [sys.executable, "-m", "pip", "install", "-q"]
    steps = [
        pip_install + ["faster-whisper", "yt-dlp", "python-dotenv", "requests", "opencv-python-headless"],
        ["apt-get", "install", "ffmpeg", "-y"],
        pip_install + ["--upgrade", "yt-dlp"],
    ]

    failures = []
    for step in steps:
        try:
            exit_code = subprocess.run(step).returncode
        except OSError as exc:
            # The executable itself is missing (e.g. no apt-get on this system).
            print(f"Could not run {step[0]}: {exc}")
            failures.append(step)
            continue
        if exit_code != 0:
            print(f"Command failed with exit code {exit_code}: {' '.join(step)}")
            failures.append(step)

    # Import required modules after installation
    from faster_whisper import WhisperModel
    from google.colab import drive

    if failures:
        print(f"Dependency installation finished with {len(failures)} failed step(s).")
    else:
        print("Dependencies installed successfully.")

# Set up drive mounting if needed
def setup_drive(use_drive=True):
    """Mount Google Drive and return the detector's output folder on it.

    Args:
        use_drive: When falsy, nothing is mounted and ``None`` is returned.

    Returns:
        The path of the ``TBPN_Guest_Detector`` folder under ``MyDrive``
        (created if missing), or ``None`` when ``use_drive`` is falsy.
    """
    if not use_drive:
        return None

    # Imported lazily so the function is usable (with use_drive=False)
    # outside of Colab.
    from google.colab import drive as colab_drive

    colab_drive.mount('/content/drive')
    target_dir = "/content/drive/MyDrive/TBPN_Guest_Detector"
    os.makedirs(target_dir, exist_ok=True)
    return target_dir

# Base detector class
class BaseGuestDetector:
    """Accumulates timestamped transcript segments and parses guest reports.

    Subclasses are expected to send the prepared transcript to a model and
    pass the model's text answer to ``_parse_structured_response``.
    """

    # Accepts "HH:MM:SS", "H:MM:SS" and "MM:SS", with optional fractional
    # seconds and surrounding whitespace.
    _TIMESTAMP_RE = re.compile(r"\s*(?:(\d+):)?(\d+):(\d+(?:\.\d+)?)\s*")

    def __init__(self):
        self.transcript_buffer = ""          # every segment seen, newline-joined
        self.transcript_context = []         # list of {"text", "timestamp", "formatted_time"}
        self.last_processed_time = time.time()
        self.current_timestamp = 0.0
        self.context_window_size = 131000    # trimming thresholds are multiples of this

    def update_transcript(self, new_transcript: str, timestamp: float = 0.0):
        """Append one transcript segment and trim the oldest context if needed.

        Args:
            new_transcript: Text of the segment.
            timestamp: Start of the segment in seconds.
        """
        self.current_timestamp = timestamp
        self.transcript_buffer += f"\n{new_transcript}"
        self.transcript_context.append({
            "text": new_transcript,
            "timestamp": timestamp,
            "formatted_time": self._format_timestamp(timestamp)
        })
        total_chars = sum(len(segment["text"]) for segment in self.transcript_context)
        # Trimming starts above 4x the window and stops at 3x, so it does not
        # run on every single update once the limit is reached.
        if total_chars > (self.context_window_size * 4):
            limit = self.context_window_size * 3
            drop = 0
            while total_chars > limit and drop < len(self.transcript_context):
                total_chars -= len(self.transcript_context[drop]["text"])
                drop += 1
            # One slice deletion instead of repeated O(n) pop(0) calls.
            del self.transcript_context[:drop]

    def _format_timestamp(self, seconds: float) -> str:
        """Format a number of seconds as ``HH:MM:SS`` (fractions truncated)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def _timestamp_to_seconds(self, timestamp_str: str) -> float:
        """Convert ``HH:MM:SS`` or ``MM:SS`` text to seconds.

        Returns:
            The timestamp in seconds, or ``0.0`` when the input is not a
            string or does not look like a timestamp.
        """
        if not isinstance(timestamp_str, str):
            return 0.0
        match = self._TIMESTAMP_RE.fullmatch(timestamp_str)
        if match is None:
            return 0.0
        hours, minutes, seconds = match.groups()
        return int(hours or 0) * 3600 + int(minutes) * 60 + float(seconds)

    def _find_context_for_timestamp(self, timestamp: float) -> str:
        """Return the text of segments within 30 seconds of ``timestamp``.

        Falls back to the last 500 characters of the full transcript buffer
        when no segment is that close.
        """
        nearby_segments = [
            segment["text"]
            for segment in self.transcript_context
            if abs(segment["timestamp"] - timestamp) < 30
        ]
        return " ".join(nearby_segments) if nearby_segments else self.transcript_buffer[-500:]

    def _prepare_transcript_for_analysis(self) -> str:
        """Render the context as ``[TIMESTAMP: HH:MM:SS] text`` lines.

        Segments containing an introduction keyword are flagged with a marker
        and followed by a blank line so they stand out in the prompt.
        """
        intro_keywords = ["guest", "joining", "welcome", "introduce", "with us", "today we have", "speaking with"]
        lines = []
        for segment in self.transcript_context:
            time_str = segment["formatted_time"]
            text = segment["text"].strip()
            lowered = text.lower()
            if any(keyword in lowered for keyword in intro_keywords):
                lines.append(f"[TIMESTAMP: {time_str}] 🔍 {text}\n\n")
            else:
                lines.append(f"[TIMESTAMP: {time_str}] {text}\n")
        return "".join(lines)

    def _parse_structured_response(self, text: str) -> List[Dict[str, Any]]:
        """Parse ``GUEST: ... | HANDLE: ... | TIMESTAMP: ... | CONFIDENCE: n`` lines.

        Only guests with confidence >= 7 are kept. When the handle is reported
        as "Not mentioned", the transcript context around the guest's
        timestamp is searched for an ``@handle``.

        Returns:
            A list of guest dicts; empty when ``text`` contains
            ``NO_GUESTS_DETECTED`` or no line matches.
        """
        if "NO_GUESTS_DETECTED" in text:
            return []
        guest_pattern = r"GUEST: (.*?) \| HANDLE: (.*?) \| TIMESTAMP: (.*?) \| CONFIDENCE: (\d+)"
        guests = []
        for raw_name, raw_handle, raw_time, raw_confidence in re.findall(guest_pattern, text):
            confidence = int(raw_confidence)
            if confidence < 7:
                continue
            name = raw_name.strip()
            handle = raw_handle.strip()
            timestamp_str = raw_time.strip()
            timestamp = self._timestamp_to_seconds(timestamp_str)
            context = self._find_context_for_timestamp(timestamp)
            if handle == "Not mentioned":
                handle_match = re.search(r'@(\w+)', context, re.IGNORECASE)
                if handle_match:
                    handle = handle_match.group(1)
            guests.append({
                "name": name,
                "x_handle": handle,
                "timestamp": timestamp,
                "formatted_time": timestamp_str,
                "confidence": confidence,
                "context": context
            })
        return guests

# GeminiGuestDetector class
class GeminiGuestDetector(BaseGuestDetector):
    """Guest detector that sends the transcript (or a video clip) to the Gemini REST API."""

    def __init__(self, api_key: str, endpoint: str = "https://generativelanguage.googleapis.com/v1beta", model: str = "gemini-2.5-flash-preview-04-17"):
        super().__init__()
        self.api_key = api_key
        self.endpoint = endpoint
        self.model = model
        self.headers = {
            "Content-Type": "application/json"
        }

    @staticmethod
    def _extract_text(body: Any) -> str:
        """Return the text of the first part of the first candidate, or ""."""
        if not isinstance(body, dict):
            return ""
        # `or` also covers keys that are present but hold an empty list/None,
        # which would otherwise make the [0] lookups raise IndexError.
        candidates = body.get("candidates") or [{}]
        content = candidates[0].get("content") or {}
        parts = content.get("parts") or [{}]
        return parts[0].get("text", "") or ""

    def _generate(self, parts: List[Dict[str, Any]], max_output_tokens: int) -> str:
        """POST one ``generateContent`` request and return the answer text.

        The API key is sent in the ``x-goog-api-key`` header rather than the
        query string, so it does not appear in the URL that request
        exceptions include in their messages.

        Raises:
            requests.RequestException: on transport or HTTP errors.
            ValueError: if the response body is not valid JSON.
        """
        url = f"{self.endpoint}/models/{self.model}:generateContent"
        headers = {**self.headers, "x-goog-api-key": self.api_key}
        payload = {
            "contents": [{"parts": parts}],
            "generationConfig": {
                "temperature": 0.3,
                "maxOutputTokens": max_output_tokens
            }
        }
        response = requests.post(url, headers=headers, json=payload, timeout=30)
        response.raise_for_status()
        return self._extract_text(response.json())

    def detect_guests(self) -> List[Dict[str, Any]]:
        """Ask Gemini to list the guests found in the accumulated transcript.

        Returns:
            Parsed guest dicts (see ``_parse_structured_response``); an empty
            list when there is no transcript yet or the request fails.
        """
        if not self.transcript_context:
            return []
        formatted_transcript = self._prepare_transcript_for_analysis()
        prompt = (
            "You are an AI specialized in podcast guest detection for TBPN. "
            "Analyze the transcript to identify all guests introduced in the show. Focus on phrases like "
            "'joining us today', 'our guest is', 'welcome to the show', 'with us is', or 'today we have'. "
            "Extract: 1. Full name, 2. X handle (if mentioned), 3. Timestamp of first appearance, "
            "4. Confidence level (0-10). Format response as: "
            "GUEST: [name] | HANDLE: [X handle or 'Not mentioned'] | TIMESTAMP: [HH:MM:SS] | CONFIDENCE: [0-10]\n"
            "Return 'NO_GUESTS_DETECTED' if no guests are found. Example: "
            "GUEST: John Doe | HANDLE: @JohnDoe | TIMESTAMP: 00:05:30 | CONFIDENCE: 9\n\n"
            "Analyze this podcast transcript and detect all guests mentioned. Here's the transcript with timestamps:\n\n"
            + formatted_transcript
        )
        try:
            result = self._generate([{"text": prompt}], max_output_tokens=2000)
        except (requests.RequestException, ValueError) as e:
            print(f"Gemini API request failed: {e}")
            return []
        return self._parse_structured_response(result)

    def analyze_video_clip(self, video_path: str, prompt: str) -> str:
        """Send a video file plus a text prompt to Gemini and return its answer.

        The file is read fully into memory and sent inline as base64.

        Args:
            video_path: Path of the clip; sent with MIME type ``video/mp4``.
            prompt: Instruction text sent alongside the video.

        Returns:
            The model's text answer, or "" when the request fails.

        Raises:
            OSError: if ``video_path`` cannot be opened or read.
        """
        with open(video_path, "rb") as video_file:
            video_data = base64.b64encode(video_file.read()).decode('utf-8')
        parts = [
            {"text": prompt},
            {
                "inline_data": {
                    "mime_type": "video/mp4",
                    "data": video_data
                }
            }
        ]
        try:
            return self._generate(parts, max_output_tokens=1000)
        except (requests.RequestException, ValueError) as e:
            print(f"Gemini API video analysis failed: {e}")
            return ""

# Main TBPN Guest Detector class
class TBPNGuestDetector:
    """End-to-end pipeline: download a stream, transcribe it, detect guests.

    Steps (see ``run_pipeline``): fetch metadata and download with yt-dlp,
    extract audio with ffmpeg, transcribe with faster-whisper, detect guests
    with Gemini (falling back to regexes), optionally look for the guest's
    handle in a short video clip, and write the results as JSON.
    """

    def __init__(self, video_url: Optional[str] = None,
                 channel_url: str = "https://www.youtube.com/@TBPNLive/streams",
                 output_dir: str = "/content/output",
                 whisper_model: str = "large-v3",
                 device: str = "cuda",
                 use_gemini: bool = True,
                 gemini_api_key: Optional[str] = None,
                 use_drive: bool = False,
                 drive_output_dir: Optional[str] = None,
                 use_video_analysis: bool = True):
        """Store configuration and create the output directory.

        Raises:
            ValueError: if ``use_gemini`` is True and no API key is given.
        """
        self.video_url = video_url
        self.channel_url = channel_url
        # Fall back to the local directory when Drive output is requested but
        # no Drive path was supplied (os.makedirs(None) would raise TypeError).
        self.output_dir = drive_output_dir if (use_drive and drive_output_dir) else output_dir
        self.whisper_model_name = whisper_model
        self.device = device
        self.compute_type = "int8" if device == "cpu" else "float16"
        self.use_gemini = use_gemini
        self.use_drive = use_drive
        self.drive_output_dir = drive_output_dir
        self.use_video_analysis = use_video_analysis
        self.gemini_detector = None

        if use_gemini:
            if not gemini_api_key:
                raise ValueError("Gemini API key is required when use_gemini is True")
            self.gemini_detector = GeminiGuestDetector(api_key=gemini_api_key)

        os.makedirs(self.output_dir, exist_ok=True)
        # Per-run state; the paths are assigned by run_pipeline.
        self.video_path = None
        self.audio_path = None
        self.transcript_path = None
        self.video_title = "Unknown_Title"
        self.video_id = None
        self.transcript_buffer = ""

    def _sanitize_filename(self, name: str) -> str:
        """Drop characters other than word chars, whitespace and '-'; spaces become '_'."""
        name = re.sub(r'[^\w\s-]', '', name)
        name = name.strip().replace(' ', '_')
        return name

    def get_video_info(self, target_url: str) -> Dict[str, str]:
        """Return ``{'title', 'id'}`` for the first item at ``target_url`` via yt-dlp.

        Raises:
            Exception: if yt-dlp exits with a non-zero status.
        """
        info_cmd = ['yt-dlp', '-j', '--skip-download', target_url, '--playlist-end', '1']
        try:
            info_result = subprocess.run(info_cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to get video info: {e.stderr}") from e
        info = json.loads(info_result.stdout)
        return {
            'title': info.get('title', 'Unknown_Title'),
            'id': info.get('id', 'unknown_id')
        }

    def download_video(self, target_url: str):
        """Download the first item at ``target_url`` to ``self.video_path``.

        Raises:
            Exception: if yt-dlp fails or the output file is missing afterwards.
        """
        cmd = ['yt-dlp', '-f', 'best', '--playlist-end', '1', '-o', self.video_path, target_url]
        try:
            subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to download video: {e.stderr}") from e
        if not os.path.exists(self.video_path):
            raise Exception("Video download failed - file does not exist")
        print(f"Downloaded video to: {self.video_path}")

    def extract_audio(self):
        """Extract a mono 16 kHz MP3 from the video into ``self.audio_path``.

        Raises:
            Exception: if ffmpeg exits with a non-zero status.
        """
        print("Extracting audio...")
        cmd = ['ffmpeg', '-i', self.video_path, '-vn', '-acodec', 'libmp3lame', '-ar', '16000', '-ac', '1', '-y', self.audio_path]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"Failed to extract audio: {e.stderr}") from e
        print(f"Audio extracted to: {self.audio_path}")

    def transcribe_audio(self) -> List[Dict[str, Any]]:
        """Transcribe ``self.audio_path`` and save the segments as JSON.

        Each segment is also appended to ``self.transcript_buffer`` and, when
        a Gemini detector is configured, forwarded to it.

        Returns:
            A list of ``{'start', 'end', 'text'}`` dicts.

        Raises:
            Exception: wrapping any failure during model load, transcription
                or writing the transcript file.
        """
        print(f"Transcribing audio using {self.whisper_model_name} model on {self.device}...")
        try:
            from faster_whisper import WhisperModel
            model = WhisperModel(
                self.whisper_model_name,
                device=self.device,
                compute_type=self.compute_type,
                download_root="/tmp/whisper-models"
            )
            segments, _ = model.transcribe(
                self.audio_path,
                beam_size=5,
                language="en",
                vad_filter=True,
                vad_parameters=dict(min_silence_duration_ms=500)
            )
            transcript = []
            for seg in segments:
                self.transcript_buffer += f" {seg.text}"
                if self.gemini_detector:
                    self.gemini_detector.update_transcript(seg.text, seg.start)
                transcript.append({
                    'start': seg.start,
                    'end': seg.end,
                    'text': seg.text.strip(),
                })
            with open(self.transcript_path, 'w') as f:
                json.dump(transcript, f, indent=2)
            print(f"Transcription complete. {len(transcript)} segments saved to {self.transcript_path}")
            return transcript
        except Exception as e:
            raise Exception(f"Transcription failed: {str(e)}") from e

    def extract_video_clip(self, timestamp: float, duration: float = 60.0, output_path: str = None) -> str:
        """Cut ``duration`` seconds starting at ``timestamp`` into an MP4 clip.

        Returns:
            The clip path, or "" if ffmpeg fails.
        """
        if not output_path:
            sanitized_name = self._sanitize_filename(f"clip_{int(timestamp)}")
            output_path = os.path.join(self.output_dir, f"{sanitized_name}_clip.mp4")
        cmd = [
            'ffmpeg', '-y', '-ss', str(timestamp), '-i', self.video_path,
            '-t', str(duration), '-c:v', 'libx264', '-c:a', 'aac', output_path
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            print(f"Failed to extract video clip: {e.stderr}")
            return ""
        print(f"Video clip saved to {output_path}")
        return output_path

    def take_screenshot(self, timestamp: float, output_path: str):
        """Save the frame at ``timestamp`` to ``output_path`` (failures are printed)."""
        # -y: overwrite an existing file instead of stopping to ask, matching
        # the other ffmpeg calls in this class.
        cmd = ['ffmpeg', '-y', '-ss', str(timestamp), '-i', self.video_path, '-frames:v', '1', '-q:v', '2', output_path]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
            print(f"Screenshot saved to {output_path}")
        except subprocess.CalledProcessError as e:
            print(f"Failed to take screenshot: {e.stderr}")

    def _print_guests(self, guests: List[Dict[str, Any]]) -> None:
        """Print the name, handle, time and confidence of each guest."""
        for i, guest in enumerate(guests, 1):
            print(f"  Guest {i}:")
            print(f"    Name: {guest['name']}")
            print(f"    X Handle: {guest['x_handle']}")
            print(f"    Time: {guest['formatted_time']}")
            print(f"    Confidence: {guest['confidence']}/10")

    def detect_guests(self) -> List[Dict[str, Any]]:
        """Detect guests with Gemini, falling back to regexes if it finds none.

        Returns:
            A list of per-detector result dicts, each with ``detector``,
            ``guests`` and ``detection_method`` keys.
        """
        print("Analyzing transcript to detect guests...")
        all_results = []
        if self.use_gemini and self.gemini_detector:
            gemini_guests = self.gemini_detector.detect_guests()
            if gemini_guests:
                all_results.append({
                    "detector": "gemini",
                    "guests": gemini_guests,
                    "detection_method": "gemini"
                })
                print(f"\nDetected {len(gemini_guests)} guest(s) with Gemini AI:")
                self._print_guests(gemini_guests)
            else:
                print("No guests detected by Gemini AI.")

        if not all_results:
            print("No guests detected by LLMs. Falling back to regex-based detection...")
            all_results.append(self._regex_detect_guests())

        return all_results

    def _regex_detect_guests(self) -> Dict[str, Any]:
        """Find guest introductions in ``self.transcript_buffer`` with regexes.

        A candidate needs at least two words and fewer than 50 characters, and
        must not be too similar to an already accepted name. Only candidates
        whose confidence is >= 7 are returned, highest confidence first.
        """
        introduction_patterns = [
            r"(?:our|my|today'?s|special|joining us|welcome|with us|have|introduces?) guest(?:s)? (?:today |tonight |is |are |)(?:is |are |)([\w\s\-''\.]+?)(?:,|\.|!|$)",
            r"(?:joining|welcome|with) (?:us|me) (?:today|tonight|now|is|are) ([\w\s\-''\.]+?)(?:,|\.|!|$)",
            r"(?:I'?m|we'?re) (?:joined|talking|speaking) (?:by|with) ([\w\s\-''\.]+?)(?:,|\.|!|$)",
            r"(?:I|we) have ([\w\s\-''\.]+?) (?:joining|with) (?:us|me)(?:,|\.|!|$)"
        ]
        handle_patterns = [
            r"@(\w+)",
            r"(?:on|at) (?:Twitter|X|twitter|x) (?:as |at |)@?(\w+)",
            r"(?:Twitter|X|twitter|x) handle (?:is |)@?(\w+)",
            r"(?:Twitter|X|twitter|x) @?(\w+)"
        ]
        guests = []
        guest_names = set()
        transcript = self.transcript_buffer
        for pattern in introduction_patterns:
            for match in re.finditer(pattern, transcript, re.IGNORECASE):
                name = match.group(1).strip()
                if len(name.split()) < 2 or len(name) >= 50:
                    continue
                if any(self._name_similarity(name, existing) > 0.8 for existing in guest_names):
                    continue
                guest_names.add(name)
                # Look for a handle within 200 characters either side of the match.
                context_start = max(0, match.start() - 200)
                context_end = min(len(transcript), match.end() + 200)
                context = transcript[context_start:context_end]
                x_handle = "Not mentioned"
                for handle_pattern in handle_patterns:
                    handle_match = re.search(handle_pattern, context, re.IGNORECASE)
                    if handle_match:
                        x_handle = handle_match.group(1)
                        break
                timestamp, formatted_time = self._find_timestamp_for_text(match.group(0))
                guests.append({
                    "name": name,
                    "x_handle": x_handle,
                    "confidence": self._calculate_confidence(name, x_handle),
                    "context": context,
                    "timestamp": timestamp,
                    "formatted_time": formatted_time
                })
        guests = [g for g in guests if g["confidence"] >= 7]
        guests.sort(key=lambda g: g["confidence"], reverse=True)
        result = {
            "detector": "regex",
            "total_guests_detected": len(guests),
            "guests": guests,
            "detection_method": "regex"
        }
        if guests:
            print(f"\nDetected {len(guests)} guest(s) with regex:")
            self._print_guests(guests)
        else:
            print("No guests detected with regex.")
        return result

    def _name_similarity(self, name1: str, name2: str) -> float:
        """Return 0.9 if one name contains the other, else the word-set Jaccard ratio."""
        name1, name2 = name1.lower(), name2.lower()
        if name1 in name2 or name2 in name1:
            return 0.9
        words1, words2 = set(name1.split()), set(name2.split())
        if not words1 or not words2:
            return 0
        matching = len(words1.intersection(words2))
        total = len(words1.union(words2))
        return matching / total

    def _calculate_confidence(self, name: str, x_handle: str) -> int:
        """Score a regex candidate from 0 to 10 based on name and handle shape."""
        score = 0
        if name:
            score += 3 if len(name.split()) >= 2 else 1
            score += 2 if 5 <= len(name) <= 40 else 0
        if x_handle and x_handle != "Not mentioned":
            score += 3
            score += 1 if 3 <= len(x_handle) <= 15 and x_handle.isalnum() else 0
        lowered = self.transcript_buffer.lower()
        if "our guest" in lowered or "joining us" in lowered:
            score += 1
        return min(score, 10)

    def _find_timestamp_for_text(self, text: str) -> Tuple[float, str]:
        """Find the start time of the first transcript segment matching ``text``.

        A segment matches when it contains the punctuation-stripped text, or
        at least ``min(3, word count)`` of its words.

        Returns:
            ``(seconds, "HH:MM:SS")``; ``(0.0, "00:00:00")`` when there is no
            saved transcript or nothing matches.
        """
        # transcript_path is None until run_pipeline assigns it, and
        # os.path.exists(None) raises TypeError.
        if not self.transcript_path or not os.path.exists(self.transcript_path):
            return 0.0, "00:00:00"
        with open(self.transcript_path, 'r') as f:
            transcript = json.load(f)
        simple_text = re.sub(r'[^\w\s]', '', text.lower())
        words = simple_text.split()
        needed = min(3, len(words))
        for segment in transcript:
            simple_segment = re.sub(r'[^\w\s]', '', segment.get('text', '').lower())
            segment_words = simple_segment.split()
            if (simple_text in simple_segment
                    or sum(1 for word in words if word in segment_words) >= needed):
                start = segment.get('start', 0.0)
                return start, self._format_timestamp(start)
        return 0.0, "00:00:00"

    def _format_timestamp(self, seconds: float) -> str:
        """Format a number of seconds as ``HH:MM:SS`` (fractions truncated)."""
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def _add_handles_from_video(self, guests: List[Dict[str, Any]], clips_dir: str, cleanup: bool) -> None:
        """Set ``handle_from_video`` on each guest by asking Gemini about a clip."""
        for guest in guests:
            sanitized_name = self._sanitize_filename(guest['name'])
            clip_path = self.extract_video_clip(
                guest['timestamp'],
                output_path=os.path.join(clips_dir, f"{sanitized_name}_{guest['formatted_time'].replace(':', '_')}_clip.mp4")
            )
            if not clip_path:
                guest["handle_from_video"] = "Clip extraction failed"
                continue
            prompt = f"Extract the guest's X handle from this 1-minute video clip. Look for text like '@username'. Return in the format: 'Handle: @username' or 'Handle: Not found'."
            response = self.gemini_detector.analyze_video_clip(clip_path, prompt)
            handle_match = re.search(r"Handle: @(\w+)", response)
            guest["handle_from_video"] = handle_match.group(1) if handle_match else "Not found"
            if cleanup and os.path.exists(clip_path):
                os.remove(clip_path)

    def _remove_media_files(self) -> None:
        """Delete the downloaded video and extracted audio, if present."""
        print("Cleaning up temporary files...")
        for path in (self.video_path, self.audio_path):
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as e:
                    # Runs inside a `finally`; never let cleanup mask the result.
                    print(f"Could not remove {path}: {e}")

    def run_pipeline(self, cleanup: bool = True) -> Dict[str, Any]:
        """Run download, transcription and guest detection, then save results.

        Args:
            cleanup: When True, delete the downloaded video, the extracted
                audio and the analysed clips. The video and audio are removed
                even if a step fails.

        Returns:
            ``{"detectors": [...]}`` on success, or
            ``{"error": message, "detectors": []}`` if any step raised.
        """
        try:
            target_url = self.video_url if self.video_url else self.channel_url
            video_info = self.get_video_info(target_url)
            self.video_title = video_info['title']
            self.video_id = video_info['id']
            sanitized_title = self._sanitize_filename(self.video_title)
            sanitized_id = self._sanitize_filename(self.video_id)

            # Create directory structure
            main_folder = os.path.join(self.output_dir, f"{sanitized_title}_{sanitized_id}")
            video_audio_dir = os.path.join(main_folder, "Video_and_Audio")
            text_output_dir = os.path.join(main_folder, "Text_Output")
            screenshots_dir = os.path.join(main_folder, "Screenshots")
            clips_dir = os.path.join(main_folder, "Clips")

            for dir_path in [video_audio_dir, text_output_dir, screenshots_dir, clips_dir]:
                os.makedirs(dir_path, exist_ok=True)

            self.video_path = os.path.join(video_audio_dir, f"{sanitized_title}_video.mp4")
            self.audio_path = os.path.join(video_audio_dir, f"{sanitized_title}_audio.mp3")
            self.transcript_path = os.path.join(text_output_dir, f"{sanitized_title}_transcript.json")

            # Run the pipeline steps
            self.download_video(target_url)
            self.extract_audio()
            self.transcribe_audio()
            results = self.detect_guests()

            processed_results = {"detectors": []}
            for detector_result in results:
                guests = detector_result["guests"]

                # Video analysis for guest handles
                if guests and self.use_video_analysis and self.gemini_detector:
                    self._add_handles_from_video(guests, clips_dir, cleanup)

                processed_results["detectors"].append({
                    "detector": detector_result["detector"],
                    "total_guests_detected": len(guests),
                    "guests": guests,
                    "detection_method": detector_result["detection_method"]
                })

            # Save results
            output_path = os.path.join(text_output_dir, f"{sanitized_title}_guest_results.json")
            with open(output_path, 'w') as f:
                json.dump(processed_results, f, indent=2)
            print(f"Results saved to {output_path}")

            return processed_results

        except Exception as e:
            print(f"Error in pipeline: {str(e)}")
            return {"error": str(e), "detectors": []}

        finally:
            # Cleanup if requested; in `finally` so a failed run does not
            # leave the downloaded video and audio behind.
            if cleanup:
                self._remove_media_files()

def main():
    """Install dependencies, run the detector on the configured video and print a summary.

    Reads the Gemini API key from the Colab secret ``GEMINI_API_KEY``, mounts
    Google Drive for output, and picks CUDA when available.

    Raises:
        ValueError: if the ``GEMINI_API_KEY`` secret is empty.
    """
    # Install dependencies first
    install_dependencies()

    # Get secrets from Colab
    from google.colab import userdata

    gemini_api_key = userdata.get('GEMINI_API_KEY')
    if not gemini_api_key:
        raise ValueError("Gemini API key not found in secrets. Add it as 'GEMINI_API_KEY'.")

    # Determine if we're using Google Drive
    use_drive = True
    drive_output_dir = setup_drive(use_drive) if use_drive else None

    # Check for CUDA
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"Using device: {device}")

    # Initialize detector
    detector = TBPNGuestDetector(
        video_url="https://www.youtube.com/watch?v=dIcfh1rDAsE",  # Replace with your target video
        channel_url="https://www.youtube.com/@TBPNLive/streams",
        output_dir="/content/output",
        whisper_model="large-v3",
        device=device,
        use_gemini=True,
        gemini_api_key=gemini_api_key,
        use_drive=use_drive,
        drive_output_dir=drive_output_dir,
        use_video_analysis=True
    )

    # Run pipeline
    results = detector.run_pipeline(cleanup=True)

    # run_pipeline reports failures through an "error" key instead of raising;
    # do not announce completion or saved outputs in that case.
    error = results.get("error")
    if error:
        print(f"\nPipeline failed: {error}")
        return

    # Display summary
    print("\nPipeline completed!")
    if use_drive:
        print(f"Outputs saved to Google Drive: {detector.output_dir}")
    else:
        print("Files saved to local Colab environment.")

    # Print detected guests
    for detector_result in results.get("detectors", []):
        detector_name = detector_result["detector"]
        guests = detector_result["guests"]
        if not guests:
            print(f"No guests detected with {detector_name}.")
            continue
        print(f"\nDetected {len(guests)} guest(s) with {detector_name}:")
        for i, guest in enumerate(guests, 1):
            print(f"  Guest {i}:")
            print(f"    Name: {guest['name']}")
            print(f"    X Handle: {guest['x_handle']}")
            print(f"    Handle from Video: {guest.get('handle_from_video', 'N/A')}")
            print(f"    Time: {guest['formatted_time']}")
            print(f"    Confidence: {guest['confidence']}/10")

# Run the pipeline only when this file is executed directly (script or
# notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

Installing dependencies...
Dependencies installed successfully.
