Proceso de automatización para Amazon Reviews


In [18]:
from sqlalchemy import create_engine, text
import pandas as pd
import os
from kaggle.api.kaggle_api_extended import KaggleApi

# Configuración de conexión SQL Server
SQL_SERVER = 'FRANCOCONSORTE\\SQLEXPRESS'
DATABASE = 'amazon'
TABLE = 'dbo.amazon_reviews'
USER = 'FRANCOCONSORTE\\franc'

# Crear conexión con SQLAlchemy usando autenticación de Windows
engine = create_engine(f"mssql+pyodbc://{USER}@{SQL_SERVER}/{DATABASE}?driver=ODBC+Driver+17+for+SQL+Server&Trusted_Connection=yes")

def download_from_kaggle():
    """ Descarga el dataset de Kaggle y lo descomprime """
    os.environ['KAGGLE_CONFIG_DIR'] = r"C:\Users\franc\OneDrive\Documentos\Data Analyst\Proyectos\Amazon.kaggle"  # Configurar ruta de credenciales

    api = KaggleApi()
    api.authenticate()
    
    dataset_path = "ashishkumarak/amazon-shopping-reviews-daily-updated"
    api.dataset_download_files(dataset_path, path='./', unzip=True)
    return "./amazon_reviews.csv"

def transform_data(file_path):
    """ Transforma los datos eliminando columnas y manejando fechas """
    df = pd.read_csv(file_path)
    
    # Eliminar la columna 'reviewCreatedVersion' si existe
    df.drop(columns=['reviewCreatedVersion'], errors='ignore', inplace=True)

    # Manejo de la columna 'at' (fecha y hora)
    if 'at' in df.columns:
        df['fecha'] = pd.to_datetime(df['at']).dt.date
        df['hora'] = pd.to_datetime(df['at']).dt.time
        df.drop(columns=['at'], inplace=True)

    # Llenar valores nulos en 'appVersion'
    df['appVersion'] = df['appVersion'].ffill()

    return df

def load_data_to_sql(df):
    """ Carga los datos transformados en SQL Server evitando duplicados y corrigiendo valores nulos """
    with engine.connect() as conn:
        existing_ids = pd.read_sql(f"SELECT reviewId FROM {TABLE}", conn)
        existing_ids_set = set(existing_ids['reviewId'])

        new_rows = df[~df['reviewId'].isin(existing_ids_set)]
        if new_rows.empty:
            print("No hay registros nuevos para insertar.")
        else:
            insert_query = text(f"""
                INSERT INTO {TABLE} (reviewId, userName, content, score, thumbsUpCount, appVersion, fecha, hora)
                VALUES (:reviewId, :userName, :content, :score, :thumbsUpCount, :appVersion, :fecha, :hora)
            """)

            insert_data = new_rows.to_dict(orient='records')  # Convierte DataFrame a lista de diccionarios

            try:
                conn.execute(insert_query, insert_data)  # Ejecutar inserción
                print(f"Carga completada: {len(insert_data)} registros insertados.")
            except Exception as e:
                print(f"Error al insertar datos: {e}")

        # Corregir valores nulos en la base de datos para datos antiguos y nuevos
        cleanup_query = text(f"""
            UPDATE {TABLE}
            SET appVersion = COALESCE(appVersion, 'Desconocida'),
                fecha = COALESCE(fecha, CAST(GETDATE() AS DATE)),
                hora = COALESCE(hora, CAST(GETDATE() AS TIME))
            WHERE appVersion IS NULL OR fecha IS NULL OR hora IS NULL;
        """)
        conn.execute(cleanup_query)
        print("Corrección de valores nulos aplicada en toda la base de datos.")

if __name__ == "__main__":
    file_path = download_from_kaggle()
    transformed_data = transform_data(file_path)
    load_data_to_sql(transformed_data)


Dataset URL: https://www.kaggle.com/datasets/ashishkumarak/amazon-shopping-reviews-daily-updated
No hay registros nuevos para insertar.
Corrección de valores nulos aplicada en toda la base de datos.
