In [None]:
!jupyter nbconvert --ClearMetadataPreprocessor.enabled=True \
    --to notebook --output fixed_notebook.ipynb \
    !jupyter nbconvert --ClearMetadataPreprocessor.enabled=True \
    --to notebook --output fixed_notebook.ipynb \
    "EduAssist.ipynb"

usage: jupyter-nbconvert [-h] [--debug] [--show-config] [--show-config-json]
                         [--generate-config] [-y] [--execute] [--allow-errors]
                         [--stdin] [--stdout] [--inplace] [--clear-output]
                         [--coalesce-streams] [--no-prompt] [--no-input]
                         [--allow-chromium-download]
                         [--disable-chromium-sandbox] [--show-input]
                         [--embed-images] [--sanitize-html]
                         [--log-level NbConvertApp.log_level]
                         [--config NbConvertApp.config_file]
                         [--to NbConvertApp.export_format]
                         [--template TemplateExporter.template_name]
                         [--template-file TemplateExporter.template_file]
                         [--theme HTMLExporter.theme]
                         [--sanitize_html HTMLExporter.sanitize_html]
                         [--writer NbConvertApp.writer_class]
   

In [None]:
# !pip install --upgrade torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# !pip install --upgrade transformers
# !pip install emoji

In [None]:
# !pip install pdfplumber PyPDF2

In [None]:
# !pip install --quiet  nltk faiss-cpu

In [None]:
# !pip install transformers
# !pip install torch
# !pip install emoji
# !pip install nltk
# !pip install pdfplumber
# !pip install PyPDF2
# !pip install sentence-transformers
# !pip install faiss-cpu
# !pip install python-telegram-bot==20.0
# !pip install nest_asyncio

In [None]:
import os
import nest_asyncio
import nltk
from nltk.corpus import stopwords

# Read the Telegram bot token from the environment instead of committing it to the notebook.
# SECURITY: the token that used to be hardcoded here has been exposed and should be
# revoked and regenerated via @BotFather.
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
if not TOKEN:
    print("⚠️ TELEGRAM_BOT_TOKEN is not set; the bot cannot authenticate until it is.")

nest_asyncio.apply()

nltk.download('stopwords', quiet=True)
STOPWORDS = set(stopwords.words('english'))

from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline
import torch, re, emoji, pdfplumber, io
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
import asyncio

from telegram import ReplyKeyboardMarkup, Update
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, ContextTypes, filters
from typing import Tuple, List, Dict

# RAG store (retrieval-augmented generation)

In [None]:
class RAGStore:
    """In-memory retrieval store: text chunks, per-chunk metadata and a FAISS index.

    Embeddings are produced by the module-level ``embedder`` with
    ``normalize_embeddings=True`` and stored in an inner-product index
    (``faiss.IndexFlatIP``).
    """

    def __init__(self):
        self.chunks: List[str] = []   # chunk texts, in insertion order
        self.metas: List[Dict] = []   # {"source", "chunk_id"} for each chunk, same order
        self.index = None             # FAISS index, created lazily on the first add
        self.emb_dim = None           # embedding width, learned from the first batch

    def add_text(self, text: str, source: str = "user") -> None:
        """Add new text to the knowledge store.

        The text is split into word chunks, embedded and appended to the index.
        Empty or whitespace-only text is ignored.

        Args:
            text: Raw text to store.
            source: Label recorded in each chunk's metadata.
        """
        chunks = self._chunk_text(text)
        if not chunks:
            # Nothing to embed. Encoding an empty batch yields an array without a
            # second dimension, so reading ``shape[1]`` below would raise.
            return

        embs = embedder.encode(chunks, convert_to_numpy=True, normalize_embeddings=True)
        # FAISS expects a contiguous 2-D float32 matrix.
        embs = np.ascontiguousarray(embs, dtype=np.float32)

        if self.index is None:
            self.emb_dim = embs.shape[1]
            self.index = faiss.IndexFlatIP(self.emb_dim)
        self.index.add(embs)

        start_id = len(self.chunks)
        self.chunks.extend(chunks)
        self.metas.extend(
            {"source": source, "chunk_id": start_id + offset}
            for offset in range(len(chunks))
        )

    @staticmethod
    def _chunk_text(text: str, chunk_words: int = 400) -> List[str]:
        """Split text into consecutive chunks of at most ``chunk_words`` whitespace-separated words."""
        words = text.split()
        return [' '.join(words[i:i+chunk_words]) for i in range(0, len(words), chunk_words)]

    def search(self, query: str, top_k: int = 4) -> List[Tuple[str, Dict, float]]:
        """Search the store for the chunks most relevant to ``query``.

        Args:
            query: Text to look up.
            top_k: Maximum number of results to return.

        Returns:
            A list of ``(chunk_text, metadata, score)`` tuples in the order the
            index returns them; empty when the store is empty or ``top_k`` is
            not positive.
        """
        if not self.chunks or self.index is None or top_k <= 0:
            return []

        q_emb = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True)
        q_emb = np.ascontiguousarray(q_emb, dtype=np.float32)
        D, I = self.index.search(q_emb, min(top_k, len(self.chunks)))

        return [
            (self.chunks[idx], self.metas[idx], float(score))
            for score, idx in zip(D[0], I[0])
            if idx != -1  # -1 marks an unfilled result slot
        ]

