# Import

In [3]:
from datetime import datetime
import pandas as pd
import numpy as np
import re
from app.utils.functions import *
from app.core import config
from IPython.display import HTML
from app.services.ollama_service import format_response
import httpx
from tqdm import tqdm
from tqdm.asyncio import tqdm
import spacy

# Data Preparation

In [11]:
res = await execute_sp(
    "dbo.sp_simBudLines",
    {
        "user_fk": config.USER_FK,
        "form_fk": 167,
        "line_fk": 0,
        "choix": 0,
        "isVisible": 1
    }
)
simple_dict = create_simplified_hierarchy(res)

In [153]:
async def data_preparation(sa_fk):
    data = await execute_sp(
        "ia.sp_simBudFormSA_one", 
        {
            "user_fk": config.USER_FK, 
            "sa_fk": sa_fk, 
            "form_fk": 167
        }
    )
    if data[0].get('EcrituresDetails') == None:
        return pd.DataFrame(None)
    json_string = data[0].get('EcrituresDetails')

    data_records = json.loads(json_string)
    context_data = pd.DataFrame(data_records)
    df = preprocessing_data(context_data, simple_dict)

    if df["Section  analytique"].unique().tolist() in [[''], [], None]:
        df["Section  analytique"] = df["Liste de sélection"]
        
    # Renommage et nettoyage
    df = df.rename(
        columns={
            'Code Hiérarchique': 'Code_H', 
            'Montant': 'Montant',
            'Lignes': 'Ligne_Analytique',
            'Contexte': 'Contexte',
            'Année': 'Annee',
            'Groupe': 'Groupe',
            'Section  analytique': 'Residence'
        }
    )

    df['Annee'] = df['Annee'].astype(int)
    df['Mois'] = df['Mois'].astype(int)
    df['Contexte'] = df['Contexte'].replace({'R': 'Réel', 'B': 'Budget', 'P': 'Prévision'})

    df_agg = df.groupby(
        [
            'Residence', 'Colonnes', 'Annee', 'Mois', "Nature de l'écriture", 'Contexte', 'Code_H', 'Ligne_Analytique', 'Groupe'
        ]
    )['Montant'].sum().reset_index()

    contexte_order = ['Réel', 'Prévision', 'Budget']

    def mois_sort_key(mois):
        try:
            return int(mois)
        except:
            return 99

    df_pivot = df_agg.pivot_table(
        index=['Groupe', 'Code_H', 'Ligne_Analytique'],
        columns=['Annee', 'Contexte', 'Mois', "Nature de l'écriture"],
        values='Montant',
        fill_value="",
        aggfunc='sum'
    )
    # Explicitly infer objects to avoid FutureWarning from fill_value on object dtype
    df_pivot = df_pivot.infer_objects(copy=False)

    if df_pivot.columns.nlevels == 4:
        nature_unique = df_pivot.columns.get_level_values(3).unique().tolist()
        if "Annuelle" in nature_unique:
            nature_unique = [n for n in nature_unique if n != "Annuelle"]
            nature_order_desc = sorted(nature_unique, reverse=True) + ["Annuelle"]
        else:
            nature_order_desc = sorted(nature_unique, reverse=True)
        nature_order_dict = {name: i for i, name in enumerate(nature_order_desc)}
        
        def col_sort_key(x):
            return (
                int(x[0]) if str(x[0]).isdigit() else 0,
                contexte_order.index(x[1]) if x[1] in contexte_order else 99,
                mois_sort_key(x[2]),
                nature_order_dict.get(x[3], 999)
            )
        df_pivot = df_pivot[sorted(df_pivot.columns, key=col_sort_key)]

    df_pivot = df_pivot.reset_index()

    def code_hierarchical_sort_key(code):
        parts = [int(part) if part.isdigit() else part for part in re.split(r'\D+', str(code).strip('.')) if part]
        return parts

    df_pivot_sorted = df_pivot.copy()
    df_pivot_sorted['__sort_key'] = df_pivot_sorted['Code_H'].apply(code_hierarchical_sort_key)
    df_pivot_sorted = df_pivot_sorted.sort_values('__sort_key').drop(columns='__sort_key', level=0).reset_index(drop=True)
        
    """ mask_annuelle = df_pivot_sorted.columns.get_level_values(3) == "Annuelle"
    annuelle_cols = df_pivot_sorted.columns[mask_annuelle].tolist()

    meta_cols = [c for c in df_pivot_sorted.columns if c[0] in ("Groupe", "Code_H", "Ligne_Analytique")]

    selected_cols = meta_cols + annuelle_cols
    df_pivot_sorted_annual = df_pivot_sorted.loc[:, selected_cols] """

    def format_value(val):
        try:
            if isinstance(val, (float, np.floating, int, np.integer)):
                if float(val) == int(val):
                    return int(val)
                else:
                    return "{:.2f}".format(float(val))

            if isinstance(val, str):
                num = float(val.replace(",", ".").strip())
                if num == int(num):
                    return int(num)
                else:
                    return "{:.2f}".format(num)
            return val
        except:
            return val

    for col in df_pivot_sorted.columns[3:]:
        df_pivot_sorted[col] = df_pivot_sorted[col].apply(format_value)

    """ df_pivot_sorted = df_pivot_sorted[
        df_pivot_sorted["Code_H"].apply(
            lambda x: str(x).strip('.').split('.')[0] in ['1', '2', '3', '4']
        )
    ].reset_index(drop=True) """

    return df_pivot_sorted

In [154]:
df = await data_preparation(sa_fk=224)
display(df)

