In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Crear la sesión de Spark
spark = SparkSession.builder \
    .appName("Intro a Spark") \
    .getOrCreate()

# Confirmar que Spark está corriendo
spark

In [4]:
# Crear un CSV de ejemplo
data = """id,nombre,edad
1,Ana,28
2,Juan,35
3,Luisa,41
4,Pedro,23
"""

# Guardar en archivo local en Colab
with open("personas.csv", "w") as f:
    f.write(data)

In [5]:
# Leer el archivo CSV
df = spark.read.csv("personas.csv", header=True, inferSchema=True)

# Mostrar las primeras filas
df.show()

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   Ana|  28|
|  2|  Juan|  35|
|  3| Luisa|  41|
|  4| Pedro|  23|
+---+------+----+



In [6]:
df.printSchema()
df.count()

root
 |-- id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)



4

In [7]:
df.select("nombre", "edad").show()

+------+----+
|nombre|edad|
+------+----+
|   Ana|  28|
|  Juan|  35|
| Luisa|  41|
| Pedro|  23|
+------+----+



In [8]:
df.filter(df.edad > 30).show()

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  2|  Juan|  35|
|  3| Luisa|  41|
+---+------+----+



In [9]:
from pyspark.sql.functions import lit

df = df.withColumn("activo", lit(True))
df.show()

+---+------+----+------+
| id|nombre|edad|activo|
+---+------+----+------+
|  1|   Ana|  28|  true|
|  2|  Juan|  35|  true|
|  3| Luisa|  41|  true|
|  4| Pedro|  23|  true|
+---+------+----+------+



In [10]:
from pyspark.sql.functions import when

df = df.withColumn(
    "departamento",
    when(df.nombre == "Ana", "Ventas")
    .when(df.nombre == "Juan", "Marketing")
    .when(df.nombre == "Luisa", "TI")
    .otherwise("Operaciones")
)

df.show()

+---+------+----+------+------------+
| id|nombre|edad|activo|departamento|
+---+------+----+------+------------+
|  1|   Ana|  28|  true|      Ventas|
|  2|  Juan|  35|  true|   Marketing|
|  3| Luisa|  41|  true|          TI|
|  4| Pedro|  23|  true| Operaciones|
+---+------+----+------+------------+



In [11]:
from pyspark.sql.functions import avg

df.groupBy("departamento").agg(avg("edad").alias("edad_promedio")).show()

+------------+-------------+
|departamento|edad_promedio|
+------------+-------------+
|          TI|         41.0|
| Operaciones|         23.0|
|      Ventas|         28.0|
|   Marketing|         35.0|
+------------+-------------+



In [12]:
from pyspark.sql.functions import count

df.groupBy("departamento").agg(count("*").alias("total_personas")).show()

+------------+--------------+
|departamento|total_personas|
+------------+--------------+
|          TI|             1|
| Operaciones|             1|
|      Ventas|             1|
|   Marketing|             1|
+------------+--------------+



In [13]:
df.groupBy("departamento").agg(avg("edad").alias("edad_promedio")).orderBy("edad_promedio", ascending=False).show()

+------------+-------------+
|departamento|edad_promedio|
+------------+-------------+
|          TI|         41.0|
|   Marketing|         35.0|
|      Ventas|         28.0|
| Operaciones|         23.0|
+------------+-------------+



In [14]:
from pyspark.sql import SparkSession

# Ya debes tener tu SparkSession activa como spark
# Si no la tienes, descomenta esta línea:
# spark = SparkSession.builder.getOrCreate()

from pyspark.sql import Row

# DataFrame de empleados
empleados_data = [
    Row(id=1, nombre="Ana", edad=30),
    Row(id=2, nombre="Juan", edad=25),
    Row(id=3, nombre="Luisa", edad=41),
    Row(id=4, nombre="Pedro", edad=28)
]

empleados_df = spark.createDataFrame(empleados_data)
empleados_df.show()

