In [1]:
!java -version


openjdk version "11.0.28" 2025-07-15
OpenJDK Runtime Environment (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.28+6-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)


In [2]:
!pip install -q numpy==2.0.0


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.9/60.9 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.0/19.0 MB[0m [31m62.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [4]:
!pip install pyspark==3.5.1




In [5]:
! pip install pyarrow



In [6]:
from pyspark.sql import SparkSession

# Crear SparkSession local
spark = (
    SparkSession.builder
    .appName("HousingAdvanced")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

sc = spark.sparkContext

print("✔ Conectado a Spark")
print("Spark version:", spark.version)
print("Master:", sc.master)


✔ Conectado a Spark
Spark version: 3.5.1
Master: local[*]


In [7]:
csv_path = "/content/Housing.csv"


In [8]:
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(csv_path)
)

print("Filas:", df.count(), "| Columnas:", len(df.columns))
df.printSchema()

# Vista temporal para SQL
df.createOrReplaceTempView("Housing")

Filas: 545 | Columnas: 15
root
 |-- price: integer (nullable = true)
 |-- area: integer (nullable = true)
 |-- bedrooms: integer (nullable = true)
 |-- bathrooms: integer (nullable = true)
 |-- stories: integer (nullable = true)
 |-- mainroad: string (nullable = true)
 |-- guestroom: string (nullable = true)
 |-- basement: string (nullable = true)
 |-- hotwaterheating: string (nullable = true)
 |-- airconditioning: string (nullable = true)
 |-- parking: integer (nullable = true)
 |-- prefarea: string (nullable = true)
 |-- furnishingstatus: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- zipcode: integer (nullable = true)



In [9]:
# Estadísticas globales
spark.sql("""
SELECT
  COUNT(*) AS total_rows,
  AVG(price) AS avg_price,
  AVG(area) AS avg_area,
  AVG(bedrooms) AS avg_bedrooms,
  AVG(bathrooms) AS avg_bathrooms
FROM housing
""").show()

# Promedio de precio por estado de amueblado
spark.sql("""
SELECT furnishingstatus, COUNT(*) AS n, ROUND(AVG(price), 2) AS avg_price
FROM housing
GROUP BY furnishingstatus
ORDER BY avg_price DESC
""").show()

# Top 5 zipcodes por precio promedio (mínimo 5 registros)
spark.sql("""
SELECT zipcode, COUNT(*) AS n, ROUND(AVG(price), 2) AS avg_price
FROM housing
GROUP BY zipcode
HAVING COUNT(*) >= 5
ORDER BY avg_price DESC
LIMIT 5
""").show()


+----------+-----------------+----------------+------------------+------------------+
|total_rows|        avg_price|        avg_area|      avg_bedrooms|     avg_bathrooms|
+----------+-----------------+----------------+------------------+------------------+
|       545|4766729.247706422|5150.54128440367|2.9651376146788992|1.2862385321100918|
+----------+-----------------+----------------+------------------+------------------+

+----------------+---+----------+
|furnishingstatus|  n| avg_price|
+----------------+---+----------+
|       furnished|140| 5495696.0|
|  semi-furnished|227|4907524.23|
|     unfurnished|178|4013831.46|
+----------------+---+----------+

+-------+---+----------+
|zipcode|  n| avg_price|
+-------+---+----------+
|  33213|182|4782444.23|
|  33214|182|4759921.15|
|  33215|181|4757773.15|
+-------+---+----------+



In [10]:
from pyspark.sql import functions as F

print("Particiones actuales:", df.rdd.getNumPartitions())

# Reparticionar por zipcode
df_zip = df.repartition(8, "zipcode")
print("Particiones después de repartition:", df_zip.rdd.getNumPartitions())

# Coalesce a 2 particiones
df_zip2 = df_zip.coalesce(2)
print("Particiones después de coalesce:", df_zip2.rdd.getNumPartitions())

# Escritura en Parquet con partición
parquet_path = "/content/output/parquet_partitioned"
(
    df.write
    .mode("overwrite")
    .partitionBy("year", "furnishingstatus")
    .parquet(parquet_path)
)

print("✔ Parquet escrito en:", parquet_path)

# Leer el Parquet
df_parq = spark.read.parquet(parquet_path)
df_parq.groupBy("year", "furnishingstatus").count().orderBy("year").show()


Particiones actuales: 1
Particiones después de repartition: 8
Particiones después de coalesce: 2
✔ Parquet escrito en: /content/output/parquet_partitioned
+----+----------------+-----+
|year|furnishingstatus|count|
+----+----------------+-----+
|2015|  semi-furnished|   85|
|2015|     unfurnished|   56|
|2015|       furnished|   41|
|2016|  semi-furnished|   73|
|2016|       furnished|   54|
|2016|     unfurnished|   55|
|2017|     unfurnished|   67|
|2017|  semi-furnished|   69|
|2017|       furnished|   45|
+----+----------------+-----+



In [1]:
import pyarrow.csv as pa_csv
import pyarrow.parquet as pq
import pandas as pd

In [2]:
table_csv = pa_csv.read_csv("/content/Housing.csv")
print("Datos cargados con PyArrow desde CSV:")
print(table_csv.schema)

Datos cargados con PyArrow desde CSV:
price: int64
area: int64
bedrooms: int64
bathrooms: int64
stories: int64
mainroad: string
guestroom: string
basement: string
hotwaterheating: string
airconditioning: string
parking: int64
prefarea: string
furnishingstatus: string
year: int64
zipcode: int64


In [3]:
pq.write_table(table_csv, "/content/Housing.parquet")


In [4]:
table_parquet = pq.read_table("/content/Housing.parquet")
print("\nDatos cargados desde Parquet:")
print(table_parquet.schema)


Datos cargados desde Parquet:
price: int64
area: int64
bedrooms: int64
bathrooms: int64
stories: int64
mainroad: string
guestroom: string
basement: string
hotwaterheating: string
airconditioning: string
parking: int64
prefarea: string
furnishingstatus: string
year: int64
zipcode: int64


In [5]:
# Convertir a DataFrame de Pandas
df_pandas = table_parquet.to_pandas()
print("\nPrimeras filas del DataFrame Pandas:")
print(df_pandas.head())


Primeras filas del DataFrame Pandas:
      price  area  bedrooms  bathrooms  stories mainroad guestroom basement  \
0  13300000  7420         4          2        3      yes        no       no   
1  12250000  8960         4          4        4      yes        no       no   
2  12250000  9960         3          2        2      yes        no      yes   
3  12215000  7500         4          2        2      yes        no      yes   
4  11410000  7420         4          1        2      yes       yes      yes   

  hotwaterheating airconditioning  parking prefarea furnishingstatus  year  \
0              no             yes        2      yes        furnished  2015   
1              no             yes        3       no        furnished  2016   
2              no              no        2      yes   semi-furnished  2017   
3              no             yes        3      yes        furnished  2015   
4              no             yes        2       no        furnished  2016   

   zipcode  
0    

In [9]:
from pyspark.sql import SparkSession
df_spark = spark.read.csv("/content/Housing.csv", header=True, inferSchema=True)
print("Primeras filas del DataFrame en Spark:")
df_spark.show(5)


Primeras filas del DataFrame en Spark:
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+----+-------+
|   price|area|bedrooms|bathrooms|stories|mainroad|guestroom|basement|hotwaterheating|airconditioning|parking|prefarea|furnishingstatus|year|zipcode|
+--------+----+--------+---------+-------+--------+---------+--------+---------------+---------------+-------+--------+----------------+----+-------+
|13300000|7420|       4|        2|      3|     yes|       no|      no|             no|            yes|      2|     yes|       furnished|2015|  33213|
|12250000|8960|       4|        4|      4|     yes|       no|      no|             no|            yes|      3|      no|       furnished|2016|  33214|
|12250000|9960|       3|        2|      2|     yes|       no|     yes|             no|             no|      2|     yes|  semi-furnished|2017|  33215|
|12215000|7500|       4|        2|      2|     yes|       no|

In [13]:
print("Columnas disponibles en el DataFrame:")
print(df_spark.columns)


Columnas disponibles en el DataFrame:
['price', 'area', 'bedrooms', 'bathrooms', 'stories', 'mainroad', 'guestroom', 'basement', 'hotwaterheating', 'airconditioning', 'parking', 'prefarea', 'furnishingstatus', 'year', 'zipcode']


In [20]:
# Crear vista temporal
df_spark.createOrReplaceTempView("housing")

query = """
SELECT AVG(`area`) AS promedio_lotarea
FROM housing
"""
df_sql = spark.sql(query)
df_sql.show()


+----------------+
|promedio_lotarea|
+----------------+
|5150.54128440367|
+----------------+



In [21]:
# Repartition y Coalesce
print("\nNúmero de particiones originales:", df_spark.rdd.getNumPartitions())

# Reparticionar a 4 particiones
df_repart = df_spark.repartition(4)
print("Número de particiones tras Repartition:", df_repart.rdd.getNumPartitions())

# Reducir particiones con Coalesce
df_coalesce = df_repart.coalesce(2)
print("Número de particiones tras Coalesce:", df_coalesce.rdd.getNumPartitions())



Número de particiones originales: 1
Número de particiones tras Repartition: 4
Número de particiones tras Coalesce: 2


In [22]:
# 4. Exportar resultados en formato Parquet
output_path = "/content/housing_parquet"
df_coalesce.write.mode("overwrite").parquet(output_path)
print("\nArchivo guardado en formato Parquet en:", output_path)


Archivo guardado en formato Parquet en: /content/housing_parquet
