In [0]:
# ==============================================================================
# 1. IMPORT LIBRARIES
# ==============================================================================

import os
import numpy as np
import warnings
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType
from datetime import datetime

warnings.filterwarnings("ignore")

In [0]:
# ==============================================================================
# 2. CONFIGURATION AND UTILS
# ==============================================================================

# PostgreSQL credentials come from the Databricks secret scope "postgres-secrets";
# they are fetched in this order: host, port, database, username, password.
pg_host, pg_port, pg_database, pg_username, pg_password = (
    dbutils.secrets.get(scope="postgres-secrets", key=secret_key)
    for secret_key in ("host", "port", "database", "username", "password")
)

# JDBC endpoint and properties shared by every Spark read/write in this notebook.
jdbc_url = "jdbc:postgresql://{}:{}/{}".format(pg_host, pg_port, pg_database)
CONNECTION_PROPERTIES = dict(
    user=pg_username,
    password=pg_password,
    driver="org.postgresql.Driver",
)

# --- Target Table Name ---
INPUT_TABLE_NAME = "retail_fact_table"          # source fact table
OUTPUT_TABLE_NAME = "retail_feature_engg_done"  # destination for engineered features





In [0]:
# ==============================================================================
# 3. READING FUNCTION (PySpark I/O)
# ==============================================================================

def read_data_from_postgres(table_name: str) -> DataFrame:
    """
    Load a PostgreSQL table into a Spark DataFrame over JDBC.

    Connection details come from the module-level ``jdbc_url`` and
    ``CONNECTION_PROPERTIES``. The row count printed here is a Spark action,
    so it reads the table once. The returned DataFrame is not cached, so later
    actions on it read the table again.

    Args:
        table_name: Name of the source table to read.

    Returns:
        The table contents as a Spark DataFrame.
    """
    print(f"Reading data from PostgreSQL table: {table_name}...")

    source_df = spark.read.jdbc(
        url=jdbc_url,
        table=table_name,
        properties=CONNECTION_PROPERTIES,
    )

    row_total = source_df.count()
    print(f"Read {row_total:,} records.")
    return source_df

In [0]:
# ==============================================================================
# 4. TRANSFORMATION/FE FUNCTION
# ==============================================================================


from pyspark.sql import DataFrame
def _one_hot_encode(df: DataFrame, column: str, categories) -> DataFrame:
    """
    Replace ``column`` with one 0/1 IntegerType indicator column per category.

    Indicators are named ``f"{column}_{category}"`` and are appended after the
    existing columns in the order given by ``categories``. ``column`` itself is
    then dropped. Values not listed in ``categories``, including nulls, get 0
    in every indicator, because ``when`` treats a null comparison as "not
    matched".

    All indicators are added in one ``select`` rather than one ``withColumn``
    per category, which keeps the logical plan from growing with every column.
    """
    indicators = [
        F.when(F.col(column) == category, 1)
         .otherwise(0)
         .cast(IntegerType())
         .alias(f'{column}_{category}')
        for category in categories
    ]
    return df.select('*', *indicators).drop(column)


