# In[ ]:
import os
import json
import pandas as pd
import re
import sqlglot
from sqlglot import exp
from sqlglot.optimizer.qualify import qualify
from sqlglot.optimizer.scope import find_all_in_scope
from sqlglot.optimizer.scope import build_scope
import pandas as pd
from collections import defaultdict
from sqlglot.errors import OptimizeError
import os
from data_lineage.utils import measure_execution_time
from functools import wraps
import hashlib

# In[ ]:
# Helpers that clean raw SQL/HQL text before it is handed to sqlglot.

# One alternation scanned left to right, so whichever construct starts first wins:
#   group 1 : a single- or double-quoted string literal, kept verbatim so that "--" or "/*"
#             inside a literal is not mistaken for a comment;
#   then    : a /* ... */ block comment (may span several lines, hence DOTALL);
#   then    : a -- line comment, up to but not including the line break (so it also matches
#             a comment on the last line of the file, which has no trailing newline).
_SQL_COMMENT_RE = re.compile(
    r"""('(?:[^'\\]|\\.)*'|"(?:[^"\\]|\\.)*")|/\*.*?\*/|--[^\r\n]*""",
    re.DOTALL,
)


def remove_comments(sql):
    """
    Remove line (``--``) and block (``/* ... */``) comments from an SQL/HQL query.

    String literals are preserved untouched, and line breaks that ended a line comment
    are kept so that line structure is preserved.

    Args:
        sql (str): The SQL content to clean.

    Returns:
        str: The SQL content without comments.
    """
    # Keep string literals (group 1); drop every comment match.
    return _SQL_COMMENT_RE.sub(lambda match: match.group(1) or "", sql)

def remove_hql_trim(hql_content):
    """
    Replace empty ``trim()``, ``ltrim()`` and ``rtrim()`` calls with an empty string literal.

    Matching is case-insensitive and allows whitespace inside the parentheses.

    Args:
        hql_content (str): The HQL content to clean.

    Returns:
        str: The HQL content where every empty ``trim()``/``ltrim()``/``rtrim()`` call is
        replaced by ``''``.
    """
    # \b anchors the match on a whole function name. Without it, the "trim()" tail of
    # "ltrim()"/"rtrim()" was rewritten into the invalid fragment "l''"/"r''".
    # Names such as "my_trim()" stay untouched ("_" is a word character, so there is no boundary).
    return re.sub(r"\b[lr]?trim\s*\(\s*\)", "''", hql_content, flags=re.IGNORECASE)




