# Overview 

**Dans ce notebook, on crée notre agent IA et on le prompt pour avoir une solution au porblème de Veolia**

In [14]:
import boto3
import os
import time
import json

Test pour créer une session et voir si ca marche correctement:

In [None]:
session = boto3.Session()
print(session)

Session(region_name='us-west-2')


On va maintenant lister tous les modèles LLM disponibles dans notre région:

In [3]:
bedrock = boto3.client('bedrock', region_name='us-west-2') 
models = bedrock.list_foundation_models()

for model in models['modelSummaries']:
    print(f"Model ID: {model['modelId']}")

Model ID: amazon.titan-tg1-large
Model ID: amazon.titan-embed-g1-text-02
Model ID: amazon.titan-text-lite-v1:0:4k
Model ID: amazon.titan-text-lite-v1
Model ID: amazon.titan-text-express-v1:0:8k
Model ID: amazon.titan-text-express-v1
Model ID: amazon.nova-pro-v1:0
Model ID: amazon.nova-lite-v1:0
Model ID: amazon.nova-micro-v1:0
Model ID: amazon.titan-embed-text-v1:2:8k
Model ID: amazon.titan-embed-text-v1
Model ID: amazon.titan-embed-text-v2:0
Model ID: amazon.titan-embed-image-v1:0
Model ID: amazon.titan-embed-image-v1
Model ID: amazon.titan-image-generator-v1:0
Model ID: amazon.titan-image-generator-v1
Model ID: amazon.titan-image-generator-v2:0
Model ID: amazon.rerank-v1:0
Model ID: stability.stable-diffusion-xl-v1:0
Model ID: stability.stable-diffusion-xl-v1
Model ID: stability.sd3-large-v1:0
Model ID: stability.sd3-5-large-v1:0
Model ID: stability.stable-image-core-v1:0
Model ID: stability.stable-image-core-v1:1
Model ID: stability.stable-image-ultra-v1:0
Model ID: stability.stable

On va à présent établir une connection à Amazon Redshift, notre base de données serverless:

In [5]:
# Initialisation du client Redshift Data
client = boto3.client('redshift-data', region_name='us-west-2')

# Paramètres de connexion
database = 'dev'  # nom de notre base de données
workgroup_name = 'wz-solutions-redshift-workgroup'  # notre workgroup

# Requête simple pour tester la connexion
sql_query = 'SELECT * from consommations;'

try:
    response = client.execute_statement(
        Database=database,
        WorkgroupName=workgroup_name, 
        Sql=sql_query
    )
    print("Connexion réussie !")
except Exception as e:
    print(f"Erreur de connexion : {e}")

Connexion réussie !


On va test de faire une requête SQL à Amazon Redshift (notre base de données) et afficher le résultat de la requête: 

In [7]:
sql_query = 'SELECT * FROM consommations LIMIT 10;' # on limite à 10 lignes

try:
    # Exécuter la requête SQL
    response = client.execute_statement(
        Database=database,
        WorkgroupName=workgroup_name, 
        Sql=sql_query
    )
    
    # Récupérer l'ID de l'exécution de la requête
    statement_id = response['Id']
    print(f"Requête envoyée, ID : {statement_id}")

    # Attendre que la requête soit terminée
    while True:
        status_response = client.describe_statement(Id=statement_id)
        status = status_response['Status']

        if status in ['FINISHED', 'FAILED', 'ABORTED']:
            break
        print("En attente des résultats...")
        time.sleep(2)  # Pause avant la prochaine vérification

    # Vérifier si la requête s'est bien exécutée
    if status == 'FINISHED':
        # Récupérer les résultats
        result_response = client.get_statement_result(Id=statement_id)
        records = result_response.get('Records', [])

        # Afficher les résultats
        if records:
            print("\nRésultats de la requête :")
            for row in records:
                print([col.get('stringValue', 'NULL') for col in row])  # Adaptation pour afficher chaque ligne
        else:
            print("Aucun résultat trouvé.")

    else:
        print(f"Erreur lors de l'exécution : {status_response.get('Error', 'Erreur inconnue')}")

except Exception as e:
    print(f"Erreur de connexion ou d'exécution : {e}")

Requête envoyée, ID : 932147b8-8af9-4780-86af-26a157cf0ebd

