<a href="https://colab.research.google.com/github/JOSELUISMILLA/NET8APIDEMO/blob/main/Temas_adicionales_Ingenier%C3%ADa_para_el_procesamiento_de_Datos.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
import sqlite3

# Función para hacer scraping de descripciones basadas en palabras clave, categoría y fecha
def get_search_descriptions(keyword, category, date, num_results=5):
    # Construir la URL de búsqueda utilizando el keyword, categoría y fecha
    search_query = f"{keyword} {category} {date.strftime('%B %Y')}"
    search_url = f"https://www.google.com/search?q={search_query}"
    headers = {'User-Agent': 'Mozilla/5.0'}
    response = requests.get(search_url, headers=headers)
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extraer los resultados
    descriptions = []
    for result in soup.select('div.BNeawe.s3v9rd.AP7Wnd'):
        descriptions.append(result.text)
        if len(descriptions) >= num_results:
            break
    return descriptions if descriptions else ["No relevant results found"]

# Simulación de datos numéricos y categóricos
num_rows = 1000
np.random.seed(42)

# ID de transacción
trans_ids = range(1, num_rows+1)

# Categorías (simulando sectores de productos)
categories = np.random.choice([
    'Electronics', 'Clothing', 'Food', 'Books', 'Sports', 'Furniture',
    'Beauty & Personal Care', 'Health & Wellness', 'Automotive', 'Home Appliances',
    'Toys & Games', 'Jewelry', 'Footwear', 'Office Supplies', 'Pet Supplies',
    'Baby Products', 'Musical Instruments', 'Gardening', 'Tools & Hardware',
    'Kitchenware', 'Luggage & Travel Accessories', 'Stationery', 'Art Supplies',
    'Fitness Equipment', 'Outdoor Gear', 'Cleaning Supplies', 'Craft Supplies',
    'Party Supplies', 'Photography Equipment', 'Camping & Hiking', 'Home Decor',
    'Software', 'Mobile Accessories', 'Watches', 'Cosmetics', 'Personal Hygiene',
    'Safety Equipment', 'Pharmaceuticals', 'Medical Devices', 'Energy & Utilities',
    'Building Materials', 'Industrial Supplies', 'Agriculture', 'Financial Services',
    'Real Estate', 'Transportation', 'Education Materials', 'Travel & Tourism',
    'Insurance', 'Consulting Services', 'Entertainment Media'
], num_rows)

# Variables numéricas correlacionadas con ruido
base_var = np.random.uniform(50, 1000, num_rows)
var_num_1 = base_var + np.random.normal(0, 20, num_rows)
var_num_2 = 0.5 * base_var + np.random.normal(0, 30, num_rows)
var_num_3 = 2 * base_var + np.random.normal(0, 50, num_rows)
var_num_4 = base_var + np.random.uniform(-100, 100, num_rows)
var_num_5 = 1.5 * base_var + np.random.normal(0, 10, num_rows)
var_num_6 = 0.3 * base_var + np.random.normal(0, 5, num_rows)

# Monto de la transacción
amounts = np.random.uniform(10, 1000, num_rows)

# Fechas aleatorias en un rango de 2 años
start_date = datetime.now() - timedelta(days=730)
dates = [start_date + timedelta(days=random.randint(0, 730)) for _ in range(num_rows)]

# Variables categóricas adicionales
cat_1 = np.random.choice(['Low', 'Medium', 'High'], num_rows)
cat_2 = np.random.choice(['A', 'B', 'C', 'D'], num_rows)
cat_3 = np.random.choice(['North', 'South', 'East', 'West'], num_rows)
cat_4 = np.random.choice(['Urban', 'Rural'], num_rows)
cat_5 = np.random.choice(['Single', 'Married', 'Divorced', 'Widowed'], num_rows)
cat_6 = np.random.choice(['Red', 'Blue', 'Green', 'Yellow'], num_rows)

# Creación del DataFrame con columnas numéricas correlacionadas y categóricas
df = pd.DataFrame({
    'transaction_id': trans_ids,
    'category': categories,
    'amount': amounts,
    'date': dates,
    'var_num_1': var_num_1,
    'var_num_2': var_num_2,
    'var_num_3': var_num_3,
    'var_num_4': var_num_4,
    'var_num_5': var_num_5,
    'var_num_6': var_num_6,
    'cat_1': cat_1,
    'cat_2': cat_2,
    'cat_3': cat_3,
    'cat_4': cat_4,
    'cat_5': cat_5,
    'cat_6': cat_6
})

# Palabra clave para el scraping (puedes cambiarla según el caso)
keyword = 'trends'

# Recopilar descripciones con scraping, utilizando categorías y fechas
all_descriptions = []
for i in range(num_rows):
    category = df['category'][i]
    date = df['date'][i]
    descriptions = get_search_descriptions(keyword, category, date)
    all_descriptions.append(descriptions[0])  # Usar el primer resultado obtenido

# Añadir las descripciones al dataframe
df['description'] = all_descriptions

print("Datos generados con correlación, ruido y scraping utilizando categorías y fechas:")
print(df.head())

# Conectar a SQLite y usar credenciales
conn = sqlite3.connect('transactions_secure.db')
c = conn.cursor()