def create_lineage_dic(hql_file_path: str, results: dict) -> dict:
    """
    Read an HQL query from a file, parse and qualify it, then build a data-lineage dictionary.

    The query is cleaned with ``remove_comments`` and ``remove_hql_trim``, then parsed with
    sqlglot (Hive dialect). It is qualified when possible; if qualification fails, the
    unqualified tree is used. For every projection of every ``SELECT`` in the statement, it
    records:
    - the detected columns,
    - the aggregation functions,
    - the arithmetic operations,
    - the SQL formula,
    - the tables used by that ``SELECT``.

    Args:
        hql_file_path (str): Path of the HQL file to analyse.
        results (dict): Intermediate results forwarded to ``analyze_projection``
            (produced by ``process_hql_files``).

    Returns:
        dict: ``{}`` when the file is missing or cannot be parsed; otherwise:
            {
                "<file path>.hql": {
                    "ALIAS_OR_NAME": {
                        "Alias/Projection": ...,
                        "Colonnes détectées": [...],
                        "agg": [...],
                        "Opérations arithmétiques": [...],
                        "Formule SQL": ...,
                        "Table(s) utilisées": "t1, t2" (or "Aucune table"),
                    },
                    ...
                }
            }
        NOTE: projections are keyed by alias only. An alias that appears in several SELECTs
        (e.g. a subquery and the outer query) keeps the entry of the last SELECT visited.
    """
    lineage_dict = {}
    # Accumulates the second value returned by measure_execution_time for analyze_projection
    # (presumably the elapsed time); printed at the end.
    temp_projection = 0

    try:
        with open(hql_file_path, "r", encoding="utf-8") as f:
            hql_content = f.read()
    except FileNotFoundError:
        print(f"Fichier introuvable: {hql_file_path}")
        return {}

    hql_content = remove_comments(hql_content)
    hql_content = remove_hql_trim(hql_content)

    # parse_one raises (ParseError / TokenError) on malformed input instead of returning None.
    # Catch it so that one unparsable script does not abort the analysis of every other file.
    try:
        expression = sqlglot.parse_one(hql_content, read="hive")
    except sqlglot.errors.SqlglotError as e:
        print(f"Warning: {e}")
        expression = None
    if not expression:
        print(f"Impossible de parser le HQL dans: {hql_file_path}")
        return {}

    try:
        expression_qualified = qualify(expression)
    except sqlglot.errors.OptimizeError as e:
        # Print a warning and continue with the unqualified tree.
        print(f"Warning: {e}")
        expression_qualified = expression

    all_selects = list(expression_qualified.find_all(exp.Select))
    print("file_path", hql_file_path)
    lineage_dict[hql_file_path] = {}
    for select_expr in all_selects:
        tables_in_select, _ = measure_execution_time(find_tables_in_select, select_expr)
        tables_str = ", ".join(tables_in_select) if tables_in_select else "Aucune table"
        for proj in select_expr.selects:
            if isinstance(proj, exp.Alias):
                # "expr AS alias": analyse the aliased expression itself.
                alias_name = proj.alias or "NO_ALIAS"
                expr_to_analyze = proj.this
            else:
                # Bare column / expression: fall back to its output name.
                alias_name = proj.alias_or_name or "NO_ALIAS"
                expr_to_analyze = proj

            info, t = measure_execution_time(analyze_projection, expr_to_analyze, hql_content, results)
            temp_projection += t

            lineage_dict[hql_file_path][alias_name] = {
                "Alias/Projection": alias_name,
                "Colonnes détectées": info["columns_used"],
                "agg": info["aggregations"],
                "Opérations arithmétiques": info["arithmetic_ops"],
                "Formule SQL": info["formula_sql"],
                "Table(s) utilisées": tables_str,
            }
    print("analyse des champs", temp_projection)
    return lineage_dict




def build_lineage(dependencies, results):
    """
    Build the lineage of Hive tables from the HQL files that populate them.

    Args:
        dependencies (dict | None): Hive table -> HQL file path, or list of paths.
            ``None``/empty yields an empty result. Entries whose table or file list is
            ``None`` are ignored.
        results (dict): Intermediate results forwarded to ``create_lineage_dic``.

    Returns:
        dict: HQL file path -> result of ``create_lineage_dic`` for that file. Paths starting
        with "/" are skipped. Missing files are reported with a message and skipped.
    """
    lineage = {}
    if not dependencies:
        return lineage

    for hive_table, hql_files in dependencies.items():
        if hive_table is None or hql_files is None:
            continue
        if isinstance(hql_files, str):  # A single file given as a plain string.
            hql_files = [hql_files]
        for hql_file in hql_files:
            # Paths starting with "/" are not analysed (existing behaviour; presumably
            # HDFS/remote paths rather than local files — TODO confirm).
            if hql_file.startswith("/"):
                continue
            # A file shared by several Hive tables is analysed once. Re-analysing it would
            # only overwrite the same key, and parsing is the expensive step.
            if hql_file in lineage:
                continue
            if os.path.exists(hql_file):
                lineage[hql_file] = create_lineage_dic(hql_file, results)
            else:
                print(f"Fichier HQL non trouvé : {hql_file}")
    return lineage


def read_json(input_file):
    """
    Load a UTF-8 JSON file and return the decoded content.

    Args:
        input_file (str): Path of the JSON file.

    Returns:
        The decoded JSON value (typically a dict), or ``None`` when the file is missing,
        is not valid JSON, or any other error occurs. A message is printed in each error case.
    """
    try:
        with open(input_file, "r", encoding="utf-8") as infile:
            return json.load(infile)
    except json.JSONDecodeError as err:
        print(f"Erreur lors du chargement du fichier JSON : {err}")
    except FileNotFoundError:
        print(f"Le fichier '{input_file}' est introuvable.")
    except Exception as err:
        print(f"Une erreur est survenue : {err}")
    return None
    


