In [1]:
from pyspark.sql import SparkSession

# Configuramos Spark para que descargue el conector de Mongo automáticamente
spark = SparkSession.builder \
    .appName("AutoInsights_ETL") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb://mongodb:27017/autoinsights.raw_data") \
    .config("spark.mongodb.output.uri", "mongodb://mongodb:27017/autoinsights.agregados") \
    .getOrCreate()

print("Spark con Mongo Inicializado")

Spark con Mongo Inicializado


In [None]:
# Ruta interna del contenedor (mapeada por Docker)
file_path = "/home/jovyan/work/data/vehicles.csv"

# Leer CSV con header y esquema inferido
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Cachear para velocidad
df.cache()

print(f"Total de registros cargados: {df.count()}")
df.printSchema()

In [None]:
# 1. Eliminar filas con precio irreal (menor a $500 o mayor a $1M)
# 2. Eliminar años futuros o muy viejos (1990 - 2025)
# 3. Eliminar filas sin marca o modelo

df_clean = df.filter(
    (col("price") > 500) & 
    (col("price") < 1000000) & 
    (col("year") > 1990) & 
    (col("year") <= 2025) &
    col("manufacturer").isNotNull() &
    col("model").isNotNull()
)

print(f"Registros después de limpieza: {df_clean.count()}")

In [None]:
# Calcular precio promedio por Marca y Año
df_agg = df_clean.groupBy("manufacturer", "year") \
    .agg(
        round(avg("price"), 2).alias("avg_price"),
        count("*").alias("count")
    ) \
    .orderBy("manufacturer", "year")

# Mostrar una muestra
df_agg.show(20)

In [None]:
# Escribir el DataFrame agregado en MongoDB
# mode("overwrite") borra la colección vieja y escribe la nueva.

df_agg.write.format("mongo") \
    .mode("overwrite") \
    .option("database", "autoinsights") \
    .option("collection", "precios_promedio") \
    .save()

print("¡Datos guardados en MongoDB exitosamente!")