# Partie 3 : Chargement des Données (Version 2)

Ce notebook présente le processus de chargement des données transformées dans notre base de données MySQL pour notre projet ETL sur les pandémies.

## 1. Importation des bibliothèques nécessaires

In [None]:
import pandas as pd
import numpy as np
import os
import sqlalchemy
from sqlalchemy import create_engine, text
import pymysql
import warnings

# Ignorer les avertissements
warnings.filterwarnings('ignore')

# Configuration pour afficher plus de colonnes
pd.set_option('display.max_columns', None)

## 2. Chargement des données nettoyées

In [None]:
# Répertoire des données nettoyées
clean_data_dir = './donnees_nettoyees/'

# Fonction pour charger un fichier CSV avec gestion des erreurs
def load_csv_file(file_path, file_desc):
    try:
        df = pd.read_csv(file_path)
        print(f"Données {file_desc} chargées avec succès. Forme: {df.shape}")
        display(df.head())
        return df
    except Exception as e:
        print(f"Erreur lors du chargement des données {file_desc}: {e}")
        return None

# Chargement des fichiers CSV
confirmed_df = load_csv_file(os.path.join(clean_data_dir, 'cas_confirmes_clean.csv'), 'de cas confirmés')
deaths_df = load_csv_file(os.path.join(clean_data_dir, 'deces_clean.csv'), 'de décès')
recovered_df = load_csv_file(os.path.join(clean_data_dir, 'guerisons_clean.csv'), 'de guérisons')
locations_df = load_csv_file(os.path.join(clean_data_dir, 'localisations_clean.csv'), 'de localisations')
pandemics_df = load_csv_file(os.path.join(clean_data_dir, 'pandemies_clean.csv'), 'de pandémies')

## 3. Préparation des données pour le chargement

In [None]:
# Préparation de la table des dates (calendrier)
def prepare_calendar_table():
    # Collecte de toutes les dates uniques
    dates = set()
    
    if confirmed_df is not None and 'date' in confirmed_df.columns:
        dates.update(confirmed_df['date'].unique())
    
    if deaths_df is not None and 'date' in deaths_df.columns:
        dates.update(deaths_df['date'].unique())
    
    if recovered_df is not None and 'date' in recovered_df.columns:
        dates.update(recovered_df['date'].unique())
    
    # Création du DataFrame de calendrier
    if dates:
        dates = sorted(list(dates))
        calendar_df = pd.DataFrame({
            'date': dates,
        })
        
        # Conversion en datetime pour extraire les composants
        calendar_df['date'] = pd.to_datetime(calendar_df['date'])
        
        # Ajout de l'ID et des composants de date
        calendar_df['id_date'] = range(1, len(calendar_df) + 1)
        calendar_df['jour'] = calendar_df['date'].dt.day
        calendar_df['mois'] = calendar_df['date'].dt.month
        calendar_df['annee'] = calendar_df['date'].dt.year
        calendar_df['trimestre'] = calendar_df['date'].dt.quarter
        calendar_df['jour_semaine'] = calendar_df['date'].dt.dayofweek + 1  # 1 = Lundi, 7 = Dimanche
        
        # Conversion de la date en string pour le stockage SQL
        calendar_df['date'] = calendar_df['date'].dt.strftime('%Y-%m-%d')
        
        # Réorganisation des colonnes
        calendar_df = calendar_df[['id_date', 'date', 'jour', 'mois', 'annee', 'trimestre', 'jour_semaine']]
        
        return calendar_df
    else:
        print("Aucune date trouvée dans les données.")
        return None

