In [12]:
import pyspark
from pyspark.sql import SparkSession
from sqlalchemy import create_engine
from dotenv import load_dotenv
from datetime import datetime
import os

In [13]:
# Create SparkSession
spark = SparkSession \
        .builder \
        .appName("PySpark Exercise Week 6") \
        .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
        .getOrCreate()

In [14]:
spark

In [15]:
# Load .env and define the credentials
load_dotenv(".env", override=True)

SOURCE_DB_HOST = os.getenv("SOURCE_DB_HOST")
SOURCE_DB_NAME = os.getenv("SOURCE_DB_NAME")
SOURCE_DB_USER = os.getenv("SOURCE_DB_USER")
SOURCE_DB_PASS = os.getenv("SOURCE_DB_PASS")
SOURCE_DB_PORT = os.getenv("SOURCE_DB_PORT")

DWH_DB_HOST = os.getenv("DWH_DB_HOST")
DWH_DB_NAME = os.getenv("DWH_DB_NAME")
DWH_DB_USER = os.getenv("DWH_DB_USER")
DWH_DB_PASS = os.getenv("DWH_DB_PASS")
DWH_DB_PORT = os.getenv("DWH_DB_PORT")

LOG_DB_HOST = os.getenv("LOG_DB_HOST")
LOG_DB_NAME = os.getenv("LOG_DB_NAME")
LOG_DB_USER = os.getenv("LOG_DB_USER")
LOG_DB_PASS = os.getenv("LOG_DB_PASS")
LOG_DB_PORT = os.getenv("LOG_DB_PORT")

In [16]:
def source_engine():
    SOURCE_DB_URL = f"jdbc:postgresql://{SOURCE_DB_HOST}:{SOURCE_DB_PORT}/{SOURCE_DB_NAME}"
    return SOURCE_DB_URL, SOURCE_DB_USER, SOURCE_DB_PASS 

def dwh_engine():
    DWH_DB_URL = f"jdbc:postgresql://{DWH_DB_HOST}:{DWH_DB_PORT}/{DWH_DB_NAME}"
    return DWH_DB_URL, DWH_DB_USER, DWH_DB_PASS 

def log_engine():
    LOG_DB_URL = f"jdbc:postgresql://{LOG_DB_HOST}:{LOG_DB_PORT}/{LOG_DB_NAME}"
    return LOG_DB_URL, LOG_DB_USER, LOG_DB_PASS 

## **Set Up Logging Function**

In [17]:
import logging

def logging_process():
    
    # Configure logging
    logging.basicConfig(
        filename="/home/jovyan/work/log/info.log",
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )

In [18]:
def load_log_msg(spark: SparkSession, log_msg):

    LOG_DB_URL, LOG_DB_USER, LOG_DB_PASS = log_engine()
    table_name = "etl_log"

    # set config
    connection_properties = {
        "user": LOG_DB_USER,
        "password": LOG_DB_PASS,
        "driver": "org.postgresql.Driver" # set driver postgres
    }

    log_msg.write.jdbc(url = LOG_DB_URL,
                  table = table_name,
                  mode = "append",
                  properties = connection_properties)

## **Extract Database**

In [19]:
def extract_database(spark: SparkSession, table_name: str):
    
    # Get source db config
    SOURCE_DB_URL, SOURCE_DB_USER, SOURCE_DB_PASS = source_engine()

    # Set config
    connection_properties = {
        "user" : SOURCE_DB_USER,
        "password" : SOURCE_DB_PASS,
        "driver" : "org.postgresql.Driver"
    }

    # Set current timestamp for logging
    current_timestamp = datetime.now()

    try:

        # Read data
        df = spark \
            .read \
            .jdbc(url=SOURCE_DB_URL,
                  table=table_name,
                  properties=connection_properties)

        print(f"Extraction process successful for table: {table_name}")

        # Set success log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "extraction", "success", "source_db", table_name, current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date"])
    
        return df

    except Exception as e:
        print(f"Extraction process failed: {e}")

        # Set failed log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "extraction", "failed", "source_db", table_name, current_timestamp, str(e))]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

    finally:
        load_log_msg(spark=spark, log_msg=log_message)


In [20]:
education_status_df = extract_database(spark=spark, table_name="education_status")
marital_status_df = extract_database(spark=spark, table_name="marital_status")
marketing_campaign_deposit_df = extract_database(spark=spark, table_name="marketing_campaign_deposit")