Résultats de la requête :
['GN|11801801030951', 'Hauts de France', 'Littoral Audomarois', 'YU001', 'BATIMENTS COLLECTIFS PRIVES', 'NULL', 'ABAQUE_ML_INTERPO', 'NULL', 'NULL', '2022-05-01', 'NULL']
['GN|11811811000142', 'Hauts de France', 'Littoral Audomarois', 'YU001', 'PROFESSIONNELS', 'NULL', 'ABAQUE_REGION_INTERPO', 'NULL', 'NULL', '2022-05-01', 'NULL']
['GN|11802802006107', 'Hauts de France', 'Littoral Audomarois', 'YU001', 'BATIMENTS COLLECTIFS PRIVES', 'NULL', 'ABAQUE_TR_INTERPO', 'NULL', 'NULL', '2022-05-01', 'NULL']
['GN|11801801029965', 'Hauts de France', 'Littoral Audomarois', 'YU001', 'BATIMENTS COLLECTIFS PRIVES', 'NULL', 'ABAQUE_ML_INTERPO', 'NULL', 'NULL', '2022-05-01', 'NULL']
['GN|11809809004582', 'Hauts de France', 'Littoral Audomarois', 'YU001', 'BATIMENTS COLLECTIFS PRIVES', 'NULL', 'ABAQUE_ML_INTERPO', 'NULL', 'NULL', '2022-05-01', 'NULL']
['GN|11804804005498', 'Hauts de France', 'Littoral Audomarois', 'YU0

## On va maintenant passer à la création de notre agent et aux prompts

Configurations: 

In [15]:
# ---- AWS CONFIGURATION ----
AWS_REGION = "us-west-2"
MODEL_ID = "mistral.mixtral-8x7b-instruct-v0:1"  # Utilisation de Mistral AI

# ---- REDSHIFT SERVERLESS CONFIGURATION ----
DATABASE = "dev"  # nom de notre base de données
WORKGROUP_NAME = "wz-solutions-redshift-workgroup"  # notre workgroup

# ---- INITIALISATION DES CLIENTS ----
bedrock = boto3.client("bedrock-runtime", region_name=AWS_REGION)
redshift_client = boto3.client("redshift-data", region_name=AWS_REGION)

Prompt 1: Analyse des tables et colonnes de notre base de données

In [13]:
# ---- FONCTION POUR RÉCUPÉRER LES TABLES REDSHIFT ----
def get_redshift_tables():
    """Récupère la liste des tables disponibles dans la base de données Redshift."""
    sql_query = "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public';"
    try:
        response = redshift_client.execute_statement(
            Database=DATABASE,
            WorkgroupName=WORKGROUP_NAME,
            Sql=sql_query
        )

        statement_id = response['Id']
        while True:
            status_response = redshift_client.describe_statement(Id=statement_id)
            status = status_response["Status"]
            if status in ["FINISHED", "FAILED", "ABORTED"]:
                break
            time.sleep(2)

        if status == "FINISHED":
            result_response = redshift_client.get_statement_result(Id=statement_id)
            tables = [row[0].get('stringValue', 'NULL') for row in result_response.get("Records", [])]
            return tables if tables else "Aucune table trouvée."
        else:
            return f"Erreur lors de l'exécution : {status_response.get('Error', 'Erreur inconnue')}"

    except Exception as e:
        return f"Erreur de connexion ou d'exécution : {str(e)}"


# ---- FONCTION POUR RÉCUPÉRER LES COLONNES D'UNE TABLE ----
def get_table_columns(table_name):
    """Récupère les colonnes et leur description d'une table Redshift."""
    sql_query = f"""
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_name = '{table_name}';
    """
    try:
        response = redshift_client.execute_statement(
            Database=DATABASE,
            WorkgroupName=WORKGROUP_NAME,
            Sql=sql_query
        )

        statement_id = response['Id']
        while True:
            status_response = redshift_client.describe_statement(Id=statement_id)
            status = status_response["Status"]
            if status in ["FINISHED", "FAILED", "ABORTED"]:
                break
            time.sleep(2)

        if status == "FINISHED":
            result_response = redshift_client.get_statement_result(Id=statement_id)
            columns = [
                f"{row[0].get('stringValue', 'NULL')} ({row[1].get('stringValue', 'NULL')})"
                for row in result_response.get("Records", [])
            ]
            return columns if columns else "Aucune colonne trouvée."
        else:
            return f"Erreur lors de l'exécution : {status_response.get('Error', 'Erreur inconnue')}"

    except Exception as e:
        return f"Erreur de connexion ou d'exécution : {str(e)}"


# ---- FONCTION POUR ANALYSER LES TABLES AVEC L'AGENT ----
def agent_analyze_tables():
    """Analyse et décrit chaque table détectée dans Redshift."""
    tables = get_redshift_tables()
    if isinstance(tables, str):
        return tables

    all_tables_info = []
    for table in tables:
        columns = get_table_columns(table)
        all_tables_info.append(f"Table: {table}\nColonnes: {', '.join(columns)}")

    # Construire le prompt pour Mistral AI
    prompt = f"""
    Tu es un expert en bases de données. Analyse et décris chacune des tables trouvées dans Amazon Redshift.
    
    Voici les informations trouvées :
    {chr(10).join(all_tables_info)}

    Pour chaque table :
    1. Donne son rôle dans la base de données.
    2. Explique à quoi sert chaque colonne.
    
    Réponds de manière simple et concise, chaque description de colonne doit être de la même longueur et la plus courte et complète possible.
    """

    return analyze_with_mistral(prompt)


# ---- FONCTION POUR INTERAGIR AVEC MISTRAL AI ----
def analyze_with_mistral(prompt):
    """Envoie un prompt à Mistral AI via Amazon Bedrock."""
    request_body = {
        "prompt": prompt,
        "max_tokens": 1000,
        "temperature": 0.3,
        "top_p": 0.9
    }

    response = bedrock.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps(request_body)
    )

    response_body = json.loads(response["body"].read())
    return response_body.get("outputs", [{}])[0].get("text", "")


