In [2]:
import os, findspark
os.environ["JAVA_HOME"] = r"C:\java"
os.environ["HADOOP_HOME"] = r"C:\hadoop"   # opcional, por winutils
findspark.init(r"C:\spark")

## 1. Incializando Spark

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PipelineLimpiezaAirbnb") \
    .getOrCreate()

print("SparkSession iniciada:", spark.version)

SparkSession iniciada: 3.5.7


### 1.1. Hacer una lectura de la ruta de los datos que usaremos

In [4]:
# Ruta de los datos
listings_path = "listings.csv"
neighb_path = "neighbourhoods.csv"
reviews_path = "reviews.csv"

In [5]:
# Lectura de archivos
listings_df = spark.read.csv(listings_path, header=True, inferSchema=True)
neighb_df = spark.read.csv(neighb_path, header=True, inferSchema=True)
reviews_df = spark.read.csv(reviews_path, header=True, inferSchema=True)

## 2. Exploracion de los datos

Se realiza una exploracion de los datos para saber con que tipo de datos estamos trabajando

In [5]:
# Exploracion de los datos de listings
listings_df.printSchema()
listings_df.show()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: double (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)

+------+--------------------+-------+---------------+-------------------+-------------+---------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+-------

In [6]:
# Exploracion de los datos de neighbourhoods
neighb_df.printSchema()
neighb_df.show()

root
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)

+-------------------+-------------------+
|neighbourhood_group|      neighbourhood|
+-------------------+-------------------+
|               NULL|          Cerrillos|
|               NULL|        Cerro Navia|
|               NULL|           Conchalí|
|               NULL|          El Bosque|
|               NULL|   Estación Central|
|               NULL|         Huechuraba|
|               NULL|      Independencia|
|               NULL|        La Cisterna|
|               NULL|         La Florida|
|               NULL|          La Granja|
|               NULL|         La Pintana|
|               NULL|           La Reina|
|               NULL|         Las Condes|
|               NULL|       Lo Barnechea|
|               NULL|          Lo Espejo|
|               NULL|           Lo Prado|
|               NULL|              Macul|
|               NULL|              Maipú|
|               

In [7]:
# Exploracion de los datos de reviews
reviews_df.printSchema()
reviews_df.show()

root
 |-- listing_id: long (nullable = true)
 |-- date: date (nullable = true)

+----------+----------+
|listing_id|      date|
+----------+----------+
|     88944|2011-10-21|
|     88944|2012-01-29|
|     88944|2012-04-03|
|     88944|2012-06-03|
|     88944|2012-07-06|
|     88944|2012-07-25|
|     88944|2012-08-02|
|     88944|2012-12-02|
|     88944|2013-10-06|
|     88944|2013-11-12|
|     88944|2014-01-31|
|     88944|2014-04-01|
|     88944|2014-05-17|
|     88944|2014-06-25|
|     88944|2014-09-01|
|     88944|2014-09-20|
|     88944|2014-10-12|
|     88944|2014-11-16|
|     88944|2014-12-03|
|     88944|2014-12-23|
+----------+----------+
only showing top 20 rows



In [8]:
# Conteo de registros en cada DataFrame
print("Total de registros para listings:\n", listings_df.count())
print("Total de registros para neighbourhoods:\n", neighb_df.count())
print("Total de registros para reviews:\n", reviews_df.count())

Total de registros para listings:
 15143
Total de registros para neighbourhoods:
 32
Total de registros para reviews:
 454372


In [9]:
# Importar funciones necesarias
from pyspark.sql.functions import col, sum as spark_sum

# Conteo de valores nulos por columna en listings
null_counts_listings = listings_df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in listings_df.columns])
print("Conteo de valores nulos por columna en listings:")
null_counts_listings.show()

# Conteo de valores nulos por columna en neighbourhoods
null_counts_neighb = neighb_df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in neighb_df.columns])
print("Conteo de valores nulos por columna en neighbourhoods:")
null_counts_neighb.show()

# Conteo de valores nulos por columna en reviews
null_counts_reviews = reviews_df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in reviews_df.columns])
print("Conteo de valores nulos por columna en reviews:")
null_counts_reviews.show()

Conteo de valores nulos por columna en listings:
+---+----+-------+---------+-------------------+-------------+--------+---------+---------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
| id|name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|license|
+---+----+-------+---------+-------------------+-------------+--------+---------+---------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
|  0|   9|     92|      172|              15052|          101|      94|       92|      101| 2069|            95|              105|       3335|             3323|                            93|              92|                  172|  14

In [10]:
# Conteo de duplicados en listings
duplicates_listings = listings_df.count() - listings_df.dropDuplicates().count()
print(f"Número de registros duplicados en listings: {duplicates_listings}")