def process_and_feature_engineer(df: DataFrame) -> DataFrame:
    """
    Clean a retail fact DataFrame and derive model-ready features (pure PySpark).

    Steps:
      1. Drop rows with a null ``total_payment_value``, ``product_category``,
         ``review_score`` or ``product_weight_g``.
      2. Cast ``payment_installments`` and ``review_score`` to IntegerType.
      3. One-hot encode ``payment_method`` and ``order_status``; the source
         columns are dropped.
      4. Replace ``product_weight_g`` with ``product_weight_kg`` (grams / 1000).
      5. Parse ``purchase_date`` into ``purchase_datetime``. Derive year,
         month, day of month, day of week (1=Sunday ... 7=Saturday) and hour,
         then drop ``purchase_date``.
      6. Bucket ``purchase_hour`` into late_night [0, 6), morning [6, 12),
         afternoon [12, 18) and evening [18, 24), then one-hot encode the
         bucket. A null hour (unparseable or null ``purchase_date``) gets no
         bucket, so all of its segment indicators are 0.

    Category values outside the hard-coded lists produce all-zero indicators.

    Args:
        df: Raw rows from the retail fact table. Must contain every column
            named above.

    Returns:
        A new DataFrame with the engineered features. Only transformations are
        applied here; no Spark action is triggered.
    """
    print("\nStarting data cleaning and feature engineering (PySpark)...")

    # 1. Drop rows missing any value the features below depend on.
    df = df.dropna(subset=['total_payment_value', 'product_category', 'review_score', 'product_weight_g'])

    # 2. Integer casts (replacing a column keeps its position).
    df = (
        df.withColumn('payment_installments', F.col('payment_installments').cast(IntegerType()))
          .withColumn('review_score', F.col('review_score').cast(IntegerType()))
    )

    # 3. One-hot encode the categorical columns.
    df = _one_hot_encode(df, 'payment_method', ['boleto', 'credit_card', 'debit_card', 'voucher'])
    df = _one_hot_encode(
        df,
        'order_status',
        ['approved', 'canceled', 'delivered', 'invoiced', 'processing', 'shipped', 'unavailable'],
    )

    # 4. Product weight in kilograms.
    df = df.withColumn('product_weight_kg', F.col('product_weight_g') / 1000).drop('product_weight_g')

    # 5. Datetime features.
    purchase_ts = F.col('purchase_datetime')
    df = (
        df.withColumn('purchase_datetime', F.to_timestamp(F.col('purchase_date')))
          .withColumn('purchase_year', F.year(purchase_ts))
          .withColumn('purchase_month', F.month(purchase_ts))
          .withColumn('purchase_day', F.dayofmonth(purchase_ts))
          .withColumn('purchase_dayofweek', F.dayofweek(purchase_ts))  # 1=Sunday, 7=Saturday
          .withColumn('purchase_hour', F.hour(purchase_ts))
          .drop('purchase_date')  # original date string is no longer needed
    )

    # 6. Time-of-day segment. F.hour yields 0-23, so lower bounds are implicit.
    #    There is deliberately no .otherwise(): a null hour must stay null
    #    rather than being mislabelled as 'evening'.
    hour = F.col('purchase_hour')
    df = df.withColumn(
        'purchase_time_segment',
        F.when(hour < 6, 'late_night')
         .when(hour < 12, 'morning')
         .when(hour < 18, 'afternoon')
         .when(hour >= 18, 'evening'),
    )
    df = _one_hot_encode(df, 'purchase_time_segment', ['late_night', 'morning', 'afternoon', 'evening'])

    print("✓ Feature engineering complete (Pure PySpark).")
    return df




In [0]:
# ==============================================================================
# 5. WRITING FUNCTION (PySpark I/O)
# ==============================================================================

def write_data_to_postgres(df: "DataFrame", table_name: str):
    """
    Write the final PySpark DataFrame to a PostgreSQL table, then read it back to verify.

    The write uses ``mode("overwrite")``, so existing data in ``table_name``
    is replaced. Connection details come from the module-level ``jdbc_url``
    and ``CONNECTION_PROPERTIES``.

    Args:
        df: The DataFrame to persist.
        table_name: Name of the destination table.

    Raises:
        Exception: Whatever the JDBC write raised; it is printed, then re-raised.
    """
    # Count once and reuse it. Every df.count() re-executes the whole upstream
    # plan, including the JDBC read of the source table.
    record_count = df.count()
    print(f"\nWriting {record_count:,} records to PostgreSQL table: {table_name}...")

    try:
        df.write.mode("overwrite").jdbc(url=jdbc_url, table=table_name, properties=CONNECTION_PROPERTIES)
    except Exception as e:
        print(f"Failed to write to PostgreSQL: {e}")
        raise
    print(f"ETL Complete! {record_count:,} records written to {table_name}")

    # Verification: read the table back and compare with the count written.
    verify_df = spark.read.jdbc(url=jdbc_url, table=table_name, properties=CONNECTION_PROPERTIES)
    verified_count = verify_df.count()
    print(f"Verification: Read {verified_count:,} rows successfully from {table_name}")
    if verified_count != record_count:
        # The write re-executes the plan, so a source that changed since the
        # count above (or a partial write) shows up as a mismatch here.
        print(f"WARNING: expected {record_count:,} rows in {table_name} but read back {verified_count:,}")

In [0]:
# ==============================================================================
# 6. EXECUTION BLOCK (Orchestration)
# ==============================================================================

if __name__ == "__main__":
    # Extract -> transform -> load. Each stage keeps its own module-level name
    # so the intermediate DataFrames can still be inspected from later cells.
    raw_df_spark = read_data_from_postgres(INPUT_TABLE_NAME)              # extract
    processed_df_spark = process_and_feature_engineer(raw_df_spark)       # transform
    write_data_to_postgres(processed_df_spark, OUTPUT_TABLE_NAME)         # load

Reading data from PostgreSQL table: retail_fact_table...
Read 106,388 records.

Starting data cleaning and feature engineering (PySpark)...
✓ Feature engineering complete (Pure PySpark).

Writing 104,023 records to PostgreSQL table: retail_feature_engg_done...
ETL Complete! 104,023 records written to retail_feature_engg_done
Verification: Read 104,023 rows successfully from retail_feature_engg_done
