In [None]:
from src.data_loader import fetch_world_bank_data
from src.processor import clean_climate_data
from config import logger

def main():
    # Indicateurs à récupérer
    indicators = {
        "co2_per_capita": "EN.ATM.CO2E.PC",
        "total_ghg": "EN.ATM.GHGT.KT.CE"
    }

    for name, code in indicators.items():
        logger.info(f"Traitement de l'indicateur : {name}")
        
        # Le loader gère maintenant le cache tout seul
        raw_df = fetch_world_bank_data(code)

        if raw_df is not None:
            cleaned_df = clean_climate_data(raw_df)
            
            # Sauvegarde du fichier processé
            output_path = f"data/processed/{name}_cleaned.csv"
            cleaned_df.to_csv(output_path, index=False)
            logger.info(f"Fichier final prêt : {output_path}")

if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import os
from typing import Optional
from co2_extractor.web_api import CO2Extractor  # Import de VOTRE librairie
from config import logger

def fetch_world_bank_data(indicator: str, country: str = "all") -> Optional[pd.DataFrame]:
    """
    Utilise la librairie co2-extractor-wb pour récupérer les données.
    """
    logger.info(f"Appel de la librairie externe pour l'indicateur {indicator}")
    
    # Appel de la classe de votre librairie
    df = CO2Extractor.get_co2_data(country=country, indicator=indicator)
    
    if df is not None:
        # Cache local (optionnel)
        os.makedirs("data/raw", exist_ok=True)
        df.to_csv(f"data/raw/{indicator}.csv", index=False)
        return df
    
    logger.error("La librairie n'a renvoyé aucune donnée.")
    return None

In [None]:
import requests
import pandas as pd
import os
import time
from typing import Optional
from config import logger, WB_API_URL

def is_cache_valid(filepath: str, max_days: int) -> bool:
    """Vérifie si le fichier de cache est encore valide."""
    if not os.path.exists(filepath):
        return False
    
    # Calcul de l'âge du fichier
    file_age_seconds = time.time() - os.path.getmtime(filepath)
    return file_age_seconds < (max_days * 86400)

def fetch_world_bank_data(indicator: str, country: str = "all", force_refresh: bool = False) -> Optional[pd.DataFrame]:
    """
    Récupère les données avec gestion du cache local.
    """
    raw_path = f"data/raw/{indicator}.csv"
    os.makedirs("data/raw", exist_ok=True)

    # 1. Vérification du cache
    if not force_refresh and is_cache_valid(raw_path, max_days=7):
        logger.info(f"Chargement de {indicator} depuis le cache local.")
        return pd.read_csv(raw_path)

    # 2. Appel API si pas de cache ou refresh forcé
    url = f"{WB_API_URL}/country/{country}/indicator/{indicator}?format=json&per_page=5000&source=2"
    
    try:
        logger.info(f"Appel réseau vers l'API pour l'indicateur: {indicator}")
        response = requests.get(url, timeout=15)
        response.raise_for_status()
        data = response.json()

        if isinstance(data, list) and len(data) > 1:
            df = pd.DataFrame(data[1])
            # Sauvegarde immédiate pour le cache
            df.to_csv(raw_path, index=False)
            logger.info(f"Données récupérées et mises en cache : {raw_path}")
            return df
        else:
            logger.error(f"Format de réponse invalide pour {indicator}")
            return None

    except requests.exceptions.RequestException as e:
        logger.error(f"Erreur lors de la requête API : {e}")
        # En cas d'erreur réseau, on essaie quand même de charger le cache même s'il est vieux
        if os.path.exists(raw_path):
            logger.warning("Erreur réseau. Utilisation du cache expiré par précaution.")
            return pd.read_csv(raw_path)
        return None

In [None]:
import requests
import pandas as pd
from typing import Optional

