**Project Objective**

Build a machine learning model in PySpark to predict the median house value in California, using data cleaning, transformation, scaling, and linear regression. The project simulates a big-data environment, so the same approach carries over to high-volume business settings.

Technologies Used

Language: Python

Framework: PySpark (Spark MLlib)

Environment: Google Colab

Key libraries: pyspark.ml, pandas, matplotlib

In [2]:
!pip install pyspark



In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # use F.sum, F.avg, ... to avoid shadowing Python builtins
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("HousePricePrediction").getOrCreate()

In [7]:
sales_df = spark.read.csv("/content/sample_data/housing.csv", header=True, inferSchema=True)

In [8]:
sales_df.createOrReplaceTempView("sales_data")
sales_df.printSchema()
sales_df.show(10)

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR B

Data Loading:

A CSV file with 20,640 residential property records was imported.

In [11]:
from pyspark.sql.functions import monotonically_increasing_id

sales_df = sales_df.withColumn("id", monotonically_increasing_id())
sales_df.show(3)


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity| id|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|  0|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|  1|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|  2|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+---+
only s

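Records with null values were removed with dropna, leaving 20,433 of the original 20,640 rows.

An id column was added for row tracking. The values from monotonically_increasing_id are unique and increasing but not guaranteed to be consecutive.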

In [12]:
from pyspark.sql.functions import col

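# Intended to count null values per column. Note that summary("count") counts the rows
# of the 0/1 null indicator, so every column reports the total row count, not the nulls.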
sales_df.select([col(c).isNull().cast("int").alias(c) for c in sales_df.columns]).summary("count").show()


+-------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----+
|summary|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|   id|
+-------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----+
|  count|    20640|   20640|             20640|      20640|         20640|     20640|     20640|        20640|             20640|          20640|20640|
+-------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-----+
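The table above shows 20640 for every column because counting a 0/1 null indicator returns the number of rows, not the number of nulls. A minimal plain-Python sketch of the difference follows, using made-up values (the list is illustrative, not taken from the dataset). In PySpark the analogous change would be to aggregate the indicator with `F.sum(...)` rather than `summary("count")`.

```python
# Toy column with two missing entries (illustrative values only).
total_bedrooms = [129.0, None, 190.0, None, 280.0]

# 0/1 indicator: 1 where the value is missing, 0 otherwise.
is_null = [1 if v is None else 0 for v in total_bedrooms]

# Counting the indicator just returns the number of rows ...
rows_counted = len(is_null)
# ... while summing it returns the number of missing values.
nulls_found = sum(is_null)

print(f"rows counted: {rows_counted}, nulls found: {nulls_found}")
```

Summing is the operation that answers "how many values are missing in this column".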



In [13]:
sales_df = sales_df.dropna()

In [14]:
sales_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)
 |-- id: long (nullable = false)



In [15]:
sales_df.describe(["total_bedrooms", "total_rooms","population","households"]).show()


+-------+------------------+------------------+------------------+------------------+
|summary|    total_bedrooms|       total_rooms|        population|        households|
+-------+------------------+------------------+------------------+------------------+
|  count|             20433|             20433|             20433|             20433|
|   mean| 537.8705525375618|2636.5042333480155|1424.9469485635982|499.43346547251997|
| stddev|421.38507007403115| 2185.269566977601|1133.2084897449597| 382.2992258828481|
|    min|               1.0|               2.0|               3.0|               1.0|
|    max|            6445.0|           39320.0|           35682.0|            6082.0|
+-------+------------------+------------------+------------------+------------------+



In [17]:
train_df, test_df = sales_df.randomSplit([0.8, 0.2], seed=42)


Dataset Split:

The data was split into training (80%) and test (20%) sets.
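The 80/20 proportions are targets rather than exact sizes. As a rough sketch of why, assuming the split works by an independent random draw per row (the helper `random_split` below is hypothetical and written in plain Python, not Spark's implementation):

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with an independent random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train, test = random_split(rows)

# Sizes land near 800/200 rather than being fixed in advance.
print(len(train), len(test))
```

Fixing the seed makes the split repeatable, which is what `seed=42` is for in the cell above.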

In [20]:
excluded_cols = ['median_house_value', 'id', 'ocean_proximity']
numerical_features = [c for c in sales_df.columns if sales_df.schema[c].dataType.simpleString() in ['double', 'int'] and c not in excluded_cols]


In [22]:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=numerical_features, outputCols=[f"{c}_imputed" for c in numerical_features])
imputer_model = imputer.fit(train_df)
train_df = imputer_model.transform(train_df)
test_df = imputer_model.transform(test_df)
imputed_features = [f"{c}_imputed" for c in numerical_features]


In [24]:
from pyspark.ml.feature import VectorAssembler # Import VectorAssembler

num_assembler = VectorAssembler(inputCols=imputed_features, outputCol="num_features_vec")
train_df = num_assembler.transform(train_df)
test_df = num_assembler.transform(test_df)


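Imputation and Feature Transformation:

Missing values were handled with Imputer, fitted on the training set only. Because rows with nulls had already been dropped in cell 13, the imputer has nothing to fill in this run and acts as a safeguard for new data.

Numeric features were assembled with VectorAssembler and standardized with StandardScaler.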

In [26]:
from pyspark.ml.feature import StandardScaler # Import StandardScaler

scaler = StandardScaler(inputCol="num_features_vec", outputCol="scaled_num_features", withMean=True, withStd=True)
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)
test_df = scaler_model.transform(test_df)
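The scaler is fitted on the training data and then applied to both sets, so the test set never influences the statistics. A minimal plain-Python sketch of that pattern for a single feature is shown here. The helper names and toy values are hypothetical, and the use of the sample standard deviation is an assumption about how the library scales.

```python
import statistics

def fit_scaler(train_values):
    """Learn mean and sample standard deviation from training data only."""
    return statistics.mean(train_values), statistics.stdev(train_values)

def apply_scaler(values, mean, std):
    """Center and scale values with previously learned statistics."""
    return [(v - mean) / std for v in values]

train_income = [2.0, 4.0, 6.0, 8.0]  # toy training values
test_income = [5.0, 10.0]            # toy test values

mean, std = fit_scaler(train_income)
scaled_train = apply_scaler(train_income, mean, std)
scaled_test = apply_scaler(test_income, mean, std)

print(scaled_train)
print(scaled_test)
```

The scaled training values have zero mean and unit standard deviation; the test values generally do not, because they are transformed with the training statistics.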


In [28]:
from pyspark.ml.feature import StringIndexer # Import the StringIndexer class

indexer = StringIndexer(inputCol="ocean_proximity", outputCol="ocean_category_index")
indexer_model = indexer.fit(train_df)
train_df = indexer_model.transform(train_df)
test_df = indexer_model.transform(test_df)


Categorical Encoding:

The ocean_proximity column was indexed with StringIndexer.

The resulting index was then one-hot encoded with OneHotEncoder.

In [30]:
from pyspark.ml.feature import OneHotEncoder # Import OneHotEncoder

encoder = OneHotEncoder(inputCols=["ocean_category_index"], outputCols=["ocean_category_ohe"])
encoder_model = encoder.fit(train_df)
train_df = encoder_model.transform(train_df)
test_df = encoder_model.transform(test_df)
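The two encoding steps can be sketched in plain Python. This assumes the default behaviors as I understand them: labels are indexed by descending frequency, and the one-hot vector drops the last category so that k categories produce k - 1 columns. The helper names are hypothetical, and the label values other than NEAR BAY are placeholders.

```python
from collections import Counter

def fit_index(labels):
    """Map each label to an index, most frequent first (ties alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: i for i, lab in enumerate(ordered)}

def one_hot_drop_last(index, num_categories):
    """Encode an index as a vector of length num_categories - 1."""
    vec = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vec[index] = 1.0
    return vec

labels = ["NEAR BAY", "INLAND", "INLAND", "ISLAND", "NEAR BAY", "INLAND"]
mapping = fit_index(labels)
encoded = {lab: one_hot_drop_last(i, len(mapping)) for lab, i in mapping.items()}

print(mapping)
print(encoded)
```

The last category is represented by the all-zeros vector, which avoids a redundant column when the model also fits an intercept.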


In [31]:
final_assembler = VectorAssembler(
    inputCols=["scaled_num_features", "ocean_category_ohe"],
    outputCol="final_feature_vector"
)
train_df = final_assembler.transform(train_df)
test_df = final_assembler.transform(test_df)


Final Assembly: All preprocessed features were combined into a single input vector, final_feature_vector.

In [33]:
from pyspark.ml.regression import LinearRegression # Import LinearRegression

lr = LinearRegression(featuresCol="final_feature_vector", labelCol="median_house_value")


Model Training: Linear regression was used to predict the median house value (median_house_value).

In [34]:
lr_model = lr.fit(train_df)
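For intuition about what fitting does, here is a one-feature least-squares sketch in plain Python. It assumes no regularization and is only the textbook closed form. The library handles many features with its own solvers, and `fit_simple_ols` is a hypothetical helper.

```python
def fit_simple_ols(xs, ys):
    """Return (slope, intercept) minimizing squared error for one feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance-like and variance-like sums around the means.
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]  # toy data lying exactly on y = 2x + 1

slope, intercept = fit_simple_ols(xs, ys)
prediction = slope * 5.0 + intercept

print(slope, intercept, prediction)
```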


In [35]:
pred_train_df = lr_model.transform(train_df)
pred_test_df = lr_model.transform(test_df)


In [36]:
pred_test_pd_df = pred_test_df.select("prediction", "median_house_value").toPandas()
pred_test_pd_df.head()


      prediction  median_house_value
0  151275.833403            103600.0
1  216541.197779            106700.0
2  126836.712672             73200.0
3  197962.966924             90100.0
4  153295.952455             67000.0


Model Evaluation:

Predictions were generated for both the training and test data.

The test predictions were converted to a pandas DataFrame for inspection.

In [37]:
predictions_and_actuals_rdd = pred_test_df.select("prediction", "median_house_value").rdd.map(lambda row: (row[0], row[1]))


In [39]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction")

rmse = evaluator.evaluate(pred_test_df, {evaluator.metricName: "rmse"})
mae = evaluator.evaluate(pred_test_df, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(pred_test_df, {evaluator.metricName: "r2"})
mse = evaluator.evaluate(pred_test_df, {evaluator.metricName: "mse"})

print(f"RMSE: {rmse}")
print(f"MAE: {mae}")
print(f"MSE: {mse}")
print(f"R2: {r2}")


RMSE: 68641.31886229731
MAE: 50004.934441645775
MSE: 4711630655.155573
R2: 0.6432667203304552


Key Results:

The model explains roughly 64% of the variance in house values on the test set (R2 = 0.643).

The mean absolute error (MAE) is about 50,000, in the same units as median_house_value, which is treated here as acceptable for real-world housing data. The RMSE is higher, about 68,600, which suggests some large individual errors.

Because the pipeline is written in PySpark, the same code can be scaled to much larger volumes of data; this run used 20,640 records.

What This Project Demonstrates

Ability to build data science pipelines in PySpark.

Hands-on experience with feature engineering, scaling, and data encoding.

Solid understanding of regression model evaluation.

Readiness for Data Analyst / Data Scientist roles in big-data environments.