## chargement des librairies

In [1]:
import requests
import pandas as pd
import numpy as np
import time
import pyarrow as pa
import pyarrow.parquet as pq
from pyproj import Transformer
import os
from dotenv import load_dotenv

## Récupération des données brutes via l'API sirene

In [2]:
import os
from dotenv import load_dotenv

# Charger les variables d'environnement
load_dotenv()

# Récupérer la clé
API_KEY = os.getenv("SIRENE_API_KEY")

if not API_KEY:
    raise ValueError("❌ SIRENE_API_KEY non trouvée dans .env")

In [3]:
#Header avec token d'identification
headers = {
    "X-INSEE-Api-Key-Integration": API_KEY,
    "Accept": "application/json"
}

In [4]:
# Liste des codes BTP
codes_btp = [
    '43.99C', '43.99D', '41.20A', '41.20B', '42.11Z', '42.12Z', '42.13A', '42.13B',
    '42.21Z', '42.22Z', '42.91Z', '42.99Z', '43.11Z', '43.12A', '43.12B', '43.13Z',
    '43.21A', '43.21B', '43.22A', '43.22B', '43.29A', '43.29B', '43.31Z', '43.32A',
    '43.32B', '43.32C', '43.33Z', '43.34Z', '43.39Z', '43.91A', '43.91B', '43.99A',
    '43.99B', '41.10A', '41.10B', '41.10C', '41.10D', '81.30Z', '74.90A'
]
ape="43.22A"

In [5]:
def get_all_entreprises_btp(code_ape, headers):
    all_entreprises = []
    curseur = "*"
    nombre = 1000

    while True:
        url = f"https://api.insee.fr/api-sirene/3.11/siret?q=activitePrincipaleUniteLegale:{code_ape}&nombre={nombre}&curseur={curseur}"
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            data = response.json()
            entreprises_batch = data.get("etablissements", [])
            all_entreprises.extend(entreprises_batch)
            print(f"Récupéré {len(entreprises_batch)} entreprises. Total : {len(all_entreprises)}")

            header = data.get("header", {})
            curseur_suivant = header.get("curseurSuivant")

            # ✅ Condition d'arrêt correcte
            if curseur_suivant == header.get("curseur"):
                print("Fin des résultats : curseurSuivant == curseur.")
                break

            curseur = curseur_suivant
            time.sleep(1)

        else:
            print(f"Erreur {response.status_code} : {response.text}")
            if response.status_code == 429:
                print("Limite de requêtes atteinte. Attente de 5 secondes...")
                time.sleep(5)
            else:
                break

    return all_entreprises

In [None]:
entreprises = get_all_entreprises_btp(code_ape=ape, headers=headers)

Récupéré 1000 entreprises. Total : 1000
Récupéré 1000 entreprises. Total : 2000
Récupéré 1000 entreprises. Total : 3000
Récupéré 1000 entreprises. Total : 4000
Récupéré 1000 entreprises. Total : 5000
Récupéré 1000 entreprises. Total : 6000
Récupéré 1000 entreprises. Total : 7000
Récupéré 1000 entreprises. Total : 8000
Récupéré 1000 entreprises. Total : 9000
Récupéré 1000 entreprises. Total : 10000
Récupéré 1000 entreprises. Total : 11000


In [None]:
# Transformation des données json en format structuré
df = pd.json_normalize(entreprises)
df.head(5)

In [None]:
# enregistrement des données brutes au format parquet
df.to_parquet(f"raw_entreprises_{ape}.parquet", engine="pyarrow")

## Nettoyage des données

In [None]:
# Si besoin de charger le fichier de données brutes 
df = pd.read_parquet(f"raw_entreprises_{ape}.parquet")

In [None]:
#Suppression des colonnes vides
df2=df.dropna(axis=1, how='all').copy()
df2.info()

In [None]:
# ajout de la colonne de catégorie légal de niveau 2 en gardant les 2 premiers chiffres de la catégorie légale
df2.loc[:, "uniteLegale.categorieJuridiqueUniteLegaleNiv2"] = (
    df2["uniteLegale.categorieJuridiqueUniteLegale"].str[:2]
)
df2["uniteLegale.categorieJuridiqueUniteLegaleNiv2"].value_counts()

