In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType

# =============================================================================
# 1. SPARK SESSION INITIALIZATION
# =============================================================================

spark = SparkSession.builder \
    .appName("COMS_Project_Docker") \
    .master("spark://coms-spark-master:7077") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://coms-minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio_user") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio_password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.6,"
        "org.apache.hadoop:hadoop-client:3.3.6,"
        "com.amazonaws:aws-java-sdk-bundle:1.12.367"
    ) \
    .getOrCreate()

print("SparkSession created and connected to MinIO!")

SparkSession created and connected to MinIO!


In [12]:
# =============================================================================
# 2. DEFINE PATHS & SCHEMAS
# =============================================================================
raw_base_path = "s3a://raw"
processed_base_path = "s3a://processed"

customers_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("full_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("signup_date", DateType(), True),
    StructField("phone", StringType(), True),
    StructField("region", StringType(), True)
])

orders_schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", StringType(), True),
    StructField("order_date", DateType(), True),
    StructField("status", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("currency", StringType(), True)
])

order_items_schema = StructType([
    StructField("order_item_id", StringType(), False),
    StructField("order_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price_per_unit", DoubleType(), True),
    StructField("discount", DoubleType(), True)
])

payments_schema = StructType([
    StructField("payment_id", StringType(), False),
    StructField("order_id", StringType(), True),
    StructField("payment_date", DateType(), True),
    StructField("amount", DoubleType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True)
])


# =============================================================================
# 3. PROCESSING LOGIC
# =============================================================================

def process_table(table_name, schema, primary_key, date_columns=[], filter_condition=None):
    """
    Generic function to read, clean, and write a table.
    """
    try:
        print(f"Processing table: {table_name}...")
        
        # Read from raw zone
        input_path = f"{raw_base_path}/{table_name}.csv"
        df = spark.read.csv(input_path, header=True, schema=schema)
        
        # Convert date columns to timestamp format
        for date_col in date_columns:
            df = df.withColumn(date_col, to_timestamp(col(date_col)))
        
        # Apply filter condition if provided
        if filter_condition is not None:
            df = df.filter(filter_condition)
            
        # Deduplicate based on primary key
        df = df.dropDuplicates([primary_key])
        
        # Write to processed zone in CSV format with headers
        output_path = f"{processed_base_path}/{table_name}"
        df.write.mode("overwrite").parquet(output_path)
        # df.write.mode("overwrite").option("header", "true").csv(output_path)
        
        print(f"Successfully processed and saved '{table_name}' as CSV to '{output_path}'.")
        # For verification, show a few rows
        df.printSchema()
        df.show(5, truncate=False)

    except Exception as e:
        print(f"Error processing table {table_name}: {e}")

# Process each table according to the requirements
process_table(
    table_name="customers_csv",
    schema=customers_schema,
    primary_key="customer_id",
    date_columns=["signup_date"]
)

process_table(
    table_name="orders_csv",
    schema=orders_schema,
    primary_key="order_id",
    date_columns=["order_date"],
    filter_condition="total_amount > 0"  # Filter out orders with total_amount <= 0
)

process_table(
    table_name="order_items_csv",
    schema=order_items_schema,
    primary_key="order_item_id"
)

process_table(
    table_name="payments_csv",
    schema=payments_schema,
    primary_key="payment_id",
    date_columns=["payment_date"],
    filter_condition=~col("payment_status").isin(["failed", "cancelled"]) # Filter out failed or cancelled payments
)


# =============================================================================
# 4. STOP SPARK SESSION
# =============================================================================
print("Bronze to Silver ETL job completed.")
# spark.stop()

Processing table: customers_csv...
Successfully processed and saved 'customers_csv' as CSV to 's3a://processed/customers_csv'.
root
 |-- customer_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- signup_date: timestamp (nullable = true)
 |-- phone: string (nullable = true)
 |-- region: string (nullable = true)

+-----------+----------+---------------------+-------------------+----------+------+
|customer_id|full_name |email                |signup_date        |phone     |region|
+-----------+----------+---------------------+-------------------+----------+------+
|CUST1000   |Customer 0|customer0@example.com|2025-04-12 00:00:00|0900770487|North |
|CUST1001   |Customer 1|customer1@example.com|2025-04-13 00:00:00|0900216739|West  |
|CUST1002   |Customer 2|customer2@example.com|2025-04-14 00:00:00|0900126225|North |
|CUST1003   |Customer 3|customer3@example.com|2025-04-15 00:00:00|0900877572|North |
|CUST1004   |Customer 4|custom

In [8]:
processed_base_path = "s3a://processed"

# --- Reading the 'customers' dataset ---
# Point Spark to the PARENT DIRECTORY. Spark handles the part-files automatically.
print("Loading processed customers data...")
customers_df = spark.read \
    .option("header", "true") \
    .schema(customers_schema) \
    .csv(f"{processed_base_path}/customers_csv") # Note the path is to the directory

# --- Reading the 'orders' dataset ---
print("Loading processed orders data...")
orders_df = spark.read \
    .option("header", "true") \
    .schema(orders_schema) \
    .csv(f"{processed_base_path}/orders_csv")