# Crear una tabla con credenciales de usuario
c.execute('''CREATE TABLE IF NOT EXISTS credentials (
                user TEXT NOT NULL,
                password TEXT NOT NULL
            )''')

# Insertar credenciales en la tabla
c.execute('''INSERT INTO credentials (user, password)
             VALUES ('useradm', '123password')''')

# Guardar los datos generados en la base de datos SQLite
df.to_sql('transactions', conn, if_exists='replace', index=False)

# Cerrar conexión
conn.commit()
conn.close()

print("Datos y credenciales guardados en la base de datos SQLite.")

Datos generados con correlación, ruido y scraping utilizando categorías y fechas:
   transaction_id               category      amount  \
0               1        Medical Devices  379.784468   
1               2  Photography Equipment  704.927728   
2               3           Pet Supplies  837.364381   
3               4            Agriculture  691.350945   
4               5      Health & Wellness  517.456801   

                        date   var_num_1   var_num_2    var_num_3   var_num_4  \
0 2024-03-27 23:37:11.383352  660.565577  314.692698  1390.118993  709.702958   
1 2023-06-26 23:37:11.383352  497.446535  241.516200  1009.457006  477.335269   
2 2023-06-10 23:37:11.383352  581.854517  261.192864  1292.889390  558.548005   
3 2022-12-20 23:37:11.383352  719.224405  361.064043  1539.277331  736.427564   
4 2024-08-09 23:37:11.383352  555.448805  297.543413  1227.117698  498.152656   

     var_num_5   var_num_6   cat_1 cat_2  cat_3  cat_4     cat_5   cat_6  \
0   999.377502  19

In [2]:
# Asignar nuevos nombres a las columnas
df.columns = [
    'ID de Transacción', 'Categoría de Producto', 'Monto de la Transacción', 'Fecha de Transacción',
    'Costo de Producción', 'Impuesto Aplicado', 'Precio de Lista', 'Descuento Aplicado',
    'Margen de Ganancia', 'Costo de Envío', 'Nivel de Calidad', 'Tamaño de la Empresa',
    'Región Geográfica', 'Ubicación', 'Estado Civil del Cliente', 'Color Preferido', 'Descripción de la Transacción'
]

# Mostrar las primeras filas del DataFrame con los nuevos nombres
print(df.head())

# Convertir el DataFrame a un archivo Excel
df.to_excel('transactions_data.xlsx', index=False)

print("DataFrame guardado como transactions_data.xlsx")

   ID de Transacción  Categoría de Producto  Monto de la Transacción  \
0                  1        Medical Devices               379.784468   
1                  2  Photography Equipment               704.927728   
2                  3           Pet Supplies               837.364381   
3                  4            Agriculture               691.350945   
4                  5      Health & Wellness               517.456801   

        Fecha de Transacción  Costo de Producción  Impuesto Aplicado  \
0 2024-03-27 23:37:11.383352           660.565577         314.692698   
1 2023-06-26 23:37:11.383352           497.446535         241.516200   
2 2023-06-10 23:37:11.383352           581.854517         261.192864   
3 2022-12-20 23:37:11.383352           719.224405         361.064043   
4 2024-08-09 23:37:11.383352           555.448805         297.543413   

   Precio de Lista  Descuento Aplicado  Margen de Ganancia  Costo de Envío  \
0      1390.118993          709.702958          999.3775

In [3]:
import sqlite3
import pandas as pd

# Cargar el archivo Excel en un DataFrame
df_loaded = pd.read_excel('transactions_data.xlsx')

# Conectar a SQLite y usar credenciales
conn = sqlite3.connect('transactions_secure.db')
c = conn.cursor()

# Crear una tabla con credenciales de usuario si no existe
c.execute('''CREATE TABLE IF NOT EXISTS credentials (
                user TEXT NOT NULL,
                password TEXT NOT NULL
            )''')

# Insertar credenciales (si aún no existen)
c.execute('''INSERT INTO credentials (user, password)
             VALUES ('useradm', '123password')''')

# Guardar el DataFrame cargado en la base de datos SQLite
df_loaded.to_sql('transactions', conn, if_exists='replace', index=False)

# Cerrar conexión
conn.commit()
conn.close()

print("DataFrame cargado de Excel y guardado en la base de datos SQLite con credenciales.")


DataFrame cargado de Excel y guardado en la base de datos SQLite con credenciales.


In [4]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=1fe70a85f71c64cf08345590d9b20b7271d35f708f2986eddbd3038d761ad9a4
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [5]:
!wget -O sqlite-jdbc.jar https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

--2024-09-29 00:02:11--  https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7296329 (7.0M) [application/java-archive]
Saving to: ‘sqlite-jdbc.jar’


2024-09-29 00:02:12 (52.1 MB/s) - ‘sqlite-jdbc.jar’ saved [7296329/7296329]



In [6]:
pip install xlsxwriter

Collecting xlsxwriter
  Downloading XlsxWriter-3.2.0-py3-none-any.whl.metadata (2.6 kB)
Downloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m159.9/159.9 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.0


In [7]:
pip install reportlab

Collecting reportlab
  Downloading reportlab-4.2.4-py3-none-any.whl.metadata (1.5 kB)
Downloading reportlab-4.2.4-py3-none-any.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m18.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: reportlab
Successfully installed reportlab-4.2.4


