<a href="https://colab.research.google.com/github/arinadi/colab-discord-transcriber/blob/main/Colab_Discord_Transcriber.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title 🚀 Run Transcription Bot
# @markdown Configure all bot parameters here. Hover over a setting for more information provided in the code comments.

# ------------------------------------------------------------------------------
# SECTION 1: PRE-FLIGHT CHECKS & CONFIGURATION
# ------------------------------------------------------------------------------

# --- 1.1. 🔑 Load Secrets from Colab (PRIORITY 1) ---
from google.colab import userdata, runtime
# Names of the Colab secrets the bot cannot run without.
_REQUIRED_SECRETS = ('DISCORD_BOT_TOKEN', 'DISCORD_WEBHOOK_URL', 'DISCORD_CHANNEL_ID')
try:
    # Strip stray whitespace: DISCORD_CHANNEL_ID is later compared as an exact
    # string against the channel id, so a trailing space or newline pasted into
    # the secret would make the bot silently ignore every message.
    _secrets = {name: (userdata.get(name) or '').strip() for name in _REQUIRED_SECRETS}
    _missing = [name for name, value in _secrets.items() if not value]
    if _missing:
        raise ValueError(
            "Ensure all secrets (DISCORD_BOT_TOKEN, DISCORD_WEBHOOK_URL, DISCORD_CHANNEL_ID) are set in Colab Secrets. "
            f"Missing or empty: {', '.join(_missing)}."
        )
    DISCORD_BOT_TOKEN = _secrets['DISCORD_BOT_TOKEN']
    DISCORD_WEBHOOK_URL = _secrets['DISCORD_WEBHOOK_URL']
    DISCORD_CHANNEL_ID = _secrets['DISCORD_CHANNEL_ID']
    print("✅ Secrets loaded successfully.")
except Exception as e:
    # Any failure here (secret not defined, notebook access not granted, empty
    # value) is fatal: report it and stop the cell before anything else runs.
    print(f"❌ ERROR: Failed to load secrets: {e}")
    raise SystemExit("Execution stopped due to missing secrets.")


# --- 1.2. 🛠️ Bot Configuration (Tunable Parameters) ---
# How these values are consumed further down in this cell:
#   model_size                 -> passed to whisper.load_model()
#   use_fp16                   -> resolved into the boolean `fp16_enabled`
#                                 ('True' forces it on, 'auto' enables it only on CUDA)
#   beam_size                  -> forwarded to model.transcribe() only when > 0
#   pause_threshold            -> gap (seconds) between segments that starts a new paragraph
#   MAX_AUDIO_DURATION_SECONDS -> files longer than this are rejected during validation (0 disables)
#   IDLE_*_MIN                 -> thresholds compared against the idle time by IdleMonitor
# NOTE(review): beam_size is set to 10 below although its description calls 5
# "a good default" — confirm which value is intended.
# @markdown ---
# @markdown ### 🤖 Model & Transcription Settings
# @markdown These settings control the core transcription performance and quality.
# @markdown **Model Size:** This is the primary trade-off between **speed** and **accuracy**.
model_size = 'large-v2' #@param ['large-v2', 'medium', 'small', 'base', 'tiny']
# @markdown **Use FP16 Precision:** Using FP16 on a GPU can **double transcription speed**. `auto` is recommended.
use_fp16 = 'auto' #@param ['auto', 'True', 'False']
# @markdown **Beam Size:** Higher values are more accurate but slower. `5` is a good default.
beam_size = 10 #@param {type:"integer"}
# @markdown **Pause Threshold (seconds):** The length of silence that will create a new paragraph.
pause_threshold = 0.3 #@param {type:"number"}
# @markdown **Max Audio Duration (seconds):** A safeguard to reject excessively long files. Set to `0` to disable.
MAX_AUDIO_DURATION_SECONDS = 5400 #@param {type:"integer"}
# @markdown ---
# @markdown ### ⏳ Idle Monitor & Auto-Shutdown
# @markdown These settings control the automatic shutdown feature to conserve Google Colab compute units.
# @markdown **Notify Time (minutes):** Time in minutes of inactivity before the first webhook notification.
IDLE_NOTIFY_MIN = 5 #@param {type:"integer"}
# @markdown **Warning Time (minutes):** Time in minutes of inactivity before the final warning.
IDLE_WARN_MIN = 8 #@param {type:"integer"}
# @markdown **Shutdown Time (minutes):** Time in minutes of inactivity before automatic shutdown.
IDLE_SHUTDOWN_MIN = 10 #@param {type:"integer"}
# @markdown ---