# --- Reading the 'order_items' dataset ---
print("Loading processed order_items data...")
order_items_df = spark.read \
    .option("header", "true") \
    .schema(order_items_schema) \
    .csv(f"{processed_base_path}/order_items_csv")

# Now you can work with these DataFrames
print("Successfully loaded processed customers data:")
customers_df.printSchema()
customers_df.show(5, truncate=False)

print("Successfully loaded processed orders data:")
orders_df.printSchema()
orders_df.show(5, truncate=False)

Loading processed customers data...
Loading processed orders data...
Loading processed order_items data...
Successfully loaded processed customers data:
root
 |-- customer_id: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- signup_date: date (nullable = true)
 |-- phone: string (nullable = true)
 |-- region: string (nullable = true)

+-----------+----------+---------------------+-----------+----------+------+
|customer_id|full_name |email                |signup_date|phone     |region|
+-----------+----------+---------------------+-----------+----------+------+
|CUST1000   |Customer 0|customer0@example.com|2025-04-12 |0900770487|North |
|CUST1001   |Customer 1|customer1@example.com|2025-04-13 |0900216739|West  |
|CUST1002   |Customer 2|customer2@example.com|2025-04-14 |0900126225|North |
|CUST1003   |Customer 3|customer3@example.com|2025-04-15 |0900877572|North |
|CUST1004   |Customer 4|customer4@example.com|2025-04-16 |0900388

In [14]:
from pyspark.sql.functions import (
    col, count, sum, avg, min, max, date_sub, current_date, when, lit,
    row_number, rank, first
)
from pyspark.sql.window import Window

# =============================================================================
# 1. LOAD PROCESSED (SILVER) DATA
# =============================================================================
print("--- Loading Processed (Silver) Data ---")

processed_base_path = "s3a://processed"
curated_base_path = "s3a://curated"

# Load the four processed tables
customers_df = spark.read.parquet(f"{processed_base_path}/customers_csv")
orders_df = spark.read.parquet(f"{processed_base_path}/orders_csv")
order_items_df = spark.read.parquet(f"{processed_base_path}/order_items_csv")
payments_df = spark.read.parquet(f"{processed_base_path}/payments_csv")

print("All processed tables loaded successfully.")


# =============================================================================
# 2. CREATE `customer_orders_summary`
# =============================================================================
print("\n--- Creating 'customer_orders_summary' ---")

customer_orders_summary = orders_df.groupBy("customer_id").agg(
    count("order_id").alias("total_orders"),
    sum("total_amount").alias("total_amount_spent"),
    avg("total_amount").alias("average_order_value"),
    min("order_date").alias("first_order_date"),
    max("order_date").alias("last_order_date")
).withColumn(
    "active_status",
    when(col("last_order_date") >= date_sub(current_date(), 90), lit("active"))
    .otherwise(lit("inactive"))
)

# Write to the curated zone
customer_orders_summary.write.mode("overwrite").parquet(f"{curated_base_path}/customer_orders_summary")

print("Successfully created and saved 'customer_orders_summary'.")
customer_orders_summary.show(5, truncate=False)


# =============================================================================
# 3. CREATE `order_facts`
# =============================================================================
print("\n--- Creating 'order_facts' ---")

# Join orders, items, and customer info
order_facts = order_items_df.join(
    orders_df,
    order_items_df.order_id == orders_df.order_id,
    "inner"
).join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    "inner"
).withColumn(
    "net_revenue",
    (col("quantity") * col("price_per_unit")) - col("discount")
).select(
    orders_df["order_date"],
    orders_df["order_id"],
    order_items_df["order_item_id"],
    customers_df["customer_id"],
    customers_df["full_name"].alias("customer_name"),
    customers_df["region"],
    order_items_df["product_id"],
    order_items_df["product_name"],
    order_items_df["category"],
    order_items_df["quantity"],
    order_items_df["price_per_unit"],
    order_items_df["discount"],
    "net_revenue",
    orders_df["channel"]
)

# Write to the curated zone, partitioned by order_date
order_facts.write.mode("overwrite").partitionBy("order_date").parquet(f"{curated_base_path}/order_facts")

print("Successfully created and saved 'order_facts'.")
order_facts.show(5, truncate=False)

--- Loading Processed (Silver) Data ---
All processed tables loaded successfully.

--- Creating 'customer_orders_summary' ---
Successfully created and saved 'customer_orders_summary'.
+-----------+------------+------------------+-------------------+-------------------+-------------------+-------------+
|customer_id|total_orders|total_amount_spent|average_order_value|first_order_date   |last_order_date    |active_status|
+-----------+------------+------------------+-------------------+-------------------+-------------------+-------------+
|CUST1004   |6           |1926.18           |321.03000000000003 |2025-03-26 00:00:00|2025-04-19 00:00:00|active       |
|CUST1009   |10          |2471.38           |247.138            |2025-03-23 00:00:00|2025-04-18 00:00:00|active       |
|CUST1006   |8           |1306.81           |163.35125          |2025-03-23 00:00:00|2025-04-18 00:00:00|active       |
|CUST1005   |1           |231.23            |231.23             |2025-04-06 00:00:00|2025-04-06 