In [None]:
%pip install sqlalchemy
import pandas as pd
from sqlalchemy import create_engine, text
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import os
import time


# Database settings. No username/password is supplied: the URL below asks the
# ODBC driver for a trusted connection instead.
database_config = dict(
    server=r'DESKTOP-S9274BN\SQLEXPRESS',
    database='nba_henry',
    # username='your_username',
    # password='your_password',
)

# SQLAlchemy URL for the pyodbc dialect, assembled from the settings above.
connection_string = f"mssql+pyodbc://@{database_config['server']}/{database_config['database']}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"

# Engine shared by the cells that follow.
engine = create_engine(connection_string)



In [None]:
class DataHandler(FileSystemEventHandler):
    """Load newly created CSV files into the SQL table named after each file.

    For every ``.csv`` file reported by a "created" event the handler:

    1. reads the file into a DataFrame,
    2. loads the table whose name is the file name without its extension,
    3. keeps the CSV rows that are new or differ from the stored row with the
       same key (``key_column``), and
    4. appends those rows to the table.
    """

    # Column used to match CSV rows with rows already stored in the table.
    key_column = 'id'

    # Name of the helper column that marks which side(s) of the merge a row came from.
    _ORIGIN_COLUMN = '_merge_origin'

    # information_schema type names, grouped by the pandas dtype used to load them.
    # NOTE(review): on SQL Server `timestamp` is a synonym for the binary
    # `rowversion` type rather than a point in time - confirm it belongs here.
    _DATETIME_TYPES = frozenset({'date', 'timestamp', 'datetime', 'datetime2', 'smalldatetime'})
    _INTEGER_TYPES = frozenset({'int', 'bigint', 'smallint', 'tinyint'})
    _FLOAT_TYPES = frozenset({'float', 'double precision', 'real'})

    def __init__(self, engine):
        """Store the SQLAlchemy engine used for every database operation."""
        self.engine = engine
        print("DataHandler initialized.")

    def on_created(self, event):
        """Process the created path when it is a CSV file; ignore everything else.

        Args:
            event: File-system event exposing ``is_directory`` and ``src_path``.

        Returns:
            None.
        """
        print(f"Event detected: {event}")
        if event.is_directory:
            print("The created event is a directory. Ignoring.")
            return None
        # Case-insensitive so that "DATA.CSV" is picked up as well.
        if not event.src_path.lower().endswith(".csv"):
            return None

        print(f"CSV file detected: {event.src_path}")
        try:
            self.process_new_file(event.src_path)
        except Exception as error:
            # Report the failure for this file instead of letting it propagate
            # into whatever thread dispatched the event.
            print(f"Failed to process {event.src_path}: {error!r}")
        return None

    @staticmethod
    def _table_name(filename):
        """Return the table name for a CSV file name: the name minus its extension."""
        return os.path.splitext(filename)[0]

    @classmethod
    def _pandas_dtype(cls, sql_type):
        """Map an information_schema ``data_type`` value to a pandas dtype name."""
        normalized = str(sql_type).strip().lower()
        if normalized in cls._DATETIME_TYPES:
            return 'datetime64[ns]'
        if normalized in cls._INTEGER_TYPES:
            # Nullable integer dtype: a plain 'int64' cast fails as soon as the
            # column contains a NULL.
            return 'Int64'
        if normalized in cls._FLOAT_TYPES:
            return 'float64'
        return 'object'

    @staticmethod
    def _values_differ(new_value, stored_value):
        """Return True when two scalar cell values differ, treating missing values as equal.

        NaN, NaT, None and pd.NA never compare equal to themselves with ``!=``,
        so they are handled explicitly: two missing values are "the same", a
        missing and a present value are "different".
        """
        new_missing = pd.isna(new_value)
        stored_missing = pd.isna(stored_value)
        if new_missing or stored_missing:
            return not (new_missing and stored_missing)
        return bool(new_value != stored_value)

    def process_new_file(self, file_path):
        """Read a CSV file and append its new or modified rows to its table.

        Args:
            file_path: Path of the CSV file to load.
        """
        filename = os.path.basename(file_path)
        print(f"Processing file: {file_path}")
        new_data = pd.read_csv(file_path)

        # Drop the 'Unnamed: 0' column if it exists.
        if 'Unnamed: 0' in new_data.columns:
            new_data = new_data.drop(columns=['Unnamed: 0'])

        # Parse every column whose name contains "date"; values that cannot be
        # parsed become NaT.
        # NOTE(review): the substring match also hits names such as
        # "updated_by" - confirm that is acceptable for the incoming files.
        for column in new_data.columns:
            if 'date' in column.lower():
                new_data[column] = pd.to_datetime(new_data[column], errors='coerce')

        print(f"New data read from {file_path}:\n{new_data.head()}")
        print('new_data.dtypes:')
        print(new_data.dtypes)

        # Load existing data from the database
        existing_data = self.load_existing_data(filename)
        print(f"Existing data loaded from database:\n{existing_data.head()}")

        # Identify new or modified rows
        changes = self.get_changes(existing_data, new_data, filename)
        print(f"Changes identified:\n{changes.head()}")

        # Append the changes to the table
        self.insert_data_into_sql(changes, filename)

    def load_existing_data(self, filename):
        """Load the whole table that corresponds to ``filename``.

        The column types reported by ``INFORMATION_SCHEMA.COLUMNS`` are used to
        cast the loaded columns (see ``_pandas_dtype``).

        Args:
            filename: CSV file name; the table name is the name without extension.

        Returns:
            DataFrame with every row of the table.
        """
        print(filename + " load existing data")

        table_name = self._table_name(filename)

        # A table name cannot be sent as a bound parameter, so it is quoted with
        # the dialect's own identifier rules before being placed in the statement.
        quoted_table = self.engine.dialect.identifier_preparer.quote(table_name)
        query = f"SELECT * FROM {quoted_table}"
        print(query)

        # The aliases pin the labels of the result columns, so the lookups below
        # do not depend on how the server cases them; the table name travels as
        # a bound parameter.
        table_info_query = text(
            "SELECT COLUMN_NAME AS column_name, DATA_TYPE AS data_type "
            "FROM INFORMATION_SCHEMA.COLUMNS "
            "WHERE TABLE_NAME = :table_name"
        )

        with self.engine.connect() as connection:
            table_info = pd.read_sql_query(
                table_info_query, connection, params={'table_name': table_name}
            )

            # Map each column to the pandas dtype it should be loaded as.
            dtype_map = {
                column_name: self._pandas_dtype(data_type)
                for column_name, data_type in zip(
                    table_info['column_name'], table_info['data_type']
                )
            }
            print("Tipo de datos esperado para cada columna:", dtype_map)

            existing_data = pd.read_sql_query(query, connection)

        # Cast only the columns that are really present in the result, so an
        # unexpected entry in the metadata cannot make the cast fail.
        applicable_dtypes = {
            column_name: dtype
            for column_name, dtype in dtype_map.items()
            if column_name in existing_data.columns
        }
        existing_data = existing_data.astype(applicable_dtypes)
        print('existing_data.dtypes:')
        print(existing_data.dtypes)
        return existing_data

    def get_changes(self, existing_data, new_data, filename):
        """Return the rows of ``new_data`` that are new or differ from the stored rows.

        Rows are matched on ``key_column``. A row is returned when its key is
        not present in ``existing_data``, or when at least one of its other
        columns differs from the stored value (missing values on both sides
        count as equal).

        Args:
            existing_data: Rows currently stored in the table.
            new_data: Rows read from the CSV file.
            filename: CSV file name, used only for logging.

        Returns:
            DataFrame with the columns of ``new_data`` and only the new or
            modified rows.

        Raises:
            KeyError: If either frame lacks the key column.
        """
        print(filename + " get changes")
        key = self.key_column
        for label, frame in (('CSV file', new_data), ('existing table', existing_data)):
            if key not in frame.columns:
                raise KeyError(f"'{key}' column not found in the {label} for {filename}")

        # Nothing stored yet: every incoming row is new.
        if existing_data.empty:
            return new_data.copy()

        merged_data = new_data.merge(
            existing_data,
            on=key,
            how='left',
            suffixes=('', '_existing'),
            indicator=self._ORIGIN_COLUMN,
        )

        # Rows without a match in the table are new regardless of their content.
        is_changed = merged_data[self._ORIGIN_COLUMN].eq('left_only')

        # For matched rows, compare each CSV column with its stored counterpart.
        for column in new_data.columns:
            if column == key:
                continue
            stored_column = f"{column}_existing"
            if stored_column in merged_data.columns:
                stored_values = merged_data[stored_column]
            else:
                # The table has no such column: there is nothing to compare with.
                stored_values = [None] * len(merged_data)
            differs = [
                self._values_differ(new_value, stored_value)
                for new_value, stored_value in zip(merged_data[column], stored_values)
            ]
            is_changed |= pd.Series(differs, index=merged_data.index, dtype=bool)

        # Keep only the CSV columns (drops the '_existing' and indicator columns).
        changed_data = merged_data.loc[is_changed, list(new_data.columns)]
        print('get changes')
        print(changed_data.head())
        return changed_data

    def insert_data_into_sql(self, data, filename):
        """Append ``data`` to the table that corresponds to ``filename``.

        NOTE(review): modified rows are appended next to the stored version of
        the same key, not updated in place - confirm this is intended (it will
        fail if the key column is a primary key).

        Args:
            data: Rows to append; nothing is written when it is empty.
            filename: CSV file name; the table name is the name without extension.
        """
        if data.empty:
            print("No new or modified rows to insert.")
            return

        table_name = self._table_name(filename)

        # engine.begin() commits on success and rolls back on error, so the
        # append does not depend on pandas opening a transaction on its own.
        with self.engine.begin() as connection:
            data.to_sql(table_name, connection, if_exists='append', index=False)

In [None]:
# Entry point: watch a directory and hand every "created" event to DataHandler.
if __name__ == "__main__":
    path_to_watch = '/carpeta_watchdog'  # Change this to your real directory
    print(f"Starting to watch directory: {path_to_watch}")
    # The handler uses the module-level `engine` for its database access.
    event_handler = DataHandler(engine)
    observer = Observer()
    # recursive=False: subdirectories of path_to_watch are not scheduled.
    observer.schedule(event_handler, path=path_to_watch, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)  # Keep the script running
    except KeyboardInterrupt:
        # Ctrl+C / kernel interrupt: ask the observer to stop before joining it.
        observer.stop()
    observer.join()