# Conteo de duplicados en neighbourhoods
duplicates_neighb = neighb_df.count() - neighb_df.dropDuplicates().count()
print(f"Número de registros duplicados en neighbourhoods: {duplicates_neighb}")

# Conteo de duplicados en reviews
duplicates_reviews = reviews_df.count() - reviews_df.dropDuplicates().count()
print(f"Número de registros duplicados en reviews: {duplicates_reviews}")

Número de registros duplicados en listings: 0
Número de registros duplicados en neighbourhoods: 0
Número de registros duplicados en reviews: 1763


In [11]:
# Conteo de registros
print("Total registros listings:", listings_df.count())

print("Total registros neighbourhoods:", neighb_df.count())

print("Total registros reviews:", reviews_df.count())

Total registros listings: 15143
Total registros neighbourhoods: 32
Total registros reviews: 454372


## 3. Limpieza de Datos

### Paso 1: Corrección de tipos de datos

In [6]:
from pyspark.sql.functions import col, when, regexp_replace, trim
from pyspark.sql.types import IntegerType, FloatType, DoubleType

# Función para convertir columnas numéricas
def convert_to_numeric(df, column_name, data_type):
    """Convierte una columna a tipo numérico manejando valores nulos y errores"""
    return df.withColumn(column_name, 
                        when(col(column_name).isNull() | 
                             (col(column_name) == "") |
                             (col(column_name) == "NULL"), None)
                        .otherwise(col(column_name).cast(data_type)))

print("CORRECCIÓN DE TIPOS DE DATOS")
print("Convirtiendo columnas de listings a tipos apropiados:")

# Limpiar y convertir listings_df
listings_clean = listings_df

# Convertir columnas numéricas enteras
integer_columns = ['id', 'host_id', 'minimum_nights', 'number_of_reviews', 
                   'calculated_host_listings_count', 'number_of_reviews_ltm']

for col_name in integer_columns:
    if col_name in listings_clean.columns:
        listings_clean = convert_to_numeric(listings_clean, col_name, IntegerType())
        print(f" - Convertido {col_name} a IntegerType")

# Convertir columnas numéricas decimales
float_columns = ['latitude', 'longitude', 'reviews_per_month']

for col_name in float_columns:
    if col_name in listings_clean.columns:
        listings_clean = convert_to_numeric(listings_clean, col_name, DoubleType())
        print(f" - Convertido {col_name} a DoubleType")

# La columna price requiere limpieza especial (se hará en paso posterior)
print("Tipos de datos corregidos para listings")

CORRECCIÓN DE TIPOS DE DATOS
Convirtiendo columnas de listings a tipos apropiados:
 - Convertido id a IntegerType
 - Convertido host_id a IntegerType
 - Convertido minimum_nights a IntegerType
 - Convertido number_of_reviews a IntegerType
 - Convertido calculated_host_listings_count a IntegerType
 - Convertido number_of_reviews_ltm a IntegerType
 - Convertido latitude a DoubleType
 - Convertido longitude a DoubleType
 - Convertido number_of_reviews a IntegerType
 - Convertido calculated_host_listings_count a IntegerType
 - Convertido number_of_reviews_ltm a IntegerType
 - Convertido latitude a DoubleType
 - Convertido longitude a DoubleType
 - Convertido reviews_per_month a DoubleType
Tipos de datos corregidos para listings
 - Convertido reviews_per_month a DoubleType
Tipos de datos corregidos para listings


In [13]:
# Verificar el schema actualizado
print("\nSCHEMA ACTUALIZADO DE LISTINGS")
listings_clean.printSchema()


SCHEMA ACTUALIZADO DE LISTINGS
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: double (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)



### Paso 2: Manejo de valores nulos en listings

In [7]:
from pyspark.sql.functions import coalesce, lit, isnan, when, count, sum as spark_sum

print("MANEJO DE VALORES NULOS - LISTINGS")

# Verificar conteo actual de nulos
print("Conteo de valores nulos antes de la limpieza:")
null_counts_before = listings_clean.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in listings_clean.columns
])
null_counts_before.show()

# Estrategias para manejar valores nulos:

# 1. neighbourhood_group: Como hay muchos nulos (15052), vamos a investigar
print("\nAnalizando neighbourhood_group nulos...")
total_listings = listings_clean.count()
null_neighbourhood_group = listings_clean.filter(col("neighbourhood_group").isNull()).count()
print(f"Total de registros: {total_listings}")
print(f"Registros con neighbourhood_group nulo: {null_neighbourhood_group}")

# Asignar "Sin_Grupo" a neighbourhood_group nulos
listings_clean = listings_clean.withColumn(
    "neighbourhood_group",
    when(col("neighbourhood_group").isNull(), "Sin_Grupo")
    .otherwise(col("neighbourhood_group"))
)

# 2. host_name: Rellenar con "Host_Desconocido"
listings_clean = listings_clean.withColumn(
    "host_name",
    when(col("host_name").isNull(), "Host_Desconocido")
    .otherwise(col("host_name"))
)

