Pour les détails, ça se passe dans le dossier "aventures" 

L'objectif ici est d'expliquer dans les grandes lignes comment fonctionnent les différentes task qui seront intégrées dans le dag,mettre au propre (pep8) les fonctions, les regrouper dans un seul endroit etc. 

Entrée : Fichiers xml

Sortie : Un seul fichier parquet avec en plus un fichier parquet pour chaque fichier xml afin de transmettre les informations entre les task. 


![Texte alternatif](./rpz_task.drawio.png)


# Task 1 

Airflow ne peut pas conserver en mémoire de grandes quantités de données via xcom, la task 1 est là pour répondre à ce problème : 

Elle va, sur chaque fichier xml présent dans todo_xml : 

- Extraire les lignes budgets
- Y ajouter les métadonnées et l'id fichier
- Traiter les MtSup et CaracSup
- En faire un DataFrame avec Pandas (car peu de données)
- Nettoyer les @V 
- Transformer le résultat en fichier parquet dans todo_parquet

Notes & WIP : 
- Prend environ 20min (entre 18 et 21)
- Il n'y a pas encore de vérification d'ID dans la bdd / dans le parquet final
- Le schéma devrait être intégré dans cette partie 
- Il reste encore une petite modif à faire à transformation mtsup pour lorsque ça n'existe pas 

# Task 2 

La task 2 va assembler tout les fichiers parquet dans todo_parquet et les assembler dans un seul parquet avec un schéma

Notes & WIP : 
- Prend entre 2 et 3min pour les 20000 parquets
- Le schéma est encore loin d'être parfait 
- Le parquet est pour le moment "stable", c'est une solution temporaire avant d'avoir une db et un flux continu 
- Dans les faits, il me sert surtout à voir que les fichier sont bien traités 

In [1]:
#imports 
import polars as pl 
import gzip 
from lxml import etree
import pandas as pd 
import xmltodict
import glob
import os 
import time 
import polars as pl 
import duckdb
import shutil 
import pyarrow.parquet as pq
import pyarrow as pa

## Fonctions dans les tasks 

In [None]:
def timtim(fonction):
 ''' Permet de compter le temps de travail necessaire sur un fichier'''
 def wrapper(*args, **kwargs):
  debut = time.time()
  resultat = fonction(*args, **kwargs)
  fin = time.time()
  temps_execution = fin - debut
  print(f"La fonction {fonction.__name__} a pris {temps_execution:.5f} secondes pour s'exécuter.")
  return resultat
 return wrapper

def parse_fichier(chemin) : 
 '''Ouvre et parse le fichier gzip'''
 with gzip.open(chemin, 'rb') as fichier_ouvert : 
  fichier_xml_gzip = fichier_ouvert.read()
  fichier_xml = fichier_xml_gzip.decode('latin-1')
  fichier_dict = xmltodict.parse(fichier_xml)
 return fichier_dict

def recherche_id_fichier(chemin_parquet) :
 ''' Pas encore utilisée, permet de récupérer les ID dans le parquet contenant les 
 données déjà traitées '''
 conduck = duckdb.connect(database=':memory:', read_only=False)
 docubudg_t = conduck.read_parquet(chemin_parquet)
 requete_duckdb = ''' 
 SELECT
    DISTINCT Id_Fichier
 FROM 
    docubudg_t
 '''
 result_requete= conduck.execute(requete_duckdb).fetchdf()
 liste_id = result_requete['Id_Fichier'].to_list()
 conduck.close()
 return liste_id

def _isolement_id(fichier) : 
 '''Extrait l'id du nom du fichier pour la liste comprehension de securité

 ATTENTION, le premier split / va changer si on l'applique sur du minio '''
 val_id_fichier_source = fichier.split("\\")[-1].split('.')[0]
 if '-' in val_id_fichier_source : 
  val_id_fichier = val_id_fichier_source.split('-')[1]
 else : 
  val_id_fichier= val_id_fichier_source
 return val_id_fichier

