# Notebook configuration

In [37]:
import os
import json
import re
import requests
import datetime
from typing import List, Optional, Any, Dict
from urllib.parse import urljoin
import ollama

import pandas as pd
import magic
from bs4 import BeautifulSoup
from markdownify import markdownify
from requests.exceptions import RequestException

from langchain_core.documents import Document
from langchain_core.prompts import PromptTemplate
from langchain_community.document_loaders import (
    PyMuPDFLoader, CSVLoader, JSONLoader, UnstructuredXMLLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_qdrant import QdrantVectorStore, FastEmbedSparse, RetrievalMode
from langchain_ollama import OllamaLLM, OllamaEmbeddings

from smolagents import (
    CodeAgent, 
    LiteLLMModel, 
    DuckDuckGoSearchTool, 
    ToolCallingAgent, 
    tool, 
    VisitWebpageTool,
    GoogleSearchTool
)

from abc import abstractmethod, ABC
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance

#############################################################

oc = ollama.Client("http://localhost:11434")

Data_dir = "/users/formation/irtn7prtnc/LLM_Valdom/Dataset"
Cache_dir = "/users/formation/irtn7prtnc/LLM_Valdom/Cache"

os.makedirs(Data_dir, exist_ok = True)
os.makedirs(Cache_dir, exist_ok = True)

model = LiteLLMModel(
    model_id = "ollama/qwen2.5-coder:32b", #['deepseek-r1:32b', 'qwen2.5-coder:32b', 'llama3.1:8b', 'mistral-nemo:latest', 'mistral:latest']
    api_base = "http://localhost:11434/api/generate",
    num_ctx = 24000
    )

# Mise en place du RAG

### RAG definition (Local)

In [17]:
class RAGInterface(ABC):
    """
    Abstract class defining a generic RAG system. 
    
    This class ensures that all RAG implementations follow a common structure.
    """
    def __init__(self, name: str, knowledge_db: Optional[Any] = None):
        self.name = name  # Identifier for the RAG system
        self.knowledge_db = knowledge_db  # Storage backend (e.g., a vector database)
    
    @abstractmethod
    def retrieve(self, query: str) -> List[Document]:
        """
        Retrieve relevant contexts from the knowledge_db based on the query.
        Args:
            query (str): The user query.
        Returns:
            List[Document]: Retrieved document chunks.
        """
        pass
    
    @abstractmethod
    def generate(self, query: str, retrieved_contexts: List[Document]) -> str:
        """
        Generate a response based on the query and retrieved contexts.
        Args:
            query (str): The user query.
            retrieved_contexts (List[Document]): Relevant document chunks.
        Returns:
            str: The generated response.
        """
        pass

# Default prompt template for RAG
PROMPT_TEMPLATE = """
You are an assistant for question-answering tasks.
Use the following pieces of retrieved context to answer the question.
If you don't know the answer, just say that you don't know.
Use four sentences maximum and keep the answer concise.

Question: {query}
Context: {retrieved_contexts}
Answer:
"""
######################## RAG BM25 ########################

class BM25V0RAG(RAGInterface):
    """
    Sparse Retrieval RAG using BM25 without embeddings for generation.
    
    - Stores text chunks in Qdrant using BM25 sparse retrieval.
    - Retrieves the top-k relevant chunks based on keyword matching.
    - Uses a language model to generate answers from retrieved contexts.
    """

    def __init__(self, generation_model: OllamaLLM, docs_v0: List[Document]):
        # Initialize BM25 sparse retrieval (no embeddings for generation)
        sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25")

        # Store documents in Qdrant using sparse retrieval (BM25)
        self.knowledge_db = QdrantVectorStore.from_documents(
            docs_v0,  
            embedding = OllamaEmbeddings(model="mistral"),  # No embeddings used in this mode
            sparse_embedding =s parse_embeddings,  # BM25 sparse embeddings
            location = ":memory:",  # Store in-memory (can be changed to persistent storage)
            collection_name = "rag_bm",  # Collection name for BM25-based retrieval
            retrieval_mode = RetrievalMode.SPARSE,  # Use only sparse retrieval (BM25)
        )

        # Define model name dynamically
        name = f"bm25_v0_{generation_model.model}"
        super().__init__(name = name, knowledge_db = self.knowledge_db)

        # Initialize the LLM and retriever
        self.llm = generation_model
        self.retriever = self.knowledge_db.as_retriever(
            search_type="similarity", search_kwargs={"k": 5}  # Retrieve top 5 matches
        )
        self.gen_prompt = PromptTemplate.from_template(PROMPT_TEMPLATE)  # Use the structured prompt (no embeddings)

    def add_documents(self, new_docs: List[Document]):
        """
        Add new documents to Qdrant Cloud using BM25 sparse retrieval without embeddings.
        This method adds documents directly using keyword matching (BM25).
        """

        # Here we assume that new_docs are pre-processed and are in a list of Document objects
        for doc in new_docs:
            try:
                # Adding document to Qdrant with BM25 indexing
                # In this case, we don't use vector-based embeddings for the document
                self.knowledge_db.add_documents([doc])  # Adding document to the database directly
            except Exception as e:
                print(f"[Warning] Erreur lors de l'ajout du document {doc.metadata.get('source', 'unknown')}: {e}")

    def retrieve(self, query: str) -> List[Document]:
        """Retrieve relevant documents for a given query."""
        return self.retriever.invoke(query)

    def find_relevant_documents(self, query: str) -> List[str]:
        """Find sources of relevant documents."""
        retrieved = self.retrieve(query)
        return list(set(doc.metadata.get("source", "unknown") for doc in retrieved))

    def generate(self, query: str, retrieved_contexts: List[Document]) -> str:
        """
        Generates a response using the retrieved contexts.

        Args:
            query (str): The user query.
            retrieved_contexts (List[Document]): Retrieved document chunks based on BM25.

        Returns:
            str: The generated answer from the language model.
        """
        # Format retrieved contexts into a single string
        format_retrieved_contexts = "\n".join([rc.page_content for rc in retrieved_contexts])

        # Format the query with the retrieved contexts for generation
        augmented_query = self.gen_prompt.format(
            query=query,
            retrieved_contexts=format_retrieved_contexts
        )

        # Generate the final response
        response = self.llm.invoke(augmented_query)
        return response

######################## RAG Hybride ########################

class HybridRAG(RAGInterface):
    """
    Retrieval-Augmented Generation with hybrid search (dense + sparse).
    """
    def __init__(
        self, 
        generation_model: OllamaLLM, 
        docs: List[Document], 
        collection_name: str = "rag_hybrid", 
        alpha: float = 0.7,
        embedding_model: str = "mistral:latest"):
        
        # Configurable sparse and dense embeddings
        sparse_embeddings = FastEmbedSparse(model_name = "Qdrant/bm25", cache_dir=".")
        dense_embeddings = OllamaEmbeddings(model = embedding_model)
        
        # Initialize vector store with hybrid search
        self.vectorstore = QdrantVectorStore.from_documents(
            docs,
            embedding = dense_embeddings,
            sparse_embedding = sparse_embeddings,
            location = ":memory:",
            collection_name = collection_name,
            retrieval_mode = RetrievalMode.HYBRID,
            sparse_dense_ratio = alpha  # 0 = pure dense, 1 = pure sparse
        )
        
        # Name with generation model
        name = f"hybrid_{generation_model.model}"
        super().__init__(name = name, knowledge_db = self.vectorstore)
        
        self.llm = generation_model
        self.retriever = self.vectorstore.as_retriever(search_kwargs = {"k": 20})
        self.gen_prompt = PromptTemplate.from_template(PROMPT_TEMPLATE)
    
    def add_documents(self, new_docs: List[Document]):
        """Add new documents to the vector store."""
        self.vectorstore.add_documents(new_docs)
    
    def retrieve(self, query: str) -> List[Document]:
        """Retrieve relevant documents for a given query."""
        return self.retriever.invoke(query)
    
    def generate(self, query: str, retrieved_contexts: List[Document]) -> str:
        """Generate a response based on retrieved contexts."""
        context_str = "\n".join([doc.page_content for doc in retrieved_contexts])
        full_prompt = self.gen_prompt.format(query=query, retrieved_contexts=context_str)
        return self.llm.invoke(full_prompt)
    
    def find_relevant_documents(self, query: str) -> List[str]:
        """Find sources of relevant documents."""
        retrieved = self.retrieve(query)
        return list(set(doc.metadata.get("source", "unknown") for doc in retrieved))

### File pre-processing & RAG creation

In [13]:
def load_and_chunk_file(file_path: str) -> List[Document]:
    ext = file_path.split('.')[-1].lower()

    if ext == "pdf":
        loader = PyMuPDFLoader(file_path)
        docs = loader.load()
        for d in docs:
            d.page_content = " ".join(d.page_content.split())
        splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=100)
        return splitter.split_documents(docs)

    elif ext == "csv":
        df = pd.read_csv(file_path)
        if len(df) > 10000:
            df = df.head(10000)
        docs = [
            Document(
                page_content=" ".join(str(value) for value in row if pd.notna(value)), 
                metadata={"source": file_path, "row_index": index}
            )
            for index, row in df.iterrows()
        ]
        splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
        return splitter.split_documents(docs)

    elif ext == "json":
        try:
            from langchain_community.document_loaders import JSONLoader
            loader = JSONLoader(file_path, jq_schema=".", text_content=False)
            docs = loader.load()
        except ImportError:
            print(f"[Warning] jq non installé. Chargement brut de {file_path}")
            with open(file_path, "r", encoding="utf-8") as f:
                raw = f.read()
            docs = [Document(page_content=raw, metadata={"source": file_path})]
        
        for d in docs:
            try:
                data = json.loads(d.page_content)
                d.page_content = json.dumps(data, indent=2)
            except:
                pass
        splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=50)
        return splitter.split_documents(docs)

    elif ext == "xml":
        loader = UnstructuredXMLLoader(file_path)
        docs = loader.load()
        for d in docs:
            d.page_content = " ".join(d.page_content.split())
        splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
        return splitter.split_documents(docs)

    else:
        raise ValueError(f"Extension non supportée : {ext}")