# Préparation de la table des données principales
def prepare_main_data_table():
    # Préparation de la table de données principale
    data_records = []
    
    # Fonction pour ajouter des enregistrements à partir d'un DataFrame
    def add_records_from_df(df, value_column, calendar_df, locations_df):
        if df is None or calendar_df is None or locations_df is None:
            return
        
        # Création d'un dictionnaire de mapping pour les dates
        date_mapping = dict(zip(calendar_df['date'], calendar_df['id_date']))
        
        # Création d'un dictionnaire de mapping pour les localisations
        location_mapping = dict(zip(locations_df['pays'], locations_df['id_localisation']))
        
        # Parcours des lignes du DataFrame
        for _, row in df.iterrows():
            date_str = pd.to_datetime(row['date']).strftime('%Y-%m-%d') if pd.notna(row['date']) else None
            pays = row['pays'] if pd.notna(row['pays']) else None
            id_pandemie = row['id_pandemie'] if 'id_pandemie' in row and pd.notna(row['id_pandemie']) else None
            value = row[value_column] if pd.notna(row[value_column]) else 0
            
            if date_str in date_mapping and pays in location_mapping and id_pandemie is not None:
                data_records.append({
                    'id_date': date_mapping[date_str],
                    'id_localisation': location_mapping[pays],
                    'id_pandemie': id_pandemie,
                    value_column: value
                })
    
    # Préparation du calendrier
    calendar_df = prepare_calendar_table()
    
    # Ajout des enregistrements à partir des différents DataFrames
    if confirmed_df is not None and 'cas_confirmes' in confirmed_df.columns:
        add_records_from_df(confirmed_df, 'cas_confirmes', calendar_df, locations_df)
    
    if deaths_df is not None and 'deces' in deaths_df.columns:
        add_records_from_df(deaths_df, 'deces', calendar_df, locations_df)
    
    if recovered_df is not None and 'guerisons' in recovered_df.columns:
        add_records_from_df(recovered_df, 'guerisons', calendar_df, locations_df)
    
    # Création du DataFrame de données
    if data_records:
        data_df = pd.DataFrame(data_records)
        
        # Agrégation des données pour éviter les doublons
        data_df = data_df.groupby(['id_date', 'id_localisation', 'id_pandemie']).sum().reset_index()
        
        # Ajout d'un ID unique
        data_df['id_data'] = range(1, len(data_df) + 1)
        
        # Réorganisation des colonnes
        columns = ['id_data', 'id_date', 'id_localisation', 'id_pandemie']
        value_columns = [col for col in data_df.columns if col not in columns]
        data_df = data_df[columns + value_columns]
        
        return data_df, calendar_df
    else:
        print("Aucune donnée trouvée pour la table principale.")
        return None, calendar_df

# Préparation des tables
data_df, calendar_df = prepare_main_data_table()

# Affichage des tables préparées
if calendar_df is not None:
    print(f"Table de calendrier préparée avec {len(calendar_df)} entrées.")
    display(calendar_df.head())

if data_df is not None:
    print(f"Table de données principale préparée avec {len(data_df)} entrées.")
    display(data_df.head())

## 4. Configuration de la connexion à la base de données

In [None]:
# Paramètres de connexion à la base de données
db_user = 'root'  # Modifiez selon votre configuration
db_password = 'votre_mot_de_passe'  # Modifiez selon votre configuration
db_host = 'localhost'
db_port = '3306'
db_name = 'pandemies_db'  # Assurez-vous que cette base de données existe

# Chaîne de connexion
connection_string = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

# Création du moteur SQLAlchemy
try:
    engine = create_engine(connection_string)
    print("Connexion à la base de données établie avec succès.")
    
    # Test de connexion
    with engine.connect() as conn:
        result = conn.execute(text("SELECT 1"))
        print("Test de connexion réussi.")
        
        # Affichage des tables existantes
        tables = engine.table_names()
        print(f"Tables existantes dans la base de données: {tables}")
except Exception as e:
    print(f"Erreur lors de la connexion à la base de données: {e}")
    engine = None

## 5. Création du schéma de la base de données

In [None]:
# Création du schéma de la base de données
def create_database_schema(engine):
    if engine is None:
        print("Impossible de créer le schéma de la base de données sans connexion.")
        return False
    
    try:
        # Création des tables
        with engine.connect() as conn:
            # Table calendrier
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS calendrier (
                id_date INT PRIMARY KEY,
                date DATE NOT NULL,
                jour INT NOT NULL,
                mois INT NOT NULL,
                annee INT NOT NULL,
                trimestre INT NOT NULL,
                jour_semaine INT NOT NULL
            )
            """))
            
            # Table localisation
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS localisation (
                id_localisation INT PRIMARY KEY,
                pays VARCHAR(100) NOT NULL,
                code_pays VARCHAR(3),
                region VARCHAR(100),
                continent VARCHAR(50),
                latitude FLOAT,
                longitude FLOAT,
                population BIGINT
            )
            """))
            
            # Table pandemie
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS pandemie (
                id_pandemie INT PRIMARY KEY,
                nom VARCHAR(100) NOT NULL,
                agent_pathogene VARCHAR(100),
                description TEXT,
                date_debut DATE,
                date_fin DATE
            )
            """))
            
            # Table data
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS data (
                id_data INT PRIMARY KEY,
                id_date INT NOT NULL,
                id_localisation INT NOT NULL,
                id_pandemie INT NOT NULL,
                cas_confirmes INT DEFAULT 0,
                deces INT DEFAULT 0,
                guerisons INT DEFAULT 0,
                FOREIGN KEY (id_date) REFERENCES calendrier(id_date),
                FOREIGN KEY (id_localisation) REFERENCES localisation(id_localisation),
                FOREIGN KEY (id_pandemie) REFERENCES pandemie(id_pandemie)
            )
            """))
            
            conn.commit()
        
        print("Schéma de la base de données créé avec succès.")
        return True
    except Exception as e:
        print(f"Erreur lors de la création du schéma de la base de données: {e}")
        return False