def _find_dwh_fields(dic_fields_from_dwh, tmp_dwh, rdms):
    """Return ``(staging_fields, rdms_fields)`` looked up case-insensitively (None when absent)."""
    fields_rdms_tmp = None
    fields_rdms = None
    tmp_key = tmp_dwh.lower() if tmp_dwh else None
    rdms_key = rdms.lower()
    for table_name, fields in (dic_fields_from_dwh or {}).items():
        table_key = table_name.lower()
        if table_key == tmp_key:
            fields_rdms_tmp = fields
        if table_key == rdms_key:
            fields_rdms = fields
        if fields_rdms_tmp is not None and fields_rdms is not None:
            break
    return fields_rdms_tmp, fields_rdms


def _detected_columns(projection_info):
    """Return the lower-cased columns detected for one projection (a placeholder when none)."""
    detected = projection_info.get("Colonnes détectées", None)
    if not detected:
        detected = "NO DETECTED COLUMN"
    if isinstance(detected, list):
        return [column.lower() for column in detected]
    return [detected.lower()]


def _track_projection(overall_field_tracking, hql_file, proj_info, fields_rdms_tmp, fields_rdms, same_size):
    """Append the tracking entries produced by one projection of one HQL file."""
    alias = proj_info.get("Alias/Projection", None)
    # The alias -> RDMS field mapping is recomputed for every projection. A field found for a
    # previous projection must never be attributed to this one.
    mapped = alias is not None and same_size and alias.upper() in fields_rdms_tmp
    rdms_field = fields_rdms[fields_rdms_tmp.index(alias.upper())] if mapped else None
    formule = proj_info.get("Formule SQL", "") or ""

    for col in _detected_columns(proj_info):
        entries = overall_field_tracking.setdefault(col, [])
        if alias is None:
            continue
        if mapped:
            # Mapping entry: links the staging-table alias to its final RDMS field.
            entries.append({
                "rdms_field": rdms_field,
                "path": "",
                "colonne": "",
                "Opérations arithmétiques": "",
                "Alias": alias,
                "Formule SQL": "",
                "Table(s) utilisées": "",
            })
        if col in formule:
            # Detail entry: how the column is used in this projection of this HQL file.
            entries.append({
                "rdms_field": rdms_field if rdms_field is not None else "",
                "path": hql_file,
                "colonne": col,
                "Opérations arithmétiques": proj_info.get("Opérations arithmétiques", []),
                "Alias": alias,
                "Formule SQL": proj_info.get("Formule SQL", ""),
                "Table(s) utilisées": proj_info.get("Table(s) utilisées", ""),
            })


def track_fields_across_lineage(rdms_table_name, data, results, dic_fields_from_dwh):
    """
    Trace, for one RDMS table, how each source column flows through the HQL scripts that feed it.

    Matching entries of ``data`` (``"rdms_table"`` equal to ``rdms_table_name``,
    case-insensitive) are processed in three steps:
    1. Look up the field lists of the DWH staging table (``"staging_table_dwh"``) and of the
       RDMS table in ``dic_fields_from_dwh``.
    2. Analyse the HQL files listed in ``"dependencies"`` with ``build_lineage``.
    3. Match each projection alias, by position, between the staging and RDMS field lists.

    Args:
        rdms_table_name (str): Name of the RDMS table to trace.
        data (dict): Per-RDMS-table information. Each value is a dict read with the keys
            ``"rdms_table"``, ``"staging_table_dwh"`` and ``"dependencies"``
            (Hive table -> HQL file(s), as expected by ``build_lineage``).
        results (dict): Intermediate results forwarded to ``build_lineage``.
        dic_fields_from_dwh (dict): Table name -> list of field names. Staging fields are
            compared with upper-cased projection aliases.

    Returns:
        dict: Lower-cased column name -> list of entries of two kinds:
            * mapping entry, added when the alias is in the staging field list and both field
              lists have the same length: ``"rdms_field"`` and ``"Alias"`` set, every other
              value empty;
            * detail entry, added when the column name occurs in the projection's SQL formula:
              {"rdms_field": <mapped field or "">, "path": <hql file>, "colonne": <column>,
               "Opérations arithmétiques": [...], "Alias": <alias>,
               "Formule SQL": "...", "Table(s) utilisées": "..."}
        Projections without detected columns are recorded under "no detected column".
        Tables whose staging field list is unknown contribute nothing.
    """
    overall_field_tracking = {}
    target = rdms_table_name.lower()

    for table_info in data.values():
        rdms = table_info.get("rdms_table")
        if not rdms or rdms.lower() != target:
            continue

        fields_rdms_tmp, fields_rdms = _find_dwh_fields(
            dic_fields_from_dwh, table_info.get("staging_table_dwh", None), rdms
        )
        if fields_rdms_tmp is None:
            # Nothing can be mapped without the staging field list: skip the costly HQL analysis.
            continue
        # The positional staging -> RDMS mapping only makes sense when both lists align.
        same_size = fields_rdms is not None and len(fields_rdms_tmp) == len(fields_rdms)

        dependencies = table_info.get("dependencies", None)
        lineage = build_lineage(dependencies, results) if dependencies else {}

        # lineage: hql file -> {hql file -> {alias -> projection info}} (see create_lineage_dic).
        for hql_file, per_file in lineage.items():
            for projections in per_file.values():
                for proj_info in projections.values():
                    _track_projection(
                        overall_field_tracking, hql_file, proj_info,
                        fields_rdms_tmp, fields_rdms, same_size,
                    )
    return overall_field_tracking



