In [6]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from pyspark.sql import Window

# Build a Delta-enabled Spark session: register Delta's SQL extension and its
# catalog on the builder, then pass the builder through
# configure_spark_with_delta_pip (from the delta-spark package) before starting it.
builder = (
    SparkSession.builder
    .appName("Bonus Challange")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Use a small, fixed number of shuffle partitions and only surface WARN+ logs.
spark.conf.set("spark.sql.shuffle.partitions", "4")
spark.sparkContext.setLogLevel("WARN")

In [13]:
class ETLPipeline:
    """Small load -> transform -> save pipeline over a Spark DataFrame.

    Call the steps in order: ``load()`` sets ``self.df``, ``transform()``
    reads and replaces it, and ``save()`` writes it out as a Delta table.
    """

    def __init__(self, spark, column_dropna, avg_column, input_path=None, input_format="csv", table_name=None, timestamp_col="event_time"):
        """
        :param spark: SparkSession object
        :param column_dropna: Column name, or list/tuple of column names, whose
            nulls ``transform`` fills with 0 via ``DataFrame.fillna``. Despite
            the name, no rows are dropped. A falsy value (e.g. ``False``/``None``)
            skips this step.
        :param avg_column: Column name whose nulls ``transform`` replaces with the
            column mean. A falsy value skips this step.
        :param input_path: Path to input file or Delta table
        :param input_format: 'csv' or 'delta'
        :param table_name: Optional table name for Delta input; when given it
            takes precedence over ``input_path``
        :param timestamp_col: Column containing timestamps
        """
        self.spark = spark
        self.column_dropna = column_dropna
        self.avg_column = avg_column
        self.input_path = input_path
        self.input_format = input_format
        self.table_name = table_name
        self.timestamp_col = timestamp_col
        self.df = None

    @staticmethod
    def _as_column_list(columns):
        """Normalize a single column name or an iterable of names to a list."""
        if isinstance(columns, str):
            return [columns]
        return list(columns)

    def _require_df(self, step):
        """Raise a clear error when ``step`` is called before ``load()``."""
        if self.df is None:
            raise RuntimeError(f"{step}() called before load(); no DataFrame is loaded.")

    def load(self):
        """Load CSV or Delta table into DataFrame (also stored on ``self.df``).

        :return: the loaded DataFrame
        :raises ValueError: for an unsupported ``input_format``, or when no
            input path (CSV) / neither path nor table name (Delta) is configured
        """
        if self.input_format == "csv":
            if not self.input_path:
                raise ValueError("input_path is required for input_format='csv'.")
            self.df = self.spark.read.csv(self.input_path, header=True, inferSchema=True)
        elif self.input_format == "delta":
            if self.table_name:
                self.df = self.spark.read.format("delta").table(self.table_name)
            elif self.input_path:
                self.df = self.spark.read.format("delta").load(self.input_path)
            else:
                raise ValueError("input_path or table_name is required for input_format='delta'.")
        else:
            raise ValueError("Unsupported format. Use 'csv' or 'delta'.")
        return self.df

    def transform(self):
        """Clean the loaded DataFrame and add ``year``/``month``/``day`` columns.

        Steps, in order:
          1. Fill nulls with 0 in ``column_dropna`` (if set).
          2. Parse ``timestamp_col`` in place with ``F.to_timestamp``.
          3. Derive ``year``, ``month`` and ``day`` from the parsed timestamp;
             existing columns with those names are overwritten.
          4. Replace nulls in ``avg_column`` (if set) with the column mean.
             Computing the mean triggers a Spark job.

        :return: the transformed DataFrame (also stored on ``self.df``)
        :raises RuntimeError: if ``load()`` has not been called
        """
        self._require_df("transform")
        df = self.df

        if self.column_dropna:
            fill_values = {name: 0 for name in self._as_column_list(self.column_dropna)}
            df = df.fillna(fill_values)

        # Parse first, so the date parts below are computed from the parsed value.
        df = df.withColumn(self.timestamp_col, F.to_timestamp(F.col(self.timestamp_col)))
        ts = F.col(self.timestamp_col)
        df = (
            df.withColumn("year", F.year(ts))
              .withColumn("month", F.month(ts))
              .withColumn("day", F.dayofmonth(ts))
        )

        if self.avg_column:
            avg_value = df.select(F.avg(F.col(self.avg_column))).first()[0]
            # avg() is null when the column has no non-null values (or df is
            # empty); there is then nothing meaningful to impute.
            if avg_value is not None:
                df = df.withColumn(
                    self.avg_column,
                    F.coalesce(F.col(self.avg_column), F.lit(avg_value)),
                )

        self.df = df
        return df

    def save(self, output_path, mode="overwrite", partition_by=("year", "month", "day")):
        """Save DataFrame as a Delta table, partitioned by year, month, and day by default.

        :param output_path: destination path of the Delta table
        :param mode: Spark save mode (default ``"overwrite"``)
        :param partition_by: partition columns; the default matches the columns
            added by ``transform()``
        :raises RuntimeError: if ``load()`` has not been called
        """
        self._require_df("save")
        (
            self.df.write.format("delta")
            .mode(mode)
            .partitionBy(*partition_by)
            .save(output_path)
        )

In [14]:
# 1. Get the Delta-enabled Spark session. If the setup cell already ran,
# getOrCreate() returns that existing session. Wrapping the builder with
# configure_spark_with_delta_pip (as the setup cell does) keeps this cell
# Delta-capable even on a fresh kernel.
spark = configure_spark_with_delta_pip(
    SparkSession.builder
    .appName("Bonus Challange")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
).getOrCreate()

In [18]:
# 2. Configure the two pipeline stages. Both parse order_purchase_timestamp
# and leave the zero-fill step disabled (column_dropna=False).

# Bronze → Silver: raw Olist orders CSV, no mean imputation.
pipeline_silver = ETLPipeline(
    spark=spark,
    input_format="csv",
    input_path="../data/raw_csvs/olist_orders_dataset.csv",
    timestamp_col="order_purchase_timestamp",
    column_dropna=False,
    avg_column=False,
)

# Silver → Gold: enriched Silver Delta table; nulls in profit_margin are
# replaced with that column's mean during transform().
pipeline_gold = ETLPipeline(
    spark=spark,
    input_format="delta",
    input_path="../delta/02_silver/orders_enriched",
    timestamp_col="order_purchase_timestamp",
    column_dropna=False,
    avg_column="profit_margin",
)

In [19]:
# Bronze → Silver (CSV input): load, transform, then write the Delta table.
df_loaded_silver = pipeline_silver.load()
df_transformed_silver = pipeline_silver.transform()

# save() partitions by these columns, which transform() must have added.
assert {"year", "month", "day"} <= set(df_transformed_silver.columns)

pipeline_silver.save("../delta/04_bonus/orders_enriched_silver")

# Check
df_transformed_silver.show(5, truncate=False)
df_transformed_silver.printSchema()


                                                                                

+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----+-----+---+
|order_id                        |customer_id                     |order_status|order_purchase_timestamp|order_approved_at  |order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|year|month|day|
+--------------------------------+--------------------------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+----+-----+---+
|e481f51cbdc54678b7cc49136f2d6af7|9ef432eb6251297304e76186b10a928d|delivered   |2017-10-02 10:56:33     |2017-10-02 11:07:15|2017-10-04 19:55:00         |2017-10-10 21:25:13          |2017-10-18 00:00:00          |2017|10   |2  |
|53cdb2fc8bc7dce0b6741e2150273451|b0830fb4747a6c6d20dea0b8c802d7ef|delivered   |

In [20]:
# Silver → Gold (Delta input): read the Silver table, transform, write the Gold table.
df_loaded_gold = pipeline_gold.load()
df_transformed_gold = pipeline_gold.transform()
pipeline_gold.save(output_path="../delta/04_bonus/orders_enriched_gold")

# Inspect a sample of rows and the resulting schema.
df_transformed_gold.show(n=5, truncate=False)
df_transformed_gold.printSchema()

26/01/16 15:36:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

+--------------------------------+--------------------------------+--------------------------------+---------------------+--------------------------------+------------------------+-----+-------------+-----------+------------------+------------------+-------------+----+-----+---+--------------+
|order_id                        |customer_id                     |product_id                      |product_category_name|seller_id                       |order_purchase_timestamp|price|freight_value|total_price|profit_margin     |delivery_time_days|payment_count|year|month|day|customer_state|
+--------------------------------+--------------------------------+--------------------------------+---------------------+--------------------------------+------------------------+-----+-------------+-----------+------------------+------------------+-------------+----+-----+---+--------------+
|03ffce741877a000ca0fb3520673d4c4|e937151346d1bec664f5b80cd798bdd2|660422061e06da17ca6101e9d6b7aae8|esporte_lazer  

In [22]:
# Verify the Gold mean-imputation: "profit_margin" must contain no nulls.
# count() ignores nulls, so counting when(isNull, 1) counts exactly the null rows.
null_check_col = "profit_margin"
null_count = (
    df_transformed_gold
    .select(F.count(F.when(F.col(null_check_col).isNull(), 1)).alias("null_count"))
    .first()["null_count"]
)

print(f"Number of nulls in '{null_check_col}': {null_count}")
assert null_count == 0, f"'{null_check_col}' still has {null_count} null(s) after imputation"




Number of nulls in 'profit_margin': 0


                                                                                