# ---- EXÉCUTION ----
if __name__ == "__main__":
    response = agent_analyze_tables()
    print("\n Analyse des Tables Redshift \n")
    print(response) 
    rep_prompt1 = response 



 Analyse des Tables Redshift 


    abonnements
    1. Gère les informations sur les abonnements des clients.
    2. cle_abonnement : identifiant unique de l'abonnement
     date_entree_local_abonnement : date d'entrée en vigueur locale de l'abonnement
     date_resiliation_abonnement : date de résiliation de l'abonnement
     date_souscription_abonnement : date de souscription de l'abonnement

    consommations
    1. Enregistre les données de consommation des clients.
    2. annee_conso : année de la consommation
     mois_conso : mois de la consommation
     diametre_nominal : diamètre nominal de la conduite
     volume_mois : volume consommé dans le mois
     type_abaque : type d'abaque utilisé pour le calcul de la consommation
     libelle_categorie_abonne : libellé de la catégorie de l'abonné
     code_contrat : code du contrat
     libelle_territoire : libellé du territoire
     libelle_region : libellé de la région
     cle_pds : identifiant unique du point de livraison
     d

Prompt 2: Rapport d'Anomalies sur une table choisie par l'utilisateur

In [21]:
# ---- FONCTION POUR RÉCUPÉRER UN ÉCHANTILLON D'UNE TABLE ----
def get_table_sample(table_name):
    """Récupère 100 lignes d'une table Redshift pour analyse."""
    sql_query = f"SELECT * FROM {table_name} LIMIT 100;"
    
    try:
        print(f"🔄 Envoi de la requête à Redshift: {sql_query}")
        response = redshift_client.execute_statement(
            Database=DATABASE,
            WorkgroupName=WORKGROUP_NAME,
            Sql=sql_query
        )

        statement_id = response['Id']
        print(f"✅ Requête envoyée, ID: {statement_id}")

        # Timeout après 60 secondes
        start_time = time.time()
        while True:
            status_response = redshift_client.describe_statement(Id=statement_id)
            status = status_response["Status"]

            if status in ["FINISHED", "FAILED", "ABORTED"]:
                break
            
            # Vérification du timeout (60 secondes max)
            if time.time() - start_time > 60:
                print("⏳ Timeout dépassé (60s). Annulation de la requête.")
                return "Timeout: La requête a pris trop de temps."

            print("⏳ En attente des résultats...")
            time.sleep(3)  # Vérification toutes les 3 secondes

        if status == "FINISHED":
            print("✅ Requête terminée, récupération des résultats...")
            result_response = redshift_client.get_statement_result(Id=statement_id)
            records = [
                ", ".join([col.get('stringValue', 'NULL') for col in row])
                for row in result_response.get("Records", [])
            ]
            return records if records else "Aucune donnée trouvée."
        else:
            print(f"❌ Erreur d'exécution: {status}")
            return f"Erreur lors de l'exécution: {status_response.get('Error', 'Erreur inconnue')}"

    except Exception as e:
        print(f"❌ Erreur de connexion ou d'exécution : {str(e)}")
        return f"Erreur : {str(e)}"