def nettoyage_V(dataframe : pd.DataFrame) -> pd.DataFrame :
 ''' Permet de supprimer les @V des colonnes à l'exception de MtSup et CaracSup'''

 nettoyage = lambda x : str(x).replace("{'@V': '", "").replace("'}", "")
 for col in dataframe.columns : 
  if col in ['CaracSup', 'CaracSup'] : 
   dataframe[col] = dataframe[col].astype(str) 
  else :
   dataframe[col] = dataframe[col].apply(nettoyage)
 dataframe_propre = dataframe.reset_index(drop=True)
 return dataframe_propre

def transformation_mtsup(lignes_budget: dict) -> dict:
 ''' Traite les MtSup, qu'ils soient sous forme de list ou de dict '''

 for i in lignes_budget:
  type_mtsup = i.get('MtSup')  # Permet de connaitre le type de MtSup
  if type_mtsup is not None:  # Vérifie si la clé 'MtSup' existe
    if isinstance(type_mtsup, dict):
      dict_mtsup = type_mtsup
      i['MtSup_1_Lib'] = {'@V': dict_mtsup.get('@Code', '')}
      i['MtSup_1_Val'] = {'@V': dict_mtsup.get('@V', '')}
    elif isinstance(type_mtsup, list):
      dict_mtsup = i['MtSup']
      mtsup_propre = {}
      for z, entry in enumerate(dict_mtsup, start=1):
        code = f'MtSup_{z}_Lib'
        valeur = f'MtSup_{z}_Val'
        mtsup_propre[code] = entry.get('@Code', '')
        mtsup_propre[valeur] = entry.get('@V', '')
        i.update(mtsup_propre)
 return lignes_budget

def transformation_caracsup(lignes_budget: dict) -> dict:
 ''' Pareil que transfo_MtSup mais pour CaracSup'''

 for i in lignes_budget:
  type_caracsup = i.get('CaracSup')  # Permet de connaitre le type de CaracSup
  if type_caracsup is not None:  # Vérifie si la clé 'CaracSup' existe
    if isinstance(type_caracsup, dict):
      dict_caracsup = type_caracsup
      i['CaracSup_1_Lib'] = {'@V': dict_caracsup.get('@Code', '')}
      i['CaracSup_1_Val'] = {'@V': dict_caracsup.get('@V', '')}
    elif isinstance(type_caracsup, list):
      dict_caracsup = i['CaracSup']
      caracsup_propre = {}
      for z, entry in enumerate(dict_caracsup, start=1):
        code = f'CaracSup_{z}_Lib'
        valeur = f'CaracSup_{z}_Val'
        caracsup_propre[code] = entry.get('@Code', '')
        caracsup_propre[valeur] = entry.get('@V', '')
        i.update(caracsup_propre)      
 return lignes_budget



In [2]:
def task_1(chemin) : 
 ''' Traite les XML et les envoies sous format parquet,
   le nettoyage des @V est fait à cette étape'''
 
 for fichier in glob.glob(os.path.join(chemin, "*.gz")) : 
  try : 
   id_fichier = _isolement_id(fichier)
   dico = parse_fichier(fichier)
   metadonnees = dico['DocumentBudgetaire']['EnTeteDocBudgetaire']
   metadonnees['Id_Fichier'] = {'@V' : id_fichier}
   docbase = dico['DocumentBudgetaire']['Budget']['LigneBudget']
   if isinstance(docbase, list):
    for i in docbase : 
      i.update(metadonnees)
   elif isinstance(docbase, dict) : 
    docbase.update(metadonnees)
   docbase = transformation_caracsup(docbase)
   docbase = transformation_mtsup(docbase)
   df_base = pd.DataFrame(docbase)
   df_propre = nettoyage_V(df_base)
   df_propre.to_parquet(f'./fichiers/todo_parquet/{id_fichier}_pyarrow', engine='pyarrow')
   #shutil.move(fichier, './fichiers/done_xml/')
  except Exception as e : 
    print(f'Erreur fichier {fichier}, extraction impossible')
    #shutil.move(fichier, './fichiers/todo_xml/error/')
    print(e)
    continue  

