In [None]:
from IPython import get_ipython
from IPython.display import display
# %%
# Install dependencies
!pip install transformers torch PyPDF2 requests aiohttp beautifulsoup4 sentence-transformers google-auth-oauthlib google-auth-httplib2 google-api-python-client -q

# Imports
import asyncio
import csv
import io
import json
import os
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from urllib.parse import parse_qs, urlparse
from urllib.robotparser import RobotFileParser

import aiohttp
import PyPDF2
import requests
from bs4 import BeautifulSoup
from google.colab import drive, userdata  # Import userdata here
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Utility class for displaying messages with emojis
class Display:
    """Console helper that prints status messages prefixed with an emoji."""

    # Status keyword -> emoji shown in front of the message.
    EMOJIS = {"loading": "⏳", "processing": "🔄", "done": "✅", "error": "❌", "info": "ℹ️", "warning": "⚠️"}

    @staticmethod
    def message(type_key: str, message: str, end: str = "\n") -> None:
        """Print ``message`` preceded by the emoji registered for ``type_key``.

        Unknown keys fall back to the info emoji. ``end`` is passed straight
        to ``print`` (callers use a carriage return to overwrite a line).
        """
        if type_key in Display.EMOJIS:
            prefix = Display.EMOJIS[type_key]
        else:
            prefix = "ℹ️"
        print("{} {}".format(prefix, message), end=end)

# Mount Google Drive
def mount_drive() -> None:
    """Mount Google Drive in Colab and ensure the datasets folder exists.

    Forces a remount, creates ``/content/drive/MyDrive/datasets`` if it is
    missing, then prints a success notice and a data-compliance reminder.
    """
    mount_point = '/content/drive'
    datasets_dir = "/content/drive/MyDrive/datasets"
    drive.mount(mount_point, force_remount=True)
    os.makedirs(datasets_dir, exist_ok=True)
    notices = (
        ("done", "Connected to Google Drive"),
        ("warning", "Ensure datasets do not contain personal data per GDPR or copyrighted content without permission."),
    )
    for kind, text in notices:
        Display.message(kind, text)

# API Client Class for Mistral
class APIClient:
    """Minimal async client for the Mistral chat-completions endpoint."""

    def __init__(self, mistral_key: str) -> None:
        """Store the API key and endpoint URL and print a Terms-of-Service reminder.

        Args:
            mistral_key: Sent as a Bearer token in the ``Authorization`` header.
        """
        self.mistral_key = mistral_key
        self.mistral_url = "https://api.mistral.ai/v1/chat/completions"
        Display.message("warning", "Ensure compliance with Mistral Terms of Service.")

    async def mistral_request(
        self,
        task: str,
        text: str,
        prompt: str,
        max_retries: int = 3,
        retry_wait: float = 60.0,
    ) -> Optional[Dict[str, Any]]:
        """Send one chat-completion request and return the model's reply.

        Args:
            task: Task label embedded in the user message (e.g. "summarization").
            text: Source text the model should work on.
            prompt: Extra instruction embedded in the user message.
            max_retries: Maximum number of retries after an HTTP 429 response.
            retry_wait: Seconds to sleep before each retry after an HTTP 429.

        Returns:
            ``{"result": <reply text>}`` on success, otherwise ``None``. Failures
            (non-200 status, exhausted retries, network/JSON errors) are reported
            via ``Display`` instead of being raised.
        """
        headers = {"Authorization": f"Bearer {self.mistral_key}", "Content-Type": "application/json"}
        payload = {
            "model": "open-mistral-nemo-2407",
            "messages": [{"role": "user", "content": f"Task: {task}\nPrompt: {prompt}\nText: {text}"}],
            "max_tokens": 2000,
            "temperature": 0.7,
        }
        timeout = aiohttp.ClientTimeout(total=30)
        retries = max(max_retries, 0)
        try:
            async with aiohttp.ClientSession() as session:
                # Retry HTTP 429 a bounded number of times so a persistent rate
                # limit cannot loop (or recurse) forever.
                for attempt in range(retries + 1):
                    async with session.post(self.mistral_url, headers=headers, json=payload, timeout=timeout) as response:
                        if response.status == 200:
                            data = await response.json()
                            result = {"result": data["choices"][0]["message"]["content"]}
                            Display.message("done", f"Mistral processed {task}")
                            return result
                        if response.status != 429:
                            Display.message("error", f"Mistral API failed: {response.status}")
                            return None
                    # Rate limited: the response has been released above, so no
                    # connection is held open while we wait.
                    if attempt < retries:
                        Display.message("warning", f"Rate limit reached. Waiting for {retry_wait:g} seconds before retrying.")
                        await asyncio.sleep(retry_wait)
        except Exception as e:
            # Best-effort: a failed request yields None so a batch can continue.
            Display.message("error", f"Mistral error: {str(e)}")
            return None
        Display.message("error", f"Mistral rate limit still exceeded after {retries} retries")
        return None
            