Annee,Groupe,Code_H,Ligne_Analytique,2022,2022,2022,2022,2022,2022,2022,...,2026,2026,2026,2026,2026,2026,2026,2026,2026,2026
Contexte,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Réel,Réel,Réel,Réel,Réel,Réel,Réel,...,Budget,Budget,Budget,Budget,Budget,Budget,Budget,Budget,Budget,Budget
Mois,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,1,2,3,4,5,6,7,...,4,5,6,7,8,9,10,11,12,12
Nature de l'écriture,Unnamed: 1_level_3,Unnamed: 2_level_3,Unnamed: 3_level_3,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,...,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Mensuelle,Annuelle
0,Chiffre d'affaire,1.,RECETTES,166643,141995,139423,157690,133213,136995,175378,...,204957,171803,176400,204364,178761,176367,206846,177050,165800,2221683
1,Chiffre d'affaire,1.1.,Loyers logements et parkings HT,143177,140713,140431,130839,130957,132729,133824,...,168918,168918,172014,168479,171577,170979,171862,171862,173628,2057395
2,Chiffre d'affaire,1.1.1.,CA Locatif Estudines,142098,139996,139868,130404,130598,132427,133522,...,168318,168318,171414,167879,170977,170379,171262,171262,173028,2050195
3,Chiffre d'affaire,1.1.4.,CA Locatif Parkings,1078,716,564,435,359,302,302,...,600,600,600,600,600,600,600,600,600,7200
4,Chiffre d'affaire,1.2.,RECETTES ANNEXES,23467,1282,38,26850,2256,4657,41555,...,36150,3000,4500,36000,7300,5500,35100,5300,3100,176470
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
148,Chiffre d'affaire,10.,% DES RECETTES TOTALES,23.06,-26.19,-38.81,49.38,-30.60,-5.33,44.53,...,52.44,66,-69.20,49,65.55,-65.21,51.60,63.38,-104.07,13.40
149,Chiffre d'affaire,11.,Dotations aux amortissements (CAPEX),849,766,848,821,862,866,1234,...,2675,2747,2675,3116,3074,2992,3074,2992,3074,34447
150,Marge,12.,EBITDA,39274,-36423,-53258,78695,-39898,-6439,79330,...,110156,116129,-119385,103264,120260,-112009,109806,115205,-169469,332103
151,Charge,13.,CAPEX,,,,16774,5431,,41603,...,,,,,,,,,,


In [21]:
def melt_pivot_table(df: pd.DataFrame):
    """Transformer la table pivot en format long"""
    # Préparer la liste pour le format long
    melted_data = []
    mois_str = [
        'Janvier',
        'Février',
        'Mars',
        'Avril',
        'Mai',
        'Juin',
        'Juillet',
        'Août',
        'Septembre',
        'Octobre',
        'Novembre',
        'Décembre'
    ]
    # Pour chaque ligne
    for idx in range(len(df)):
        # Pour chaque colonne de données (skip les 4 premières colonnes descriptives)
        for col_idx in range(3, len(df.columns)):
            col = df.columns[col_idx]
            
            # Extraire les métadonnées
            annee, contexte, mois, nature = col
            
            # Récupérer la valeur
            valeur = df.iloc[idx, col_idx]
            
            # Créer une entrée
            entry = {
                'Groupe': df.iloc[idx, 0],
                'Code_H': df.iloc[idx, 1],
                'Ligne_Analytique': df.iloc[idx, 2],
                'Année': annee,
                'Contexte': contexte,
                'Mois': mois_str[mois-1],
                'Nature': nature,
                'Valeur': valeur
            }
            
            melted_data.append(entry)
    
    return pd.DataFrame(melted_data)

# Transformer les données
melted_df = melt_pivot_table(df)
print(f"Données transformées: {len(melted_df)} lignes")
print('---'*50)
display(melted_df.head())

Données transformées: 11934 lignes
------------------------------------------------------------------------------------------------------------------------------------------------------


Unnamed: 0,Groupe,Code_H,Ligne_Analytique,Année,Contexte,Mois,Nature,Valeur
0,Chiffre d'affaire,1.0,RECETTES,2022,Réel,Janvier,Mensuelle,166643
1,Chiffre d'affaire,1.0,RECETTES,2022,Réel,Février,Mensuelle,141995
2,Chiffre d'affaire,1.0,RECETTES,2022,Réel,Mars,Mensuelle,139423
3,Chiffre d'affaire,1.0,RECETTES,2022,Réel,Avril,Mensuelle,157690
4,Chiffre d'affaire,1.0,RECETTES,2022,Réel,Mai,Mensuelle,133213


In [22]:
def create_documents_from_melted(melted_df):
    """Créer des documents LangChain à partir du format long"""
    documents = []
    
    for _, row in melted_df.iterrows():
        # Créer le contenu textuel
        doc_content = f"""
        Catégorie: {row['Groupe']}
        Code Hierarchique: {row['Code_H']}
        Ligne Analytique: {row['Ligne_Analytique']}
        Période: {row['Contexte']} - {row['Année']}
        Mois: {row['Mois']}
        Nature de l'ecriture: {row['Nature']}
        Valeur: {row['Valeur']}
        """
        
        # Créer le document
        doc = {
            "page_content": doc_content.strip(),
            "metadata": {
                'Groupe': row['Groupe'],
                'code_h': row['Code_H'],
                'ligne_analytique': row['Ligne_Analytique'],
                'annee': row['Année'],
                'contexte': row['Contexte'],
                'mois': str(row['Mois']),
                'Nature': row['Nature'],
                'valeur': str(row['Valeur'])
            }
        }
        documents.append(doc)
    return documents

# Créer les documents
documents = create_documents_from_melted(melted_df)

# Retriever

In [8]:
url = "http://192.168.1.23:1234"
model = "llama3:8b"

In [10]:
import requests

class LocalLLM:
    def __init__(self, api_url, model_name):
        self.api_url = api_url
        self.model_name = model_name

    def generate(self, prompt, temperature=0.3):
        data = {
            "model": self.model_name,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": temperature,
            "top_p": 0.9,
            "top_k": 40,
            "repeat_penalty": 1.1,
            "stream": False
        }
        response = requests.post(
            url=f"{self.api_url}/v1/chat/completions",
            json=data
        )
        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            raise RuntimeError(f"Erreur LLM: {response.status_code}, {response.text}")


In [None]:
from chromadb import Client
from chromadb.utils import embedding_functions

# Fonction d'embedding locale générique compatible chromadb
def get_local_embeddings():
    return embedding_functions.DefaultEmbeddingFunction()

# Création du vectorstore avec embeddings locaux (corrigé pour cohérence d'import et reproductibilité)
embeddings = get_local_embeddings()
client = Client()
collection = client.create_collection("finance_docs")
for idx, doc in enumerate(documents):
    collection.add(
        documents=[doc['page_content']],
        metadatas=[doc['metadata']],
        ids=[str(idx)]
    )
