In [1]:
from pathlib import Path
from typing import List, Optional
from sqlalchemy import create_engine
from psycopg2.extras import execute_values
from sqlalchemy_utils import database_exists, create_database
import logging
import pandas as pd
from sqlalchemy import text, Engine

CLEAN_DIR = Path("../data/cleaned")
RAW_DIR = Path("../data/raw")
CLEAN_DIR.mkdir(parents=True, exist_ok=True)
RAW_DIR.mkdir(parents=True, exist_ok=True)


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

ADMIN_DB_URL = "postgresql+psycopg2://postgres:postgres2025%40@localhost:54785/postgres"
DB_NAME = "db_accident"
DB_URL = ADMIN_DB_URL.rsplit("/", 1)[0] + f"/{DB_NAME}" 



In [7]:
def get_engine(db_url: str) -> Engine:
    """
    Retourne un SQLAlchemy Engine pour l'URL 
    postgresql+psycopg2://user:password@host:port/dbname
    """
    return create_engine(db_url, client_encoding="utf8")


def show_open_connections(engine, db_name: str = None):
    """
    Affiche les connexions ouvertes à une base PostgreSQL.
    Si db_name est fourni, on filtre sur cette base.
    """
    query = """
    SELECT pid, usename, datname, state, query
    FROM pg_stat_activity
    WHERE datname = :db_name;
    """ if db_name else """
    SELECT pid, usename, datname, state, query
    FROM pg_stat_activity;
    """
    with engine.connect() as conn:
        result = conn.execute(text(query), {"db_name": db_name})
        rows = result.fetchall()
        for r in rows:
            print(r)


def close_all_connections(engine, db_name: str):
    """
    Ferme toutes les connexions PostgreSQL actives sur une base donnée (y compris celle en cours).
    """
    try:
        query = text("""
            SELECT pg_terminate_backend(pid)
            FROM pg_stat_activity
            WHERE datname = :db_name
            AND pid <> pg_backend_pid();
        """)
        with engine.begin() as conn:
            conn.execute(query, {"db_name": db_name})
        logger.info("Toutes les connexions à '%s' ont été fermées (sauf la session actuelle).", db_name)
    except Exception as e:
        logger.error(" Erreur lors de la fermeture des connexions : %s", e)
        raise
    finally:
        # Ferme la dernière connexion (celle utilisée par SQLAlchemy)
        engine.dispose()
        logger.info("Engine SQLAlchemy libéré — plus aucune connexion active.")


def create_database_if_not_exists(admin_db_url: str, db_name: str) -> None:
    """
    Crée une base PostgreSQL si elle n'existe pas.
    """
    db_url = admin_db_url.rsplit("/", 1)[0] + f"/{db_name}"  # Construire l'URL cible

    try:
        if not database_exists(db_url):
            create_database(db_url)  # Appel correct à sqlalchemy_utils
            logger.info("Base '%s' créée avec succès", db_name)
        else:
            logger.info("La base '%s' existe déjà", db_name)
    except Exception as e:
        logger.error("Erreur lors de la création de la base '%s' : %s", db_name, e)
        raise
    

def execute_script(engine: Engine, script_sql: str) -> None:
    """
    Exécute un script SQL donné sur la base de données.
    """
    try:
        logger.info("Exécution du script SQL...")
        with engine.begin() as conn:
            conn.execute(text(script_sql))
        logger.info("Script SQL exécuté avec succès")
    except Exception as e:
        logger.error("Erreur lors de l'exécution du script SQL : %s", e)
        raise
    finally:
        engine.dispose()
        logger.info("Connexion fermée et engine libéré.")