# Model Loading

In [None]:
# Hugging Face model identifiers.
GEN_MODEL = "google/flan-t5-large"
EMBED_MODEL = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

# `device` is the index handed to the transformers pipeline: 0 when CUDA is available, -1 otherwise.
device = 0 if torch.cuda.is_available() else -1
# String form of the same choice, used where a device name is expected.
_torch_device = "cuda" if device == 0 else "cpu"

# Generation model and its tokenizer.
tokenizer = AutoTokenizer.from_pretrained(GEN_MODEL)
model = AutoModelForSeq2SeqLM.from_pretrained(GEN_MODEL)
model = model.to(_torch_device)

# Shared text-to-text pipeline used by the summary / explanation / answer generators.
text2text_pipe = pipeline(
    task="text2text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device,
    max_length=512,
)

# Sentence embedder used by RAGStore for indexing and querying.
embedder = SentenceTransformer(EMBED_MODEL, device=_torch_device)

print(f"Using device: {model.device}")

Device set to use cpu


Using device: cpu


# Text preprocessing

In [None]:
def clean_text(text: str, remove_stopwords: bool = True) -> Tuple[str, int]:
    """Normalize raw text and report how many emoji characters it contained.

    Steps, in order: count characters found in ``emoji.EMOJI_DATA``, replace
    emojis with spaces, replace every character other than ASCII letters,
    digits, whitespace and ``. , ! ?`` with a space, drop lines whose stripped
    length is 5 or less, optionally drop words found in ``STOPWORDS``
    (case-insensitive), and collapse runs of whitespace.

    Args:
        text: Raw input text.
        remove_stopwords: When True, words in ``STOPWORDS`` are removed.

    Returns:
        ``(cleaned_text, emoji_count)``.
    """
    # Count first: the emojis are gone after the replacement below.
    emoji_count = 0
    for ch in text:
        if ch in emoji.EMOJI_DATA:
            emoji_count += 1

    without_emoji = emoji.replace_emoji(text, replace=' ')
    ascii_only = re.sub(r'[^a-zA-Z0-9\s.,!?]', ' ', without_emoji)

    # Keep only lines that still carry more than 5 characters once stripped.
    kept_lines = []
    for raw_line in ascii_only.splitlines():
        stripped = raw_line.strip()
        if len(stripped) > 5:
            kept_lines.append(stripped)
    cleaned = ' '.join(kept_lines)

    if remove_stopwords:
        cleaned = ' '.join(word for word in cleaned.split() if word.lower() not in STOPWORDS)

    collapsed = re.sub(r'\s+', ' ', cleaned).strip()
    return collapsed, emoji_count