In [None]:
#fonction permettant de récupérer la période la plus récente de la colonne periodesEtablissement
def extraire_derniere_periode(periodes):
    # 1) Normaliser le type : ndarray -> list
    if isinstance(periodes, np.ndarray):
        periodes = periodes.tolist()

    # 2) Gérer les autres cas foireux
    if periodes is None:
        return None

    # 3) On veut une liste de dicts
    if not isinstance(periodes, list) or len(periodes) == 0:
        return None

    # (optionnel mais utile) : filtrer ce qui n'est pas un dict
    periodes = [p for p in periodes if isinstance(p, dict)]
    if not periodes:
        return None

    # 4) Tri par dateDebut décroissant
    periodes_sorted = sorted(
        periodes,
        key=lambda x: x.get("dateDebut") or "0000-00-00",
        reverse=True
    )
    return periodes_sorted[0]

In [None]:
# Récupération dernière période
df2.loc[:, "periode_derniere"] = df2["periodesEtablissement"].apply(extraire_derniere_periode)
# Eclatement dernière période
df2 = df2.join(
    pd.json_normalize(df2["periode_derniere"]).add_prefix("periode.")
)
df2.info()

In [None]:
# Colonnes utiles à garder
cols_utiles = [
    #Etablissement
    "siret",
    "statutDiffusionEtablissement",
    "dateCreationEtablissement",
    "trancheEffectifsEtablissement",
    "anneeEffectifsEtablissement",
    "activitePrincipaleRegistreMetiersEtablissement",
    "periode.activitePrincipaleEtablissement",
    "etablissementSiege",
    "periode.etatAdministratifEtablissement",
    
    #Unité légale
    "siren",
    "uniteLegale.statutDiffusionUniteLegale",
    "uniteLegale.etatAdministratifUniteLegale",
    "uniteLegale.dateCreationUniteLegale",
    "uniteLegale.categorieJuridiqueUniteLegale",
    "uniteLegale.categorieJuridiqueUniteLegaleNiv2",
    "uniteLegale.denominationUniteLegale",
    "uniteLegale.sigleUniteLegale",
    "uniteLegale.denominationUsuelle1UniteLegale",
    "uniteLegale.activitePrincipaleUniteLegale",
    "uniteLegale.categorieEntreprise",
    "uniteLegale.trancheEffectifsUniteLegale",
    "uniteLegale.anneeEffectifsUniteLegale",

    #Localisation
    "adresseEtablissement.numeroVoieEtablissement",
    "adresseEtablissement.typeVoieEtablissement",
    "adresseEtablissement.libelleVoieEtablissement",
    "adresseEtablissement.codePostalEtablissement",
    "adresseEtablissement.libelleCommuneEtablissement",
    "adresseEtablissement.codeCommuneEtablissement",
    "adresseEtablissement.coordonneeLambertAbscisseEtablissement",
    "adresseEtablissement.coordonneeLambertOrdonneeEtablissement",
    "adresseEtablissement.libellePaysEtrangerEtablissement"
]

In [None]:
# Garder que les colonnes utiles
df3=df2[cols_utiles].copy()
df3.info()

In [None]:
#Garder que les établissements actifs
df4 = df3[df3["periode.etatAdministratifEtablissement"]=="A"].copy()
df4.info()

In [None]:
#Nettoyage dates
df4["dateCreationEtablissement"] = pd.to_datetime(df4["dateCreationEtablissement"], errors='coerce') 
df4["uniteLegale.dateCreationUniteLegale"] = pd.to_datetime(df4["uniteLegale.dateCreationUniteLegale"], errors='coerce')

df4.info()

In [None]:
# Nombre d'établissement actif / unité légale
# 1) Compter le nombre d'établissements par siren
df_count = df4.groupby("siren").size().reset_index(name="nb_etablissements")

# 2) Fusionner dans ton df4
df5 = df4.merge(df_count, on="siren", how="left")

df5

In [None]:
# enregistrement des données nettoyées au format parquet
df5.to_parquet(f"actives_entreprises_{ape}.parquet", engine="pyarrow")

## Transformation localisation

In [None]:
# Charger si besoin
df_utile = pd.read_parquet(f"actives_entreprises_{ape}.parquet")
df_utile.info()