# Data Fetcher Class
class DataFetcher:
    """Fetch raw text from YouTube, Google Custom Search, web pages, or PDF files.

    Fetch methods report problems through ``Display`` and return an empty
    string instead of raising, so one bad source does not abort a batch.
    For the YouTube and Google methods this covers ``HttpError`` and empty
    results; other exceptions from those API clients still propagate.
    """

    def __init__(self, youtube_key: str, google_key: str, cse_id: str) -> None:
        """Build the YouTube Data v3 and Custom Search v1 service clients.

        Args:
            youtube_key: API key for the YouTube Data API.
            google_key: API key for the Custom Search JSON API.
            cse_id: Search engine ID passed as ``cx`` to Custom Search.
        """
        self.youtube_key = youtube_key
        self.google_key = google_key
        self.cse_id = cse_id
        self.youtube = build("youtube", "v3", developerKey=self.youtube_key)
        self.google_search = build("customsearch", "v1", developerKey=self.google_key)
        # Shared HTTP session, used for PDF downloads.
        self.session = requests.Session()
        self.session.headers.update({"User-Agent": "Mozilla/5.0"})
        Display.message("warning", "Ensure API usage complies with YouTube and Google Terms of Service. Check quota limits.")

    @staticmethod
    def _youtube_video_id(url: str) -> str:
        """Extract a video ID from a YouTube URL; a bare ID is returned unchanged.

        Handles ``watch?v=ID&...`` URLs as well as path forms such as
        ``https://youtu.be/ID?t=30`` or ``/shorts/ID``; query strings and
        fragments are never included in the returned ID.
        """
        parsed = urlparse(url)
        ids = parse_qs(parsed.query).get("v")
        if ids:
            return ids[0]
        return parsed.path.rstrip("/").rsplit("/", 1)[-1]

    def fetch_youtube_data(self, url: str) -> str:
        """Return the video's title and description as text, or "" on failure."""
        Display.message("processing", f"Fetching YouTube data: {url}", end="\r")
        video_id = self._youtube_video_id(url)
        try:
            video_response = self.youtube.videos().list(part="snippet", id=video_id).execute()
        except HttpError as e:
            Display.message("error", f"YouTube API error: {str(e)}")
            return ""
        # An unknown or private video yields an empty "items" list, not an HttpError.
        items = video_response.get("items") or []
        if not items:
            Display.message("error", f"YouTube video not found: {url}")
            return ""
        snippet = items[0]["snippet"]
        content = f"Title: {snippet.get('title', '')}\nDescription: {snippet.get('description', '')}"
        Display.message("done", f"Fetched YouTube data: {url}")
        return content

    def search_google(self, query: str, num_results: int = 5) -> str:
        """Return title/link/snippet text for the top Custom Search hits, or "" on API error."""
        Display.message("processing", f"Searching Google: {query}", end="\r")
        # The Custom Search API only accepts num between 1 and 10.
        num = max(1, min(num_results, 10))
        try:
            response = self.google_search.cse().list(q=query, cx=self.cse_id, num=num).execute()
        except HttpError as e:
            Display.message("error", f"Google API error: {str(e)}")
            return ""
        # The response omits "items" entirely when the search has no hits.
        items = response.get("items", [])
        search_results = "".join(
            f"Title: {item.get('title', '')}\nLink: {item.get('link', '')}\nSnippet: {item.get('snippet', '')}\n\n"
            for item in items
        )
        Display.message("done", f"Google search completed: Found {len(items)} results")
        return search_results

    async def fetch_web_content(self, url: str) -> str:
        """Download ``url`` and return its visible text, or "" on any failure."""
        Display.message("processing", f"Fetching web content from {url}", end="\r")
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers={"User-Agent": "Mozilla/5.0"}, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    if response.status != 200:
                        Display.message("error", f"Web fetch failed for {url}: HTTP {response.status}")
                        return ""
                    html = await response.text()
            soup = BeautifulSoup(html, 'html.parser')
            # Script and style bodies are code, not page text.
            for tag in soup(["script", "style"]):
                tag.decompose()
            content = soup.get_text(separator="\n").strip()
        except Exception as e:
            Display.message("error", f"Web fetch error: {str(e)}")
            return ""
        Display.message("done", f"Fetched web content from {url}")
        Display.message("warning", f"Ensure {url} Terms of Service allows scraping.")
        return content

    @staticmethod
    def _pdf_text(stream: Any) -> str:
        """Concatenate the extracted text of every page of a binary PDF stream."""
        reader = PyPDF2.PdfReader(stream)
        return "".join(page.extract_text() or "" for page in reader.pages)

    def read_pdf(self, url_or_path: str) -> str:
        """Return the text of a PDF given by http(s) URL or local path, or "" on failure."""
        Display.message("processing", f"Reading PDF: {url_or_path}", end="\r")
        Display.message("warning", f"Ensure {url_or_path} is public domain or you have permission to use.")
        try:
            if url_or_path.startswith("http"):
                response = self.session.get(url_or_path, timeout=60)
                response.raise_for_status()
                # Parse in memory rather than via a fixed temp file on disk, which
                # concurrent calls would overwrite and which leaks if parsing fails.
                text = self._pdf_text(io.BytesIO(response.content))
            else:
                # Prefer the path as given; otherwise fall back to the form with
                # square brackets stripped (e.g. "[name].pdf" -> "name.pdf").
                pdf_path = url_or_path
                if not os.path.exists(pdf_path):
                    pdf_path = url_or_path.replace("[", "").replace("]", "")
                with open(pdf_path, "rb") as file:
                    text = self._pdf_text(file)
        except Exception as e:
            Display.message("error", f"PDF error: {str(e)}")
            return ""
        Display.message("done", f"Read PDF: {url_or_path}")
        return text

    async def fetch_data(self, source: str, query: str) -> Dict[str, str]:
        """Dispatch ``query`` to the fetcher for ``source``.

        Args:
            source: One of "youtube", "google", "web", "pdf".
            query: URL, search string, or file path depending on ``source``.

        Returns:
            ``{"source", "query", "content"}``; ``content`` is "" for an
            unsupported source or a failed fetch.
        """
        if source == "youtube":
            content = self.fetch_youtube_data(query)
        elif source == "google":
            content = self.search_google(query)
        elif source == "web":
            content = await self.fetch_web_content(query)
        elif source == "pdf":
            content = self.read_pdf(query)
        else:
            Display.message("error", f"Unsupported source: {source}")
            return {"source": source, "query": query, "content": ""}
        return {"source": source, "query": query, "content": content}

