# COLLECTE ET HARMONISATION DES DONNEES - REPUBLIQUE DU BENIN

## Objectifs
   - **Thématique**: Démographie - Economie - Santé - Education - Géographie
   - **Sources**: INStad - Portails Open Data - Worl Bank API - Web scraping
   - **Couverture**: Tous les départements du bénin
   - **Période**: 2020 - 2025 (Selon la disponibilité des données au niveau des sources)
### Python version: 3.13.7

### Configuration des imports

In [190]:
# =============================
# Imports
# =============================

# Standard library
import os
import re
import json
import time
import zipfile
import warnings
import logging
from io import BytesIO
from pathlib import Path
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin, urlparse
from io import StringIO

# Third-party
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from bs4 import BeautifulSoup

# =============================
# Configurations globales
# =============================

# Warnings
warnings.filterwarnings("ignore", category=UserWarning, module="bs4")

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)

# Pandas
pd.set_option("display.max_rows", 100)  # nb max de lignes affichées
pd.set_option("display.max_columns", None)  # affiche toutes les colonnes
pd.set_option("display.float_format", "{:.2f}".format)  # formatage des floats
pd.set_option("display.expand_frame_repr", False)  # évite retour ligne inutile

# Matplotlib / Seaborn
plt.style.use("seaborn-v0_8-whitegrid")  # style moderne et lisible
plt.rcParams["figure.figsize"] = (10, 6)
plt.rcParams["axes.titlesize"] = 14
plt.rcParams["axes.labelsize"] = 12
sns.set_palette("Set2")

# =============================
# Check initialisation
# =============================

logging.info("Librairies et configurations chargées")
logging.info("Début de la collecte de données: %s", datetime.now().strftime("%d/%m/%Y %H:%M:%S"))


2025-09-20 21:10:24,276 | INFO | Librairies et configurations chargées
2025-09-20 21:10:24,278 | INFO | Début de la collecte de données: 20/09/2025 21:10:24


### Configuration des dossiers

#### Fonction de configurations de dossiers, réutilisable

In [191]:
def init_directory(base_dir: Optional[Path] = None) -> dict:
    """
    Initializes directory structure under the specified base directory.

    Creates a directory structure for storing data, including raw data,
    processed data, and final data. If directories already exist, their
    existence is logged; otherwise, they are created.

    :param base_dir: Optional base directory path. Defaults to the current
        working directory if not provided.
    :type base_dir: Optional[Path]
    :return: A dictionary with the keys 'data', 'raw_data', 'processed_data',
        and 'final_data', where each value is the Path object of the corresponding
        directory.
    :rtype: dict
    """

    if base_dir is None:
        base_dir = Path(".")

    data_dir = base_dir / "data"  # Répertoire ou se trouvera les données
    dirs = {
        "data": data_dir,
        "raw_data": data_dir / "raw_data",  # Répertoire des données brutes
        "processed_data": data_dir / "processed_data",  # Répertoire des données traitées
        "final_data": data_dir / "final_data",  # Répertoire des données finales
    }

    for name, path in dirs.items():
        try:
            if path.exists():
                logging.info(f"Dossier {name} existe déjà.")
            else:
                path.mkdir(parents=True, exist_ok=True)
                logging.info(f"Répertoire {name} prêt à {path}")
        except PermissionError as pe:
            logging.error(f"Erreur de permission pour {name}: {pe}")
        except OSError as oe:
            logging.error(f"Erreur système pour {name}: {oe}")
        except Exception as e:
            logging.error(f"Erreur inconnu pour {name}: {e}")

    return dirs

#### Initialisation des dossiers

In [192]:
DATA_DIR, RAW_DIR, PROCESSED_DIR, FINAL_DIR = init_directory().values()

logging.info("Dossiers chargés avec succès")

2025-09-20 21:10:24,300 | INFO | Dossier data existe déjà.
2025-09-20 21:10:24,301 | INFO | Dossier raw_data existe déjà.
2025-09-20 21:10:24,302 | INFO | Dossier processed_data existe déjà.
2025-09-20 21:10:24,303 | INFO | Dossier final_data existe déjà.
2025-09-20 21:10:24,304 | INFO | Dossiers chargés avec succès


### Traitement des sources

#### URL et sources API externes

In [193]:
# Base URL pour l'API de la Banque Mondiale
API_BASE_URL_WORLD_BANK = "https://api.worldbank.org/v2"