# ---- FONCTION POUR DÉTECTER LES ANOMALIES ----
def agent_detect_anomalies(table_name):
    """Détecte les anomalies d'une table spécifique dans Redshift via Mistral AI."""
    table_sample = get_table_sample(table_name)
    
    if isinstance(table_sample, str):
        return table_sample  # Si erreur, on la renvoie directement

    formatted_sample = "\n".join(table_sample)

    # Construction du prompt
    prompt = f"""
    Tu es un expert en qualité des données.
    {rep_prompt1}
    
    Voici un échantillon de la table "{table_name}":
    {formatted_sample}
    
    Identifie les anomalies (valeurs nulles, doublons, erreurs de format, etc.).
    Génère une requête SQL pour afficher les lignes contenant des données incorrectes.

    Réponds en suivant ce format :
    - **Type d'anomalie** : [Catégorie de l'anomalie]
    - **Description** : [Brève explication]
    - **Requête SQL** : [Requête pour afficher les données erronées]
    """

    return analyze_with_mistral(prompt)


# ---- FONCTION POUR INTERAGIR AVEC MISTRAL AI ----
def analyze_with_mistral(prompt):
    """Envoie un prompt à Mistral AI via Amazon Bedrock."""
    request_body = {
        "prompt": prompt,
        "max_tokens": 3000,
        "temperature": 0.3,
        "top_p": 0.9
    }

    try:
        print("🔄 Envoi du prompt à Mistral AI...")
        response = bedrock.invoke_model(
            modelId=MODEL_ID,
            body=json.dumps(request_body)
        )
        print("✅ Réponse reçue de Mistral.")

        response_body = json.loads(response["body"].read())
        return response_body.get("outputs", [{}])[0].get("text", "")

    except Exception as e:
        print(f"❌ Erreur d'interaction avec Mistral AI : {str(e)}")
        return f"Erreur : {str(e)}"


# ---- EXÉCUTION ----
if __name__ == "__main__":
    table_name = input("🔍 Entrez le nom de la table à analyser : ")
    response = agent_detect_anomalies(table_name)

    print("\n Rapport d'Anomalies \n")
    print(response)
    rep_prompt2 = response

🔄 Envoi de la requête à Redshift: SELECT * FROM factures LIMIT 100;
✅ Requête envoyée, ID: aa5bb81d-51f5-4f0a-a13e-025e6ab4b9df
⏳ En attente des résultats...
✅ Requête terminée, récupération des résultats...
🔄 Envoi du prompt à Mistral AI...
✅ Réponse reçue de Mistral.