# Text Processor Class
class TextProcessor:
    """Forward text-processing tasks to an ``APIClient``."""

    def __init__(self, api_client: APIClient) -> None:
        """Keep the client that performs the Mistral requests."""
        self.api_client = api_client

    async def process_text(self, task: str, text: str, prompt: str = "") -> Any:
        """Print a short preview of the request, then delegate to ``mistral_request``.

        Returns whatever ``APIClient.mistral_request`` returns.
        """
        preview = text[:50]
        shown_prompt = prompt if prompt else 'N/A'
        Display.message("processing", f"Task: {task} | Text: {preview}... | Prompt: {shown_prompt}")
        client = self.api_client
        return await client.mistral_request(task, text, prompt)

# Dataset Builder Class
class DatasetBuilder:
    """Build a dataset: fetch source text, process it with Mistral, and save records."""

    # Column order for CSV export; matches the keys of records built by process_task.
    CSV_FIELDS = ["source", "query", "task", "prompt", "content", "result", "processed_by"]

    def __init__(self, mistral_key: str, youtube_key: str, google_key: str, cse_id: str) -> None:
        """Create the Mistral client, the multi-source fetcher, and the text processor."""
        self.api_client = APIClient(mistral_key)
        self.data_fetcher = DataFetcher(youtube_key, google_key, cse_id)
        self.text_processor = TextProcessor(self.api_client)

    async def process_task(self, task_info: Dict[str, str]) -> Optional[Dict[str, Any]]:
        """Run one task spec end to end and return a dataset record.

        Args:
            task_info: Must contain "query" and "task"; may contain "source"
                (default "direct", meaning ``query`` itself is the text) and
                "prompt" (default "").

        Returns:
            A record whose keys are ``CSV_FIELDS``, or ``None`` if fetching
            produced no content or the Mistral request failed.

        Raises:
            KeyError: If "query" or "task" is missing from ``task_info``.
        """
        source = task_info.get("source", "direct")
        query = task_info["query"]
        task = task_info["task"]
        prompt = task_info.get("prompt", "")

        if source != "direct":
            fetched = await self.data_fetcher.fetch_data(source, query)
            if not fetched["content"]:
                return None
            text = fetched["content"]
        else:
            text = query

        result = await self.text_processor.process_text(task, text, prompt)
        if result:
            return {
                "source": source,
                "query": query,
                "task": task,
                "prompt": prompt,
                "content": text,
                "result": result,
                "processed_by": "mistral",
            }
        return None

    def save_dataset(
        self,
        data: List[Dict[str, Any]],
        name: str,
        output_format: str = "json",
        output_dir: str = "/content/drive/MyDrive/datasets",
    ) -> Optional[str]:
        """Write records to ``<output_dir>/<name>.<output_format>``.

        Args:
            data: Records as produced by ``process_task``.
            name: File name without extension.
            output_format: "json" or "csv".
            output_dir: Target directory; created if it does not exist.

        Returns:
            The written file path, or ``None`` if the format is unsupported or
            writing failed (reported via ``Display``).
        """
        # Reject unknown formats up front instead of reporting a save that never happened.
        if output_format not in ("json", "csv"):
            Display.message("error", f"Unsupported output format: {output_format}")
            return None
        path = os.path.join(output_dir, f"{name}.{output_format}")
        try:
            os.makedirs(output_dir, exist_ok=True)
            if output_format == "json":
                with open(path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
            else:
                with open(path, "w", encoding="utf-8", newline="") as f:
                    writer = csv.DictWriter(f, fieldnames=self.CSV_FIELDS)
                    writer.writeheader()
                    writer.writerows(self._csv_row(record) for record in data)
        except Exception as e:
            Display.message("error", f"Failed to save dataset: {str(e)}")
            return None
        Display.message("done", f"Saved dataset at {path}")
        return path

    @staticmethod
    def _csv_row(record: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``record`` with dict/list values JSON-encoded for a CSV cell.

        Without this, ``csv`` writes Python ``repr`` text such as
        ``{'result': '...'}``, which is not machine-readable.
        """
        return {
            key: json.dumps(value, ensure_ascii=False) if isinstance(value, (dict, list)) else value
            for key, value in record.items()
        }

# Main function
async def run(
    tasks: List[Dict[str, str]],
    name: str = "dataset",
    output_format: str = "json",
    mistral_key: str = "",
    youtube_key: str = "",
    google_key: str = "",
    cse_id: str = ""
) -> Optional[str]:
    """Process all task specs concurrently and save the successful records.

    Args:
        tasks: Task specs accepted by ``DatasetBuilder.process_task``.
        name: Output file name without extension.
        output_format: "json" or "csv".
        mistral_key: Required Mistral API key.
        youtube_key: YouTube Data API key (needed for "youtube" sources).
        google_key: Custom Search API key (needed for "google" sources).
        cse_id: Custom Search engine ID.

    Returns:
        Path of the saved dataset, or ``None`` if the Mistral key is missing
        or saving failed.
    """
    # Fail fast on a missing key, before mounting Google Drive.
    if not mistral_key:
        Display.message("error", "Missing Mistral API key")
        return None
    mount_drive()

    builder = DatasetBuilder(mistral_key, youtube_key, google_key, cse_id)
    # return_exceptions=True: one raising task (e.g. a spec missing "query")
    # must not discard the results of all the others.
    results = await asyncio.gather(
        *(builder.process_task(task) for task in tasks), return_exceptions=True
    )
    dataset = []
    for task, outcome in zip(tasks, results):
        if isinstance(outcome, BaseException):
            Display.message("error", f"Task {task.get('task', '?')} ({task.get('source', 'direct')}) failed: {outcome!r}")
        elif outcome:
            dataset.append(outcome)
    return builder.save_dataset(dataset, name, output_format)

# Example usage
if __name__ == "__main__":
    # API credentials are read from Colab's `userdata` secret store.
    MISTRAL_API_KEY = userdata.get('MISTRAL_API_KEY')
    YOUTUBE_API_KEY = userdata.get('YOUTUBE_API_KEY')
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
    CSE_ID = userdata.get('CSE_ID')

    # One spec per source type; the last has no "source", so its query text
    # is sent to Mistral directly.
    tasks = [
        dict(
            source="youtube",
            query="https://www.youtube.com/watch?v=OTkq4OsG_Yc",
            task="summarization",
            prompt="Summarize the video content",
        ),
        dict(
            source="google",
            query="AI advancements",
            task="text_classification",
            prompt="Classify sentiment as positive or negative",
        ),
        dict(
            source="pdf",
            query="/content/drive/MyDrive/[No_Limit_Holdem_Theory_and_Practice].pdf",
            task="summarization",
            prompt="Summarize the document",
        ),
        dict(query="Poker", task="text_classification", prompt="Classify sentiment"),
    ]

    # nest_asyncio lets run_until_complete be used while a notebook event
    # loop is already running.
    import nest_asyncio
    nest_asyncio.apply()

    output_path = asyncio.get_event_loop().run_until_complete(
        run(
            tasks,
            name="multi_source_multi_task_dataset",
            mistral_key=MISTRAL_API_KEY,
            youtube_key=YOUTUBE_API_KEY,
            google_key=GOOGLE_API_KEY,
            cse_id=CSE_ID,
        )
    )
    if output_path:
        print(f"Dataset saved at: {output_path}")