async def process_with_typing_indicator(update: Update, context: ContextTypes.DEFAULT_TYPE, task, *args):
    """Run ``task(*args)`` after sending a "typing" chat action to the user's chat.

    Args:
        update: Incoming update; its chat receives the typing action.
        context: Handler context providing the bot.
        task: Coroutine function to await.
        *args: Positional arguments forwarded to ``task``.

    Returns:
        Whatever ``task`` returns, or ``None`` if it raised. In that case the
        error is printed and an error reply is sent to the user.
    """
    chat_id = update.effective_chat.id
    await context.bot.send_chat_action(chat_id=chat_id, action="typing")
    try:
        result = await task(*args)
    except Exception as exc:
        print(f"Error in processing: {exc}")
        await update.message.reply_text("❌ An error occurred during processing. Please try again.")
        return None
    return result

# Summary Generation

In [None]:
async def generate_summary(text: str) -> str:
    """Summarize ``text`` chunk by chunk and join the partial summaries.

    The text is split into 800-word chunks; each chunk is sent to
    ``text2text_pipe`` in a worker thread. Non-empty outputs are joined with
    blank lines.

    Returns:
        The combined summary, or "No summary could be generated." when no
        chunk produced any text.
    """
    instruction = (
        "Summarize the following text in 3-4 short bullet points. "
        "Focus only on the key facts. "
        "Remove unnecessary words and keep sentences clear:\n\n"
    )
    generation_kwargs = dict(
        max_length=200,
        min_length=60,
        do_sample=True,
        temperature=0.7,
        num_beams=4,
        early_stopping=True,
    )

    partial_summaries = []
    for piece in RAGStore._chunk_text(text, 800):
        # The pipeline call is blocking, so run it off the event loop.
        result = await asyncio.to_thread(text2text_pipe, instruction + piece, **generation_kwargs)
        if not result:
            continue
        candidate = result[0]['generated_text'].strip()
        if candidate:
            partial_summaries.append(candidate)

    combined = "\n\n".join(partial_summaries)
    if not combined.strip():
        return "No summary could be generated."
    return combined

# Simplified Explanation Generation

In [None]:
async def generate_simplified_explanation(text: str) -> str:
    """Produce a plain-English explanation of ``text``, one 700-word chunk at a time.

    Each chunk is sent to ``text2text_pipe`` in a worker thread; non-empty
    outputs are joined with blank lines.

    Returns:
        The combined explanation, or "No explanation could be generated."
        when no chunk produced any text.
    """
    instruction = (
        "Explain this text in simple English for a high school student. "
        "Use short sentences and clear language:\n\n"
    )
    generation_kwargs = dict(
        max_length=250,
        min_length=80,
        do_sample=True,
        temperature=0.6,
        num_beams=4,
        early_stopping=True,
    )

    pieces_explained = []
    for piece in RAGStore._chunk_text(text, 700):
        # Blocking model call; keep it off the event loop.
        result = await asyncio.to_thread(text2text_pipe, instruction + piece, **generation_kwargs)
        if not result:
            continue
        candidate = result[0]['generated_text'].strip()
        if candidate:
            pieces_explained.append(candidate)

    combined = "\n\n".join(pieces_explained)
    if not combined.strip():
        return "No explanation could be generated."
    return combined

# Answer Generation

In [None]:
async def generate_answer(question: str, contexts: List[str]) -> str:
    """Answer ``question`` using at most the first three entries of ``contexts``.

    The contexts are joined with newlines into a single prompt that is sent to
    ``text2text_pipe`` in a worker thread.

    Returns:
        The stripped generated answer, or "I don't know." when the pipeline
        returns nothing or an empty string.
    """
    fallback = "I don't know."

    context_text = "\n".join(contexts[:3])
    prompt = (
        "Answer the question based on the context below. "
        "If the answer is not in the context, say 'I don't know'.\n\n"
        f"Context:\n{context_text}\n\n"
        f"Question: {question}\nAnswer:"
    )

    result = await asyncio.to_thread(
        text2text_pipe,
        prompt,
        max_length=300,
        do_sample=True,
        temperature=0.5,
        num_beams=3,
    )

    if not result:
        return fallback
    answer = result[0]['generated_text'].strip()
    return answer or fallback

