# Preparation des tables pour la base de donnees

Ce notebook prepare les donnees locales (CSV + JSON) afin de produire des tables nettoyees, pretes a etre chargees dans un entrepot de donnees ou une base relationnelle.


## Plan de preparation

1. Charger les fichiers bruts (CSV et JSON).
2. Harmoniser les colonnes et les types de donnees.
3. Enrichir avec des informations derivees (codes geo et codes postaux).
4. Consolider les jeux nettoyes et optionnellement les exporter en Parquet.


In [1]:
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Dict, List, Optional

import pandas as pd

try:
    from IPython.display import display
except ImportError:  # pragma: no cover
    display = print


In [2]:
PROJECT_ROOT = Path.cwd()
if not (PROJECT_ROOT / 'uploads').exists():
    PROJECT_ROOT = PROJECT_ROOT.parent
if not (PROJECT_ROOT / 'uploads').exists():
    PROJECT_ROOT = PROJECT_ROOT.parent

DATA_DIR = PROJECT_ROOT / 'uploads' / 'landing' / 'csv'
COMMUNES_PATH = PROJECT_ROOT / 'data' / 'communes.json'
OUTPUT_DIR = PROJECT_ROOT / 'data' / 'prepared' / 'silver'

assert DATA_DIR.exists(), f'Dossier CSV introuvable: {DATA_DIR}'
assert COMMUNES_PATH.exists(), f'Fichier JSON introuvable: {COMMUNES_PATH}'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print(f'Project root: {PROJECT_ROOT}')
print(f'CSV source dir: {DATA_DIR}')
print(f'Communes JSON: {COMMUNES_PATH}')


Project root: d:\data eng\Projet-Data-ENG
CSV source dir: d:\data eng\Projet-Data-ENG\uploads\landing\csv
Communes JSON: d:\data eng\Projet-Data-ENG\data\communes.json


In [3]:
@dataclass
class TableSpec:
    name: str
    source_path: Path
    rename: Dict[str, str]
    description: str
    dtype_overrides: Dict[str, str] = field(default_factory=dict)
    numeric_columns: List[str] = field(default_factory=list)
    extra_transform: Optional[Callable[[pd.DataFrame], pd.DataFrame]] = None

def normalize_name(name: str) -> str:
    clean = name.strip().lower()
    clean = clean.replace('%', 'pct')
    clean = re.sub(r'[\s/]+', '_', clean)
    clean = re.sub(r'[^0-9a-z_]+', '_', clean)
    clean = re.sub(r'_+', '_', clean)
    return clean.strip('_')

def enrich_geo_columns(df: pd.DataFrame) -> pd.DataFrame:
    if 'geo_id' not in df.columns:
        return df
    geo_parts = df['geo_id'].astype(str).str.extract(r'(?P<geo_reference_year>\d+)-(?P<geo_level_code>[A-Z]+)-(?P<geo_code>.+)')
    df = pd.concat([df, geo_parts], axis=1)
    if 'geo_reference_year' in df.columns:
        df['geo_reference_year'] = pd.to_numeric(df['geo_reference_year'], errors='coerce').astype('Int64')
    if 'geo_code' in df.columns:
        df['geo_code'] = df['geo_code'].str.zfill(2)
    return df

def load_table(spec: TableSpec) -> pd.DataFrame:
    df = pd.read_csv(spec.source_path)
    normalized = {col: normalize_name(col) for col in df.columns}
    df = df.rename(columns=normalized)
    df = df.rename(columns=spec.rename)
    df.columns = [normalize_name(col) for col in df.columns]
    df = enrich_geo_columns(df)
    if 'year' in df.columns:
        df['year'] = pd.to_numeric(df['year'], errors='coerce').astype('Int64')
    if 'departement_code' in df.columns:
        df['departement_code'] = df['departement_code'].astype(str).str.zfill(2)
    for col in spec.numeric_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    for col, dtype in spec.dtype_overrides.items():
        if col in df.columns:
            df[col] = df[col].astype(dtype)
    if spec.extra_transform:
        df = spec.extra_transform(df)
    df = df.drop_duplicates().reset_index(drop=True)
    df['source_file'] = spec.source_path.name
    df['dataset'] = spec.name
    return df


