SECOND ETL

In [None]:
!pip install pandas pymongo python-dateutil pyarrow pyspark



In [None]:
import pandas as pd 
import sqlite3
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
import re
from dateutil.parser import parse
from datetime import datetime
import pyarrow.parquet as pq
from pyspark.sql import SparkSession

# Función para cargar y preprocesar datos desde CSV
def cargar_y_preprocesar_datos(ruta_archivo):
    print("Cargando y preprocesando datos desde CSV...")
    try:
        df = pd.read_csv(ruta_archivo, encoding='utf-8', dtype=str)
    except Exception as e:
        print(f"Error al leer el CSV: {e}")
        return None

    # Manejar valores faltantes
    def validar_correo(correo):
        if pd.isna(correo) or not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', correo):
            return 'sin_correo@desconocido.com'
        return correo.strip()

    df['correo_electronico'] = df['correo_electronico'].apply(validar_correo)

    def estandarizar_telefono(telefono):
        if pd.isna(telefono) or telefono.strip() == '':
            return 'Desconocido'
        telefono = re.sub(r'[^\d+-]', '', telefono)
        digitos = re.findall(r'\d', telefono)
        if len(digitos) >= 10:
            return f"+1-{''.join(digitos[-10:-7])}-{''.join(digitos[-7:-4])}-{''.join(digitos[-4:])}"
        return 'Desconocido'

    df['telefono'] = df['telefono'].apply(estandarizar_telefono)

    df = df.fillna({
        'nombre': 'Desconocido',
        'direccion': 'Desconocida',
        'nombre_de_la_pelicula/video': 'Desconocida',
        'formato': 'Desconocido'
    })

    # Normalizar direcciones
    def normalizar_direccion(direccion):
        if pd.isna(direccion) or direccion.strip() == '' or direccion == 'Desconocida':
            return 'Desconocida'
        direccion = ' '.join(direccion.lower().split())
        reemplazos = {
            r'\bcalle\b': 'C.',
            r'\bavenida\b': 'Av.',
            r'\bnumero\b': 'No.',
            r'\bcolonia\b': 'Col.',
            r'\bcarretera\b': 'Carr.',
            r'\bboulevard\b': 'Blvd.'
        }
        for patron, reemplazo in reemplazos.items():
            direccion = re.sub(patron, reemplazo, direccion)
        direccion = ' '.join(word.capitalize() for word in direccion.split())
        direccion = re.sub(r'[^\w\s\.\,]', '', direccion)
        return direccion

    df['direccion'] = df['direccion'].apply(normalizar_direccion)

    # Corregir fechas inconsistentes
    def parsear_fecha(fecha_str):
        if pd.isna(fecha_str) or fecha_str.strip() == '':
            return None
        try:
            fecha = parse(fecha_str, fuzzy=True, dayfirst=True)
            if 2000 <= fecha.year <= 2025:
                return fecha.strftime('%Y-%m-%d')
            return None
        except:
            return None

    df['fecha_de_alquiler'] = df['fecha_de_alquiler'].apply(parsear_fecha)
    df['fecha_de_entrega'] = df['fecha_de_entrega'].apply(parsear_fecha)
    df = df.dropna(subset=['fecha_de_alquiler', 'fecha_de_entrega'])

    # Calcular duración del alquiler
    df['duracion_alquiler_dias'] = (
        pd.to_datetime(df['fecha_de_entrega']) - pd.to_datetime(df['fecha_de_alquiler'])
    ).dt.days
    df = df[df['duracion_alquiler_dias'] >= 0]

    print(f"Datos preprocesados: {len(df)} filas")
    return df

# Carga en MongoDB
def cargar_a_mongodb(df, db_name='alquileres', collection_name='clientes'):
    print("Cargando datos a MongoDB...")
    try:
        client = MongoClient('mongodb://localhost:27017/')
        db = client[db_name]
        collection = db[collection_name]
        collection.drop()  # Limpiar colección existente
        records = df.to_dict('records')
        collection.insert_many(records)
        print(f"Datos cargados en MongoDB: {db_name}.{collection_name} ({len(records)} documentos)")
        client.close()
    except ConnectionFailure as e:
        print(f"Error de conexión a MongoDB: {e}")
    except Exception as e:
        print(f"Error al cargar en MongoDB: {e}")

