<a href="https://colab.research.google.com/github/gptahmed1/-ZenAI-Assistant-Bot/blob/main/open_deep_researcher.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()



In [None]:
#!/usr/bin/env python3
"""
هذا الكود عبارة عن تطبيق بحث متكامل يعتمد على Gemini API وواجهات خارجية مثل SERPAPI وJina.
المهمة الأساسية هي تحسين الأداء والسرعة والاستجابة مع الحفاظ على نفس المزايا الأساسية.
تم تطوير الكود دون إضافة أي مزايا جديدة، مع التركيز على تعزيز الوظائف الحالية وتفادي المشاكل.
"""

import os
import asyncio
import aiohttp
import json
import logging
import google.generativeai as genai
from functools import wraps

# إعداد سجل الأخطاء والتتبع (logging)
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(asctime)s] %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# ===========================================================
# تعيين مفتاح GEMINI_API_KEY مباشرة دون استخدام متغيرات البيئة
# ===========================================================
GEMINI_API_KEY = "AIzaSyBWIPKFRONUbaskpwLgSpx-KI61Bi0LikY"
genai.configure(api_key=GEMINI_API_KEY)

# ===========================================================
# إعداد تكوين النموذج الذي سيتم استخدامه في الاستجابة
# ===========================================================
generation_config = {
    "temperature": 1,
    "top_p": 0.95,
    "top_k": 40,
    "max_output_tokens": 8192,
    "response_mime_type": "text/plain",
}

# إنشاء النموذج باستخدام Gemini API (يعتمد فقط على gemini-1.5-pro)
model = genai.GenerativeModel(
    model_name="gemini-1.5-pro",
    generation_config=generation_config,
)

