In [87]:
import yaml

def load_config(path="config/pipeline_config.yaml"):
    """Load the pipeline configuration from a YAML file.

    Args:
        path: Location of the YAML file. Defaults to
            ``config/pipeline_config.yaml`` (relative to the working directory).

    Returns:
        The parsed document as produced by ``yaml.safe_load`` (normally a
        dict). An empty file yields ``{}`` instead of ``None`` so callers can
        index or ``.get`` on the result without a separate guard.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's locale default.
    with open(path, "r", encoding="utf-8") as file:
        parsed = yaml.safe_load(file)
    return {} if parsed is None else parsed

In [19]:
import os
from pyspark.sql import DataFrame

def write_parquet(df: DataFrame, output_path: str, partition_by: str = None, mode: str = "overwrite"):
    """Write a DataFrame to Parquet, optionally partitioned.

    This is a best-effort write: any exception is printed, not raised.

    Args:
        df: DataFrame to persist.
        output_path: Destination passed to the Parquet writer.
        partition_by: Column to partition the output by; when falsy the
            output is written unpartitioned.
        mode: Save mode handed to ``DataFrameWriter.mode``.

    Returns:
        ``True`` when the write completed, ``False`` when it failed. The
        previous implementation returned ``None`` in both cases, which made a
        failed write indistinguishable from a successful one.
    """
    try:
        writer = df.write.mode(mode)
        if partition_by:
            writer = writer.partitionBy(partition_by)
        writer.parquet(output_path)
        return True
    except Exception as e:
        print(f"write_parquet: {e}")
        return False

In [102]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField


def create_spark_session(app_name, config=None):
    """Create (or reuse) a SparkSession configured from the pipeline settings.

    Args:
        app_name: Application name given to the session builder.
        config: Already-loaded configuration mapping. When omitted, the
            configuration is read with ``load_config()``, as before; passing
            it in avoids re-reading the file when the caller already has it.

    Returns:
        The session returned by ``SparkSession.builder.getOrCreate()``.

    Raises:
        KeyError: If ``config["spark"]["master"]`` is missing.
    """
    if config is None:
        config = load_config()

    builder = (
        SparkSession.builder
        .appName(app_name)
        .master(config["spark"]["master"])
        .config("spark.sql.debug.maxToStringFields", 1000)
    )
    return builder.getOrCreate()

def get_schema():
    """Return the explicit read schema: every column is a nullable string.

    The fields are listed in the order in which they are added to the
    StructType.
    """
    column_names = (
        "Unnamed: 0",
        "id",
        "name",
        "description",
        "long_description",
        "customer_part_id",
        "manufacturer_name",
        "manufacturer_part_id",
        "competitor_name",
        "competitor_part_name",
        "competitor_part_id",
        "category",
        "unit_of_measure",
        "unit_quantity",
        "requested_quantity",
        "requested_unit_price",
    )
    fields = [StructField(column_name, StringType(), True) for column_name in column_names]
    return StructType(fields)

def read_materials(spark, input_path, header, quote, escape, multiline, infer_schema, file_format, delimiter=None):
    """Load the materials file into a DataFrame.

    This is a best-effort read: any exception is printed, not raised.

    Args:
        spark: Session whose ``read`` attribute provides the reader.
        input_path: Location handed to ``load``.
        header: Value for the reader's ``header`` option.
        quote: Value for the reader's ``quote`` option.
        escape: Value for the reader's ``escape`` option.
        multiline: Value for the reader's ``multiline`` option.
        infer_schema: When truthy, ask the reader to infer the schema;
            otherwise apply the fixed schema from ``get_schema()``.
        file_format: Source format name passed to ``format``.
        delimiter: Optional field separator. When ``None`` (the default) no
            separator option is set, so the reader's own default applies
            exactly as before this parameter existed.

    Returns:
        The loaded DataFrame, or ``None`` if the read failed.
    """
    try:
        reader = (
            spark.read.format(file_format)
            .option("header", header)
            .option("quote", quote)
            .option("escape", escape)
            .option("multiline", multiline)
        )

        if delimiter is not None:
            reader = reader.option("sep", delimiter)

        if infer_schema:
            reader = reader.option("inferSchema", True)
        else:
            reader = reader.schema(get_schema())

        df = reader.load(input_path)
        print("File loaded successfully.")
        return df
    except Exception as e:
        print(f"Ingestion process failed: {e}")
        return None

In [21]:
from pyspark.sql.functions import col, trim, regexp_replace, when, initcap, count, lit
from pyspark.sql.types import IntegerType, DoubleType
from pyspark.sql import DataFrame
from functools import reduce

def _discard_condition(df: DataFrame):
    """Build a never-NULL boolean Column that is True for rows to quarantine."""
    number_columns = ["id", "unit_quantity", "requested_quantity", "requested_unit_price"]
    required_columns = ["id", "name", "category"]

    # rlike() on NULL yields NULL, so this only matches rows in which every
    # numeric column holds a non-null value that fails the pattern.
    no_numeric_match = reduce(
        lambda a, b: a & b,
        [~col(c).rlike(r"^\d+\.0$") for c in number_columns],
    )
    all_null = reduce(lambda a, b: a & b, [col(c).isNull() for c in df.columns])
    missing_required = reduce(lambda a, b: a | b, [col(c).isNull() for c in required_columns])

    # when/otherwise collapses a NULL result to False so the condition can be
    # negated safely to select the rows that are kept.
    return when(no_numeric_match | all_null | missing_required, lit(True)).otherwise(lit(False))


def _normalize_text(df: DataFrame) -> DataFrame:
    """Title-case the descriptive text columns and collapse/trim their whitespace."""
    text_columns = ["name", "description", "manufacturer_name", "competitor_name", "category"]
    for col_name in text_columns:
        df = df.withColumn(col_name, initcap(col(col_name)))
        df = df.withColumn(col_name, trim(regexp_replace(col(col_name), r"\s+", " ")))
    return df


def _drop_sparse_columns(df: DataFrame, threshold: float = 0.9) -> DataFrame:
    """Keep only the columns whose share of NULL values is below ``threshold``."""
    total_rows = df.count()
    if total_rows == 0:
        # No rows means no ratio can be computed; keep every column rather
        # than comparing None against the threshold.
        return df

    null_ratios = df.select([
        (count(when(col(c).isNull(), c)) / lit(total_rows)).alias(c)
        for c in df.columns
    ]).first().asDict()
    columns_to_keep = [c for c, ratio in null_ratios.items() if ratio < threshold]
    return df.select(columns_to_keep)


def clean_and_validate_data(df: DataFrame, quarantine_path: str) -> DataFrame:
    """Split the raw data into quarantined and valid rows, then clean the valid ones.

    A row is quarantined when any of these holds:
      * none of the numeric columns (id, unit_quantity, requested_quantity,
        requested_unit_price) holds a value matching ``^\\d+\\.0$``,
      * every column is NULL,
      * id, name or category is NULL.

    The quarantine decision is a single boolean condition evaluated on the
    incoming frame, and the valid set is its exact complement. Previously the
    discarded rows were collected partly before and partly after text
    normalisation and then subtracted from the normalised frame, so a
    discarded row whose text changed during normalisation was written to
    quarantine and still kept as valid.

    Valid rows are then text-normalised, de-duplicated (the former
    ``subtract`` had the same effect), cast to numeric types, stripped of
    columns that are at least 90% NULL, and the "Unnamed: 0" column is
    dropped.

    Args:
        df: Raw DataFrame as read from the source file.
        quarantine_path: Parquet destination for rejected rows. It is only
            written (overwrite mode) when at least one row is rejected; the
            rows are stored with their original, un-normalised values.

    Returns:
        The cleaned DataFrame, or ``None`` if any step raised (the error is
        printed).
    """
    try:
        discard = _discard_condition(df)

        discarded_rows = df.filter(discard).distinct()
        # Count once and reuse the number; each count() is a full Spark job.
        discarded_count = discarded_rows.count()
        if discarded_count > 0:
            discarded_rows.repartition(1).write.mode("overwrite").parquet(quarantine_path)
            print(f"{discarded_count} rows sent to quarantine.")

        valid_rows = _normalize_text(df.filter(~discard)).distinct()

        # Cast to integer
        valid_rows = valid_rows.withColumn("id", col("id").cast(IntegerType()))

        # Cast to double
        double_columns = ["unit_quantity", "requested_quantity", "requested_unit_price"]
        for c in double_columns:
            valid_rows = valid_rows.withColumn(c, col(c).cast(DoubleType()))

        df_clean = _drop_sparse_columns(valid_rows, threshold=0.9)
        return df_clean.drop("Unnamed: 0")

    except Exception as e:
        print(f"clean_and_validate_data: {e}")
        return None

In [22]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, avg, min, max, countDistinct, when, lit, isnan, round

def category_aggregations(df: DataFrame) -> DataFrame:
    """Summarise requested unit prices per category.

    Produces one row per ``category`` with the item count and the average,
    minimum and maximum ``requested_unit_price``, each rounded to two
    decimals.

    Returns:
        The aggregated DataFrame, or ``None`` if the aggregation raised (the
        error is printed).
    """
    try:
        by_category = df.groupBy("category")
        summary = by_category.agg(
            count("*").alias("item_count"),
            round(avg("requested_unit_price"), 2).alias("avg_price"),
            round(min("requested_unit_price"), 2).alias("min_price"),
            round(max("requested_unit_price"), 2).alias("max_price"),
        )
        return summary
    except Exception as e:
        print(f"unexpected error has occurred: {e}")
        return None

def column_profiling(df: DataFrame) -> DataFrame:
    """Profile every column: missing count, distinct count, min and max.

    All metrics are computed in a single aggregation instead of one Spark job
    per column.

    ``min`` and ``max`` are returned as strings. Their native types differ
    from column to column (text, integer, double), and putting such mixed
    values into one result column made schema inference fail, so the function
    used to return ``None`` for any frame with differently typed columns.

    NaN is only tested on float/double columns; for all other types the
    missing count is the NULL count.

    Args:
        df: DataFrame to profile.

    Returns:
        A DataFrame with columns ``column``, ``null_count``,
        ``unique_count``, ``min`` and ``max`` (one row per input column), or
        ``None`` if profiling raised (the error is printed).
    """
    result_schema = "column string, null_count long, unique_count long, min string, max string"
    try:
        column_names = df.columns
        if not column_names:
            return df.sparkSession.createDataFrame([], result_schema)

        float_columns = {name for name, dtype in df.dtypes if dtype in ("float", "double")}

        # Index-based aliases avoid clashes between generated names and
        # existing column names.
        agg_exprs = []
        for idx, col_name in enumerate(column_names):
            column = col(col_name)
            missing = column.isNull()
            if col_name in float_columns:
                missing = missing | isnan(column)
            agg_exprs.extend([
                count(when(missing, 1)).alias(f"c{idx}_nulls"),
                countDistinct(column).alias(f"c{idx}_unique"),
                min(column).cast("string").alias(f"c{idx}_min"),
                max(column).cast("string").alias(f"c{idx}_max"),
            ])

        totals = df.agg(*agg_exprs).first().asDict()

        stats = [
            (
                col_name,
                totals[f"c{idx}_nulls"],
                totals[f"c{idx}_unique"],
                totals[f"c{idx}_min"],
                totals[f"c{idx}_max"],
            )
            for idx, col_name in enumerate(column_names)
        ]

        # An explicit schema keeps the result well-typed even when every
        # min/max value is NULL.
        return df.sparkSession.createDataFrame(stats, result_schema)
    except Exception as e:
        print(f"An unexpected error has occurred: {e}")
        return None

In [103]:
# Load the pipeline settings, then create (or reuse) the Spark session under the configured app name.
config = load_config()
spark = create_spark_session(config["spark"]["app_name"])

In [104]:
# Storage locations for each pipeline stage (all four keys are required).
paths = config["paths"]
input_path = paths["input_csv"]
raw_path = paths["raw_data_dir"]
quarantine_path = paths["quarantine_data_dir"]
processed_path = paths["processed_data_dir"]

# A `read_options:` key that is present but empty parses as None; fall back to
# an empty mapping so the defaults below still apply.
read_opts = config.get("read_options") or {}

# Reader settings, each with a default used when the key is absent.
header = read_opts.get("header", True)
infer_schema = read_opts.get("infer_schema", False)
delimiter = read_opts.get("delimiter", ",")
multiline = read_opts.get("multiline", False)
escape = read_opts.get("escape", '"')
quote = read_opts.get("quote", '"')
file_format = read_opts.get("file_format", "csv")

In [105]:
# Ingestion
# NOTE(review): `delimiter` is read from the config above but is not passed in this call.
# read_materials() prints the error and returns None when loading fails.
df_raw = read_materials(spark, input_path, header, quote, escape, multiline, infer_schema, file_format)

File loaded successfully.


In [49]:
# Cleansing and validation
# Rejected rows are written to quarantine_path; the call returns None (after printing the error) if any step fails.
df_clean = clean_and_validate_data(df_raw, quarantine_path)

400 rows sent to quarantine.


In [107]:
# Preview the first 10 rows of the raw data as loaded.
df_raw.show(10)

+----------+----+--------------------+--------------------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+
|Unnamed: 0|  id|                name|         description|long_description|customer_part_id|manufacturer_name|manufacturer_part_id|competitor_name|competitor_part_name|competitor_part_id|category|unit_of_measure|unit_quantity|requested_quantity|requested_unit_price|
+----------+----+--------------------+--------------------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+
|       0.0| 1.0|3COM ETHERLINK 3 ...|3COM ETHERLINK 3 ...|            NULL|            NULL|            3 COM|               3C509|           NULL|                NULL|              NULL|     OAG

In [109]:
# Export the raw data as JSON: coalesce(1) collapses it to one partition first, and overwrite mode replaces any earlier export.
# NOTE(review): the output path is hard-coded rather than taken from `paths` — confirm this is intentional.
df_raw.coalesce(1).write.mode("overwrite").json("data/materials.json")

In [110]:
# collect() brings every row to the driver as a JSON string.
# NOTE(review): json_strings is not used by any cell visible here — confirm it is still needed.
json_strings = df_raw.toJSON().collect()

In [None]:
# Spot-check requested_unit_price: missing (NULL or NaN) count, distinct count, minimum and rounded average.
unit_price = col("requested_unit_price")
unit_price_missing = unit_price.isNull() | isnan(unit_price)

df_clean.select(
    lit("requested_unit_price").alias("column"),
    count(when(unit_price_missing, "requested_unit_price")).alias("null_count"),
    countDistinct(unit_price).alias("unique_count"),
    min(unit_price).alias("min"),
    round(avg(unit_price), 2).alias("avg"),
).show()

+--------------------+----------+------------+---+-----+
|              column|null_count|unique_count|min|  avg|
+--------------------+----------+------------+---+-----+
|requested_unit_price|         1|        2247|0.0|464.0|
+--------------------+----------+------------+---+-----+



In [None]:
# Aggregations
# Per-category item count and average/min/max requested_unit_price; the result is None if the aggregation failed.
df_category_stats = category_aggregations(df_clean)


In [None]:
from pyspark.sql.functions import col, count, countDistinct, min, max, mean, when, isnan
from pyspark.sql import Row

# Step 1: build the per-column aggregation expressions.
# min/max are cast to string because their native types differ between
# columns (text, integer, double); mixed types in one result column make
# createDataFrame fail while inferring the schema.
agg_exprs = []
for c in df_clean.columns:
    agg_exprs += [
        count(when(col(c).isNull() | isnan(col(c)), c)).alias(f"{c}_nulls"),
        countDistinct(col(c)).alias(f"{c}_unique"),
        min(col(c)).cast("string").alias(f"{c}_min"),
        max(col(c)).cast("string").alias(f"{c}_max"),
        mean(col(c)).alias(f"{c}_mean")
    ]

# Step 2: run everything as a single aggregation.
agg_result = df_clean.agg(*agg_exprs).first().asDict()

# Step 3: assemble one result row per column.
stats = [
    (
        c,
        str(df_clean.schema[c].dataType),
        agg_result.get(f"{c}_nulls"),
        agg_result.get(f"{c}_unique"),
        agg_result.get(f"{c}_min"),
        agg_result.get(f"{c}_max"),
        agg_result.get(f"{c}_mean"),
    )
    for c in df_clean.columns
]

# Step 4: convert to the final DataFrame. The explicit schema keeps the
# column types stable even when a metric is NULL for every column.
summary_df = spark.createDataFrame(
    stats,
    "column string, dtype string, null_count long, unique_count long, "
    "min string, max string, mean double",
)


In [None]:
# Persist the category statistics under <processed_path>/category_stats, using write_parquet's defaults (overwrite, no partitioning).
write_parquet(df_category_stats, os.path.join(processed_path, "category_stats"))

In [None]:
# Write the cleaned data as ORC, partitioned by category, replacing any previous output.
# NOTE(review): "data/test" is hard-coded and looks like a scratch location — confirm before keeping.
df_clean.write.mode("overwrite").partitionBy("category").orc("data/test")

In [83]:
# Read the Parquet datasets under data/raw and data/quarantine back in for inspection.
df_r = spark.read.parquet("data/raw")
df_q = spark.read.parquet("data/quarantine")

In [86]:
# NOTE(review): 45432 and 726 look like row counts noted during exploration — confirm which datasets they refer to.
#45432
#726
# Preview the quarantined rows (show() prints the first 20 by default).
df_q.show()


+----------+------+----+-----------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+--------+
|Unnamed: 0|    id|name|description|long_description|customer_part_id|manufacturer_name|manufacturer_part_id|competitor_name|competitor_part_name|competitor_part_id|category|unit_of_measure|unit_quantity|requested_quantity|requested_unit_price|is_valid|
+----------+------+----+-----------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+--------+
|    1052.0|1053.0|NULL|       NULL|            NULL|            NULL|             AJAX|           U-01-0231|           NULL|                NULL|              NULL|     MRT|           NULL|         NULL|              NULL|               

In [55]:
# Preview the first 5 rows of the raw data.
df_raw.show(5)

+----------+---+--------------------+--------------------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+
|Unnamed: 0| id|                name|         description|long_description|customer_part_id|manufacturer_name|manufacturer_part_id|competitor_name|competitor_part_name|competitor_part_id|category|unit_of_measure|unit_quantity|requested_quantity|requested_unit_price|
+----------+---+--------------------+--------------------+----------------+----------------+-----------------+--------------------+---------------+--------------------+------------------+--------+---------------+-------------+------------------+--------------------+
|       0.0|1.0|3COM ETHERLINK 3 ...|3COM ETHERLINK 3 ...|            NULL|            NULL|            3 COM|               3C509|           NULL|                NULL|              NULL|     OAG|   

In [31]:
from pyspark.sql.functions import concat
text_columns = ["name", "description", "manufacturer_name", "competitor_name", "category"]

# Accumulate on `df` so every "<column>_cleaned" column survives the loop.
# Each iteration used to restart from df_raw, so only the last column's
# result remained and the initcap step was discarded by the second assignment.
df = df_raw
for col_name in text_columns:
    cleaned_col = col_name + "_cleaned"
    # Title-case first, then collapse whitespace runs and trim the ends.
    df = df.withColumn(
        cleaned_col,
        trim(regexp_replace(initcap(col(col_name)), r"\s+", " ")),
    )