# ------------------------------------------------------------------------------
# SECTION 2: INSTALLATION & IMPORTS
# ------------------------------------------------------------------------------
print("⏳ Installing required libraries...")
# `!` is the notebook shell escape: this line runs pip in a subprocess and is
# not valid plain Python. Only discord.py is version-pinned; every other
# package resolves to whatever version pip selects at install time.
!pip install -q openai-whisper ffmpeg-python numpy torch discord.py==2.3.2 nest_asyncio requests werkzeug
print("✅ Libraries installed successfully.")

# Patch asyncio so event loops can be nested.
# NOTE(review): presumably required because the notebook kernel already runs an
# event loop when bot.run() is called at the end of this cell — confirm.
import nest_asyncio
nest_asyncio.apply()
print("✅ nest_asyncio applied.")

import discord
from discord.ext import commands, tasks
import os
import shutil
import time
import asyncio
import torch
import ffmpeg
import whisper
import requests
import re
import zipfile
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Optional
from werkzeug.utils import secure_filename

# Working directories (relative to the current working directory):
# downloaded/extracted media goes to UPLOAD_FOLDER, finished transcripts to
# TRANSCRIPT_FOLDER. Both are created up front if they do not exist yet.
UPLOAD_FOLDER = 'uploads'
TRANSCRIPT_FOLDER = 'transcripts'
for _folder in (UPLOAD_FOLDER, TRANSCRIPT_FOLDER):
    os.makedirs(_folder, exist_ok=True)
print("✅ Static paths created.")

# ------------------------------------------------------------------------------
# SECTION 3: HARDWARE & MODEL LOADING
# ------------------------------------------------------------------------------
print("🤖 Checking hardware and loading Whisper model...")
# Pick the compute device once; everything below keys off this decision.
_cuda_available = torch.cuda.is_available()
device = "cuda" if _cuda_available else "cpu"
print(
    "✅ GPU (CUDA) detected!"
    if _cuda_available
    else "⚠️ GPU (CUDA) not detected. Using CPU. Transcription will be very slow."
)

# Resolve the tri-state `use_fp16` form value into a boolean:
#   'True'  -> always on (regardless of device)
#   'auto'  -> on only when running on CUDA
#   other   -> off
_fp16_choice = use_fp16.lower()
_auto_fp16 = _fp16_choice == 'auto' and _cuda_available
fp16_enabled = _fp16_choice == 'true' or _auto_fp16
if _auto_fp16:
    print("✅ Auto-FP16 enabled for CUDA device.")

# `model` stays None when loading fails; the run section at the bottom of the
# cell checks it before starting the bot.
model = None
try:
    print(f"⏳ Loading Whisper model '{model_size}' onto {device.upper()} (FP16: {fp16_enabled})...")
    model = whisper.load_model(model_size, device=device)
    print(f"✅ Whisper model '{model_size}' loaded successfully.")
except Exception as e:
    error_msg = f"❌ FAILED to load Whisper model: {e}"
    print(error_msg)
    # Best-effort notification: a dead or slow webhook must neither hang the
    # cell (hence the timeout) nor replace the real error with a network
    # traceback (hence the guard).
    try:
        requests.post(
            DISCORD_WEBHOOK_URL,
            json={'content': f"❌ **ERROR:** Failed to load Whisper model. Bot cannot start.\n`{e}`"},
            timeout=10,
        )
    except Exception as webhook_error:
        print(f"⚠️ Could not send webhook notification: {webhook_error}")

# ------------------------------------------------------------------------------
# SECTION 4: CORE UTILITIES & CLASSES
# ------------------------------------------------------------------------------
print("🏛️ Defining core architecture classes and utilities...")

def send_webhook_notification(webhook_url, message, timeout=10):
    """Post *message* to a Discord webhook on a best-effort basis.

    Args:
        webhook_url: Full webhook URL to post to.
        message: Text sent as the webhook's ``content`` field.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Never raises: any failure (network error, timeout, HTTP error status) is
    printed and swallowed so that a notification problem cannot break the
    caller.
    """
    try:
        response = requests.post(webhook_url, json={'content': message}, timeout=timeout)
        # requests does not raise on 4xx/5xx by itself; surface those in the log.
        response.raise_for_status()
    except Exception as e:
        print(f"⚠️ Could not send webhook notification: {e}")