# DataFrame de salarios
salarios_data = [
    Row(id=1, salario=22000),
    Row(id=2, salario=18000),
    Row(id=4, salario=19500),
    Row(id=5, salario=25000)  # Este ID no existe en empleados
]

salarios_df = spark.createDataFrame(salarios_data)
salarios_df.show()

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   Ana|  30|
|  2|  Juan|  25|
|  3| Luisa|  41|
|  4| Pedro|  28|
+---+------+----+

+---+-------+
| id|salario|
+---+-------+
|  1|  22000|
|  2|  18000|
|  4|  19500|
|  5|  25000|
+---+-------+



In [15]:
join_df = empleados_df.join(salarios_df, on="id", how="inner")
join_df.show()

+---+------+----+-------+
| id|nombre|edad|salario|
+---+------+----+-------+
|  1|   Ana|  30|  22000|
|  2|  Juan|  25|  18000|
|  4| Pedro|  28|  19500|
+---+------+----+-------+



In [16]:
left_df = empleados_df.join(salarios_df, on="id", how="left")
left_df.show()

+---+------+----+-------+
| id|nombre|edad|salario|
+---+------+----+-------+
|  1|   Ana|  30|  22000|
|  2|  Juan|  25|  18000|
|  3| Luisa|  41|   NULL|
|  4| Pedro|  28|  19500|
+---+------+----+-------+



In [17]:
outer_df = empleados_df.join(salarios_df, on="id", how="outer")
outer_df.show()

+---+------+----+-------+
| id|nombre|edad|salario|
+---+------+----+-------+
|  1|   Ana|  30|  22000|
|  2|  Juan|  25|  18000|
|  3| Luisa|  41|   NULL|
|  4| Pedro|  28|  19500|
|  5|  NULL|NULL|  25000|
+---+------+----+-------+



In [18]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    (1, "Ana", 28),
    (2, "Juan", 35),
    (3, "Luisa", 41),
    (4, "Pedro", 22)
]

columns = ["id", "nombre", "edad"]

df = spark.createDataFrame(data, columns)
df.show()

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  1|   Ana|  28|
|  2|  Juan|  35|
|  3| Luisa|  41|
|  4| Pedro|  22|
+---+------+----+



In [19]:
df.write.mode("overwrite").option("header", True).csv("salida_csv")

In [20]:
!ls salida_csv

part-00000-3b2f7ff2-e047-41e3-b646-74f61e996a51-c000.csv  _SUCCESS
part-00001-3b2f7ff2-e047-41e3-b646-74f61e996a51-c000.csv


In [22]:
!head salida_csv/part-00001-*.csv

id,nombre,edad
3,Luisa,41
4,Pedro,22


In [23]:
df.write.mode("overwrite").parquet("salida_parquet")

In [24]:
!ls salida_parquet

part-00000-d1b6b1d0-9eef-4980-be9d-e0f1fb441d3b-c000.snappy.parquet  _SUCCESS
part-00001-d1b6b1d0-9eef-4980-be9d-e0f1fb441d3b-c000.snappy.parquet


In [25]:
df_parquet = spark.read.parquet("salida_parquet")
df_parquet.show()

+---+------+----+
| id|nombre|edad|
+---+------+----+
|  3| Luisa|  41|
|  4| Pedro|  22|
|  1|   Ana|  28|
|  2|  Juan|  35|
+---+------+----+



In [27]:
# Primer DataFrame: usuarios
usuarios_data = [
    (1, "Ana", 28),
    (2, "Juan", 35),
    (3, "Luisa", 41),
    (4, "Pedro", 22)
]
usuarios_cols = ["id", "nombre", "edad"]
df_usuarios = spark.createDataFrame(usuarios_data, usuarios_cols)

