In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import re

# ==============================
# 1. Inicializar Spark Session
# ==============================
# (E - Extract: La conexión a la fuente de datos)
spark = SparkSession.builder \
    .appName("HDFS_NiFi_Data_Cleaning_ETL") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()

# ==============================
# 2. Paths en HDFS (Fuente)
# ==============================
file_familia = "/user/nifi/Resumen_Valores-VENTA_POR_FAMILIA2.csv"
file_producto = "/user/nifi/Resumen _Valores-VENTA_POR-PRODUCTO2.csv"

# ==============================
# 3. Función de Asignación de Nombres (Helper para E)
# ==============================
def rename_month_columns(df):
    """
    Renombra los bloques de columnas por mes (Venta, TGT, PY24, pct)
    a su formato final Mes_Métrica.
    """
    meses = ["Enero","Febrero","Marzo","Abril","Mayo","Junio",
             "Julio","Agosto","Septiembre","Octubre","Noviembre","Diciembre","YTD_JUL"]
    metricas = ["Venta","TGT","PY24","pct"]

    new_cols = ["Producto_Familia"]
    col_idx = 1

    for mes in meses:
        for metrica in metricas:
            if col_idx < len(df.columns):
                new_cols.append(f"{mes}_{metrica}")
                col_idx += 1

    while len(new_cols) < len(df.columns):
        new_cols.append(f"Extra_{len(new_cols)}")

    return df.toDF(*new_cols)

# ==============================
# 4. Función de Extracción Principal
# ==============================
def extract_data(path):
    """
    Lee el CSV, identifica la cabecera real, la filtra y asigna
    nombres temporales a las columnas.
    """
    # 1. Leer CSV sin header
    df = spark.read.option("sep", ";").option("header", "false").csv(path)

    # 2. Tomar primera fila como header real
    header_row = df.first()

    # 3. Filtrar esa fila del dataframe (limpieza mínima)
    df = df.filter(col("_c0") != header_row[0])

    # 4. Columnas temporales
    tmp_headers = [f"Col_{i}" for i in range(len(df.columns))]
    df = df.toDF(*tmp_headers)

    # 5. Renombrar columnas a Mes_Métrica
    df = rename_month_columns(df)
    
    # 6. Quitar la fila "Producto" (limpieza inicial)
    df = df.filter(df["Producto_Familia"] != "Producto")

    return df

# ==============================
# 5. Ejecución del Extract
# ==============================
df_familia_raw = extract_data(file_familia)
df_producto_raw = extract_data(file_producto)