Extraction process successful for table: education_status
Extraction process successful for table: marital_status
Extraction process successful for table: marketing_campaign_deposit


In [21]:
education_status_df.show()

+------------+---------+--------------------+--------------------+
|education_id|    value|          created_at|          updated_at|
+------------+---------+--------------------+--------------------+
|           1| tertiary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           2|secondary|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           3|  unknown|2025-02-28 15:31:...|2025-02-28 15:31:...|
|           4|  primary|2025-02-28 15:31:...|2025-02-28 15:31:...|
+------------+---------+--------------------+--------------------+



In [22]:
marital_status_df.show()

+----------+--------+--------------------+--------------------+
|marital_id|   value|          created_at|          updated_at|
+----------+--------+--------------------+--------------------+
|         1| married|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         2|  single|2025-02-28 15:31:...|2025-02-28 15:31:...|
|         3|divorced|2025-02-28 15:31:...|2025-02-28 15:31:...|
+----------+--------+--------------------+--------------------+



## **Extract CSV**

In [23]:
path = "data/"

def extract_csv(spark: SparkSession, file_name: str):

    current_timestamp = datetime.now()

    try:
        # Read data
        df = spark.read.csv(path + file_name, header=True)
        
        print(f"Extraction process successful for file: {file_name}")

        # Set success log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "extraction", "success", "csv", file_name, current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date"])
    
        return df

    except Exception as e:
        print(f"Extraction process failed: {e}")

        # Set failed log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "extraction", "failed", "csv", file_name, current_timestamp, str(e))]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

    finally:
        load_log_msg(spark=spark, log_msg=log_message)

In [24]:
bank_df = extract_csv(spark=spark, file_name="new_bank_transaction.csv")

Extraction process successful for file: new_bank_transaction.csv


## **Data Profiling**

In [25]:
# Check Percentage of Missing Values for each column with pyspark
import pandas as pd
import json
from pyspark.sql.functions import col, count, when, round

def check_missing_values(df):

    total_data = df.count()

    # Calculate the percentage of missing values for each column
    get_missing_values = df.select([
        round((count(when(col(c).isNull(), c)) / total_data) * 100, 2).alias(c)
        for c in df.columns
    ]).collect()[0].asDict()
    
    return get_missing_values

In [26]:
data_profiling_report = {
    "person_in_charge" : "rico_febrian",
    "checking_date" : datetime.now().strftime('%d/%m/%y'),
    "Column Information": {
        "Education Status": {"count": len(education_status_df.columns), "columns": education_status_df.columns},
        "Marital Status": {"count": len(marital_status_df.columns), "columns": marital_status_df.columns},
        "Marketing Campaign Deposit": {"count": len(marketing_campaign_deposit_df.columns), "columns": marketing_campaign_deposit_df.columns},
        "Bank Transaction": {"count": len(bank_df.columns), "columns": bank_df.columns},
    },
    "Check Data Size": {
        "Education Status": education_status_df.count(),
        "Marital Status": marital_status_df.count(),
        "Marketing Campaign Deposit": marketing_campaign_deposit_df.count(),
        "Bank Transaction": bank_df.count(),
    },
    "Data Type For Each Column" : {
        "education status": education_status_df.dtypes,
        "marital status": marital_status_df.dtypes,
        "marketing campaign deposit": marketing_campaign_deposit_df.dtypes,
        "bank transaction": bank_df.dtypes
    },
    "Check Missing Value" : {
        "education status": check_missing_values(education_status_df),
        "marital status": check_missing_values(marital_status_df),
        "marketing campaign deposit": check_missing_values(marketing_campaign_deposit_df),
        "bank transaction": check_missing_values(bank_df)
    }
}

# Print dalam format JSON yang rapi
print(json.dumps(data_profiling_report, indent=4))


