In [1]:
import os
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, Boolean, DateTime
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime

# Connexion à MariaDB via SQLAlchemy
def create_connection_sqlalchemy(host_name, user_name, user_password, db_name=None, port=3306):
    try:
        db_url = f"mysql+pymysql://{user_name}:{user_password}@{host_name}:{port}/{db_name}"
        engine = create_engine(db_url, pool_recycle=3600)
        connection = engine.connect()
        print(f"Connexion réussie à la base de données {db_name}" if db_name else "Connexion réussie au serveur MariaDB")
        return connection, engine
    except SQLAlchemyError as e:
        print(f"Erreur : '{e}'")
        return None, None

def creer_table_si_absente(connection, engine, nom_table, colonnes, dtype):
    """
    Crée une table dans MariaDB si elle n'existe pas déjà,
    en utilisant les colonnes extraites et leurs types.
    """
    metadata = MetaData(bind=engine)
    colonnes_avec_types = []

    for colonne in colonnes:
        type_colonne = dtype.get(colonne, 'VARCHAR(255)')

        # Ajustement des types pour SQLAlchemy
        if type_colonne == 'float':
            colonnes_avec_types.append(Column(colonne, Float))
        elif type_colonne == 'int':
            colonnes_avec_types.append(Column(colonne, Integer))
        elif type_colonne == 'bool':
            colonnes_avec_types.append(Column(colonne, Boolean))
        elif type_colonne == 'datetime':
            colonnes_avec_types.append(Column(colonne, DateTime))
        else:
            colonnes_avec_types.append(Column(colonne, String(255)))

    # Crée la table si elle n'existe pas
    table = Table(nom_table, metadata, *colonnes_avec_types, extend_existing=True)
    metadata.create_all(engine)

    print(f"Table '{nom_table}' créée ou déjà existante.")

def inserer_donnees(connection, nom_table, colonnes, donnees):
    """
    Insère les données dans la table MariaDB via SQLAlchemy.
    """
    placeholders = ', '.join(['%s' for _ in colonnes])
    colonnes_str = ', '.join(colonnes)
    requete_insertion = f"INSERT INTO {nom_table} ({colonnes_str}) VALUES ({placeholders})"
    
    try:
        connection.execute(requete_insertion, donnees)
        print(f"{len(donnees)} lignes insérées avec succès dans la table {nom_table}")
    except SQLAlchemyError as e:
        print(f"Erreur lors de l'insertion : '{e}'")

def traiter_fichier(connection, engine, chemin_fichier, fichier, liste_nom_table, dtype, index_dict, liste_col_a_supprimer):
    """
    Traite un fichier CSV en créant une table correspondante et en y insérant les données.
    """
    na_values = ['NA', 'N/A', '']
    df = pd.read_csv(chemin_fichier, dtype={col: typ for col, typ in dtype.items() if typ != 'datetime'}, na_values=na_values)

    # Supprimer les colonnes de la liste 'liste_col_a_supprimer'
    df = df.drop(columns=[col for col in liste_col_a_supprimer if col in df.columns])

    # Extraire et ajouter des informations au dataframe
    annee_mois, info_geo = extraire_info_du_nom_fichier(fichier, liste_nom_table)
    df['annee_mois'] = annee_mois
    df['info_geo'] = info_geo

    # Nettoyer les données
    df, duplicates = nettoyer_donnees(df)
    duplicates['source'] = fichier

    nom_table = nettoyer_nom_table(liste_nom_table, chemin_fichier) + "_temp"
    nom_table_duplicates = nom_table + "_duplicates"

    # Ajouter un index unique
    df, index_dict = rajouter_un_index_table(nom_table, index_dict, df)
    colonnes = nettoyer_noms_colonnes(list(df.columns))
    creer_table_si_absente(connection, engine, nom_table, colonnes, dtype)
    inserer_donnees(connection, nom_table, colonnes, df.values.tolist())

    # Traiter les doublons
    duplicates, index_dict = rajouter_un_index_table(nom_table_duplicates, index_dict, duplicates)
    colonnes_duplicates = nettoyer_noms_colonnes(list(duplicates.columns))
    creer_table_si_absente(connection, engine, nom_table_duplicates, colonnes_duplicates, dtype)
    inserer_donnees(connection, nom_table_duplicates, colonnes_duplicates, duplicates.values.tolist())

    print(f"Traitement du fichier : {chemin_fichier} terminé.")
    return df, index_dict

# Dictionnaire pour mémoriser l'index pour chaque table
index_dict = {}

# Connexion à la base de données via SQLAlchemy
connection, engine = create_connection_sqlalchemy("127.0.0.1", "root", "", "crime_test")

if connection:
    chemin_racine = "C:/Users/Admin.local/Documents/projetint/files_test"
    df, index_dict = parcourir_arborescence(connection, chemin_racine, "crime_test", liste_nom_table, dtype, index_dict, liste_col_a_supprimer)
    
    # Fermer la connexion proprement
    connection.close()
    print("Connexion fermée.")
  
print(index_dict)