# Carga en SQLite (Data Warehouse ligero)
def cargar_a_sqlite(df, db_name='alquileres.db', table_name='clientes_procesados'):
    print("Cargando datos a SQLite...")
    try:
        conn = sqlite3.connect(db_name)
        # Crear tabla con índices
        create_table_query = """
        CREATE TABLE IF NOT EXISTS clientes_procesados (
            codigo_usuario TEXT,
            nombre TEXT,
            direccion TEXT,
            telefono TEXT,
            correo_electronico TEXT,
            fecha_de_alquiler TEXT,
            fecha_de_entrega TEXT,
            nombre_de_la_pelicula_video TEXT,
            formato TEXT,
            duracion_alquiler_dias INTEGER,
            PRIMARY KEY (codigo_usuario, fecha_de_alquiler)
        );
        CREATE INDEX IF NOT EXISTS idx_formato ON clientes_procesados (formato);
        CREATE INDEX IF NOT EXISTS idx_fecha_alquiler ON clientes_procesados (fecha_de_alquiler);
        """
        conn.executescript(create_table_query)
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        conn.commit()
        print(f"Datos cargados en SQLite: {db_name}.{table_name} ({len(df)} filas)")
        conn.close()
    except Exception as e:
        print(f"Error al cargar en SQLite: {e}")

# Carga en archivo Parquet
def cargar_a_parquet(df, archivo_parquet='clientes_procesados.parquet'):
    print("Cargando datos a archivo Parquet...")
    try:
        df.to_parquet(archivo_parquet, engine='pyarrow', compression='snappy', index=False)
        print(f"Datos guardados en archivo Parquet: {archivo_parquet} ({len(df)} filas)")
    except Exception as e:
        print(f"Error al guardar en Parquet: {e}")

# Ejemplo de lectura con Spark
def leer_parquet_con_spark(archivo_parquet='clientes_procesados.parquet'):
    print("Leyendo archivo Parquet con Spark...")
    try:
        spark = SparkSession.builder.appName("LecturaParquet").getOrCreate()
        df_spark = spark.read.parquet(archivo_parquet)
        print("Primeras 5 filas del archivo Parquet:")
        df_spark.show(5)
        spark.stop()
    except Exception as e:
        print(f"Error al leer con Spark: {e}")

# Función principal
def cargar_datos_multiples(ruta_archivo):
    df = cargar_y_preprocesar_datos(ruta_archivo)
    if df is None:
        print("Proceso abortado debido a fallo en la carga.")
        return

    # Cargar a cada sistema
    cargar_a_mongodb(df)
    cargar_a_sqlite(df)
    cargar_a_parquet(df)
    leer_parquet_con_spark()  # Demostración de lectura con Spark

if __name__ == "__main__":
    ruta_archivo = "Clientes.csv"
    cargar_datos_multiples(ruta_archivo)

Cargando y preprocesando datos desde CSV...
Datos preprocesados: 24413 filas
Cargando datos a MongoDB...
Error de conexión a MongoDB: localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms), Timeout: 30s, Topology Description: <TopologyDescription id: 68197b4b8cae37b3e73a9705, topology_type: Unknown, servers: [<ServerDescription ('localhost', 27017) server_type: Unknown, rtt: None, error=AutoReconnect('localhost:27017: [Errno 111] Connection refused (configured timeouts: socketTimeoutMS: 20000.0ms, connectTimeoutMS: 20000.0ms)')>]>
Cargando datos a SQLite...
Datos cargados en SQLite: alquileres.db.clientes_procesados (24413 filas)
Cargando datos a archivo Parquet...
Datos guardados en archivo Parquet: clientes_procesados.parquet (24413 filas)
Leyendo archivo Parquet con Spark...
Primeras 5 filas del archivo Parquet:
+--------------+-----------------+--------------------+---------------+--------------------+---------