In [None]:
#Transformation des ND en NaN et le reste en float
for col in [
    "adresseEtablissement.coordonneeLambertAbscisseEtablissement",
    "adresseEtablissement.coordonneeLambertOrdonneeEtablissement",
]:
    df_utile[col] = pd.to_numeric(df_utile[col], errors="coerce")

In [None]:
# Transformation des coordonnées Lambert en WGS84 accepté par Power BI, BigQuery
transformer = Transformer.from_crs(2154, 4326, always_xy=True)

mask = df_utile["adresseEtablissement.coordonneeLambertAbscisseEtablissement"].notna() & \
       df_utile["adresseEtablissement.coordonneeLambertOrdonneeEtablissement"].notna()

x = df_utile.loc[mask, "adresseEtablissement.coordonneeLambertAbscisseEtablissement"].astype(float)
y = df_utile.loc[mask, "adresseEtablissement.coordonneeLambertOrdonneeEtablissement"].astype(float)

df_utile.loc[mask, "longitude"], df_utile.loc[mask, "latitude"] = transformer.transform(x.values, y.values)
df_utile.info()

In [None]:
#Vérification
df_utile[["longitude", "latitude"]].notna().mean()

In [None]:
# Vérifier le nombre de coordonnées converties
print(f"Nombre de coordonnées converties : {mask.sum()} / {len(df_utile)}")

# Afficher un échantillon des résultats
print(df_utile[['latitude', 'longitude']].sample(5))

# Vérifier les valeurs aberrantes (ex : latitude > 90 ou < -90)
print("Valeurs aberrantes :")
print(df_utile[(df_utile['latitude'].abs() > 90) | (df_utile['longitude'].abs() > 180)])

In [None]:
#Fonction pour extraire le département du code commune INSEE
def extract_dept_from_code_commune(code_raw):
    if code_raw is None:
        return None
    code = str(code_raw).strip()
    if code == "" or code == "[ND]":
        return None
    
    # Corse : commence par 2A ou 2B
    if code.upper().startswith("2A"):
        return "2A"
    if code.upper().startswith("2B"):
        return "2B"
    
    # DROM-COM : commence par 97 ou 98 → 3 premiers caractères
    if code.startswith("97") or code.startswith("98"):
        return code[:3]
    
    # Métropole standard : 2 premiers caractères
    return code[:2]

In [None]:
df_utile['dep'] = df_utile['adresseEtablissement.codeCommuneEtablissement'].apply(extract_dept_from_code_commune)
df_utile['dept_anomaly'] = df_utile['dep'].isna()
df_utile['dep'].value_counts(dropna=False)

## Enrichissement des données

In [None]:
#Ajout de l'ancienneté
date_ref = pd.Timestamp.today().normalize()

df_utile["anciennete_ul_annees"] = (
    (date_ref - df_utile["uniteLegale.dateCreationUniteLegale"]).dt.days // 365
)

df_utile["anciennete_etab_annees"] = (
    (date_ref - df_utile["dateCreationEtablissement"]).dt.days // 365
)
df_utile[["siret","siren","anciennete_ul_annees","anciennete_etab_annees"]].head()

In [None]:
#ajout de la catégorie anncienneté
def categorize_anciennete(annees):
    if annees is None or pd.isna(annees):
        return None
    elif annees <= 5:
        return '0-5 ans'
    elif annees <= 10:
        return '6-10 ans'
    elif annees <= 20:
        return '11-20 ans'
    else:
        return '20+ ans'

df_utile['tranche_anciennete'] = df_utile['anciennete_etab_annees'].apply(categorize_anciennete)

In [None]:
#Ajout de la colonne catégorie établissement
def categorize_etab(tranche):
    if tranche in ['NN', '00']:
        return 'Non employeur'
    elif tranche in ['01', '02', '03']:
        return 'TPE (1-9)'
    elif tranche in ['11', '12', '21', '22', '31']:
        return 'PME (10-249)'
    elif tranche in ['32', '41', '42', '51', '52', '53']:
        return 'Grande structure (250+)'
    else:
        return None

df_utile['categorie_entreprise_etab'] = df_utile['trancheEffectifsEtablissement'].apply(categorize_etab)