def format_duration(seconds: float) -> str:
    """Render a duration in seconds as ``"<minutes>m <seconds>s"``.

    Fractions of a second are truncated and the seconds part is zero-padded
    to two digits (``65`` -> ``"1m 05s"``).

    Returns ``"N/A"`` for anything that is not a finite, non-negative number
    (wrong type, negative value, NaN or infinity).
    """
    # The chained comparison is False for NaN and for +/-infinity, which would
    # otherwise make int() raise further down.
    if not isinstance(seconds, (int, float)) or not (0 <= seconds < float("inf")):
        return "N/A"
    minutes, remaining_seconds = divmod(int(seconds), 60)
    return f"{minutes}m {remaining_seconds:02d}s"

def format_transcription_with_pauses(result: dict, pause_thresh: float) -> str:
    """Join transcription segments into paragraphs separated by blank lines.

    Args:
        result: Mapping with an optional ``"segments"`` list. Each segment is
            read for ``"start"``, ``"text"`` and (optionally) ``"end"``.
        pause_thresh: Minimum gap in seconds between the end of one segment
            and the start of the next that begins a new paragraph.

    Returns:
        Segment texts joined by single spaces within a paragraph and by a
        blank line between paragraphs, or ``""`` when there are no segments.
        Paragraphs carry no leading or trailing whitespace, and segments whose
        text is empty contribute nothing.
    """
    paragraphs = []
    current = []
    previous_end = 0.0
    for segment in result.get("segments") or []:
        start = segment["start"]
        text = segment["text"].strip()
        # A long enough silence closes the running paragraph (if it has any text).
        if (start - previous_end) > pause_thresh and current:
            paragraphs.append(" ".join(current))
            current = []
        if text:
            current.append(text)
        # Segments without an explicit end are treated as lasting five seconds.
        previous_end = segment.get("end", start + 5.0)
    if current:
        paragraphs.append(" ".join(current))
    return "\n\n".join(paragraphs)

@dataclass
class TranscriptionJob:
    """One queued transcription request.

    Constructed from the triggering Discord message, the user-facing file
    name, the path of the downloaded file and its duration in seconds.
    ``author`` is not a constructor argument: it is copied from ``message``
    after initialisation. ``job_id`` defaults to the first eight hex digits
    of a random UUID, and ``status`` starts out as ``"queued"``.
    """

    message: discord.Message
    original_filename: str
    local_filepath: str
    audio_duration: float
    author: discord.Member = field(init=False)
    job_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
    status: str = "queued"

    def __post_init__(self):
        """Derive ``author`` from the message and log the job's creation."""
        self.author = self.message.author
        creation_log = f"[JOB: {self.job_id}] New job object created for '{self.original_filename}'."
        print(creation_log)

class JobManager:
    """Keeps the FIFO queue of pending jobs plus a registry of unfinished ones.

    A job is entered in ``active_jobs`` when it is queued and stays there
    until ``complete_job`` is called for it, so the registry covers both
    waiting jobs and the one currently being processed.
    """

    def __init__(self, bot: commands.Bot):
        """Create an empty queue and registry for *bot*."""
        self.bot = bot
        self.job_queue = asyncio.Queue()
        self.active_jobs = {}
        print("✅ JobManager initialized.")

    async def add_job(self, job: TranscriptionJob):
        """Register and enqueue *job*, then announce its queue position in the channel."""
        self.active_jobs[job.job_id] = job
        await self.job_queue.put(job)
        # Queue size right after the put, reported to the user as the position.
        position = self.job_queue.qsize()
        await job.message.channel.send(
            f"✅ `[ID: {job.job_id}]` Your file `{job.original_filename}` has been added to the queue (Position: **#{position}**)."
        )
        print(f"[JOB: {job.job_id}] Added to queue at position {position}.")

    def complete_job(self, job_id: str):
        """Drop *job_id* from the registry; unknown ids are ignored silently."""
        if job_id not in self.active_jobs:
            return
        self.active_jobs.pop(job_id)
        print(f"[JOB: {job_id}] Job completed and removed from active list.")

    def is_idle(self) -> bool:
        """Return True when nothing is queued and no job is registered as active."""
        return not self.active_jobs and self.job_queue.empty()