#############################################################

def get_all_files_recursively(folder_path: str, valid_extensions: Optional[List[str]] = None) -> List[str]:
    """
    Parcourt récursivement un dossier et retourne tous les fichiers avec les extensions valides.
    """
    all_files = []
    for root, _, files in os.walk(folder_path):
        for file in files:
            ext = file.lower().split('.')[-1]
            if valid_extensions is None or ext in valid_extensions:
                full_path = os.path.join(root, file)
    return all_files

#############################################################

def build_hybrid_rag_from_folder(folder_path: str, alpha: float = 0.7) -> HybridRAG:
    """
    Construit un système HybridRAG à partir des fichiers dans un dossier (récursivement).
    """
    valid_extensions = ["pdf", "csv", "json", "xml"]
    file_paths = get_all_files_recursively(folder_path, valid_extensions)

    all_docs = []
    for path in file_paths:
        try:
            docs = load_and_chunk_file(path)
            all_docs.extend(docs)
        except Exception as e:
            print(f"[Warning] Erreur lors du traitement de {path} : {e}")

    llm = OllamaLLM(model="mistral:latest")
    rag = HybridRAG(generation_model=llm, docs=all_docs, alpha=alpha)
    return rag

#############################################################

def build_rag_from_folder(folder_path: str) -> BM25V0RAG:
    """
    Construit un système RAG à partir des fichiers dans un dossier (récursivement).
    """
    valid_extensions = ["pdf", "csv", "json", "xml"]
    file_paths = get_all_files_recursively(folder_path, valid_extensions)

    all_docs = []
    for path in file_paths:
        try:
            docs = load_and_chunk_file(path)
            all_docs.extend(docs)
        except Exception as e:
            print(f"[Warning] Erreur lors du traitement de {path} : {e}")

    llm = OllamaLLM(model="mistral:latest")
    # Remplacer "docs" par "docs_v0" pour correspondre au constructeur
    rag = BM25V0RAG(generation_model=llm, docs_v0=all_docs)  
    return rag

