# Preprocessor Notebook : Zipped CSV Files

Ce notebook permet de traiter les sources de données récupérés en .zip et contenant des fichiers csv.

 ### Paramètres
 Ce Notebook prend des paramètres en entrée, définis sur la seconde cellule (ci-dessous).
 La cellule a le tag "parameters" ce qui permet de lui passer des valeurs via papermill.
 - filepath : le chemin vers le fichier Excel à traiter
 - model_name : le nom du modèle source

 ### Principe
 Ce notebook extrait les fichiers CSV contenus dans un fichier ZIP et les charge en schéma Bronze.
 Chaque fichier CSV constitue un dataset et donc une table en schéma Bronze. 
 La règle de nommage des tables est la suivante :
 - Par défaut : "nom de la table" = "nom_domaine"_"nom_modèle"_"nom du fichier csv extrait"
 - Si le nom dépasse 63 caractères (limite PostgreSQL) : "nom de la table" = "nom_domaine"_"nom du fichier csv extrait"
 
 La suite d'actions réalisée est :
 - Import des fonctions utiles
 - Dézip de tous les fichiers contenus dans le .zip
 - Dump d'un artefact pour chaque fichier extrait (quel que soit le format)
 - Import de tous les artefacts CSV en Pandas 
 - Initialisation d'une connexion PostGreSQL
 
 Pour chaque dataframe :
 - Drop de la table Bronze (if exists)
 - Chargement du dataframe en Bronze

In [13]:
# Manage all imports
import pandas as pd
import os
import datetime
import sys

# Dirty trick to be able to import common odis modules, if the notebook is not executed from 13_odis
current_dir = os.getcwd()
parent_dir = os.path.dirname(os.getcwd())
while not current_dir.endswith("13_odis"):
    print("changing to parent dir")
    os.chdir(parent_dir)
    current_dir = parent_dir
    parent_dir = os.path.dirname(current_dir)

print(os.getcwd())
sys.path.append(current_dir)

/Users/alex/dev/13_odis


# Initialisation

Chargement des principaux imports et variables

In [14]:
# additional imports
from common.config import load_config
from common.data_source_model import DataSourceModel
from common.utils.file_handler import FileHandler
from common.utils.interfaces.data_handler import OperationType

In [15]:
# define parameters for papermill

# model_name = "emploi.salaire_median"
# filepath = 'data/imports/emploi/emploi.salaire_median_1.zip'

model_name = "emploi.deplacement_domicile_travail"
filepath = 'data/imports/emploi/emploi.deplacement_domicile_travail_1.zip'

# model_name = "emploi.etablissements_employeurs_secteur_prive"
# filepath = 'data/imports/emploi/emploi.etablissements_employeurs_secteur_prive_1.zip'

In [16]:
# Initialize common variables
dataframes = {}
artifacts = []

config = load_config("datasources.yaml", response_model=DataSourceModel)
model = config.get_model( model_name = model_name )
start_time = datetime.datetime.now()

# Instantiate File Handler for file loads and dumps
handler = FileHandler()

# Extraction des CSV

Extraction des fichiers CSV du Zip, avec une fonction pour calculer les noms de fichier

In [17]:
def get_subtable_name(dfname:str, model:DataSourceModel) -> str:

    subtable_name = ''

    long_name = f"{model.table_name}_{dfname.lower()}"

    print(len(long_name))

    if len(long_name) >= 63:
        print(f"Long table name exceeds the 63 characters limit: {long_name}")
        subtable_name = f"{model.domain_name}_{dfname.lower()}"
        print(f"Creating table with shorter name: {subtable_name}")
        print(f"Shorter table name length: {len(subtable_name)}")
    else:
        subtable_name = long_name

    return subtable_name

In [18]:
from zipfile import ZipFile

# unzip and dump files into the data/imports folder
with open(filepath, 'rb') as f:
    zip_archive = ZipFile(f)

    zip_members = zip_archive.infolist()
    for member in zip_members:

        if not member.is_dir():

            member_filename = member.filename
            member_name = member_filename.split(".")[0]
            member_format = member_filename.split(".")[-1]
            print(member_format)
            
            f_member = zip_archive.open( member, 'r' ).read()
    
            artifact = handler.artifact_dump(
                f_member,
                get_subtable_name(member_name,model),
                model,
                format = member_format
            )

            print(artifact.model_dump(mode="yaml"))

            artifacts.append(artifact)

csv
65
Long table name exceeds the 63 characters limit: emploi_deplacement_domicile_travail_ds_rp_navettes_princ_metadata
Creating table with shorter name: emploi_ds_rp_navettes_princ_metadata
Shorter table name length: 36
2025-05-24 15:04:09,530 - DEBUG :: file_handler.py :: dump (130) :: dumping: data/imports/emploi/emploi.deplacement_domicile_travail_emploi_ds_rp_navettes_princ_metadata.csv
2025-05-24 15:04:09,536 - DEBUG :: file_handler.py :: file_dump (273) :: emploi.deplacement_domicile_travail -> results saved to : 'data/imports/emploi/emploi.deplacement_domicile_travail_emploi_ds_rp_navettes_princ_metadata.csv'