vectorstore = collection

In [31]:
from chromadb import Collection
from typing import List, Dict, Any

class SimpleRetriever:
    def __init__(self, vectorstore: Collection):
        self.vectorstore = vectorstore

    def retrieve(self, query: str, k: int = 10) -> List[Dict[str, Any]]:
        """
        Cherche simplement les k documents les plus pertinents pour la requête, 
        sans utiliser de logique sur la hiérarchie métier.
        """
        results = self.vectorstore.query(
            query_texts=[query],
            n_results=k,
        )
        return results

In [32]:
# Adapte SimpleRetriever pour utiliser la collection Chroma directement si besoin
retriever = SimpleRetriever(vectorstore)

In [None]:
PROMPT = """
Tu es analyste financier senior spécialisé en exploitation de résidences étudiantes.

RÈGLES ABSOLUES
- Analyse strictement limitée aux données fournies par l'utilisateur.
- Aucune hypothèse, extrapolation ou connaissance externe.
- Priorité des valeurs : Réel > Prévision > Budget.
- Si une donnée est absente, écrire exactement : « Non disponible ».

CONTRAINTES MÉTIER
- Janvier-août : Prévision = Réel.
- Septembre-décembre : Prévision = projection du Réel.

STYLE ET SORTIE
- Langue : français professionnel.
- Phrases courtes et factuelles.
- Toujours citer l'année et l'unité (€ ou %).
- TEXTE SIMPLE uniquement. Aucun JSON, aucun code, aucune balise.
- Respect strict du format demandé. Aucune section supplémentaire.

CONTEXTE DES DONNÉES:
{context}

QUESTION: {question}

"""

def hierarchical_rag_query(query, llm=None):
    """Requête RAG avec gestion hiérarchique en utilisant SimpleRetriever et LocalLLM"""
    docs = retriever.retrieve(query, k=10)
    context = "\n\n---\n\n".join([doc.page_content if hasattr(doc, "page_content") else (doc['page_content'] if isinstance(doc, dict) else str(doc)) for doc in docs])
    input_prompt = PROMPT.format(context=context, question=query)
    if llm is None:
        llm = LocalLLM(api_url=url, model_name=model)
    response = llm.generate(input_prompt, temperature=0.3)
    return {
        "response": response,
        "sources": docs
    }

In [42]:
res = hierarchical_rag_query("Quelles sont les recettes réelles de 2024 ?")

In [43]:
print(res["response"])

Je ne dispose pas des données nécessaires pour répondre à cette question. Les seules informations disponibles sont les métadonnées, qui n'indiquent pas les recettes réelles de 2024.