# Segundo DataFrame: compras
compras_data = [
    (1, "Laptop", 1000),
    (1, "Mouse", 25),
    (2, "Monitor", 200),
    (3, "Teclado", 50),
    (5, "Audífonos", 75)  # id 5 no existe en usuarios
]
compras_cols = ["id_usuario", "producto", "precio"]
df_compras = spark.createDataFrame(compras_data, compras_cols)

In [28]:
# Join tipo INNER
df_joined = df_usuarios.join(df_compras, df_usuarios.id == df_compras.id_usuario, "inner")
df_joined.show()

+---+------+----+----------+--------+------+
| id|nombre|edad|id_usuario|producto|precio|
+---+------+----+----------+--------+------+
|  1|   Ana|  28|         1|  Laptop|  1000|
|  1|   Ana|  28|         1|   Mouse|    25|
|  2|  Juan|  35|         2| Monitor|   200|
|  3| Luisa|  41|         3| Teclado|    50|
+---+------+----+----------+--------+------+



In [29]:
from pyspark.sql.functions import sum as spark_sum

df_gastos = df_joined.groupBy("nombre").agg(
    spark_sum("precio").alias("gasto_total")
)
df_gastos.show()

+------+-----------+
|nombre|gasto_total|
+------+-----------+
| Luisa|         50|
|   Ana|       1025|
|  Juan|        200|
+------+-----------+



In [30]:
from pyspark.sql.functions import current_date, date_add, datediff

# Añadir columna con fecha actual y fecha futura
df_con_fechas = df_usuarios.withColumn("hoy", current_date()) \
                           .withColumn("expira", date_add(current_date(), 30)) \
                           .withColumn("dias_restantes", datediff(date_add(current_date(), 30), current_date()))

df_con_fechas.show()

+---+------+----+----------+----------+--------------+
| id|nombre|edad|       hoy|    expira|dias_restantes|
+---+------+----+----------+----------+--------------+
|  1|   Ana|  28|2025-08-07|2025-09-06|            30|
|  2|  Juan|  35|2025-08-07|2025-09-06|            30|
|  3| Luisa|  41|2025-08-07|2025-09-06|            30|
|  4| Pedro|  22|2025-08-07|2025-09-06|            30|
+---+------+----+----------+----------+--------------+



In [31]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Transformaciones Avanzadas") \
    .getOrCreate()

In [33]:
# Primer DataFrame: usuarios
usuarios_data = [
    (1, "Ana", 28),
    (2, "Juan", 35),
    (3, "Luisa", 41),
    (4, "Pedro", 22)
]
usuarios_cols = ["id", "nombre", "edad"]
df_usuarios = spark.createDataFrame(usuarios_data, usuarios_cols)

In [35]:
import os

os.listdir("/content")

['.config', 'salida_parquet', 'salida_csv', 'personas.csv', 'sample_data']

In [38]:
df = spark.read.csv("/content/usuarios.csv", header=True, inferSchema=True)
df.show()

+---+------+----+----------------+----------------+
| id|nombre|edad|          correo|          ciudad|
+---+------+----+----------------+----------------+
|  1|  Juan|  25|  juan@gmail.com|Ciudad de México|
|  2|   Ana|  30| ana@hotmail.com|     Guadalajara|
|  3|  Luis|  22|  luis@yahoo.com|       Monterrey|
|  4|Carmen|  27|carmen@gmail.com|         Tijuana|
|  5| Pedro|  35| pedro@gmail.com|          Puebla|
+---+------+----+----------------+----------------+



In [41]:
from pyspark.sql.functions import avg, max, min

# Agrupar por país y aplicar funciones agregadas
df_grouped = df.groupBy("ciudad").agg(
    avg("edad").alias("edad_promedio"),
    max("edad").alias("edad_maxima"),
    min("edad").alias("edad_minima")
)

df_grouped.show()