class IdleMonitor:
    """Tracks inactivity and shuts the runtime down after a configurable delay.

    A background loop runs once per minute. While the job manager reports
    work, the idle timer is reset. Otherwise the idle time is compared with
    IDLE_NOTIFY_MIN, IDLE_WARN_MIN and IDLE_SHUTDOWN_MIN; each stage fires at
    most once per idle period, and the shutdown stage ends the loop.
    """

    def __init__(self, bot: commands.Bot, job_manager: JobManager, webhook_url: str):
        """Record "now" as the last activity and start the periodic check."""
        self.bot = bot
        self.job_manager = job_manager
        self.webhook_url = webhook_url
        self._last_activity_time = time.time()
        # Names of the stages ("notify", "warn", "shutdown") already handled in
        # the current idle period. Keyed by name rather than by the minute
        # value so two thresholds configured to the same number cannot
        # suppress each other.
        self._notifications_sent = set()
        self.check_idle_status.start()
        print(f"✅ IdleMonitor initialized. Settings: Notify={IDLE_NOTIFY_MIN}m, Warn={IDLE_WARN_MIN}m, Shutdown={IDLE_SHUTDOWN_MIN}m.")

    def reset_timer(self):
        """Mark activity: restart the idle clock and re-arm every stage."""
        if self._notifications_sent: print("[IDLE_MONITOR] Activity detected. Resetting idle timer.")
        self._last_activity_time = time.time()
        self._notifications_sent.clear()

    async def _notify(self, message: str):
        """Send *message* to the webhook without blocking the event loop."""
        # send_webhook_notification performs a synchronous HTTP request; running
        # it in a worker thread keeps the bot responsive while it is in flight.
        await asyncio.to_thread(send_webhook_notification, self.webhook_url, message)

    async def _initiate_shutdown(self):
        """Announce the automatic shutdown, then run the shared shutdown routine."""
        print(f"[IDLE_MONITOR] {IDLE_SHUTDOWN_MIN}-minute idle limit reached. Initiating shutdown.")
        shutdown_message = (f"🔴 **AUTO-SHUTDOWN:** Bot has been idle for {IDLE_SHUTDOWN_MIN} minutes. "
                            f"Terminating Colab runtime to save resources.")
        await self._notify(shutdown_message)
        await perform_shutdown(self.bot)

    @tasks.loop(minutes=1)
    async def check_idle_status(self):
        """Periodic check: reset while busy, otherwise fire whichever stages are due."""
        if not self.job_manager.is_idle():
            self.reset_timer()
            return
        idle_minutes = (time.time() - self._last_activity_time) / 60
        if idle_minutes >= IDLE_SHUTDOWN_MIN and "shutdown" not in self._notifications_sent:
            self._notifications_sent.add("shutdown")
            try:
                await self._initiate_shutdown()
            finally:
                # Stop the loop even if the shutdown routine raised.
                self.check_idle_status.stop()
            return
        if idle_minutes >= IDLE_WARN_MIN and "warn" not in self._notifications_sent:
            self._notifications_sent.add("warn")
            print(f"[IDLE_MONITOR] {IDLE_WARN_MIN}-minute idle warning.")
            await self._notify(f"⚠️ **IDLE WARNING:** No activity for {IDLE_WARN_MIN} minutes. Bot will shut down in {IDLE_SHUTDOWN_MIN - IDLE_WARN_MIN} minutes.")
        if idle_minutes >= IDLE_NOTIFY_MIN and "notify" not in self._notifications_sent:
            self._notifications_sent.add("notify")
            print(f"[IDLE_MONITOR] {IDLE_NOTIFY_MIN}-minute idle notification.")
            await self._notify(f"ℹ️ **IDLE NOTIFICATION:** No activity for {IDLE_NOTIFY_MIN} minutes. Bot will shut down in {IDLE_SHUTDOWN_MIN - IDLE_NOTIFY_MIN} minutes if it remains idle.")

# ------------------------------------------------------------------------------
# SECTION 5: FILE HANDLING & VALIDATION
# ------------------------------------------------------------------------------
print("📁 Setting up file handling and validation logic...")