### Création du RAG

In [None]:
rag = build_rag_from_folder("/users/formation/irtn7prtnc/llm_engineering/Data")

In [None]:
 # Agent pourrait appeler ceci :
query = "Quelle est la région avec le plus de députés ?"
sources = rag.find_relevant_documents(query)
print("🔍 Documents pertinents :", sources)
    
answer = rag.generate(query, rag.retrieve(query))
print("\n🧠 Réponse :", answer)

In [None]:
rag.retrieve(query)

# Mise en place des Agents

### RAG Agent tools

In [28]:
@tool
def agent_add_file_to_rag(rag : str, path: str) -> str:
    """
    Loads a supported file (PDF, CSV, JSON, XML), splits it into chunks, 
    and adds them to the RAG (Retrieval-Augmented Generation) system for future retrieval.
    
    This function is capable of handling both single files and entire directories. 
    If the path is a directory, all supported files within the directory (and its subdirectories) 
    will be processed.

    Args:
        rag: The RAG system or retriever to which the chunks will be added. This is the core component that stores and retrieves documents during question-answering tasks.
        path: The path to the file or directory to be processed. Supported file formats include: pdf, csv, json & xml.

    Returns:
        str: A message indicating the number of chunks added to the RAG, or an error message if the file format is unsupported or if an error occurs during processing.
    """
    try:
        docs = []
        if os.path.isfile(path):
            if path.endswith(('.pdf', '.csv', '.json', '.xml')):
                docs = load_and_chunk_file(path)
            else:
                return f"Format de fichier non supporté : {path}"

        elif os.path.isdir(path):
            valid_exts = ('.pdf', '.csv', '.json', '.xml')
            for root, _, files in os.walk(path):
                for f in files:
                    full_path = os.path.join(root, f)
                    if full_path.endswith(valid_exts):
                        try:
                            docs.extend(load_and_chunk_file(full_path))
                        except Exception as e:
                            print(f"[Erreur] Chargement échoué pour {full_path} : {e}")
        rag.add_documents(docs)
        return f"{len(docs)} chunks ajoutés depuis {path}"
        
    except Exception as e:
        return f"Le chemin donné n'est ni un fichier ni un dossier : {path}"

    