In [None]:
# ajout de la division à partir du code APE

# Division APE (2 premiers caractères)
df_utile['division_ape_etab'] = df_utile['periode.activitePrincipaleEtablissement'].str[:2]

# Type activité (BTP vs Autres)
df_utile['type_activite_etab'] = df_utile['periode.activitePrincipaleEtablissement'].apply(
    lambda x: 'BTP' if x in codes_btp else 'Autres'
)

## Finalisation dataset au format parquet

In [None]:
# Suppression des colonnes coordonnées lambert
dataset_for_export = df_utile.drop(columns=["adresseEtablissement.coordonneeLambertAbscisseEtablissement","adresseEtablissement.coordonneeLambertOrdonneeEtablissement"])

In [None]:
dataset_for_export.info()

In [None]:
#Renommer les colonnnes
rename_cols = {
    # Identifiants
    "siret": "siret",
    "siren": "siren",

    # Établissement
    "statutDiffusionEtablissement": "statut_diffusion_etab",
    "dateCreationEtablissement": "date_creation_etab",
    "trancheEffectifsEtablissement": "tranche_effectifs_etab",
    "anneeEffectifsEtablissement": "annee_effectifs_etab",
    "activitePrincipaleRegistreMetiersEtablissement": "aprm_etab",
    "periode.activitePrincipaleEtablissement": "ape_etab_dern_periode",
    "etablissementSiege": "est_siege",
    "periode.etatAdministratifEtablissement": "etat_admin_etab_dern_periode",

    # Unité légale
    "uniteLegale.statutDiffusionUniteLegale": "statut_diffusion_ul",
    "uniteLegale.etatAdministratifUniteLegale": "etat_admin_ul",
    "uniteLegale.dateCreationUniteLegale": "date_creation_ul",
    "uniteLegale.categorieJuridiqueUniteLegale": "categorie_juridique_ul",
    "uniteLegale.denominationUniteLegale": "denomination_ul",
    "uniteLegale.sigleUniteLegale": "sigle_ul",
    "uniteLegale.denominationUsuelle1UniteLegale": "denomination_usuelle_ul",
    "uniteLegale.activitePrincipaleUniteLegale": "ape_ul",
    "uniteLegale.categorieEntreprise": "categorie_entreprise_ul",
    "uniteLegale.trancheEffectifsUniteLegale": "tranche_effectifs_ul",
    "uniteLegale.anneeEffectifsUniteLegale": "annee_effectifs_ul",

    # Adresse / localisation
    "adresseEtablissement.numeroVoieEtablissement": "num_voie",
    "adresseEtablissement.typeVoieEtablissement": "type_voie",
    "adresseEtablissement.libelleVoieEtablissement": "libelle_voie",
    "adresseEtablissement.codePostalEtablissement": "code_postal",
    "adresseEtablissement.libelleCommuneEtablissement": "commune",
    "adresseEtablissement.codeCommuneEtablissement": "code_commune",
    "adresseEtablissement.coordonneeLambertAbscisseEtablissement": "lambert_x",
    "adresseEtablissement.coordonneeLambertOrdonneeEtablissement": "lambert_y",
    "adresseEtablissement.libellePaysEtrangerEtablissement": "pays_etranger",

    # Enrichissements
    "nb_etablissements": "nb_etablissements_siren",
    "uniteLegale.categorieJuridiqueUniteLegaleNiv2": "categorie_juridique_ul_niv2",
    "anciennete_etab_annees" : "anciennete_etab_annees",
    "anciennete_ul_annees": "anciennete_ul_annees",
    "longitude": "longitude",
    "latitude": "latitude",
    "dep": "dep",
    "dept_anomaly": "dept_anomaly",
    "categorie_entreprise_etab": "categorie_entreprise_etab",
    "division_ape_etab": "division_ape_etab",
    "type_activite_etab": "type_activite_etab",
    "tranche_anciennete" : "tranche_anciennete"
}

In [None]:
dataset_for_export= dataset_for_export.rename(columns=rename_cols)
dataset_for_export.info()

In [None]:
dataset_for_export.to_parquet(
    f"etablissements_{ape}_clean.parquet",
    engine="pyarrow",
    index=False
)