def truncate_table_if_exists(engine, table_name, schema=None):
    full_table_name = f"{schema}.{table_name}" if schema else table_name
    
    query = text(f"""
    DO $$
    BEGIN
        IF EXISTS (SELECT 1 FROM information_schema.tables 
                   WHERE table_schema = '{schema or 'public'}' 
                   AND table_name = '{table_name}') THEN
            EXECUTE 'TRUNCATE TABLE {full_table_name} RESTART IDENTITY CASCADE';
        END IF;
    END $$;
    """)
    
    with engine.begin() as conn:
        conn.execute(query)
        
    print(f"Table {full_table_name} vidée si elle existait.")




def insert_df_to_table(
    engine: Engine,
    df: pd.DataFrame,
    schema: str,
    table: str,
    table_columns: Optional[List[str]] = None,
    batch_size: int = 1000,
) -> int:
    """
    Insère un DataFrame dans une table Postgres via psycopg2.execute_values (performant pour gros volumes).

    Comportement :
      - Si table_columns est fourni : on utilise cet ordre de colonnes pour l'insertion.
        * Les colonnes absentes dans df sont créées et remplies par NULL.
        * Les colonnes supplémentaires dans df sont ignorées.
      - Si table_columns est None : on utilise l'ordre des colonnes présentes dans df.
      - Retourne le nombre de lignes insérées.
      - Gère commit/rollback automatiquement.

    Arguments :
      engine        : SQLAlchemy Engine (obtenu via get_engine)
      df            : pandas.DataFrame (les colonnes doivent déjà être normalisées)
      schema        : schéma SQL (ex. "bronze")
      table         : nom de la table (ex. "caracteristiques_raw")
      table_columns : liste ordonnée des colonnes à insérer (optionnel)
      batch_size    : taille de page pour execute_values (par défaut 1000)

    Remarques :
      - Nécessite psycopg2 (sqlalchemy devra utiliser l'adaptateur psycopg2).
      - Tous types Python usuels sont supportés (str, int, float, None, bool, datetime, ...).
    """
    if df is None or len(df) == 0:
        return 0

    # Déterminer colonnes cibles et préparer df_subset avec la bonne colonne ordre
    if table_columns is None:
        cols = list(df.columns)
    else:
        cols = list(table_columns)
        # ajouter colonnes manquantes dans df en les remplissant avec None
        for c in cols:
            if c not in df.columns:
                df[c] = None

    # Conserver uniquement les colonnes cibles
    df_subset = df[cols].copy()

    # Convertir les NaN pandas en None pour psycopg2
    df_subset = df_subset.where(pd.notnull(df_subset), None)

    # Préparer les tuples de valeurs
    records = [tuple(x) for x in df_subset.itertuples(index=False, name=None)]
    if not records:
        return 0

    # Construire la clause des colonnes (avec guillemets pour noms contenant underscore ou majuscules)
    cols_sql = ", ".join([f'"{c}"' for c in cols])
    insert_sql = f'INSERT INTO "{schema}"."{table}" ({cols_sql}) VALUES %s'

    # Obtenir une connexion psycopg2 à partir de l'engine SQLAlchemy
    conn = engine.raw_connection()
    cur = conn.cursor()
    try:
        # execute_values fait des inserts par batch très efficaces
        execute_values(cur, insert_sql, records, page_size=batch_size)
        conn.commit()
        return len(records)
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
        conn.close()
        
def close_engine(engine: Engine) -> None:
    """
    Ferme proprement toutes les connexions actives et libère les ressources associées à l'Engine SQLAlchemy.
    """
    try:
        # Fermer les connexions éventuelles encore ouvertes
        if hasattr(engine, "dispose"):
            engine.dispose()
            logger.info("Connexion et engine SQLAlchemy correctement fermés.")
        else:
            logger.warning("L'objet fourni ne semble pas être un Engine SQLAlchemy valide.")
    except Exception as e:
        logger.error("Erreur lors de la fermeture de l'engine : %s", e)


In [3]:

def get_single_value(engine: Engine, sql: str, params: dict = None):
    """
    Exécute une requête SQL qui retourne une seule valeur (ex: COUNT, SUM).

    Arguments :
        engine : SQLAlchemy Engine
        sql    : requête SQL (ex: "SELECT COUNT(*) FROM bronze.caracteristiques_raw")
        params : dictionnaire de paramètres pour la requête (optionnel)
        
        Exemple d'utilisation :
        sql = "SELECT COUNT(*) FROM bronze.caracteristiques_raw WHERE annee = :annee"
        nb_2023 = get_single_value(engine, sql, params={"annee": "2023"})
        print(nb_2023)

    Retour :
        La valeur retournée par la requête (int, float, str, ...)
    """
    with engine.connect() as conn:
        result = conn.execute(text(sql), params or {})
        value = result.scalar()  # récupère la première colonne de la première ligne
    return value


In [4]:
import pandas as pd
from sqlalchemy import text, Engine
from typing import List, Optional

def get_df_from_table(
    engine: Engine,
    schema: str,
    table: str,
    columns: Optional[List[str]] = None,
    filters: Optional[str] = None
) -> pd.DataFrame:
    """
    Récupère un DataFrame depuis une table d'un schéma SQL donné.

    Arguments :
        engine  : SQLAlchemy Engine connecté à la base
        schema  : nom du schéma SQL (ex: 'bronze', 'silver', 'gold')
        table   : nom de la table
        columns : liste de colonnes à récupérer (optionnel, défaut = toutes)
        filters : chaîne SQL pour filtrer les lignes (optionnel, ex: "annee='2023'")

    Retour :
        df : pandas DataFrame avec les données sélectionnées
    """
    # Construire la clause SELECT
    cols_sql = "*"
    if columns:
        cols_sql = ", ".join([f'"{c}"' for c in columns])

    # Construire la requête SQL avec le schéma
    query = f'SELECT {cols_sql} FROM "{schema}"."{table}"'
    if filters:
        query += f" WHERE {filters}"

    # Exécuter et retourner le DataFrame
    with engine.connect() as conn:
        df = pd.read_sql(text(query), conn)
    
    return df

In [5]:
def change_column_type(
    engine: Engine,
    schema: str,
    table: str,
    column: str,
    new_type: str,
    using_expr: str = None
) -> None:
    """
    Change le type d'une colonne dans PostgreSQL.

    Arguments :
        engine     : SQLAlchemy Engine connecté à la base
        schema     : nom du schéma SQL (ex: 'bronze')
        table      : nom de la table
        column     : nom de la colonne à modifier
        new_type   : nouveau type SQL (ex: 'INTEGER', 'DATE', 'TEXT', 'TIMESTAMP')
        using_expr : expression SQL pour convertir les valeurs existantes (optionnel)
                     Si None, utilise "column::new_type"
    """
    if using_expr is None:
        using_expr = f"{column}::{new_type}"

    query = f'ALTER TABLE "{schema}"."{table}" ALTER COLUMN "{column}" TYPE {new_type} USING {using_expr};'

    try:
        with engine.begin() as conn:
            conn.execute(text(query))
        logger.info(f"Colonne '{column}' de la table '{schema}.{table}' convertie en {new_type}.")
    except Exception as e:
        logger.error(f"Erreur lors du changement de type de la colonne '{column}': {e}")
        raise


In [6]:

def drop_columns(engine, schema: str, table: str, columns: list):
    """
    Supprime des colonnes d'une table PostgreSQL.

    :param engine: SQLAlchemy engine
    :param schema: Nom du schéma (ex: "silver")
    :param table: Nom de la table (ex: "caracteristiques_clean")
    :param columns: Liste des colonnes à supprimer
    """
    if not columns:
        print("Aucune colonne à supprimer.")
        return
    
    with engine.begin() as conn:  # begin() gère la transaction
        for col in columns:
            stmt = f'ALTER TABLE {schema}.{table} DROP COLUMN IF EXISTS {col} CASCADE;'
            conn.execute(text(stmt))
            print(f"Colonne supprimée : {col}")
