In [1]:
import xmltodict
import pandas as pd
from pathlib import Path
import psycopg2

DB_PASSWORD = "vG&XGLqTY4EF7DCiozYoKlqRRUaC7ECd"

def create_dict_from_xml(chemin_fichier):
    with open(chemin_fichier, encoding='utf8') as fd:
        doc = xmltodict.parse(fd.read(), dict_constructor=dict)
    return doc

In [21]:
"""
  Parse le dictionnaire des types complexes et retourne le résultat dans un dataframe et un dictionnaire
"""

def _parse_complex_type(annexe_type : dict):
    list_records = []
    all_complex_type = annexe_type['xs:schema']['xs:complexType']
    i=0
    for complex_type in all_complex_type:
        temp_dict = dict()
        nom_type_complexe = complex_type['@name']
        if isinstance(complex_type['xs:attribute'], dict):
            for element in complex_type['xs:attribute']['xs:simpleType']['xs:restriction']['xs:enumeration']:
                temp_dict[element['@value']] = element.get('xs:annotation', {}).get('xs:documentation', element['@value'])
        
        result_dict = {
            "type" : nom_type_complexe,
            "enum" : temp_dict
        }
        list_records.append(result_dict)
    
    df = pd.DataFrame.from_records(list_records)

    return df

def _generate_complex_type_df(chemin : Path) -> pd.DataFrame:
    annexe_type = create_dict_from_xml(chemin)
    complexe_types_df = _parse_complex_type(annexe_type)
    
    return complexe_types_df

In [9]:
"""
  Génération d'une documentation des codes de données des annexes
"""

def _parse_annexe_fields_documentation(class_annexe : dict) -> pd.DataFrame:
    elements = class_annexe['xs:sequence']['xs:element']
    list_records = []
    dict_champs = dict()
    nom_annexe = class_annexe["@name"][1:]
    
    for element in elements:
        documentation = element['xs:annotation']['xs:documentation']
        if isinstance(documentation, str):
            libelle = documentation
            description = documentation
        else:
            libelle = element['xs:annotation']['xs:documentation']['z:libelle']
            description = element['xs:annotation']['xs:documentation'].get('z:description')
        dict_champs = {
            "nom_annexe" : nom_annexe,
            "nom_champ" : element["@name"],
            "type" : element["@type"],
            "libelle" : libelle,
            "description" : description,
        }
        list_records.append(dict_champs)
    
    df = pd.DataFrame.from_records(list_records)
    df["description"] = df["description"].str.replace(r'^<[^<>]*>', '', regex=True)
    df["description"] = df["description"].str.replace(r'^\s*<ul>', '', regex=True)
    df["description"] = df["description"].str.replace(r'^\s*<li>', '', regex=True)
    df["description"] = df["description"].str.replace(r'<ul>', ' : ', regex=True)
    df["description"] = df["description"].str.replace(r'<li>', ' - ', regex=True)
    df["description"] = df["description"].str.replace(r'<[^<>]*>', ' ', regex=True)
    df["description"] = df["description"].str.replace(r'\s\s+', ' ', regex=True)

    
    return df

def _merge_annexe_type_and_documentation(chemin_annexe: Path, complex_type_df: pd.DataFrame) -> pd.DataFrame:
    class_to_generate = create_dict_from_xml(chemin_annexe)['xs:schema']['xs:complexType'][1]
    init_df = _parse_annexe_fields_documentation(class_to_generate)
    init_df = init_df.merge(complex_type_df, how='left')
    return init_df



In [18]:
PATH_TO_SCHEMA = Path("/home/rbevenot/projets/dataprep-ab/schema/schema_doc_budg_V100/")


def _get_list_annexes_path():
    dict_annexe = create_dict_from_xml(PATH_TO_SCHEMA.joinpath("SchemaDocBudg/Class_Annexes.xsd"))["xs:schema"]['xs:include']
    dict_annexe.pop(0)
    class_annexe_paths = []
    for annexe in dict_annexe: 
        class_annexe_paths.append(PATH_TO_SCHEMA.joinpath(f"SchemaDocBudg/{annexe['@schemaLocation']}"))
    return class_annexe_paths


def _parse_all_annexes_fields_documentation() -> pd.DataFrame:
    #Erreurs à traiter manuellement :
    # - l'annexe signatures dont la balise xs:complextype est inversée par rapport à l'habitude, il faut copier le premier bloc xs:complextype en dessous du 2ème.
    # - l'annexe emprunt "IndSousJacentDtVote" ou il y a deux balises documentation qui génère une liste (seul cas)
    df_result = pd.DataFrame()
    
    annexes_paths = _get_list_annexes_path()
    annexe_complexe_types = _generate_complex_type_df(PATH_TO_SCHEMA.joinpath("SchemaDocBudg/CommunAnnexe.xsd"))
    
    for annexe_path in annexes_paths:
        df = _merge_annexe_type_and_documentation(annexe_path, annexe_complexe_types)
        df_result = pd.concat([df, df_result])
    
    return df_result.set_index('nom_champ')


def generate_annexe_data_documentation_csv(path_to_export : Path):
    df_to_csv = _parse_all_annexes_fields_documentation()
    df_to_csv.to_csv(path_to_export, index=False)  
    


In [19]:
generate_annexe_data_documentation_csv("/home/rbevenot/projets/dataprep-ab/data_doc.csv")

