<a href="https://colab.research.google.com/github/Nehal12-hammam/Fruits-Detection-using-YOLO/blob/main/Telegram_Chatbot_for_Promotional_Video_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#  Install required packages (run this in your environment)
!pip install gTTS
!pip install diffusers transformers accelerate
!pip install gradio
!pip install openai-whisper
!pip install clip
!pip install nest_asyncio
!pip install python-telegram-bot --upgrade
!pip install git+https://github.com/openai/CLIP.git
!pip install moviepy
!pip install gdown
!pip install ffmpeg-python
!pip install langchain
!pip install python-telegram-bot gtts moviepy diffusers torch clip whisper scikit-learn Pillow

import nest_asyncio
nest_asyncio.apply()

import asyncio
import os
import logging
import requests  # To make API calls to Ollama
from telegram import Update
from gtts import gTTS
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips
from PIL import Image, ImageEnhance
from diffusers import StableDiffusionPipeline
import clip
import torch
import whisper
from telegram.ext import Application, CommandHandler, MessageHandler, filters, CallbackContext
from sklearn.metrics.pairwise import cosine_similarity

# Enable logging
# Enable logging
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__) #Corrected function name)

# Load the Stable Diffusion model
pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1")
pipe.to("cuda" if torch.cuda.is_available() else "cpu")

device = "cuda" if torch.cuda.is_available() else "cpu"
model, preprocess = clip.load("ViT-B/32", device=device)

# Create directories to save images
output_image_dir = "generated_images"
if not os.path.exists(output_image_dir):
    os.makedirs(output_image_dir)

# Global variables to store user inputs
user_data = {}

# Helper functions (unchanged)
def resize_image(image, target_size=(512, 512)):
    image.thumbnail(target_size, Image.LANCZOS)
    return image

def enhance_image(image):
    enhancer = ImageEnhance.Sharpness(image)
    return enhancer.enhance(1.5)

def generate_images(description, num_images=5):
    images = []
    logger.info(f"Generating {num_images} images for description: {description}")
    for i in range(num_images):
        try:
            output = pipe(description, num_inference_steps=100, guidance_scale=7.5, height=512, width=512)
            if len(output.images) == 0:
                logger.warning(f"No images generated for iteration {i + 1}.")
                continue
            image = enhance_image(resize_image(output.images[0].copy()))
            image_path = os.path.join(output_image_dir, f"generated_image_{i + 1}.png")
            image.save(image_path)
            images.append(image_path)
            logger.info(f"Saved image: {image_path}")
        except Exception as e:
            logger.error(f"Error generating image {i + 1}: {e}")
    return images

def create_video_from_images(images, audio_path, audio_duration, fps=2):
    num_frames = int(audio_duration / 3)
    image_duration = 3.0
    clips = [ImageClip(image).set_duration(image_duration) for image in images]
    video_clip = concatenate_videoclips(clips, method="compose")
    video_clip.fps = fps  # Set FPS here

    audio = AudioFileClip(audio_path)
    video_clip = video_clip.set_audio(audio)
    video_path = "generated_promo_video.mp4"
    video_clip.write_videofile(video_path, codec='libx264', audio=True, fps=fps)  # Add fps parameter here
    return video_path

def generate_audio(description):
    tts = gTTS(description, lang='en')
    audio_path = "description_audio.mp3"
    tts.save(audio_path)
    return audio_path

def get_audio_duration(audio_path):
    audio = AudioFileClip(audio_path)
    return audio.duration

def generate_promotional_video(product_info):
    description = (
        f"Introducing {product_info['name']}: {product_info['desc']}. "
        f"Target audience: {product_info['audience']}. "
        f"Key features include: {product_info['features']}. "
        f"Visual style: {product_info['style']}."
    )

    logger.info("Generating audio for the description...")
    audio_path = generate_audio(description)
    images = generate_images(description)

    audio_duration = get_audio_duration(audio_path)
    logger.info(f"Audio duration: {audio_duration:.2f} seconds.")

    video_path = create_video_from_images(images, audio_path, audio_duration)
    return video_path

def transcribe_video(video_path):
    model = whisper.load_model("base")
    result = model.transcribe(video_path)

    transcripts = []
    for segment in result['segments']:
        start = segment['start']
        end = segment['end']
        text = segment['text']
        transcripts.append({'start': start, 'end': end, 'text': text})

    return transcripts