# PDF Content Reader

In [None]:
def read_pdf_content(file_bytes: bytes) -> str:
    """Extract text from a PDF, trying pdfplumber first and PyPDF2 as a fallback.

    Each extractor writes into its own buffer, so text that pdfplumber managed
    to extract before failing is not prepended to (and duplicated by) the
    PyPDF2 result.

    Args:
        file_bytes: Raw PDF content.

    Returns:
        The pdfplumber text when it is non-blank and extraction did not fail;
        otherwise the PyPDF2 text when non-blank; otherwise whatever partial
        pdfplumber text exists (possibly an empty string). Extraction errors
        are printed, not raised.
    """
    primary_parts = []
    try:
        with pdfplumber.open(io.BytesIO(file_bytes)) as pdf:
            for page in pdf.pages:
                extracted = page.extract_text()
                if extracted:
                    primary_parts.append(extracted + "\n")
        primary_text = "".join(primary_parts)
        if primary_text.strip():
            return primary_text
    except Exception as e:
        print(f"pdfplumber error: {e}")
    # Keep whatever was extracted before a failure as a last resort.
    primary_text = "".join(primary_parts)

    fallback_parts = []
    try:
        reader = PdfReader(io.BytesIO(file_bytes))
        for page in reader.pages:
            extracted = page.extract_text()
            if extracted:
                fallback_parts.append(extracted + "\n")
    except Exception as e:
        print(f"PyPDF2 error: {e}")
    fallback_text = "".join(fallback_parts)

    if fallback_text.strip():
        return fallback_text
    return primary_text

# Bot

In [None]:
# Per-user session state, keyed by Telegram user id.
# Each value is {"rag": RAGStore, "mode": None or "qa"}. Held in memory only.
user_data = {}

# Persistent main-menu keyboard attached to most replies.
MENU_KEYBOARD = ReplyKeyboardMarkup(
    keyboard=[
        ["Summarize Text 📝", "Explain Text 📖"],
        ["Ask Questions ❓", "Clear Memory 🗑️"],
        ["Help ℹ️"],
    ],
    resize_keyboard=True,
    one_time_keyboard=False,
)

async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: give the sender a fresh session and send the welcome message with the menu."""
    sender_id = update.message.from_user.id
    # Starting always replaces any existing session with an empty store and no mode.
    user_data[sender_id] = {"rag": RAGStore(), "mode": None}

    welcome_text = (
        "👋 Welcome to the Study Assistant Bot!\n\n"
        "I can help you with:\n"
        "• 📝 Summarizing texts and PDFs\n"
        "• 📖 Explaining content in simple English\n"
        "• ❓ Answering your questions about the material\n\n"
        "Send me a text or PDF to get started!"
    )
    await update.message.reply_text(welcome_text, reply_markup=MENU_KEYBOARD)

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help (and the Help menu button): reply with usage hints and the main menu."""
    help_text = (
        "🆘 Help:\n\n"
        "• Send a PDF or text\n"
        "• Use the menu to Summarize, Explain, or Ask\n"
        "• Type 'exit' to leave question mode"
    )
    await update.message.reply_text(help_text, reply_markup=MENU_KEYBOARD)

# Telegram rejects text messages longer than 4096 characters; stay below that with headroom.
_MAX_REPLY_CHARS = 4000


async def _reply_in_parts(update: Update, text: str, reply_markup=None) -> None:
    """Reply with ``text``, split into several messages when it exceeds ``_MAX_REPLY_CHARS``.

    The keyboard (``reply_markup``) is attached to the last part only. Text that
    fits in one message is sent unchanged as a single reply.
    """
    parts = []
    remaining = text
    while len(remaining) > _MAX_REPLY_CHARS:
        # Prefer breaking at a newline or space so words are not cut in half.
        cut = max(
            remaining.rfind("\n", 0, _MAX_REPLY_CHARS),
            remaining.rfind(" ", 0, _MAX_REPLY_CHARS),
        )
        if cut <= 0:
            cut = _MAX_REPLY_CHARS
        parts.append(remaining[:cut])
        remaining = remaining[cut:].lstrip()
    parts.append(remaining)

    # Blank messages are not sendable, so drop them.
    parts = [part for part in parts if part.strip()]
    for position, part in enumerate(parts):
        is_last = position == len(parts) - 1
        await update.message.reply_text(part, reply_markup=reply_markup if is_last else None)