class FilesHandler:
    """Downloads message attachments, unpacks archives and queues valid media.

    Three kinds of attachment are handled:
      * split archives named ``<base>.zip.NNN`` — the parts are concatenated
        into one zip, which is then extracted;
      * ``.zip`` files — extracted, with every contained file validated;
      * anything else — validated directly as a media file.

    Validation probes the file with ffmpeg, enforces the configured maximum
    duration and hands accepted files to the job manager.
    """

    def __init__(self, job_manager: JobManager, upload_folder: str):
        """Remember where to store downloads and which job manager receives them."""
        self.job_manager = job_manager
        self.upload_folder = upload_folder
        # Split-archive parts end in ".zip." followed by exactly three digits.
        self.chunk_regex = re.compile(r'\.zip\.\d{3}$')
        print("✅ FilesHandler initialized.")

    def _unique_upload_path(self, filename: str) -> str:
        """Return a collision-free path in the upload folder for *filename*.

        The user-supplied name is sanitised, and a random token is added to
        the timestamp prefix so that two files with the same name handled
        within the same second (or names that sanitise to the same string)
        cannot overwrite each other.
        """
        unique_prefix = f"{int(time.time())}_{uuid.uuid4().hex[:8]}"
        return os.path.join(self.upload_folder, f"{unique_prefix}_{secure_filename(filename)}")

    async def _validate_and_queue_file(self, local_path: str, original_filename: str, message: discord.Message):
        """Probe *local_path*; queue it as a job or reject it and delete the file.

        The user is told about every rejection through a reply to *message*.
        """
        # Identifier used only to correlate the log lines of this validation run.
        temp_job_id = uuid.uuid4().hex[:8]
        print(f"[VALIDATE:{temp_job_id}] Starting validation for '{original_filename}'...")
        try:
            # ffmpeg.probe is blocking, so it runs in a worker thread.
            probe = await asyncio.to_thread(ffmpeg.probe, local_path)
            duration = float(probe['format']['duration'])
            print(f"[VALIDATE:{temp_job_id}] ffmpeg probe successful. Duration: {duration:.2f}s.")
            max_duration = int(MAX_AUDIO_DURATION_SECONDS)
            if max_duration > 0 and duration > max_duration:
                error_msg = f"File duration ({format_duration(duration)}) exceeds the max allowed ({format_duration(max_duration)})."
                print(f"❌ [VALIDATE:{temp_job_id}] FAILED: {error_msg}")
                await message.reply(f"❌ Could not process `{original_filename}`. **Reason:** {error_msg}")
                os.remove(local_path)
                return
            job = TranscriptionJob(message, original_filename, local_path, duration)
            await self.job_manager.add_job(job)
        except ffmpeg.Error as e:
            err_details = e.stderr.decode('utf-8') if e.stderr else 'Unknown ffmpeg error'
            print(f"❌ [VALIDATE:{temp_job_id}] FAILED: ffmpeg could not probe file. Error: {err_details}")
            await message.reply(f"❌ Could not process `{original_filename}`. It may be corrupted or an unsupported media file.")
            os.remove(local_path)
        except Exception as e:
            print(f"❌ [VALIDATE:{temp_job_id}] FAILED: An unexpected error occurred. Error: {e}")
            await message.reply(f"❌ An unexpected error occurred while validating `{original_filename}`.")
            if os.path.exists(local_path): os.remove(local_path)

    async def handle_attachments(self, message: discord.Message):
        """Route every attachment of *message*: chunk groups first, then single files."""
        chunks = defaultdict(list)
        other_files = []
        for att in message.attachments:
            if self.chunk_regex.search(att.filename):
                # "movie.zip.001" -> group key "movie"
                base_name = att.filename.rsplit('.zip.', 1)[0]
                chunks[base_name].append(att)
            else:
                other_files.append(att)
        for base_name, chunk_list in chunks.items():
            await self._process_chunk_group(f"{base_name}.zip", chunk_list, message)
        for att in other_files:
            await self._process_single_attachment(att, message)

    async def _process_chunk_group(self, final_zip_name: str, chunk_list: list, message: discord.Message):
        """Concatenate the parts of a split archive and extract the merged zip."""
        # Three-digit suffixes make lexicographic order equal to numeric order.
        chunk_list.sort(key=lambda x: x.filename)
        status_msg = await message.channel.send(f"🧩 Merging **{len(chunk_list)}** parts for `{final_zip_name}`...")
        combined_zip_path = self._unique_upload_path(final_zip_name)
        try:
            with open(combined_zip_path, 'wb') as dest_file:
                for chunk_att in chunk_list:
                    # Stream each part straight into the merged file. This
                    # avoids temporary per-chunk files, which previously used
                    # the unsanitised attachment name as a path and were left
                    # behind when a download failed.
                    dest_file.write(await chunk_att.read())
            await status_msg.edit(content=f"🗜️ Extracting files from merged `{final_zip_name}`...")
            await self._extract_and_queue_zip(combined_zip_path, final_zip_name, message)
        except Exception as e:
            print(f"❌ Failed to merge chunks for `{final_zip_name}`: {e}")
            await message.channel.send(f"❌ Failed to merge chunks for `{final_zip_name}`: `{e}`")
        finally:
            if os.path.exists(combined_zip_path): os.remove(combined_zip_path)

    async def _process_single_attachment(self, attachment: discord.Attachment, message: discord.Message):
        """Download one attachment and either extract it (zip) or validate it (media)."""
        local_path = self._unique_upload_path(attachment.filename)
        await attachment.save(local_path)
        if attachment.filename.lower().endswith('.zip'):
            try:
                await message.channel.send(f"🗜️ Extracting files from `{attachment.filename}`...")
                await self._extract_and_queue_zip(local_path, attachment.filename, message)
            finally:
                # The archive is only a container; always delete it, even when
                # the status message or the extraction fails.
                if os.path.exists(local_path): os.remove(local_path)
        else:
            await self._validate_and_queue_file(local_path, attachment.filename, message)

    async def _extract_and_queue_zip(self, zip_path: str, original_zip_name: str, message: discord.Message):
        """Extract *zip_path* to a scratch directory and validate every real file in it.

        Extraction problems are reported to the user as a reply; the scratch
        directory is removed in all cases.
        """
        extract_dir = os.path.join(self.upload_folder, f"extract_{int(time.time())}_{uuid.uuid4().hex[:8]}")
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # NOTE(security): the archive is user-supplied and its
                # uncompressed size is not checked here, so a crafted zip can
                # still exhaust disk space.
                await asyncio.to_thread(zip_ref.extractall, extract_dir)
            for root, dirs, files in os.walk(extract_dir):
                # Do not descend into macOS resource-fork folders.
                dirs[:] = [d for d in dirs if d != '__MACOSX']
                for filename in files:
                    if filename.startswith('__MACOSX') or filename.startswith('.'):
                        continue
                    dest_path = self._unique_upload_path(filename)
                    shutil.move(os.path.join(root, filename), dest_path)
                    await self._validate_and_queue_file(dest_path, filename, message)
        except zipfile.BadZipFile:
            print(f"⚠️ Bad zip file uploaded: {original_zip_name}")
            await message.reply(f"❌ Failed to extract `{original_zip_name}`: Corrupted or invalid ZIP archive.")
        except Exception as e:
            print(f"❌ Error extracting zip `{original_zip_name}`: {e}")
            await message.reply(f"❌ An unexpected error occurred while extracting `{original_zip_name}`: `{e}`")
        finally:
            if os.path.exists(extract_dir): shutil.rmtree(extract_dir)