def search_transcription(transcripts, query):
    matches = [segment for segment in transcripts if query.lower() in segment['text'].lower()]
    return matches if matches else None

def encode_text(description):
    text = clip.tokenize([description]).to(device)
    with torch.no_grad():
        text_features = model.encode_text(text)
    return text_features.cpu().numpy()

def encode_image(image_path):
    image = preprocess(Image.open(image_path)).unsqueeze(0).to(device)
    with torch.no_grad():
        image_features = model.encode_image(image)
    return image_features.cpu().numpy()

def calculate_cosine_similarity(text_features, image_features):
    return cosine_similarity(text_features, image_features)

def find_unrelated_images(description, images, threshold=0.3):
    text_features = encode_text(description)
    unrelated_images = []
    for img_file in images:
        image_features = encode_image(img_file)
        similarity = cosine_similarity(text_features, image_features)[0][0]
        if similarity < threshold:
            unrelated_images.append(img_file)
    return unrelated_images

# Function to interact with the Ollama tool
def generate_response_with_ollama(input_text):
    response = requests.post("http://localhost:11434/v1/ollama/t5", json={"input": input_text})
    if response.status_code == 200:
        return response.json()['output']
    else:
        logger.error("Error communicating with Ollama API")
        return "I encountered an error while trying to get a response."

# Start command handler
async def start(update: Update, context: CallbackContext):
    user_id = update.message.chat_id
    user_data[user_id] = {}
    await update.message.reply_text("Welcome! Let's start by telling me the Product Name.")

# Handle message flow to collect product details step by step
async def handle_message(update: Update, context: CallbackContext):
    user_id = update.message.chat_id
    text = update.message.text.strip().lower()  # Normalize the text to lowercase

    if user_id not in user_data:
        user_data[user_id] = {}

    if text == "cancel":
        await update.message.reply_text("Goodbye!")
        user_data.pop(user_id, None)  # Remove the user data
        return

    # Ask for product details step by step
    if 'product_name' not in user_data[user_id]:
        user_data[user_id]['product_name'] = text
        await update.message.reply_text("Got it! Now tell me about the Product Description.")
    elif 'product_desc' not in user_data[user_id]:
        user_data[user_id]['product_desc'] = text
        await update.message.reply_text("Who is the Target Audience?")
    elif 'target_audience' not in user_data[user_id]:
        user_data[user_id]['target_audience'] = text
        await update.message.reply_text("Great! What are the Key Features of your product?")
    elif 'key_features' not in user_data[user_id]:
        user_data[user_id]['key_features'] = text
        await update.message.reply_text("What Visual Style do you prefer?")
    elif 'visual_style' not in user_data[user_id]:
        user_data[user_id]['visual_style'] = text

        # Summary and generating video
        summary = (
            f"*Product Name:* {user_data[user_id]['product_name']}\n"
            f"*Description:* {user_data[user_id]['product_desc']}\n"
            f"*Target Audience:* {user_data[user_id]['target_audience']}\n"
            f"*Key Features:* {user_data[user_id]['key_features']}\n"
            f"*Visual Style:* {user_data[user_id]['visual_style']}\n"
        )
        await update.message.reply_text(f"Here is a summary of your inputs:\n{summary}")
        await update.message.reply_text("Generating promotional video...")

        # Generate promotional video
        video_path = generate_promotional_video({
            'name': user_data[user_id]['product_name'],
            'desc': user_data[user_id]['product_desc'],
            'audience': user_data[user_id]['target_audience'],
            'features': user_data[user_id]['key_features'],
            'style': user_data[user_id]['visual_style']
        })

        # Send the generated video
        with open(video_path, 'rb') as video_file:
            await update.message.reply_video(video=video_file, caption="Here is your generated promotional video!")

        # Transcribe the generated video for searching later
        user_data[user_id]['transcript'] = transcribe_video(video_path)

        # Offer the user options after video generation
        await update.message.reply_text(
            "Would you like to search in the video transcript?\n"
            "Use /search <keyword> to search.\n"
            "Or you can upload another video to search using /search_video.\n"

        )

    else:
        await update.message.reply_text("It seems you have already provided all necessary information. If you'd like to start over, just type 'cancel'.")