In [8]:
# Importar las bibliotecas necesarias
import time
import sqlite3
import pandas as pd
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    StringIndexer,
    VectorAssembler,
    StandardScaler,
    Tokenizer,
    StopWordsRemover,
    HashingTF,
    IDF,
    Word2Vec,
    PCA
)
from pyspark.ml.regression import (
    LinearRegression,
    DecisionTreeRegressor,
    RandomForestRegressor,
    GBTRegressor
)
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier
)
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import (
    RegressionEvaluator,
    BinaryClassificationEvaluator,
    ClusteringEvaluator
)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, auc
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas

# Configuración de logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Crear un directorio para guardar los modelos y resultados si no existe
model_save_path = "./models"
results_save_path = "./results"
plots_save_path = os.path.join(results_save_path, "plots")
os.makedirs(model_save_path, exist_ok=True)
os.makedirs(results_save_path, exist_ok=True)
os.makedirs(plots_save_path, exist_ok=True)

# Inicializar un diccionario para almacenar los tiempos de ejecución
times = {}

def iniciar_sesion_spark(jdbc_driver_path, app_name="PySpark SQLite Integration"):
    """Inicializa y retorna una sesión de Spark."""
    try:
        spark = SparkSession.builder \
            .appName(app_name) \
            .config("spark.jars", jdbc_driver_path) \
            .getOrCreate()
        logging.info("Sesión de Spark iniciada.")
        return spark
    except Exception as e:
        logging.error(f"Error al iniciar la sesión de Spark: {e}")
        raise

def cargar_datos_spark(spark, jdbc_url, dbtable, driver):
    """Carga datos desde SQLite a un DataFrame de Spark."""
    try:
        start_time = time.time()
        df = spark.read.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", dbtable) \
            .option("driver", driver) \
            .load()
        end_time = time.time()
        duration = end_time - start_time
        logging.info(f"Datos cargados en PySpark en {duration:.2f} segundos.")
        return df, duration
    except Exception as e:
        logging.error(f"Error al cargar datos desde SQLite: {e}")
        raise

def exploracion_y_limpieza(df):
    """Realiza exploración y limpieza de los datos."""
    try:
        start_time = time.time()
        # Registrar el DataFrame como una vista temporal para consultas SQL
        df.createOrReplaceTempView("transactions")

        # Consulta SQL avanzada: filtrar transacciones con monto > 500
        filtered_df = df.sparkSession.sql("""
            SELECT
                `ID de Transacción`,
                `Categoría de Producto`,
                `Monto de la Transacción`,
                `Fecha de Transacción`,
                `Costo de Producción`,
                `Impuesto Aplicado`,
                `Precio de Lista`,
                `Descuento Aplicado`,
                `Margen de Ganancia`,
                `Costo de Envío`,
                `Nivel de Calidad`,
                `Tamaño de la Empresa`,
                `Región Geográfica`,
                `Ubicación`,
                `Estado Civil del Cliente`,
                `Color Preferido`,
                `Descripción de la Transacción`
            FROM transactions
            WHERE `Monto de la Transacción` > 500
        """)

        # Agrupación: promedio del monto de transacción por categoría de producto
        aggregated_df = filtered_df.groupBy("Categoría de Producto").agg(
            F.avg("Monto de la Transacción").alias("Avg_Monto_Trans")
        )

        end_time = time.time()
        duration = end_time - start_time
        logging.info(f"Exploración y limpieza completadas en {duration:.2f} segundos.")
        return filtered_df, aggregated_df, duration
    except Exception as e:
        logging.error(f"Error en exploración y limpieza de datos: {e}")
        raise

def preprocesamiento_datos(df, categorical_columns, numerical_columns):
    """Preprocesa los datos para el modelo."""
    try:
        start_time = time.time()

        # Convertir 'Fecha de Transacción' a tipo timestamp
        df = df.withColumn(
            "Fecha de Transacción",
            F.to_timestamp("Fecha de Transacción", "yyyy-MM-dd HH:mm:ss.SSSSSS")
        )

        # Indexar las columnas categóricas
        indexers = [
            StringIndexer(inputCol=col, outputCol=f"{col}_Index", handleInvalid='keep').fit(df)
            for col in categorical_columns
        ]

        # Ensamblar las características en un vector
        assembler = VectorAssembler(
            inputCols=[f"{col}_Index" for col in categorical_columns] + numerical_columns,
            outputCol="features_unscaled"
        )

        # Escalar las características
        scaler = StandardScaler(
            inputCol="features_unscaled",
            outputCol="features",
            withStd=True,
            withMean=False
        )

        # Crear un pipeline de preprocesamiento
        preprocessing_pipeline = Pipeline(stages=indexers + [assembler, scaler])

        # Ajustar y transformar los datos
        preprocessing_start_time = time.time()
        preprocessed_df = preprocessing_pipeline.fit(df).transform(df)
        preprocessing_end_time = time.time()
        duration = preprocessing_end_time - preprocessing_start_time
        logging.info(f"Preprocesamiento completado en {duration:.2f} segundos.")

        total_duration = preprocessing_end_time - start_time
        logging.info(f"Tiempo total de preprocesamiento: {total_duration:.2f} segundos.")

        return preprocessed_df, duration, total_duration
    except Exception as e:
        logging.error(f"Error en preprocesamiento de datos: {e}")
        raise