+----------------+-------------+-----------+-----------+
|          ciudad|edad_promedio|edad_maxima|edad_minima|
+----------------+-------------+-----------+-----------+
|Ciudad de México|         25.0|         25|         25|
|       Monterrey|         22.0|         22|         22|
|     Guadalajara|         30.0|         30|         30|
|          Puebla|         35.0|         35|         35|
|         Tijuana|         27.0|         27|         27|
+----------------+-------------+-----------+-----------+



In [42]:
from pyspark.sql import Row

# Crear tabla de países y continentes
continentes = [
    Row(ciudad="Ciudad de México", continente="América"),
    Row(ciudad="Monterrey", continente="América"),
    Row(ciudad="Tijuana", continente="Europa")
]

df_continente = spark.createDataFrame(continentes)
df_continente.show()

+----------------+----------+
|          ciudad|continente|
+----------------+----------+
|Ciudad de México|   América|
|       Monterrey|   América|
|         Tijuana|    Europa|
+----------------+----------+



In [44]:
df_joined = df.join(df_continente, on="ciudad", how="left")
df_joined.show()

+----------------+---+------+----+----------------+----------+
|          ciudad| id|nombre|edad|          correo|continente|
+----------------+---+------+----+----------------+----------+
|Ciudad de México|  1|  Juan|  25|  juan@gmail.com|   América|
|       Monterrey|  3|  Luis|  22|  luis@yahoo.com|   América|
|     Guadalajara|  2|   Ana|  30| ana@hotmail.com|      NULL|
|          Puebla|  5| Pedro|  35| pedro@gmail.com|      NULL|
|         Tijuana|  4|Carmen|  27|carmen@gmail.com|    Europa|
+----------------+---+------+----+----------------+----------+



In [45]:
# Guardar resultados agrupados en CSV
df_grouped.write.mode("overwrite").csv("/content/output_por_pais.csv")

# Guardar resultados agrupados en formato Parquet
df_grouped.write.mode("overwrite").parquet("/content/output_por_pais.parquet")

In [46]:
from pyspark.sql.functions import when

In [47]:
df = df.withColumn("es_adulto", when(df.edad >= 18, True).otherwise(False))
df.select("nombre", "edad", "es_adulto").show()

+------+----+---------+
|nombre|edad|es_adulto|
+------+----+---------+
|  Juan|  25|     true|
|   Ana|  30|     true|
|  Luis|  22|     true|
|Carmen|  27|     true|
| Pedro|  35|     true|
+------+----+---------+



In [48]:
from pyspark.sql.functions import col

df = df.withColumn(
    "rango_edad",
    when(col("edad") < 18, "menor")
    .when((col("edad") >= 18) & (col("edad") <= 29), "joven")
    .when((col("edad") >= 30) & (col("edad") <= 59), "adulto")
    .otherwise("mayor")
)

df.select("nombre", "edad", "rango_edad").show()

+------+----+----------+
|nombre|edad|rango_edad|
+------+----+----------+
|  Juan|  25|     joven|
|   Ana|  30|    adulto|
|  Luis|  22|     joven|
|Carmen|  27|     joven|
| Pedro|  35|    adulto|
+------+----+----------+



In [49]:
df.show(truncate=False)

+---+------+----+----------------+----------------+---------+----------+
|id |nombre|edad|correo          |ciudad          |es_adulto|rango_edad|
+---+------+----+----------------+----------------+---------+----------+
|1  |Juan  |25  |juan@gmail.com  |Ciudad de México|true     |joven     |
|2  |Ana   |30  |ana@hotmail.com |Guadalajara     |true     |adulto    |
|3  |Luis  |22  |luis@yahoo.com  |Monterrey       |true     |joven     |
|4  |Carmen|27  |carmen@gmail.com|Tijuana         |true     |joven     |
|5  |Pedro |35  |pedro@gmail.com |Puebla          |true     |adulto    |
+---+------+----+----------------+----------------+---------+----------+



In [50]:
df.orderBy(col("edad").desc()).show()