{
    "person_in_charge": "rico_febrian",
    "checking_date": "23/03/25",
    "Column Information": {
        "Education Status": {
            "count": 4,
            "columns": [
                "education_id",
                "value",
                "created_at",
                "updated_at"
            ]
        },
        "Marital Status": {
            "count": 4,
            "columns": [
                "marital_id",
                "value",
                "created_at",
                "updated_at"
            ]
        },
        "Marketing Campaign Deposit": {
            "count": 20,
            "columns": [
                "loan_data_id",
                "age",
                "job",
                "marital_id",
                "education_id",
                "default",
                "balance",
                "housing",
                "loan",
                "contact",
                "day",
                "month",
                "duration",
                "campai

## **Transform Data**

In [27]:
marketing_campaign_deposit_df.show(1, vertical=True)

-RECORD 0----------------------------------
 loan_data_id       | 1                    
 age                | 58                   
 job                | management           
 marital_id         | 1                    
 education_id       | 1                    
 default            | false                
 balance            | $2143                
 housing            | true                 
 loan               | false                
 contact            | unknown              
 day                | 5                    
 month              | may                  
 duration           | 261                  
 campaign           | 1                    
 pdays              | -1                   
 previous           | 0                    
 poutcome           | unknown              
 subscribed_deposit | false                
 created_at         | 2025-02-28 15:59:... 
 updated_at         | 2025-02-28 15:59:... 
only showing top 1 row



### Transform *marketing_campaign_deposit*

In [28]:
from datetime import datetime
from pyspark.sql import functions as F

def transform_marketing_campaign(spark, df):

    current_timestamp = datetime.now()

    try:

        # Rename selected columns
        rename_columns = {
            "pdays" : "days_since_last_campaign",
            "previous" : "previous_campaign_contacts",
            "poutcome" : "previous_campaign_outcome"
        }
        
        df = df.withColumnsRenamed(rename_columns)

        # Remove dollar ($) sign in balance column
        df = df.withColumn('balance', F.regexp_replace(df['balance'], r"\$", " "))

        # Then, cast balance column to integer
        df = df.withColumn('balance', df['balance'].cast('int'))

        # Create new column duration_in_year
        # Divide value in duration column with 365, then round down and convert to integer
        df = df.withColumn('duration_in_year', F.floor(df['duration'] / 365).cast('int'))

        # Arrange columns
        df = df.select(
            'loan_data_id', 
            'age', 
            'job', 
            'marital_id', 
            'education_id', 
            'default', 
            'balance', 
            'housing',
            'loan',
            'contact',
            'day',
            'month',
            'duration',
            'duration_in_year',
            'campaign',
            'days_since_last_campaign',
            'previous_campaign_contacts',
            'previous_campaign_outcome',
            'subscribed_deposit',
            'created_at',
            'updated_at'
        )

        print("Transformation process successful for table: marketing_campaign_deposit")

        # Set success log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "success", "source_db", "marketing_campaign_deposit", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date"])
    
        return df

    except Exception as e:
        print(f"Transformation process failed: {e}")

        # Set failed log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "failed", "source_db", "marketing_campaign_deposit", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

    finally:
        load_log_msg(spark=spark, log_msg=log_message)
    

In [29]:
transformed_marketing_df = transform_marketing_campaign(spark=spark, df=marketing_campaign_deposit_df)

Transformation process successful for table: marketing_campaign_deposit


### Transform *customers*

In [30]:
bank_df.show(5)

+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|TransactionID|CustomerID|CustomerDOB|CustGender|CustLocation|CustAccountBalance|TransactionDate|TransactionTime|TransactionAmount (INR)|
+-------------+----------+-----------+----------+------------+------------------+---------------+---------------+-----------------------+
|      T642232|  C1010028|    25/8/88|         F|       DELHI|         296828.37|        29/8/16|          95212|                    557|
|       T87414|  C1010035|     2/3/92|         M|      MUMBAI|           7284.42|         1/8/16|         111917|                     50|
|      T560676|C1010035_2|     9/6/80|         M| NAVI MUMBAI|         378013.09|        27/8/16|         185011|                    700|
|      T610204|  C1010036|    26/2/96|         M|     GURGAON|         355430.17|        26/8/16|          95203|                    208|
|      T957663|  C1010041|     6/9

In [31]:
from pyspark.sql.functions import col, when, lit, to_date, date_format, year, round

def transform_customers(spark, df):

    current_timestamp = datetime.now()

    try:

        # Rename selected columns
        rename_columns = {
            "CustomerID" : "customer_id",
            "CustomerDOB" : "birth_date",
            "CustGender" : "gender",
            "CustLocation" : "location",
            "CustAccountBalance" : "account_balance"
        }
        
        df = df.withColumnsRenamed(rename_columns)

        # Mapping gender column value
        df = df.withColumn("gender", 
                           when(col("gender") == "M", "Male")
                           .when(col("gender") == "F", "Female")
                           .when(col("gender") == "T", "Other")
                           .otherwise(col("gender"))
                          )

        # Convert to DATE format
        df = df.withColumn("birth_date", 
                           to_date(col("birth_date"), "d/M/yy")
                          )

        # # Give flag if there's a year that > 2025
        df = df.withColumn("birth_date", 
                           when(year(col("birth_date")) > 2025, lit(None))
                           .otherwise(col("birth_date"))
                          )

        # Cast account_balance column to float
        df = df.withColumn("account_balance", round(df["account_balance"].cast("float")))

        # Arrange columns
        df = df.select(
            'customer_id', 
            'birth_date', 
            'gender', 
            'location', 
            'account_balance' 
        )
        
        print("Transformation process successful for table: customers")

        # Set success log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "success", "csv", "customers", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date"])
    
        return df

    except Exception as e:
        print(f"Transformation process failed: {e}")

        # Set failed log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "failed", "csv", "customers", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

    finally:
        load_log_msg(spark=spark, log_msg=log_message)

In [32]:
transformed_customers_df = transform_customers(spark=spark, df=bank_df)

Transformation process successful for table: customers


In [33]:
transformed_customers_df.show()

+-----------+----------+------+-----------+---------------+
|customer_id|birth_date|gender|   location|account_balance|
+-----------+----------+------+-----------+---------------+
|   C1010028|1988-08-25|Female|      DELHI|       296828.0|
|   C1010035|1992-03-02|  Male|     MUMBAI|         7284.0|
| C1010035_2|1980-06-09|  Male|NAVI MUMBAI|       378013.0|
|   C1010036|1996-02-26|  Male|    GURGAON|       355430.0|
|   C1010041|1993-09-06|Female|      DELHI|        34119.0|
| C1010041_2|1975-09-14|Female|      NOIDA|       746732.0|
| C1010041_3|1992-07-13|Female|      LOHIT|         1291.0|
|   C1010064|1988-09-19|  Male|      DELHI|            2.0|
|   C1010065|1985-02-16|  Male|  BARABANKI|        18819.0|
|   C1010071|1985-09-14|Female|  NEW DELHI|       329868.0|
|   C1010074|1985-09-14|Female|  NEW DELHI|       329868.0|
|   C1010078|1983-09-16|Female|     MUMBAI|      1421750.0|
|   C1010129|1992-09-15|  Male|    KOLKATA|        27340.0|
| C1010129_2|1997-12-22|  Male|     NAGP

### Transform *transaction*

In [34]:
from pyspark.sql.functions import col, when, lit, to_date, date_format, from_unixtime

def transform_transactions(spark, df):

    current_timestamp = datetime.now()

    try:

        # Rename selected columns
        rename_columns = {
            "TransactionID" : "transaction_id",
            "CustomerID" : "customer_id",
            "TransactionDate" : "transaction_date",
            "TransactionTime" : "transaction_time",
            "TransactionAmount (INR)" : "transaction_amount"
        }
        
        df = df.withColumnsRenamed(rename_columns)

        # Convert transaction_date column value to DATE format
        df = df.withColumn("transaction_date", 
                           to_date(col("transaction_date"), "d/M/yy")
                          )

        # Give flag if there's a year that > 2025
        df = df.withColumn("transaction_date", 
                           when(year(col("transaction_date")) > 2025, lit(None))
                           .otherwise(col("transaction_date"))
                          )

        # Convert transaction_time column value to HH:MM:SS format
        df = df.withColumn("transaction_time",
                           from_unixtime(col("transaction_time").cast("int"), "HH:mm:ss")
                          )

        # Cast transaction_amount column to float
        df = df.withColumn("transaction_amount", round(df["transaction_amount"].cast("float")))
                          
        # Arrange columns
        df = df.select(
            'transaction_id', 
            'customer_id', 
            'transaction_date', 
            'transaction_time', 
            'transaction_amount' 
        )
        
        print("Transformation process successful for table: transactions")

        # Set success log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "success", "csv", "transactions", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date"])
    
        return df

    except Exception as e:
        print(f"Transformation process failed: {e}")

        # Set failed log message
        log_message = spark.sparkContext \
            .parallelize([("sources", "transformation", "failed", "csv", "transactions", current_timestamp)]) \
            .toDF(["step", "process", "status", "source", "table_name", "etl_date", "error_msg"])

    finally:
        load_log_msg(spark=spark, log_msg=log_message)  

In [35]:
transformed_transactions_df = transform_transactions(spark=spark, df=bank_df)

Transformation process successful for table: transactions


In [36]:
transformed_transactions_df.show()

+--------------+-----------+----------------+----------------+------------------+
|transaction_id|customer_id|transaction_date|transaction_time|transaction_amount|
+--------------+-----------+----------------+----------------+------------------+
|       T642232|   C1010028|      2016-08-29|        02:26:52|             557.0|
|        T87414|   C1010035|      2016-08-01|        07:05:17|              50.0|
|       T560676| C1010035_2|      2016-08-27|        03:23:31|             700.0|
|       T610204|   C1010036|      2016-08-26|        02:26:43|             208.0|
|       T957663|   C1010041|      2016-09-10|        21:08:53|           14500.0|
|       T113533| C1010041_2|      2016-08-06|        23:38:57|            2397.0|
|       T888200| C1010041_3|      2016-09-07|        00:24:19|              20.0|
|       T152889|   C1010064|      2016-08-05|        07:45:05|            3000.0|
|       T670148|   C1010065|      2016-08-28|        21:03:51|             500.0|
|       T601310|

## **Load Data**

In [37]:
from datetime import datetime
from pyspark.sql import SparkSession
from sqlalchemy import create_engine, text

def dwh_engine_sqlalchemy():
    return create_engine(f"postgresql://{DWH_DB_USER}:{DWH_DB_PASS}@{DWH_DB_HOST}:{DWH_DB_PORT}/{DWH_DB_NAME}")

def load_to_dwh(spark, df, table_name, source_name):

    current_timestamp = datetime.now()

    try:

        # Establish connection to warehouse db
        conn = dwh_engine_sqlalchemy()

        with conn.begin() as connection:

            # Truncate all tables in data warehouse
            connection.execute(text(f"TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE"))

        print(f"Success truncating table: {table_name}")

    except Exception as e:
        print(f"Error when truncating table: {e}")

        log_message = spark.sparkContext\
            .parallelize([("warehouse", "load", "failed", source_name, table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        
        load_log_msg(spark=spark, log_msg=log_message) 

    finally:
        conn.dispose()

    # Load transformed DataFrame to warehouse db
    try:
        
        # Getdwh db config
        DWH_DB_URL, DWH_DB_USER, DWH_DB_PASS = dwh_engine()
    
        # Set config
        properties = {
            "user" : DWH_DB_USER,
            "password" : DWH_DB_PASS,
        }

        df.write.jdbc(url=DWH_DB_URL,
                      table=table_name,
                      mode="append",
                      properties=properties)

        print(f"Load process successful for table: {table_name}")

        log_message = spark.sparkContext\
            .parallelize([("warehouse", "load", "success", source_name, table_name, current_timestamp)])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date'])
        
        load_log_msg(spark=spark, log_msg=log_message) 
        
    except Exception as e:
        print(f"Load process failed: {e}")

        log_message = spark.sparkContext\
            .parallelize([("warehouse", "load", "failed", source_name, table_name, current_timestamp, str(e))])\
            .toDF(['step', 'process', 'status', 'source', 'table_name', 'etl_date', 'error_msg'])
        
    finally:
        load_log_msg(spark=spark, log_msg=log_message) 


In [38]:
load_to_dwh(spark=spark, df=transformed_customers_df, table_name="customers", source_name="source_db")

Success truncating table: customers
Load process successful for table: customers


In [39]:
load_to_dwh(spark=spark, df=transformed_transactions_df, table_name="transactions", source_name="source_db")

Success truncating table: transactions
Load process successful for table: transactions


In [40]:
load_to_dwh(spark=spark, df=education_status_df, table_name="education_status", source_name="source_db")

Success truncating table: education_status
Load process successful for table: education_status


In [41]:
load_to_dwh(spark=spark, df=marital_status_df, table_name="marital_status", source_name="source_db")

Success truncating table: marital_status
Load process successful for table: marital_status


In [42]:
load_to_dwh(spark=spark, df=transformed_marketing_df, table_name="marketing_campaign_deposit", source_name="source_db")

Success truncating table: marketing_campaign_deposit
Load process successful for table: marketing_campaign_deposit