# Command to handle search in the video transcript
async def handle_search(update: Update, context: CallbackContext):
    if len(update.message.text.split("/search ", 1)) < 2:
        await update.message.reply_text("Please provide a search query after the command.")
        return

    query = update.message.text.split("/search ", 1)[1]
    user_id = update.message.chat_id
    transcript_segments = user_data[user_id].get('transcript', None)

    if not transcript_segments:
        await update.message.reply_text("There is no transcribed video to search. Please provide a video first.")
        return

    matches = search_transcription(transcript_segments, query)
    if matches:
        result_text = "\n".join([f"Found at {segment['start']}s: {segment['text']}" for segment in matches])
        await update.message.reply_text(f"Results for '{query}':\n{result_text}")
    else:
        await update.message.reply_text(f"No matches found for '{query}'.")

# Command to handle video uploading and transcription
async def handle_video(update: Update, context: CallbackContext):
    video = update.message.video  # Get the video sent by the user
    if not video:
        await update.message.reply_text("Please send a valid video for transcription.")
        return

    await update.message.reply_text("Transcribing the video, please wait...")

    # Download the video
    video_file = await context.bot.get_file(video.file_id)
    video_path = "user_video.mp4"
    await video_file.download_to_drive(video_path)

    # Transcribe the video
    transcript_segments = transcribe_video(video_path)
    user_id = update.message.chat_id
    user_data[user_id]['transcript'] = transcript_segments

    await update.message.reply_text("Video transcription completed. You can now search by sending /search <keyword>.")

# Command to find unrelated images
# async def find_unrelated(update: Update, context: CallbackContext):
#     user_id = update.message.chat_id

#     # Initialize user data if not present
#     if user_id not in user_data:
#         user_data[user_id] = {'product_desc': None, 'generated_images': []}

#     await update.message.reply_text("Please provide the product description.")

# # Handle finding unrelated images
# async def handle_find_unrelated_message(update: Update, context: CallbackContext):
#     user_id = update.message.chat_id
#     text = update.message.text
#     message = update.message

#     if user_id not in user_data:
#         user_data[user_id] = {'product_desc': None, 'generated_images': []}

#     if user_data[user_id]['product_desc'] is None:
#         user_data[user_id]['product_desc'] = text
#         await update.message.reply_text(
#             "Thank you! You provided the following description:\n"
#             f"{text}\n\n"
#             "Now, please upload the images to check against the description. You can send multiple images."
#         )
#     else:
#         # Check if the message contains images
#         if message.photo:
#             image_file = await context.bot.get_file(message.photo[-1].file_id)
#             image_path = f"downloaded_image_{user_id}_{len(user_data[user_id]['generated_images'])}.jpg"
#             await image_file.download(image_path)
#             user_data[user_id]['generated_images'].append(image_path)
#             await update.message.reply_text("Image received. You can upload more images. Type 'done' when finished uploading images.")
#         elif text.lower() == 'done':
#             if not user_data[user_id]['generated_images']:
#                 await update.message.reply_text("No images uploaded. Please upload at least one image.")
#                 return

#             description = user_data[user_id]['product_desc']
#             images = user_data[user_id]['generated_images']

#             # Call the function to find unrelated images
#             unrelated_images = find_unrelated_images(description, images)

#             if unrelated_images:
#                 await update.message.reply_text("The following images are unrelated to the description:")
#                 for img_path in unrelated_images:
#                     with open(img_path, 'rb') as img_file:
#                         await update.message.reply_photo(photo=img_file)
#             else:
#                 await update.message.reply_text("All images are related to the description!")

#             # Reset user data after completion
#             user_data[user_id] = {'product_desc': None, 'generated_images': []}
#         else:
#             await update.message.reply_text("Please upload an image or type 'done' when finished uploading images.")

def main():
    nest_asyncio.apply()
    application = Application.builder().token("7589464561:AAG1tdLLQ1KduvEUoZcEzNgDQrpMBm9YSiE").build()

    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("search", handle_search))
    application.add_handler(CommandHandler("search_video", handle_video))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
    application.add_handler(MessageHandler(filters.VIDEO, handle_video))
    # application.add_handler(CommandHandler("find_unrelated", find_unrelated))
    # application.add_handler(MessageHandler(filters.TEXT & filters.PHOTO, handle_find_unrelated_message))

    application.run_polling()

if __name__ == "__main__":
    main()