+---+------+----+----------------+----------------+---------+----------+
| id|nombre|edad|          correo|          ciudad|es_adulto|rango_edad|
+---+------+----+----------------+----------------+---------+----------+
|  5| Pedro|  35| pedro@gmail.com|          Puebla|     true|    adulto|
|  2|   Ana|  30| ana@hotmail.com|     Guadalajara|     true|    adulto|
|  4|Carmen|  27|carmen@gmail.com|         Tijuana|     true|     joven|
|  1|  Juan|  25|  juan@gmail.com|Ciudad de México|     true|     joven|
|  3|  Luis|  22|  luis@yahoo.com|       Monterrey|     true|     joven|
+---+------+----+----------------+----------------+---------+----------+



In [51]:
df.filter(col("es_adulto") == True).show()

+---+------+----+----------------+----------------+---------+----------+
| id|nombre|edad|          correo|          ciudad|es_adulto|rango_edad|
+---+------+----+----------------+----------------+---------+----------+
|  1|  Juan|  25|  juan@gmail.com|Ciudad de México|     true|     joven|
|  2|   Ana|  30| ana@hotmail.com|     Guadalajara|     true|    adulto|
|  3|  Luis|  22|  luis@yahoo.com|       Monterrey|     true|     joven|
|  4|Carmen|  27|carmen@gmail.com|         Tijuana|     true|     joven|
|  5| Pedro|  35| pedro@gmail.com|          Puebla|     true|    adulto|
+---+------+----+----------------+----------------+---------+----------+



In [52]:
df.filter(col("rango_edad") == "adulto").show()

+---+------+----+---------------+-----------+---------+----------+
| id|nombre|edad|         correo|     ciudad|es_adulto|rango_edad|
+---+------+----+---------------+-----------+---------+----------+
|  2|   Ana|  30|ana@hotmail.com|Guadalajara|     true|    adulto|
|  5| Pedro|  35|pedro@gmail.com|     Puebla|     true|    adulto|
+---+------+----+---------------+-----------+---------+----------+



In [53]:
df.filter((col("rango_edad") == "adulto") & (col("edad") > 40)).show()

+---+------+----+------+------+---------+----------+
| id|nombre|edad|correo|ciudad|es_adulto|rango_edad|
+---+------+----+------+------+---------+----------+
+---+------+----+------+------+---------+----------+



In [54]:
df.limit(3).show()

+---+------+----+---------------+----------------+---------+----------+
| id|nombre|edad|         correo|          ciudad|es_adulto|rango_edad|
+---+------+----+---------------+----------------+---------+----------+
|  1|  Juan|  25| juan@gmail.com|Ciudad de México|     true|     joven|
|  2|   Ana|  30|ana@hotmail.com|     Guadalajara|     true|    adulto|
|  3|  Luis|  22| luis@yahoo.com|       Monterrey|     true|     joven|
+---+------+----+---------------+----------------+---------+----------+



In [55]:
df_filtrado = df.filter((col("es_adulto") == True) & (col("edad") > 40))

In [56]:
df_filtrado.write.mode("overwrite").option("header", True).csv("/content/usuarios_filtrados")

In [57]:
df_nuevo = spark.read.csv("/content/usuarios_filtrados", header=True, inferSchema=True)
df_nuevo.show()

+---+------+----+------+------+---------+----------+
| id|nombre|edad|correo|ciudad|es_adulto|rango_edad|
+---+------+----+------+------+---------+----------+
+---+------+----+------+------+---------+----------+



In [58]:
df.createOrReplaceTempView("usuarios")

In [59]:
resultado_sql = spark.sql("""
    SELECT ciudad, COUNT(*) AS total_usuarios, AVG(edad) AS edad_promedio
    FROM usuarios
    GROUP BY ciudad
    ORDER BY total_usuarios DESC
""")
resultado_sql.show()