# In[ ]:

def list_all_files(directory: str) -> list:
    """
    Recursively collect the path of every file below *directory*.

    Sub-directories are walked with ``os.walk``; only files (not directories) are listed,
    each joined with the directory it was found in. A non-existent directory yields ``[]``.
    """
    return [
        os.path.join(current_dir, file_name)
        for current_dir, _subdirs, file_names in os.walk(directory)
        for file_name in file_names
    ]

# In[ ]:
import sqlglot
import os
from sqlglot import exp
from sqlglot import parse_one
from sqlglot.optimizer.scope import find_all_in_scope
from sqlglot.optimizer.scope import build_scope
from sqlglot.lineage import lineage
import re
from sqlglot import exp
from sqlglot.optimizer.qualify import qualify
from data_lineage.utils import list_all_files
from data_lineage.fields import process_hql_files
from data_lineage.fields import extract_lineage_fields
from data_lineage.fields import export_lineage_to_excel
from data_lineage.fields import create_lineage_dic
from data_lineage.fields import print_lineage_dict
from data_lineage.utils import map_rdms_file_hql_file
from data_lineage.utils import extract_hive_table_and_queries
from data_lineage.fields import get_unique_tables_names_from_lineage_dict
from data_lineage.utils import extract_exec_queries
from data_lineage.utils import generate_dic_with_rdms_and_dependencies
from data_lineage.fields import get_hql_path_from_table_name
from data_lineage.utils import process_conf_files
from data_lineage.utils import get_dir_dependances_2
from data_lineage.fields import create_dict_tables_dependencies_and_path
from data_lineage.fields import create_dict_tables_dependencies_and_path_for_hive_tables
from data_lineage.fields import build_lineage
from data_lineage.fields import track_fields_across_lineage
from data_lineage.fields import track_fields_across_lineage_for_data_lake
import time
from data_lineage.utils import measure_execution_time
from data_lineage.fields import export_tracking_lineage_to_excel
from data_lineage.utils import display_table_dependencies_2
from data_lineage.format_json import read_json
from data_lineage.data_sources import data_sources_lineage
from data_lineage.fields import export_tracking_lineage_to_excel_2

# Driver: build the field-level lineage of one RDMS table and export it to Excel.
# NOTE(review): every path below points to one workstation; adjust before running elsewhere.
# NOTE(review): the imports of this cell rebind create_lineage_dic, build_lineage,
# track_fields_across_lineage, list_all_files and read_json to their data_lineage package
# versions, so this cell runs those, not the prototypes defined in the earlier cells.