# Base URL pour l'API d'Instad (service local ou national)
API_BASE_URL_INSTAD = "https://instad.bj"

# Base URL pour l'API Overpass (OpenStreetMap) pour requêtes sur les cartes
API_BASE_URL_OVERPASS = "https://overpass-api.de/api/interpreter"

# Liste des URLs CSV provenant de sources externes potentielles
# Ici un exemple : données UN Data via l'API UNSD (2015-2024)
POTENTIAL_API_EXTERNAL_SOURCE_CSV_FILE = [
    "https://data.un.org/ws/rest/data/UNSD,DF_UNData_UNFCC,1.0/all/?startPeriod=2015&endPeriod=2024"
]

#### Configuration supplémentaire

In [194]:
COUNTRY_CODE = "BJ"

DEFAULT_INDICATOR_WORLD_BANK = [
    'SP.POP.TOTL',  # Population totale
    'NY.GDP.MKTP.CD',  # PIB
    'SE.PRM.NENR',  # Scolarisation primaire
    'SH.DYN.MORT',  # Taux de mortalité
    'AG.LND.TOTL.K2',  # Surface totale
    'NY.GDP.PCAP.CD',  # PIB par habitant
    'SL.TLF.TOTL.IN',  # Force de travail
    'SP.DYN.TFRT.IN'  # Taux de fertilité
]

OPEN_STREET_MAP_ADMIN_LEVEL_PAYS = "2"
OPEN_STREET_MAP_ADMIN_LEVEL_DEPARTEMENT = "4"
OPEN_STREET_MAP_ADMIN_LEVEL_COMMUNES = "6"

#### Collecte WORLD BANK API

##### WorldBankAPI

In [195]:
class WorldBankAPI:
    def __init__(self, url: str = API_BASE_URL_WORLD_BANK, country_code: str = COUNTRY_CODE, start_year: int = 2015,
                 end_year: int = 2024, default_per_page: int = 100):
        """
        Initializes the API request session with the given parameters for interacting
        with the World Bank API. Allows setting a specific country, time range, and API
        base URL. Configures default headers for the session to include a `User-Agent`
        string customized for educational research purposes.

        :param url: The base URL of the API to be used for requests.
        :type url: str
        :param country_code: The ISO 3166-1 alpha-3 country code for the target country.
        :type country_code: str
        :param start_year: The starting year for the data range.
        :type start_year: int
        :param end_year: The ending year for the data range.
        :type end_year: int
        """
        self.url = url
        self.country_code = country_code
        self.start_year = start_year
        self.end_year = end_year
        self.per_page = default_per_page

        self.session = requests.Session()

        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Educational Research Purpose)'
        })

    def get_indicators(self, indicators: list = DEFAULT_INDICATOR_WORLD_BANK) -> pd.DataFrame:
        """
        Récupère les données pour une liste d'indicateurs économiques ou financiers via l'API World Bank.
        Les données sont accumulées dans un pandas DataFrame.

        :param indicators: Liste des codes d'indicateurs à récupérer. Si non fournie, une liste par défaut est utilisée.
        :type indicators: list
        :return: DataFrame contenant les données récupérées, avec code et nom de l'indicateur, informations pays,
                 année, valeur, source et date de collecte.
        :rtype: pandas.DataFrame
        """
        donnees = []

        for indicator in indicators:
            logging.info(f"Récupération des données pour l'indicateur {indicator}")

            url = f"{self.url}/country/{self.country_code}/indicator/{indicator}"
            params = {
                'date': f'{self.start_year}:{self.end_year}',
                'format': 'json',
                'per_page': self.per_page
            }

            try:
                response = self.session.get(url, params=params, timeout=10)
                response.raise_for_status()
                data = response.json()

                # Vérifie si des données sont disponibles
                entries = data[1] if len(data) > 1 and data[1] else []
                logging.info(f"{len(entries)} enregistrements récupérés pour {indicator} \n")

                for entry in entries:
                    donnees.append({
                        'indicator_code': entry['indicator']['id'],
                        'indicator_name': entry['indicator']['value'],
                        'country_code': entry['country']['id'],
                        'country_name': entry['country']['value'],
                        'year': entry['date'],
                        'value': entry['value']
                    })

                # Pause pour éviter de saturer l'API
                time.sleep(0.5)

            except requests.exceptions.RequestException as e:
                logging.error(f"Erreur HTTP pour l'indicateur {indicator}: {e}")
            except ValueError as e:
                logging.error(f"Erreur JSON pour l'indicateur {indicator}: {e}")
            except Exception as e:
                logging.error(f"Erreur inattendue pour l'indicateur {indicator}: {e}")

        dataset = pd.DataFrame(donnees)

        if not dataset.empty:
            dataset['year'] = pd.to_numeric(dataset['year'], errors='coerce')
            dataset['source'] = "WORLD BANK API"
            dataset['collection_date'] = datetime.now().date()

        return dataset