+----------------+--------------+-------------+
|          ciudad|total_usuarios|edad_promedio|
+----------------+--------------+-------------+
|Ciudad de México|             1|         25.0|
|       Monterrey|             1|         22.0|
|     Guadalajara|             1|         30.0|
|          Puebla|             1|         35.0|
|         Tijuana|             1|         27.0|
+----------------+--------------+-------------+



In [60]:
# MINI PROYECTO SPARK: ANÁLISIS DE USUARIOS

# ─────────────────────────────────────────────────────────────
# 🛠️ 1. IMPORTAR LIBRERÍAS Y CREAR SESIÓN SPARK
# ─────────────────────────────────────────────────────────────
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, count, round

spark = SparkSession.builder \
    .appName("Proyecto ETL Usuarios") \
    .getOrCreate()

# ─────────────────────────────────────────────────────────────
# 📥 2. CARGAR DATOS DESDE CSV
# ─────────────────────────────────────────────────────────────
df = spark.read.csv("/content/usuarios.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()

# ─────────────────────────────────────────────────────────────
# 🧹 3. LIMPIEZA DE DATOS
# Eliminar duplicados y registros con valores nulos en columnas clave
# ─────────────────────────────────────────────────────────────
df_clean = df.dropDuplicates().dropna(subset=["nombre", "edad", "ciudad"])
df_clean.show(5)

# ─────────────────────────────────────────────────────────────
# 🔧 4. TRANSFORMACIONES
# Agregar columna "es_adulto" y "rango_edad"
# ─────────────────────────────────────────────────────────────
df_transformed = df_clean.withColumn(
    "es_adulto", when(col("edad") >= 18, True).otherwise(False)
).withColumn(
    "rango_edad", when(col("edad") < 18, "menor")
    .when((col("edad") >= 18) & (col("edad") <= 29), "no es")
    .when((col("edad") >= 30) & (col("edad") <= 59), "adulto")
    .otherwise("mayor")
)

df_transformed.show(5)

# ─────────────────────────────────────────────────────────────
# 📊 5. ANÁLISIS: ESTADÍSTICAS POR CIUDAD
# Contar usuarios y calcular edad promedio por ciudad
# ─────────────────────────────────────────────────────────────
df_estadisticas = df_transformed.groupBy("ciudad").agg(
    count("*").alias("total_usuarios"),
    round(avg("edad"), 1).alias("edad_promedio")
)

df_estadisticas.show()

# ─────────────────────────────────────────────────────────────
# 💾 6. GUARDAR RESULTADOS EN CSV
# Se guardan en /content para Google Colab o local si estás en Jupyter
# ─────────────────────────────────────────────────────────────
df_transformed.write.mode("overwrite").option("header", "true") \
    .csv("/content/usuarios_transformados.csv")

df_estadisticas.write.mode("overwrite").option("header", "true") \
    .csv("/content/estadisticas_ciudad.csv")

# ─────────────────────────────────────────────────────────────

+---+------+----+----------------+----------------+
| id|nombre|edad|          correo|          ciudad|
+---+------+----+----------------+----------------+
|  1|  Juan|  25|  juan@gmail.com|Ciudad de México|
|  2|   Ana|  30| ana@hotmail.com|     Guadalajara|
|  3|  Luis|  22|  luis@yahoo.com|       Monterrey|
|  4|Carmen|  27|carmen@gmail.com|         Tijuana|
|  5| Pedro|  35| pedro@gmail.com|          Puebla|
+---+------+----+----------------+----------------+

root
 |-- id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- edad: integer (nullable = true)
 |-- correo: string (nullable = true)
 |-- ciudad: string (nullable = true)

+---+------+----+----------------+----------------+
| id|nombre|edad|          correo|          ciudad|
+---+------+----+----------------+----------------+
|  4|Carmen|  27|carmen@gmail.com|         Tijuana|
|  5| Pedro|  35| pedro@gmail.com|          Puebla|
|  2|   Ana|  30| ana@hotmail.com|     Guadalajara|
|  3|  Luis|  22|  luis@yah