def entrenar_modelos_regresion(preprocessed_df, features_col, label_col):
    """Entrena múltiples modelos de regresión y retorna los modelos y sus tiempos."""
    modelos = {
        "Regresión Lineal": LinearRegression(featuresCol=features_col, labelCol=label_col),
        "Árbol de Decisión": DecisionTreeRegressor(featuresCol=features_col, labelCol=label_col),
        "Random Forest": RandomForestRegressor(featuresCol=features_col, labelCol=label_col, numTrees=100),
        "Gradient-Boosted Trees": GBTRegressor(featuresCol=features_col, labelCol=label_col, maxIter=100)
    }

    modelos_entrenados = {}
    tiempos_entrenamiento = {}

    # Dividir los datos en entrenamiento y prueba
    train, test = preprocessed_df.randomSplit([0.8, 0.2], seed=42)

    for nombre, modelo in modelos.items():
        start_time = time.time()
        modelo_entrenado = modelo.fit(train)
        end_time = time.time()
        duration = end_time - start_time
        modelos_entrenados[nombre] = modelo_entrenado
        tiempos_entrenamiento[nombre] = duration
        logging.info(f"Modelo de {nombre} entrenado en {duration:.2f} segundos.")

    return modelos_entrenados, tiempos_entrenamiento, train, test

def entrenar_modelos_clasificacion(preprocessed_df, features_col, label_col):
    """Entrena múltiples modelos de clasificación y retorna los modelos y sus tiempos."""
    modelos = {
        "Regresión Logística": LogisticRegression(featuresCol=features_col, labelCol=label_col),
        "Árbol de Decisión": DecisionTreeClassifier(featuresCol=features_col, labelCol=label_col),
        "Random Forest": RandomForestClassifier(featuresCol=features_col, labelCol=label_col, numTrees=100),
        "Gradient-Boosted Trees": GBTClassifier(featuresCol=features_col, labelCol=label_col, maxIter=100)
    }

    modelos_entrenados = {}
    tiempos_entrenamiento = {}

    # Dividir los datos en entrenamiento y prueba
    train, test = preprocessed_df.randomSplit([0.8, 0.2], seed=42)

    for nombre, modelo in modelos.items():
        start_time = time.time()
        modelo_entrenado = modelo.fit(train)
        end_time = time.time()
        duration = end_time - start_time
        modelos_entrenados[nombre] = modelo_entrenado
        tiempos_entrenamiento[nombre] = duration
        logging.info(f"Modelo de {nombre} entrenado en {duration:.2f} segundos.")

    return modelos_entrenados, tiempos_entrenamiento, train, test

def entrenar_modelo_clustering(preprocessed_df, features_col, k=3):
    """Entrena un modelo de K-Means y retorna el modelo y su tiempo."""
    try:
        start_time = time.time()
        kmeans = KMeans(featuresCol=features_col, k=k, seed=42)
        kmeans_model = kmeans.fit(preprocessed_df)
        end_time = time.time()
        duration = end_time - start_time
        logging.info(f"Modelo de K-Means entrenado en {duration:.2f} segundos.")
        return kmeans_model, duration
    except Exception as e:
        logging.error(f"Error al entrenar el modelo de K-Means: {e}")
        raise

def análisis_texto(preprocessed_df):
    """Realiza análisis de texto utilizando TF-IDF y Word2Vec."""
    try:
        start_time = time.time()

        # Tokenizar la descripción de la transacción
        tokenizer = Tokenizer(inputCol="Descripción de la Transacción", outputCol="words_token")

        # Remover stopwords
        remover = StopWordsRemover(inputCol="words_token", outputCol="words_clean")

        # Aplicar HashingTF para TF-IDF
        hashing_tf = HashingTF(inputCol="words_clean", outputCol="rawFeatures", numFeatures=1000)
        idf = IDF(inputCol="rawFeatures", outputCol="tfidf_features")

        # Aplicar Word2Vec
        word2vec = Word2Vec(inputCol="words_clean", outputCol="word2vec_features", vectorSize=100, minCount=1)

        # Crear un pipeline de análisis de texto
        text_pipeline = Pipeline(stages=[tokenizer, remover, hashing_tf, idf, word2vec])

        # Ajustar y transformar los datos
        text_start_time = time.time()
        text_model = text_pipeline.fit(preprocessed_df)
        text_df = text_model.transform(preprocessed_df)
        text_end_time = time.time()
        duration = text_end_time - text_start_time
        logging.info(f"Análisis de texto completado en {duration:.2f} segundos.")

        total_duration = text_end_time - start_time
        logging.info(f"Tiempo total de análisis de texto: {total_duration:.2f} segundos.")

        return text_df, duration, total_duration
    except Exception as e:
        logging.error(f"Error en análisis de texto: {e}")
        raise

def evaluar_modelos_regresion(modelos, test_df, label_col="Precio de Lista"):
    """Evalúa los modelos de regresión y retorna los resultados."""
    evaluador = RegressionEvaluator(labelCol=label_col, predictionCol="prediction", metricName="rmse")
    resultados = {}

    for nombre, modelo in modelos.items():
        predicciones = modelo.transform(test_df)
        rmse = evaluador.evaluate(predicciones)
        resultados[nombre] = rmse
        logging.info(f"{nombre} RMSE: {rmse:.2f}")

    return resultados