##### Implémentation

In [196]:
world_bank = WorldBankAPI()
path_to_save = RAW_DIR / 'world_bank_data.csv'

has_data = False
world_bank_data = None

if not Path(path_to_save).exists():
    world_bank_data = world_bank.get_indicators()
    has_data = len(world_bank_data) > 0
    if has_data:
        logging.info(f"World Bank: {len(world_bank_data)} enrégistrement collectés")
    else:
        logging.info(f"World Bank: Aucun enrégistrement collectés")
else:
    logging.warning("Déjà éffectué")



##### Sauvegarde des données

In [197]:
if has_data and not world_bank_data.empty:
    world_bank_data.to_csv(path_to_save, index=False)

    # Taille du fichier en octets
    size_bytes = path_to_save.stat().st_size

    # Optionnel : convertir en Ko/Mo pour lecture facile
    size_kb = size_bytes / 1024
    size_mb = size_kb / 1024

    logging.info(f"Sauvegarde des données à {path_to_save} ({size_bytes} bytes / {size_kb:.2f} KB / {size_mb:.2f} MB)")
else:
    logging.warning("Déjà éffectué")



#### Collecte INSTAD API - WEB SCRAPING

###### INStadScraper

In [198]:
class INStadScraper:
    def __init__(self, base_url: str = API_BASE_URL_INSTAD):
        self.url = base_url

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
        })

    def scrape_html_tables(self, urls: list, max_tables: int = 5):
        html_data_tables = []

        if urls:
            for url in urls:
                try:
                    response = self.session.get(url, timeout=10)
                    response.raise_for_status()
                    logging.info(f"Scraping html de l'url {response.url}")

                    soup = BeautifulSoup(response.content, 'html.parser')

                    tables = soup.find_all('table')
                    logging.info(f"{len(tables)} tableaux trouvées sur {url}")

                    for i, table in enumerate(tables[:max_tables]):
                        try:
                            dataset = pd.read_html(StringIO(str(table)))[0]
                            dataset['source_url'] = url
                            dataset['table_index'] = i
                            dataset['collection_date'] = datetime.now().date()
                            html_data_tables.append(dataset)
                            logging.info(f"Tableau {i + 1} récupéré: {dataset.shape} \n")
                        except Exception as e:
                            logging.error(f"Impossible de récupérer le table à l'index {i} sur {url}: {e}")

                    time.sleep(2)
                except requests.exceptions.RequestException as e:
                    logging.error(f"Erreur requête pour {url}: {e}")
                except Exception as e:
                    logging.error(f"Erreur inattendue pour l'url {url}: {e}")

        return html_data_tables

    def scrape_json_data(self, json_urls: list):
        json_data = []

        if json_urls:
            for url in json_urls:
                try:
                    response = self.session.get(url, timeout=10)
                    response.raise_for_status()
                    logging.info(f"Scraping json de l'url {response.url}")

                    data = response.json()

                    dataset = pd.json_normalize(data)
                    dataset['source_url'] = url
                    dataset['collection_date'] = datetime.now().date()
                    json_data.append(dataset)
                    logging.info(f"Json récupérer: {dataset.shape} \n")

                    time.sleep(1)
                except requests.exceptions.RequestException as e:
                    logging.error(f"Erreur HTTP pour {url}: {e}")
                except ValueError as e:
                    logging.error(f"JSON invalide pour {url}: {e}")
                except Exception as e:
                    logging.error(f"Erreur inattendue pour {url}: {e}")

        return json_data

    def scrape_demographic_data(self):

        urls = [
            f"{self.url}/statistiques/indicateurs-recents/43-population",  # indicateurs récents
            f"{self.url}/publications/publications-annuelles"  # publications annuelles
        ]

        return self.scrape_html_tables(urls)

    def scrape_economic_data(self):
        html_urls = [
            f"{self.url}/publications/publications-trimestrielles",  # publications trimestrielles
            f"{self.url}/publications/publications-mensuelles"  # publications mensuelles
        ]

        # A renseigner une url json a scraper
        json_urls = []

        html_data = self.scrape_html_tables(html_urls)
        json_data = self.scrape_json_data(json_urls)

        donnees = html_data + json_data
        if donnees:
            return pd.concat(donnees, ignore_index=True)
        else:
            return pd.DataFrame()