class CO2Extractor:
    BASE_URL = "https://api.worldbank.org/v2"

    @staticmethod
    def get_co2_data(indicator: str, country: str = "all") -> Optional[pd.DataFrame]:
        # On essaie deux variantes d'URL : avec source=2 (nécessaire pour le CO2) 
        # et sans (pour les autres indicateurs plus récents)
        urls = [
            f"{CO2Extractor.BASE_URL}/country/{country}/indicator/{indicator}?format=json&per_page=1000&source=2",
            f"{CO2Extractor.BASE_URL}/country/{country}/indicator/{indicator}?format=json&per_page=1000",
             f"{CO2Extractor.BASE_URL}/country/{country}/indicator/{indicator}?format=json&per_page=5000&source=2"
        ]
        
        for url in urls:
            try:
                print(f"[LIB] Tentative avec : {url}")
                response = requests.get(url, timeout=15)
                response.raise_for_status()
                data = response.json()

                if isinstance(data, list) and len(data) > 1:
                    print(f"[LIB] Succès avec l'indicateur {indicator}")
                    return pd.DataFrame(data[1])
                
                # Si l'API renvoie une erreur "Invalid value", on passe à l'URL suivante
                if isinstance(data, list) and len(data) == 1 and 'message' in data[0]:
                    print(f"[LIB] L'URL a échoué (Code {data[0]['message'][0].get('id')}), essai suivant...")
                    continue

            except Exception as e:
                print(f"[LIB] Erreur de connexion : {e}")
                continue
        
        return None

In [None]:
from src.data_loader import fetch_world_bank_data
from src.processor import clean_climate_data
from config import logger

def main():
    # Liste d'indicateurs stables
    indicators = {
        "co2_per_capita": "EN.ATM.CO2E.PC",   # CO2 par habitant
        "forest_area": "AG.LND.FRST.ZS",      # Surface forestière (%)
        "gdp_per_capita": "NY.GDP.PCAP.CD", # PIB par habitant (pour corréler avec le climat)
        "total_ghg": "EN.ATM.GHGT.KT.CE"   # Emissions totales de GES (kt de CO2 équivalent)
    }


    for name, code in indicators.items():
        logger.info(f"Traitement de l'indicateur : {name}")
        
        # Le loader gère maintenant le cache tout seul
        raw_df = fetch_world_bank_data(code)

        if raw_df is not None:
            cleaned_df = clean_climate_data(raw_df)
            
            # Sauvegarde du fichier processé
            output_path = f"data/processed/{name}_cleaned.csv"
            cleaned_df.to_csv(output_path, index=False)
            logger.info(f"Fichier final prêt : {output_path}")

if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
from config import logger