def evaluar_modelos_clasificacion(modelos, test_df, label_col="label"):
    """Evalúa los modelos de clasificación y retorna los resultados."""
    evaluador = BinaryClassificationEvaluator(labelCol=label_col, rawPredictionCol="rawPrediction", metricName="areaUnderROC")
    resultados = {}

    for nombre, modelo in modelos.items():
        predicciones = modelo.transform(test_df)
        auc_score = evaluador.evaluate(predicciones)
        resultados[nombre] = auc_score
        logging.info(f"{nombre} AUC: {auc_score:.2f}")

    return resultados

def evaluar_modelo_clustering(modelo, df):
    """Evalúa el modelo de clustering y retorna el Silhouette Score."""
    evaluator = ClusteringEvaluator()
    predicciones = modelo.transform(df)
    silhouette = evaluator.evaluate(predicciones)
    logging.info(f"K-Means Silhouette Score: {silhouette:.2f}")
    return silhouette

def ajuste_hiperparametros_clasificacion(modelo, train_df, evaluador, param_grid, num_folds=3):
    """Realiza el ajuste de hiperparámetros usando CrossValidator."""
    try:
        start_time = time.time()
        crossval = CrossValidator(
            estimator=modelo,
            estimatorParamMaps=param_grid,
            evaluator=evaluador,
            numFolds=num_folds,
            parallelism=2  # Ajustar según los recursos disponibles
        )
        cv_model = crossval.fit(train_df)
        end_time = time.time()
        duration = end_time - start_time
        logging.info(f"CrossValidator completado en {duration:.2f} segundos.")
        return cv_model, duration
    except Exception as e:
        logging.error(f"Error en ajuste de hiperparámetros: {e}")
        raise

def guardar_modelos(modelos_dict, ruta_base):
    """Guarda los modelos entrenados en el sistema de archivos, sobrescribiendo si existen."""
    try:
        for nombre, modelo in modelos_dict.items():
            ruta = os.path.join(ruta_base, f"{nombre.replace(' ', '_').lower()}_model")
            # Utilizar write().overwrite().save(path) para sobrescribir si existe
            modelo.write().overwrite().save(ruta)
            logging.info(f"Modelo de {nombre} guardado en {ruta}")
    except Exception as e:
        logging.error(f"Error al guardar modelos: {e}")
        raise

def exportar_resultados_excel(tiempos, regresion_resultados, clasificacion_resultados, clustering_resultado, ruta):
    """Exporta los resultados y tiempos a un archivo Excel con múltiples hojas."""
    try:
        with pd.ExcelWriter(ruta, engine='xlsxwriter') as writer:
            # Hoja de tiempos
            tiempos_df = pd.DataFrame(list(tiempos.items()), columns=['Paso', 'Duración (s)'])
            tiempos_df.to_excel(writer, sheet_name='Tiempos de Ejecución', index=False)

            # Hoja de regresión
            regresion_df = pd.DataFrame(list(regresion_resultados.items()), columns=['Modelo', 'RMSE'])
            regresion_df.to_excel(writer, sheet_name='Regresión', index=False)

            # Hoja de clasificación
            clasificacion_df = pd.DataFrame(list(clasificacion_resultados.items()), columns=['Modelo', 'AUC'])
            clasificacion_df.to_excel(writer, sheet_name='Clasificación', index=False)

            # Hoja de clustering
            clustering_df = pd.DataFrame([{'Modelo': 'K-Means', 'Silhouette Score': clustering_resultado}])
            clustering_df.to_excel(writer, sheet_name='Clustering', index=False)

            # Información adicional o gráficos pueden ser añadidos aquí

        logging.info(f"Resultados exportados a Excel en {ruta}")
    except Exception as e:
        logging.error(f"Error al exportar resultados a Excel: {e}")
        raise

def generar_informe_pdf(ruta_pdf, plots_paths, excel_path, summary_text=""):
    """Genera un informe PDF incluyendo gráficos y resúmenes."""
    try:
        c = canvas.Canvas(ruta_pdf, pagesize=letter)
        width, height = letter
        c.setFont("Helvetica", 12)

        # Título
        c.drawString(50, height - 50, "Informe del Pipeline de Machine Learning")

        y_position = height - 80

        # Resumen
        c.drawString(50, y_position, "Resumen:")
        y_position -= 20
        text_object = c.beginText(50, y_position)
        for line in summary_text.split('\n'):
            text_object.textLine(line)
            y_position -= 15
        c.drawText(text_object)

        # Agregar gráficos
        for plot in plots_paths:
            if y_position < 200:
                c.showPage()
                y_position = height - 50
            # Ajustar la posición y el tamaño de la imagen según sea necesario
            c.drawImage(plot, 50, y_position - 300, width=500, height=300)
            y_position -= 320

        # Finalizar el PDF
        c.save()
        logging.info(f"Informe PDF generado en {ruta_pdf}")
    except Exception as e:
        logging.error(f"Error al generar el informe PDF: {e}")
        raise