# 3. name: Rellenar con "Propiedad_Sin_Nombre"
listings_clean = listings_clean.withColumn(
    "name",
    when(col("name").isNull(), "Propiedad_Sin_Nombre")
    .otherwise(col("name"))
)

# 4. Para columnas numéricas con pocos nulos, calcular promedio/mediana
# reviews_per_month: usar 0 para propiedades sin reseñas
listings_clean = listings_clean.withColumn(
    "reviews_per_month",
    when(col("reviews_per_month").isNull(), 0.0)
    .otherwise(col("reviews_per_month"))
)

# 5. minimum_nights: usar 1 como valor por defecto
listings_clean = listings_clean.withColumn(
    "minimum_nights",
    when(col("minimum_nights").isNull(), 1)
    .otherwise(col("minimum_nights"))
)

# 6. Para latitude/longitude nulos, eliminaremos esos registros (son críticos para análisis geográfico)
print(f"\nEliminando registros con coordenadas faltantes...")
before_geo_filter = listings_clean.count()
listings_clean = listings_clean.filter(
    col("latitude").isNotNull() & col("longitude").isNotNull()
)
after_geo_filter = listings_clean.count()
print(f"Registros eliminados por coordenadas faltantes: {before_geo_filter - after_geo_filter}")

print("\nConteo de valores nulos después de la limpieza:")
null_counts_after = listings_clean.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in listings_clean.columns
])
null_counts_after.show()

print(f"Registros finales en listings: {listings_clean.count()}")

MANEJO DE VALORES NULOS - LISTINGS
Conteo de valores nulos antes de la limpieza:
+-----+----+-------+---------+-------------------+-------------+--------+---------+---------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
|   id|name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|license|
+-----+----+-------+---------+-------------------+-------------+--------+---------+---------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
|10085|   9|    183|      172|              15052|          101|     103|      174|      101| 2069|            97|              172|       3335|             3331|                            95|   

### Paso 3: Manejo de valores nulos en neighbourhoods

In [8]:
print("MANEJO DE VALORES NULOS - NEIGHBOURHOODS")

# Verificar conteo actual de nulos en neighbourhoods
print("Conteo de valores nulos en neighbourhoods:")
null_counts_neighb = neighb_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in neighb_df.columns
])
null_counts_neighb.show()

# Mostrar registros con neighbourhood_group nulo
print("Registros con neighbourhood_group nulo:")
neighb_df.filter(col("neighbourhood_group").isNull()).show()

# Limpiar neighbourhoods: asignar "Grupo_No_Especificado" a neighbourhood_group nulos
neighb_clean = neighb_df.withColumn(
    "neighbourhood_group",
    when(col("neighbourhood_group").isNull(), "Grupo_No_Especificado")
    .otherwise(col("neighbourhood_group"))
)

print("Conteo de valores nulos después de la limpieza:")
null_counts_neighb_after = neighb_clean.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in neighb_clean.columns
])
null_counts_neighb_after.show()

print(f"Registros en neighbourhoods: {neighb_clean.count()}")

MANEJO DE VALORES NULOS - NEIGHBOURHOODS
Conteo de valores nulos en neighbourhoods:
+-------------------+-------------+
|neighbourhood_group|neighbourhood|
+-------------------+-------------+
|                 32|            0|
+-------------------+-------------+

Registros con neighbourhood_group nulo:
+-------------------+-------------------+
|neighbourhood_group|      neighbourhood|
+-------------------+-------------------+
|               NULL|          Cerrillos|
|               NULL|        Cerro Navia|
|               NULL|           Conchalí|
|               NULL|          El Bosque|
|               NULL|   Estación Central|
|               NULL|         Huechuraba|
|               NULL|      Independencia|
|               NULL|        La Cisterna|
|               NULL|         La Florida|
|               NULL|          La Granja|
|               NULL|         La Pintana|
|               NULL|           La Reina|
|               NULL|         Las Condes|
|               NULL|  

### Paso 4: Eliminación de duplicados en reviews

In [9]:
print("ELIMINACIÓN DE DUPLICADOS - REVIEWS")

# Verificar conteo antes de eliminar duplicados
total_reviews = reviews_df.count()
print(f"Total de registros en reviews antes: {total_reviews}")

# Contar duplicados
duplicates_count = total_reviews - reviews_df.dropDuplicates().count()
print(f"Número de registros duplicados encontrados: {duplicates_count}")

# Eliminar duplicados
reviews_clean = reviews_df.dropDuplicates()
total_after_cleanup = reviews_clean.count()

print(f"\nTotal de registros en reviews después: {total_after_cleanup}")
print(f"Registros eliminados: {total_reviews - total_after_cleanup}")