In [3]:
# Schema 

custom_schema = pa.schema([
        ('Nature', pa.string()),
        ('Fonction', pa.string()),
        ('LibCpte', pa.string()),
        ('ContNat', pa.string()),
        ('ArtSpe', pa.string()), #Doit être corrigé en bool,
        ('ContFon', pa.string()),
        ('CodRD', pa.string()), #Val D ou R,
        ('MtBudgPrec', pa.string()), #float32 #nullable = True
        ('MtRARPrec', pa.string()), #float32
        ('MtPropNouv', pa.string()), #float32
        ('MtPrev', pa.string()), #float32
        ('CredOuv', pa.string()), #int32
        ('MtReal', pa.string()), #float32
        ('MtRAR3112', pa.string()), #float32
        ('OpBudg', pa.string()),
        ('MtSup', pa.string()),
        ('MtSup_1_Lib', pa.string()),
        ('MtSup_1_Val', pa.string()),
        ('MtSup_2_Lib', pa.string()),
        ('MtSup_2_Val', pa.string()),
        ('MtSup_3_Lib', pa.string()),
        ('MtSup_3_Val', pa.string()),
        ('MtSup_4_Lib', pa.string()),
        ('MtSup_4_Val', pa.string()),
        ('MtSup_5_Lib', pa.string()),
        ('MtSup_5_Val', pa.string()),
        ('MtSup_6_Lib', pa.string()),
        ('MtSup_6_Val', pa.string()),
        ('MtSup_7_Lib', pa.string()),
        ('MtSup_7_Val', pa.string()),
        ('MtSup_8_Lib', pa.string()),
        ('MtSup_8_Val', pa.string()),
        ('MtSup_9_Lib', pa.string()),
        ('MtSup_9_Val', pa.string()),
        ('MtSup_10_Lib', pa.string()),
        ('MtSup_10_Val', pa.string()), #10
        ('CaracSup_1_Lib', pa.string()),
        ('CaracSup_1_Val', pa.string()),
        ('CaracSup_2_Lib', pa.string()),
        ('CaracSup_2_Val', pa.string()),
        ('CaracSup_3_Lib', pa.string()),
        ('CaracSup_3_Val', pa.string()),
        ('CaracSup_4_Lib', pa.string()),
        ('CaracSup_4_Val', pa.string()),
        ('CaracSup_5_Lib', pa.string()),
        ('CaracSup_5_Val', pa.string()),
        ('CaracSup_6_Lib', pa.string()),
        ('CaracSup_6_Val', pa.string()),
        ('CaracSup_7_Lib', pa.string()), #7
        ('CaracSup_7_Val', pa.string()),
        ('TypOpBudg', pa.string()), #des 2 et des 1
        ('OpeCpteTiers', pa.string()),
        ('DteStr', pa.date32()),
        ('LibellePoste', pa.string()),
        ('IdPost', pa.string()),
        ('LibelleColl', pa.string()),
        ('IdColl', pa.string()),
        ('NatCEPL', pa.string()),
        ('Departement', pa.string()), #On oublie pas les 2A,
        ('Id_Fichier', pa.int64())
    ])

In [None]:
def task_2(dossier, nom_parquet, schema_cust) :
 liste_parquet_t = []
 for parquet in glob.glob(os.path.join(dossier, "*")) : 
  liste_parquet_t.append(parquet)

 schema = schema_cust
 #schema = pq.ParquetFile(liste_parquet_t[0]).schema_arrow
 with pq.ParquetWriter(f"{nom_parquet}.parquet", schema=schema) as writer:
    for file in liste_parquet_t:
        table = pq.read_table(file, schema=schema)
        writer.write_table(table)