## **Imports and Spark Setup**

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim, when, year, quarter

# Create the Spark session (copied from the previous cell).
#
# The MinIO credentials are looked up in the environment so that real secrets
# never have to be committed together with the notebook. The fallbacks are the
# literal values this notebook used before, so the session is configured
# exactly as before whenever the variables are not set.
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin123")

spark = (
    SparkSession.builder
    .appName("Bronze-to-Silver-Transform")
    .master("spark://spark-master:7077")
    .config("spark.cores.max", "2")
    # S3A connector settings pointing at the MinIO endpoint
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

## **Test CustomerTransformer**

In [2]:
# Load the Bronze customer table, report its size and preview a few rows
df_customer = spark.read.parquet("s3a://bronze/customer/")
print(f"Bronze Customer: {df_customer.count()} rows")
df_customer.show(3)

# Column expressions used by the transform
name_trimmed = trim(col("c_name"))
address_trimmed = trim(col("c_address"))
# Balances below zero are labelled NEGATIVE; every other row gets POSITIVE
acctbal_status = when(col("c_acctbal") < 0, "NEGATIVE").otherwise("POSITIVE")

# Transform: whitespace-trimmed name/address plus the balance-status column
df_customer_clean = (
    df_customer
    .withColumn("c_name", name_trimmed)
    .withColumn("c_address", address_trimmed)
    .withColumn("c_acctbal_status", acctbal_status)
)

print("After transform:")
df_customer_clean.show(3)

# Persist to Silver, overwriting any earlier run; coalesce(1) reduces the
# frame to a single partition before the write
(
    df_customer_clean
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("s3a://silver/customer.parquet")
)
print("Customer transform completed!")

Bronze Customer: 150000 rows
+---------+------------------+--------------------+-----------+---------------+---------+------------+--------------------+
|c_custkey|            c_name|           c_address|c_nationkey|        c_phone|c_acctbal|c_mktsegment|           c_comment|
+---------+------------------+--------------------+-----------+---------------+---------+------------+--------------------+
|        1|Customer#000000001|   IVhzIApeRb ot,c,E|         15|25-989-741-2988|   711.56|    BUILDING|to the even, regu...|
|        2|Customer#000000002|XSTf4,NCwDVaWNe6t...|         13|23-768-687-3665|   121.65|  AUTOMOBILE|l accounts. blith...|
|        3|Customer#000000003|        MG9kdTD2WBHm|          1|11-719-748-3364|  7498.12|  AUTOMOBILE| deposits eat sly...|
+---------+------------------+--------------------+-----------+---------------+---------+------------+--------------------+
only showing top 3 rows

After transform:
+---------+------------------+--------------------+----------

## **Test LineitemTransformer** 

In [3]:
# Load the Bronze lineitem table and report its size
df_lineitem = spark.read.parquet("s3a://bronze/lineitem/")
print(f"Bronze Lineitem: {df_lineitem.count()} rows")

# Column expressions used by the transform
returnflag_trimmed = trim(col("l_returnflag"))
net_price = col("l_extendedprice") * (1 - col("l_discount"))  # extended price with the discount applied
ship_year = year(col("l_shipdate"))

# Transform: apply the cleaned/derived columns, then keep only rows whose
# quantity is greater than zero
df_lineitem_clean = (
    df_lineitem
    .withColumn("l_returnflag", returnflag_trimmed)
    .withColumn("l_net_price", net_price)
    .withColumn("l_ship_year", ship_year)
    .filter(col("l_quantity") > 0)
)

print(f"After transform: {df_lineitem_clean.count()} rows")
df_lineitem_clean.select("l_orderkey", "l_net_price", "l_ship_year").show(3)

# Persist to Silver, overwriting any earlier run; coalesce(1) reduces the
# frame to a single partition before the write
(
    df_lineitem_clean
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet("s3a://silver/lineitem.parquet")
)
print("Lineitem transform completed!")

Bronze Lineitem: 6001215 rows
After transform: 6001215 rows
+----------+------------------+-----------+
|l_orderkey|       l_net_price|l_ship_year|
+----------+------------------+-----------+
|         1|20321.500799999998|       1996|
|         1|        41844.6756|       1996|
|         1|11978.640000000001|       1996|
+----------+------------------+-----------+
only showing top 3 rows

Lineitem transform completed!


## **Verify Silver Data**

In [4]:
# Read back the two Silver datasets written by the transform cells
silver_customer = spark.read.parquet("s3a://silver/customer.parquet")
silver_lineitem = spark.read.parquet("s3a://silver/lineitem.parquet")

# Row counts of what actually landed in Silver
print(f"Silver Customer: {silver_customer.count()} rows")
print(f"Silver Lineitem: {silver_lineitem.count()} rows")

# Spot-check the key, the name and the derived status column
customer_preview_cols = ["c_custkey", "c_name", "c_acctbal_status"]
print("Sample Silver Customer:")
silver_customer.select(*customer_preview_cols).show(3)

Silver Customer: 150000 rows
Silver Lineitem: 6001215 rows
Sample Silver Customer:
+---------+------------------+----------------+
|c_custkey|            c_name|c_acctbal_status|
+---------+------------------+----------------+
|        1|Customer#000000001|        POSITIVE|
|        2|Customer#000000002|        POSITIVE|
|        3|Customer#000000003|        POSITIVE|
+---------+------------------+----------------+
only showing top 3 rows