# Verificar que no hay duplicados
final_duplicates = total_after_cleanup - reviews_clean.dropDuplicates().count()
print(f"\nDuplicados restantes: {final_duplicates}")

ELIMINACIÓN DE DUPLICADOS - REVIEWS
Total de registros en reviews antes: 454372
Número de registros duplicados encontrados: 1763
Número de registros duplicados encontrados: 1763

Total de registros en reviews después: 452609
Registros eliminados: 1763

Total de registros en reviews después: 452609
Registros eliminados: 1763

Duplicados restantes: 0

Duplicados restantes: 0


### Paso 5: Validación de datos geográficos

In [17]:
# Rangos aproximados para Santiago, Chile
# Latitud: entre -33.7 y -33.1 (Santiago está alrededor de -33.4)
# Longitud: entre -70.9 y -70.3 (Santiago está alrededor de -70.6)

MIN_LAT, MAX_LAT = -33.8, -33.0
MIN_LON, MAX_LON = -70.9, -70.2

print("VALIDACIÓN DE DATOS GEOGRÁFICOS")
print(f"Rangos válidos para Santiago:")
print(f"Latitud: {MIN_LAT} a {MAX_LAT}")
print(f"Longitud: {MIN_LON} a {MAX_LON}")

VALIDACIÓN DE DATOS GEOGRÁFICOS
Rangos válidos para Santiago:
Latitud: -33.8 a -33.0
Longitud: -70.9 a -70.2


In [18]:
# Analizar rangos actuales de coordenadas
print("\nANÁLISIS DE RANGOS ACTUALES")
coords_stats = listings_clean.select(
    col("latitude").alias("lat"),
    col("longitude").alias("lon")
).describe()
coords_stats.show()

# Contar registros fuera de rango
out_of_range_count = listings_clean.filter(
    (col("latitude") < MIN_LAT) | (col("latitude") > MAX_LAT) |
    (col("longitude") < MIN_LON) | (col("longitude") > MAX_LON)
).count()

total_count = listings_clean.count()
print(f"\nRegistros fuera de rango geográfico de Santiago: {out_of_range_count}")
print(f"Total de registros: {total_count}")
print(f"Porcentaje fuera de rango: {(out_of_range_count/total_count)*100:.2f}%")


ANÁLISIS DE RANGOS ACTUALES
+-------+-------------------+------------------+
|summary|                lat|               lon|
+-------+-------------------+------------------+
|  count|              14960|             14960|
|   mean| -33.43383032269537|-70.60988689806014|
| stddev|0.03273250917799909|0.0776001536803381|
|    min|        -33.5950351|         -70.86822|
|    max|          -33.24383|         -70.22031|
+-------+-------------------+------------------+


Registros fuera de rango geográfico de Santiago: 0
Total de registros: 14960
Porcentaje fuera de rango: 0.00%


In [19]:
# Rangos aproximados para Santiago, Chile
# Latitud: entre -33.7 y -33.1 (Santiago está alrededor de -33.4)
# Longitud: entre -70.9 y -70.3 (Santiago está alrededor de -70.6)

MIN_LAT, MAX_LAT = -33.8, -33.0
MIN_LON, MAX_LON = -70.9, -70.2

print("VALIDACIÓN DE DATOS GEOGRÁFICOS")
print(f"Rangos válidos para Santiago:")
print(f"Latitud: {MIN_LAT} a {MAX_LAT}")
print(f"Longitud: {MIN_LON} a {MAX_LON}")

# Analizar rangos actuales de coordenadas
print("\nANÁLISIS DE RANGOS ACTUALES")
coords_stats = listings_clean.select(
    col("latitude").alias("lat"),
    col("longitude").alias("lon")
).describe()
coords_stats.show()

# Contar registros fuera de rango
out_of_range_count = listings_clean.filter(
    (col("latitude") < MIN_LAT) | (col("latitude") > MAX_LAT) |
    (col("longitude") < MIN_LON) | (col("longitude") > MAX_LON)
).count()

total_count = listings_clean.count()
print(f"\nRegistros fuera de rango geográfico de Santiago: {out_of_range_count}")
print(f"Total de registros: {total_count}")
print(f"Porcentaje fuera de rango: {(out_of_range_count/total_count)*100:.2f}%")

# Mostrar algunos registros fuera de rango
if out_of_range_count > 0:
    print("\nEjemplos de registros fuera de rango:")
    listings_clean.filter(
        (col("latitude") < MIN_LAT) | (col("latitude") > MAX_LAT) |
        (col("longitude") < MIN_LON) | (col("longitude") > MAX_LON)
    ).select("id", "name", "neighbourhood", "latitude", "longitude").show(10)

# Filtrar solo coordenadas válidas para Santiago
listings_clean = listings_clean.filter(
    (col("latitude") >= MIN_LAT) & (col("latitude") <= MAX_LAT) &
    (col("longitude") >= MIN_LON) & (col("longitude") <= MAX_LON)
)

