In [19]:
import pandas as pd
from sqlalchemy import text, MetaData, create_engine
from functions import *
from sqlalchemy.exc import ProgrammingError

In [20]:
%load_ext autoreload
%autoreload 1

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [21]:
mysql_user = "flosrv"
password = "Nesrine123"
host = "localhost"
port = 3306
database = "Darkstar_Games"
Directors_table = 'Darkstar_Directors'
metadata = MetaData()
# Connect to the database
engine_darkstar = create_engine(f"mysql+mysqlconnector://{mysql_user}:{password}@{host}/{database}", isolation_level ='AUTOCOMMIT')

In [22]:
def create_table_in_mysql(df: pd.DataFrame, table_name: str, engine):
    """
    Crée une table MySQL avec les colonnes du DataFrame, en supprimant les espaces en trop des noms de colonnes.
    """
    check_table_sql = f"SHOW TABLES LIKE '{table_name}';"

    try:
        with engine.connect() as connection:
            result = connection.execute(text(check_table_sql))
            if result.fetchone():
                print(f"⚠️ La table '{table_name}' existe déjà dans MySQL. Rien n'a été fait.")
                return  # Ne pas recréer la table si elle existe déjà

            # Définition des colonnes avec strip des noms de colonnes
            column_definitions = []
            for column_name, dtype in df.dtypes.items():
                # Appliquer un strip sur les noms de colonnes pour éviter les espaces superflus
                clean_column_name = column_name.strip()
                
                if dtype == 'object': 
                    column_type = "VARCHAR(255)"
                elif dtype == 'int64':  
                    column_type = "INT"
                elif dtype == 'float64': 
                    column_type = "FLOAT"
                elif dtype == 'datetime64[ns]': 
                    column_type = "DATETIME"
                else:
                    column_type = "VARCHAR(255)"

                column_definitions.append(f"`{clean_column_name}` {column_type}")

            create_table_sql = f"CREATE TABLE `{table_name}` ({', '.join(column_definitions)});"
            connection.execute(text(create_table_sql))
            print(f"✅ Table '{table_name}' créée avec succès dans MySQL.")

    except Exception as e:
        print(f"❌ Erreur lors de la création de la table : {e}")

def insert_new_rows(engine, df: pd.DataFrame, table_name: str, key_column: str = None):
    """
    Insère uniquement les nouvelles lignes dans MySQL en comparant la colonne spécifiée.
    """
    try:
        # Vérifier si la table existe
        check_table_sql = f"SHOW TABLES LIKE '{table_name}';"
        with engine.connect() as connection:
            table_exists = connection.execute(text(check_table_sql)).fetchone()

        if not table_exists:
            print(f"⚠️ La table '{table_name}' n'existe pas. Création en cours...")
            create_table_in_mysql(df, table_name, engine)  # Créer la table si elle n'existe pas

        # Vérifier les colonnes existantes dans la table
        query_columns = f"SHOW COLUMNS FROM `{table_name}`;"
        with engine.connect() as connection:
            existing_columns = [row[0] for row in connection.execute(text(query_columns)).fetchall()]

        # Appliquer un strip aux noms de colonnes dans le DataFrame avant insertion
        df.columns = df.columns.str.strip()

        # Aligner les colonnes du DataFrame avec celles de la table MySQL
        df = df[[col for col in df.columns if col in existing_columns]]

        if key_column and key_column in existing_columns:
            # Récupérer la valeur max de key_column dans la table
            query = f"SELECT MAX(`{key_column}`) FROM `{table_name}`;"
            with engine.connect() as connection:
                result = connection.execute(text(query)).scalar()

            if result is None:
                print(f"💾 Aucune donnée dans '{table_name}', insertion de toutes les lignes.")
                df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
                print(f"✅ {len(df)} nouvelles lignes insérées.")
                return

            df[key_column] = pd.to_datetime(df[key_column])
            new_rows = df[df[key_column] > result]

            if new_rows.empty:
                print(f"✅ Aucune nouvelle ligne à insérer.")
                return

            new_rows.to_sql(name=table_name, con=engine, if_exists='append', index=False)
            print(f"✅ {len(new_rows)} nouvelles lignes insérées.")
        else:
            df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
            print(f"✅ {len(df)} lignes insérées.")

    except ProgrammingError as e:
        print(f"❌ Erreur SQL détectée : {e}")
    except Exception as e:
        print(f"❌ Erreur lors de l'insertion : {e}")

In [23]:
directors_path = r"C:\Users\f.gionnane\Documents\Darkstar Projects\Company Roster 2024 11 01 - Directors.csv"

In [24]:
df_directors = pd.read_csv(directors_path)

In [25]:
df = convert_df_columns(df_directors)

In [26]:
create_table_in_mysql(df=df_directors, table_name=Directors_table, engine=engine_darkstar)

⚠️ La table 'Darkstar_Directors' existe déjà dans MySQL. Rien n'a été fait.


In [27]:
insert_new_rows(engine =engine_darkstar, df=df_directors, table_name=Directors_table)

✅ 7 lignes insérées.


  df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