######

@tool
def detect_file_type(file_path: str) -> str:
    """Detects the MIME type of a file.

    Args:
        file_path: The path of the file that we want the type.

    Returns:
        Tue file type of an error message if it's not possible to detecting the type.
    """
    try:
        mime = magic.Magic(mime=True)
        return mime.from_file(file_path)

    except Exception as e:
        return f"Error detecting file type: {str(e)}"
    
######

@tool
def extract_any_archive(file_path: str, destination: str = None) -> str:
    """Extracts a archive file to a specified directory.

    Args:
        file_path: The path of the file to extract.
        destination: The destination of the extracted file.
    Returns:
        The extracted file path, or an error message if file extraction failed.
    """
    try:
        if destination is None:
            destination = os.path.splitext(file_path)[0]

        patoolib.extract_archive(file_path, outdir=destination)

        os.remove(zip_path)

        return f"Archive extracted successfully to: {destination}"

    except Exception as e:
        return f"Error extracting archive: {str(e)}"
    
######

@tool
def move_file(source: str, destination: str) -> str:
    """Moves a file or directory to a new location.

    Args:
        source: The current path of the file to move.
        destination: The new path of the file.

    Returns:
        The new file path, or an error message if file transfer failed
    """
    try:
        if not os.path.exists(destination):
            os.makedirs(destination)

        shutil.move(source, destination)

        return f"File successfully move to : {destination}"

    except Exception as e:
        return f"Fail during file transfert : {str(e)}"   

######

@tool
def normalize_and_ensure_unique_filename(path: str) -> str:
    """
    Normalizes the name of a file or directory and ensures that it is unique 
    by appending a numerical suffix if a file with the same name already exists.

    The function first replaces any non-alphanumeric characters (except for underscores, 
    hyphens, and periods) with underscores, and then checks if the normalized name 
    already exists. If it does, it appends a numerical suffix to the name.

    Args:
        path: The full path of the file or directory whose name is to be normalized and checked for uniqueness.

    Returns:
        str: The normalized and unique file or directory path.
    """
    base = os.path.basename(path)
    normalized_name = re.sub(r"[^a-zA-Z0-9_\-\.]", "_", base)

    # Get the directory and ensure the file name is unique
    directory = os.path.dirname(path)
    unique_path = os.path.join(directory, normalized_name)

    # Ensure uniqueness
    if not os.path.exists(unique_path):
        return unique_path

    # If the file exists, append a numerical suffix
    base, ext = os.path.splitext(normalized_name)
    i = 1
    while os.path.exists(os.path.join(directory, f"{base}_{i}{ext}")):
        i += 1

    return os.path.join(directory, f"{base}_{i}{ext}")


### Web Agent tools

In [29]:
@tool
def visit_webpage(url: str) -> str:
    """Fetches the content of a webpage and returns it in a clean Markdown format.

    This tool sends an HTTP GET request to the provided URL, retrieves the HTML content,
    converts it into Markdown to preserve readability while removing HTML-specific elements,
    and returns the cleaned content. It automatically handles request errors and unexpected failures.

    Args:
        url: The URL of the webpage to retrieve and convert.

    Returns:
        A string containing the converted Markdown content of the webpage,
        or an error message if the request or conversion fails.
    """
    try:
        # Send a GET request to the URL
        response = requests.get(url)
        response.raise_for_status()

        # Convert the HTML content to Markdown
        markdown_content = markdownify(response.text).strip()

        # Remove multiple line breaks
        markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
        return markdown_content

    except RequestException as e:
        return f"Error fetching the webpage: {str(e)}"

    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"

##########