filtered_count = listings_clean.count()
removed_count = total_count - filtered_count

print(f"\nDespués del filtrado geográfico:")
print(f"Registros mantenidos: {filtered_count}")
print(f"Registros eliminados: {removed_count}")

print("Datos geográficos validados")

VALIDACIÓN DE DATOS GEOGRÁFICOS
Rangos válidos para Santiago:
Latitud: -33.8 a -33.0
Longitud: -70.9 a -70.2

ANÁLISIS DE RANGOS ACTUALES
+-------+-------------------+------------------+
|summary|                lat|               lon|
+-------+-------------------+------------------+
|  count|              14960|             14960|
|   mean| -33.43383032269537|-70.60988689806014|
| stddev|0.03273250917799909|0.0776001536803381|
|    min|        -33.5950351|         -70.86822|
|    max|          -33.24383|         -70.22031|
+-------+-------------------+------------------+


Registros fuera de rango geográfico de Santiago: 0
Total de registros: 14960
Porcentaje fuera de rango: 0.00%

Después del filtrado geográfico:
Registros mantenidos: 14960
Registros eliminados: 0
Datos geográficos validados


### Paso 6: Normalización de precios

In [10]:
print("NORMALIZACIÓN DE PRECIOS")

# Analizar la columna price actual
print("Análisis de la columna price:")
print("Ejemplos de valores en price:")
listings_clean.select("price").distinct().show(20)

# Contar valores nulos y no nulos
total_listings = listings_clean.count()
null_prices = listings_clean.filter(col("price").isNull()).count()
valid_prices = total_listings - null_prices

print(f"\nTotal de registros: {total_listings}")
print(f"Precios nulos: {null_prices}")
print(f"Precios con valor: {valid_prices}")

# Limpiar y convertir precios
print("\nLimpiando y convirtiendo precios...")

# Remover caracteres no numéricos y convertir a número
from pyspark.sql.functions import regexp_replace, trim

listings_clean = listings_clean.withColumn(
    "price_clean",
    when(col("price").isNull() | (col("price") == "") | (col("price") == "NULL"), None)
    .otherwise(
        regexp_replace(
            regexp_replace(col("price"), "[^0-9.]", ""), # Remover todo excepto números y puntos
            "^\\.", "0."  # Convertir .xx a 0.xx
        ).cast(DoubleType())
    )
)

# Analizar la distribución de precios limpios
print("\nEstadísticas de precios después de la limpieza:")
price_stats = listings_clean.select("price_clean").describe()
price_stats.show()

# Identificar precios extremos (outliers)
# Calcular percentiles
percentiles = listings_clean.select("price_clean").filter(col("price_clean").isNotNull()).approxQuantile("price_clean", [0.01, 0.05, 0.95, 0.99], 0.01)
p1, p5, p95, p99 = percentiles

print(f"Percentiles de precios:")
print(f"1%: ${p1:.2f}")
print(f"5%: ${p5:.2f}")
print(f"95%: ${p95:.2f}")
print(f"99%: ${p99:.2f}")

# Filtrar precios extremos (mantener entre percentil 1% y 99%)
before_filter = listings_clean.count()
listings_clean = listings_clean.filter(
    col("price_clean").isNull() | 
    ((col("price_clean") >= p1) & (col("price_clean") <= p99))
)
after_filter = listings_clean.count()

print(f"\nRegistros filtrados por precios extremos: {before_filter - after_filter}")

# Rellenar precios nulos con la mediana
median_price = listings_clean.filter(col("price_clean").isNotNull()).approxQuantile("price_clean", [0.5], 0.01)[0]
print(f"Mediana de precios: ${median_price:.2f}")

listings_clean = listings_clean.withColumn(
    "price_final",
    when(col("price_clean").isNull(), median_price)
    .otherwise(col("price_clean"))
)

# Eliminar columnas temporales
listings_clean = listings_clean.drop("price", "price_clean").withColumnRenamed("price_final", "price")

print("Precios normalizados y limpios")

NORMALIZACIÓN DE PRECIOS
Análisis de la columna price:
Ejemplos de valores en price:
+------+
| price|
+------+
| 27990|
| 17714|
| 31713|
|316499|
|191714|
| 56527|
|110071|
| 44423|
| 57051|
| 31118|
| 28426|
| 69182|
| 33174|
| 37311|
| 81824|
| 25969|
| 41157|
|  7762|
| 48590|
| 38900|
+------+
only showing top 20 rows

+------+
| price|
+------+
| 27990|
| 17714|
| 31713|
|316499|
|191714|
| 56527|
|110071|
| 44423|
| 57051|
| 31118|
| 28426|
| 69182|
| 33174|
| 37311|
| 81824|
| 25969|
| 41157|
|  7762|
| 48590|
| 38900|
+------+
only showing top 20 rows


