In [1]:
import polars as pl
import boto3
import xmltodict
import duckdb
import gzip
import json 
import copy
import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy import text, types
from sqlalchemy import Table, MetaData

import config
import os 

AWS_ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
BUCKET = "data"

In [2]:
s3 = boto3.client('s3',                         
                  endpoint_url='http://localhost:9000',
                  aws_access_key_id=AWS_ACCESS_KEY_ID,
                  aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

def list_files(client, bucket: str):
    """List files in specific S3 URL"""
    response = client.list_objects(Bucket=bucket)
    for content in response.get('Contents', []):
        yield content.get('Key')

def parsing_infos_coll(dict_from_xml: dict):
    infos_dict = dict()
    dict_entete_doc = dict_from_xml["DocumentBudgetaire"]["EnTeteDocBudgetaire"]
    infos_dict["siret_coll"] = dict_entete_doc["IdColl"]["@V"]
    infos_dict["libelle_collectivite"] = dict_entete_doc["LibelleColl"]["@V"]
    infos_dict["nature_collectivite"] = dict_entete_doc["NatCEPL"]["@V"]
    infos_dict["departement"] = dict_entete_doc.get("Departement", {}).get("@V", None)

    return infos_dict

def parsing_infos_etablissement(dict_from_xml: dict, id_doc_budg):
    infos_dict = dict()
    dict_entete_budget = dict_from_xml["DocumentBudgetaire"]["Budget"]["EnTeteBudget"]
    dict_bloc_budget = dict_from_xml["DocumentBudgetaire"]["Budget"]["BlocBudget"]

    infos_dict["id_doc_budg"] = id_doc_budg
    
    #Parsing entête budgétaire
    infos_dict["siret_etablissement"] = dict_entete_budget["IdEtab"]["@V"]
    infos_dict["libelle"] = dict_entete_budget["LibelleEtab"]["@V"]
    infos_dict["code_insee"] = dict_entete_budget.get(
        "CodInseeColl", {}).get("@V", None)
    infos_dict["nomenclature"] = dict_entete_budget["Nomenclature"]["@V"]
    
    # Parsing bloc budget
    infos_dict["exercice"] = int(dict_bloc_budget["Exer"]["@V"])
    infos_dict["nature_dec"] = dict_bloc_budget["NatDec"]["@V"]
    infos_dict["num_dec"] = int(dict_bloc_budget.get(
        "NumDec", {}).get("@V", None) or 0)
    infos_dict["nature_vote"] = dict_bloc_budget["NatFonc"]["@V"]
    infos_dict["type_budget"] = dict_bloc_budget["CodTypBud"]["@V"]
    infos_dict["id_etabl_princ"] = dict_bloc_budget.get(
        "IdEtabPal", {}).get("@V", None)

    # Parsing des lignes de budget -> json
    infos_dict["json_budget"] = generate_dict_budget(dict_from_xml)
    # duplique certaines données et non nécessaire pour l'instant donc je le commente
    #if isinstance(dict_from_xml["DocumentBudgetaire"]["Budget"]["Annexes"], dict):
    #    infos_dict["list_annexes"] = list(dict_from_xml["DocumentBudgetaire"]["Budget"]["Annexes"].keys())
    
    infos_dict["fk_siret_collectivite"] = dict_from_xml["DocumentBudgetaire"]["EnTeteDocBudgetaire"]["IdColl"]["@V"]

    return infos_dict

def generate_dict_budget(dict_from_xml: dict) -> dict:
        budget_dict = copy.deepcopy(
            dict_from_xml["DocumentBudgetaire"]["Budget"]["LigneBudget"])
        
        if isinstance(budget_dict, dict):
            for field in config.CHAMPS_LIGNE_BUDGET:
                if field in budget_dict:
                    if "@V" in budget_dict[field]:
                        budget_dict[field] = budget_dict[field]['@V']
            # for field in ["MtSup", "CaracSup"]:
            #     if field in budget_dict:
            #         if "@V" in budget_dict[field]:
            #             budget_dict[field] = { budget_dict[field]['@Code']: budget_dict[field]['@V'] }
            return budget_dict
        for idx, row in enumerate(budget_dict):
            for field in config.CHAMPS_LIGNE_BUDGET:
                if field in row:
                    if "@V" in row[field]:
                        budget_dict[idx][field] = row[field]['@V']
            for field in ["MtSup", "CaracSup"]:
                if field in row:
                    budget_dict[idx].pop(field)
            # for field in ["MtSup", "CaracSup"]:
            # if field in row:
            #     if "@V" in row[field]:
            #         budget_dict[idx][field] = { row[field]['@Code']: row[field]['@V'] }
        return budget_dict

In [3]:
champs_budg = config.CHAMPS_LIGNE_BUDGET

def _all_annexe_columns(df, fields):
    all_columns = copy.deepcopy(fields)
    columns = list(df.columns)
    for i in all_columns:
        columns.append(i)
    return columns

def explode_annexe_json_into_rows_first_way(df, fields):
    all_columns = _all_annexe_columns(df, fields)
    all_columns.pop(all_columns.index("json_budget"))
    df['json_budget'] = df.json_budget.apply(lambda x:eval(str(x)))
    temp = df.groupby('id_doc_budg').json_budget.apply(lambda x: pd.DataFrame(x.values[0], index=[0]).reset_index())
    df.drop(columns="json_budget", inplace=True)
    df_result = df.merge(temp, left_on='id_doc_budg', right_on='id_doc_budg')
    return df_result.reindex(columns = all_columns)

def explode_annexe_json_into_rows_second_way(df, fields):
    all_columns = _all_annexe_columns(df, fields)
    s= df.set_index('id_doc_budg').json_budget.apply(lambda x:eval(str(x))).explode()
    temp = pd.DataFrame(s.tolist(), index = s.index).reset_index()
    df.drop(columns="json_budget", inplace=True)
    df_result = temp.merge(df, left_on='id_doc_budg', right_on='id_doc_budg')
    return df_result.reindex(columns = all_columns)

In [4]:
count = 0
print(len(list(list_files(s3, BUCKET))))
for file in list_files(s3, BUCKET):
    id_doc_budg = file.split("-")[1].split(".")[0]

    response = s3.get_object(Bucket=BUCKET,
                        Key=file)
    dict_from_xml = xmltodict.parse(
            gzip.GzipFile(fileobj=response["Body"]), dict_constructor=dict
        )
    temp_df = pd.DataFrame.from_dict(
            parsing_infos_etablissement(dict_from_xml, id_doc_budg),
        )
    
    if count == 0:
        all_columns = _all_annexe_columns(temp_df, config.CHAMPS_LIGNE_BUDGET)
        #all_columns.pop(all_columns.index("json_budget"))
        schema = pd.DataFrame(columns=all_columns)
        engine = create_engine('sqlite:///ab_budget.db', echo=False)
        schema.to_sql('ab_budg', con=engine, if_exists="replace",
                      dtype={
                    "id_doc_budg": types.Integer,
                    "siret_etablissement": types.Text,
                    "libelle": types.Text,
                    "code_insee": types.Text,
                    "nomenclature": types.Text,
                    "exercice": types.Integer,
                    "nature_dec": types.Text,
                    "num_dec": types.Integer,
                    "nature_vote": types.Text,
                    "type_budget": types.Text,
                    "id_etabl_princ": types.Text,
                    "fk_siret_collectivite": types.Text,
                    "Nature": types.Text,
                    "LibCpte": types.Text,
                    "Fonction": types.Text,
                    "Operation": types.Text,
                    "ContNat": types.Text,
                    "ArtSpe": types.Text,
                    "ContFon": types.Text,
                    "ContOp": types.Text,
                    "CodRD": types.Text,
                    "MtBudgPrec": types.Float,
                    "MtRARPrec": types.Float,
                    "MtPropNouv": types.Text,
                    "MtPrev": types.Text,
                    "CredOuv": types.Text,
                    "MtReal": types.Float,
                    "MtRAR3112": types.Float,
                    "OpBudg": types.Text,
                    "TypOpBudg": types.Text,
                    "OpeCpteTiers": types.Text
                    }
                    )
        count = 1

    temp_df = explode_annexe_json_into_rows_first_way(temp_df, config.CHAMPS_LIGNE_BUDGET)

    temp_df.to_sql('ab_budg', con=engine, if_exists='append', index=False)

866


In [5]:
count = 0
print(len(list(list_files(s3, BUCKET))))
for file in list_files(s3, BUCKET):
    id_doc_budg = file.split("-")[1].split(".")[0]

    response = s3.get_object(Bucket=BUCKET,
                        Key=file)
    dict_from_xml = xmltodict.parse(
            gzip.GzipFile(fileobj=response["Body"]), dict_constructor=dict
        )
    temp_df = pd.DataFrame.from_dict(
            parsing_infos_etablissement(dict_from_xml, id_doc_budg),
        )
    
    if count == 0:
        conn = duckdb.connect('actes_budgetaires.duckdb')
        conn.execute("CREATE OR REPLACE TABLE budget ( \
                     id_doc_budg UINTEGER, \
                    siret_etablissement VARCHAR, \
                    libelle VARCHAR, \
                    code_insee VARCHAR, \
                    nomenclature VARCHAR, \
                    exercice UINTEGER, \
                    nature_dec VARCHAR, \
                    num_dec UINTEGER, \
                    nature_vote VARCHAR, \
                    type_budget VARCHAR, \
                    id_etabl_princ VARCHAR, \
                    fk_siret_collectivite VARCHAR, \
                    Nature VARCHAR, \
                    LibCpte VARCHAR, \
                    Fonction VARCHAR, \
                    Operation VARCHAR, \
                    ContNat VARCHAR, \
                    ArtSpe VARCHAR, \
                    ContFon VARCHAR, \
                    ContOp VARCHAR, \
                    CodRD VARCHAR, \
                    MtBudgPrec DOUBLE, \
                    MtRARPrec DOUBLE, \
                    MtPropNouv VARCHAR, \
                    MtPrev VARCHAR, \
                    CredOuv VARCHAR, \
                    MtReal DOUBLE, \
                    MtRAR3112 DOUBLE, \
                    OpBudg VARCHAR, \
                    TypOpBudg VARCHAR, \
                    OpeCpteTiers VARCHAR)")
        count = 1

    temp_df = explode_annexe_json_into_rows_first_way(temp_df, config.CHAMPS_LIGNE_BUDGET)
    conn.sql("INSERT INTO budget SELECT * FROM temp_df")

conn.close()


866


In [12]:
import duckdb 
conn = duckdb.connect('actes_budgetaires.duckdb')
df = conn.sql("SELECT DISTINCT(siret_etablissement), libelle FROM budget limit 20").fetchdf()
conn.close()
df

Unnamed: 0,siret_etablissement,libelle
0,21530130000756,PARKINGS LAVAL
1,20006610800016,CC DES PORTES DE MEUSE - BG (15000)
2,21630298400010,Budget Général
3,21350142200017,BUDGET COMMUNE LANDEAN
4,21610012300019,commune de Auguaise
5,26640462300032,BUDGET PRINCIPAL CCAS
6,21680204100048,COMMUNE DE METZERAL-EAU
7,21640494700014,DM 4 - COMMUNE DE SAINT PE DE LEREN
8,20006610800065,CCPM - BUDGET BAT INDUSTRIELS (15004)
9,21440159800013,Commune de Saint Fiacre


In [7]:
import duckdb 
with duckdb.connect('actes_budgetaires.duckdb') as conn:
   df = conn.sql("SELECT * FROM budget").fetchdf()

df

Unnamed: 0,id_doc_budg,siret_etablissement,libelle,code_insee,nomenclature,exercice,nature_dec,num_dec,nature_vote,type_budget,...,MtBudgPrec,MtRARPrec,MtPropNouv,MtPrev,CredOuv,MtReal,MtRAR3112,OpBudg,TypOpBudg,OpeCpteTiers
0,1810875,20009162700022,REGIE SPANC,,M4-M49_D,2019,09,0,1,P,...,0.0,0.0,0,0,0,0.0,0.0,0,,
1,1810875,20009162700022,REGIE SPANC,,M4-M49_D,2019,09,0,1,P,...,0.0,0.0,0,0,0,0.0,0.0,0,,
2,1810875,20009162700022,REGIE SPANC,,M4-M49_D,2019,09,0,1,P,...,0.0,0.0,0,0,0,0.0,0.0,0,,
3,1810875,20009162700022,REGIE SPANC,,M4-M49_D,2019,09,0,1,P,...,0.0,0.0,0,0,0,0.0,0.0,0,,
4,1810875,20009162700022,REGIE SPANC,,M4-M49_D,2019,09,0,1,P,...,0.0,0.0,0,0,0,0.0,0.0,0,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
172113,1905519,26270755700019,CCAS DE LISORS,27370,M14-M14_CCAS_INF3500,2020,02,1,1,P,...,2500.0,0.0,-84.25,-84.25,0,0.0,0.0,0,,
172114,1905519,26270755700019,CCAS DE LISORS,27370,M14-M14_CCAS_INF3500,2020,02,1,1,P,...,2500.0,0.0,-84.25,-84.25,0,0.0,0.0,0,,
172115,1905519,26270755700019,CCAS DE LISORS,27370,M14-M14_CCAS_INF3500,2020,02,1,1,P,...,2500.0,0.0,-84.25,-84.25,0,0.0,0.0,0,,
172116,1905519,26270755700019,CCAS DE LISORS,27370,M14-M14_CCAS_INF3500,2020,02,1,1,P,...,2500.0,0.0,-84.25,-84.25,0,0.0,0.0,0,,


In [8]:
df_budg = pd.read_sql('SELECT * FROM ab_budg', 'sqlite:///ab_budget.db', index_col="index")

In [9]:
pd.set_option("display.max.columns", None)
df_budg.info()

<class 'pandas.core.frame.DataFrame'>
Index: 172118 entries, None to None
Data columns (total 32 columns):
 #   Column                 Non-Null Count   Dtype  
---  ------                 --------------   -----  
 0   id_doc_budg            172118 non-null  int64  
 1   siret_etablissement    172118 non-null  object 
 2   libelle                172118 non-null  object 
 3   code_insee             132191 non-null  object 
 4   nomenclature           172118 non-null  object 
 5   exercice               172118 non-null  int64  
 6   nature_dec             172118 non-null  object 
 7   num_dec                172118 non-null  int64  
 8   nature_vote            172118 non-null  object 
 9   type_budget            172118 non-null  object 
 10  id_etabl_princ         29569 non-null   object 
 11  json_budget            0 non-null       object 
 12  fk_siret_collectivite  172118 non-null  object 
 13  Nature                 172118 non-null  object 
 14  LibCpte                30492 non-null   

In [10]:
file = list(list_files(s3, bucket))[5]

print(file)
id_doc_budg = file.split("-")[1].split(".")[0]

response = s3.get_object(Bucket=bucket,
                    Key=file)
dict_from_xml = xmltodict.parse(
        gzip.GzipFile(fileobj=response["Body"]), dict_constructor=dict
    )
temp_df = pl.from_dicts(
        parsing_infos_etablissement(dict_from_xml, id_doc_budg),
    schema_overrides={"json_budget pl.Struct},
    infer_schema_length=None
    ).unnest("json_budget")

duckdb.sql('SELECT * FROM temp_df').show()

SyntaxError: EOL while scanning string literal (1669150006.py, line 13)

In [None]:

count = 0
file = "20200801-1810947.xml.gz"
    
print(file)
id_doc_budg = file.split("-")[1].split(".")[0]

response = s3.get_object(Bucket=bucket,
                    Key=file)
dict_from_xml = xmltodict.parse(
        gzip.GzipFile(fileobj=response["Body"]), dict_constructor=dict
    )
temp_df = pd.DataFrame.from_dict(
        parsing_infos_etablissement(dict_from_xml, id_doc_budg),
    )
    
all_columns = _all_annexe_columns(temp_df, config.CHAMPS_LIGNE_BUDGET)
temp_df['json_budget'] = temp_df.json_budget.apply(lambda x:eval(str(x)))
temp = temp_df.groupby('id_doc_budg').json_budget.apply(lambda x: pd.DataFrame(x.values[0], index=[0]).reset_index())
temp_df.drop(columns="json_budget", inplace=True)
df_result = temp_df.merge(temp, left_on='id_doc_budg', right_on='id_doc_budg')
df_result.reindex(columns = all_columns)


20200801-1810947.xml.gz


In [None]:
parsing_infos_etablissement({}, id_doc_budg)
"siret_etablissement", "libelle", "code_insee", "nomenclature", 

KeyError: 'DocumentBudgetaire'

In [None]:
temp_df.dtypes

id_doc_budg              object
siret_etablissement      object
libelle                  object
code_insee               object
nomenclature             object
exercice                  int64
nature_dec               object
num_dec                   int64
nature_vote              object
type_budget              object
id_etabl_princ           object
fk_siret_collectivite    object
index                     int64
Nature                   object
LibCpte                  object
ContNat                  object
ArtSpe                   object
CodRD                    object
MtBudgPrec               object
MtRARPrec                object
MtPropNouv               object
MtPrev                   object
CredOuv                  object
MtReal                   object
MtRAR3112                object
OpBudg                   object
dtype: object