In [None]:
from pyspark.sql import SparkSession, functions as F

# Crear sesión de Spark
spark = SparkSession.builder.appName('BatchProcessing').getOrCreate()

# Ruta del archivo en HDFS
file_path = 'hdfs://192.168.1.10:9000/Tarea3/rows.csv'

# Cargar los datos desde HDFS
df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(file_path)

# Verificar el esquema
df.printSchema()


In [None]:
# Eliminar duplicados
df_cleaned = df.dropDuplicates()

# Manejo de valores nulos
df_cleaned = df_cleaned.fillna({'VALOR': 0})

# Convertir columnas de fecha al formato correcto (si es necesario)
df_cleaned = df_cleaned.withColumn("VIGENCIADESDE", F.to_date(F.col("VIGENCIADESDE"), "yyyy-MM-dd")) \
                       .withColumn("VIGENCIAHASTA", F.to_date(F.col("VIGENCIAHASTA"), "yyyy-MM-dd"))

# Mostrar primeras filas del DataFrame limpio
df_cleaned.show()


In [None]:
# Resumen estadístico
df_cleaned.describe().show()

# Contar registros por categoría (ejemplo)
df_cleaned.groupBy("categoria").count().show()

# Filtrar y ordenar por valor
top_values = df_cleaned.filter(F.col("VALOR") > 5000).sort(F.desc("VALOR"))
top_values.show()


In [None]:
output_path = 'hdfs://192.168.1.10:9000/Tarea3/processed_data'
df_cleaned.write.mode('overwrite').parquet(output_path)
