In [None]:
from dotenv import load_dotenv
load_dotenv()

### Définition du State de notre graph

Dans cette première étape, nous définissons la structure de données qui servira à stocker les informations à chaque étape de la recherche et du traitement des réponses dans le graphe. Cette structure, `ComplexityState`, est une classe dérivée de `TypedDict`, qui offre une manière typée de gérer les données essentielles dans le contexte d'un flux de réponses IA. 

Les attributs principaux incluent :

- `title` : Titre de la requête ou du sujet de recherche.
- `messages` : Historique des messages ou des échanges, composés d'une séquence de messages (`BaseMessage`).
- `search_queries` et `related_queries` : Liste des requêtes de recherche initiales et des requêtes associées générées.
- `search_results`, `images`, et `videos` : Résultats de recherche, liens d'images et de vidéos pertinents.
- `scrapped_results` : Données extraites de pages web.
- `page_summaries` : Résumés des pages pertinentes, ajoutés pour une analyse rapide.

Cette classe `ComplexityState` fournit une base de données centralisée qui permet à chaque étape du flux de travail de lire, écrire ou mettre à jour les informations collectées, assurant ainsi une cohérence dans l'état du graphe.

In [None]:
import operator
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
from pydantic import BaseModel, Field

class ComplexitySubQueries(BaseModel):
    """List of internet search queries"""
    queries: list[str] = Field(description="List of the generated internet search queries")
    title: str = Field(description="Title for the user chat")


class ComplexityRelatedQueries(BaseModel):
    """List of internet search queries"""
    queries: list[str] = Field(description="List of the generated related search queries")


class ComplexityState(TypedDict):
    title: str
    messages: Annotated[Sequence[BaseMessage], operator.add]
    search_queries: list[str]
    related_queries: list[str]
    search_results: list[str]
    images: list[str]
    videos: list[str]
    scrapped_results: list[str]
    page_summaries: Annotated[list, operator.add]

### Fonctions support

Dans cette section, nous définissons plusieurs fonctions support pour faciliter l'implémentation et la lisibilité du graphe. Ces fonctions permettent d'effectuer des recherches web, d'accéder à des images et des vidéos pertinentes via la bibliothèque duckduckgo, et de configurer un modèle de langage pour gérer les interactions entre les différents nœuds.

Ces fonctions offrent une structure de base pour gérer les actions de recherche et d'interaction du graphe, posant les fondations de la chaîne de traitement qui sera exploitée dans les étapes suivantes.

#### Recherche Web


Ces trois fonctions utilisent duckduckgo pour effectuer des recherches web, d'images, et de vidéos, respectivement. Elles simplifient l'accès aux informations et ressources multimédias en lien avec les requêtes de l'utilisateur.



In [None]:
from duckduckgo_search import AsyncDDGS


async def _search_web(query):
    return await AsyncDDGS(proxy=None).atext(query, max_results=2)

async def _search_videos(query):
    return await AsyncDDGS(proxy=None).avideos(query, max_results=2)

async def _search_images(query):
    return await AsyncDDGS(proxy=None).aimages(query, max_results=2)

#### Création d'un model "générique"

Cette fonction instancie un modèle ChatOpenAI destiné à être utilisé par les nœuds du graphe. 

Pour cette démonstration, nous utilisons un modèle unique pour simplifier l'implémentation et la lisibilité. Cependant, il est possible d'utiliser divers modèles et même différents fournisseurs en fonction des exigences spécifiques de chaque nœud.

In [None]:
from os import getenv
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
        model='gpt-4o-mini',
        temperature=0,
        api_key=getenv("OPENAI_API_KEY")
    )

### Observability

