In [109]:
## Imports
import os
import json

# LangGraph pour l'architecture en étapes/noeuds
from pydantic import BaseModel, Field
from typing import TypedDict, List, Dict, Any, Optional
from langgraph.graph import StateGraph, END

# LangChain pour la génération de recommandations
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import PydanticOutputParser


In [110]:
## Récuépration des variables d'environnement (clé API)
openai_api_key = os.environ['OPENAI_API_KEY']

## Création d'un client OpenAI pour la génération de recommandations
client = OpenAI()
llm = ChatOpenAI(model="gpt-4o-mini")

## Document en entrée
json_file = "Data/rapport.json"

In [111]:
## Sortie pour les anomalies
class Anomalies(BaseModel):
    metric: str = Field(description="Métrique ou élément où il y a une anomalie")
    value: Any = Field(description="La valeur qui pose problème")
    issue: str = Field(description="Les conséquences ou explications de la valeur problématique")
    
## Sortie pour les recommendations
class Recommendations(BaseModel):
    anomalie: str = Field(description="Description d'anomalie.s rencontrée.S")
    suggestion: str = Field(description="Action.s suggérée.s pour optimiser l'infrastructure")

class RecommendationList(BaseModel):
    recommendations: List[Recommendations] = Field(description="Une liste de recommandations d'optimisation basées sur les anomalies détectées.")

    
## Définition de l'état du graphe : InputState pour la gestion de l'entrée et OutputState pour la gestion de la sortie
class State(TypedDict):
    input_path: str                 
    input_data: Optional[List[Dict[str, Any]]] 
    anomalies: List[Anomalies]     
    recommendations: RecommendationList
    error: Optional[str]   


In [124]:
## Fonctions pour chaque étape de l'architecture

def data_ingestion(state):
    """ Noeud d'ingestion des données """
    print("### Noeud en cours : Ingestion des données ###")
    print("\tSTATE : ", state)
    try :
        if ".json" in state["input_data"] : 
            with open(state["input_data"], 'r') as file :
                data = json.load(file)
            print(f"Data ingestion complétée pour le fichier : { state['input_data'] }\n")

        else :
            data = state["input_data"]
            print(f"Data ingestion complétée")
        return {**state, "input_data": data, "error": None}
        
    except Exception as e :
        print(f"Erreur pendant l'ingestion du fichier : {e}\n")
        return {**state, "input_data": {}, "error": f"Ingestion failed: {str(e)}"}
    

def anomalies_detection(state):
    """ Noeud de détection d'anomalies dans les donées """
    print("\n\n### Noeud en cours : Détection des anomalies ###")
    print("\tSTATE : ", state)

    if state["error"] != None:
        print("Erreur détectée dans un noeud précédent. Arrêt de la génération de recommandations\n")
        return state
    
    if type(state["input_data"]) == dict : 
        state["input_data"] = [state["input_data"]]
    data = state["input_data"]
    anomalies_detected: List[Anomalies] = []
        
    for d in data :
        if d["cpu_usage"] > 80 :
            anomalies_detected.append({"metric": "cpu_usage","value": d["cpu_usage"],"issue": "High CPU Usage (> 80%)"})
        if d["memory_usage"] > 80 :
            anomalies_detected.append({"metric": "memory_usage","value": d["memory_usage"],"issue": "High Memory Usage (> 80%)"})

        service_status = d["service_status"]
        for service, status in service_status.items():
            if status != "online":
                anomalies_detected.append({"metric": f"service_status : {service}", "value": status, "issue": f"Service {service} is {status}"})
        
    print(f"{len(anomalies_detected)} anomalie.s détectée.s")
    print("\n\tANOMALIES DETECTED ", anomalies_detected)
    state["anomalies"] = anomalies_detected
    return state

def recommandations_generation(state):
    """ Noeud de génération de recommandations à partir des anomalies détectées """
    print("\n\n### Noeud en cours : Génération de recommandations ###")
    print("STATE : ", state)
    
    if state["error"] != None:
        print("Erreur détectée dans un noeud précédent. Arrêt de la génération de recommandations")
        return state
    
    anomalies = state["anomalies"]    
    recommendations: List[Recommendations] = [] 
    parser = PydanticOutputParser(pydantic_object=RecommendationList)
    prompt_template = ChatPromptTemplate.from_messages([("system",
                                                         "Tu es un ingénieur infrastructure expert analysant des anomalies de monitoring. "
                                                         "Ta tâche est de fournir des recommandations d'optimisation concrètes et actionnables basées sur les anomalies détectées,. "
                                                         "Structure ta réponse exactement selon le schéma JSON fourni."
                                                         "\n{format_instructions}"), 
                                                         ("human", 
                                                          "Voici les anomalies détectées sur l'infrastructure :\n"
                                                          "{anomaly_list}\n\n"
                                                          "Génère une liste de recommandations pour traiter ces problèmes, dans un seul texte.  Pour chaque anomalie, fournis un seul objet `Recommendations` "
                                                          "où le champ 'suggestion' regroupe toutes les actions proposées pour cette anomalie, séparées par un saut de ligne et listées par des numéros.")
                                                        ])
        
    chain = prompt_template | llm | parser
    

    if anomalies == []:
        print("Aucune anomalie trouvée dans l'état. Pas de recommandations à générer.")
        return {**state, "recommendations": []}
    
    for a in anomalies:
        formatted_anomalies = f"- Métrique: {a['metric']}, Valeur: {a['value']}, Problème: {a['issue']}"
        try:
            print("Appel du LLM (gpt-4o-mini) pour générer les recommandations...")
            response = chain.invoke({
                "format_instructions": parser.get_format_instructions(), 
                "anomaly_list": formatted_anomalies
            })
            
            # recommendations.append(response.recommendations)
            recommendations.append({"anomalie" : response.recommendations[0].anomalie, "suggestion": response.recommendations[0].suggestion})
            print(f"Génération réussie de {len(recommendations)} recommandations.")

        except Exception as e:
            print(f"Erreur lors de l'appel LLM ou du parsing de la réponse : {e}")
            return {**state, "recommendations": [], "error": f"Génération de recommandations a échoué: {str(e)}"}

    state["recommendations"] = recommendations
    print("FINAL STATE : ", state)
    return state