##### Implémentations

In [199]:
scraper = INStadScraper()

path_instad_to_save = RAW_DIR / 'instad_scraping.csv'

has_data = False
instad_data = None

if not Path(path_instad_to_save).exists():
    demographic_tables = scraper.scrape_demographic_data()

    if demographic_tables:
        demographic_df = pd.concat(demographic_tables, ignore_index=True)
    else:
        demographic_df = pd.DataFrame()

    logging.info(f"Nombre de lignes démographiques : {len(demographic_df)}")

    economic_df = scraper.scrape_economic_data()
    logging.info(f"Nombre de lignes économiques : {len(economic_df)}")

    instad_data = pd.concat([demographic_df, economic_df], ignore_index=True)
    has_data = len(instad_data) > 0
    logging.info(f"Total lignes récupérées : {len(instad_data)}")
else:
    logging.warning("Déjà éffectué")




##### Sauvegarde des données

In [200]:
if has_data and not instad_data.empty:
    instad_data.to_csv(path_instad_to_save, index=False)

    # Taille du fichier en octets
    size_instad_bytes = path_instad_to_save.stat().st_size

    # Optionnel : convertir en Ko/Mo pour lecture facile
    size_instad_kb = size_instad_bytes / 1024
    size_instad_mb = size_instad_kb / 1024

    logging.info(
        f"Sauvegarde des données à {path_instad_to_save} ({size_instad_bytes} bytes / {size_instad_kb:.2f} KB / {size_instad_mb:.2f} MB)")
else:
    logging.warning("Déjà éffectué")



#### Collecte Données Géographique

##### GeographicDataCollector

In [201]:
class GeographicDataCollector:
    def __init__(self, base_url: str = API_BASE_URL_OVERPASS, country_code: str = COUNTRY_CODE):
        self.url = base_url
        self.country_code = country_code

        self.session = requests.Session()

    def get_openstreetmap_cities(self, admin_level: str = OPEN_STREET_MAP_ADMIN_LEVEL_PAYS):
        query = f"""
        [out:json][timeout:60];
        area["ISO3166-1"={self.country_code}][admin_level={admin_level}];
        (
          node(area)["place"~"city|town"];
          way(area)["place"~"city|town"];
          relation(area)["place"~"city|town"];
        );
        out center tags;
        """

        try:
            response = self.session.post(self.url, data={'data': query}, timeout=60)
            response.raise_for_status()
            data = response.json()

            logging.info(f"Données villes récupérées à {response.url}")

            cities = []

            for element in data.get('elements', []):
                if 'tags' in element:
                    cities.append({
                        'name': element.get('tags').get('name'),
                        'place_type': element.get('tags').get('place'),
                        'population': element.get('tags').get('population'),
                        'latitude': element.get('lat') or (element.get('center', {}) or {}).get('lat'),
                        'longitude': element.get('lon') or (element.get('center', {}) or {}).get('lon'),
                        'source': "OpenStreetMap",
                        'collection_date': datetime.now().date(),
                    })

            logging.info(f"{len(cities)} enregistrements récupérés \n")

            return pd.DataFrame(cities)
        except requests.exceptions.RequestException as e:
            logging.error(f"Erreur HTTP pour OpenStreetMap: {e}")
        except ValueError as e:
            logging.error(f"JSON invalide pour OpenStreetMap: {e}")
        except Exception as e:
            logging.error(f"Erreur inattendue pour OpenStreetMap: {e}")

    def get_administrative_boundaries(self, admin_level: str = OPEN_STREET_MAP_ADMIN_LEVEL_PAYS):
        query = f"""
            [out:json][timeout:60];
            relation["boundary"="administrative"]["admin_level"={admin_level}]["name"="Bénin"];
            out center tags;
            """

        try:
            response = self.session.post(self.url, data={'data': query}, timeout=60)
            response.raise_for_status()
            data = response.json()

            logging.info(f"Données administrative récupérées à {response.url}\n{query}\n{data}")

            boundaries = []

            for element in data.get('elements', []):
                if 'tags' in element:
                    boundaries.append({
                        'name': element.get('tags').get('name'),
                        'admin_level': admin_level,
                        'wikidata': element.get('tags').get('wikidata'),
                        'latitude': element.get('lat') or (element.get('center', {}) or {}).get('lat'),
                        'longitude': element.get('lon') or (element.get('center', {}) or {}).get('lon'),
                        'osm_id': element.get('id'),
                        'source': 'OpenStreetMap',
                        'collection_date': datetime.now().date()
                    })

            logging.info(f"{len(boundaries)} enregistrements récupérés \n")

            return pd.DataFrame(boundaries)
        except requests.exceptions.RequestException as e:
            logging.error(f"Erreur HTTP pour OpenStreetMap: {e}")
        except ValueError as e:
            logging.error(f"JSON invalide pour OpenStreetMap: {e}")
        except Exception as e:
            logging.error(f"Erreur inattendue pour OpenStreetMap: {e}")