def clean_climate_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Nettoie les données brutes de la Banque Mondiale.
    
    Args:
        df (pd.DataFrame): DataFrame brut issu de l'API.
        
    Returns:
        pd.DataFrame: DataFrame nettoyé avec colonnes Country, Year, Value.
    """
    if df is None or df.empty:
        return pd.DataFrame()

    logger.info("Début du nettoyage des données...")
    
    # Extraire les valeurs des dictionnaires (country et indicator)
    df['country_name'] = df['country'].apply(lambda x: x['value'])
    df['country_code'] = df['country'].apply(lambda x: x['id'])
    
    # Sélectionner et renommer les colonnes utiles
    df_cleaned = df[['country_name', 'country_code', 'date', 'value']].copy()
    df_cleaned.columns = ['country', 'code', 'year', 'co2_value']
    
    # Convertir les types
    df_cleaned['year'] = pd.to_numeric(df_cleaned['year'])
    df_cleaned['co2_value'] = pd.to_numeric(df_cleaned['co2_value'])
    
    # Supprimer les lignes sans valeur
    df_cleaned = df_cleaned.dropna(subset=['co2_value'])
    
    # Trier par pays et année
    df_cleaned = df_cleaned.sort_values(['country', 'year'])
    
    logger.info(f"Nettoyage terminé. Lignes restantes : {len(df_cleaned)}")
    return df_cleaned

In [None]:
import requests
import pandas as pd
import logging
from typing import Optional

# Configuration d'un logger local à la librairie
logger = logging.getLogger(__name__)

class CO2Extractor:
    """Librairie pour extraire les données climatiques de la Banque Mondiale."""
    
    BASE_URL = "https://api.worldbank.org/v2"

    @staticmethod
    def get_co2_data(indicator: str, country: str = "all") -> Optional[pd.DataFrame]:
        """
        Récupère les données d'un indicateur via l'API Banque Mondiale en testant plusieurs endpoints.
        
        Args:
            indicator (str): Le code de l'indicateur (ex: 'EN.ATM.CO2E.PC').
            country (str): Le code du pays (par défaut 'all').
            
        Returns:
            Optional[pd.DataFrame]: DataFrame avec les données ou None si échec.
        """
        urls = [
            f"{CO2Extractor.BASE_URL}/country/{country}/indicator/{indicator}?format=json&per_page=1000&source=2",
            f"{CO2Extractor.BASE_URL}/country/{country}/indicator/{indicator}?format=json&per_page=1000"
        ]
        
        for url in urls:
            try:
                # On remplace print par logger.info (meilleure pratique)
                logger.info(f"Tentative de récupération via : {url}")
                response = requests.get(url, timeout=15)
                response.raise_for_status()
                data = response.json()

                # Vérification de la structure de réponse [metadata, data]
                if isinstance(data, list) and len(data) > 1:
                    logger.info(f"Succès avec l'indicateur {indicator}")
                    return pd.DataFrame(data[1])
                
                # Gestion des messages d'erreur renvoyés dans le JSON
                if isinstance(data, list) and len(data) == 1 and 'message' in data[0]:
                    msg = data[0]['message'][0].get('value', 'Erreur inconnue')
                    logger.warning(f"L'URL a échoué (Message API : {msg}), essai suivant...")
                    continue

            except requests.exceptions.RequestException as e:
                logger.error(f"Erreur de connexion pour {url} : {e}")
                continue
        
        logger.error(f"Tous les essais ont échoué pour l'indicateur {indicator}")
        return None

In [None]:
import os  
from src.data_loader import fetch_world_bank_data
from src.processor import clean_climate_data
from config import logger

def main():
    # 1. Création des dossiers de données s'ils n'existent pas
    os.makedirs("data/raw", exist_ok=True)
    os.makedirs("data/processed", exist_ok=True)

    # 2. Liste de vos 4 indicateurs
    indicators = {
        "co2_per_capita": "EN.GHG.CO2.PC.CE.AR5",   # CO2 par habitant
        "forest_area": "AG.LND.FRST.ZS",            # Surface forestière (%)
        "gdp_per_capita": "NY.GDP.PCAP.CD",         # PIB par habitant
        "total_ghg": "EN.GHG.ALL.MT.CE.AR5"            # Emissions totales de GES 'unité ici est en Mt (Mégatonnes)
    }

    logger.info("DÉMARRAGE DU PIPELINE D'ACQUISITION DES DONNÉES")

    for name, code in indicators.items():
        logger.info(f"--- Analyse de l'indicateur : {name} ({code}) ---")
        
        # 3. Acquisition via la librairie (avec gestion du cache)
        raw_df = fetch_world_bank_data(code)

        if raw_df is not None:
            # 4. Nettoyage générique
            cleaned_df = clean_climate_data(raw_df)
            
            if not cleaned_df.empty:
                # 5. Sauvegarde du fichier final
                output_path = f"data/processed/{name}_cleaned.csv"
                cleaned_df.to_csv(output_path, index=False)
                logger.info(f"SUCCÈS : Fichier {output_path} créé.")
            else:
                logger.warning(f"Le nettoyage a renvoyé un résultat vide pour {name}.")
        else:
            logger.error(f"ÉCHEC : Impossible de récupérer l'indicateur {name}.")

    logger.info("PIPELINE TERMINÉ AVEC SUCCÈS.")

if __name__ == "__main__":
    main()