In [4]:
TABLE_SPECS = [
    TableSpec(
        name='stg_population',
        source_path=DATA_DIR / 'population_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'pcs': 'pcs_code',
            'sex': 'sex',
            'time_period': 'year',
            'rp_measure': 'rp_measure',
            'age': 'age_group',
            'obs_value': 'population_value',
            'departement': 'departement_code',
        },
        numeric_columns=['population_value'],
        description='Population par PCS, sexe et tranche d age.',
    ),
    TableSpec(
        name='stg_creation_entreprises',
        source_path=DATA_DIR / 'CREATION_ENT_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'freq': 'frequency',
            'side_measure': 'side_measure',
            'time_period': 'year',
            'activity': 'activity_code',
            'legal_form': 'legal_form',
            'obs_value': 'creation_count',
            'departement': 'departement_code',
        },
        numeric_columns=['creation_count'],
        description='Creations d entreprises par activite et forme juridique.',
    ),
    TableSpec(
        name='stg_creation_entrepreneurs_individuels',
        source_path=DATA_DIR / 'CREA_EI_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'sex': 'sex',
            'freq': 'frequency',
            'side_measure': 'side_measure',
            'time_period': 'year',
            'activity': 'activity_code',
            'legal_form': 'legal_form',
            'age': 'age_group',
            'obs_value': 'creation_count',
            'departement': 'departement_code',
        },
        numeric_columns=['creation_count'],
        description='Creations d entrepreneurs individuels selon le sexe, l age et l activite.',
    ),
    TableSpec(
        name='stg_deces',
        source_path=DATA_DIR / 'DECES_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'ec_measure': 'event_code',
            'freq': 'frequency',
            'time_period': 'year',
            'obs_value': 'death_count',
            'departement': 'departement_code',
        },
        numeric_columns=['death_count'],
        description='Nombre de deces annuels.',
    ),
    TableSpec(
        name='stg_ds_filosofi',
        source_path=DATA_DIR / 'DS_FILOSOFI_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'time_period': 'year',
            'unit_measure': 'unit_measure',
            'filosofi_measure': 'indicator_code',
            'obs_value': 'indicator_value',
            'departement': 'departement_code',
        },
        numeric_columns=['indicator_value'],
        description='Indicateurs DS FILOSOFI.',
    ),
    TableSpec(
        name='stg_emploi_chomage',
        source_path=DATA_DIR / 'EMPLOI_CHOMAGE_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'pcs': 'pcs_code',
            'freq': 'frequency',
            'empsta_enq': 'employment_status',
            'time_period': 'year',
            'rp_measure': 'rp_measure',
            'age': 'age_group',
            'obs_value': 'population_value',
            'departement': 'departement_code',
        },
        numeric_columns=['population_value'],
        description='Population employee/chomeuse selon PCS et tranche d age.',
    ),
    TableSpec(
        name='stg_fecondite',
        source_path=DATA_DIR / 'FECONDITE_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'nch': 'child_count_band',
            'time_period': 'year',
            'rp_measure': 'rp_measure',
            'tfn': 'fertility_indicator',
            'obs_value': 'measure_value',
            'departement': 'departement_code',
        },
        numeric_columns=['measure_value'],
        description='Mesures de fecondite des menages.',
    ),
    TableSpec(
        name='stg_filosofi_age_tp_nivvie',
        source_path=DATA_DIR / 'FILOSOFI_AGE_TP_NIVVIE_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'age_rf': 'age_group',
            'time_period': 'year',
            'unit_measure': 'unit_measure',
            'filosofi_measure': 'indicator_code',
            'obs_value': 'indicator_value',
            'departement': 'departement_code',
        },
        numeric_columns=['indicator_value'],
        description='Indicateurs FILOSOFI par tranche d age.',
    ),
    TableSpec(
        name='stg_logement',
        source_path=DATA_DIR / 'Logement_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'overocc': 'overocc_code',
            'freq': 'frequency',
            'time_period': 'year',
            'rp_measure': 'rp_measure',
            'ocs': 'occupancy_code',
            'obs_value': 'dwelling_value',
            'departement': 'departement_code',
        },
        numeric_columns=['dwelling_value'],
        description='Logement: occupation et parc residentiel.',
    ),
    TableSpec(
        name='stg_menage',
        source_path=DATA_DIR / 'Menage_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'pcs': 'pcs_code',
            'freq': 'frequency',
            'time_period': 'year',
            'rp_measure': 'rp_measure',
            'prefph': 'household_composition',
            'tph': 'household_type',
            'ocs': 'occupancy_code',
            'obs_value': 'measure_value',
            'departement': 'departement_code',
        },
        numeric_columns=['measure_value'],
        description='Structure des menages.',
    ),
    TableSpec(
        name='stg_naissances',
        source_path=DATA_DIR / 'naissances_hauts_de_france.csv',
        rename={
            'geo': 'geo_id',
            'ec_measure': 'event_code',
            'freq': 'frequency',
            'time_period': 'year',
            'obs_value': 'birth_count',
            'departement': 'departement_code',
        },
        numeric_columns=['birth_count'],
        description='Nombre de naissances annuelles.',
    ),
]
len(TABLE_SPECS)