def metodo_del_codo(preprocessed_df, features_col, max_k=10):
    """Determina el número óptimo de clusters usando el método del codo."""
    sse = []
    for k in range(2, max_k+1):
        kmeans = KMeans(featuresCol=features_col, k=k, seed=42)
        model = kmeans.fit(preprocessed_df)
        sse.append(model.summary.trainingCost)
        logging.info(f"K={k}, SSE={model.summary.trainingCost}")

    # Graficar el método del codo
    plt.figure(figsize=(10,6))
    sns.lineplot(x=range(2, max_k+1), y=sse, marker='o')
    plt.title('Método del Codo para Determinar k Óptimo')
    plt.xlabel('Número de Clusters (k)')
    plt.ylabel('Suma de Errores al Cuadrado (SSE)')
    plot_path = os.path.join(plots_save_path, "metodo_del_codo.png")
    plt.savefig(plot_path)
    plt.close()
    logging.info("Gráfico del método del codo guardado.")

    # Determinar el k donde la disminución de SSE se ralentiza
    # Aquí podrías implementar lógica para elegir el k óptimo automáticamente
    # Por simplicidad, retornamos el k con la mayor diferencia en la pendiente
    diffs = [sse[i] - sse[i+1] for i in range(len(sse)-1)]
    if diffs:
        k_optimo = diffs.index(max(diffs)) + 3  # +3 porque range starts at 2 and diffs index shifted by 1
        logging.info(f"El número óptimo de clusters según el método del codo es: k={k_optimo}")
    else:
        k_optimo = 3  # Valor por defecto
        logging.info(f"No se pudo determinar k óptimo. Usando k={k_optimo} por defecto.")
    return k_optimo