# Création du schéma
schema_created = create_database_schema(engine)

## 6. Chargement des données dans la base de données

In [None]:
# Fonction pour charger un DataFrame dans une table
def load_table(df, table_name, engine, if_exists='replace'):
    if df is None or engine is None:
        print(f"Impossible de charger les données dans la table {table_name}.")
        return False
    
    try:
        # Chargement des données
        df.to_sql(table_name, engine, if_exists=if_exists, index=False)
        
        # Vérification du nombre de lignes chargées
        with engine.connect() as conn:
            result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
            count = result.fetchone()[0]
            print(f"Données chargées avec succès dans la table {table_name}. Nombre de lignes: {count}")
        
        return True
    except Exception as e:
        print(f"Erreur lors du chargement des données dans la table {table_name}: {e}")
        return False

# Chargement des tables
if schema_created:
    # Chargement des tables de dimension
    load_table(calendar_df, 'calendrier', engine, 'replace')
    load_table(locations_df, 'localisation', engine, 'replace')
    load_table(pandemics_df, 'pandemie', engine, 'replace')
    
    # Chargement de la table de faits
    load_table(data_df, 'data', engine, 'replace')
else:
    print("Le chargement des données a été annulé car le schéma n'a pas pu être créé.")

## 7. Vérification des données chargées

In [None]:
# Fonction pour exécuter une requête SQL et afficher les résultats
def execute_query(query, description, engine):
    if engine is None:
        print(f"Impossible d'exécuter la requête: {description}")
        return None
    
    try:
        with engine.connect() as conn:
            result = conn.execute(text(query))
            df = pd.DataFrame(result.fetchall())
            if not df.empty:
                df.columns = result.keys()
            print(f"Résultats de la requête: {description}")
            display(df)
            return df
    except Exception as e:
        print(f"Erreur lors de l'exécution de la requête {description}: {e}")
        return None

# Vérification des données chargées
if engine is not None:
    # Nombre de lignes par table
    execute_query("SELECT 'calendrier' AS table_name, COUNT(*) AS row_count FROM calendrier UNION ALL \
                  SELECT 'localisation', COUNT(*) FROM localisation UNION ALL \
                  SELECT 'pandemie', COUNT(*) FROM pandemie UNION ALL \
                  SELECT 'data', COUNT(*) FROM data", 
                  "Nombre de lignes par table", engine)
    
    # Exemple de requête sur les données
    execute_query("""SELECT p.nom AS pandemie, l.pays, c.date, d.cas_confirmes, d.deces, d.guerisons
                    FROM data d
                    JOIN calendrier c ON d.id_date = c.id_date
                    JOIN localisation l ON d.id_localisation = l.id_localisation
                    JOIN pandemie p ON d.id_pandemie = p.id_pandemie
                    ORDER BY c.date DESC, l.pays
                    LIMIT 10""", 
                  "Exemple de données chargées", engine)
    
    # Statistiques par pandémie
    execute_query("""SELECT p.nom AS pandemie, 
                        SUM(d.cas_confirmes) AS total_cas, 
                        SUM(d.deces) AS total_deces, 
                        SUM(d.guerisons) AS total_guerisons
                    FROM data d
                    JOIN pandemie p ON d.id_pandemie = p.id_pandemie
                    GROUP BY p.nom""", 
                  "Statistiques par pandémie", engine)

## 8. Conclusion

Ce notebook a permis de charger les données transformées dans la base de données MySQL. Les étapes suivantes ont été réalisées :

1. Chargement des données nettoyées depuis les fichiers CSV
2. Préparation des tables pour le chargement
3. Configuration de la connexion à la base de données
4. Création du schéma de la base de données
5. Chargement des données dans les tables
6. Vérification des données chargées

Le processus ETL est maintenant complet :
- **Extraction** : Les données ont été extraites des fichiers CSV et JSON sources
- **Transformation** : Les données ont été nettoyées, agrégées, normalisées et les doublons ont été supprimés
- **Chargement** : Les données transformées ont été chargées dans la base de données MySQL

Les données sont maintenant prêtes pour l'analyse et la visualisation.