<a href="https://colab.research.google.com/github/OmegaTrees/JupiterLab-Render/blob/main/Extract%20audio%20or%20subtitles.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# Install dependencies
!pip install nest-asyncio pyrogram tgcrypto



In [6]:
# Install FFmpeg for video processing
!apt update && apt install -y ffmpeg

[33m0% [Working][0m            Hit:1 http://security.ubuntu.com/ubuntu jammy-security InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:4 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Fetched 3,632 B in 3s (1,300 B/s)
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
35 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mSkipping acquire of configur

In [7]:
# Progress tracker module for download progress
%%writefile progress_tracker.py
import time
import asyncio
import threading
import os
from pyrogram.types import Message
from typing import Optional
from concurrent.futures import ThreadPoolExecutor

def size_unit(size_bytes):
    """Convert bytes to human readable format"""
    if size_bytes == 0:
        return "0B"
    size_names = ["B", "KB", "MB", "GB", "TB"]
    i = 0
    while size_bytes >= 1024 and i < len(size_names) - 1:
        size_bytes /= 1024.0
        i += 1
    return f"{size_bytes:.2f}{size_names[i]}"

def create_progress_bar(percentage, length=20):
    """Create a visual progress bar"""
    filled = int(length * percentage / 100)
    bar = '█' * filled + '░' * (length - filled)
    return f"[{bar}]"

def format_time(seconds):
    """Format seconds into readable time"""
    if seconds < 0:
        return "Calculating..."
    if seconds < 60:
        return f"{int(seconds)}s"
    elif seconds < 3600:
        minutes = int(seconds // 60)
        secs = int(seconds % 60)
        return f"{minutes}m {secs}s"
    else:
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        return f"{hours}h {minutes}m"

class DownloadTracker:
    """Track download progress and status"""
    def __init__(self):
        self.reset()
        self.loop = None

    def reset(self):
        """Reset download tracking"""
        self.status_message = None
        self.start_time = None
        self.current_bytes = 0
        self.total_bytes = 0
        self.speed = 0
        self.eta = 0
        self.filename = ""
        self.is_downloading = False
        self.last_update = 0

    def set_loop(self, loop):
        """Set the event loop for async operations"""
        self.loop = loop

    def start_download(self, filename, total_size):
        """Start download tracking"""
        self.filename = filename
        self.total_bytes = total_size
        self.start_time = time.time()
        self.current_bytes = 0
        self.is_downloading = True
        self.last_update = 0

    def update_progress(self, current_bytes, total_bytes):
        """Update download progress"""
        now = time.time()

        # Update every 2 seconds to avoid spam
        if now - self.last_update < 2:
            return
        self.last_update = now

        self.current_bytes = current_bytes
        if total_bytes > 0:
            self.total_bytes = total_bytes

        elapsed = now - self.start_time if self.start_time else 0

        if elapsed > 0:
            self.speed = current_bytes / elapsed
            if self.speed > 0 and self.total_bytes > 0:
                self.eta = (self.total_bytes - current_bytes) / self.speed
            else:
                self.eta = 0
        else:
            self.speed = 0
            self.eta = 0

        self._update_status()

    def complete_download(self, success=True):
        """Complete download tracking"""
        self.is_downloading = False
        if success and self.total_bytes > 0:
            self.current_bytes = self.total_bytes
        self._update_status()

    def _update_status(self):
        """Update the status message"""
        if not self.status_message or not self.loop:
            return

        try:
            if self.is_downloading:
                if self.total_bytes > 0:
                    percentage = (self.current_bytes / self.total_bytes) * 100
                else:
                    percentage = 0

                progress_bar = create_progress_bar(percentage)

                status_text = (
                    f"📥 **DOWNLOADING VIDEO FILE**\n\n"
                    f"🏷️ **File:** {self.filename[:40]}{'...' if len(self.filename) > 40 else ''}\n\n"
                    f"{progress_bar} {percentage:.1f}%\n\n"
                    f"📊 **Progress:** {size_unit(self.current_bytes)}"
                )

                if self.total_bytes > 0:
                    status_text += f" / {size_unit(self.total_bytes)}"
                else:
                    status_text += " / Unknown"

                status_text += (
                    f"\n⚡ **Speed:** {size_unit(self.speed)}/s\n"
                    f"⏱️ **ETA:** {format_time(self.eta)}\n\n"
                    f"🔧 **Engine:** Pyrogram\n"
                    f"⏳ **Please wait while downloading...**"
                )
            else:
                status_text = (
                    f"✅ **DOWNLOAD COMPLETE!**\n\n"
                    f"🏷️ **File:** {self.filename}\n"
                    f"📊 **Size:** {size_unit(self.current_bytes)}\n"
                    f"⏱️ **Time:** {format_time(time.time() - self.start_time) if self.start_time else 'Unknown'}\n\n"
                    f"🔍 **Starting video analysis...**"
                )

            # Update the status message using the event loop
            asyncio.run_coroutine_threadsafe(
                self._edit_message_safe(self.status_message, status_text),
                self.loop
            )

        except Exception as e:
            print(f"Error updating download status: {str(e)}")

    async def _edit_message_safe(self, message, text):
        """Safely edit a message with error handling"""
        try:
            await message.edit_text(text)
        except Exception:
            pass  # Ignore telegram errors

    def set_status_message(self, message):
        """Set the status message to update"""
        self.status_message = message

Overwriting progress_tracker.py


In [8]:

API_ID = 25217592
API_HASH = "82066a5588eb307441fac11da76a912a"
BOT_TOKEN = "7748534332:AAEOHI4MaFq0ytEb5ZPCx4mh2GSJODJGOvw"

print("\n✅ Configuration set!")
print("📝 Your credentials have been entered successfully.")


✅ Configuration set!
📝 Your credentials have been entered successfully.


In [9]:

%%writefile video_extractor_bot.py
import os
import re
import asyncio
import logging
import json
import shutil
import subprocess
import tempfile
import time
import glob
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, CallbackQuery
from pyrogram.errors import FloodWait

# Import progress tracker
from progress_tracker import DownloadTracker, size_unit, format_time

# Set up logging for Colab
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Constants
MAX_FILE_SIZE = 2 * 1024 * 1024 * 1024  # 2GB limit
SUPPORTED_VIDEO_FORMATS = [
    '.mp4', '.mkv', '.avi', '.mov', '.flv', '.wmv', '.webm', '.m4v',
    '.3gp', '.ts', '.mts', '.m2ts', '.mpg', '.mpeg', '.ogv', '.asf'
]

# Create necessary directories
DOWNLOAD_DIR = "/content/downloads"
OUTPUT_DIR = "/content/outputs"
TEMP_DIR = "/content/temp"

os.makedirs(DOWNLOAD_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)

def format_duration(seconds):
    """Format duration in seconds to readable format"""
    if not seconds or seconds == "N/A":
        return "Unknown"
    try:
        seconds = float(seconds)
        hours = int(seconds // 3600)
        minutes = int((seconds % 3600) // 60)
        secs = int(seconds % 60)
        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{secs:02d}"
        else:
            return f"{minutes:02d}:{secs:02d}"
    except:
        return "Unknown"

def get_video_info(file_path):
    """Extract detailed video information using ffprobe"""
    try:
        cmd = [
            'ffprobe', '-v', 'quiet', '-print_format', 'json',
            '-show_format', '-show_streams', file_path
        ]

        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)

        if result.returncode != 0:
            logger.error(f"ffprobe error: {result.stderr}")
            return None

        data = json.loads(result.stdout)

        # Parse streams
        video_streams = []
        audio_streams = []
        subtitle_streams = []

        for stream in data.get('streams', []):
            stream_info = {
                'index': stream.get('index'),
                'codec_name': stream.get('codec_name', 'Unknown'),
                'codec_long_name': stream.get('codec_long_name', 'Unknown'),
                'duration': stream.get('duration', 'N/A'),
                'bit_rate': stream.get('bit_rate', 'N/A'),
                'language': stream.get('tags', {}).get('language', 'Unknown'),
                'title': stream.get('tags', {}).get('title', ''),
                'disposition': stream.get('disposition', {})
            }

            if stream['codec_type'] == 'video':
                stream_info.update({
                    'width': stream.get('width', 'N/A'),
                    'height': stream.get('height', 'N/A'),
                    'fps': eval(stream.get('r_frame_rate', '0/1')) if stream.get('r_frame_rate') else 'N/A',
                    'pixel_format': stream.get('pix_fmt', 'N/A')
                })
                video_streams.append(stream_info)

            elif stream['codec_type'] == 'audio':
                stream_info.update({
                    'channels': stream.get('channels', 'N/A'),
                    'channel_layout': stream.get('channel_layout', 'N/A'),
                    'sample_rate': stream.get('sample_rate', 'N/A')
                })
                audio_streams.append(stream_info)

            elif stream['codec_type'] == 'subtitle':
                subtitle_streams.append(stream_info)

        # Get format info
        format_info = data.get('format', {})

        return {
            'format': {
                'filename': format_info.get('filename', ''),
                'format_name': format_info.get('format_name', 'Unknown'),
                'duration': format_info.get('duration', 'N/A'),
                'size': format_info.get('size', 'N/A'),
                'bit_rate': format_info.get('bit_rate', 'N/A')
            },
            'video_streams': video_streams,
            'audio_streams': audio_streams,
            'subtitle_streams': subtitle_streams
        }

    except subprocess.TimeoutExpired:
        logger.error("ffprobe timeout")
        return None
    except Exception as e:
        logger.error(f"Error analyzing video: {str(e)}")
        return None

def extract_stream(input_file, output_file, stream_type, stream_index, progress_callback=None):
    """Extract specific stream from video with appropriate container"""
    try:
        if stream_type == 'audio':
            # Get audio codec info first
            probe_cmd = [
                'ffprobe', '-v', 'quiet', '-select_streams', f'a:{stream_index}',
                '-show_entries', 'stream=codec_name', '-of', 'csv=p=0', input_file
            ]
            result = subprocess.run(probe_cmd, capture_output=True, text=True)
            codec = result.stdout.strip() if result.returncode == 0 else 'unknown'

            # Choose appropriate container and codec based on source
            if codec in ['aac']:
                # Use M4A for AAC
                output_file = output_file.replace('.m4a', '.m4a')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['mp3']:
                # Use MP3 for MP3
                output_file = output_file.replace('.m4a', '.mp3')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['eac3', 'ac3']:
                # Use AC3 for EAC3/AC3
                output_file = output_file.replace('.m4a', '.ac3')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['dts']:
                # Use DTS for DTS
                output_file = output_file.replace('.m4a', '.dts')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['flac']:
                # Use FLAC for FLAC
                output_file = output_file.replace('.m4a', '.flac')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['opus']:
                # Use OGG for Opus
                output_file = output_file.replace('.m4a', '.ogg')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            elif codec in ['vorbis']:
                # Use OGG for Vorbis
                output_file = output_file.replace('.m4a', '.ogg')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]
            else:
                # Default to MKA for unknown or incompatible codecs
                output_file = output_file.replace('.m4a', '.mka')
                cmd = [
                    'ffmpeg', '-i', input_file, '-map', f'0:a:{stream_index}',
                    '-c', 'copy', '-y', output_file
                ]

        elif stream_type == 'subtitle':
            # Always convert subtitles to SRT for compatibility
            cmd = [
                'ffmpeg', '-i', input_file, '-map', f'0:s:{stream_index}',
                '-c:s', 'srt', '-y', output_file
            ]
        elif stream_type == 'video':
            cmd = [
                'ffmpeg', '-i', input_file, '-map', f'0:v:{stream_index}',
                '-c', 'copy', '-y', output_file
            ]
        else:
            return False, "Invalid stream type"

        logger.info(f"Executing: {' '.join(cmd)}")

        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True
        )

        stdout, stderr = process.communicate()

        if process.returncode == 0:
            return True, f"Extraction successful. Output: {output_file}"
        else:
            logger.error(f"FFmpeg error: {stderr}")
            return False, f"FFmpeg error: {stderr}"

    except Exception as e:
        logger.error(f"Error extracting stream: {str(e)}")
        return False, str(e)

def remove_stream(input_file, output_file, stream_type, stream_index, progress_callback=None):
    """Remove specific stream from video - KEEP ORIGINAL FORMAT!"""
    try:
        # Keep the same format as input
        if stream_type == 'audio':
            cmd = [
                'ffmpeg', '-i', input_file,
                '-map', '0',  # Map all streams
                '-map', f'-0:a:{stream_index}',  # Exclude specific audio stream
                '-c', 'copy',  # Copy without re-encoding
                '-y', output_file
            ]
        elif stream_type == 'subtitle':
            cmd = [
                'ffmpeg', '-i', input_file,
                '-map', '0',
                '-map', f'-0:s:{stream_index}',
                '-c', 'copy',
                '-y', output_file
            ]
        elif stream_type == 'video':
            cmd = [
                'ffmpeg', '-i', input_file,
                '-map', '0',
                '-map', f'-0:v:{stream_index}',
                '-c', 'copy',
                '-y', output_file
            ]
        else:
            return False, "Invalid stream type"

        logger.info(f"Executing: {' '.join(cmd)}")

        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True
        )

        stdout, stderr = process.communicate()

        if process.returncode == 0:
            return True, f"Stream removal successful. Output: {output_file}"
        else:
            logger.error(f"FFmpeg error: {stderr}")
            return False, f"FFmpeg error: {stderr}"

    except Exception as e:
        logger.error(f"Error removing stream: {str(e)}")
        return False, str(e)

class BotState:
    """Class to manage bot state"""
    def __init__(self):
        self.download_tracker = DownloadTracker()
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.reset()

    def reset(self):
        """Reset all state variables"""
        self.current_file = None
        self.video_info = None
        self.is_processing = False
        self.user_sessions = {}  # Store user-specific data
        if hasattr(self, 'download_tracker'):
            self.download_tracker.reset()
        self.clean_directories()
        logger.info("Bot state reset")

    def clean_directories(self):
        """Clean up directories"""
        try:
            for directory in [DOWNLOAD_DIR, OUTPUT_DIR, TEMP_DIR]:
                for file_path in glob.glob(f"{directory}/*"):
                    try:
                        if os.path.exists(file_path):
                            if os.path.isfile(file_path):
                                os.remove(file_path)
                            else:
                                shutil.rmtree(file_path)
                    except Exception as e:
                        logger.error(f"Error removing {file_path}: {str(e)}")
        except Exception as e:
            logger.error(f"Error cleaning directories: {str(e)}")

class VideoExtractorBot:
    def __init__(self):
        self.bot_state = BotState()

    def create_stream_info_text(self, video_info):
        """Create formatted text showing all stream information"""
        if not video_info:
            return "❌ No video information available"

        text = "📹 **VIDEO STREAM ANALYSIS**\n\n"

        # Format info
        format_info = video_info['format']
        text += f"📁 **File Information:**\n"
        text += f"• **Format:** {format_info['format_name']}\n"
        text += f"• **Duration:** {format_duration(format_info['duration'])}\n"
        text += f"• **Size:** {size_unit(int(format_info['size'])) if format_info['size'] != 'N/A' else 'Unknown'}\n"
        text += f"• **Bitrate:** {format_info['bit_rate']} bps\n\n"

        # Video streams
        if video_info['video_streams']:
            text += f"🎬 **VIDEO STREAMS ({len(video_info['video_streams'])}):**\n"
            for i, stream in enumerate(video_info['video_streams']):
                text += f"**Stream {i}:**\n"
                text += f"• **Codec:** {stream['codec_name']} ({stream['codec_long_name']})\n"
                text += f"• **Resolution:** {stream['width']}x{stream['height']}\n"

                # Fix FPS formatting
                fps_value = stream['fps']
                if isinstance(fps_value, (int, float)) and fps_value != 'N/A':
                    fps_text = f"{fps_value:.2f}"
                else:
                    fps_text = "N/A"
                text += f"• **FPS:** {fps_text}\n"

                text += f"• **Language:** {stream['language']}\n"
                if stream['title']:
                    text += f"• **Title:** {stream['title']}\n"
                text += "\n"

        # Audio streams
        if video_info['audio_streams']:
            text += f"🔊 **AUDIO STREAMS ({len(video_info['audio_streams'])}):**\n"
            for i, stream in enumerate(video_info['audio_streams']):
                text += f"**Stream {i}:**\n"
                text += f"• **Codec:** {stream['codec_name']} ({stream['codec_long_name']})\n"
                text += f"• **Channels:** {stream['channels']} ({stream['channel_layout']})\n"
                text += f"• **Sample Rate:** {stream['sample_rate']} Hz\n"
                text += f"• **Language:** {stream['language']}\n"
                if stream['title']:
                    text += f"• **Title:** {stream['title']}\n"
                text += "\n"

        # Subtitle streams
        if video_info['subtitle_streams']:
            text += f"📝 **SUBTITLE STREAMS ({len(video_info['subtitle_streams'])}):**\n"
            for i, stream in enumerate(video_info['subtitle_streams']):
                text += f"**Stream {i}:**\n"
                text += f"• **Codec:** {stream['codec_name']} ({stream['codec_long_name']})\n"
                text += f"• **Language:** {stream['language']}\n"
                if stream['title']:
                    text += f"• **Title:** {stream['title']}\n"
                text += "\n"

        text += "🔧 **Use the buttons below to extract or remove streams!**"
        return text

    def create_stream_keyboard(self, video_info):
        """Create inline keyboard for stream operations"""
        if not video_info:
            return None

        keyboard = []

        # Extract buttons
        if video_info['video_streams']:
            keyboard.append([InlineKeyboardButton("🎬 Extract Video Streams", callback_data="extract_video_menu")])

        if video_info['audio_streams']:
            keyboard.append([InlineKeyboardButton("🔊 Extract Audio Streams", callback_data="extract_audio_menu")])

        if video_info['subtitle_streams']:
            keyboard.append([InlineKeyboardButton("📝 Extract Subtitle Streams", callback_data="extract_subtitle_menu")])

        # Remove buttons
        keyboard.append([InlineKeyboardButton("❌ Remove Streams", callback_data="remove_menu")])

        # Utility buttons
        keyboard.append([
            InlineKeyboardButton("🔄 New File", callback_data="reset"),
            InlineKeyboardButton("ℹ️ Info", callback_data="show_info")
        ])

        return InlineKeyboardMarkup(keyboard)

    def create_extract_menu(self, stream_type, streams):
        """Create menu for extracting specific streams"""
        keyboard = []

        for i, stream in enumerate(streams):
            if stream_type == 'video':
                label = f"📹 V{i}: {stream['codec_name']} ({stream['width']}x{stream['height']})"
            elif stream_type == 'audio':
                label = f"🔊 A{i}: {stream['codec_name']} ({stream['language']})"
            elif stream_type == 'subtitle':
                label = f"📝 S{i}: {stream['codec_name']} ({stream['language']})"

            # Truncate long labels
            if len(label) > 35:
                label = label[:32] + "..."

            keyboard.append([InlineKeyboardButton(label, callback_data=f"extract_{stream_type}_{i}")])

        keyboard.append([InlineKeyboardButton("🔙 Back", callback_data="back_to_main")])
        return InlineKeyboardMarkup(keyboard)

    def create_remove_menu(self, video_info):
        """Create menu for removing streams"""
        keyboard = []

        # Video streams
        for i, stream in enumerate(video_info['video_streams']):
            label = f"❌ Remove V{i}: {stream['codec_name']}"
            if len(label) > 35:
                label = label[:32] + "..."
            keyboard.append([InlineKeyboardButton(label, callback_data=f"remove_video_{i}")])

        # Audio streams
        for i, stream in enumerate(video_info['audio_streams']):
            label = f"❌ Remove A{i}: {stream['codec_name']} ({stream['language']})"
            if len(label) > 35:
                label = label[:32] + "..."
            keyboard.append([InlineKeyboardButton(label, callback_data=f"remove_audio_{i}")])

        # Subtitle streams
        for i, stream in enumerate(video_info['subtitle_streams']):
            label = f"❌ Remove S{i}: {stream['codec_name']} ({stream['language']})"
            if len(label) > 35:
                label = label[:32] + "..."
            keyboard.append([InlineKeyboardButton(label, callback_data=f"remove_subtitle_{i}")])

        keyboard.append([InlineKeyboardButton("🔙 Back", callback_data="back_to_main")])
        return InlineKeyboardMarkup(keyboard)

# Initialize bot
extractor_bot = VideoExtractorBot()

# Bot setup - get credentials from global scope
app = Client("video_extractor_bot", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN)

@app.on_message(filters.command("start"))
async def start_command(client, message: Message):
    """Handle /start command"""
    welcome_text = (
        "🎬 **VIDEO STREAM EXTRACTOR BOT**\n\n"
        "Welcome to the most advanced video stream processing bot!\n\n"
        "**🌟 Features:**\n"
        "• 📹 Extract video streams\n"
        "• 🔊 Extract audio streams\n"
        "• 📝 Extract subtitle streams\n"
        "• ❌ Remove unwanted streams\n"
        "• 🔍 Detailed video analysis\n\n"
        "**📤 How to use:**\n"
        "1. Send me a video file\n"
        "2. Wait for analysis\n"
        "3. Choose your operation\n"
        "4. Download the result!\n\n"
        "**📋 Supported formats:**\n"
        "`MP4, MKV, AVI, MOV, FLV, WMV, WEBM, M4V, 3GP, TS, MPG, MPEG`\n\n"
        "**⚠️ Limits:**\n"
        "• Max file size: 2GB\n"
        "• Processing time: ~5-10 minutes\n\n"
        "🚀 **Send me a video file to get started!**"
    )

    keyboard = InlineKeyboardMarkup([
        [InlineKeyboardButton("📚 Help", callback_data="help"),
         InlineKeyboardButton("ℹ️ About", callback_data="about")]
    ])

    await message.reply_text(welcome_text, reply_markup=keyboard)

@app.on_message(filters.command("reset"))
async def reset_command(client, message: Message):
    """Handle /reset command"""
    extractor_bot.bot_state.reset()
    await message.reply_text(
        "🔄 **Bot Reset Complete!**\n\n"
        "All files have been cleared and the bot is ready for a new video.\n"
        "Send me a video file to start processing!"
    )

@app.on_message(filters.document | filters.video)
async def handle_video_file(client, message: Message):
    """Handle video file uploads"""
    try:
        # Check if already processing
        if extractor_bot.bot_state.is_processing:
            await message.reply_text(
                "⚠️ **Already Processing**\n\n"
                "Please wait for the current operation to complete before sending another file.\n"
                "Use /reset to cancel current operation."
            )
            return

        # Get file info
        file_info = message.document or message.video
        file_name = file_info.file_name
        file_size = file_info.file_size

        # Check file size
        if file_size > MAX_FILE_SIZE:
            await message.reply_text(
                f"❌ **File Too Large**\n\n"
                f"File size: {size_unit(file_size)}\n"
                f"Maximum allowed: {size_unit(MAX_FILE_SIZE)}\n\n"
                f"Please send a smaller video file."
            )
            return

        # Check file format
        file_ext = Path(file_name).suffix.lower()
        if file_ext not in SUPPORTED_VIDEO_FORMATS:
            await message.reply_text(
                "❌ **Unsupported Format**\n\n"
                f"File: `{file_name}`\n"
                f"Extension: `{file_ext}`\n\n"
                f"**Supported formats:**\n"
                f"`{', '.join(SUPPORTED_VIDEO_FORMATS)}`"
            )
            return

        # Start processing
        extractor_bot.bot_state.is_processing = True
        extractor_bot.bot_state.reset()

        # Send initial status
        status_msg = await message.reply_text(
            "📥 **PREPARING DOWNLOAD**\n\n"
            f"🏷️ **File:** {file_name}\n"
            f"📊 **Size:** {size_unit(file_size)}\n"
            f"🔧 **Engine:** Pyrogram\n\n"
            f"⏳ **Initializing download...**"
        )

        # Set up download tracking
        extractor_bot.bot_state.download_tracker.set_loop(asyncio.get_event_loop())
        extractor_bot.bot_state.download_tracker.set_status_message(status_msg)
        extractor_bot.bot_state.download_tracker.start_download(file_name, file_size)

        # Download file
        download_path = os.path.join(DOWNLOAD_DIR, file_name)

        await client.download_media(
            message,
            file_name=download_path,
            progress=extractor_bot.bot_state.download_tracker.update_progress
        )

        # Complete download
        extractor_bot.bot_state.download_tracker.complete_download(True)

        # Analyze video
        await status_msg.edit_text(
            "🔍 **ANALYZING VIDEO**\n\n"
            f"🏷️ **File:** {file_name}\n"
            f"📊 **Size:** {size_unit(file_size)}\n\n"
            "⏳ **Please wait while analyzing streams...**"
        )

        # Get video information
        video_info = await asyncio.get_event_loop().run_in_executor(
            extractor_bot.bot_state.executor,
            get_video_info,
            download_path
        )

        if not video_info:
            await status_msg.edit_text(
                "❌ **Analysis Failed**\n\n"
                f"Could not analyze the video file: `{file_name}`\n\n"
                "This might happen if:\n"
                "• File is corrupted\n"
                "• Unsupported codec\n"
                "• File is not a valid video\n\n"
                "Try with a different video file."
            )
            extractor_bot.bot_state.is_processing = False
            return

        # Store information
        extractor_bot.bot_state.current_file = download_path
        extractor_bot.bot_state.video_info = video_info

        # Show stream information and options
        info_text = extractor_bot.create_stream_info_text(video_info)
        keyboard = extractor_bot.create_stream_keyboard(video_info)

        await status_msg.edit_text(info_text, reply_markup=keyboard)
        extractor_bot.bot_state.is_processing = False

    except Exception as e:
        logger.error(f"Error handling video file: {str(e)}")
        try:
            await message.reply_text(
                f"❌ **Error occurred:**\n\n`{str(e)}`\n\n"
                f"Please try again or contact support."
            )
        except:
            pass
        extractor_bot.bot_state.is_processing = False

@app.on_callback_query()
async def handle_callback_query(client, callback_query: CallbackQuery):
    """Handle inline keyboard callbacks"""
    try:
        data = callback_query.data
        message = callback_query.message

        if data == "help":
            help_text = (
                "📚 **HELP & GUIDE**\n\n"
                "**🔧 How to Extract Streams:**\n"
                "1. Send a video file\n"
                "2. Wait for analysis\n"
                "3. Click on extract buttons\n"
                "4. Choose specific stream\n"
                "5. Download extracted file\n\n"
                "**❌ How to Remove Streams:**\n"
                "1. Click 'Remove Streams'\n"
                "2. Select stream to remove\n"
                "3. Download modified video\n\n"
                "**💡 Tips:**\n"
                "• Extraction keeps original quality\n"
                "• Removal maintains original format\n"
                "• Subtitles are converted to SRT\n"
                "• Use /reset to start over"
            )
            await callback_query.edit_message_text(help_text)

        elif data == "about":
            about_text = (
                "ℹ️ **ABOUT THIS BOT**\n\n"
                "**🤖 Video Stream Extractor Bot**\n"
                "Version: 2.0\n"
                "Engine: FFmpeg + Pyrogram\n\n"
                "**👨‍💻 Developer:** Advanced Bot Solutions\n"
                "**🛠️ Technology:** Python, FFmpeg\n"
                "**🏃‍♂️ Platform:** Google Colab\n\n"
                "**⚡ Features:**\n"
                "• Stream extraction without quality loss\n"
                "• Multiple format support\n"
                "• Real-time progress tracking\n"
                "• Advanced video analysis\n\n"
                "**📞 Support:** Use /reset if stuck"
            )
            await callback_query.edit_message_text(about_text)

        elif data == "show_info":
            if extractor_bot.bot_state.video_info:
                info_text = extractor_bot.create_stream_info_text(extractor_bot.bot_state.video_info)
                keyboard = extractor_bot.create_stream_keyboard(extractor_bot.bot_state.video_info)
                await callback_query.edit_message_text(info_text, reply_markup=keyboard)
            else:
                await callback_query.answer("No video loaded. Please send a video file first.")

        elif data == "reset":
            extractor_bot.bot_state.reset()
            await callback_query.edit_message_text(
                "🔄 **Reset Complete!**\n\n"
                "Send me a new video file to start processing."
            )

        elif data == "back_to_main":
            if extractor_bot.bot_state.video_info:
                info_text = extractor_bot.create_stream_info_text(extractor_bot.bot_state.video_info)
                keyboard = extractor_bot.create_stream_keyboard(extractor_bot.bot_state.video_info)
                await callback_query.edit_message_text(info_text,                 reply_markup=keyboard)

        elif data.startswith("extract_"):
            await handle_extract_menu(callback_query, data)

        elif data == "remove_menu":
            if extractor_bot.bot_state.video_info:
                keyboard = extractor_bot.create_remove_menu(extractor_bot.bot_state.video_info)
                await callback_query.edit_message_text(
                    "❌ **REMOVE STREAMS**\n\n"
                    "Select a stream to remove from the video:\n\n"
                    "⚠️ **Warning:** This will create a new video file without the selected stream.",
                    reply_markup=keyboard
                )

        elif data.startswith("remove_"):
            await handle_remove_operation(callback_query, data)

        await callback_query.answer()

    except Exception as e:
        logger.error(f"Error in callback query: {str(e)}")
        try:
            await callback_query.answer(f"Error: {str(e)}")
        except:
            pass

async def handle_extract_menu(callback_query: CallbackQuery, data: str):
    """Handle extract menu selections"""
    try:
        video_info = extractor_bot.bot_state.video_info
        if not video_info:
            await callback_query.answer("No video loaded.")
            return

        if data == "extract_video_menu":
            keyboard = extractor_bot.create_extract_menu("video", video_info['video_streams'])
            await callback_query.edit_message_text(
                "🎬 **EXTRACT VIDEO STREAMS**\n\n"
                "Select a video stream to extract:\n\n"
                "📝 **Note:** Extracted video will maintain original quality and format.",
                reply_markup=keyboard
            )

        elif data == "extract_audio_menu":
            keyboard = extractor_bot.create_extract_menu("audio", video_info['audio_streams'])
            await callback_query.edit_message_text(
                "🔊 **EXTRACT AUDIO STREAMS**\n\n"
                "Select an audio stream to extract:\n\n"
                "📝 **Note:** Audio will be extracted with appropriate container format.",
                reply_markup=keyboard
            )

        elif data == "extract_subtitle_menu":
            keyboard = extractor_bot.create_extract_menu("subtitle", video_info['subtitle_streams'])
            await callback_query.edit_message_text(
                "📝 **EXTRACT SUBTITLE STREAMS**\n\n"
                "Select a subtitle stream to extract:\n\n"
                "📝 **Note:** Subtitles will be converted to SRT format for compatibility.",
                reply_markup=keyboard
            )

        elif data.startswith("extract_video_") or data.startswith("extract_audio_") or data.startswith("extract_subtitle_"):
            await handle_extract_operation(callback_query, data)

    except Exception as e:
        logger.error(f"Error in extract menu: {str(e)}")
        await callback_query.answer(f"Error: {str(e)}")

async def handle_extract_operation(callback_query: CallbackQuery, data: str):
    """Handle stream extraction operations"""
    try:
        # Parse operation data
        parts = data.split("_")
        stream_type = parts[1]
        stream_index = int(parts[2])

        if not extractor_bot.bot_state.current_file:
            await callback_query.answer("No video file loaded.")
            return

        # Set processing state
        extractor_bot.bot_state.is_processing = True

        # Show processing message
        await callback_query.edit_message_text(
            f"⚙️ **EXTRACTING {stream_type.upper()} STREAM**\n\n"
            f"🔧 **Stream:** {stream_type.title()} #{stream_index}\n"
            f"⏳ **Status:** Processing...\n\n"
            f"Please wait while extracting the stream from video."
        )

        # Create output filename with dynamic extension
        original_name = Path(extractor_bot.bot_state.current_file).stem
        if stream_type == 'audio':
            # Start with .m4a, but extract_stream will change it based on codec
            output_filename = f"{original_name}_audio_{stream_index}.m4a"
        elif stream_type == 'subtitle':
            output_filename = f"{original_name}_subtitle_{stream_index}.srt"
        elif stream_type == 'video':
            original_ext = Path(extractor_bot.bot_state.current_file).suffix
            output_filename = f"{original_name}_video_{stream_index}{original_ext}"

        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Extract stream
        success, message = await asyncio.get_event_loop().run_in_executor(
            extractor_bot.bot_state.executor,
            extract_stream,
            extractor_bot.bot_state.current_file,
            output_path,
            stream_type,
            stream_index
        )

        if success:
            # Get the actual output filename (may have changed during extraction)
            actual_output_file = message.split("Output: ")[-1] if "Output: " in message else output_path
            actual_filename = os.path.basename(actual_output_file)

            # Send the extracted file
            file_size = os.path.getsize(actual_output_file)
            await callback_query.edit_message_text(
                f"✅ **EXTRACTION COMPLETE!**\n\n"
                f"🏷️ **File:** {actual_filename}\n"
                f"📊 **Size:** {size_unit(file_size)}\n"
                f"🔧 **Type:** {stream_type.title()} Stream #{stream_index}\n\n"
                f"📤 **Uploading...**"
            )

            # Upload file
            await callback_query.message.reply_document(
                document=actual_output_file,
                caption=(
                    f"📁 **Extracted {stream_type.title()} Stream**\n\n"
                    f"🏷️ **Original:** {Path(extractor_bot.bot_state.current_file).name}\n"
                    f"🔧 **Stream:** #{stream_index}\n"
                    f"📊 **Size:** {size_unit(file_size)}"
                )
            )

            # Show completion message
            keyboard = InlineKeyboardMarkup([
                [InlineKeyboardButton("🔙 Back to Streams", callback_data="back_to_main"),
                 InlineKeyboardButton("🔄 New File", callback_data="reset")]
            ])

            await callback_query.edit_message_text(
                f"✅ **EXTRACTION SUCCESSFUL!**\n\n"
                f"🎉 The {stream_type} stream has been extracted and sent!\n\n"
                f"You can extract more streams or upload a new video file.",
                reply_markup=keyboard
            )
        else:
            await callback_query.edit_message_text(
                f"❌ **EXTRACTION FAILED**\n\n"
                f"Error: {message}\n\n"
                f"Please try again or use a different stream."
            )

        extractor_bot.bot_state.is_processing = False

    except Exception as e:
        logger.error(f"Error in extract operation: {str(e)}")
        extractor_bot.bot_state.is_processing = False
        await callback_query.edit_message_text(
            f"❌ **EXTRACTION ERROR**\n\n"
            f"An unexpected error occurred:\n`{str(e)}`\n\n"
            f"Please try again or contact support."
        )

async def handle_remove_operation(callback_query: CallbackQuery, data: str):
    """Handle stream removal operations"""
    try:
        # Parse operation data
        parts = data.split("_")
        stream_type = parts[1]  # video, audio, or subtitle
        stream_index = int(parts[2])

        if not extractor_bot.bot_state.current_file:
            await callback_query.answer("No video file loaded.")
            return

        # Set processing state
        extractor_bot.bot_state.is_processing = True

        # Show processing message
        await callback_query.edit_message_text(
            f"⚙️ **REMOVING {stream_type.upper()} STREAM**\n\n"
            f"🔧 **Stream:** {stream_type.title()} #{stream_index}\n"
            f"⏳ **Status:** Processing...\n\n"
            f"Please wait while removing the stream from video."
        )

        # Create output filename
        original_name = Path(extractor_bot.bot_state.current_file).stem
        original_ext = Path(extractor_bot.bot_state.current_file).suffix
        output_filename = f"{original_name}_no_{stream_type}_{stream_index}{original_ext}"
        output_path = os.path.join(OUTPUT_DIR, output_filename)

        # Remove stream
        success, message = await asyncio.get_event_loop().run_in_executor(
            extractor_bot.bot_state.executor,
            remove_stream,
            extractor_bot.bot_state.current_file,
            output_path,
            stream_type,
            stream_index
        )

        if success:
            # Send the modified file
            file_size = os.path.getsize(output_path)
            await callback_query.edit_message_text(
                f"✅ **STREAM REMOVAL COMPLETE!**\n\n"
                f"🏷️ **File:** {output_filename}\n"
                f"📊 **Size:** {size_unit(file_size)}\n"
                f"🔧 **Removed:** {stream_type.title()} Stream #{stream_index}\n\n"
                f"📤 **Uploading...**"
            )

            # Upload file
            await callback_query.message.reply_document(
                document=output_path,
                caption=(
                    f"📁 **Modified Video File**\n\n"
                    f"🏷️ **Original:** {Path(extractor_bot.bot_state.current_file).name}\n"
                    f"❌ **Removed:** {stream_type.title()} Stream #{stream_index}\n"
                    f"📊 **Size:** {size_unit(file_size)}"
                )
            )

            # Show completion message
            keyboard = InlineKeyboardMarkup([
                [InlineKeyboardButton("🔙 Back to Streams", callback_data="back_to_main"),
                 InlineKeyboardButton("🔄 New File", callback_data="reset")]
            ])

            await callback_query.edit_message_text(
                f"✅ **STREAM REMOVAL SUCCESSFUL!**\n\n"
                f"🎉 The {stream_type} stream has been removed from your video!\n\n"
                f"You can process more streams or upload a new video file.",
                reply_markup=keyboard
            )
        else:
            await callback_query.edit_message_text(
                f"❌ **REMOVAL FAILED**\n\n"
                f"Error: {message}\n\n"
                f"Please try again or use a different stream."
            )

        extractor_bot.bot_state.is_processing = False

    except Exception as e:
        logger.error(f"Error in remove operation: {str(e)}")
        extractor_bot.bot_state.is_processing = False
        await callback_query.edit_message_text(
            f"❌ **REMOVAL ERROR**\n\n"
            f"An unexpected error occurred:\n`{str(e)}`\n\n"
            f"Please try again or contact support."
        )

# Don't auto-run when imported
if __name__ == "__main__":
    print("🤖 Video Stream Extractor Bot Starting...")
    print("📋 Features: Extract & Remove Video/Audio/Subtitle Streams")
    print("🚀 Ready to process video files!")

Overwriting video_extractor_bot.py


In [None]:

# Run the Video Stream Extractor Bot
import nest_asyncio
import asyncio
nest_asyncio.apply()

print("🚀 Starting Video Stream Extractor Bot...")
print("⚡ Features: Extract & Remove Video/Audio/Subtitle streams")
print("📡 Bot is now running! Send video files to process.")
print("🔄 Use Ctrl+C to stop the bot")
print("="*50)

# Read and execute the bot file with credentials
bot_code = open('video_extractor_bot.py').read()
namespace = {
    'API_ID': API_ID,
    'API_HASH': API_HASH,
    'BOT_TOKEN': BOT_TOKEN,
    '__name__': '__main__'
}
exec(bot_code, namespace)
app = namespace['app']

# Run the bot properly
async def run_bot():
    try:
        print("🔄 Initializing bot...")
        await app.start()
        print("✅ Bot started successfully!")
        print("📱 Bot is now listening for messages...")
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        print("🛑 Bot stopped")
    finally:
        try:
            await app.stop()
        except:
            pass

asyncio.run(run_bot())

🚀 Starting Video Stream Extractor Bot...
⚡ Features: Extract & Remove Video/Audio/Subtitle streams
📡 Bot is now running! Send video files to process.
🔄 Use Ctrl+C to stop the bot
🤖 Video Stream Extractor Bot Starting...
📋 Features: Extract & Remove Video/Audio/Subtitle Streams
🚀 Ready to process video files!
🔄 Initializing bot...
✅ Bot started successfully!
📱 Bot is now listening for messages...


ERROR:__main__:Error in callback query: Telegram says: [400 QUERY_ID_INVALID] - The callback query id is invalid (caused by "messages.SetBotCallbackAnswer")
ERROR:__main__:Error in callback query: Telegram says: [400 QUERY_ID_INVALID] - The callback query id is invalid (caused by "messages.SetBotCallbackAnswer")
