# KDD, Inteligencia de Negocios y Spark MLlib

## Configuración inicial

In [None]:
# Install PySpark into the running kernel's environment (%pip, unlike !pip,
# targets the interpreter backing this kernel).
# NOTE(review): the version is unpinned; consider pinning the PySpark version
# that produced the recorded outputs so re-runs stay reproducible.
%pip install pyspark --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

# Configure the builder first, then start (or attach to) the Spark session
# that every later cell of the notebook uses.
session_builder = SparkSession.builder.appName("MLlibExamples")
spark = session_builder.getOrCreate()

## Ejemplos de código

### Ejemplo 1: Creación de un DataFrame

In [None]:
# Import the helpers needed to build the synthetic dataset
from pyspark.sql.functions import rand, when, col, expr, lit
import random

# Reuse the session created in the setup cell: getOrCreate() returns the
# already-running session, so a different .appName(...) here would be ignored.
spark = SparkSession.builder.getOrCreate()

# Fixed seeds so that Restart & Run All reproduces the same rows and outputs.
SEED = 42
random.seed(SEED)

N_ROWS = 10_000        # number of synthetic records
NULL_FRACTION = 0.05   # approximate share of rows whose f1 is blanked out

# Categories and numeric feature names
categories = ["A", "B", "C", "D", "E"]
features = ["f1", "f2", "f3", "f4"]

# Generate raw rows: (id, category, f1, f2, f3, f4, label)
extended_data = []
for i in range(1, N_ROWS + 1):
    category = random.choice(categories)
    feature_values = [round(random.uniform(0, 1), 2) for _ in range(len(features))]
    # NOTE: the binary label is drawn independently of the features, so any
    # classifier trained on it should score around chance level (AUC ~ 0.5).
    label = random.choice([0, 1])
    extended_data.append((i, category) + tuple(feature_values) + (label,))

# Build the DataFrame
columns = ["id", "category"] + features + ["label"]
df = spark.createDataFrame(extended_data, columns)

# Derived feature: 1 when f1 + f2 exceeds 1, otherwise 0
df = df.withColumn("derived_feature",
                   when(col("f1") + col("f2") > 1, 1).otherwise(0))

# Manual ordinal encoding of the category (A -> 0, ..., E -> 4)
df = df.withColumn("category_encoded",
                   when(col("category") == "A", 0)
                   .when(col("category") == "B", 1)
                   .when(col("category") == "C", 2)
                   .when(col("category") == "D", 3)
                   .when(col("category") == "E", 4))

# Numeric regression target. It is computed BEFORE nulls are injected into f1;
# otherwise the target itself would be null for every row whose f1 is blanked.
df = df.withColumn(
    "target_regression",
    expr(f"f1 * 2 + f2 * 3 + f3 * 1.5 + f4 * 0.5 + rand({SEED}) * 0.1"),
)

# Randomly blank out f1 so that later examples have missing values to impute.
# A different seed from the target noise keeps the two random streams independent.
df = df.withColumn(
    "f1",
    when(rand(seed=SEED + 1) > 1 - NULL_FRACTION, None).otherwise(col("f1")),
)

# Show the schema and a few sample rows
df.printSchema()
df.show(5)

# Persist the DataFrame for later use if needed
# df.write.parquet("path/to/extended_dataset.parquet")

print(f"Número total de filas: {df.count()}")

root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- f1: double (nullable = true)
 |-- f2: double (nullable = true)
 |-- f3: double (nullable = true)
 |-- f4: double (nullable = true)
 |-- label: long (nullable = true)
 |-- derived_feature: integer (nullable = false)
 |-- category_encoded: integer (nullable = true)
 |-- target_regression: double (nullable = true)

+---+--------+----+----+----+----+-----+---------------+----------------+------------------+
| id|category|  f1|  f2|  f3|  f4|label|derived_feature|category_encoded| target_regression|
+---+--------+----+----+----+----+-----+---------------+----------------+------------------+
|  1|       A|0.95|0.51|0.43|0.02|    0|              1|               0| 4.169006099562851|
|  2|       B|0.29|0.12|0.18|0.81|    0|              0|               1|1.6186491801554297|
|  3|       C|0.91|0.14|0.19|0.57|    1|              1|               2|2.8703515223407723|
|  4|       D|0.23|0.43|0.51|0.57|    1|      