In [None]:
#Quid des données obsolètes ?
# A gérer le cas de CodMotifContrAgent (nested documentation)
to_replace = parse_all_annexes_fields_documentation()["enum"]   \
                                        .dropna()  \
                                        .to_dict()

In [10]:
def connection_pg2():
    return psycopg2.connect(database ="actes_budgetaire", user = "postgres",
                        password = DB_PASSWORD, host = "localhost",
                        port = "5432")

def get_data(connexion, annexe, year, nature_dec):
    query= f"""SELECT annexe.id as annexe_id, *  FROM annexe 
    JOIN documentbudgetaire 
        ON documentbudgetaire.id = annexe.fk_id_document_budgetaire
    JOIN collectivite 
        ON collectivite.siret_coll = documentbudgetaire.fk_siret_collectivite 
    WHERE annexe.type_annexe = '{annexe}' 
    AND documentbudgetaire.exercice = '{year}'
    AND documentbudgetaire.nature_dec = '{nature_dec}' 
        LIMIT 5 """
    dat = pd.read_sql_query(query, conn)
    return dat

with connection_pg2() as conn:
    dat = get_data(connexion=conn, annexe='DATA_EMPRUNT', year=2019, nature_dec= '09')

needed_columns = ['annexe_id', 'fk_id_document_budgetaire', 'exercice', 'type_annexe', 'json_annexe','siret_etablissement', 'libelle', 'nomenclature', 'nature_dec', 'nature_vote','type_budget', 'siret_coll', 'libelle_collectivite', 'nature_collectivite']

def keep_needed_columns(needed_columns : list, df : pd.DataFrame) -> pd.DataFrame:
    to_drop = [item for item in list(df.columns) if item not in needed_columns]
    return df.drop(to_drop, axis=1)

import app.backend.timer as timer
import app.backend.config as conf

data = conf.CHAMPS_ANNEXES
    
def annexe(nom):
    donnees = data[nom]
    return donnees

EMPRUNT_FIELD = annexe(nom="DATA_EMPRUNT")



In [None]:
import json
import copy

def _all_annexe_columns(df, field):
    all_columns = copy.deepcopy(field)
    columns = list(df.columns)
    for i in columns:
        all_columns.append(i)
    return all_columns

@timer.timed
def explode_annexe_json_into_rows_first_way(df):
    all_columns = _all_annexe_columns(df, EMPRUNT_FIELD)
    df['json_annexe'] = df.json_annexe.apply(eval)
    result = df.groupby('annexe_id').json_annexe.apply(lambda x: pd.DataFrame(x.values[0])).reset_index()
    return result.reindex(columns = all_columns)
  
@timer.timed
def explode_annexe_json_into_rows_second_way(df):
    all_columns = _all_annexe_columns(df, EMPRUNT_FIELD)
    s= df.set_index('annexe_id').json_annexe.apply(eval).explode()
    result = pd.DataFrame(s.tolist(), index = s.index).reset_index()
    return result.reindex(columns = all_columns)

@timer.timed
def explode_annexe_json_into_rows_third_way(df):
    dfs = []
    dict_annexe = dict()
    for index, row  in df.iterrows():
        dict_annexe[index] = json.loads(row["json_annexe"])
        for element in dict_annexe[index]:
            if element:
                for field in EMPRUNT_FIELD:
                    element.setdefault(field, None)

        json_df = pd.json_normalize(dict_annexe[index])
        dfs.append(json_df.assign(**row.drop("json_annexe")))

    json_df.assign(**row)
    #print('dfs')
    #print(dfs)
    return pd.concat(dfs)

In [37]:
for _ in range(100):
    x= keep_needed_columns(needed_columns, dat)
    explode_annexe_json_into_rows_first_way(x)

    #Vainqueur par KO !
    x= keep_needed_columns(needed_columns, dat)
    explode_annexe_json_into_rows_second_way(x)

    x= keep_needed_columns(needed_columns, dat)
    explode_annexe_json_into_rows_third_way(x)

#len(EMPRUNT_FIELD)

{'CodTypFlux': {'01': 'Flux réciproques entre le groupement à fiscalité propre et les communes',
  '02': 'Présentation consolidée du groupement à fiscalité propre et des communes (après ma neutralisation des flux)'},
 'CodInvFonc': {'I': 'Investissement', 'F': 'Fonctionnement'},
 'CodRD': {'R': 'Recette', 'D': 'Dépense'},
 'CodRDTot': {'R': 'Recette', 'D': 'Dépense'},
 'CodRessExt': {'O': 'Oui', 'N': 'Non'},
 'CodApprent': {'01': 'Apprentissage',
  '02': 'Enseignement professionnel',
  '03': 'Formations continues',
  '04': 'Première section FNDMA',
  '05': 'Seconde section FNDMA',
  '06': 'Dotation décentralisation',
  '07': 'Dotation indemnité compensatoire forfaitaire',
  '08': 'Contribution additionnelle',
  '09': 'FSE',
  '10': 'FEDER',
  '11': 'FEOGA',
  '12': 'Reversement excédent de ressources CFA',
  '13': 'Autres ressources',
  '14': 'Effort propre de la région'},
 'CodInvFon': {'I': 'Investissement', 'F': 'Fonctionnement'},
 'TypeGestion': {'1': 'AP/AE', '2': 'Crédits de paie