# ------------------------------------------------------------------------------
# SECTION 6: DISCORD BOT & WORKER
# ------------------------------------------------------------------------------
print("🤖 Initializing Discord bot and background worker...")
# Start from the default intents and additionally request message content.
# NOTE(review): presumably needed so the "!"-prefixed commands can read the
# message text — confirm against the Discord developer portal settings.
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents)
# Shared singletons used by the event handlers and commands defined below.
job_manager = JobManager(bot)
files_handler = FilesHandler(job_manager, UPLOAD_FOLDER)
# Assigned in on_ready(); stays None until the bot has connected, so every
# reader has to handle the None case.
idle_monitor: Optional[IdleMonitor] = None

def run_transcription_process(job: TranscriptionJob) -> tuple[str, str]:
    """Transcribe the job's audio file and write the formatted text to disk.

    Blocking; it is intended to be run off the event loop.

    Returns:
        ``(path of the written transcript file, detected language code)``.
        The language is ``'N/A'`` when the model result has no ``language``
        entry.
    """
    print(f"[JOB: {job.job_id}] Starting transcription for '{job.original_filename}'. Using FP16: {fp16_enabled}, Beam Size: {beam_size}")

    # A non-positive beam size means "leave the beam size to the model's default".
    decode_kwargs = {"fp16": fp16_enabled, "word_timestamps": True}
    if beam_size > 0:
        decode_kwargs["beam_size"] = beam_size
    result = model.transcribe(job.local_filepath, **decode_kwargs)
    transcript_text = format_transcription_with_pauses(result, pause_threshold)

    # File name pattern: TS_(<duration>)_<sanitised stem, at most 50 chars>.txt
    stem, _extension = os.path.splitext(job.original_filename)
    compact_duration = format_duration(job.audio_duration).replace(" ", "")
    output_filepath = os.path.join(
        TRANSCRIPT_FOLDER,
        f"TS_({compact_duration})_{secure_filename(stem)[:50]}.txt",
    )
    with open(output_filepath, "w", encoding="utf-8") as transcript_file:
        transcript_file.write(transcript_text)

    detected_language = result.get('language', 'N/A')
    print(f"[JOB: {job.job_id}] Transcription complete. Output: '{output_filepath}'. Language: {detected_language.upper()}.")
    return output_filepath, detected_language