def file_creation(state):
    print("\n\n### Noeud en cours : Création d'un fichier avec les anomalies et les recommandations (état final du graphe) ###")
    print("FINAL STATE : ", state)

    with open("Recommendations/recommandations.json", "w") as outfile:
        json.dump(state, outfile, indent=4, sort_keys=False, ensure_ascii=False)#.encode('utf8')


In [125]:
## Définition du graphe
# Initialisation du graphe
workflow = StateGraph(State) #, input=InputState, output=OutputState)

# Définition des noeuds du graphe
workflow.add_node("ingestion", data_ingestion)
workflow.add_node("analyze", anomalies_detection)
workflow.add_node("recommend", recommandations_generation)
workflow.add_node("file_creation", file_creation)

# Début du graphe
workflow.set_entry_point("ingestion")

# Définition des arêtes du graphe
workflow.add_edge("ingestion", "analyze")
workflow.add_edge("analyze", "recommend")
# workflow.add_edge("recommend", END)
workflow.add_edge("recommend", "file_creation")
workflow.add_edge("file_creation", END)

# Compilation du graphe
graph = workflow.compile()

In [126]:
# from IPython.display import Image, display
# display(Image(graph.get_graph().draw_mermaid_png()))

In [None]:
graph.invoke({"input_data" : {
    "timestamp": "2023-10-01T15:00:00Z",
    "cpu_usage": 73,
    "memory_usage": 79,
    "latency_ms": 213,
    "disk_usage": 77,
    "network_in_kbps": 1506,
    "network_out_kbps": 1618,
    "io_wait": 6,
    "thread_count": 165,
    "active_connections": 73,
    "error_rate": 0.04,
    "uptime_seconds": 370800,
    "temperature_celsius": 67,
    "power_consumption_watts": 290,
    "service_status": {
      "database": "online",
      "api_gateway": "degraded",
      "cache": "degraded"}
    }
  }    
)

### Noeud en cours : Ingestion des données ###
	STATE :  {'input_data': {'timestamp': '2023-10-01T15:00:00Z', 'cpu_usage': 73, 'memory_usage': 79, 'latency_ms': 213, 'disk_usage': 77, 'network_in_kbps': 1506, 'network_out_kbps': 1618, 'io_wait': 6, 'thread_count': 165, 'active_connections': 73, 'error_rate': 0.04, 'uptime_seconds': 370800, 'temperature_celsius': 67, 'power_consumption_watts': 290, 'service_status': {'database': 'online', 'api_gateway': 'degraded', 'cache': 'degraded'}}}
Data ingestion complétée


### Noeud en cours : Détection des anomalies ###
	STATE :  {'input_data': {'timestamp': '2023-10-01T15:00:00Z', 'cpu_usage': 73, 'memory_usage': 79, 'latency_ms': 213, 'disk_usage': 77, 'network_in_kbps': 1506, 'network_out_kbps': 1618, 'io_wait': 6, 'thread_count': 165, 'active_connections': 73, 'error_rate': 0.04, 'uptime_seconds': 370800, 'temperature_celsius': 67, 'power_consumption_watts': 290, 'service_status': {'database': 'online', 'api_gateway': 'degraded', 'cache': '

{'input_data': [{'timestamp': '2023-10-01T15:00:00Z',
   'cpu_usage': 73,
   'memory_usage': 79,
   'latency_ms': 213,
   'disk_usage': 77,
   'network_in_kbps': 1506,
   'network_out_kbps': 1618,
   'io_wait': 6,
   'thread_count': 165,
   'active_connections': 73,
   'error_rate': 0.04,
   'uptime_seconds': 370800,
   'temperature_celsius': 67,
   'power_consumption_watts': 290,
   'service_status': {'database': 'online',
    'api_gateway': 'degraded',
    'cache': 'degraded'}}],
 'anomalies': [{'metric': 'service_status : api_gateway',
   'value': 'degraded',
   'issue': 'Service api_gateway is degraded'},
  {'metric': 'service_status : cache',
   'value': 'degraded',
   'issue': 'Service cache is degraded'}],
 'recommendations': [{'anomalie': 'Service api_gateway is degraded',
   'suggestion': "1. Vérifiez l'état des instances du service api_gateway pour identifier toute instance défaillante.\n2. Augmentez les ressources (CPU, RAM) allouées au service si elles sont insuffisantes.\n

In [None]:
graph.invoke({"input_data" : json_file}) 