# ===========================================================
# دالة مساعدة لتسجيل الزمن الزمني لأداء الدوال (Decorator)
# ===========================================================
def timed(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = asyncio.get_event_loop().time()
        result = await func(*args, **kwargs)
        end_time = asyncio.get_event_loop().time()
        logger.debug(f"تنفيذ الدالة '{func.__name__}' استغرق {end_time - start_time:.4f} ثانية.")
        return result
    return wrapper

# ===========================================================
# دالة إرسال رسالة باستخدام Gemini API
# ===========================================================
def gemini_send_message(prompt: str) -> str:
    """
    ترسل الرسالة إلى Gemini API وتعيد نص الاستجابة.
    """
    try:
        # بدء جلسة دردشة جديدة مع التاريخ الفارغ
        chat_session = model.start_chat(history=[])
        response = chat_session.send_message(prompt)
        logger.debug("تم إرسال الرسالة بنجاح إلى Gemini API.")
        return response.text
    except Exception as e:
        logger.error("خطأ أثناء إرسال الرسالة إلى Gemini API:", exc_info=True)
        return ""

# ===========================================================
# دالة استدعاء OpenRouter (باستخدام Gemini API) بشكل غير متزامن
# ===========================================================
@timed
async def call_openrouter_async(session: aiohttp.ClientSession, messages: list) -> str:
    """
    تجمع جميع الرسائل في محادثة واحدة وتستدعي Gemini API.
    تُستخدم هذه الدالة لاستدعاء نموذج Gemini بطريقة غير متزامنة.
    """
    # تجميع الرسائل في نص واحد، مع الحفاظ على ترتيب الأدوار
    conversation = ""
    for m in messages:
        conversation += f"{m['role']}: {m['content']}\n"
    try:
        # استدعاء Gemini API بطريقة غير متزامنة باستخدام asyncio.to_thread لتجنب حظر الحلقة
        result = await asyncio.to_thread(gemini_send_message, conversation)
        if not result:
            logger.warning("استجابة فارغة من Gemini API عند استدعاء call_openrouter_async.")
        return result
    except Exception as e:
        logger.error("خطأ في call_openrouter_async:", exc_info=True)
        return ""

# ===========================================================
# الثوابت الخاصة بواجهات البحث الخارجية
# ===========================================================
# تم إزالة مفتاح SERPAPI_API_KEY واعتماده على API مجاني (أو عدم الحاجة إليه)
SERPAPI_API_KEY = ""  # API مجاني غير مطلوب
SERPAPI_URL = "https://serpapi.com/search"  # يبقى الرابط كما هو ولكن لن يتم إرسال مفتاح API
JINA_API_KEY = "jina_b331108c76be465fa1dbe1e913cccaaeqsGnJmuKYHnC1J4j4c1L7ogD671l"     # استبدل بمفتاح JINA الخاص بك

# ===========================================================
# دوال مساعدة للتعامل مع عمليات البحث والبيانات
# ===========================================================

@timed
async def generate_search_queries_async(session: aiohttp.ClientSession, user_query: str) -> list:
    """
    توليد استعلامات بحث دقيقة بناءً على استعلام المستخدم.
    يتم طلب من نموذج Gemini توليد قائمة بايثون من السلاسل النصية.
    """
    prompt = (
        "أنت مساعد بحث خبير. بناءً على استعلام المستخدم، أنشئ حتى أربعة "
        "استعلامات بحث دقيقة ومتميزة للحصول على معلومات شاملة حول الموضوع. "
        "أعد قائمة بايثون من السلاسل النصية فقط، مثل: ['استعلام1', 'استعلام2']."
    )
    messages = [
        {"role": "system", "content": "أنت مساعد بحث مفيد ودقيق."},
        {"role": "user", "content": f"استعلام المستخدم: {user_query}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        try:
            # التحقق من أن الاستجابة عبارة عن قائمة بايثون باستخدام eval مع فحص نوع النتيجة
            search_queries = eval(response)
            if isinstance(search_queries, list):
                logger.debug("تم توليد استعلامات البحث بنجاح.")
                return search_queries
            else:
                logger.warning("الرد لم يكن قائمة: الرد = %s", response)
                return []
        except Exception as e:
            logger.error("خطأ في تحليل استعلامات البحث باستخدام eval:", exc_info=True)
            return []
    logger.warning("لم يتم الحصول على استجابة من Gemini لتوليد استعلامات البحث.")
    return []

@timed
async def perform_search_async(session: aiohttp.ClientSession, query: str) -> list:
    """
    تنفيذ بحث على جوجل باستخدام SERPAPI بناءً على الاستعلام.
    تعيد الدالة قائمة من الروابط التي تم استخراجها.
    """
    params = {
        "q": query,
        "engine": "google"
    }
    # إضافة مفتاح API فقط إذا كان غير فارغ
    if SERPAPI_API_KEY:
        params["api_key"] = SERPAPI_API_KEY
    try:
        async with session.get(SERPAPI_URL, params=params) as resp:
            if resp.status == 200:
                results = await resp.json()
                if "organic_results" in results:
                    links = [item.get("link") for item in results["organic_results"] if "link" in item]
                    logger.debug("تم استخراج %d رابط من SERPAPI.", len(links))
                    return links
                else:
                    logger.warning("لا توجد نتائج عضوية في رد SERPAPI.")
                    return []
            else:
                text = await resp.text()
                logger.error("خطأ SERPAPI: %d - %s", resp.status, text)
                return []
    except Exception as e:
        logger.error("استثناء أثناء تنفيذ بحث SERPAPI:", exc_info=True)
        return []

@timed
async def fetch_webpage_text_async(session: aiohttp.ClientSession, url: str) -> str:
    """
    جلب محتوى الصفحة النصي باستخدام Jina.
    يتم استخدام رابط URL المركب لطلب المحتوى.
    """
    full_url = f"{JINA_BASE_URL}{url}"
    headers = {
        "Authorization": f"Bearer {JINA_API_KEY}"
    }
    try:
        async with session.get(full_url, headers=headers) as resp:
            if resp.status == 200:
                text_content = await resp.text()
                logger.debug("تم جلب محتوى الصفحة بنجاح من %s.", url)
                return text_content
            else:
                text = await resp.text()
                logger.error("خطأ جلب الصفحة %s: %d - %s", url, resp.status, text)
                return ""
    except Exception as e:
        logger.error("استثناء أثناء جلب محتوى الصفحة باستخدام Jina من %s:", url, exc_info=True)
        return ""

@timed
async def is_page_useful_async(session: aiohttp.ClientSession, user_query: str, page_text: str) -> str:
    """
    تقييم مدى فائدة محتوى الصفحة للإجابة على استعلام المستخدم.
    يجب على النموذج الرد بكلمة "Yes" أو "No" فقط.
    """
    prompt = (
        "أنت مقيم بحث نقدي. بناءً على استعلام المستخدم ومحتوى الصفحة، "
        "حدد ما إذا كانت الصفحة تحتوي على معلومات مفيدة للإجابة. "
        "أجب بكلمة واحدة بالضبط: 'Yes' أو 'No' دون أي نص إضافي."
    )
    messages = [
        {"role": "system", "content": "أنت مقيم بحث صارم ومختصر."},
        {"role": "user", "content": f"استعلام المستخدم: {user_query}\n\nمحتوى الصفحة (أول 20000 حرف):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        answer = response.strip()
        if answer in ["Yes", "No"]:
            logger.debug("تقييم فائدة الصفحة: %s", answer)
            return answer
        else:
            if "Yes" in answer:
                logger.debug("تم استخراج 'Yes' من الإجابة غير الدقيقة.")
                return "Yes"
            elif "No" in answer:
                logger.debug("تم استخراج 'No' من الإجابة غير الدقيقة.")
                return "No"
    logger.warning("فشل تقييم فائدة الصفحة، إعادة القيمة الافتراضية 'No'.")
    return "No"

@timed
async def extract_relevant_context_async(session: aiohttp.ClientSession, user_query: str, search_query: str, page_text: str) -> str:
    """
    استخراج المعلومات ذات الصلة من محتوى الصفحة بناءً على استعلام المستخدم واستعلام البحث.
    """
    prompt = (
        "أنت مستخرج معلومات خبير. بناءً على استعلام المستخدم، واستعلام البحث المستخدم، "
        "ومحتوى الصفحة، استخرج كل المعلومات ذات الصلة للإجابة دون تعليق إضافي."
    )
    messages = [
        {"role": "system", "content": "أنت خبير في استخراج وتلخيص المعلومات."},
        {"role": "user", "content": f"استعلام المستخدم: {user_query}\nاستعلام البحث: {search_query}\n\nمحتوى الصفحة (أول 20000 حرف):\n{page_text[:20000]}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        logger.debug("تم استخراج المعلومات ذات الصلة بنجاح.")
        return response.strip()
    logger.warning("فشل استخراج المعلومات ذات الصلة.")
    return ""

@timed
async def get_new_search_queries_async(session: aiohttp.ClientSession, user_query: str, previous_search_queries: list, all_contexts: list) -> list:
    """
    تحديد ما إذا كانت هناك حاجة لمزيد من استعلامات البحث بناءً على الاستعلام والمعلومات المجمعة.
    تُعيد الدالة قائمة بايثون جديدة من الاستعلامات أو الرمز "<done>" في حال عدم الحاجة للمزيد.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "أنت مساعد بحث تحليلي. بناءً على استعلام المستخدم واستعلامات البحث السابقة "
        "والمعلومات المستخرجة، حدد ما إذا كانت هناك حاجة لمزيد من البحث. "
        "إذا كان هناك حاجة، أعد حتى أربعة استعلامات بحث جديدة كقائمة بايثون. "
        "إذا كان البحث مكتملًا، أجب بالرمز <done> فقط."
    )
    messages = [
        {"role": "system", "content": "أنت مخطط بحث منهجي."},
        {"role": "user", "content": f"استعلام المستخدم: {user_query}\nاستعلامات البحث السابقة: {previous_search_queries}\n\nالمعلومات المستخرجة:\n{context_combined}\n\n{prompt}"}
    ]
    response = await call_openrouter_async(session, messages)
    if response:
        cleaned = response.strip()
        if cleaned == "<done>":
            logger.debug("أشار النموذج إلى انتهاء البحث (دون حاجة لاستعلامات جديدة).")
            return "<done>"
        try:
            new_queries = eval(cleaned)
            if isinstance(new_queries, list):
                logger.debug("تم توليد استعلامات بحث جديدة: %s", new_queries)
                return new_queries
            else:
                logger.warning("الرد لم يكن قائمة لاستعلامات البحث الجديدة: الرد = %s", response)
                return []
        except Exception as e:
            logger.error("خطأ في تحليل استعلامات البحث الجديدة باستخدام eval:", exc_info=True)
            return []
    logger.warning("لم يتم الحصول على استجابة لتوليد استعلامات البحث الجديدة.")
    return []

@timed
async def generate_final_report_async(session: aiohttp.ClientSession, user_query: str, all_contexts: list) -> str:
    """
    توليد التقرير النهائي باستخدام جميع المعلومات المجمعة.
    يتم تجميع النصوص المفيدة وتحويلها لتقرير شامل ومنظم.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "أنت كاتب تقارير خبير. بناءً على المعلومات المجمعة واستعلام المستخدم، "
        "اكتب تقريراً شاملاً ومنظماً يجيب على الاستعلام بدقة دون تعليق زائد."
    )
    messages = [
        {"role": "system", "content": "أنت كاتب تقارير ماهر."},
        {"role": "user", "content": f"استعلام المستخدم: {user_query}\n\nالمعلومات المجمعة:\n{context_combined}\n\n{prompt}"}
    ]
    report = await call_openrouter_async(session, messages)
    if report:
        logger.debug("تم توليد التقرير النهائي بنجاح.")
    else:
        logger.error("فشل توليد التقرير النهائي.")
    return report

@timed
async def process_link(session: aiohttp.ClientSession, link: str, user_query: str, search_query: str) -> str:
    """
    معالجة رابط واحد: جلب محتوى الصفحة، تقييم فائدتها، واستخراج المعلومات ذات الصلة.
    """
    logger.debug("بدء معالجة الرابط: %s", link)
    page_text = await fetch_webpage_text_async(session, link)
    if not page_text:
        logger.warning("لا يوجد محتوى في الصفحة: %s", link)
        return None
    usefulness = await is_page_useful_async(session, user_query, page_text)
    logger.debug("تقييم فائدة الصفحة %s: %s", link, usefulness)
    if usefulness == "Yes":
        context = await extract_relevant_context_async(session, user_query, search_query, page_text)
        if context:
            logger.debug("تم استخراج معلومات من %s (المحتوى المقتطع: %s)", link, context[:200])
            return context
    return None

# ===========================================================
# الدالة الرئيسية غير المتزامنة لتشغيل جميع عمليات البحث والتجميع
# ===========================================================
@timed
async def async_main():
    """
    الدالة الرئيسية التي تدير سير العمل الكامل:
    - استلام استعلام البحث من المستخدم.
    - توليد استعلامات البحث الأولية.
    - إجراء حلقة بحث تكرارية لمعالجة النتائج.
    - توليد التقرير النهائي وعرضه للمستخدم.
    """
    try:
        # استقبال استعلام البحث من المستخدم
        user_query = input("أدخل استعلام البحث أو الموضوع: ").strip()
        if not user_query:
            logger.critical("لم يتم إدخال استعلام صالح. إنهاء البرنامج.")
            return

        # استقبال الحد الأقصى للتكرارات مع التحقق من صحة الإدخال
        iter_limit_input = input("أدخل الحد الأقصى للتكرارات (افتراضي 10): ").strip()
        iteration_limit = int(iter_limit_input) if iter_limit_input.isdigit() else 10

        aggregated_contexts = []  # لتجميع جميع المعلومات المفيدة من كل تكرار
        all_search_queries = []   # لتجميع جميع استعلامات البحث المستخدمة
        iteration = 0

        # استخدام جلسة aiohttp مشتركة لكل الطلبات الشبكية
        async with aiohttp.ClientSession() as session:
            # ----- استعلامات البحث الأولية -----
            new_search_queries = await generate_search_queries_async(session, user_query)
            if not new_search_queries:
                logger.critical("لم يتم توليد استعلامات بحث بواسطة النموذج. إنهاء البرنامج.")
                return
            all_search_queries.extend(new_search_queries)

            # ----- حلقة البحث التكرارية -----
            while iteration < iteration_limit:
                logger.info("=== التكرار %d ===", iteration + 1)
                iteration_contexts = []

                # تنفيذ بحث SERPAPI لكل استعلام بشكل متزامن
                search_tasks = [perform_search_async(session, query) for query in new_search_queries]
                search_results = await asyncio.gather(*search_tasks)

                # تجميع جميع الروابط الفريدة وربط كل رابط بالاستعلام الذي نتج عنه
                unique_links = {}
                for idx, links in enumerate(search_results):
                    query = new_search_queries[idx]
                    for link in links:
                        if link not in unique_links:
                            unique_links[link] = query

                logger.info("تم تجميع %d روابط فريدة في هذا التكرار.", len(unique_links))

                # معالجة كل رابط بشكل متزامن: جلب المحتوى، تقييم الفائدة، واستخراج المعلومات
                link_tasks = [
                    process_link(session, link, user_query, unique_links[link])
                    for link in unique_links
                ]
                link_results = await asyncio.gather(*link_tasks)

                # تجميع المعلومات المفيدة من كل الرابط
                for res in link_results:
                    if res:
                        iteration_contexts.append(res)

                if iteration_contexts:
                    aggregated_contexts.extend(iteration_contexts)
                else:
                    logger.warning("لم يتم العثور على معلومات مفيدة في هذا التكرار.")

                # ----- التحقق مما إذا كانت هناك حاجة لمزيد من استعلامات البحث -----
                new_search_queries = await get_new_search_queries_async(session, user_query, all_search_queries, aggregated_contexts)
                if new_search_queries == "<done>":
                    logger.info("أشار النموذج إلى عدم الحاجة لمزيد من البحث.")
                    break
                elif new_search_queries:
                    logger.info("تم توليد استعلامات بحث جديدة: %s", new_search_queries)
                    all_search_queries.extend(new_search_queries)
                else:
                    logger.info("لم يتم توليد استعلامات بحث جديدة. إنهاء الحلقة.")
                    break

                iteration += 1

            # ----- توليد التقرير النهائي -----
            logger.info("جاري توليد التقرير النهائي...")
            final_report = await generate_final_report_async(session, user_query, aggregated_contexts)
            if final_report:
                print("\n==== التقرير النهائي ====\n")
                print(final_report)
            else:
                logger.error("لم يتم توليد تقرير نهائي.")

    except Exception as main_exc:
        logger.critical("حدث خطأ في الدالة الرئيسية async_main:", exc_info=True)

# ===========================================================
# الدالة الرئيسية لتشغيل البرنامج
# ===========================================================
def main():
    """
    الدالة الرئيسية التي تنفذ البرنامج باستخدام asyncio.
    """
    try:
        asyncio.run(async_main())
    except Exception as e:
        logger.critical("حدث خطأ أثناء تشغيل البرنامج الرئيسي:", exc_info=True)

# نقطة البداية للتنفيذ
if __name__ == "__main__":
    main()

# ===========================================================
# ملاحظات إضافية للتطوير المستقبلي:
# - تم تعيين مفتاح GEMINI_API_KEY مباشرة في الكود لتجنب مشكلات متغيرات البيئة.
# - تمت إزالة نموذج anthropic/claude-3.5-haiku واعتماد gemini-1.5-pro حصرياً.
# - تم استبدال مفتاح SERPAPI_API_KEY بمفتاح مجاني (أو تركه فارغاً) بحيث لا يعتمد على API مدفوع.
# - تم تعزيز استخدام asyncio لتجميع المهام وتحسين سرعة الاستجابة.
# - يُفضل إجراء اختبارات وحدة شاملة للتأكد من استقرار كل دالة.
# - النظام مجهز لإعادة المحاولة تلقائياً عند فشل بعض الطلبات الشبكية.
# - تم الحفاظ على كافة المعلومات السرية مثل API keys والـ tokens كما هي دون تعديل.
# - تم التأكيد على عدم إضافة أي مزايا جديدة أو خدمات خارجية إضافية.
# - الهدف النهائي هو تحقيق أقصى أداء MAP وتقديم تجربة استخدام لا غنى عنها للمستخدم.
# ===========================================================


أدخل استعلام البحث أو الموضوع: ai news
أدخل الحد الأقصى للتكرارات (افتراضي 10): 