Si vous pouviez fournir les données pertinentes (par exemple, les revenus par mois ou les taux d'occupation), je pourrais évaluer les recettes réelles de 2024 en fonction des règles absolues et des contraintes métier.


In [39]:
res["sources"]

{'ids': [['9026', '9039', '11607', '11606', '11600', '9028', '10820', '9046']],
 'embeddings': None,
 'documents': [["Catégorie: Chiffre d'affaire\n        Code Hierarchique: 4.\n        Ligne Analytique: % DES RECETTES TOTALES\n        Période: Budget - 2025\n        Mois: Mai\n        Nature de l'ecriture: Mensuelle\n        Valeur: 85.21",
   "Catégorie: Chiffre d'affaire\n        Code Hierarchique: 4.\n        Ligne Analytique: % DES RECETTES TOTALES\n        Période: Budget - 2026\n        Mois: Mai\n        Nature de l'ecriture: Mensuelle\n        Valeur: 85.40",
   "Catégorie: Chiffre d'affaire\n        Code Hierarchique: 10.\n        Ligne Analytique: % DES RECETTES TOTALES\n        Période: Budget - 2025\n        Mois: Décembre\n        Nature de l'ecriture: Mensuelle\n        Valeur: -29.09",
   "Catégorie: Chiffre d'affaire\n        Code Hierarchique: 10.\n        Ligne Analytique: % DES RECETTES TOTALES\n        Période: Budget - 2025\n        Mois: Novembre\n        Nature

---

# New Parser User Query

In [7]:
nlp = spacy.load("fr_core_news_md")

async def parse_user_query(
    query: str, 
    synonymes_groupes: dict[str, list[str]], 
    simple_dict: list[dict]) -> dict[str, list[str]]:
    """Transforme une question utilisateur en paramètres de fonction."""
    def _strip_accents(text: str) -> str:
        """
        Supprime les accents et normalise la casse/ponctuation dans une chaîne pour faciliter les comparaisons.
        """
        if not isinstance(text, str) or not text.strip():
            return ""
        s = (
            text.replace("’", "'")
                .replace("`", "'")
                .replace("‘", "'")
                .replace("–", "-")
                .replace("—", "-")
        )
        s = unicodedata.normalize("NFD", s)
        s = ''.join(
            c for c in s
            if unicodedata.category(c) != 'Mn'
        )
        s = re.sub(r"\s+", " ", s.lower()).strip()
        return s

    async def _check_detail(question: str, threshold: int = 80) -> bool:
        question_norm = _strip_accents(question.lower())
        res = await get_mapping()
        keywords_details: list[str] = res["Détail"]
        for kw in keywords_details:
            if fuzz.partial_ratio(question_norm, _strip_accents(kw.lower())) > threshold:
                return True
        return False

    async def _get_col_info(question: str, threshold: int = 90) -> dict|None:
        lst = await execute_sp(
            "dbo.sp_simBudCol",
            {
                "user_fk": 8,
                "codeMetier": 'EXP',
                "form_fk": 167,
                "codeFormType": None,
                "type_fk": 0,
                "colYear_fk": 0
            }
        )
        df_col = pd.DataFrame(lst)
        df_col = df_col[df_col["labelType"]=="Année contexte"][["label", "RB", "Mois", "theYear"]].copy()
        
        q = _strip_accents(question.lower())
        q_tokens = set(re.findall(r"\w+", q))
        LISTE_LIGNES: list[str] = df_col["label"].to_list()
        results = {}
        for ligne in LISTE_LIGNES:
            ln = _strip_accents(ligne.lower())
            ln_tokens = set(re.findall(r"\w+", ln))
            if ln_tokens and ln_tokens.issubset({str(t).rstrip('s') for t in q_tokens} | q_tokens):
                results[ligne] = 100
                continue
            score = fuzz.token_set_ratio(q, ln)
            if score >= threshold:
                results[ligne] = max(results.get(ligne, 0), score)
        if results:
            line = df_col[df_col["label"] == sorted(results.items(), key=lambda x: -x[1])[0][0]]
            return {
                "label": line["label"].iloc[0],
                "annee": int(line["theYear"].iloc[0]),
                "mois": int(line["Mois"].iloc[0]),
                "contexte": line["RB"].iloc[0]
            }    
        else:
            None

    doc = nlp(query)
    query_lower = query.lower()
    params = {
        'groupes': [],
        'types_valeur': [],
        'annees': [],
        'nature_ecriture': [],
        'lignes': [],
        'mois': []
    }
    col_infos = await _get_col_info(query_lower)

    # GROUPE
    def _select_groupe(question: str, threshold: float = 70) -> list:
        query_norm = _strip_accents(question)

        best_matches: dict[str, float] = {}
        max_overall_score = 0.0

        for g, mots in list(synonymes_groupes.items())[:3]:
            max_group_score = 0.0
            for m in mots:
                m_norm = _strip_accents(m)
                score = fuzz.token_set_ratio(query_norm, m_norm)
                if score > max_group_score:
                    max_group_score = score
                    
            best_matches[g] = max_group_score
            
            if max_group_score > max_overall_score:
                max_overall_score = max_group_score
        res = []
        if max_overall_score >= threshold:
            score_tolerance = 5
            for g, score in best_matches.items():
                if score >= threshold and score >= (max_overall_score - score_tolerance):
                    res.append(g)
        return res
    params['groupes'] = _select_groupe(query_lower)

    # TYPE DE VALEUR
    def _define_contexte(question: str) -> list:
        types_valeur = set()
        query_lower_noacc = _strip_accents(question)
        if re.search(r"reel|realise|actuel", query_lower_noacc):
            types_valeur.add('R')
        if re.search(r"budget", query_lower_noacc):
            types_valeur.add('B')
        if re.search(r"prevision|prevu|projection|project|prev", query_lower_noacc):
            types_valeur.add('P')
        for ent in doc.ents:
            ent_text_noacc = _strip_accents(ent.text.lower())
            if ent.label_ == "MISC":
                if re.search(r"reel|realise|actuel", ent_text_noacc):
                    types_valeur.add('R')
                if re.search(r"budget", ent_text_noacc):
                    types_valeur.add('B')
                if re.search(r"prevision|prevu|projection|project|prev", ent_text_noacc):
                    types_valeur.add('P')
        return sorted(types_valeur)#! if len(types_valeur) > 0 else ['R']
    params['types_valeur'] = _define_contexte(query_lower)

    # ANNEE
    def _define_year(question: str) -> list:
        annees = set(map(int, re.findall(r"\b20\d{2}\b", question)))
        
        pattern_2digit = r"\b(?:Budget|Réel|Reel)\s*'?(\d{2})\b"
        matches_2digit = re.findall(pattern_2digit, question, re.IGNORECASE)
        for match in matches_2digit:
            yy = int(match)
            if yy < 50:
                year = 2000 + yy
            else:
                year = 1900 + yy
            annees.add(year)

        for ent in doc.ents:
            if ent.label_ == "DATE":
                if ent.text.isdigit() and len(ent.text) == 4 and ent.text.startswith("20"):
                    annees.add(int(ent.text))
                elif re.match(r"20\d{2}[-/ ]20\d{2}", ent.text):
                    annees.update(map(int, re.findall(r"20\d{2}", ent.text)))
                else:
                    annees.update(map(int, re.findall(r"20\d{2}", ent.text)))

                ent_matches = re.findall(pattern_2digit, ent.text, re.IGNORECASE)
                for em in ent_matches:
                    yy = int(em)
                    if yy < 50:
                        year = 2000 + yy
                    else:
                        year = 1900 + yy
                    annees.add(year)

        interval_patterns_annee = [
            r"(?:entre)\s+20(\d{2})\s+(?:et)\s+20(\d{2})",
            r"(?:de)\s+20(\d{2})\s+(?:à|a)\s+20(\d{2})"
        ]
        for pat in interval_patterns_annee:
            m = re.search(pat, question)
            if m:
                y1, y2 = int("20" + m.group(1)), int("20" + m.group(2))
                if y1 <= y2:
                    annees.update(range(y1, y2 + 1))
                else:
                    annees.update(range(y2, y1 + 1))

        if annees:
            return sorted(annees)
        else:
            if re.search(r"cette (année|annee)", question):
                return [datetime.now().year]
            elif re.search(r"(année|annee) dernière|an dernier|(année|annee) (passée|passee)|(année|annee) (précédente|precedente)", question):
                return [datetime.now().year - 1]
            elif re.search(r"(l'année|l'annee) prochaine|l'an prochain", question):
                return [datetime.now().year + 1]
    params['annees'] = _define_year(query_lower)

    # MOIS
    def _define_month(question: str) -> list:
        mois_map = {
            1: [r"janv(?:ier)?", r"jan"],
            2: [r"f[ée]vr(?:ier)?", r"fev"],
            3: [r"mars?", r"mar"],
            4: [r"avr(?:il)?", r"avr"],
            5: [r"mai"],
            6: [r"juin"],
            7: [r"juil(?:let)?", r"jul"],
            8: [r"ao[uû]t?", r"aou"],
            9: [r"sept(?:embre)?", r"sep"],
            10: [r"oct(?:obre)?", r"oct"],
            11: [r"nov(?:embre)?", r"nov"],
            12: [r"d[ée]c(?:embre)?", r"dec"],
        }

        query_norm = _strip_accents(question)
        mois = set()
        interval_patterns = [
            r"(?:entre)\s+([a-zéûî\.]+)\s+(?:et)\s+([a-zéûî\.]+)",
            r"(?:de)\s+([a-zéûî\.]+)\s+(?:à|a)\s+([a-zéûî\.]+)",
        ]
        interval_match = None
        for pat in interval_patterns:
            m = re.search(pat, query_norm)
            if m:
                interval_match = m
                break

        if interval_match:
            mois1_txt, mois2_txt = interval_match.groups()
            mois1_txt = mois1_txt.replace('.', '')
            mois2_txt = mois2_txt.replace('.', '')
            mois1_num, mois2_num = None, None
            for num, patterns in mois_map.items():
                for pat in patterns:
                    if re.fullmatch(pat, mois1_txt):
                        mois1_num = num
                    if re.fullmatch(pat, mois2_txt):
                        mois2_num = num
            if mois1_num and mois2_num:
                if mois1_num <= mois2_num:
                    mois = set(range(mois1_num, mois2_num + 1))
                else:
                    mois = set(list(range(mois1_num, 13 + 1)) + list(range(1, mois2_num + 1)))
        else:
            for num, patterns in mois_map.items():
                for pat in patterns:
                    if re.search(rf"\b{pat}\b", query_norm):
                        mois.add(num)

            trimestre_regex = [
                (r"\b(1(er)?|premier|i+)[s\-]*(trimestre|trim)\b", [1, 2, 3]),
                (r"\b(2(e|ème|eme)?|deuxi[eè]me|ii+)[s\-]*(trimestre|trim)\b", [4, 5, 6]),
                (r"\b(3(e|ème|eme)?|troisi[eè]me|iii+)[s\-]*(trimestre|trim)\b", [7, 8, 9]),
                (r"\b(4(e|ème|eme)?|quatri[eè]me|iv+)[s\-]*(trimestre|trim)\b", [10, 11, 12]),
            ]
            for pat, mois_list in trimestre_regex:
                if re.search(pat, query_norm):
                    mois.update(mois_list)
                    break

            semestre_regex = [
                (r"\b(1(er)?|premier|i+)[s\-]*(semestre|sem)\b", [1, 2, 3, 4, 5, 6]),
                (r"\b(2(e|ème|eme)?|deuxi[eè]me|ii+)[s\-]*(semestre|sem)\b", [7, 8, 9, 10, 11, 12]),
            ]
            for pat, mois_list in semestre_regex:
                if re.search(pat, query_norm):
                    mois.update(mois_list)
                    break

            if re.search(r"\btous les mois\b", query_norm):
                mois.update(range(1, 13))
            elif re.search(r"\bmois courant\b", query_norm):
                mois.update([datetime.now().month])
            elif re.search(r"\bmois dernier\b", query_norm):
                mois.update([datetime.now().month - 1 if datetime.now().month > 1 else 12])
            elif re.search(r"\bmois prochain\b", query_norm):
                mois.update([datetime.now().month + 1 if datetime.now().month < 12 else 1])
        return sorted(mois)
    params['mois'] = _define_month(query_lower)

    # NATURE ECRITURE
    def _select_nature_ecriture(question: str) -> list:
        nature_ecritures = set()
        if re.search(r"\b(mensuel(le)?|mois|trimestre|semestre|janvier|février|fevrier|mars|avril|mai|juin|juillet|août|aout|septembre|octobre|novembre|décembre|decembre)\b", question):
            nature_ecritures.add('Mensuelle')
        if params['mois']:
            nature_ecritures.add('Mensuelle')
        if re.search(r"\b(annuel(le)?|total|cette année|cette annee|année|annee)\b", question):
            nature_ecritures.add('Annuelle')
        if not nature_ecritures:
            if params.get('mois') and 0 < len(params['mois']) < 12:
                nature_ecritures.add('Mensuelle')
            if params.get('mois') and len(params['mois']) == 12:
                nature_ecritures.add('Annuelle')
        return [v for v in ['Mensuelle', 'Annuelle'] if v in nature_ecritures]
    params['nature_ecriture'] = _select_nature_ecriture(query_lower)

    # LIGNES
    async def _match_lignes(question: str, threshold: int = 75, return_scores: bool = False):
        result_list = extract_all_descendants_for_list(simple_dict)
        LISTE_LIGNES: list[str] = [label for sublist in result_list for label in sublist]
        q = _strip_accents(question.lower())
        q_tokens = set(re.findall(r"\w+", q))
        results = {}
        # Plus grande tolérance au singulier/pluriel : on compare formes singulier et pluriel, pour chaque token de la ligne et de la question
        def _sing_plur_forms(token):
            if token.endswith('s'):
                return {token, token[:-1]}
            else:
                return {token, token + 's'}
        
        for ligne in LISTE_LIGNES:
            ln = _strip_accents(ligne.lower())
            ln_tokens_raw = set(re.findall(r"\w+", ln))
            q_tokens_raw = q_tokens

            # Génère toutes formes singulier/pluriel pour ln_tokens et q_tokens
            ln_tokens_all = set()
            for t in ln_tokens_raw:
                ln_tokens_all.update(_sing_plur_forms(t))
            q_tokens_all = set()
            for t in q_tokens_raw:
                q_tokens_all.update(_sing_plur_forms(t))
            
            # Test : Tous les tokens 'ligne' (sing/plur) présents dans q (sing/plur)
            if ln_tokens_all and ln_tokens_all.issubset(q_tokens_all):
                results[ligne] = 100
                continue

            # Fallback fuzzy
            score = fuzz.token_set_ratio(q, ln)
            if score >= threshold:
                results[ligne] = max(results.get(ligne, 0), score)

        ordered = sorted(results.items(), key=lambda x: -x[1])
        detail = await _check_detail(question)
        if detail and ordered:
            enfants = []
            for l in ordered:
                enfants += get_children_by_label(simple_dict, l[0])
            return set(enfants + [lbl for lbl, _ in ordered])
        return ordered if return_scores else [lbl for lbl, _ in ordered]
    params['lignes'] = await _match_lignes(query_lower)

    # Correct for NoneType issues and robustify list usage
    if col_infos:
        annee = col_infos.get("annee")
        # Ensure params['annees'] is always a list
        if not isinstance(params.get('annees'), list) or params['annees'] is None:
            params['annees'] = []
        if annee is not None and annee not in params['annees']:
            params['annees'].append(annee)
        mois = col_infos.get("mois", 0)
        if not isinstance(params.get('nature_ecriture'), list) or params['nature_ecriture'] is None:
            params['nature_ecriture'] = []
        if mois == 0:
            if "Annuelle" not in params['nature_ecriture']:
                params['nature_ecriture'].append("Annuelle")
        else:
            if "Mensuelle" not in params['nature_ecriture']:
                params['nature_ecriture'].append("Mensuelle")
            if not isinstance(params.get('mois'), list) or params['mois'] is None:
                params['mois'] = []
            if mois not in params['mois']:
                params['mois'].append(mois)

        contexte = col_infos.get("contexte")
        if not isinstance(params.get('types_valeur'), list) or params['types_valeur'] is None:
            params['types_valeur'] = []
        if contexte and contexte not in params['types_valeur']:
            params['types_valeur'].append(contexte)

    # Normalize all params lists (ensure always list, de-duplicate, and sort if non-empty)
    for k in ['groupes', 'types_valeur', 'annees', 'nature_ecriture', 'lignes', 'mois']:
        param_val = params.get(k)
        if not isinstance(param_val, list) or param_val is None:
            params[k] = []
        else:
            params[k] = sorted(set(param_val)) if param_val else []

    if not params['nature_ecriture']:
        params['nature_ecriture'] = ['Annuelle']

    return params


In [9]:
synonyme_groupe = await get_mapping()

In [12]:
sa_fk = 224

data = await execute_sp(
    "ia.sp_simBudFormSA_one", 
    {
        "user_fk": config.USER_FK, 
        "sa_fk": sa_fk, 
        "form_fk": 167
    }
)
if data[0].get('EcrituresDetails') == None:
    df = pd.DataFrame(None)
else:
    json_string = data[0].get('EcrituresDetails')

    data_records = json.loads(json_string)
    context_data = pd.DataFrame(data_records)
    df = preprocessing_data(context_data, simple_dict)

In [None]:
def data_to_pivot(df: pd.DataFrame) -> pd.DataFrame:
    if df["Section  analytique"].unique().tolist() in [[''], [], None]:
        df["Section  analytique"] = df["Liste de sélection"]
        
    # Renommage et nettoyage
    df = df.rename(
        columns={
            'Code Hiérarchique': 'Code_H', 
            'Montant': 'Montant',
            'Lignes': 'Ligne_Analytique',
            'Contexte': 'Contexte',
            'Année': 'Annee',
            'Groupe': 'Groupe',
            'Section  analytique': 'Residence'
        }
    )

    df['Annee'] = df['Annee'].astype(int)
    df['Mois'] = df['Mois'].astype(int)

    df_agg = df.groupby(
        [
            'Residence', 'Colonnes', 'Annee', 'Mois', "Nature de l'écriture", 'Contexte', 'Code_H', 'Ligne_Analytique', 'Groupe'
        ]
    )['Montant'].sum().reset_index()

    contexte_order = ['R', 'P', 'B']

    def _mois_sort_key(mois):
        try:
            return int(mois)
        except:
            return 99

    df_pivot = df_agg.pivot_table(
        index=['Groupe', 'Code_H', 'Ligne_Analytique'],
        columns=['Annee', 'Contexte', 'Mois', "Nature de l'écriture"],
        values='Montant',
        fill_value="",
        aggfunc='sum'
    )
    # Explicitly infer objects to avoid FutureWarning from fill_value on object dtype
    df_pivot = df_pivot.infer_objects(copy=False)

    if df_pivot.columns.nlevels == 4:
        nature_unique = df_pivot.columns.get_level_values(3).unique().tolist()
        if "Annuelle" in nature_unique:
            nature_unique = [n for n in nature_unique if n != "Annuelle"]
            nature_order_desc = sorted(nature_unique, reverse=True) + ["Annuelle"]
        else:
            nature_order_desc = sorted(nature_unique, reverse=True)
        nature_order_dict = {name: i for i, name in enumerate(nature_order_desc)}
        
        def _col_sort_key(x):
            return (
                int(x[0]) if str(x[0]).isdigit() else 0,
                contexte_order.index(x[1]) if x[1] in contexte_order else 99,
                _mois_sort_key(x[2]),
                nature_order_dict.get(x[3], 999)
            )
        df_pivot = df_pivot[sorted(df_pivot.columns, key=_col_sort_key)]

    df_pivot = df_pivot.reset_index()

    def _code_hierarchical_sort_key(code):
        parts = [int(part) if part.isdigit() else part for part in re.split(r'\D+', str(code).strip('.')) if part]
        return parts

    df_pivot_sorted = df_pivot.copy()
    df_pivot_sorted['__sort_key'] = df_pivot_sorted['Code_H'].apply(_code_hierarchical_sort_key)
    df_pivot_sorted = df_pivot_sorted.sort_values('__sort_key').drop(columns='__sort_key', level=0).reset_index(drop=True)

    def _format_value(val):
        try:
            if isinstance(val, (float, np.floating, int, np.integer)):
                if float(val) == int(val):
                    return int(val)
                else:
                    return "{:.2f}".format(float(val))

            if isinstance(val, str):
                num = float(val.replace(",", ".").strip())
                if num == int(num):
                    return int(num)
                else:
                    return "{:.2f}".format(num)
            return val
        except:
            return val

    for col in df_pivot_sorted.columns[3:]:
        df_pivot_sorted[col] = df_pivot_sorted[col].apply(_format_value)

    """ df_pivot_sorted = df_pivot_sorted[
        df_pivot_sorted["Code_H"].apply(
            lambda x: str(x).strip('.').split('.')[0] in ['1', '2', '3', '4']
        )
    ].reset_index(drop=True) """

    return df_pivot_sorted

In [31]:
df_pivot = data_to_pivot(df)

---

In [77]:
query = "JE voudrais une analyse de l'Ebitda"
parse = await parse_user_query(query, synonyme_groupe, simple_dict)
parse

{'groupes': ['Marge'],
 'types_valeur': [],
 'annees': [],
 'nature_ecriture': ['Annuelle'],
 'lignes': ['EBITDA'],
 'mois': []}

In [78]:
def get_ret_dataframe(df: pd.DataFrame, param: dict[str, list]) -> pd.DataFrame:
    if not param["groupes"] and not param["lignes"]:
        return None
    if param["groupes"]:
        df_group = df[df[df.columns.levels[0][-2]].isin(param["groupes"])]
    else:
        df_group = df

    if param['types_valeur']:
        mask_typ = df_group.columns.get_level_values(1).isin(param['types_valeur'])
        cols = df_group.columns[mask_typ].tolist()
        meta_cols = [c for c in df_group.columns if c[0] in ("Groupe", "Code_H", "Ligne_Analytique")]
        selected_cols = meta_cols + cols
        df_val = df_group.loc[:, selected_cols]
    else:
        df_val = df_group

    if param['annees']:
        mask_yrs = df_val.columns.get_level_values(0).isin(param['annees'])
        cols = df_val.columns[mask_yrs].tolist()
        meta_cols = [c for c in df_val.columns if c[0] in ("Groupe", "Code_H", "Ligne_Analytique")]
        selected_cols = meta_cols + cols
        df_annee = df_val.loc[:, selected_cols]
    else:
        df_annee = df_val

    if param['nature_ecriture']:
        mask_nat = df_annee.columns.get_level_values(3).isin(param['nature_ecriture'])
        cols = df_annee.columns[mask_nat].tolist()
        meta_cols = [c for c in df_annee.columns if c[0] in ("Groupe", "Code_H", "Ligne_Analytique")]
        selected_cols = meta_cols + cols
        df_nature = df_annee.loc[:, selected_cols]
    else:
        df_nature = df_annee

    if param["lignes"]:
        df_lignes = df_nature[df_nature[df_nature.columns.levels[0][-4]].isin(param["lignes"])]
    else:
        df_lignes = df_nature

    if param["mois"]:
        if "Annuelle" in param['nature_ecriture']:
            if 12 not in param['mois']:
                param['mois'].append(12)
        mask_mois = df_lignes.columns.get_level_values(2).isin(param['mois'])
        cols = df_lignes.columns[mask_mois].tolist()
        meta_cols = [c for c in df_lignes.columns if c[0] in ("Groupe", "Code_H", "Ligne_Analytique")]
        selected_cols = meta_cols + cols
        df_mois = df_lignes.loc[:, selected_cols]
    else:
        df_mois = df_lignes

    return df_mois

In [79]:
pdf = get_ret_dataframe(df_pivot, parse)
pdf

Annee,Groupe,Code_H,Ligne_Analytique,2022,2023,2024,2025,2025,2026
Contexte,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,R,R,R,P,B,B
Mois,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,12,12,12,12,12,12
Nature de l'écriture,Unnamed: 1_level_3,Unnamed: 2_level_3,Unnamed: 3_level_3,Annuelle,Annuelle,Annuelle,Annuelle,Annuelle,Annuelle
150,Marge,12.0,EBITDA,54004,275367,252303,202523,249344,332103


In [80]:
def transform_for_llm(df_pivot: pd.DataFrame|None) -> str:
    """
    Met à jour:
        - Génère un texte métrique optimisé pour l'entrée LLM.
        - Détecte la présence de colonnes contextuelles (ex : 'Groupe', 'Code_H', 'Ligne_Analytique')
          et construit un label 'Indicateur' consolidé à partir de ces informations.
        - Requiert que les fonctions utilitaires _normalize_col et _format_euro_fr existent déjà.
    Returns :
        - metric_text : str, texte à fournir au LLM
    """
    if df_pivot is None:
        return ""

    def _format_euro_fr(x: float, line: str) -> str:
        """Format number in French style with 2 decimals and a non-breaking space thousands separator."""
        if pd.isna(x):
                return "N/A"
        elif str(line).startswith("%"):
            s = f"{x:,.2f}"
            s = s.replace(",", " ")
            return f"{s} %"
        else:
            s = f"{x:,.0f}"
            s = s.replace(",", " ")
            return f"{s} €"

    def _normalize_col(col):
        if isinstance(col, tuple):
            # year is always at position 0
            if len(col) >= 1:
                year = str(col[0])
            else:
                year = "unknown"
            # find Réel/Prévision/Budget if present in tuple (also accent-insensitive, lowercase!)
            typ = next(
                (str(x) for x in col if isinstance(x, str) and str(x).lower() in ("réel", "budget", "prévision", "prevision")),
                None
            )
            if typ is None:
                # fallback: any string in col
                typ = next((str(x) for x in col if isinstance(x, str)), "Réel")

            # try to recover nature and mois heuristically (commonly last two positions in tuple)
            col_strs = [str(x) for x in col if isinstance(x, str)]

            # We will look at positions from the end (to be robust to existing pivot structure)
            # assume 'nature' (ex: Mensuelle, Annuelle) is very likely last, 'mois' just before, if present
            if len(col) >= 3:
                nature = str(col[-1]) if col[-1] is not None else None
                mois = str(col[-2]) if col[-2] is not None else None
                # If col[-1] (nature) is not a valid value, set to None
                if not nature or nature.lower() in ("", "none", "nan"):
                    nature = None
                if not mois or mois.lower() in ("", "none", "nan"):
                    mois = None
            else:
                nature = None
                mois = None

            return {"year": year, "type": typ, "nature": nature, "mois": mois}
        else:
            # fallback: not a tuple
            year = str(col)
            return {"year": year, "type": "Réel", "nature": None, "mois": None}

        new_cols = list(df_pivot.columns)

    new_cols = list(df_pivot.columns)
    for i, col in enumerate(df_pivot.columns):
        if i >= 3:
            col_as_list = list(col)
            if col_as_list[1] == 'R':
                col_as_list[1] = 'Réel'
            elif col_as_list[1] == 'P':
                col_as_list[1] = 'Prévision'
            elif col_as_list[1] == 'B':
                col_as_list[1] = 'Budget'
            new_cols[i] = tuple(col_as_list)
    df_pivot.columns = pd.MultiIndex.from_tuples(new_cols)

    df = df_pivot.copy()

    # If index contains labels, reset to columns
    if df.index.name is None or df.index.name == "":
        df = df.reset_index()

    # If there are contextual columns, build a consolidated 'Indicateur' column
    context_cols = [c for c in ['Code_H', 'Ligne_Analytique', 'Indicateur'] if c in df.columns]

    if len(context_cols) > 1:
        # create a single descriptive indicator by joining available context columns (in order)
        df['Indicateur_consolide'] = df[context_cols].astype(str).apply(
            lambda row: " | ".join([str(x).strip() for x in row.values if str(x).strip() not in ['nan', 'None']]),
            axis=1
        )
        # prefer the consolidated name
        indicator_col = 'Indicateur_consolide'
    else:
        # detect a single indicator column if present, otherwise use first column
        indicator_col = None
        for possible in ['Ligne_Analytique', 'Indicateur', 'index', 0]:
            if possible in df.columns:
                indicator_col = possible
                break
        if indicator_col is None:
            indicator_col = df.columns[0]
        # if chosen indicator_col isn't already a string label, coerce to str
        if indicator_col != 'Indicateur':
            df[indicator_col] = df[indicator_col].astype(str)

    # Ensure the DataFrame has a column named exactly 'Indicateur' used downstream
    if indicator_col != 'Indicateur':
        df = df.rename(columns={indicator_col: 'Indicateur'})
    else:
        # if it already is 'Indicateur', ensure string type
        df['Indicateur'] = df['Indicateur'].astype(str)

    value_cols = [c for c in df.columns if c != 'Indicateur']

    rows = []
    for _, row in df.iterrows():
        # Avoid pandas row pretty-print for the indicator label
        if isinstance(row['Indicateur'], pd.Series):
            indicator_label = " | ".join(str(x).strip() for x in row['Indicateur'].values if str(x).strip() not in ['nan', 'None'])
        else:
            indicator_label = str(row['Indicateur']).strip()
        for col in value_cols:
            meta = _normalize_col(col)
            year = meta.get('year', 'unknown')
            typ = meta.get('type', 'Réel')
            nature = meta.get('nature', 'unknown')
            mois = meta.get('mois', 'unknown')
            try:
                val = row[col]
            except Exception:
                val = row.get(col, None)

            # Try to keep numeric
            numeric = None
            if pd.api.types.is_numeric_dtype(type(val)):
                try:
                    numeric = float(val) if not pd.isna(val) else None
                except Exception:
                    numeric = None
            else:
                try:
                    numeric = float(str(val).replace("€", "").replace("%", "").replace(" ", "").replace(",", "."))
                except Exception:
                    numeric = None

            lbl = indicator_label.split(" | ")[-1]
            txt = _format_euro_fr(numeric, lbl) if numeric is not None else "N/A"

            rows.append({
                'Indicateur': indicator_label,
                'Année': year,
                'Type': typ,
                'Nature': nature,
                'Mois': mois,
                'Valeur_num': numeric,
                'Valeur_txt': txt
            })

    df_long = pd.DataFrame(rows)

    def _context_rank(typ):
        t = str(typ).lower()
        if "réel" in t:
            return 0
        if "prevision" in t or "prévision" in t:
            return 1
        if "budget" in t:
            return 2
        return 99

    def _block_for_indicator(ind):
        label = str(ind).strip()
        lines = [f"[{label}]"]
        sub = df_long[df_long['Indicateur'].astype(str).values == str(ind)].copy()

        # Filter out technical garbage in 'Année'
        sub = sub[~sub['Année'].astype(str).str.lower().isin(['groupe', 'indicateur', 'index'])]

        # Attempt numeric year conversion for sorting; fallback keeps original order
        sub_sorted = sub.copy()
        try:
            sub_sorted["Année_num"] = pd.to_numeric(sub_sorted["Année"], errors='coerce')
        except Exception:
            sub_sorted["Année_num"] = sub_sorted["Année"]

        # First sort by Type context order, then by year
        if 'Type' in sub_sorted.columns:
            sub_sorted = sub_sorted.sort_values(
                by=["Type", "Année_num"],
                key=lambda col: col.map(_context_rank) if col.name == "Type" else col,
                ascending=[True, True]
            )
        else:
            sub_sorted = sub_sorted.sort_values(by=["Année_num"])

        # Ensure ordering Réel -> Prévision -> Budget within each year
        entries = []
        for ctx in ["Réel", "Prévision", "Budget"]:
            sub_ctx = sub_sorted[sub_sorted["Type"].astype(str).str.lower().str.contains(ctx.lower(), na=False)]
            entries.append(sub_ctx)
        if entries:
            merged = pd.concat(entries)
            merged = merged.drop_duplicates(subset=["Année", "Type", "Nature", "Mois"])
        else:
            merged = sub_sorted
        
        mois_str = [
            'Janvier',
            'Février',
            'Mars',
            'Avril',
            'Mai',
            'Juin',
            'Juillet',
            'Août',
            'Septembre',
            'Octobre',
            'Novembre',
            'Décembre'
        ]
        # Produce lines
        for _, rr in merged.iterrows():
            year = rr['Année']
            typ = rr['Type']
            nature = rr['Nature']
            mois = rr['Mois']
            txt = rr['Valeur_txt']
            # Skip rows with completely empty or N/A values for non-year labels
            if (pd.isna(year) or str(year).strip().lower() in ['nan', 'none', '']) and txt in ["0", "0 €", "N/A"]:
                continue
            if nature == "Annuelle":
                lines.append(f"- {typ} {nature} {year}: {txt}")
            else:
                lines.append(f"- {typ} {mois_str[int(mois)-1]} {year}: {txt}")
        return "\n".join(lines)

    indicators = df_long['Indicateur'].drop_duplicates().tolist()
    blocks = [_block_for_indicator(ind) for ind in indicators]

    metric_text = "\n\n".join(blocks)

    return metric_text

In [81]:
res = transform_for_llm(pdf)
print(count_tokens(res))
print(res)

54
[12. | EBITDA]
- Réel Annuelle 2022: 54 004 €
- Réel Annuelle 2023: 275 367 €
- Réel Annuelle 2024: 252 303 €
- Prévision Annuelle 2025: 202 523 €
- Budget Annuelle 2025: 249 344 €
- Budget Annuelle 2026: 332 103 €