# Muestra inicial para verificación (Opcional)
print("=== Familia RAW (después de Extract) ===")
df_familia_raw.show(5, truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/30 17:10:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/30 17:10:35 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


=== Familia RAW (después de Extract) ===
+----------------+-----------+---------+----------+---------+-------------+-----------+------------+-----------+-----------+-----------+----------+-----------+-----------+-----------+----------+-----------+----------+-----------+---------+-----------+-----------+-----------+----------+-----------+-----------+-----------+----------+-----------+------------+-----------+-----------+----------+----------------+--------------+---------------+--------------+-------------+-----------+------------+-----------+---------------+-------------+--------------+-------------+---------------+-------------+--------------+-------------+-------------+-----------+------------+-----------+
|Producto_Familia|Enero_Venta|Enero_TGT|Enero_PY24|Enero_pct|Febrero_Venta|Febrero_TGT|Febrero_PY24|Febrero_pct|Marzo_Venta|Marzo_TGT  |Marzo_PY24|Marzo_pct  |Abril_Venta|Abril_TGT  |Abril_PY24|Abril_pct  |Mayo_Venta|Mayo_TGT   |Mayo_PY24|Mayo_pct   |Junio_Venta|Junio_TGT  |Junio_P

In [2]:
from pyspark.sql.functions import expr, regexp_extract, first, trim, upper, when
from pyspark.sql import functions as F

# ==============================
# 1. Función de Transformación Principal
# ==============================
def transform_to_wide_format(df, nivel_valor):
    """
    Aplica todas las transformaciones: unpivot, pivot, tipado, normalización
    para pasar de formato ancho (Mes_Metrica) a formato largo (Mes, Metrica).
    """
    id_col = "Producto_Familia"
    value_cols = [c for c in df.columns if c != id_col]

    # 1. Unpivot (formato largo intermedio)
    expr_str = "stack({0}, {1}) as (Columna, Valor)".format(
        len(value_cols),
        ",".join([f"'{c}', {c}" for c in value_cols])
    )
    df_long = df.select(id_col, expr(expr_str))

    # 2. Separar Mes y Métrica
    df_long = df_long.withColumn("Mes", regexp_extract("Columna", r"^(.*)_(Venta|TGT|PY24|pct)$", 1)) \
                     .withColumn("Metrica", regexp_extract("Columna", r"^(.*)_(Venta|TGT|PY24|pct)$", 2)) \
                     .drop("Columna")

    # 3. Pivotear: convertir Métrica en columnas (formato wide final)
    df_wide = df_long.groupBy(id_col, "Mes").pivot("Metrica").agg(F.first("Valor"))

    # 4. Normalizar columna Mes (limpieza de texto)
    df_wide = df_wide.withColumn("Mes", trim(upper(df_wide["Mes"])))

    # 5. Limpieza y tipado de métricas (casteo a double y reemplazo de nulos)
    metricas = ["Venta", "TGT", "PY24", "pct"]
    for m in metricas:
        df_wide = df_wide.withColumn(
            m,
            when(col(m).isNull(), 0.0).otherwise(col(m).cast("double"))
        )

    # 6. Agregar columna de nivel
    df_wide = df_wide.withColumn("Nivel", lit(nivel_valor))
    
    return df_wide

# ==============================
# 2. Ejecución del Transform
# ==============================

# Aplicar transformación a ambos datasets
df_familia_wide = transform_to_wide_format(df_familia_raw, "FAMILIA")
df_producto_wide = transform_to_wide_format(df_producto_raw, "PRODUCTO")

# 3. Unir ambos datasets limpios (Transformación final)
df_total = (
    df_familia_wide
    .unionByName(df_producto_wide, allowMissingColumns=True)
)

# Muestra para verificación
print("\n=== Resultado Unificado y Limpio (después de Transform) ===")
df_total.show(5, truncate=False)
df_total.printSchema()


=== Resultado Unificado y Limpio (después de Transform) ===
+----------------+---------+--------+-----------+---------+-----------+-------+
|Producto_Familia|Mes      |PY24    |TGT        |Venta    |pct        |Nivel  |
+----------------+---------+--------+-----------+---------+-----------+-------+
|AGGLAD          |ABRIL    |24635.46|120891.2407|91519.9  |0.757043269|FAMILIA|
|AGGLAD          |AGOSTO   |25769.42|2560.845455|0.0      |0.0        |FAMILIA|
|AGGLAD          |DICIEMBRE|71802.0 |2681.545455|0.0      |0.0        |FAMILIA|
|AGGLAD          |ENERO    |22534.35|110094.81  |110094.81|1.0        |FAMILIA|
|AGGLAD          |FEBRERO  |31785.06|114546.1364|110784.62|0.96716156 |FAMILIA|
+----------------+---------+--------+-----------+---------+-----------+-------+
only showing top 5 rows

root
 |-- Producto_Familia: string (nullable = true)
 |-- Mes: string (nullable = true)
 |-- PY24: double (nullable = true)
 |-- TGT: double (nullable = true)
 |-- Venta: double (nullable = true

In [3]:
# ==============================
# 1. Path de Destino
# ==============================
output_path = "/user/etl_output/resumen_ventas_limpio.parquet"

# ==============================
# 2. Ejecución del Load
# ==============================
print(f"\nEscribiendo resultado limpio en: {output_path}")

# Escribir el DataFrame final a Parquet en HDFS
df_total.write \
    .mode("overwrite") \
    .parquet(output_path)

print("Proceso ETL (Extract, Transform, Load) finalizado y datos cargados.")

# Detener Spark Session
spark.stop()


Escribiendo resultado limpio en: /user/etl_output/resumen_ventas_limpio.parquet


                                                                                

Proceso ETL (Extract, Transform, Load) finalizado y datos cargados.
