<a href="https://colab.research.google.com/github/Xyroset/AI-Didital-Doppelganger/blob/main/Personal_AI_Telegram_Bot_(LLM_%2B_Voice_%2B_Vision).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **🤖 Personal AI Telegram Bot (LLM + Voice + Vision)**

### **!! The bot is designed for a single user !!**
Serving several users at once may freeze or overload the Google Colab session.

<br>

### **Welcome!**
This notebook lets you deploy your own fully customizable AI companion directly in Telegram.

It uses a local **LLM (Unsloth)** for conversation, **XTTS** for natural voice generation, and the **Groq API** to transcribe your voice messages and describe your photos, stickers and GIFs.

### **Before you start:**

1. **Models:** Make sure you have uploaded your LLM and TTS models to your Google Drive.

2. **Hardware:** This code requires a GPU. In the top menu, go to `Runtime` → `Change runtime type` and select **T4 GPU**.

3. **API Keys:** You will need a Telegram Bot Token (from `@BotFather`) and a free [Groq API Key](https://console.groq.com/keys "Groq").

Just follow the steps below, run the cells one by one, and your AI friend will be online!

# **1. 📦 Install libraries**

This is the first step. Run this block to download and install all the necessary dependencies for the AI models, voice generation (TTS), and the Telegram bot framework.

When the installation finishes (it may take 10 minutes or more), you will see `Complete!` and the session will **restart** automatically.

In [None]:
# @title ### **Install**

import os
import subprocess
import time

start_time = time.time()

venv_dir = "/content/tts_venv"
venv_bin = f"{venv_dir}/bin"
venv_python = f"{venv_bin}/python"
venv_pip = f"{venv_bin}/pip"

print("\n[1/5] Install system libraries..")
subprocess.run("sudo apt-get update -qq", shell=True)
subprocess.run("sudo apt-get install -y -qq espeak-ng libsndfile1-dev ffmpeg", shell=True)

print("\n[2/5] Set default state for Unsloth & Bot...")

subprocess.run("pip uninstall -y TTS coqui-tts transformers tokenizers numpy", shell=True)

print("   => Installing Unsloth...")
subprocess.run("pip install 'unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git'", shell=True)
subprocess.run("pip install --no-deps 'xformers<0.0.27' 'trl<0.8.0' peft accelerate bitsandbytes", shell=True)

print("   => Installing Aiogram, Groq and etc")
subprocess.run("pip install aiogram groq Pillow moviepy lottie cairosvg", shell=True)

print("\n[3/5] Create venv for TTS...")

if not os.path.exists(venv_python):
    print("   => Create folder venv...")
    subprocess.run(f"python3 -m venv {venv_dir}", shell=True)

if not os.path.exists(venv_pip):
    subprocess.run("curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py", shell=True)
    subprocess.run(f"{venv_python} get-pip.py", shell=True)

print("\n[4/5] Install libraries for VENV (Torch, TTS)...")

def install_in_venv(args):
    cmd = f"{venv_pip} install {args}"
    try:
        subprocess.check_call(cmd.split())
    except subprocess.CalledProcessError:
        print(f"Error: {args}")
        raise
try:
    install_in_venv("--upgrade pip")

    install_in_venv("torch torchaudio --index-url https://download.pytorch.org/whl/cu121")

    install_in_venv("coqui-tts==0.24.1")
    install_in_venv("torchcodec soundfile typing-extensions")

    print("   => Fix version inside venv...")
    install_in_venv("numpy==1.26.4 transformers==4.45.2 tokenizers==0.20.3")

except Exception as e:
    print(f"Error Venv: {e}")

print("\n[5/5] Confirm patch...")

target_file = f"{venv_dir}/lib/python3.12/site-packages/coqpit/coqpit.py"

if not os.path.exists(target_file):
    try:
        found = subprocess.check_output(f"find {venv_dir} -name coqpit.py", shell=True, text=True).strip()
        if found:
            target_file = found.splitlines()[0]  # use the first match if find returns several
    except subprocess.CalledProcessError:
        pass

if os.path.exists(target_file):
    with open(target_file, "r") as f:
        content = f.read()

    patched = False

    if "if issubclass(field_type, Serializable):" in content:
        content = content.replace(
            "if issubclass(field_type, Serializable):",
            "if isinstance(field_type, type) and issubclass(field_type, Serializable):"
        )
        patched = True

    # Silence coqpit's strict value-type ValueError
    if 'raise ValueError(f" [!] \'{type(x)}\' value type' in content:
        content = content.replace('raise ValueError(f" [!] \'{type(x)}\' value type', 'pass # SUPPRESSED')
        patched = True

    if patched:
        with open(target_file, "w") as f:
            f.write(content)
        print("   => Patch is complete.")
else:
    print("   => coqpit.py not found, patch skipped.")

elapsed = int(time.time() - start_time)
print(f"\nComplete! {elapsed} sec.")
print("Restart...")

time.sleep(3)
os.kill(os.getpid(), 9)


[1/5] Install system libraries..

[2/5] Set default state for Unsloth & Bot...
   => Installing Unsloth...
   => Installing Aiogram, Groq and etc

[3/5] Create venv for TTS...
   => Create folder venv...

[4/5] Install libraries for VENV (Torch, TTS)...
   => Fix version inside venv...

[5/5] Confirm patch...
   => Patch is complete.

Complete! 722 sec.
Restart...
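Step [5/5] patches coqpit so that `issubclass(field_type, Serializable)` is only evaluated when `field_type` is an actual class: `issubclass()` raises `TypeError` when its first argument is not a class, for example a string annotation. The sketch below shows the effect of the `isinstance(field_type, type)` guard. `Serializable` and `SubConfig` are hypothetical stand-ins, not coqpit's real classes, and exactly which annotations trigger the error inside coqpit is not shown in this notebook.

```python
class Serializable:
    """Hypothetical stand-in for coqpit's Serializable base class."""


class SubConfig(Serializable):
    """Hypothetical nested config class."""


def is_serializable_unguarded(field_type) -> bool:
    # Original check: raises TypeError when field_type is not a class.
    return issubclass(field_type, Serializable)


def is_serializable_guarded(field_type) -> bool:
    # Patched check: short-circuits to False for non-class annotations.
    return isinstance(field_type, type) and issubclass(field_type, Serializable)


for annotation in (SubConfig, int, "SubConfig"):
    try:
        unguarded = is_serializable_unguarded(annotation)
    except TypeError as exc:
        unguarded = f"TypeError: {exc}"
    print(repr(annotation), "| unguarded:", unguarded, "| guarded:", is_serializable_guarded(annotation))
```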


# **2. ⚙ Settings**

### **Secret keys:**

1. Look at the left sidebar of Google Colab and click the 🔑 (key) icon named ***"Secrets"***.

2. **Telegram Token:** Go to Telegram, message `@BotFather`, use the `/newbot` command, and copy your **HTTP API Token**. Create a new secret in Colab named exactly `BOT_API` and paste your token as the value.

3. **Groq API Key:** Go to `console.groq.com`, sign in, and generate a new API key. Create a second secret in Colab named exactly `GROQ_API` and paste the key.

4. **Crucial Step:** Toggle the ***"Notebook access"*** switch to ON for both secrets!
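The bot cell reads both secrets with `google.colab.userdata.get()`. If you also want to run the same code outside Colab, one option is a small wrapper that falls back to environment variables. This is a sketch: `get_secret` is a hypothetical helper, and the environment-variable fallback is an assumption, not something the notebook does.

```python
import os


def get_secret(name: str) -> str:
    """Hypothetical helper: read a Colab secret, falling back to an env var.

    The environment-variable fallback is an assumption for running outside Colab.
    """
    try:
        from google.colab import userdata  # only importable inside Colab
        value = userdata.get(name)
    except Exception:
        # Outside Colab (ImportError), or the secret is missing / access is not granted.
        value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Secret {name!r} not found. Add it in the Secrets panel and enable Notebook access."
        )
    return value
```

Usage would then be, for example, `Bot(token=get_secret(SECRET_BOT_API))`.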

<br>

### **Basic Settings:**

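1. **BOT_NAME & LANGUAGE:** Choose a name for your AI and select the primary language. It is used both for voice generation (XTTS) and for transcribing your voice messages (Whisper).

2. **Model Paths:** Make sure these match the exact folder paths on your Google Drive where the LLM (text) and TTS (voice) models are stored. The TTS folder must contain the XTTS checkpoint files together with `config.json` and a `reference.wav` voice sample.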

<br>

### **LLM Loading Parameters Guide**

When loading Unsloth models, you may need to adjust the loading parameters to avoid Out Of Memory (OOM) errors on a T4 GPU (about 15 GB of VRAM).

* **Max Sequence Length (`max_seq_length`):** The "memory window" of the AI. Higher values let the bot remember longer chats, but consume more VRAM.
    * `2048` - Safe mode (fast, minimal VRAM).
    * `4096` - Standard for roleplay and normal chats.
    * `8192` - Maximum recommended for T4 GPU with 8B models.
* **Load in 4-bit (`load_in_4bit`):** Quantization. **Must be `True`** for 7B-9B models on a T4 GPU. You can set it to `False` only for small models (1.5B-3B).
* **Dtype:** Data type for weights. Leave it at `None` to let Unsloth auto-detect the best format (usually float16 for Colab).
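A rough way to see why `load_in_4bit` and `max_seq_length` matter: weight memory is about parameters × bits / 8, and the KV cache grows linearly with sequence length (2 × layers × key/value heads × head dimension × tokens × bytes per value). The sketch plugs in the published Llama-3.1-8B shape (32 layers, 8 key/value heads, head dimension 128) and a round 8 billion parameters. It ignores activations, the CUDA context and quantization overhead, so real usage is higher; treat it as a lower bound.

```python
GIB = 1024 ** 3


def weight_bytes(num_params: float, bits_per_weight: int) -> float:
    """Approximate memory for the weights alone (ignores quantization metadata)."""
    return num_params * bits_per_weight / 8


def kv_cache_bytes(num_layers: int, num_kv_heads: int, head_dim: int,
                   seq_len: int, bytes_per_value: int = 2) -> int:
    """Keys + values (factor 2) for every layer and every cached token, fp16 by default."""
    return 2 * num_layers * num_kv_heads * head_dim * seq_len * bytes_per_value


# Llama-3.1-8B shape: 32 layers, 8 key/value heads (grouped-query attention), head_dim 128.
weights_4bit = weight_bytes(8.0e9, 4)
weights_fp16 = weight_bytes(8.0e9, 16)
for seq_len in (2048, 4096, 8192):
    kv = kv_cache_bytes(32, 8, 128, seq_len)
    print(f"max_seq_length={seq_len}: 4-bit weights ~{weights_4bit / GIB:.1f} GiB "
          f"+ KV cache {kv / GIB:.2f} GiB")
print(f"fp16 weights alone: ~{weights_fp16 / GIB:.1f} GiB")
```

The fp16 weights alone come to roughly 14.9 GiB, more than the 14.563 GB the T4 reports in the load output, which is why 4-bit loading is required for 8B models.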

**Popular Unsloth models and suggested settings for a T4 GPU:**
1.  `unsloth/Llama-3.1-8B-bnb-4bit`: Seq=2048, 4-bit=True, Dtype=None
2.  `unsloth/Qwen3-4B-Base`: Seq=2048, 4-bit=True, Dtype=None
3.  `unsloth/gpt-oss-20b-unsloth-bnb-4bit`: Seq=1024, 4-bit=True, Dtype=None
4.  `unsloth/DeepSeek-R1-Distill-Llama-8B-unsloth-bnb-4bit`: Seq=4096, 4-bit=True, Dtype=None
5.  `unsloth/Mistral-Nemo-Instruct-2407-bnb-4bit` (12B): Seq=2048, 4-bit=True, Dtype=None
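Reasoning-distilled models such as `DeepSeek-R1-Distill-Llama-8B` usually wrap their chain of thought in `<think>...</think>` tags, and some special tokens can survive decoding. The bot's `generate_message` strips both before replying. A standalone sketch of that post-processing; the `clean_reply` name is hypothetical and the token list mirrors the one in the bot cell.

```python
import re

# Special tokens that may leak into decoded text (same list as the bot cell).
ARTIFACTS = [
    "<|end_of_text|>", "<|eot_id|>", "<｜end▁of▁sentence｜>",
    "<|im_end|>", "</s>", "<eos>",
]


def clean_reply(decoded: str) -> str:
    # Keep the text after the Alpaca "### Response:" header and before any following "###" header.
    text = decoded.split("### Response:\n")[-1].split("###")[0]
    # Remove reasoning blocks; DOTALL lets "." match across newlines.
    text = re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL)
    for token in ARTIFACTS:
        text = text.replace(token, "")
    return text.strip()


sample = "### Response:\n<think>\nThe user greets me.\n</think>\nHi there!<｜end▁of▁sentence｜>"
print(clean_reply(sample))  # Hi there!
```

Note that an unclosed `<think>` block (for example, when generation stops at `MAX_TOKENS` mid-thought) is not matched by this pattern, so reasoning models may need a larger `MAX_TOKENS`.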

<br>

### **Advanced Settings (LLM & TTS):**

1. **Temperature:** Controls creativity. Lower values make the AI logical and strict; higher values make it more creative and unpredictable.

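2. **Max Tokens:** Limits the maximum length of each text reply, in tokens. Higher values also leave less room for chat history within `max_seq_length`.

3. **Repetition Penalty & Top K/P:** Repetition Penalty discourages the AI from looping or repeating words; Top K and Top P limit how many candidate words the model samples from, which controls how varied its vocabulary is. If unsure, leave them at their default values!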

**Action:** After setting up your secrets and sliders, run all cells in this section to save your configurations.
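To make the Temperature, Top K, Top P and Repetition Penalty sliders less abstract, here is a pure-Python sketch of how such filters reshape a model's next-token distribution. It follows the order Hugging Face `generate()` applies them in (repetition penalty, temperature, top-k, top-p) and the common "divide positive scores, multiply negative scores" repetition penalty, but it is a simplified illustration, not the library code; `sample_distribution` is a hypothetical helper. In Hugging Face these filters only take effect when sampling is enabled (`do_sample=True`), and the TTS sliders feed XTTS's own sampler, which this sketch does not model.

```python
import math


def sample_distribution(logits, previous_tokens, temperature=0.6, top_k=50,
                        top_p=0.95, repetition_penalty=1.1):
    """Return {token_id: probability} after applying the sampling filters."""
    scores = list(logits)
    # Repetition penalty: shrink positive scores and push negative ones further down
    # for every token that already appeared (in generate() this includes the prompt).
    for tok in set(previous_tokens):
        s = scores[tok]
        scores[tok] = s / repetition_penalty if s > 0 else s * repetition_penalty
    # Temperature: values below 1 sharpen the distribution, values above 1 flatten it.
    scores = [s / temperature for s in scores]
    # Top-k: keep only the k highest-scoring tokens, best first.
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:top_k]
    # Softmax over the survivors (subtracting the max avoids overflow in exp).
    best = scores[ranked[0]]
    exps = {i: math.exp(scores[i] - best) for i in ranked}
    total = sum(exps.values())
    probs = {i: e / total for i, e in exps.items()}
    # Top-p (nucleus): keep the smallest prefix whose cumulative probability reaches top_p.
    kept, cumulative = {}, 0.0
    for i in ranked:
        kept[i] = probs[i]
        cumulative += probs[i]
        if cumulative >= top_p:
            break
    # Renormalise so the kept probabilities sum to 1.
    norm = sum(kept.values())
    return {i: p / norm for i, p in kept.items()}


logits = [2.0, 1.5, 0.5, -1.0]  # toy scores for a 4-token vocabulary
print(sample_distribution(logits, previous_tokens=[0], top_k=3, top_p=0.95))
```

The final token is then drawn at random from the returned distribution; with `top_k=1` only the single best token survives, which makes generation deterministic.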


In [1]:
# @title ## **Base**

# Settings
# @markdown ### **Bot**
BOT_NAME = "AI Assistant" # @param {type:"string"}
LANGUAGE = "en" # @param ["en", "ru", "es", "fr", "de", "ja"]

# @markdown ---

# @markdown <br>

# @markdown ### **API**
SECRET_BOT_API = "BOT_API" # @param {type:"string"}
SECRET_GROQ_API = "GROQ_API"  # @param {type:"string"}

# @markdown ---

# @markdown <br>

# @markdown ### **Model Path**
LLM_MODEL_PATH = "/content/drive/My Drive/LLM_Model" # @param {type:"string"}
TTS_MODEL_PATH = "/content/drive/My Drive/TTS_Model" # @param {type:"string"}

# @markdown ---

# @markdown <br>

# @markdown ### **LLM Loading (Unsloth)**
LLM_MAX_SEQ_LENGTH = 2048 # @param {type:"slider", min:1024, max:16384, step:1024}
LLM_LOAD_IN_4BIT = True # @param {type:"boolean"}
LLM_DTYPE = "None" # @param ["None", "float16", "bfloat16"]

print("Settings confirmed!")


Settings confirmed!


In [2]:
# @title ### **Advanced**

# @markdown ### **LLM**
LLM_TEMPERATURE = 0.6 # @param {type:"slider", min:0.1, max:1.0, step:0.1}
MAX_TOKENS = 128 # @param {type:"slider", min:64, max:1024, step:64}
LLM_REPETITION_PENALTY = 1.1 # @param {type:"slider", min:1.0, max:2.0, step:0.05}
LLM_TOP_K = 50 # @param {type:"slider", min:10, max:100, step:5}
LLM_TOP_P = 0.95 # @param {type:"slider", min:0.5, max:1.0, step:0.05}

# @markdown ---

# @markdown <br>

# @markdown ### **TTS**
TTS_TEMPERATURE = 0.65 # @param {type:"slider", min:0.1, max:1.0, step:0.05}
TTS_REPETITION_PENALTY = 2 # @param {type:"slider", min:1.0, max:2.0, step:0.05}
TTS_TOP_K = 40 # @param {type:"slider", min:10, max:100, step:5}
TTS_TOP_P = 0.8 # @param {type:"slider", min:0.5, max:1.0, step:0.05}

print("Advanced Settings confirmed!")

Advanced Settings confirmed!


# **3. 🏗️ Load models**

#### **!! If you have changed the settings, re-run this cell !!**

Run the cell and wait. Do not proceed until you see the `Complete!` message at the bottom; it usually takes a few minutes.
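The load cell writes `tts_runner.py` and starts it with the TTS virtual environment's Python as a small HTTP server on `127.0.0.1:5050`, presumably so that XTTS and its pinned dependencies (such as `transformers==4.45.2`) never share a process with Unsloth. The bot sends it a JSON POST with `text` and `output_file` and gets back `{"status": "success", "file": ...}`, or HTTP 500 on failure. The sketch reproduces that request/response shape with the standard library only: synthesis is replaced by a hypothetical stub that writes the text bytes to the file, the OS picks the port instead of 5050, and `urllib` stands in for the `requests` call used by `VoiceEngine`.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class StubTTSHandler(BaseHTTPRequestHandler):
    """Same request/response shape as tts_runner.py, with synthesis stubbed out."""

    def do_POST(self):
        try:
            length = int(self.headers["Content-Length"])
            data = json.loads(self.rfile.read(length))
            text = data.get("text", "").replace("\n", " ").replace("\r", " ").strip() or "Empty text"
            output_file = data.get("output_file", "response.ogg")
            # Hypothetical stand-in for XTTS synthesis + ffmpeg Opus encoding.
            with open(output_file, "wb") as f:
                f.write(text.encode("utf-8"))
            body = json.dumps({"status": "success", "file": output_file}).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except Exception:
            self.send_response(500)
            self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the notebook output quiet


def request_tts(url: str, text: str, output_file: str) -> dict:
    payload = json.dumps({"text": text, "output_file": output_file}).encode("utf-8")
    req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req, timeout=30) as resp:  # data= makes this a POST
        return json.loads(resp.read())


# Port 0 lets the OS choose a free port, so the sketch never clashes with the real server.
server = HTTPServer(("127.0.0.1", 0), StubTTSHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_address[1]}"
tts_result = request_tts(url, "Hello\nworld", "stub_response.ogg")
print(tts_result)
server.shutdown()
```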

In [3]:
# @title ### **Load**

import time
import subprocess
import sys
import os

from unsloth import FastLanguageModel
import torch

start_time = time.time()

from google.colab import drive
drive.mount('/content/drive')

print("Cleaning up old background processes...")
os.system("pkill -f tts_runner.py")
os.system("fuser -k 5050/tcp")
time.sleep(2)


parsed_dtype = None
if LLM_DTYPE == "float16":
    parsed_dtype = torch.float16
elif LLM_DTYPE == "bfloat16":
    parsed_dtype = torch.bfloat16

model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = LLM_MODEL_PATH,
    max_seq_length = LLM_MAX_SEQ_LENGTH,
    dtype = parsed_dtype,
    load_in_4bit = LLM_LOAD_IN_4BIT,
)

FastLanguageModel.for_inference(model)

alpaca_prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request.

### Instruction:
{}

### Input:
{}

### Response:
{}"""

runner_code = """
import sys
import os

import matplotlib
matplotlib.use('Agg')
os.environ["MPLBACKEND"] = "Agg"

import traceback
import json
import subprocess
import torch
import torchaudio
from http.server import BaseHTTPRequestHandler, HTTPServer
from TTS.tts.configs.xtts_config import XttsConfig
from TTS.tts.models.xtts import Xtts

torch.set_num_threads(4)

MODEL_PATH = "[[TTS_MODEL_PATH]]"
REF_AUDIO = f"{MODEL_PATH}/reference.wav"

config = XttsConfig()
config.load_json(f"{MODEL_PATH}/config.json")
tts_model = Xtts.init_from_config(config)
tts_model.load_checkpoint(config, checkpoint_dir=MODEL_PATH, use_deepspeed=False)

if torch.cuda.is_available():
    tts_model.cuda()

class TTSHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        try:
            content_length = int(self.headers['Content-Length'])
            data = json.loads(self.rfile.read(content_length))

            text = data.get('text', '').replace('\\n', ' ').replace('\\r', ' ').strip()
            if not text:
                text = "Empty text"

            output_file = data.get('output_file', 'response.ogg')

            out = tts_model.synthesize(
                text, config, speaker_wav=REF_AUDIO, gpt_cond_len=3,
                language="[[LANGUAGE]]",
                temperature=float([[TTS_TEMPERATURE]]),
                repetition_penalty=float([[TTS_REPETITION_PENALTY]]),
                top_k=int([[TTS_TOP_K]]),
                top_p=float([[TTS_TOP_P]])
            )

            temp_wav = "temp_raw.wav"
            torchaudio.save(temp_wav, torch.tensor(out["wav"]).unsqueeze(0), 24000)

            subprocess.run([
                "ffmpeg", "-y", "-i", temp_wav,
                "-c:a", "libopus", "-b:a", "32k", "-vbr", "on", output_file
            ], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

            if os.path.exists(temp_wav):
                os.remove(temp_wav)

            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({"status": "success", "file": output_file}).encode('utf-8'))

        except Exception as e:
            print(f"\\n[x] Error generation:")
            traceback.print_exc()
            self.send_response(500)
            self.end_headers()

    def log_message(self, format, *args):
        pass

if __name__ == "__main__":
    server = HTTPServer(('127.0.0.1', 5050), TTSHandler)
    server.serve_forever()
"""

runner_code = runner_code.replace("[[TTS_MODEL_PATH]]", TTS_MODEL_PATH)
runner_code = runner_code.replace("[[LANGUAGE]]", LANGUAGE)
runner_code = runner_code.replace("[[TTS_TEMPERATURE]]", str(TTS_TEMPERATURE))
runner_code = runner_code.replace("[[TTS_REPETITION_PENALTY]]", str(TTS_REPETITION_PENALTY))
runner_code = runner_code.replace("[[TTS_TOP_K]]", str(TTS_TOP_K))
runner_code = runner_code.replace("[[TTS_TOP_P]]", str(TTS_TOP_P))

with open("tts_runner.py", "w") as f:
    f.write(runner_code)

my_env = os.environ.copy()
my_env["MPLBACKEND"] = "Agg"

subprocess.Popen(
    "nohup /content/tts_venv/bin/python -u tts_runner.py > tts_server.log 2>&1 &",
    shell=True,
    env=my_env,
    preexec_fn=os.setpgrp
)
time.sleep(12)

elapsed = int(time.time() - start_time)
print(f"\nComplete! {elapsed} sec.")

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.
🦥 Unsloth Zoo will now patch everything to make training faster!
Mounted at /content/drive
Cleaning up old background processes...
==((====))==  Unsloth 2026.2.1: Fast Llama patching. Transformers: 4.57.6.
   \\   /|    Tesla T4. Num GPUs = 1. Max memory: 14.563 GB. Platform: Linux.
O^O/ \_/ \    Torch: 2.10.0+cu128. CUDA: 7.5. CUDA Toolkit: 12.8. Triton: 3.6.0
\        /    Bfloat16 = FALSE. FA [Xformers = None. FA2 = False]
 "-____-"     Free license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


Unsloth 2026.2.1 patched 32 layers with 32 QKV layers, 32 O layers and 32 MLP layers.



Complete! 122 sec.
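The load cell starts the TTS server in the background and then simply sleeps for 12 seconds, but loading the XTTS checkpoint can take longer, and a crash only shows up in `tts_server.log`. One alternative is to poll the port until it accepts connections. `wait_for_port` is a hypothetical helper, not part of the original cell, and its 120-second default timeout is an arbitrary assumption.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 120.0, interval: float = 1.0) -> bool:
    """Poll until a TCP server accepts connections on host:port, or give up after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:  # connection refused or timed out: server not ready yet
            time.sleep(interval)
    return False


# Possible use in place of the fixed time.sleep(12) (hypothetical):
# if not wait_for_port("127.0.0.1", 5050):
#     print(open("tts_server.log").read()[-2000:])
```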


# **4. 🚀 Start Telegram bot**
**The final step!** This block contains the main logic and keeps your bot online.

**Action:** Run this cell. Once you see the `Bot is running!` message in the console, open Telegram, find your bot, and send the `/start` command.

**Important:** Keep this cell running and the browser tab open while you are chatting with your AI. If the execution stops, the bot will go offline.
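The `MainMiddleware` in the bot cell debounces incoming messages: every new message restarts a 5-second timer, and only when the user pauses are the buffered messages (already converted to text) joined and passed to the LLM as one request. A self-contained asyncio sketch of the same pattern, using a hypothetical `Debouncer` class and a shortened delay so it runs quickly:

```python
import asyncio


class Debouncer:
    """Collect messages per user and flush them together after a quiet period."""

    def __init__(self, delay: float, on_flush):
        self.delay = delay
        self.on_flush = on_flush  # async callback(user_id, combined_text)
        self.buffers: dict[int, list[str]] = {}
        self.timers: dict[int, asyncio.Task] = {}

    def push(self, user_id: int, text: str) -> None:
        self.buffers.setdefault(user_id, []).append(text)
        # Restart the countdown so only the last message of a burst schedules the flush.
        timer = self.timers.get(user_id)
        if timer is not None and not timer.done():
            timer.cancel()
        self.timers[user_id] = asyncio.create_task(self._flush_later(user_id))

    async def _flush_later(self, user_id: int) -> None:
        await asyncio.sleep(self.delay)
        texts = self.buffers.pop(user_id, [])
        self.timers.pop(user_id, None)
        await self.on_flush(user_id, " ".join(texts))


async def demo():
    flushed = []

    async def on_flush(user_id, text):
        flushed.append((user_id, text))

    debouncer = Debouncer(delay=0.2, on_flush=on_flush)
    for part in ("Hi!", "Look at this:", "(Photo description: a cat)"):
        debouncer.push(42, part)
        await asyncio.sleep(0.05)  # messages arrive faster than the delay
    await asyncio.sleep(0.4)       # wait out the quiet period
    return flushed


flushed_batches = asyncio.run(demo())
print(flushed_batches)
```

`asyncio.run()` works in a plain script; inside Colab, where an event loop is already running, use `await demo()` instead.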

In [17]:

# @title ### **Start**

from google.colab import userdata

import asyncio, os, re, subprocess, base64, requests, gc
from typing import Callable, Dict, Any, Awaitable

from aiogram import Bot, Dispatcher, types, F, BaseMiddleware
from aiogram.types import BotCommand, BotCommandScopeDefault, FSInputFile
from aiogram.utils.chat_action import ChatActionSender
from aiogram.filters import Command

from groq import Groq
from PIL import Image
from collections import deque
from moviepy.editor import VideoFileClip
from lottie.parsers.tgs import parse_tgs
from lottie.exporters.gif import export_gif

# Initialize clients (Make sure keys are stored in Colab Secrets)
groq_client = Groq(api_key=userdata.get(SECRET_GROQ_API))
bot = Bot(token=userdata.get(SECRET_BOT_API))
dp = Dispatcher()

MAX_HISTORY_TOKENS = LLM_MAX_SEQ_LENGTH - MAX_TOKENS - 300

if MAX_HISTORY_TOKENS < 500:
    MAX_HISTORY_TOKENS = 500

user_histories = {}
voice_mode = {}
user_messages = {}
user_timers = {}
last_message_obj = {}
last_user_text = {}

llm_generation_lock = asyncio.Lock()
tts_generation_lock = asyncio.Lock()

# Voice TTS Engine
class VoiceEngine:
    def __init__(self):
        self.api_url = "http://127.0.0.1:5050"

    def text_to_audio(self, text, output_filename="response.ogg"):
        try:
            payload = {"text": text, "output_file": output_filename}
            # Send text
            response = requests.post(self.api_url, json=payload)

            if response.status_code == 200:
                return output_filename
            else:
                print("Error: TTS Server returned a non-200 status.")
                return None

        except Exception as e:
            print(f"Error connecting to TTS Server: {e}")
            return None

voice_engine = VoiceEngine()

# Vision (Groq API)
def describe_image(image_path: str) -> str:
    try:
        with open(image_path, "rb") as image_file:
            encoded_string = base64.b64encode(image_file.read()).decode('utf-8')

        image_url = f"data:image/jpeg;base64,{encoded_string}"

        chat_completion = groq_client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": "Describe in detail what is in this picture."},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ],
            model="meta-llama/llama-4-scout-17b-16e-instruct",
        )
        return chat_completion.choices[0].message.content
    except Exception as e:
        return f"Vision Error: {e}"

# Transcribe (Whisper API)
def transcribe_audio(audio_path: str) -> str:
    try:
        with open(audio_path, "rb") as file:
            transcription = groq_client.audio.transcriptions.create(
                file=(audio_path, file.read()),
                model="whisper-large-v3-turbo",
                language=LANGUAGE,
                response_format="json",
                temperature=0.0
            )
        return transcription.text
    except Exception as e:
        return f"Audio Error: {e}"

# Generate text message
def generate_message(message: str, instruction: str) -> str:
    input_text = alpaca_prompt.format(
        instruction,
        message,
        "",
    )

    inputs = tokenizer([input_text], return_tensors = "pt").to("cuda")

    input_length = inputs.input_ids.shape[1]
    safe_length = LLM_MAX_SEQ_LENGTH - MAX_TOKENS - 50
    if input_length > safe_length:
        inputs = {k: v[:, -safe_length:] for k, v in inputs.items()}

    outputs = model.generate(
        **inputs,
        max_new_tokens = MAX_TOKENS,
        use_cache = True,
        do_sample = True,  # sampling must be on for temperature/top_k/top_p to take effect
        temperature = LLM_TEMPERATURE,
        repetition_penalty = LLM_REPETITION_PENALTY,
        top_k = LLM_TOP_K,
        top_p = LLM_TOP_P,
        tokenizer = tokenizer,
        stop_strings = ["\n###", "###", "[System", "User:"],
    )

    response = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]

    del inputs
    del outputs
    gc.collect()

    clean_text = response.split("### Response:\n")[-1].split("###")[0]

    clean_text = re.sub(r'<think>.*?</think>', '', clean_text, flags=re.DOTALL)

    artifacts = [
        "<|end_of_text|>", "<|eot_id|>", "<｜end▁of▁sentence｜>",
        "<|im_end|>", "</s>", "<eos>"
    ]
    if tokenizer.eos_token:
        artifacts.append(tokenizer.eos_token)

    for artifact in artifacts:
        clean_text = clean_text.replace(artifact, "")

    return clean_text.strip()

# Format converters for media
def convert_webp_to_jpg(input_path: str, output_path: str):
    with Image.open(input_path) as img:
        rgb_im = img.convert("RGB")
        rgb_im.save(output_path, "JPEG", quality=95)

def convert_webm_to_jpg(input_path: str, output_path: str):
    clip = VideoFileClip(input_path)
    clip.save_frame(output_path, t=0)
    clip.close()

    del clip
    gc.collect()

def convert_tgs_to_jpg(input_path: str, output_path: str):
    temp_gif = "temp_sticker.gif"
    animation = parse_tgs(input_path)
    export_gif(animation, temp_gif)

    try:
        with Image.open(temp_gif) as img:
            img.seek(0)
            background = Image.new("RGB", img.size, (255, 255, 255))
            img_rgba = img.convert("RGBA")
            background.paste(img_rgba, mask=img_rgba.split()[3])
            background.save(output_path, "JPEG", quality=95)
    finally:
        if os.path.exists(temp_gif):
            os.remove(temp_gif)


# Safe History
def get_safe_history(user_id, new_text="") -> str:
    if user_id not in user_histories:
        user_histories[user_id] = deque(maxlen=50)
        return ""

    while True:
        history_text = "".join([f"{role}: {text}\n" for role, text in user_histories[user_id]])
        test_prompt = f"{history_text}\nUser: {new_text}"

        try:
            tokens = tokenizer.encode(test_prompt)
            token_count = len(tokens)
        except Exception:
            token_count = 0

        if token_count <= MAX_HISTORY_TOKENS or len(user_histories[user_id]) == 0:
            break

        user_histories[user_id].popleft()

    return history_text

# Main stream handler: converts any supported message type to text for the LLM
async def convertor(event: types.Message, user_id: int) -> str:
    file_path = ""
    output_filename = ""
    response = ""  # fallback for unsupported message types (documents, videos, etc.)

    # Text
    if event.text:
        response = event.text

    # Voice: download the .ogg file and transcribe it with Groq Whisper
    elif event.voice:
        file = await bot.get_file(event.voice.file_id)
        file_path = f"{user_id}_{file.file_id}.ogg"
        await bot.download(file=file, destination=file_path)

        try:
            # Run the blocking Groq call in a worker thread so the event loop stays responsive
            response = await asyncio.to_thread(transcribe_audio, file_path)
        except Exception as e:
            response = f"Error: {e}"

    # Photo: download the largest available size and describe it
    elif event.photo:
        file_path = f"{user_id}.jpg"
        await bot.download(event.photo[-1], destination=file_path)

        try:
            description = await asyncio.to_thread(describe_image, file_path)
            response = f"(Photo description: {description})"
        except Exception as e:
            response = f"Error: {e}"

    # Sticker: convert video (.webm), animated (.tgs) or static (.webp) stickers to JPEG first
    elif event.sticker:
        file = await bot.get_file(event.sticker.file_id)
        file_path = file.file_path.split("/")[-1]

        await bot.download(file, destination=file_path)
        output_filename = f"{file_path}.jpg"

        if event.sticker.is_video:
            await asyncio.to_thread(convert_webm_to_jpg, file_path, output_filename)
        elif event.sticker.is_animated:
            await asyncio.to_thread(convert_tgs_to_jpg, file_path, output_filename)
        else:
            await asyncio.to_thread(convert_webp_to_jpg, file_path, output_filename)

        try:
            description = await asyncio.to_thread(describe_image, output_filename)
            response = f"(Sticker description: {description})"
        except Exception as e:
            response = f"Error: {e}"

    # Animation (GIF): Telegram sends it as MP4, so describe its first frame
    elif event.animation:
        file = await bot.get_file(event.animation.file_id)
        file_path = f"{user_id}_{file.file_id}.mp4"

        await bot.download(file, destination=file_path)
        output_filename = f"{file_path}.jpg"

        await asyncio.to_thread(convert_webm_to_jpg, file_path, output_filename)

        try:
            description = await asyncio.to_thread(describe_image, output_filename)
            response = f"(Animation description: {description})"
        except Exception as e:
            response = f"Error: {e}"

    # Remove downloaded and converted files
    if os.path.exists(file_path): os.remove(file_path)
    if os.path.exists(output_filename): os.remove(output_filename)

    return response

# Main Middleware for Debounce: buffers messages and answers them together after a 5-second pause
class MainMiddleware(BaseMiddleware):
    async def __call__(
        self,
        handler: Callable[[types.Message, Dict[str, Any]], Awaitable[Any]],
        event: types.Message,
        data: Dict[str, Any],
    ) -> Any:

        # Ignore commands: pass them straight to their handlers
        if event.text and event.text.startswith('/'):
            return await handler(event, data)

        user_id = event.from_user.id

        # Convert every message type to text. The previous timer may flush the buffer
        # while we await here, so create the buffer only afterwards.
        new_text = await convertor(event, user_id)
        user_messages.setdefault(user_id, []).append(new_text)

        # Cancel the previous timer so a burst of messages is answered once
        if user_id in user_timers and not user_timers[user_id].done():
            user_timers[user_id].cancel()

        # Debounce timer: after 5 seconds without new messages, join the buffer into one prompt
        async def timer_task():
            await asyncio.sleep(5)
            full_text = " ".join(user_messages.pop(user_id, []))
            user_timers.pop(user_id, None)

            data["full_text"] = full_text

            await handler(event, data)

        user_timers[user_id] = asyncio.create_task(timer_task())


dp.message.middleware(MainMiddleware())

# Template for voice or text message
async def new_message_text_or_voice(message: types.Message, user_id: int, temp_message: types.Message, response_text: str, last_user_text_: str):
    if voice_mode[user_id]:
      async with tts_generation_lock:
          audio_path = await asyncio.to_thread(voice_engine.text_to_audio, response_text)
          if audio_path and os.path.exists(audio_path):
              try:
                  voice_file = FSInputFile(audio_path)
                  await temp_message.delete()
                  last_message_obj[user_id] = await message.answer_voice(voice_file)
                  os.remove(audio_path)
              except Exception as e:
                  await temp_message.delete()
                  last_message_obj[user_id] = await message.answer(f"{response_text}\n(Voice send error: {e})")
          else:
              await temp_message.delete()
              last_message_obj[user_id] = await message.answer(f"{response_text}\n(Voice generation failed)")
    else:
        await temp_message.delete()
        last_message_obj[user_id] = await message.answer(text=response_text)

    last_user_text[user_id] = last_user_text_

# Command: /start
@dp.message(Command("start"))
async def start_command(message: types.Message):
    await message.answer(text=f"Hello! I am {BOT_NAME}. How can I help you today?")
    try: await message.delete()
    except: pass

# Command: /reset_memory
@dp.message(Command("reset_memory"))
async def memory_reset(message: types.Message):
    user_id = message.from_user.id
    if user_id in user_histories:
        user_histories[user_id].clear()
    await message.answer("Memory cleared! Let's start a new conversation.")

# Command: /reset (Regenerates last message)
@dp.message(Command("reset"))
async def reset_last_message(message: types.Message):
    user_id = message.from_user.id

    # Nothing to regenerate yet (.get() avoids a KeyError for users who have not chatted)
    if last_message_obj.get(user_id) is None or user_id not in last_user_text:
        await message.answer("There is no previous message to reset.")
        return

    try:
        await last_message_obj[user_id].delete()
    except Exception:
        pass  # the old reply may already be gone or be too old to delete

    if user_id not in voice_mode:
        voice_mode[user_id] = False

    status_text = "Generating voice..." if voice_mode[user_id] else "Typing..."
    temp_message = await message.answer(text=status_text)

    try: await message.delete()
    except Exception: pass

    # Drop the last exchange so the answer being replaced is not fed back into the prompt
    history = user_histories.get(user_id)
    if history and history[-1][0] == "AI":
        history.pop()
    if history and history[-1][0] == "User":
        history.pop()

    history_text = get_safe_history(user_id, last_user_text[user_id])

    system_instruction = (
        f"You are {BOT_NAME}. Your task is to answer the questions in your own style. If you see a description of an image, sticker, or animation, answer it in your own style. "
        f"Here is the chat history:\n{history_text}\n"
    )

    try:
        async with llm_generation_lock:
            new_response_text = await asyncio.to_thread(generate_message, last_user_text[user_id], system_instruction)
    except Exception as e:
        new_response_text = f"Error: {e}"

    # Store the regenerated exchange in place of the old one
    user_histories[user_id].append(("User", last_user_text[user_id]))
    user_histories[user_id].append(("AI", new_response_text))

    await new_message_text_or_voice(message, user_id, temp_message, new_response_text, last_user_text[user_id])

# Command: /voice_mode
@dp.message(Command("voice_mode"))
async def turn_voice_mode(message: types.Message):

    user_id = message.from_user.id

    if user_id not in voice_mode:
      voice_mode[user_id] = False

    voice_mode[user_id] = not voice_mode[user_id]

    if voice_mode[user_id]:
        await message.answer(text="Voice mode enabled. I will now reply with audio messages.")
    else:
        await message.answer(text="Voice mode disabled. I will reply with text.")

# Main Handler
@dp.message()
async def base_handler(message: types.Message, full_text: str):
    user_id = message.from_user.id

    if user_id not in voice_mode:
      voice_mode[user_id] = False

    status_text = "Generating voice..." if voice_mode[user_id] else "Typing..."
    temp_message = await message.answer(text=status_text)

    history_text = get_safe_history(user_id, full_text)

    system_instruction = (
        f"You are {BOT_NAME}. Your task is to answer the questions in your own style. If you see a description of an image, sticker, or animation, answer it in your own style. "
        f"Here is the chat history:\n{history_text}\n"
    )

    try:
        async with llm_generation_lock:
          response_text = await asyncio.to_thread(generate_message, full_text, system_instruction)
    except Exception as e:
        response_text = f"Error: {e}"

    user_histories[user_id].append(("User", full_text))
    user_histories[user_id].append(("AI", response_text))

    await new_message_text_or_voice(message, user_id, temp_message, response_text, full_text)


async def main():
    print("Bot is running!")

    commands = [
        BotCommand(command="start", description="Start chatting with the bot"),
        BotCommand(command="reset", description="Regenerate the last message"),
        BotCommand(command="reset_memory", description="Clear chat history and context"),
        BotCommand(command="voice_mode", description="Toggle text-to-speech voice replies")
    ]

    await bot.set_my_commands(commands, scope=BotCommandScopeDefault())
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)

try:
    await main()
except KeyboardInterrupt:
    print("Bot stopped.")

Bot is running!
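`get_safe_history` keeps the prompt inside the context window: it rebuilds the transcript, counts tokens for the transcript plus the new message, and drops the oldest turn until the count fits `MAX_HISTORY_TOKENS`. With the default settings that budget is 2048 - 128 - 300 = 1620 tokens; the 300-token margin is presumably headroom for the system instruction and the Alpaca template. The sketch reproduces the loop with a hypothetical whitespace token counter instead of the real tokenizer, so its numbers are illustrative only.

```python
from collections import deque


def trim_history(history: deque, new_text: str, max_tokens: int, count_tokens) -> str:
    """Drop the oldest turns until history + new message fits the token budget."""
    while True:
        history_text = "".join(f"{role}: {text}\n" for role, text in history)
        if count_tokens(f"{history_text}\nUser: {new_text}") <= max_tokens or not history:
            return history_text
        history.popleft()


# Budget formula from the bot cell, with the default settings.
LLM_MAX_SEQ_LENGTH, MAX_TOKENS = 2048, 128
MAX_HISTORY_TOKENS = max(LLM_MAX_SEQ_LENGTH - MAX_TOKENS - 300, 500)


def whitespace_tokens(text: str) -> int:
    # Hypothetical stand-in for len(tokenizer.encode(text)).
    return len(text.split())


history = deque([("User", "one two three"), ("AI", "four five"), ("User", "six")], maxlen=50)
trimmed = trim_history(history, "seven", max_tokens=6, count_tokens=whitespace_tokens)
print(MAX_HISTORY_TOKENS, repr(trimmed))
```

Because the oldest turns are dropped first, the bot gradually "forgets" the start of a long conversation rather than failing once the context window is full.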