async def queue_processor(manager: JobManager):
    """Worker loop: take jobs off the queue one at a time and transcribe them.

    For every job the result (or an error notice) is posted as a reply to the
    originating message, the input and transcript files are deleted, and the
    job is marked as finished. A failure in one job — including a failure to
    report that failure or to clean up — must never end the loop, because
    this is the only consumer of the queue.
    """
    await bot.wait_until_ready()
    while not bot.is_closed():
        job: TranscriptionJob = await manager.job_queue.get()
        local_transcript_path = None
        try:
            job.status = "processing"
            duration_str = format_duration(job.audio_duration)
            await job.message.channel.send(f"▶️ `[ID: {job.job_id}]` Now processing `{job.original_filename}` (Duration: **{duration_str}**)...")
            print(f"[JOB: {job.job_id}] Status updated to 'processing'.")
            # The transcription is blocking, so it runs in a worker thread.
            local_transcript_path, detected_lang = await asyncio.to_thread(run_transcription_process, job)
            embed = discord.Embed(title="🎉 Transcription Complete!", color=discord.Color.green())
            embed.add_field(name="Original File", value=f"`{job.original_filename}`", inline=False)
            embed.add_field(name="Audio Duration", value=format_duration(job.audio_duration), inline=True)
            embed.add_field(name="Detected Language", value=detected_lang.upper(), inline=True)
            embed.set_footer(text=f"Processed for {job.author.display_name} | Job ID: {job.job_id}")
            await job.message.reply(embed=embed, file=discord.File(local_transcript_path))
        except Exception as e:
            print(f"❌ [JOB: {job.job_id}] FATAL ERROR during processing: {e}")
            # Reporting the failure can itself fail (message deleted, missing
            # permissions, oversized embed, ...). Guard it so the worker
            # survives and keeps serving the queue.
            try:
                # Truncate to stay within Discord's embed title/description limits.
                error_embed = discord.Embed(
                    title=f"❌ Failed to Process: {job.original_filename}"[:256],
                    description=f"An unexpected error occurred during transcription.\n```\n{str(e)[:1500]}\n```",
                    color=discord.Color.red())
                error_embed.set_footer(text=f"Job ID: {job.job_id}")
                await job.message.reply(embed=error_embed)
            except Exception as notify_error:
                print(f"⚠️ [JOB: {job.job_id}] Could not deliver the failure notice: {notify_error}")
        finally:
            # Best-effort cleanup; a file that cannot be deleted must not stop
            # the bookkeeping below from running.
            for leftover_path in (job.local_filepath, local_transcript_path):
                try:
                    if leftover_path and os.path.exists(leftover_path):
                        os.remove(leftover_path)
                except OSError as cleanup_error:
                    print(f"⚠️ [JOB: {job.job_id}] Could not remove '{leftover_path}': {cleanup_error}")
            manager.job_queue.task_done()
            manager.complete_job(job.job_id)

@bot.event
async def on_ready():
    """Log the login and start the queue worker and idle monitor exactly once.

    discord.py may dispatch ``on_ready`` more than once (for example after a
    reconnect). Without the guard below every repeat would spawn an
    additional queue worker and a second idle monitor.
    """
    global idle_monitor
    print('----------------------------------------------------')
    print(f'✅ Bot has logged in as {bot.user}')
    print(f'🚀 Worker queue started. Bot active in Channel ID: {DISCORD_CHANNEL_ID}')
    print('----------------------------------------------------')
    if idle_monitor is not None:
        # The worker and the monitor are always created together, so an
        # existing monitor means both are already running.
        print("ℹ️ on_ready fired again; worker and idle monitor are already running.")
        return
    bot.loop.create_task(queue_processor(job_manager))
    idle_monitor = IdleMonitor(bot, job_manager, DISCORD_WEBHOOK_URL)

@bot.event
async def on_message(message: discord.Message):
    """Handle a message in the configured channel: run commands, then ingest attachments.

    Messages written by bots and messages from any other channel are ignored.
    """
    if message.author.bot:
        return
    # The configured channel id is a string, so compare string to string.
    if str(message.channel.id) != DISCORD_CHANNEL_ID:
        return

    await bot.process_commands(message)

    if not message.attachments:
        return
    # An upload counts as activity for the idle monitor (once it exists).
    if idle_monitor:
        idle_monitor.reset_timer()
    # Process the attachments in the background so this handler returns quickly.
    bot.loop.create_task(files_handler.handle_attachments(message))