Total de registros: 14960
Precios nulos: 1977
Precios con valor: 12983

Limpiando y convirtiendo precios...

Estadísticas de precios después de la limpieza:

Total de registros: 14960
Precios nulos: 1977
Precios con valor: 12983

Limpiando y convirtiendo precios...

Estadísticas de precios después de la limpieza:
+-------+-----------------+
|summary|      price_clean|
+-------+-----------------+
|  count|            12983|
|  

### Paso 7: Validación final y estadísticas

In [21]:
from pyspark.sql.functions import min as spark_min, max as spark_max, avg as spark_avg, count as spark_count

print("VALIDACIÓN FINAL Y ESTADÍSTICAS")
print("RESUMEN DE LA LIMPIEZA DE DATOS:")

# Estadísticas finales de cada dataset
print(f"DATASETS LIMPIOS:")
print(f" - listings_clean: {listings_clean.count():,} registros")
print(f" - neighb_clean: {neighb_clean.count():,} registros")
print(f" - reviews_clean: {reviews_clean.count():,} registros")

# Verificar valores nulos finales en listings
print(f"\nVERIFICACIÓN DE VALORES NULOS - LISTINGS:")
final_nulls = listings_clean.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in listings_clean.columns
])
final_nulls.show()

# Verificar valores nulos finales en neighbourhoods
print(f"VERIFICACIÓN DE VALORES NULOS - NEIGHBOURHOODS:")
final_nulls_neighb = neighb_clean.select([
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in neighb_clean.columns
])
final_nulls_neighb.show()

# Mostrar schema final de listings
print(f"SCHEMA FINAL DE LISTINGS:")
listings_clean.printSchema()

# Estadísticas descriptivas finales
print(f"ESTADÍSTICAS DESCRIPTIVAS FINALES:")
listings_clean.select("price", "latitude", "longitude", "minimum_nights", 
                     "number_of_reviews", "reviews_per_month").describe().show()

# Distribución por tipo de habitación
print(f"DISTRIBUCIÓN POR TIPO DE HABITACIÓN:")
listings_clean.groupBy("room_type").count().orderBy("count", ascending=False).show()

# Top 10 barrios con más propiedades
print(f"TOP 10 BARRIOS CON MÁS PROPIEDADES:")
listings_clean.groupBy("neighbourhood").count().orderBy("count", ascending=False).show(10)

# Rango de precios por tipo de habitación
print(f"ESTADÍSTICAS DE PRECIOS POR TIPO DE HABITACIÓN:")
price_by_type = listings_clean.groupBy("room_type").agg(
    spark_min("price").alias("precio_min"),
    spark_max("price").alias("precio_max"),
    spark_avg("price").alias("precio_promedio"),
    spark_count("price").alias("cantidad")
).orderBy("precio_promedio", ascending=False)
price_by_type.show()

VALIDACIÓN FINAL Y ESTADÍSTICAS
RESUMEN DE LA LIMPIEZA DE DATOS:
DATASETS LIMPIOS:
 - listings_clean: 14,960 registros
 - neighb_clean: 32 registros
 - reviews_clean: 452,609 registros

VERIFICACIÓN DE VALORES NULOS - LISTINGS:
+----+----+-------+---------+-------------------+-------------+--------+---------+---------+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+-----+
|  id|name|host_id|host_name|neighbourhood_group|neighbourhood|latitude|longitude|room_type|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|license|price|
+----+----+-------+---------+-------------------+-------------+--------+---------+---------+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+-----+
|9924|   0|      0|        0|                  0|   

## 4. Transformación Final a Formato Parquet

Una vez completada la limpieza de datos, procedemos a guardar los datasets procesados en formato Parquet para optimizar el almacenamiento y futuras consultas.

In [22]:
print("TRANSFORMACIÓN FINAL A FORMATO PARQUET")

# Crear directorios para almacenar los archivos Parquet
import os
output_dir = "data_clean_parquet"

# Verificar estado final de los datasets antes de guardar
print("ESTADO FINAL DE DATASETS ANTES DE GUARDAR:")
print(f" - listings_clean: {listings_clean.count():,} registros, {len(listings_clean.columns)} columnas")
print(f" - neighb_clean: {neighb_clean.count():,} registros, {len(neighb_clean.columns)} columnas") 
print(f" - reviews_clean: {reviews_clean.count():,} registros, {len(reviews_clean.columns)} columnas")

# Mostrar esquemas finales
print(f"\nESQUEMA FINAL - LISTINGS_CLEAN:")
listings_clean.printSchema()

print(f"\nESQUEMA FINAL - NEIGHB_CLEAN:")
neighb_clean.printSchema()

print(f"\nESQUEMA FINAL - REVIEWS_CLEAN:")
reviews_clean.printSchema()