@tool
def summarize_webpage(url: str) -> str:
    """Fetches the content of a webpage and summarizes it in a concise paragraph.

    This tool first downloads the webpage, converts its HTML into Markdown, then
    uses a language model to summarize the key points. Ideal for previewing large pages
    or extracting meaningful information quickly.

    Args:
        url: The URL of the webpage to summarize.

    Returns:
        A summary string of the page content, or an error message if the operation fails.
    """
    try:
        from markdownify import markdownify
        import re
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        # Convert HTML to Markdown
        markdown_content = markdownify(response.text).strip()
        markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

        # Trim if too long
        if len(markdown_content) > 4000:
            markdown_content = markdown_content[:4000] + "..."

        # Call LLM (Qwen or another)
        from langchain_core.runnables import Runnable
        from langchain_core.prompts import PromptTemplate
        from langchain_core.output_parsers import StrOutputParser
        from langchain_community.llms import Ollama

        llm = Ollama(model="mistral:latest")
        prompt = PromptTemplate.from_template(
            "Summarize the following web content:\n\n{content}\n\nSummary:"
        )
        chain: Runnable = prompt | llm | StrOutputParser()

        return chain.invoke({"content": markdown_content})

    except Exception as e:
        return f"Error during summarization: {str(e)}"  

##########
    
@tool
def check_url_validity(url: str) -> bool:
    """Downloads a file from a given URL to the local cache directory.

    This tool initiates a streamed download of the file pointed to by the given URL,
    saves it to a local cache folder, and returns the file path upon success.
    It supports large files by downloading them in chunks.

    Args:
        url: The direct URL of the file to download.

    Returns:
        A confirmation message with the path to the downloaded file,
        or an error message if the download fails.
    """
    
    try:
        response = requests.head(url, allow_redirects=True, timeout=5)
        return response.status_code == 200

    except requests.RequestException:
        return False
    
##########

@tool
def download_file(url: str) -> str:
    """Checks if a URL is reachable and returns a boolean response.

    This tool sends an HTTP HEAD request to the target URL to verify its availability,
    following redirects if necessary. It's useful to ensure a link is valid before fetching or downloading.

    Args:
        url: The URL to verify for availability and accessibility.

    Returns:
        True if the URL is reachable (HTTP status 200), otherwise False.
    """
    try:
        local_filename = os.path.join(Cache_dir, url.split('/')[-1])

        with requests.get(url, stream=True) as r:
            r.raise_for_status()

            with open(local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return f"File downloaded with succes : {local_filename}"

    except Exception as e:
        return f"Error during file downloading: {str(e)}"

##########

@tool
def follow_links_recursive(url: str, depth: int = 4) -> List[str]:
    """Recursively explores hyperlinks on a webpage up to a given depth.

    This tool fetches a webpage and extracts all links from it.
    It then visits each discovered link (if it's a valid URL) and repeats
    the process up to the specified recursion depth.

    Args:
        url: The starting URL to explore.
        depth: The maximum depth of recursion. Depth 0 returns only the original URL.

    Returns:
        A list of all reachable URLs found during the recursive exploration.
        May contain both relative and absolute URLs.
    """
    from urllib.parse import urljoin, urlparse

    visited = set()
    result = set()

    def crawl(current_url, current_depth):
        if current_depth > depth or current_url in visited:
            return
        visited.add(current_url)

        try:
            response = requests.get(current_url, timeout=5)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "html.parser")
            links = [urljoin(current_url, a["href"]) for a in soup.find_all("a", href=True)]
            for link in links:
                result.add(link)
                crawl(link, current_depth + 1)
        except Exception:
            pass  # Skip errors silently

    crawl(url, 0)
    return list(result)

##########

@tool
def extract_and_classify_links(url: str) -> Dict[str, List[str]]:
    """Extracts all hyperlinks from a webpage and classifies them into categories.

    This tool visits the given URL, extracts all <a href> links, and organizes them into
    the following categories:
    - 'webpages': regular HTML pages or links without extensions
    - 'files': downloadable documents (PDF, CSV, JSON, XML, ZIP, etc.)
    - 'media': images, audio, video files (JPG, MP4, MP3, etc.)
    - 'others': any remaining links

    Args:
        url: The URL of the webpage to scan.

    Returns:
        A dictionary with link categories as keys and lists of corresponding URLs as values.
    """
    file_exts = ('.pdf', '.csv', '.xlsx', '.xls', '.json', '.xml', '.zip')
    media_exts = ('.png', '.jpg', '.jpeg', '.gif', '.mp4', '.mp3', '.wav', '.webm')

    classified = {
        "webpages": [],
        "files": [],
        "media": [],
        "others": [],
    }

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        for a in soup.find_all("a", href=True):
            href = a["href"]
            full_url = urljoin(url, href)
            lower = full_url.lower()

            if any(lower.endswith(ext) for ext in file_exts):
                classified["files"].append(full_url)
            elif any(lower.endswith(ext) for ext in media_exts):
                classified["media"].append(full_url)
            elif lower.startswith("http") and (lower.endswith("/") or "." not in lower.split("/")[-1]):
                classified["webpages"].append(full_url)
            else:
                classified["others"].append(full_url)

        return classified

    except Exception as e:
        return {"error": [f"Error during extraction: {str(e)}"]}