Enfin, nous incluons une fonction qui crée un simple callback LangChain pour Langfuse, permettant de "déverser" les évènements du graphe dans [Langfuse](http://localhost:3003) pour apporter de l'observabilité à notre graphe..

In [None]:
from langfuse.callback import CallbackHandler

def langfuse_handler(session_id: str):
    return CallbackHandler(
    secret_key=getenv("LANGFUSE_SECRET_KEY"),
    public_key=getenv("LANGFUSE_PUBLIC_KEY"),
    host=getenv("LANGFUSE_HOST"),
    session_id=session_id,
    user_id="user-id"
)

### Création des différents noeuds

Dans cette section, nous définissons les fonctions de création des nœuds du graphe. Ces nœuds sont les éléments de base de notre flux de traitement et utilisent les fonctions support pour effectuer des recherches, extraire des données et générer des réponses. Chaque nœud exécute une étape spécifique, et ensemble, ils constituent le flux complet de génération de réponse.

Ces nœuds forment la logique principale de traitement du graphe, orchestrant la recherche, l’analyse et la génération de réponse finale. Ensemble, ils permettent de transformer une requête utilisateur en une réponse complète, enrichie et multimodale.

#### Générateur de requête

Appelle un modèle de langage pour générer trois requêtes web à partir de la requête utilisateur initiale. Ces requêtes élargissent le contexte de recherche et permettent de couvrir des angles divers.

In [None]:
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder


genereate_queries_template = ChatPromptTemplate.from_messages(
    [
        (
            "system", """Provide a list of 3 better search queries for 
            web search engine to answer the given question.
            Answer:"""
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)

async def genereate_queries(state: ComplexityState):
    structured_model = model.with_structured_output(ComplexitySubQueries)
    chain = genereate_queries_template | structured_model 
    messages = state['messages']
    result = await chain.ainvoke(messages)
    return {"search_queries":result.queries, "title":result.title}

#### Exécuteur de requêtes Internet

`search_web` utilise la fonction support de recherche web pour exécuter chaque requête générée par generate_queries. Grâce à asyncio, ces recherches sont parallélisées pour optimiser les performances.

`search_images` et `search_videos` appellent respectivement les fonctions de recherche d'images et de vidéos pour chacune des trois requêtes générées par generate_queries. Comme pour search_web, ces recherches sont parallélisées avec asyncio pour améliorer les performances et maximiser la diversité des résultats visuels.

In [None]:
import asyncio

async def search_web(state: ComplexityState):
    queries = state['search_queries']
    tasks = [_search_web(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return {"search_results":results}

async def search_videos(state: ComplexityState):
    queries = state['search_queries']
    tasks = [_search_videos(q) for q in queries]
    results = await asyncio.gather(*tasks)
    flatten_results = [item for sublist in results for item in sublist]
    return {"videos":flatten_results}

async def search_images( state: ComplexityState):
    queries = state['search_queries']
    tasks = [_search_images(q) for q in queries]
    results = await asyncio.gather(*tasks)
    flatten_results = [item for sublist in results for item in sublist]
    return {"images":flatten_results}

#### Scrap Web

Utilise un `Loader` fourni par `LangChain` pour récupérer et structurer le contenu des pages web trouvées lors de la recherche précédente. Cela permet d'extraire du texte des pages sélectionnées pour une analyse plus approfondie.

In [None]:
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_transformers import Html2TextTransformer

async def scrap_web(state: ComplexityState):
    search_results = state['search_results']
    urls = [result['href'] for search_result in search_results for result in search_result]
    loader = AsyncHtmlLoader(urls)
    docs = loader.load()
    html2text = Html2TextTransformer()
    docs_transformed = html2text.transform_documents(docs)
    return {"scrapped_results":docs_transformed}

#### Sumarize Web

 Appelle un modèle de langage pour générer un résumé de chaque page web scrappée, condensant l'information et facilitant son utilisation dans les étapes suivantes.

In [None]:
summarize_template = ChatPromptTemplate.from_messages(
    [
        (
            "system", """You are an expert at summarizing web search result pages.
            Your task is to summarize the following content in order for anoter assitant to respond to user question. 
            Do mention the source in your summay.
            Do NOT answer directly to the question, another expert will take care of this task.
            Question: {query}
            Source: {source}
            Content: {content} """
        )
    ]
    )

async def summarize(state: ComplexityState):
    chain = summarize_template | model
    page_summary = await chain.ainvoke({
        "query": state['query'], 
        "source": state['content'].metadata['source'], 
        "content": state['content'].page_content})
    return {"page_summaries":[page_summary.content]}

#### Related questions generation

Génère des questions connexes ou "related questions" en rapport avec la requête utilisateur. Ce nœud permet d'enrichir le flux en fournissant des pistes d'exploration supplémentaires.

In [None]:
related_template = ChatPromptTemplate.from_messages(
    [
        (
            "system", """Provide a list of 5 follow up question related to the user one."""
        ),
        ( "user", "{query}")
    ]
)

async def generate_related(state: ComplexityState):
    query = state['messages'][0]
    chain = related_template | model.with_structured_output(ComplexityRelatedQueries)
    result = await chain.ainvoke({"query": query.content})
    return {"related_queries":result.queries}

#### Final answer writer

Synthétise les informations obtenues (résumés des pages web, images, vidéos, etc.) pour répondre de manière complète et directe à la question de l'utilisateur.

In [None]:
writer_template = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                """You are a research writer. Your sole purpose is to write a well-written
                research reports about a topic based on research findings and information..

                """,
            ),
            (
                "user",
                """Query or Topic: {query}               
                 Your task is to write an in depth, well written and detailed introduction and conclusion to the research report based on the provided research data.
                 You MUST include any relevant sources to the introduction and conclusion as markdown hyperlinks -
                 "For example: 'This is a sample text. ([url website](url))'.
                 You MUST return nothing but the report. 
                    ----
                Summaries: {page_summaries}""",
            ),
        ]
    )

async def writer_answer(state: ComplexityState):
    chain = writer_template | model
    response = await chain.ainvoke({
        "query": state['messages'][-1], 
        "page_summaries": state['page_summaries']})
    return {"messages":[response]}

### Créer et compiler le graph

Dans cette étape, nous créons et compilons le graphe qui orchestre le flux de traitement. Le graphe est instancié avec StateGraph en utilisant la structure d'état ComplexityState, et chaque nœud de traitement y est ajouté pour exécuter les différentes étapes de génération de réponse.

Définition des Nœuds : Nous ajoutons chaque fonction créée précédemment comme nœud au graphe de la génération des requêtes utilisateur jusqu’à la création de la réponse finale et des questions complémentaires.

Définition des Liaisons :

Les étapes de base, comme la recherche web et l'analyse de page, sont reliées de manière linéaire avec add_edge.
Exécution parallèle des Résumés : Une fonction _parallel_summarizer est définie pour transmettre chaque résultat scrappé à un nœud de résumé en parallèle, utilisant Send pour fan-out les tâches de résumé à partir des résultats scrappés.
Fan-Out : Après le résumé, nous distribuons les tâches vers les nœuds de recherche d'images, de recherche de vidéos et de génération de réponse.
Fan-In : Les résultats des recherches multimédias et de la réponse sont ensuite regroupés pour générer des questions complémentaires, complétant ainsi le flux de traitement.
Compilation du Graphe : Enfin, nous créons un checkpointer de type MemorySaver pour enregistrer l’état du graphe, puis nous compilons le flux complet avec workflow.compile, produisant une application fonctionnelle qui exécute le workflow.

Ce graphe complet orchestre les différentes étapes de recherche, de traitement et de réponse en exploitant des opérations parallèles et des flux de données organisés. Il constitue le cœur du processus de génération de réponse dans notre implémentation de Perplexity AI.








In [None]:
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.constants import Send

graph = StateGraph(ComplexityState)
graph.add_node("Génération requête",genereate_queries)
graph.add_node("Recherche web", search_web)
graph.add_node("Web scrapper", scrap_web)
graph.add_node("Recherche images", search_images)
graph.add_node("Recherche vidéos",search_videos)
graph.add_node("Résumé",summarize)
graph.add_node("Générer question complémentaires",generate_related)
graph.add_node("Générer réponse",writer_answer)

graph.set_entry_point("Génération requête")
graph.add_edge("Génération requête","Recherche web")   
graph.add_edge("Recherche web","Web scrapper")    

def _parallel_sumarizer(state):
    return [Send("Résumé", {"content": scrapped_result, "query": state['messages'][0].content}) for scrapped_result in state['scrapped_results']]


graph.add_conditional_edges("Web scrapper", _parallel_sumarizer,["Résumé"]) 

#Fan out from summarizer to writer, videos searcher, images searcher
graph.add_edge(["Résumé"],"Recherche images")
graph.add_edge(["Résumé"],"Recherche vidéos")
graph.add_edge(["Résumé"],"Générer réponse")
#Fan in back to related questions generator
graph.add_edge("Recherche images","Générer question complémentaires")
graph.add_edge("Générer réponse","Générer question complémentaires")
graph.add_edge("Recherche vidéos","Générer question complémentaires")
graph.set_finish_point("Générer question complémentaires")
 
checkpointer = MemorySaver()
wokflow = graph.compile(checkpointer=checkpointer)

### Visualisation du graph

In [None]:
from IPython.display import Image, display

try:
    display(Image(wokflow.get_graph().draw_mermaid_png()))
except Exception:
    # This requires some extra dependencies and is optional
    pass

### Exécution du graph

In [None]:
from langchain_core.messages import (
    HumanMessage,
)

query = "Is AI in a hype cycle ?"
thread_id="05.02-perplexity_clone"

async for chunk in wokflow.astream(
    {"messages": [HumanMessage(content=query)]},
     {
        "callbacks": [langfuse_handler(thread_id)],
        "configurable": {"thread_id": thread_id}
    }, stream_mode="updates",
    ):
    for node, values in chunk.items():
        print(f"Receiving update from node: '{node}'")
        print(values)
        print("\n\n")