# HQL script analysed on its own by the commented-out create_lineage_dic call below.
# (Single backslashes: in a raw string "\\" would put two backslashes in the path.)
path = r"C:\Users\YBQB7360\Downloads\HDFS\HDFS\PROD\SCRIPTS\FT\BDI\FT_BDI_AMELIORE\insert_into_spark_ft_bdi_ameliore.hql"
name_file = os.path.basename(path)
hdfs_dir = r"C:\Users\YBQB7360\Downloads\HDFS\HDFS"
paths_scripts = r'C:\Users\YBQB7360\Downloads\HDFS\HDFS\PROD\SCRIPTS'
file_scripts_paths = list_all_files(paths_scripts)
create_table_dic = process_hql_files(file_scripts_paths)
#dic_table_fields=extract_lineage_fields(hql_content)
directory_conf = r"C:\Users\YBQB7360\Downloads\HDFS\HDFS\PROD\CONF"
# RDMS table whose fields are traced and exported.
table_name = 'MON.FT_A_DATA_TRANSFER'
flow_file_path = r"C:\Users\YBQB7360\Documents\Data gouvernance\PRODv2.0\PRODv2.0.json"
#liste_table=list(dic_table_fields.keys())
#lineage_dic,_ = measure_execution_time(create_lineage_dic, path, create_table_dic)
#export_lineage_to_excel(lineage_dic, "lineage_"+name_file+".xlsx")
# Table name -> list of field names for the DWH tables (staging and final RDMS tables).
dict_fields_from_dwh = read_json(r"C:\Users\YBQB7360\Documents\Data gouvernance\_data_gouv\tables_mon_fields_description_dict.json")

dic_rdms_hive = extract_hive_table_and_queries(directory_conf)
dict_table_paths = map_rdms_file_hql_file(dic_rdms_hive, file_scripts_paths)
dic_files_queries_paths = process_conf_files(directory_conf, hdfs_dir)

# Hive table -> dependencies.
dic_tables_dependencies = get_dir_dependances_2(dic_files_queries_paths)
#display_table_dependencies_2(dic_tables_dependencies,"MON.SPARK_SMS_PARC")
dic_rdms_hive_dependencies = generate_dic_with_rdms_and_dependencies(dic_rdms_hive, dic_tables_dependencies)
# Attach to each data source the HQL file(s) that populate it.
dict_tables_dependencies_and_fields, _ = measure_execution_time(create_dict_tables_dependencies_and_path, dict_table_paths, dic_rdms_hive_dependencies, create_table_dic, dic_files_queries_paths)

filter_list = ["MON.FT_GLOBAL_ACTIVITY"]
#data_sources_lineage(hdfs_dir,paths_scripts,directory_conf,flow_file_path,filter_list,"dependencies_with_raw_server_filtered.xlsx")
#dict_tables_hive,_=measure_execution_time(create_dict_tables_dependencies_and_path_for_hive_tables,dict_table_paths,dic_tables_dependencies,create_table_dic)
# Debug snippet (previously kept as a bare string literal; it needs dict_tables_hive from the
# commented-out line above):
# print("dict_tables_dependencies_and_fields")
# for i,value in dict_tables_dependencies_and_fields.items():
#     print("rdms_table",value.get('rdms_table',None))
#     print("first_hive table",value.get('first_hive table',None))
#     dependencies=value.get('dependencies',None)
#     print(dependencies)
#     break
# print("dict_tables_hive")
# for i,value in dict_tables_hive.items():
#     print("i",i,"value",value)
#     break
#lineage_dic_for_one_chain_of_dependencies,t=measure_execution_time(build_lineage,dependencies,create_table_dic)

#lineage_fields_across_dependencies,t=measure_execution_time(track_fields_across_lineage_for_data_lake,table_name,dict_tables_dependencies_and_fields,create_table_dic,dict_tables_hive)
lineage_fields_across_dependencies, t = measure_execution_time(track_fields_across_lineage, table_name, dict_tables_dependencies_and_fields, create_table_dic, dict_fields_from_dwh)

#export_tracking_lineage_to_excel(lineage_fields_across_dependencies,"lineage_"+table_name+".xlsx")
export_tracking_lineage_to_excel_2(lineage_fields_across_dependencies, "lineage_" + table_name + ".xlsx")
#dict_tables_hql_from_request_lineage=get_hql_path_from_table_name(dict_table_paths,list_table_from_hql)
#print(dict_tables_hql_from_request_lineage)
#nom="MON.FT_CONTRACT_SNAPSHOT"
#for i,value in dict_table_paths.items():
    #contrat=dict_table_paths.get(nom,None)