11

In [6]:
tables: Dict[str, pd.DataFrame] = {}
summary_rows = []
for spec in TABLE_SPECS:
    df = load_table(spec)
    tables[spec.name] = df
    summary_rows.append({
        'table': spec.name,
        'rows': len(df),
        'columns': len(df.columns),
        'source': spec.source_path.name,
        'description': spec.description,
    })
    print(f"{spec.name}: {df.shape[0]} lignes, {df.shape[1]} colonnes")
summary_df = pd.DataFrame(summary_rows).sort_values('table').reset_index(drop=True)
display(summary_df)


stg_population: 1578 lignes, 13 colonnes
stg_creation_entreprises: 10756 lignes, 13 colonnes
stg_creation_entrepreneurs_individuels: 1560 lignes, 15 colonnes
stg_deces: 50 lignes, 11 colonnes
stg_ds_filosofi: 100 lignes, 11 colonnes
stg_emploi_chomage: 345 lignes, 14 colonnes
stg_fecondite: 320 lignes, 12 colonnes
stg_filosofi_age_tp_nivvie: 70 lignes, 12 colonnes
stg_logement: 45 lignes, 13 colonnes
stg_menage: 510 lignes, 15 colonnes
stg_naissances: 50 lignes, 11 colonnes


Unnamed: 0,table,rows,columns,source,description
0,stg_creation_entrepreneurs_individuels,1560,15,CREA_EI_hauts_de_france.csv,Creations d entrepreneurs individuels selon le...
1,stg_creation_entreprises,10756,13,CREATION_ENT_hauts_de_france.csv,Creations d entreprises par activite et forme ...
2,stg_deces,50,11,DECES_hauts_de_france.csv,Nombre de deces annuels.
3,stg_ds_filosofi,100,11,DS_FILOSOFI_hauts_de_france.csv,Indicateurs DS FILOSOFI.
4,stg_emploi_chomage,345,14,EMPLOI_CHOMAGE_hauts_de_france.csv,Population employee/chomeuse selon PCS et tran...
5,stg_fecondite,320,12,FECONDITE_hauts_de_france.csv,Mesures de fecondite des menages.
6,stg_filosofi_age_tp_nivvie,70,12,FILOSOFI_AGE_TP_NIVVIE_hauts_de_france.csv,Indicateurs FILOSOFI par tranche d age.
7,stg_logement,45,13,Logement_hauts_de_france.csv,Logement: occupation et parc residentiel.
8,stg_menage,510,15,Menage_hauts_de_france.csv,Structure des menages.
9,stg_naissances,50,11,naissances_hauts_de_france.csv,Nombre de naissances annuelles.


In [7]:
tables['stg_population'].head()