🔎 **Rapport d'Anomalies** 🔎


    Anomalie 1
    ----------------------------------------------------------------------------------------------------------------
    - **Type d'anomalie** : Valeurs nulles
    - **Description** : Il y a des lignes avec des valeurs nulles dans la colonne "nb_factures_par_pds".
    - **Requête SQL** : 
    ```
    SELECT * FROM factures WHERE nb_factures_par_pds IS NULL;
    ```
    
    Anomalie 2
    ----------------------------------------------------------------------------------------------------------------
    - **Type d'anomalie** : Valeurs nulles
    - **Description** : Il y a des lignes avec des valeurs nulles dans la colonne "conso_facture".
    - **Requête SQL** : 
    ```
    

Prompt 3: Rapport d'Anomalies entre deux tables choisies par l'utilisateur (jointures des tables)

In [22]:
# ---- FONCTION POUR ANALYSER DES ANOMALIES DANS LES JOINTURES ----
def agent_detect_join_anomalies(table1, table2):
    """Détecte les incohérences entre deux tables en utilisant Mistral AI."""
    
    # Construction de la requête SQL pour joindre les tables et analyser les incohérences
    sql_query = f"""
    SELECT 
        a.CLE_ABONNEMENT, 
        a.DATE_RESILIATION_ABONNEMENT,
        c.VOLUME_MOIS,
        f.CONSO_FACTURE,
        f.DATE_EMISSION_FACTURE
    FROM abonnements a
    LEFT JOIN consommations c ON a.CLE_ABONNEMENT = c.CLE_PDS
    LEFT JOIN factures f ON a.CLE_ABONNEMENT = f.CLE_ABONNEMENT
    WHERE 
        (a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
        OR (c.VOLUME_MOIS > 0 AND a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
        OR (f.CONSO_FACTURE > 0 AND a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
    LIMIT 100;

    """
    
    print(f"🔄 Exécution de la requête : {sql_query}")
    
    try:
        response = redshift_client.execute_statement(
            Database=DATABASE,
            WorkgroupName=WORKGROUP_NAME,
            Sql=sql_query
        )

        statement_id = response['Id']
        print(f"✅ Requête envoyée, ID: {statement_id}")

        # Timeout après 60s
        start_time = time.time()
        while True:
            status_response = redshift_client.describe_statement(Id=statement_id)
            status = status_response["Status"]
            if status in ["FINISHED", "FAILED", "ABORTED"]:
                print(f"🔴 Statut final : {status}")
                break

            if time.time() - start_time > 60:
                print("⏳ Timeout dépassé (60s). Annulation.")
                return "Timeout: La requête a pris trop de temps."

            print("⏳ En attente des résultats...")
            time.sleep(3)

        if status == "FINISHED":
            print("✅ Requête terminée, récupération des résultats...")
            result_response = redshift_client.get_statement_result(Id=statement_id)
            records = [
                ", ".join([col.get('stringValue', 'NULL') for col in row])
                for row in result_response.get("Records", [])
            ]
            return records if records else "Aucune anomalie trouvée."
        else:
            print(f"❌ Erreur d'exécution: {status}")
            return f"Erreur lors de l'exécution: {status_response.get('Error', 'Erreur inconnue')}"

    except Exception as e:
        print(f"❌ Erreur de connexion ou d'exécution : {str(e)}")
        return f"Erreur : {str(e)}"


# ---- FONCTION POUR DEMANDER À L’AGENT D’ANALYSER LES ANOMALIES ----
def agent_analyze_join_anomalies(table1, table2):
    """Envoie un prompt à Mistral AI pour analyser les incohérences entre deux tables."""
    anomalies = agent_detect_join_anomalies(table1, table2)
    
    if isinstance(anomalies, str):
        return anomalies  # Si erreur, on la renvoie direct 

    formatted_anomalies = "\n".join(anomalies)

    # Construction du prompt pour Mistral AI
    prompt = f"""
    Tu es un expert en qualité des données.
    {rep_prompt1}

    Voici les incohérences trouvées entre les tables "{table1}" et "{table2}":
    {formatted_anomalies}
    
    Ton objectif :
    1. Identifier les types d'anomalies liant ces tables (exemple : consommations sans abonnement actif).
    2. Expliquer chaque anomalie de manière simple, courte et précise.
    3. Générer une requête SQL permettant d'afficher ces anomalies.

    Réponds en suivant ce format :
    - **Type d'anomalie** : [Catégorie de l'anomalie]
    - **Description** : [Brève explication]
    - **Requête SQL** : [Requête pour afficher les données erronées]
    """

    return analyze_with_mistral(prompt)


# ---- FONCTION POUR INTERAGIR AVEC MISTRAL AI ----
def analyze_with_mistral(prompt):
    """Envoie un prompt structuré à Mistral AI via Amazon Bedrock."""
    request_body = {
        "prompt": prompt,
        "max_tokens": 3000,
        "temperature": 0.3,
        "top_p": 0.9
    }

    try:
        print("🔄 Envoi du prompt à Mistral AI...")
        response = bedrock.invoke_model(
            modelId=MODEL_ID,
            body=json.dumps(request_body)
        )
        print("✅ Réponse reçue de Mistral.")

        response_body = json.loads(response["body"].read())
        return response_body.get("outputs", [{}])[0].get("text", "")

    except Exception as e:
        print(f"❌ Erreur d'interaction avec Mistral AI : {str(e)}")
        return f"Erreur : {str(e)}"


# ---- EXÉCUTION ----
if __name__ == "__main__":
    print("🔍 Détection d'anomalies dans les jointures entre tables")
    table1 = input("Entrez le premier nom de table : ")
    table2 = input("Entrez le deuxième nom de table : ")
    
    response = agent_analyze_join_anomalies(table1, table2)

    print("\n🔎 **Rapport d'Anomalies dans les Jointures** 🔎\n")
    print(response)

🔍 Détection d'anomalies dans les jointures entre tables
🔄 Exécution de la requête : 
    SELECT 
        a.CLE_ABONNEMENT, 
        a.DATE_RESILIATION_ABONNEMENT,
        c.VOLUME_MOIS,
        f.CONSO_FACTURE,
        f.DATE_EMISSION_FACTURE
    FROM abonnements a
    LEFT JOIN consommations c ON a.CLE_ABONNEMENT = c.CLE_PDS
    LEFT JOIN factures f ON a.CLE_ABONNEMENT = f.CLE_ABONNEMENT
    WHERE 
        (a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
        OR (c.VOLUME_MOIS > 0 AND a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
        OR (f.CONSO_FACTURE > 0 AND a.DATE_RESILIATION_ABONNEMENT IS NOT NULL AND a.DATE_RESILIATION_ABONNEMENT < CURRENT_DATE)
    LIMIT 100;

    
✅ Requête envoyée, ID: 751809c8-f4c0-4229-aaba-feae77c25af0
🔴 Statut final : FINISHED
✅ Requête terminée, récupération des résultats...
🔄 Envoi du prompt à Mistral AI...
✅ Réponse reçue de Mistral.

🔎 **Rapport d'Anomalie