async def handle_menu_selection(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Dispatch a main-menu choice: Summarize, Explain, Ask, Clear or Help.

    Unknown users are sent through ``start_command`` first. While the user is
    in question mode the message is handed to ``handle_text_message`` instead,
    because the menu filter matches by prefix and would otherwise hijack
    questions that merely start with a menu word (for example a question
    beginning with "Clear" would wipe the stored content).
    """
    user_id = update.message.from_user.id
    if user_id not in user_data:
        await start_command(update, context)
        return

    if user_data[user_id].get("mode") == "qa":
        # In question mode this text is a question, not a menu button press.
        await handle_text_message(update, context)
        return

    choice = update.message.text
    rag_store = user_data[user_id]["rag"]

    if "Summarize" in choice:
        if not rag_store.chunks:
            await update.message.reply_text("⚠️ No content loaded yet. Please send a PDF or text first.")
            return
        await update.message.reply_text("⏳ Generating summary...")
        original_text = " ".join(rag_store.chunks)
        summary = await process_with_typing_indicator(
            update,
            context,
            generate_summary,
            original_text
        )
        # On failure the helper has already told the user; nothing more to send.
        if summary:
            await _reply_in_parts(
                update,
                f"📝 Summary:\n\n{summary}\n\nChoose another option 👇",
                reply_markup=MENU_KEYBOARD,
            )

    elif "Explain" in choice:
        if not rag_store.chunks:
            await update.message.reply_text("⚠️ No content loaded yet. Please send a PDF or text first.")
            return
        await update.message.reply_text("⏳ Generating explanation...")
        explanation = await process_with_typing_indicator(
            update,
            context,
            generate_simplified_explanation,
            " ".join(rag_store.chunks)
        )
        if explanation:
            await _reply_in_parts(
                update,
                f"📖 Explanation:\n\n{explanation}\n\nChoose another option 👇",
                reply_markup=MENU_KEYBOARD,
            )

    elif "Ask" in choice:
        user_data[user_id]["mode"] = "qa"
        await update.message.reply_text(
            "💬 Ask any question about the content.\nType 'exit' to return.",
            reply_markup=ReplyKeyboardMarkup([["exit"]], resize_keyboard=True)
        )

    elif "Clear" in choice:
        # Replace the whole session so both the stored content and the mode are reset.
        user_data[user_id] = {"rag": RAGStore(), "mode": None}
        await update.message.reply_text("🧹 Memory cleared. Send a new document.", reply_markup=MENU_KEYBOARD)

    elif "Help" in choice:
        await help_command(update, context)

async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle an uploaded PDF: download it, extract and clean its text, and store it.

    A session is created on the fly for users who have not sent /start. If no
    text can be extracted, or nothing is left after cleaning, the user is told
    so and nothing is stored. Any other failure is printed and reported with a
    generic error reply.
    """
    user_id = update.message.from_user.id
    if user_id not in user_data:
        user_data[user_id] = {"rag": RAGStore(), "mode": None}

    await update.message.reply_text("⏳ Processing your PDF...")
    try:
        file = await update.message.document.get_file()
        file_bytes = await file.download_as_bytearray()
        text = read_pdf_content(file_bytes)

        if not text.strip():
            await update.message.reply_text("⚠️ Could not read text from PDF.")
            return

        clean_text_content, _ = clean_text(text)
        if not clean_text_content:
            # Cleaning can remove everything (short lines, stripped characters, stopwords).
            # Storing an empty text would fail, so report it explicitly instead.
            await update.message.reply_text("⚠️ No usable text found in the PDF after cleaning.")
            return

        user_data[user_id]["rag"].add_text(clean_text_content, source="document")

        await update.message.reply_text(
            "✅ PDF processed!\nNow choose: Summarize, Explain, or Ask Questions",
            reply_markup=MENU_KEYBOARD
        )
    except Exception as e:
        print(f"PDF Error: {e}")
        await update.message.reply_text("❌ Failed to process PDF.")

async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a plain text message.

    - "exit" / "quit" (case-insensitive) leaves question mode and shows the menu.
    - In question mode the text is treated as a question answered from the
      user's stored content.
    - Otherwise the text is cleaned and stored as new content.

    Unknown users are sent through ``start_command`` first.
    """
    user_id = update.message.from_user.id
    if user_id not in user_data:
        await start_command(update, context)
        return

    session = user_data[user_id]
    text = update.message.text.strip()

    if text.lower() in ["exit", "quit"]:
        session["mode"] = None
        await update.message.reply_text("Back to main menu.", reply_markup=MENU_KEYBOARD)
        return

    if session.get("mode") == "qa":
        rag_store = session["rag"]
        if not rag_store.chunks:
            await update.message.reply_text("⚠️ No content to answer questions.")
            return
        await update.message.reply_text("🔍 Finding answer...")
        results = rag_store.search(text, top_k=3)
        contexts = [res[0] for res in results]
        answer = await process_with_typing_indicator(update, context, generate_answer, text, contexts)
        if answer is None:
            # Generation failed and the helper already sent an error reply;
            # do not follow it with a "💡 None" message.
            return
        await update.message.reply_text(f"❓ {text}\n\n💡 {answer}\n\nAsk more or type 'exit'.")
        return

    clean_text_content, _ = clean_text(text)
    if not clean_text_content:
        # Cleaning can remove everything (short lines, stripped characters, stopwords).
        # Storing an empty text would raise inside the store, so tell the user instead.
        await update.message.reply_text(
            "⚠️ No usable text found in your message. Please send a longer English text or a PDF.",
            reply_markup=MENU_KEYBOARD
        )
        return

    session["rag"].add_text(clean_text_content, source="text")
    await update.message.reply_text(
        "✅ Text saved!\nNow choose: Summarize, Explain, or Ask Questions",
        reply_markup=MENU_KEYBOARD
    )

# Bot Application

In [None]:
def setup_application() -> "telegram.ext.Application":
    """Build the bot application and register all handlers.

    The token comes from the module-level ``TOKEN`` rather than a second
    hardcoded literal, so there is a single place to configure (and rotate) it.

    Handlers are registered in this order: /start, /help, menu button texts,
    other non-command text, PDF documents.

    Returns:
        The object produced by ``ApplicationBuilder().token(...).build()``.
    """
    app = ApplicationBuilder().token(TOKEN).build()
    app.add_handler(CommandHandler("start", start_command))
    app.add_handler(CommandHandler("help", help_command))
    # Menu buttons are matched by their leading word; this must precede the generic text handler.
    app.add_handler(MessageHandler(filters.Regex("^(Summarize|Explain|Ask|Clear|Help)"), handle_menu_selection))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
    app.add_handler(MessageHandler(filters.Document.PDF, handle_document))
    return app

async def run_bot():
    """Run the bot with long polling until the task is cancelled or interrupted.

    ``Application.run_polling()`` is a blocking, non-coroutine convenience
    method that manages its own event loop, so awaiting it from inside
    ``asyncio.run`` fails with "Cannot close a running event loop". The
    application lifecycle is therefore driven explicitly here:
    initialize -> start -> start polling, then stop everything in reverse
    order on the way out.
    """
    app = setup_application()
    await app.initialize()
    try:
        await app.start()
        await app.updater.start_polling()
        try:
            # Nothing ever sets this event: wait until cancelled (e.g. kernel interrupt).
            await asyncio.Event().wait()
        finally:
            if app.updater.running:
                await app.updater.stop()
            if app.running:
                await app.stop()
    finally:
        await app.shutdown()

if __name__ == "__main__":
    # nest_asyncio.apply() (imports cell) lets asyncio.run work inside the notebook's running loop.
    asyncio.run(run_bot())

RuntimeError: Cannot close a running event loop