TRANSFORMACIÓN FINAL A FORMATO PARQUET
ESTADO FINAL DE DATASETS ANTES DE GUARDAR:
 - listings_clean: 14,960 registros, 18 columnas
 - neighb_clean: 32 registros, 2 columnas
 - reviews_clean: 452,609 registros, 2 columnas

ESQUEMA FINAL - LISTINGS_CLEAN:
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: double (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: str

In [23]:
print("\nGUARDAR DATASETS EN FORMATO PARQUET...")

# Definir rutas de salida
listings_parquet_path = f"{output_dir}/listings_clean.parquet"
neighb_parquet_path = f"{output_dir}/neighbourhoods_clean.parquet"
reviews_parquet_path = f"{output_dir}/reviews_clean.parquet"

print(f"Guardando en directorio: {output_dir}/")

# Guardar listings_clean en Parquet
print(f" - Guardando listings_clean → {listings_parquet_path}")
listings_clean.coalesce(1).write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(listings_parquet_path)

print(f"\tlistings_clean guardado exitosamente")

# Guardar neighb_clean en Parquet  
print(f" - Guardando neighb_clean → {neighb_parquet_path}")
neighb_clean.coalesce(1).write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(neighb_parquet_path)

print(f"\tneighb_clean guardado exitosamente")

# Guardar reviews_clean en Parquet (puede necesitar más particiones por su tamaño)
print(f" - Guardando reviews_clean → {reviews_parquet_path}")
reviews_clean.coalesce(4).write \
    .mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(reviews_parquet_path)

print(f"\treviews_clean guardado exitosamente")


GUARDAR DATASETS EN FORMATO PARQUET...
Guardando en directorio: data_clean_parquet/
 - Guardando listings_clean → data_clean_parquet/listings_clean.parquet
	listings_clean guardado exitosamente
 - Guardando neighb_clean → data_clean_parquet/neighbourhoods_clean.parquet
	neighb_clean guardado exitosamente
 - Guardando reviews_clean → data_clean_parquet/reviews_clean.parquet
	reviews_clean guardado exitosamente


In [24]:
print("\nVERIFICACIÓN DE ARCHIVOS PARQUET GENERADOS")

# Verificar que los archivos fueron creados correctamente
print("ARCHIVOS PARQUET CREADOS:")

# Leer y verificar cada archivo Parquet
print(f"\nVERIFICANDO LISTINGS PARQUET:")
listings_parquet_verify = spark.read.parquet(listings_parquet_path)
print(f" - Registros: {listings_parquet_verify.count():,}")
print(f" - Columnas: {len(listings_parquet_verify.columns)}")
print(f" - Schema verificado: ✅")

print(f"\nVERIFICANDO NEIGHBOURHOODS PARQUET:")
neighb_parquet_verify = spark.read.parquet(neighb_parquet_path)
print(f" - Registros: {neighb_parquet_verify.count():,}")
print(f" - Columnas: {len(neighb_parquet_verify.columns)}")
print(f" - Schema verificado: ✅")

print(f"\nVERIFICANDO REVIEWS PARQUET:")
reviews_parquet_verify = spark.read.parquet(reviews_parquet_path)
print(f" - Registros: {reviews_parquet_verify.count():,}")
print(f" - Columnas: {len(reviews_parquet_verify.columns)}")
print(f" - Schema verificado: ✅")

print(f"\nVERIFICACIÓN COMPLETA - TODOS LOS ARCHIVOS PARQUET ESTÁN CORRECTOS")


VERIFICACIÓN DE ARCHIVOS PARQUET GENERADOS
ARCHIVOS PARQUET CREADOS:

VERIFICANDO LISTINGS PARQUET:
 - Registros: 14,960
 - Columnas: 18
 - Schema verificado: ✅

VERIFICANDO NEIGHBOURHOODS PARQUET:
 - Registros: 32
 - Columnas: 2
 - Schema verificado: ✅

VERIFICANDO REVIEWS PARQUET:
 - Registros: 452,609
 - Columnas: 2
 - Schema verificado: ✅

VERIFICACIÓN COMPLETA - TODOS LOS ARCHIVOS PARQUET ESTÁN CORRECTOS


### 4.1. Resumen Final del Pipeline

El pipeline de limpieza y transformación de datos de Airbnb ha sido completado exitosamente.

In [68]:
# Mantener SparkSession activa para demostraciones
# spark.stop()  # Comentado para poder usar los ejemplos siguientes

### 4.2. Cómo usar los archivos Parquet creados

Los archivos Parquet se pueden leer y usar de múltiples formas. A continuación se muestran diferentes métodos:

In [25]:
print("🔧 MÉTODO 1: LECTURA CON SPARK (Recomendado para Big Data)")

# Leer archivos Parquet con Spark (ya tenemos las rutas definidas)
print("Leyendo archivos Parquet con Spark:")

# Leer listings
listings_from_parquet = spark.read.parquet("data_clean_parquet/listings_clean.parquet")
print(f" - Listings cargado: {listings_from_parquet.count():,} registros")

# Leer neighbourhoods  
neighb_from_parquet = spark.read.parquet("data_clean_parquet/neighbourhoods_clean.parquet")
print(f" - Neighbourhoods cargado: {neighb_from_parquet.count():,} registros")

# Leer reviews
reviews_from_parquet = spark.read.parquet("data_clean_parquet/reviews_clean.parquet")
print(f" - Reviews cargado: {reviews_from_parquet.count():,} registros")

print(f"\nEjemplo de schema preservado (listings):")
listings_from_parquet.printSchema()

print(f"\nEjemplo de datos (primeras 5 filas):")
listings_from_parquet.select("id", "name", "neighbourhood", "room_type", "price").show(5)

🔧 MÉTODO 1: LECTURA CON SPARK (Recomendado para Big Data)
Leyendo archivos Parquet con Spark:
 - Listings cargado: 14,960 registros
 - Neighbourhoods cargado: 32 registros
 - Reviews cargado: 452,609 registros

Ejemplo de schema preservado (listings):
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: double (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: strin

In [26]:
print("MÉTODO 2: CONSULTAS AVANZADAS CON SPARK SQL")

# Registrar tablas temporales para usar SQL
listings_from_parquet.createOrReplaceTempView("listings")
neighb_from_parquet.createOrReplaceTempView("neighbourhoods") 
reviews_from_parquet.createOrReplaceTempView("reviews")

print("Ejemplos de consultas SQL sobre los datos Parquet:")

# Consulta 1: Top 5 barrios más caros
print("\nTOP 5 BARRIOS MÁS CAROS:")
query1 = """
SELECT neighbourhood, 
       ROUND(AVG(price), 2) as precio_promedio,
       COUNT(*) as total_propiedades
FROM listings 
GROUP BY neighbourhood 
HAVING COUNT(*) >= 50
ORDER BY precio_promedio DESC 
LIMIT 5
"""
spark.sql(query1).show()

# Consulta 2: Propiedades por tipo y rango de precios
print("\n2️⃣ DISTRIBUCIÓN POR TIPO DE HABITACIÓN Y RANGO DE PRECIOS:")
query2 = """
SELECT room_type,
       CASE 
         WHEN price < 30000 THEN 'Económico (<30k)'
         WHEN price < 60000 THEN 'Moderado (30k-60k)'  
         WHEN price < 100000 THEN 'Caro (60k-100k)'
         ELSE 'Premium (>100k)'
       END as rango_precio,
       COUNT(*) as cantidad
FROM listings
GROUP BY room_type, 
         CASE 
           WHEN price < 30000 THEN 'Económico (<30k)'
           WHEN price < 60000 THEN 'Moderado (30k-60k)'
           WHEN price < 100000 THEN 'Caro (60k-100k)'
           ELSE 'Premium (>100k)'
         END
ORDER BY room_type, cantidad DESC
"""
spark.sql(query2).show()

# Consulta 3: Análisis de reseñas por barrio
print("\nESTADÍSTICAS DE RESEÑAS POR BARRIO (TOP 10):")
query3 = """
SELECT neighbourhood,
       COUNT(*) as total_propiedades,
       ROUND(AVG(number_of_reviews), 1) as promedio_resenas,
       ROUND(AVG(reviews_per_month), 2) as resenas_por_mes,
       ROUND(AVG(price), 0) as precio_promedio
FROM listings
GROUP BY neighbourhood
HAVING COUNT(*) >= 100
ORDER BY promedio_resenas DESC
LIMIT 10
"""
spark.sql(query3).show()

MÉTODO 2: CONSULTAS AVANZADAS CON SPARK SQL
Ejemplos de consultas SQL sobre los datos Parquet:

TOP 5 BARRIOS MÁS CAROS:
+-------------+---------------+-----------------+
|neighbourhood|precio_promedio|total_propiedades|
+-------------+---------------+-----------------+
| Lo Barnechea|      270880.31|              737|
|  San Joaquín|      261073.52|               84|
|     Pudahuel|      167310.81|               90|
|     Vitacura|      125420.99|              344|
|  Providencia|       92741.03|             2717|
+-------------+---------------+-----------------+


2️⃣ DISTRIBUCIÓN POR TIPO DE HABITACIÓN Y RANGO DE PRECIOS:
+---------------+------------------+--------+
|      room_type|      rango_precio|cantidad|
+---------------+------------------+--------+
|Entire home/apt|Moderado (30k-60k)|    6561|
|Entire home/apt|   Caro (60k-100k)|    2082|
|Entire home/apt|   Premium (>100k)|    1427|
|Entire home/apt|  Económico (<30k)|     981|
|     Hotel room|Moderado (30k-60k)|      13|