##### Implémentation

In [202]:
geo_collector = GeographicDataCollector()

path_osm_ville_to_save = RAW_DIR / "osm_ville.csv"
path_osm_administrative_boundaries_to_save = RAW_DIR / "osm_administrative_boundaries.csv"

has_data_osm_ville = False
osm_ville_data = pd.DataFrame()

has_data_osm_administrative_boundaries = False
osm_administrative_boundaries_data = pd.DataFrame()

if not Path(path_osm_ville_to_save).exists():
    osm_ville_data = geo_collector.get_openstreetmap_cities()

    has_data_osm_ville = len(osm_ville_data) > 0
    logging.info(f"OpenStreetMap: {len(osm_ville_data)} villes récupérées")
else:
    logging.warning("Déjà éffectué")

if not Path(path_osm_administrative_boundaries_to_save).exists():
    osm_administrative_boundaries_data = geo_collector.get_administrative_boundaries()

    has_data_osm_administrative_boundaries = len(osm_administrative_boundaries_data) > 0
    logging.info(f"OpenStreetMap: {len(osm_administrative_boundaries_data)} données administrative récupérées")
else:
    logging.warning("Déjà éffectué")

2025-09-20 21:10:27,195 | INFO | Données administrative récupérées à https://overpass-api.de/api/interpreter

            [out:json][timeout:60];
            relation["boundary"="administrative"]["admin_level"=2]["name"="Bénin"];
            out center tags;
            
{'version': 0.6, 'generator': 'Overpass API 0.7.62.8 e802775f', 'osm3s': {'timestamp_osm_base': '2025-09-20T20:08:57Z', 'copyright': 'The data included in this document is from www.openstreetmap.org. The data is made available under ODbL.'}, 'elements': []}
2025-09-20 21:10:27,198 | INFO | 0 enregistrements récupérés 

2025-09-20 21:10:27,200 | INFO | OpenStreetMap: 0 données administrative récupérées


##### Sauvegarde des données

In [203]:
if has_data_osm_ville and not osm_ville_data.empty:
    osm_ville_data.to_csv(path_osm_ville_to_save, index=False)

    # Taille du fichier en octets
    size_osm_ville_bytes = path_osm_ville_to_save.stat().st_size

    # Optionnel : convertir en Ko/Mo pour lecture facile
    size_osm_ville_kb = size_osm_ville_bytes / 1024
    size_osm_ville_mb = size_osm_ville_kb / 1024

    logging.info(
        f"Sauvegarde des données à {path_osm_ville_to_save} ({size_osm_ville_bytes} bytes / {size_osm_ville_kb:.2f} KB / {size_osm_ville_mb:.2f} MB)")
else:
    logging.warning("Déjà éffectué")

if has_data_osm_administrative_boundaries and not osm_administrative_boundaries_data.empty:
    osm_administrative_boundaries_data.to_csv(path_osm_administrative_boundaries_to_save, index=False)

    # Taille du fichier en octets
    size_osm_administrative_bytes = path_osm_administrative_boundaries_to_save.stat().st_size

    # Optionnel : convertir en Ko/Mo pour lecture facile
    size_osm_administrative_kb = size_osm_administrative_bytes / 1024
    size_osm_administrative_mb = size_osm_administrative_kb / 1024

    logging.info(
        f"Sauvegarde des données à {path_osm_administrative_boundaries_to_save} ({size_osm_administrative_bytes} bytes / {size_osm_administrative_kb:.2f} KB / {size_osm_administrative_mb:.2f} MB)")
else:
    logging.warning("Déjà éffectué")



### Nettoyage, Harmonisation, Fusion, Contrôle qualités des données