{'name': 'emploi_ds_rp_navettes_princ_metadata', 'storage_info': {'location': 'data/imports/emploi', 'format': 'csv', 'file_name': 'emploi.deplacement_domicile_travail_emploi_ds_rp_navettes_princ_metadata.csv', 'encoding': 'utf-8'}, 'load_to_bronze': True, 'success': True}
csv
61
2025-05-24 15:04:09,659 - DEBUG :: file_handler.py :: dump (130) :: dumping: data/imports/emploi/emploi.deplacement_domicile_travail_emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data.csv
2025-05-24 15:04:09,775 - DEBUG :: file_handler.py :: file_dump (273) :: emploi.deplacement_domicile_travail -> results saved to : 'data/imports/emploi/emploi.deplacement_domicile_travail_emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data.csv'
{'name': 'emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data', 'storage_info': {'location': 'data/imports/emploi', 'format': 'csv', 'file_name': 'emploi.deplacement_domicile_travail_emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data.csv', 'enc

# Dump intermédiaire

Sauvegarde locale de tous les artefacts extraits du zip

In [19]:
preprocess_metadata = handler.dump_metadata(
    model = model,
    operation = OperationType.PREPROCESS,
    start_time = start_time,
    complete = True,
    errors = 0,
    artifacts = artifacts,
    pages = []
)

2025-05-24 15:04:09,788 - DEBUG :: file_handler.py :: dump (130) :: dumping: data/imports/emploi/emploi.deplacement_domicile_travail_metadata_preprocess.json
2025-05-24 15:04:09,791 - DEBUG :: file_handler.py :: file_dump (273) :: emploi.deplacement_domicile_travail -> results saved to : 'data/imports/emploi/emploi.deplacement_domicile_travail_metadata_preprocess.json'
2025-05-24 15:04:09,792 - DEBUG :: file_handler.py :: dump_metadata (455) :: Metadata written in: 'data/imports/emploi/emploi.deplacement_domicile_travail_metadata_preprocess.json'


# Traitement Pandas

Import des CSV en Dataframes et chargement en tables Bronze

In [20]:
from pathlib import Path

for artifact in artifacts:

    base_path = Path( artifact.storage_info.location )
    filepath = base_path / artifact.storage_info.file_name

    df = pd.read_csv(
        filepath,
        sep = ';',
        engine = 'python'
        )

    dataframes[ artifact.name ] = df


In [21]:
print("Loaded dataframes:")
last_key = ''
for key in dataframes.keys():
    last_key = key
    print(key)

dataframes[last_key].head()

Loaded dataframes:
emploi_ds_rp_navettes_princ_metadata
emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data


Unnamed: 0,AGE,EMPSTA_ENQ,FREQ,GEO,GEO_OBJECT,RP_MEASURE,TRANS,WORK_AREA,TIME_PERIOD,OBS_VALUE
0,Y_GE15,1,A,73087,COM,POP,_T,21,2015,1803.446354
1,Y_GE15,1,A,72237,COM,POP,_T,21,2015,202.962271
2,Y_GE15,1,A,73012,COM,POP,_T,23,2015,2.0
3,Y_GE15,1,A,73006,COM,POP,_T,22,2015,25.980651
4,Y_GE15,1,A,73220,COM,POP,_T,21,2015,108.939577


In [22]:
from dotenv import dotenv_values
import sqlalchemy
from sqlalchemy import text

# prepare db client
vals = dotenv_values()

conn_str = "postgresql://{}:{}@{}:{}/{}".format(
    vals["PG_DB_USER"],
    vals["PG_DB_PWD"],
    vals["PG_DB_HOST"],
    vals["PG_DB_PORT"],
    vals["PG_DB_NAME"]
)

dbengine = sqlalchemy.create_engine(conn_str)

In [23]:
# insert all to bronze
# make the final table name lowercase to avoid issues in Postgre

for name, dataframe in dataframes.items():

    query_str = f"DROP TABLE IF EXISTS bronze.{name} CASCADE"

    # dropping existing table with cascade
    with dbengine.connect() as con:
        print(f"Dropping if exists: {name}")
        result = con.execute(text(query_str))
        con.commit()

    print(f"Inserting DataFrame {name}")
    dataframe.to_sql(
        name = name,
        con = dbengine,
        schema = 'bronze',
        index = True,
        if_exists = 'replace'
    )

Dropping if exists: emploi_ds_rp_navettes_princ_metadata
Inserting DataFrame emploi_ds_rp_navettes_princ_metadata
Dropping if exists: emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data
Inserting DataFrame emploi_deplacement_domicile_travail_ds_rp_navettes_princ_data