### Ejemplo 2: Preprocesamiento de datos

In [None]:
from pyspark.ml.feature import Imputer, StandardScaler

# Raw numeric columns and the names of their imputed counterparts, defined once
# so the imputer, the assembler and the display below cannot drift apart.
raw_feature_cols = ["f1", "f2", "f3", "f4"]
imputed_feature_cols = [f"{c}_imputed" for c in raw_feature_cols]

# Missing values: Imputer fills nulls/NaNs (its default strategy is the column mean)
imputer = Imputer(inputCols=raw_feature_cols, outputCols=imputed_feature_cols)
imputer_model = imputer.fit(df)
df_imputed = imputer_model.transform(df)

# Feature scaling: pack the imputed columns into one vector, then standardize it
# (StandardScaler defaults: divide by the std, do not center on the mean)
assembler = VectorAssembler(inputCols=imputed_feature_cols, outputCol="features")
df_assembled = assembler.transform(df_imputed)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

print("Ejemplo 2: DataFrame con características imputadas y escaladas")
df_scaled.select("id", "category", *imputed_feature_cols, "scaledFeatures", "label").show(5)

print("Columna scaledFeatures contiene:")
df_scaled.select("scaledFeatures").show(5, truncate=False)

Ejemplo 1: DataFrame con características imputadas y escaladas
+---+--------+----------+----------+----------+----------+--------------------+-----+
| id|category|f1_imputed|f2_imputed|f3_imputed|f4_imputed|      scaledFeatures|label|
+---+--------+----------+----------+----------+----------+--------------------+-----+
|  1|       A|      0.95|      0.51|      0.43|      0.02|[3.38661067681834...|    0|
|  2|       B|      0.29|      0.12|      0.18|      0.81|[1.03380746976559...|    0|
|  3|       C|      0.91|      0.14|      0.19|      0.57|[3.24401654305757...|    1|
|  4|       D|      0.23|      0.43|      0.51|      0.57|[0.81991626912444...|    1|
|  5|       B|       0.8|      0.45|      0.73|      0.97|[2.85188267521544...|    0|
+---+--------+----------+----------+----------+----------+--------------------+-----+
only showing top 5 rows

Columna scaledFeatures contiene:
+-----------------------------------------------------------------------------+
|scaledFeatures          

### Ejemplo 3: Entrenamiento de un modelo de clasificación

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler, StringIndexer, OneHotEncoder

# Numeric inputs and the names of their imputed versions
numeric_cols = ["f1", "f2", "f3", "f4"]
imputed_cols = [f"{c}_imputed" for c in numeric_cols]

# Preprocessing
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
encoder = OneHotEncoder(inputCols=["categoryIndex"], outputCols=["categoryVec"])
# Impute inside the pipeline: the fill values are learned from the training split
# only, and rows with a missing f1 are kept instead of being dropped by the
# assembler's handleInvalid="skip".
imputer = Imputer(inputCols=numeric_cols, outputCols=imputed_cols)
assembler = VectorAssembler(inputCols=["categoryVec"] + imputed_cols, outputCol="features", handleInvalid="skip")

# Model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Pipeline
pipeline = Pipeline(stages=[indexer, encoder, imputer, assembler, lr])

# Train/test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Training
model = pipeline.fit(train_data)

# Predictions
predictions = model.transform(test_data)

print("\nEjemplo 3: Predicciones de Regresión Logística")
predictions.select("id", "label", "prediction", "probability").show(5)

# Evaluation. NOTE: in the dataset built in Ejemplo 1 the label is drawn with
# random.choice independently of the features, so an AUC near 0.5 is expected.
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")


Ejemplo 2: Predicciones de Regresión Logística
+---+-----+----------+--------------------+
| id|label|prediction|         probability|
+---+-----+----------+--------------------+
|  3|    1|       0.0|[0.52114115040225...|
|  7|    0|       0.0|[0.52102406546541...|
|  9|    1|       1.0|[0.47788162697766...|
| 14|    0|       0.0|[0.50226771693924...|
| 20|    0|       1.0|[0.47764981063960...|
+---+-----+----------+--------------------+
only showing top 5 rows

AUC: 0.4879246798574064


### Ejemplo 3: Clasificación multiclase (usando 'category' como etiqueta)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, when
import random

# Attach to the running session; getOrCreate() ignores a new appName when a
# session already exists, so none is set here.
spark = SparkSession.builder.getOrCreate()

# Fixed seed so the generated rows are reproducible across runs.
SEED = 42
random.seed(SEED)

# Category labels
categories = ["A", "B", "C", "D", "E"]


def generate_example(id):
    """Return one synthetic row ``(id, category, f1, f2, f3, f4)``.

    The category is drawn uniformly from ``categories``. Each feature is drawn
    uniformly from [0, 10] and rounded to 2 decimals. Category and features are
    drawn independently, so the features carry no information about the category.
    """
    category = random.choice(categories)
    # Same draw order as before: category first, then f1..f4.
    f1, f2, f3, f4 = (round(random.uniform(0, 10), 2) for _ in range(4))
    return (id, category, f1, f2, f3, f4)


# Generate 1000 examples
data = [generate_example(i) for i in range(1000)]

# Build the DataFrame
columns = ["id", "category", "f1", "f2", "f3", "f4"]
df = spark.createDataFrame(data, columns)

# Summary information about the DataFrame
print("Esquema del DataFrame:")
df.printSchema()

print("\nDistribución de categorías:")
df.groupBy("category").count().show()

print("\nResumen estadístico de las características numéricas:")
df.describe().show()

print("\nMuestra de los datos:")
df.show(5)

print(f"\nNúmero total de filas: {df.count()}")

Esquema del DataFrame:
root
 |-- id: long (nullable = true)
 |-- category: string (nullable = true)
 |-- f1: double (nullable = true)
 |-- f2: double (nullable = true)
 |-- f3: double (nullable = true)
 |-- f4: double (nullable = true)


Distribución de categorías:
+--------+-----+
|category|count|
+--------+-----+
|       E|  208|
|       B|  204|
|       D|  203|
|       C|  183|
|       A|  202|
+--------+-----+


Resumen estadístico de las características numéricas:
+-------+-----------------+--------+------------------+------------------+------------------+-----------------+
|summary|               id|category|                f1|                f2|                f3|               f4|
+-------+-----------------+--------+------------------+------------------+------------------+-----------------+
|  count|             1000|    1000|              1000|              1000|              1000|             1000|
|   mean|            499.5|    NULL| 5.231319999999999|           5.05741| 4.

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Preprocessing: category string -> numeric label, f1..f4 -> feature vector
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
assembler = VectorAssembler(inputCols=["f1", "f2", "f3", "f4"], outputCol="features")

# Train/test split
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Build and train the multiclass logistic regression
lr_multi = LogisticRegression(featuresCol="features", labelCol="categoryIndex", maxIter=10, regParam=0.3)
pipeline_multi = Pipeline(stages=[indexer, assembler, lr_multi])
model_multi = pipeline_multi.fit(train_data)

# Evaluation. NOTE: in the dataset generated above the features are drawn
# independently of the category, so accuracy near chance level is expected.
predictions_multi = model_multi.transform(test_data)
evaluator_multi = MulticlassClassificationEvaluator(labelCol="categoryIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_multi.evaluate(predictions_multi)
print("\nEjemplo 3: Clasificación multiclase")
print(f"Accuracy: {accuracy}")

# Confusion matrix as (true, predicted, count) rows, sorted so that each true
# class's row block is contiguous and readable
print("Matriz de confusión:")
(predictions_multi
    .groupBy("categoryIndex", "prediction")
    .count()
    .orderBy("categoryIndex", "prediction")
    .show())

# A few sample predictions
print("\nAlgunas predicciones:")
predictions_multi.select("category", "categoryIndex", "prediction", "probability").show(5)


Ejemplo 3: Clasificación multiclase
Accuracy: 0.1813186813186813
Matriz de confusión:
+-------------+----------+-----+
|categoryIndex|prediction|count|
+-------------+----------+-----+
|          2.0|       0.0|   23|
|          1.0|       1.0|   15|
|          0.0|       1.0|   10|
|          1.0|       0.0|   21|
|          3.0|       1.0|   17|
|          2.0|       1.0|   18|
|          0.0|       0.0|   17|
|          4.0|       3.0|    3|
|          4.0|       0.0|   16|
|          3.0|       3.0|    1|
|          0.0|       3.0|    2|
|          3.0|       0.0|   23|
|          4.0|       1.0|   14|
|          2.0|       3.0|    1|
|          1.0|       3.0|    1|
+-------------+----------+-----+


Algunas predicciones:
+--------+-------------+----------+--------------------+
|category|categoryIndex|prediction|         probability|
+--------+-------------+----------+--------------------+
|       B|          2.0|       1.0|[0.20653306285646...|
|       C|          4.0|       0.0

## Ejercicios resueltos

### Ejercicio 1: Clasificación de Iris

In [None]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Attach to the running Spark session (or start one in a fresh kernel), so this
# exercise does not depend on names defined in earlier cells.
spark = SparkSession.builder.getOrCreate()

# Load the dataset; column names are supplied explicitly via `names`
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class']
iris_pd = pd.read_csv(url, names=names)
iris_df = spark.createDataFrame(iris_pd)

# Preprocessing: species string -> numeric label, measurements -> feature vector
indexer = StringIndexer(inputCol="class", outputCol="label")
assembler = VectorAssembler(inputCols=["sepal-length", "sepal-width", "petal-length", "petal-width"], outputCol="features")

# Train/test split
train_data, test_data = iris_df.randomSplit([0.8, 0.2], seed=42)

# Build and train the model
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[indexer, assembler, lr])
model = pipeline.fit(train_data)

# Evaluate the model
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion matrix, sorted by true label then prediction for readability
predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()

Accuracy: 1.0
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0|   13|
|  0.0|       0.0|    6|
|  2.0|       2.0|   13|
+-----+----------+-----+



### Ejercicio 2: Predicción de precios de viviendas

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler, Imputer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml import Pipeline

# Attach to the running session; getOrCreate() ignores a new appName when a
# session already exists.
spark = SparkSession.builder.getOrCreate()

SEED = 42  # shared seed for the split, the forest and the CV fold assignment

# Load the dataset
url = "https://raw.githubusercontent.com/ageron/handson-ml/master/datasets/housing/housing.csv"
housing_pd = pd.read_csv(url)
housing_df = spark.createDataFrame(housing_pd)

# Column groups
numeric_cols = ["longitude", "latitude", "housing_median_age", "total_rooms", "total_bedrooms", "population", "households", "median_income"]
categorical_cols = ["ocean_proximity"]
cols_with_missing = ["total_bedrooms"]

# Missing values: impute the columns that have gaps (Imputer treats NaN as missing by default)
imputer = Imputer(inputCols=cols_with_missing, outputCols=[f"{c}_imputed" for c in cols_with_missing])

# Categorical encoding, driven by `categorical_cols`. handleInvalid="keep" maps a
# category unseen during fit (e.g. a rare value that lands only in a CV
# validation fold) to an extra index instead of raising an error.
index_cols = [f"{c}_index" for c in categorical_cols]
vec_cols = [f"{c}_vec" for c in categorical_cols]
indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_cols, handleInvalid="keep")
encoder = OneHotEncoder(inputCols=index_cols, outputCols=vec_cols)

# Combine numeric and categorical features; imputed versions replace the raw
# columns that had gaps. (Loop variable is `c`, not `col`, to avoid shadowing
# pyspark.sql.functions.col imported elsewhere in the notebook.)
assembler_inputs = [f"{c}_imputed" if c in cols_with_missing else c for c in numeric_cols] + vec_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features_unscaled")

# Feature scaling. Tree ensembles do not need it; kept as a pipeline demonstration.
scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=False)

# Model (seeded so the bootstrap/feature sampling is reproducible)
rf = RandomForestRegressor(featuresCol="features", labelCol="median_house_value", seed=SEED)

# Pipeline
pipeline = Pipeline(stages=[imputer, indexer, encoder, assembler, scaler, rf])

# Train/test split
train_data, test_data = housing_df.randomSplit([0.8, 0.2], seed=SEED)

# Hyperparameter grid and cross-validation (seeded fold assignment)
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [10, 20])
    .addGrid(rf.maxDepth, [5, 10])
    .build()
)

evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=SEED)

# Train; cv_model wraps the pipeline refit with the best-scoring params
cv_model = cv.fit(train_data)

# Predict on the held-out split
predictions = cv_model.transform(test_data)

# Evaluate the model
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

r2_evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="r2")
r2 = r2_evaluator.evaluate(predictions)
print(f"R2: {r2}")

RMSE: 56319.05910152569
R2: 0.7657823584175962