@bot.command(name="ping", help="Checks bot latency, queue, and idle status.")
async def ping(ctx: commands.Context):
    """Reply with an embed showing latency, queue length and the idle state.

    While jobs exist (or before the idle monitor has been created) the status
    is reported as active. Otherwise the embed shows how long the bot has
    been idle and how long until the next idle stage is reached.
    """
    latency_ms = round(bot.latency * 1000)
    pending_jobs = job_manager.job_queue.qsize()
    embed = discord.Embed(title="📊 Bot Status & Health", color=discord.Color.green())
    embed.add_field(name="Network Latency", value=f"**{latency_ms}ms**", inline=True)
    embed.add_field(name="Jobs in Queue", value=f"**{pending_jobs}**", inline=True)

    if job_manager.is_idle() and idle_monitor:
        idle_for = time.time() - idle_monitor._last_activity_time
        notify_after = IDLE_NOTIFY_MIN * 60
        warn_after = IDLE_WARN_MIN * 60
        shutdown_after = IDLE_SHUTDOWN_MIN * 60

        # Pick colour, label and countdown for the furthest stage already reached.
        if idle_for >= warn_after:
            stage_colour = discord.Color.orange()
            stage_label = "⚠️ **Warning**"
            time_left = format_duration(max(0, shutdown_after - idle_for))
            upcoming = f"Auto-shutdown in **{time_left}**"
        elif idle_for >= notify_after:
            stage_colour = discord.Color.gold()
            stage_label = "ℹ️ **Notified**"
            time_left = format_duration(warn_after - idle_for)
            upcoming = f"Final warning in **{time_left}**"
        else:
            stage_colour = discord.Color.blue()
            stage_label = "🕒 **Idle**"
            time_left = format_duration(notify_after - idle_for)
            upcoming = f"First notification in **{time_left}**"

        embed.color = stage_colour
        embed.add_field(name="Status", value=stage_label, inline=False)
        embed.add_field(name="Time Idle", value=f"{format_duration(idle_for)}", inline=True)
        embed.add_field(name="Next Action", value=upcoming, inline=True)
    else:
        embed.add_field(name="Status", value="✅ **Active** (Processing or has jobs in queue)", inline=False)

    embed.set_footer(text=f"Requested by {ctx.author.display_name}")
    await ctx.send(embed=embed)

async def perform_shutdown(bot_instance: commands.Bot):
    """The core shutdown logic: clean up, close connection, and terminate.

    Cleanup and closing the connection are best-effort. The Colab runtime is
    released in every case, because a runtime left running after a failed
    cleanup is exactly what the shutdown is meant to prevent.
    """
    try:
        print("🧹 Cleaning up temporary folders...")
        # ignore_errors covers both missing folders and files that cannot be deleted.
        for folder in (UPLOAD_FOLDER, TRANSCRIPT_FOLDER):
            shutil.rmtree(folder, ignore_errors=True)
        print("✅ Cleanup complete.")
        print("🛑 Closing bot connection...")
        await bot_instance.close()
    except Exception as e:
        print(f"⚠️ Shutdown step failed, terminating the runtime anyway: {e}")
    finally:
        print("🔌 Terminating Colab runtime...")
        runtime.unassign()

# --- Manual shutdown command ---
# NOTE(review): no permission check is performed here, so anyone who can post
# in the bot's channel can invoke this command — confirm that is acceptable.
@bot.command(name="shutdown", help="Shuts down the bot and the Colab runtime.")
async def shutdown(ctx: commands.Context):
    """Acknowledge the request in the channel, then run the shared shutdown routine."""
    requester = ctx.author.display_name
    # Confirm first: once the shutdown has started the bot can no longer reply.
    notice = f"🔴 **MANUAL SHUTDOWN:** Command received from **{requester}**. Bot is now shutting down..."
    await ctx.send(notice)
    await perform_shutdown(bot)

# ------------------------------------------------------------------------------
# SECTION 7: RUN THE BOT
# ------------------------------------------------------------------------------
# Start the bot only when the Whisper model was loaded successfully.
if not model:
    print("\n❌ Bot cannot run because the Whisper model failed to load. Please check previous logs.")
else:
    # A non-positive maximum duration means the duration check is disabled.
    max_dur_str = "Unlimited" if MAX_AUDIO_DURATION_SECONDS <= 0 else format_duration(MAX_AUDIO_DURATION_SECONDS)
    startup_message = "\n".join([
        "✅ **Colab Runtime Ready!**",
        f"Model: **{model_size}** on **{device.upper()}** | FP16: **{fp16_enabled}** | Beam Size: **{beam_size}**",
        f"Pause Threshold: **{pause_threshold}s** | Max Duration: **{max_dur_str}**.",
        "Bot is starting... Idle monitor is **active**.",
    ])
    send_webhook_notification(DISCORD_WEBHOOK_URL, startup_message)
    print("\n▶️ Running bot...")
    try:
        # Blocks until the bot is closed.
        bot.run(DISCORD_BOT_TOKEN)
    except Exception as e:
        print(f"❌ CRITICAL ERROR: FAILED to run bot: {e}")
        send_webhook_notification(DISCORD_WEBHOOK_URL, f"❌ **CRITICAL ERROR:** Bot failed to run.\n`{e}`")