Unnamed: 0,geo_id,pcs_code,sex,year,rp_measure,age_group,population_value,departement_code,geo_reference_year,geo_level_code,geo_code,source_file,dataset
0,2024-DEP-02,5,_T,2021,POP,Y_GE15,69912.03883,2,2024,DEP,2,population_hauts_de_france.csv,stg_population
1,2024-DEP-02,4,M,2010,POP,Y25T54,19828.358263,2,2024,DEP,2,population_hauts_de_france.csv,stg_population
2,2024-DEP-02,7,M,2015,POP,Y_GE15,56012.778979,2,2024,DEP,2,population_hauts_de_france.csv,stg_population
3,2024-DEP-02,6,F,2010,POP,Y_GE55,1830.64162,2,2024,DEP,2,population_hauts_de_france.csv,stg_population
4,2024-DEP-02,7,M,2010,POP,Y_GE55,52982.579024,2,2024,DEP,2,population_hauts_de_france.csv,stg_population


In [9]:
with COMMUNES_PATH.open(encoding='utf-8') as f:
    communes_payload = json.load(f)
communes_full = pd.json_normalize(communes_payload.get('communes', []))
communes_full = communes_full.rename(columns={
    'nom': 'commune_nom',
    'code': 'commune_code',
    'codesPostaux': 'codes_postaux',
    'codeDepartement': 'departement_code',
    'departement_nom': 'departement_nom',
    'codeRegion': 'region_code',
    'region_nom': 'region_nom',
    'population': 'population',
    'surface': 'surface_km2',
    'longitude': 'longitude',
    'latitude': 'latitude',
    'contour_geojson': 'contour_geojson',
})
communes_full.columns = [normalize_name(col) for col in communes_full.columns]
communes_full['departement_code'] = communes_full['departement_code'].astype(str).str.zfill(2)
communes_full['region_code'] = communes_full['region_code'].astype(str).str.zfill(2)
communes_full['population'] = pd.to_numeric(communes_full['population'], errors='coerce')
communes_full['surface_km2'] = pd.to_numeric(communes_full['surface_km2'], errors='coerce')
communes_full['longitude'] = pd.to_numeric(communes_full['longitude'], errors='coerce')
communes_full['latitude'] = pd.to_numeric(communes_full['latitude'], errors='coerce')
communes_full = communes_full.drop_duplicates(subset=['commune_code']).reset_index(drop=True)
communes_full['codes_postaux'] = communes_full['codes_postaux'].apply(lambda values: ','.join(values) if isinstance(values, list) else values)
dim_commune_geojson = communes_full[['commune_code', 'contour_geojson']].dropna().reset_index(drop=True)
dim_commune_geojson['contour_geojson'] = dim_commune_geojson['contour_geojson'].apply(lambda x: json.dumps(x) if isinstance(x, (dict, list)) else x)
communes_df = communes_full.drop(columns=['contour_geojson'])
tables['dim_commune'] = communes_df
tables['dim_commune_geojson'] = dim_commune_geojson
print(f"dim_commune: {communes_df.shape[0]} lignes, {communes_df.shape[1]} colonnes")
display(communes_df.head())


dim_commune: 3782 lignes, 13 colonnes


Unnamed: 0,commune_nom,commune_code,codes_postaux,departement_code,departement_nom,region_code,region_nom,population,surface_km2,longitude,latitude,contour_geojson_type,contour_geojson_coordinates
0,Abbécourt,2001,[02300],2,Aisne,32,Hauts-de-France,513,598.24,3.1824,49.5997,Polygon,"[[[3.191819, 49.590074], [3.190745, 49.589709]..."
1,Achery,2002,[02800],2,Aisne,32,Hauts-de-France,586,694.83,3.3974,49.6935,Polygon,"[[[3.421473, 49.705236], [3.421599, 49.704729]..."
2,Acy,2003,[02200],2,Aisne,32,Hauts-de-France,1013,1154.77,3.4188,49.3476,Polygon,"[[[3.429151, 49.321089], [3.427326, 49.321839]..."
3,Agnicourt-et-Séchelles,2004,[02340],2,Aisne,32,Hauts-de-France,188,1071.98,3.9637,49.7172,Polygon,"[[[3.942471, 49.692239], [3.937267, 49.694734]..."
4,Aguilcourt,2005,[02190],2,Aisne,32,Hauts-de-France,405,1061.11,3.9663,49.3953,Polygon,"[[[3.962728, 49.413212], [3.962942, 49.412964]..."