##########

@tool
def get_keyword_context(url: str, keyword: str, window: int = 50) -> List[str]:
    """Extracts excerpts of text around a given keyword from a webpage.

    This tool fetches the content of a webpage, converts it into plain text,
    and searches for occurrences of the keyword. For each occurrence, it returns
    a snippet that includes `window` words before and after the keyword.

    Args:
        url: The URL of the webpage to analyze.
        keyword: The keyword to search for in the text (case-insensitive).
        window: The number of words to include before and after each match.

    Returns:
        A list of contextual excerpts where the keyword appears, or an error message if the request fails.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        text = soup.get_text(separator=' ', strip=True)

        words = text.split()
        keyword_lower = keyword.lower()
        contexts = []

        for i, word in enumerate(words):
            if keyword_lower in word.lower():
                start = max(i - window, 0)
                end = min(i + window + 1, len(words))
                context = ' '.join(words[start:end])
                contexts.append(context)

        return contexts if contexts else [f"No occurrences of '{keyword}' found."]

    except Exception as e:
        return [f"Error while extracting context: {str(e)}"]

### Data Agent tools

In [30]:
@tool
def find_relevant_documents(query: str) -> List[str]:
    """Finds the most relevant documents for a given user query using the local RAG system.

    This tool queries the vector and sparse retrievers to find the most contextually relevant
    documents or file paths related to the input query.

    Args:
        query: The user's question or search string.

    Returns:
        A list of strings representing the most relevant documents or paths.
    """
    sources = rag.find_relevant_documents(query)
    return sources

##########

def parse_pdf(path: str) -> str:
    import fitz  # PyMuPDF
    try:
        doc = fitz.open(path)
        text = "\n".join(page.get_text() for page in doc)
        doc.close()
        return text
    except Exception as e:
        return f"Error parsing PDF: {str(e)}"

def parse_csv(path: str) -> str:
    import pandas as pd
    try:
        df = pd.read_csv(path)
        return df.head(10).to_markdown()
    except Exception as e:
        return f"Error parsing CSV: {str(e)}"
    
def parse_json(path: str) -> str:
    import json
    try:
        with open(path, "r") as f:
            data = json.load(f)
        return json.dumps(data, indent=2)[:3000]  # Trimmed for safety
    except Exception as e:
        return f"Error parsing JSON: {str(e)}"

def parse_xml(path: str) -> str:
    import xml.etree.ElementTree as ET
    try:
        tree = ET.parse(path)
        root = tree.getroot()

        def parse_element(elem, level=0):
            text = f"{'  ' * level}<{elem.tag}>: {elem.text.strip() if elem.text else ''}\n"
            for child in elem:
                text += parse_element(child, level + 1)
            return text

        return parse_element(root)
    except Exception as e:
        return f"Error parsing XML: {str(e)}"
     
###########

@tool
def detect_and_parse(path: str) -> str:
    """Automatically detects the type of a local file and parses its content accordingly.

    This tool acts as a smart wrapper that routes the file to the appropriate parser based
    on its extension (PDF, CSV, JSON, XML). It is ideal for agents that don't know in advance
    what type of file they are dealing with.

    Use this tool when the user provides a file path and wants to:
    - View or analyze the content, regardless of the file type.
    - Extract text or structure without needing to specify the format.

    Args:
        path: Path to the file.

    Returns:
        The parsed content or structure of the file, or an error message.
    """
    ext = path.lower().split(".")[-1]
    if ext == "pdf":
        return parse_pdf(path)
    elif ext == "csv":
        return parse_csv(path)
    elif ext == "json":
        return parse_json(path)
    elif ext == "xml":
        return parse_xml(path)
    else:
        return "Unsupported file type"

###########

@tool
def get_document_metadata(path: str) -> Dict[str, str]:
    """Returns metadata information about a local document file.

    Use this tool when the user wants to inspect technical information about a file such as
    size, type, name, and last modification date.

    Args:
        path: The local path to the document file.

    Returns:
        A dictionary containing metadata about the file.
    """
    try:
        stat = os.stat(path)
        metadata = {
            "name": os.path.basename(path),
            "size (KB)": f"{stat.st_size // 1024}",
            "last_modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
            "type": os.path.splitext(path)[1].lower()
        }
        return metadata
    except Exception as e:
        return {"error": str(e)}

###########    
    
@tool
def get_keyword_context(path: str, keyword: str, window: int = 50) -> List[str]:
    """Extracts short text segments around a keyword from a local document.

    Use this tool when the user is searching for how a specific term is used inside a
    document. It returns snippets that include the keyword and surrounding words.

    Args:
        path: Path to the local file.
        keyword: The target word or phrase to search.
        window: Number of words before and after the keyword to include.

    Returns:
        A list of text excerpts, or a message if no match is found.
    """
    try:
        content = detect_and_parse(path)
        words = content.split()
        keyword_lower = keyword.lower()
        contexts = []

        for i, word in enumerate(words):
            if keyword_lower in word.lower():
                start = max(i - window, 0)
                end = min(i + window + 1, len(words))
                context = ' '.join(words[start:end])
                contexts.append(context)

        return contexts if contexts else [f"No occurrences of '{keyword}' found."]

    except Exception as e:
        return [f"Error while extracting context: {str(e)}"]

### Définition des Agents

In [38]:
rag_agent = ToolCallingAgent(
    tools = [agent_add_file_to_rag, extract_any_archive, move_file, normalize_and_ensure_unique_filename],
    model = model,
    add_base_tools = True,
    max_steps = 10,
    name = "RAG_Agent",
    description = """RAG_Agent is a local retrieval-augmented generation (RAG) agent designed to manage, process, and store documents within a retrieval system.
    It supports various file types such as PDF, CSV, JSON, and XML. The agent can download, extract, normalize, and ensure uniqueness of document paths. 
    It integrates file handling capabilities like moving files to specific locations and adding them to a retrieval-augmented system for future queries. 
    RAG_Agent is ideal for organizing data, handling large document collections, and enhancing knowledge retrieval capabilities.
    This agent works with a local RAG system to optimize document storage and querying, ensuring seamless integration of newly added files or directories."""
    )

#############################################################

data_agent = ToolCallingAgent(
    tools = [find_relevant_documents, detect_and_parse, get_document_metadata, get_keyword_context],
    model = model,
    add_base_tools = True,
    max_steps = 10,
    name = "Data_agent",
    description ="""DataAgent is a local data analysis agent designed to understand and extract insights from documents stored on the local file system.
    It supports multiple file types — PDF, CSV, JSON, and XML — and can intelligently detect and parse files, find relevant documents using a local RAG 
    system, summarize content, answer questions, and locate context around keywords.
    With access to powerful file-specific tools, DataAgent provides a flexible and intelligent interface for navigating structured and unstructured data, 
    especially in research, compliance, business reporting, and document management use cases."""
    )

#############################################################

web_agent = ToolCallingAgent(
    tools = [visit_webpage, summarize_webpage, check_url_validity, download_file, follow_links_recursive, extract_and_classify_links, GoogleSearchTool()],
    model = model,
    add_base_tools = True,
    max_steps = 20,
    name="Web_agent",
    description =""" Web_agent is a capable and autonomous web exploration agent designed to browse, analyze, and extract content from websites. 
        It can validate links, visit web pages, convert them into clean Markdown, extract hyperlinks for further navigation, and download files if necessary.
        Equipped with tools for safe and efficient web scraping, the agent is ideal for tasks like retrieving documents, discovering resources, crawling sites
        or building datasets from public data portals. You are WebNavigatorAgent, a specialized web assistant working under the supervision of a manager agent. 
        Your mission is to search, retrieve, and structure meaningful information related to the Assemblée nationale (French National Assembly)."""
        )

#############################################################

manager_agent = CodeAgent(
    tools = [],
    model = model,
    managed_agents = [web_agent, data_agent, rag_agent],
    additional_authorized_imports = ["time", "numpy", "pandas"],
    planning_interval = 3,
    verbosity_level = 2,
    #add_base_tools = True
    max_steps = 10,
    description = """You are ManagerAgent, an intelligent assistant responsible for answering complex questions about the French National Assembly (Assemblée nationale). Your mission is to analyze the user's query, determine the type of information needed (legal, institutional, statistical, historical, etc.), and orchestrate the best multi-step strategy to provide accurate, well-sourced, and structured answers.

 Strategy:
1. **Always begin with local analysis.**
   - Use the local Retrieval-Augmented Generation (RAG) system to search for relevant documents and datasets already available on disk.
   - Tools include:
     - `find_relevant_documents(query)` — to retrieve the most relevant documents
     - `query_document(path, question)` — to extract an answer from a specific document
     - `summarize_document(path)` — to generate a summary
     - `get_keyword_context(path, keyword)` — to locate passages around a key term

2. **If the local search does not yield enough relevant or up-to-date information**, delegate retrieval tasks to `WebNavigatorAgent`, who can:
   - Visit and summarize web pages (`visit_webpage`, `summarize_webpage`)
   - Extract and classify all links from a webpage (`extract_and_classify_links`)
   - Recursively crawl relevant sections of a site (`follow_links_recursive`)
   - Search for downloadable files (`list_files_on_page`, `search_file_links_by_keyword`)
   - Extract content around a keyword (`get_keyword_context`)
   - Download and return datasets (`download_file`)

3. **When using WebNavigatorAgent**, be precise in your instructions. Example queries:
   - “Search https://data.assemblee-nationale.fr for datasets on legislative activity”
   - “Get the Markdown version of the debate on the pension reform from www.assemblee-nationale.fr”
   - “Find and download the list of deputies elected in 2022”

 Response construction:
- Once all relevant data is collected (locally or online), analyze and synthesize it.
- Provide a structured response with:
  - A clear **summary**
  - **Source links**
  - Key **excerpts** or statistics
  - Optional **suggested follow-ups**
- Be transparent about whether information came from local files or web sources.

Notes:
- If the user asks about something very recent or unindexed, expect to call WebNavigatorAgent.
- If the user gives a specific file path, jump directly to `query_document()` or `summarize_document()`.

Your job is not just to fetch, but to **curate**, **explain**, and **guide**.
"""
    )


In [39]:
web_agent.system_prompt =
"""You are equipped with the following tools:
1. `check_url_validity` — Verify if a URL is reachable. Always use this first.
2. `visit_webpage` — Visit and convert a webpage to Markdown for readable content.
3. `extract_links` — Extract all hyperlinks from a page to explore further.
4. `download_file` — Download any available file if relevant. The downloaded file mus be place at '/users/formation/irtn7prtnc/llm_engineering/Cache'

Your search priority is as follows:

1. Always begin by exploring these official websites:
- https://data.assemblee-nationale.fr
- https://www.assemblee-nationale.fr

Try to locate relevant pages, datasets, documents or structured information directly from these domains.

2. If nothing relevant is found, you may expand your search to the broader internet.

Use tools one at a time, with precision. Think step-by-step. Only extract or download what is useful for answering the manager agent’s request."""
            
            
data_agent.system_prompt =
"""You are DataAgent, an intelligent assistant specialized in analyzing and retrieving insights from local documents.

You have access to the following tools:

1. `find_relevant_documents(query: str)`  
   - Use this when the user is searching for specific information or asking a general question.
   - It returns a list of local files most relevant to the query using a retrieval-augmented generation system.

2. `list_available_documents()`  
   - Use this to see which documents are currently indexed or available for analysis.

3. `summarize_document(path: str)`  
   - Use this when the user wants a high-level overview of a specific document without reading it entirely.

4. `query_document(path: str, question: str)`  
   - Use this when the user wants an answer derived only from a specific document.

5. `get_document_metadata(path: str)`  
   - Use this to inspect metadata like file type, size, and modification date.

6. `detect_and_parse(path: str)`  
   - Automatically detects and parses the file based on extension (PDF, CSV, JSON, XML). Use this when unsure about file format.

7. `parse_pdf(path: str)` / `parse_csv(path: str)` / `parse_json(path: str)` / `parse_xml(path: str)`  
   - Use these to extract raw content from a specific document type.

8. `get_keyword_context(path: str, keyword: str, window: int)`  
   - Use this to extract snippets surrounding a keyword inside a document.

You should ask clarifying questions if the user provides insufficient detail (e.g., which document to query).  
Think step-by-step, choose the correct tool based on the user’s intent, and explain your reasoning clearly.

If the user provides a file path, validate and process it using `detect_and_parse`.  
If the user asks “what is this file about?”, consider using `summarize_document`.  
If they want to search across all files, use `find_relevant_documents`.

Be helpful, accurate, and efficient."""
    



SyntaxError: invalid syntax (504797157.py, line 1)

# Utilisation de l'agent

In [40]:
agent_output = manager_agent.run('Give me the first and  the last time Jean Lassale was ellected as french depute')

print("Final output:")
print(agent_output)

KeyboardInterrupt: 