def main():
    """Función principal para ejecutar todo el pipeline."""
    try:
        # Iniciar el temporizador total
        total_start_time = time.time()

        # -----------------------------
        # Paso C: Conexión de PySpark con SQLite
        # -----------------------------
        sqlite_db_path = "transactions_secure.db"  # Ruta al archivo de la base de datos SQLite
        sqlite_jdbc_driver = "sqlite-jdbc.jar"  # Ruta al driver JDBC de SQLite

        # Verificar que el driver JDBC exista
        if not os.path.exists(sqlite_jdbc_driver):
            raise FileNotFoundError(
                f"El driver JDBC de SQLite no se encontró en {sqlite_jdbc_driver}. "
                "Por favor, descarga y coloca el archivo JAR correspondiente."
            )

        # URL JDBC para SQLite
        jdbc_url = f"jdbc:sqlite:{sqlite_db_path}"

        # Inicializar la sesión de Spark con el driver JDBC
        spark = iniciar_sesion_spark(sqlite_jdbc_driver)

        # -----------------------------
        # Paso D: Cargar datos a PySpark
        # -----------------------------
        transactions_df, duration = cargar_datos_spark(spark, jdbc_url, "transactions", "org.sqlite.JDBC")
        times['Cargar datos a PySpark'] = duration

        # -----------------------------
        # Paso E: Exploración y limpieza de los datos con SQL en PySpark
        # -----------------------------
        filtered_df, aggregated_df, duration = exploracion_y_limpieza(transactions_df)
        times['Exploración y limpieza de los datos'] = duration

        # Guardar resultados de agregación
        aggregated_pd = aggregated_df.toPandas()
        aggregated_pd.to_excel(os.path.join(results_save_path, "agregacion_categoria_producto.xlsx"), index=False)
        logging.info("Resultados de agregación guardados en Excel.")

        # -----------------------------
        # Paso F: Preprocesamiento de los datos
        # -----------------------------
        categorical_columns = [
            "Categoría de Producto", "Nivel de Calidad", "Tamaño de la Empresa",
            "Región Geográfica", "Ubicación", "Estado Civil del Cliente", "Color Preferido"
        ]

        numerical_columns = [
            "Monto de la Transacción", "Costo de Producción", "Impuesto Aplicado",
            "Precio de Lista", "Descuento Aplicado", "Margen de Ganancia", "Costo de Envío"
        ]

        preprocessed_df, duration, total_duration = preprocesamiento_datos(
            filtered_df,
            categorical_columns,
            numerical_columns
        )
        times['Preprocesamiento de los datos'] = duration
        times['Preprocesamiento total'] = total_duration

        # -----------------------------
        # Paso G: Aplicación de algoritmos de Machine Learning con PySpark ML
        # -----------------------------

        # F1: Entrenamiento de modelos de Regresión
        modelos_reg, tiempos_reg, train_reg, test_reg = entrenar_modelos_regresion(
            preprocessed_df,
            "features",
            "Precio de Lista"
        )
        times.update(tiempos_reg)

        # F2: Entrenamiento de modelos de Clasificación
        # Crear etiqueta binaria
        classification_df = preprocessed_df.withColumn(
            "label",
            F.when(F.col("Monto de la Transacción") > 700, 1).otherwise(0)
        ).select("features", "label")

        modelos_cls, tiempos_cls, train_cls, test_cls = entrenar_modelos_clasificacion(
            classification_df,
            "features",
            "label"
        )
        times.update(tiempos_cls)

        # F3: Determinar el número óptimo de clusters usando el método del codo
        k_optimo = metodo_del_codo(preprocessed_df, "features", max_k=10)
        # No se midió el tiempo en 'metodo_del_codo', pero puedes hacerlo si lo deseas

        # F4: Entrenamiento de modelo de Clustering K-Means con k_optimo
        kmeans_model, duration_kmeans = entrenar_modelo_clustering(preprocessed_df, "features", k=k_optimo)
        times[f'Clustering K-Means k={k_optimo}'] = duration_kmeans

        # F5: Análisis de texto
        text_df, duration_text, total_duration_text = análisis_texto(preprocessed_df)
        times['Análisis de texto'] = duration_text
        times['Análisis de texto total'] = total_duration_text

        # -----------------------------
        # Paso H: Evaluación y ajuste de modelos
        # -----------------------------
        # H1: Evaluar modelos de Regresión
        regresion_resultados = evaluar_modelos_regresion(modelos_reg, test_reg)

        # H2: Evaluar modelos de Clasificación
        clasificacion_resultados = evaluar_modelos_clasificacion(modelos_cls, test_cls)

        # H3: Evaluar modelo de Clustering
        clustering_resultado = evaluar_modelo_clustering(kmeans_model, preprocessed_df)

        # H4: Ajuste de hiperparámetros para Clasificación Logística
        log_reg = LogisticRegression(featuresCol="features", labelCol="label")
        paramGrid = ParamGridBuilder() \
            .addGrid(log_reg.regParam, [0.01, 0.1, 1.0]) \
            .addGrid(log_reg.elasticNetParam, [0.0, 0.5, 1.0]) \
            .build()
        cv_model, duration_cv = ajuste_hiperparametros_clasificacion(
            log_reg,
            train_cls,
            BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC"),
            paramGrid
        )
        times['CrossValidator clasificación logística'] = duration_cv

        # Obtener el mejor modelo de Clasificación Logística
        best_log_reg_model = cv_model.bestModel
        logging.info(f"Mejores parámetros del modelo de Clasificación Logística: regParam={best_log_reg_model._java_obj.getRegParam()}, elasticNetParam={best_log_reg_model._java_obj.getElasticNetParam()}")

        # Actualizar resultados de clasificación con el mejor modelo
        predicciones_best = best_log_reg_model.transform(test_cls)
        evaluador_cls = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
        auc_best = evaluador_cls.evaluate(predicciones_best)
        clasificacion_resultados['Clasificación Logística Optimizada'] = auc_best
        logging.info(f"Clasificación Logística Optimizada AUC: {auc_best:.2f}")

        # Agregar el tiempo total de evaluación y ajuste
        duration_evaluacion = time.time() - total_start_time
        times['Evaluación y ajuste de modelos'] = duration_evaluacion

        # -----------------------------
        # Paso I: Visualización de resultados
        # -----------------------------
        start_time_visual = time.time()
        plots_paths = []

        # H1: Visualizar predicciones de Regresión Lineal
        lr_model = modelos_reg.get("Regresión Lineal")
        if lr_model:
            predictions_reg = lr_model.transform(test_reg)
            regression_results = predictions_reg.select("Precio de Lista", "prediction").toPandas()
            plt.figure(figsize=(10,6))
            sns.scatterplot(x='Precio de Lista', y='prediction', data=regression_results, alpha=0.5)
            plt.title('Regresión Lineal: Actual vs Predicción')
            plt.xlabel('Precio de Lista Real')
            plt.ylabel('Precio de Lista Predicho')
            plot_path = os.path.join(plots_save_path, "regresion_lineal_actual_vs_prediccion.png")
            plt.savefig(plot_path)
            plt.close()
            logging.info("Visualización de Regresión Lineal guardada.")
            plots_paths.append(plot_path)

        # H2: Visualizar curva ROC para Clasificación Logística
        if best_log_reg_model:
            cls_results = predicciones_best.select("label", "probability").toPandas()
            cls_results['prob'] = cls_results['probability'].apply(lambda x: x[1])

            fpr, tpr, thresholds = roc_curve(cls_results['label'], cls_results['prob'])
            roc_auc = auc(fpr, tpr)

            plt.figure(figsize=(10,6))
            plt.plot(fpr, tpr, label=f'Curva ROC (AUC = {roc_auc:.2f})')
            plt.plot([0,1], [0,1], 'k--')
            plt.title('Curva ROC - Clasificación Logística Optimizada')
            plt.xlabel('Tasa de Falsos Positivos')
            plt.ylabel('Tasa de Verdaderos Positivos')
            plt.legend(loc='lower right')
            plot_path = os.path.join(plots_save_path, "clasificacion_logistica_roc_curve.png")
            plt.savefig(plot_path)
            plt.close()
            logging.info("Curva ROC de Clasificación Logística guardada.")
            plots_paths.append(plot_path)

        # H3: Visualizar resultados de K-Means Clustering
        clustering_results = kmeans_model.transform(preprocessed_df).select("prediction").toPandas()
        plt.figure(figsize=(10,6))
        sns.countplot(x='prediction', data=clustering_results)
        plt.title(f'Resultados de K-Means Clustering (k={k_optimo})')
        plt.xlabel('Cluster')
        plt.ylabel('Número de Transacciones')
        plot_path = os.path.join(plots_save_path, "kmeans_clustering_resultados.png")
        plt.savefig(plot_path)
        plt.close()
        logging.info("Visualización de K-Means Clustering guardada.")
        plots_paths.append(plot_path)

        # H4: Visualizar distribución de características TF-IDF
        tfidf_sample = text_df.select("tfidf_features").limit(1000).toPandas()
        tfidf_sample['tfidf_norm'] = tfidf_sample['tfidf_features'].apply(lambda x: float(x.norm(2)))

        plt.figure(figsize=(10,6))
        sns.histplot(tfidf_sample['tfidf_norm'], bins=50, kde=True)
        plt.title('Distribución de la Norma de Características TF-IDF')
        plt.xlabel('Norma TF-IDF')
        plt.ylabel('Frecuencia')
        plot_path = os.path.join(plots_save_path, "tfidf_norm_distribution.png")
        plt.savefig(plot_path)
        plt.close()
        logging.info("Distribución de Norma TF-IDF guardada.")
        plots_paths.append(plot_path)

        # H5: Visualizar características de Word2Vec usando PCA
        pca = PCA(k=2, inputCol="word2vec_features", outputCol="pca_features")
        pca_model = pca.fit(text_df)
        pca_df = pca_model.transform(text_df).select("pca_features").toPandas()

        plt.figure(figsize=(10,6))
        sns.scatterplot(
            x=pca_df['pca_features'].apply(lambda x: x[0]),
            y=pca_df['pca_features'].apply(lambda x: x[1]),
            alpha=0.5
        )
        plt.title('Proyección PCA de Características Word2Vec')
        plt.xlabel('PCA 1')
        plt.ylabel('PCA 2')
        plot_path = os.path.join(plots_save_path, "word2vec_pca_projection.png")
        plt.savefig(plot_path)
        plt.close()
        logging.info("Proyección PCA de Word2Vec guardada.")
        plots_paths.append(plot_path)

        # H6: Visualizar el método del codo
        plot_codo = os.path.join(plots_save_path, "metodo_del_codo.png")
        if os.path.exists(plot_codo):
            plots_paths.append(plot_codo)

        duration_visual = time.time() - start_time_visual
        times['Visualización de resultados'] = duration_visual
        logging.info(f"Visualización completada en {duration_visual:.2f} segundos.")

        # -----------------------------
        # Paso J: Cálculo de tiempos y Exportación de Resultados
        # -----------------------------
        total_end_time = time.time()
        tiempos_totales = total_end_time - total_start_time
        times['Tiempo total'] = tiempos_totales
        logging.info(f"Tiempo total de ejecución: {tiempos_totales:.2f} segundos.")

        # Exportar resultados a Excel
        ruta_excel = os.path.join(results_save_path, "resultados_pipeline.xlsx")
        exportar_resultados_excel(
            times,
            regresion_resultados,
            clasificacion_resultados,
            clustering_resultado,
            ruta_excel
        )

        # -----------------------------
        # Paso K: Generar Informe PDF
        # -----------------------------
        ruta_pdf = os.path.join(results_save_path, "informe_pipeline.pdf")
        summary_text = f"""
Informe del Pipeline de Machine Learning

Tiempo Total de Ejecución: {tiempos_totales:.2f} segundos

Modelos de Regresión:
"""
        for modelo, rmse in regresion_resultados.items():
            summary_text += f"{modelo}: RMSE = {rmse:.2f}\n"

        summary_text += "\nModelos de Clasificación:\n"
        for modelo, auc_score in clasificacion_resultados.items():
            summary_text += f"{modelo}: AUC = {auc_score:.2f}\n"

        summary_text += f"\nModelo de Clustering K-Means (k={k_optimo}): Silhouette Score = {clustering_resultado:.2f}\n"

        generar_informe_pdf(ruta_pdf, plots_paths, ruta_excel, summary_text)

        # -----------------------------
        # Paso L: Guardar los modelos entrenados
        # -----------------------------
        modelos_a_guardar = {
            "Regresión Lineal": modelos_reg.get("Regresión Lineal"),
            "Árbol de Decisión Regresión": modelos_reg.get("Árbol de Decisión"),
            "Random Forest Regresión": modelos_reg.get("Random Forest"),
            "Gradient-Boosted Trees Regresión": modelos_reg.get("Gradient-Boosted Trees"),
            "Regresión Logística": modelos_cls.get("Regresión Logística"),
            "Árbol de Decisión Clasificación": modelos_cls.get("Árbol de Decisión"),
            "Random Forest Clasificación": modelos_cls.get("Random Forest"),
            "Gradient-Boosted Trees Clasificación": modelos_cls.get("Gradient-Boosted Trees"),
            f"K-Means Clustering k={k_optimo}": kmeans_model,
            "Clasificación Logística Optimizada": best_log_reg_model
        }
        guardar_modelos(modelos_a_guardar, model_save_path)

        # -----------------------------
        # Paso M: Desconexión y limpieza final
        # -----------------------------
        spark.stop()
        logging.info("Sesión de Spark cerrada.")

        # Finalizar el reporte
        logging.info("Proceso completado exitosamente.")

    except Exception as e:
        logging.error(f"Error en el pipeline principal: {e}")
        try:
            spark.stop()
            logging.info("Sesión de Spark cerrada debido a un error.")
        except:
            pass
        raise

if __name__ == "__main__":
    main()