In [10]:
postal_df = (
    communes_df[['commune_code', 'codes_postaux']]
    .explode('codes_postaux')
    .dropna()
    .rename(columns={'codes_postaux': 'code_postal'})
    .drop_duplicates()
    .reset_index(drop=True)
)
postal_df['code_postal'] = postal_df['code_postal'].astype(str)
tables['bridge_commune_code_postal'] = postal_df
print(f"bridge_commune_code_postal: {postal_df.shape[0]} lignes")
display(postal_df.head())


bridge_commune_code_postal: 3801 lignes


Unnamed: 0,commune_code,code_postal
0,2001,2300
1,2002,2800
2,2003,2200
3,2004,2340
4,2005,2190


In [11]:
final_summary = []
for name, df in tables.items():
    final_summary.append({'table': name, 'rows': len(df), 'columns': len(df.columns)})
final_summary_df = pd.DataFrame(final_summary).sort_values('table').reset_index(drop=True)
display(final_summary_df)


AttributeError: 'Series' object has no attribute 'columns'

In [None]:
SAVE_TO_PARQUET = False  # Ajuster a True pour ecrire les fichiers prepares
if SAVE_TO_PARQUET:
    for name, df in tables.items():
        output_path = OUTPUT_DIR / f'{name}.parquet'
        df.to_parquet(output_path, index=False)
        print(f'Ecriture: {output_path}')
else:
    print('Export Parquet desactive. Fixer SAVE_TO_PARQUET = True pour ecrire les tables.')


## Chargement vers Azure SQL Database

Configure les variables d'environnement `AZURE_SQL_SERVER`, `AZURE_SQL_DATABASE`, `AZURE_SQL_USERNAME`, `AZURE_SQL_PASSWORD` (et optionnellement `AZURE_SQL_SCHEMA`, `AZURE_SQL_DRIVER`, `AZURE_SQL_PORT`) pour activer l'export. Les tables sont ecrasees (`if_exists="replace"`).


In [None]:
import os
import sqlalchemy as sa

SQL_CONFIG = {
    'server': os.getenv('AZURE_SQL_SERVER'),
    'database': os.getenv('AZURE_SQL_DATABASE', 'projet_data_eng'),
    'username': os.getenv('AZURE_SQL_USERNAME'),
    'password': os.getenv('AZURE_SQL_PASSWORD'),
    'schema': os.getenv('AZURE_SQL_SCHEMA', 'dbo'),
    'driver': os.getenv('AZURE_SQL_DRIVER', 'ODBC Driver 18 for SQL Server'),
    'port': os.getenv('AZURE_SQL_PORT', '1433'),
}
missing = [key for key in ('server', 'username', 'password') if not SQL_CONFIG[key]]
if missing:
    raise RuntimeError(f"Variables manquantes pour la connexion SQL: {missing}")

driver_token = SQL_CONFIG['driver'].replace(' ', '+')
connection_uri = (
    f"mssql+pyodbc://{SQL_CONFIG['username']}:{SQL_CONFIG['password']}"
    f"@{SQL_CONFIG['server']}:{SQL_CONFIG['port']}/{SQL_CONFIG['database']}?driver={driver_token}"
)
engine = sa.create_engine(connection_uri, fast_executemany=True)
print(f"Connexion initialisee vers {SQL_CONFIG['server']} (base {SQL_CONFIG['database']}, schema {SQL_CONFIG['schema']})")


In [None]:
tables_to_export = {name: df for name, df in tables.items() if not df.empty}
if not tables_to_export:
    raise ValueError('Aucune table disponible pour export SQL.')

for table_name, df in tables_to_export.items():
    df.to_sql(
        name=table_name,
        con=engine,
        schema=SQL_CONFIG['schema'],
        if_exists='replace',
        index=False,
        method='multi',
    )
    print(f"Table {table_name} chargee ({len(df)} lignes)")

engine.dispose()
print('Export SQL termine.')
