In [116]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql import functions as F

In [4]:
# Create (or reuse) the SparkSession used by the cells below.
spark = (
    SparkSession.builder
    .appName("Exercise Data Pipeline with PySpark Week 6")
    .getOrCreate()
)

In [5]:
# Echo the SparkSession object as this cell's output.
spark

In [109]:
# Read the raw bank-transaction CSV. Rows that fail to parse are dropped
# (DROPMALFORMED); no schema is inferred, so every column arrives as a string.
df_transaction = (
    spark.read
    .option("header", True)
    .option("mode", "DROPMALFORMED")
    .csv("data/new_bank_transaction.csv")
)

df_transaction.show(n=3, truncate=False, vertical=True)

-RECORD 0------------------------------
 TransactionID           | T642232     
 CustomerID              | C1010028    
 CustomerDOB             | 25/8/88     
 CustGender              | F           
 CustLocation            | DELHI       
 CustAccountBalance      | 296828.37   
 TransactionDate         | 29/8/16     
 TransactionTime         | 95212       
 TransactionAmount (INR) | 557         
-RECORD 1------------------------------
 TransactionID           | T87414      
 CustomerID              | C1010035    
 CustomerDOB             | 2/3/92      
 CustGender              | M           
 CustLocation            | MUMBAI      
 CustAccountBalance      | 7284.42     
 TransactionDate         | 1/8/16      
 TransactionTime         | 111917      
 TransactionAmount (INR) | 50          
-RECORD 2------------------------------
 TransactionID           | T560676     
 CustomerID              | C1010035_2  
 CustomerDOB             | 9/6/80      
 CustGender              | M           


In [None]:
# NOTE(review): this cell was never executed (In [None]). The glob treats
# "data/new_bank_transaction.csv" as a directory of Spark part files, and the
# name "df_ratings" looks carried over from another exercise — confirm or remove.
directory = "data/new_bank_transaction.csv/"

df_ratings = spark.read.csv(f"{directory}part-*.csv", header=True)
df_ratings.show()

In [24]:
import os

# Connection settings for the source Postgres database. Credentials come from
# the environment so they need not be committed with the notebook; the
# defaults reproduce the previous hard-coded values for local runs.
DB_URL = os.environ.get("SOURCE_DB_URL", "jdbc:postgresql://source_db:5432/source")
DB_TABLE = "marketing_campaign_deposit"
DB_USER = os.environ.get("SOURCE_DB_USER", "postgres")
DB_PASS = os.environ.get("SOURCE_DB_PASS", "postgres")

# JDBC settings passed to spark.read.jdbc(...)
jdbc_url = DB_URL
table_name = DB_TABLE
connection_properties = {
    "user": DB_USER,
    "password": DB_PASS,
    "driver": "org.postgresql.Driver",  # PostgreSQL JDBC driver class
}

In [35]:
# Load the marketing campaign table over JDBC with the connection settings above.
df_marketing = spark.read.jdbc(
    url=jdbc_url,
    table=table_name,
    properties=connection_properties,
)

In [36]:
df_marketing.show(n=3, vertical=True, truncate=False)  # first 3 rows, one field per line

-RECORD 0----------------------------------------
 loan_data_id       | 1                          
 age                | 58                         
 job                | management                 
 marital_id         | 1                          
 education_id       | 1                          
 default            | false                      
 balance            | $2143                      
 housing            | true                       
 loan               | false                      
 contact            | unknown                    
 day                | 5                          
 month              | may                        
 duration           | 261                        
 campaign           | 1                          
 pdays              | -1                         
 previous           | 0                          
 poutcome           | unknown                    
 subscribed_deposit | false                      
 created_at         | 2025-02-28 15:59:11.102813 


In [37]:
# Load the education_status lookup table over JDBC.
df_education = spark.read.jdbc(
    url=jdbc_url,
    table="education_status",
    properties=connection_properties,
)

In [38]:
# Print the column names and types Spark derived for education_status.
df_education.printSchema()

root
 |-- education_id: integer (nullable = true)
 |-- value: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [39]:
df_education.show(n=3, vertical=True, truncate=False)  # first 3 rows, one field per line

-RECORD 0----------------------------------
 education_id | 1                          
 value        | tertiary                   
 created_at   | 2025-02-28 15:31:04.358235 
 updated_at   | 2025-02-28 15:31:04.358235 
-RECORD 1----------------------------------
 education_id | 2                          
 value        | secondary                  
 created_at   | 2025-02-28 15:31:04.358235 
 updated_at   | 2025-02-28 15:31:04.358235 
-RECORD 2----------------------------------
 education_id | 3                          
 value        | unknown                    
 created_at   | 2025-02-28 15:31:04.358235 
 updated_at   | 2025-02-28 15:31:04.358235 
only showing top 3 rows



In [40]:
# Load the marital_status lookup table over JDBC.
df_marital = spark.read.jdbc(
    url=jdbc_url,
    table="marital_status",
    properties=connection_properties,
)

In [41]:
# Print the column names and types Spark derived for marital_status.
df_marital.printSchema()

root
 |-- marital_id: integer (nullable = true)
 |-- value: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- updated_at: timestamp (nullable = true)



In [44]:
df_marital.show(n=3, vertical=True, truncate=False)  # first 3 rows, one field per line

-RECORD 0--------------------------------
 marital_id | 1                          
 value      | married                    
 created_at | 2025-02-28 15:31:01.502136 
 updated_at | 2025-02-28 15:31:01.502136 
-RECORD 1--------------------------------
 marital_id | 2                          
 value      | single                     
 created_at | 2025-02-28 15:31:01.502136 
 updated_at | 2025-02-28 15:31:01.502136 
-RECORD 2--------------------------------
 marital_id | 3                          
 value      | divorced                   
 created_at | 2025-02-28 15:31:01.502136 
 updated_at | 2025-02-28 15:31:01.502136 



In [117]:
# One aggregate per column: how many rows have NULL in that column.
null_counts = [
    F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_marketing.columns
]

# Evaluate every count in a single select and show one column per line.
df_null_counts = df_marketing.select(null_counts)
df_null_counts.show(truncate=False, vertical=True)

-RECORD 0-------------------------
 loan_data_id               | 0   
 age                        | 0   
 job                        | 0   
 marital_id                 | 0   
 education_id               | 0   
 default                    | 0   
 balance                    | 0   
 housing                    | 0   
 loan                       | 0   
 contact                    | 0   
 day                        | 0   
 month                      | 0   
 duration                   | 0   
 campaign                   | 0   
 days_since_last_campaign   | 0   
 previous_campaign_contacts | 0   
 previous_campaign_outcome  | 0   
 subscribed_deposit         | 0   
 created_at                 | 0   
 updated_at                 | 0   
 duration_in_year           | 0   



In [79]:
# NULL count per column of df_marketing. Uses the F.* aliases: the bare `sum`
# imported at the top of the notebook shadows Python's builtin sum.
df_marketing.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_marketing.columns]
).show(truncate=False, vertical=True)

-RECORD 0-------------------------
 loan_data_id               | 0   
 age                        | 0   
 job                        | 0   
 marital_id                 | 0   
 education_id               | 0   
 default                    | 0   
 balance                    | 0   
 housing                    | 0   
 loan                       | 0   
 contact                    | 0   
 day                        | 0   
 month                      | 0   
 duration                   | 0   
 campaign                   | 0   
 days_since_last_campaign   | 0   
 previous_campaign_contacts | 0   
 previous_campaign_outcome  | 0   
 subscribed_deposit         | 0   
 created_at                 | 0   
 updated_at                 | 0   
 duration_in_year           | 0   



In [118]:
# One aggregate per column: how many rows have NULL in that column.
null_counts = [
    F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_education.columns
]

# Evaluate every count in a single select and show one column per line.
df_null_counts = df_education.select(null_counts)
df_null_counts.show(truncate=False, vertical=True)

-RECORD 0-----------
 education_id | 0   
 value        | 0   
 created_at   | 0   
 updated_at   | 0   



In [80]:
# NULL count per column of df_education. Uses the F.* aliases: the bare `sum`
# imported at the top of the notebook shadows Python's builtin sum.
df_education.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_education.columns]
).show(truncate=False, vertical=True)

-RECORD 0-----------
 education_id | 0   
 value        | 0   
 created_at   | 0   
 updated_at   | 0   



In [119]:
# One aggregate per column: how many rows have NULL in that column.
null_counts = [
    F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_marital.columns
]

# Evaluate every count in a single select and show one column per line.
df_null_counts = df_marital.select(null_counts)
df_null_counts.show(truncate=False, vertical=True)

-RECORD 0---------
 marital_id | 0   
 value      | 0   
 created_at | 0   
 updated_at | 0   



In [81]:
# NULL count per column of df_marital. Uses the F.* aliases: the bare `sum`
# imported at the top of the notebook shadows Python's builtin sum.
df_marital.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_marital.columns]
).show(truncate=False, vertical=True)

-RECORD 0---------
 marital_id | 0   
 value      | 0   
 created_at | 0   
 updated_at | 0   



In [120]:
# One aggregate per column: how many rows have NULL in that column.
null_counts = [
    F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df_transaction.columns
]

# Evaluate every count in a single select and show one column per line.
df_null_counts = df_transaction.select(null_counts)
df_null_counts.show(truncate=False, vertical=True)

-RECORD 0-----------------------
 TransactionID           | 0    
 CustomerID              | 0    
 CustomerDOB             | 0    
 CustGender              | 1100 
 CustLocation            | 151  
 CustAccountBalance      | 2369 
 TransactionDate         | 0    
 TransactionTime         | 0    
 TransactionAmount (INR) | 0    



In [180]:
# Print the schema of the raw CSV frame (column names and types as read).
df_transaction.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerDOB: string (nullable = true)
 |-- CustGender: string (nullable = true)
 |-- CustLocation: string (nullable = true)
 |-- CustAccountBalance: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- TransactionTime: string (nullable = true)
 |-- TransactionAmount (INR): string (nullable = true)



### **2. Source to Target Mapping**
---


## Column Mapping

### Education Status
source : table education_status

target : table education_status

| Source Column   | Target Column   | Transformation                                   |
|----------------|----------------|---------------------------------------------------------|
| `education_id` | `education_id` | -                       |
| `value`        | `value`        | - |
| `created_at`   | `created_at`   | -           |
| `updated_at`   | `updated_at`   | -          |


### Marital Status
source : table marital_status

target : table marital_status

| Source Column   | Target Column   | Transformation |
|----------------|----------------|---------------|
| `marital_id`   | `marital_id`   | - |
| `value`        | `value`        | - |
| `created_at`   | `created_at`   | - |
| `updated_at`   | `updated_at`   | - |


### Marketing Campaign for Deposit
source : table marketing_campaign_deposit

target : table marketing_campaign_deposit

| Source Column              | Target Column                | Transformation                                      |
|----------------------------|-----------------------------|----------------------------------------------------|
| `loan_data_id`             | `loan_data_id`              | - |
| `age`                      | `age`                       | - |
| `job`                      | `job`                       | - |
| `marital_id`               | `marital_id`                | - |
| `education_id`             | `education_id`              | - |
| `"default"`                | `"default"`                 | - |
| `balance`                  | `balance`                   | Remove `$` sign and convert to `INT` |
| `housing`                  | `housing`                   | - |
| `loan`                     | `loan`                      | - |
| `contact`                  | `contact`                   | - |
| `"day"`                    | `"day"`                     | - |
| `"month"`                  | `"month"`                   | - |
| `duration`                 | `duration`                  | - |
| `duration`                 | `duration_in_year`          | Divide `duration` by `365`, round down (floor), and cast to `INT` |
| `campaign`                 | `campaign`                  | - |
| `pdays`                    | `days_since_last_campaign`  | Rename column |
| `previous`                 | `previous_campaign_contacts`| Rename column |
| `poutcome`                 | `previous_campaign_outcome` | Rename column |
| `subscribed_deposit`       | `subscribed_deposit`        | - |
| `created_at`               | `created_at`                | - |
| `updated_at`               | `updated_at`                | - |

### Customers
source : file new_bank_transaction.csv

target : table customers

| Source Column          | Target Column      | Transformation                                      |
|------------------------|-------------------|----------------------------------------------------|
| `CustomerID`          | `customer_id`      | Rename column |
| `CustomerDOB`         | `birth_date`       | Parse `d/M/yy` into `DATE`; two-digit years greater than `25` become `19yy`, the rest `20yy` |
| `CustGender`          | `gender`           | Rename column; Map `M` → `Male`, `F` → `Female`, others → `Other` |
| `CustLocation`        | `location`         | Rename column |
| `CustAccountBalance`  | `account_balance`  | Rename column, cast to decimal number |

### Transactions
source : file new_bank_transaction.csv

target : table transactions

| Source Column                 | Target Column      | Transformation                                                   |
|--------------------------------|-------------------|-----------------------------------------------------------------|
| `TransactionID`               | `transaction_id`  | Rename column |
| `CustomerID`                  | `customer_id`     | Rename column |
| `TransactionDate`             | `transaction_date` | Parse `d/M/yy` into `DATE`; two-digit years greater than `25` become `19yy`, the rest `20yy` |
| `TransactionTime`             | `transaction_time` | Left-pad the `HHMMSS` value with zeros to 6 digits, then format as `HH:MM:SS` (e.g. `95212` → `09:52:12`) |
| `TransactionAmount (INR)`     | `transaction_amount` | Rename column, cast to decimal number |


### **3. Transform Data**
---

In [88]:
import logging
import os
from pyspark.sql import SparkSession


def logging_process(log_file="script/log/info.log"):
    """
    Configure file logging on the root logger and return the root logger.

    Parameters
    ----------
    log_file (str): path of the log file. Its parent directory is created if
        missing; a bare file name (no directory part) is used relative to the
        current working directory.

    Returns
    -------
    logger (logging.Logger): the root logger.

    Note: logging.basicConfig does nothing when the root logger already has
    handlers, so repeated calls keep the first configuration.
    """
    log_dir = os.path.dirname(log_file)
    # os.makedirs("") raises FileNotFoundError, so only create a directory
    # when the path actually contains one.
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s"
    )
    return logging.getLogger()


def init_spark_session():
    """Return the SparkSession from getOrCreate(), building one with this exercise's app name if none exists."""
    builder = SparkSession.builder.appName("Exercise Data Pipeline Week_6")
    return builder.getOrCreate()

In [89]:
import logging
import pyspark

logging_process()


def extract_data(
    data_name: str, format_data: str
) -> pyspark.sql.DataFrame:
    """
    Function to extract source data from a CSV file or a database table.

    Parameters
    ----------
    data_name (str): for "csv", the file name without extension, read from
        data/<data_name>.csv; for "db", the source table name
    format_data (str): source format, "csv" or "db" (case-insensitive)

    Returns
    -------
    df (pyspark.sql.DataFrame): dataframe of data sources

    Raises
    ------
    ValueError: if format_data is not supported.
    Any error raised while reading is logged and re-raised unchanged.
    """
    import os

    # create (or reuse) the spark session
    spark = init_spark_session()

    # Database settings. Credentials come from the environment when set and
    # fall back to the previous hard-coded defaults.
    jdbc_url = os.environ.get("SOURCE_DB_URL", "jdbc:postgresql://source_db:5432/source")
    connection_properties = {
        "user": os.environ.get("SOURCE_DB_USER", "postgres"),
        "password": os.environ.get("SOURCE_DB_PASS", "postgres"),
        "driver": "org.postgresql.Driver",  # PostgreSQL JDBC driver class
    }

    try:
        source_format = format_data.lower()

        if source_format == "csv":
            logging.info(f"===== Start Extracting {data_name} data =====")
            df = spark.read.csv(f"data/{data_name}.csv", header=True)
        elif source_format == "db":
            logging.info(f"===== Start Extracting {data_name} data =====")
            df = spark.read.jdbc(
                url=jdbc_url, table=data_name, properties=connection_properties
            )
        else:
            raise ValueError("Format data not supported yet")

        logging.info(f"===== Finish Extracting {data_name} data =====")
        return df

    except Exception as e:
        logging.error("====== Failed to Extract Data ======")
        logging.error(e)
        # Re-raise the original exception. Wrapping it in a new Exception(e)
        # discarded both its type and its traceback.
        raise

In [90]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, round, regexp_replace

def casting_data_types(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to cast data types based on table name.

    For "marketing_campaign_deposit":
      - balance: strip the "$" sign and cast to int (in place).
      - duration_in_year: duration / 365, rounded DOWN (floor), cast to int,
        as the source-to-target mapping specifies.
    Other table names are returned unchanged.

    Mapping entries are tuples of one of two shapes:
      (type, regex_to_strip)                -> strip the regex, then cast the column
      (type, None, source_column, divisor)  -> floor(source_column / divisor), then cast
    """
    from pyspark.sql.functions import floor

    casting_mappings = {
        "marketing_campaign_deposit": {
            "balance": ("int", "\\$"),
            "duration_in_year": ("int", None, "duration", 365)
        }
    }

    for col_name, cast_info in casting_mappings.get(table_name, {}).items():
        if len(cast_info) == 2 and cast_info[1]:
            target_type, pattern = cast_info
            df = df.withColumn(col_name, regexp_replace(col(col_name), pattern, "").cast(target_type))
        elif len(cast_info) == 4:
            target_type, _, source_col, divisor = cast_info
            # floor, not round: the spec says "round down", while round() would
            # turn e.g. 200 / 365 ~= 0.55 into 1.
            df = df.withColumn(col_name, floor(col(source_col) / divisor).cast(target_type))

    return df


In [91]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace, col

def clean_data(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to clean specific characters from the data.

    For "marketing_campaign_deposit", the "$" sign is removed from balance
    (the column keeps its original type; no cast is applied here).
    Other table names are returned unchanged.
    """
    # Raw string: "\$" in a plain literal is an invalid escape sequence
    # (DeprecationWarning, SyntaxWarning since Python 3.12). The regex is identical.
    cleaning_mappings = {
        "marketing_campaign_deposit": {"balance": r"\$"}
    }

    for col_name, pattern in cleaning_mappings.get(table_name, {}).items():
        df = df.withColumn(col_name, regexp_replace(col(col_name), pattern, ""))

    return df

In [174]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, when, expr, length, lit, concat, substring

def convert_date_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to convert date and time columns based on table name.

    transactions: TransactionDate ("d/M/yy") is parsed in place into a DATE;
        TransactionTime (HHMMSS without leading zeros, e.g. "95212") is
        reformatted in place as "HH:MM:SS".
    customers: adds birth_date parsed from CustomerDOB ("d/M/yy"). Two-digit
        years greater than 25 become 19yy and the rest 20yy; a year part that
        is not two characters long is used as-is.
    """
    from pyspark.sql.functions import concat_ws, lpad, split

    if table_name == "transactions":
        df = df.withColumn("TransactionDate", to_date(col("TransactionDate"), "d/M/yy"))

        # Left-pad to 6 digits first. The value is not zero-padded, so fixed
        # offsets on "95212" would give "95:21:2" instead of "09:52:12".
        padded_time = lpad(col("TransactionTime"), 6, "0")
        df = df.withColumn(
            "TransactionTime",
            concat(
                substring(padded_time, 1, 2), lit(":"),
                substring(padded_time, 3, 2), lit(":"),
                substring(padded_time, 5, 2),
            ),
        )

    if table_name == "customers":
        # Split on "/" rather than using fixed offsets: day and month are not
        # zero-padded ("2/3/92"), so the year is not always at position 7.
        dob_parts = split(col("CustomerDOB"), "/")
        year = dob_parts.getItem(2)
        full_year = when(
            length(year) == 2,
            concat(when(year.cast("int") > 25, lit("19")).otherwise(lit("20")), year),
        ).otherwise(year)
        df = df.withColumn(
            "birth_date",
            to_date(
                concat_ws("/", dob_parts.getItem(0), dob_parts.getItem(1), full_year),
                "d/M/yyyy",
            ),
        )
    return df


In [197]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, udf
from pyspark.sql.types import StringType

def convert_date_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to convert date columns based on table name.

    For each (source, target) pair mapped for the table, the source "d/M/yy"
    string is passed through convert_year_format_udf (expanding the year to
    four digits) and parsed with "d/M/yyyy" into a new DATE column.
    Unknown table names are returned unchanged.
    """
    date_mappings = {
        "customers": {"CustomerDOB": "birth_date"},
        "transactions": {"TransactionDate": "transaction_date"}
    }

    # Both mapped columns share the same d/M/yy layout, so a single conversion
    # path serves every entry (the earlier if/else had two identical branches).
    for old_col, new_col in date_mappings.get(table_name, {}).items():
        df = df.withColumn(new_col, to_date(convert_year_format_udf(col(old_col)), "d/M/yyyy"))

    return df

def convert_year_format(date_str: str, pivot: int = 25) -> str:
    """
    Expand the two-digit year of a "d/M/yy" string to four digits.

    Years greater than `pivot` become 19yy and the rest 20yy
    (e.g. "25/8/88" -> "25/8/1988", "29/8/16" -> "29/8/2016").
    Values are returned unchanged when they are empty/None, do not split into
    exactly three "/" parts, or have a year part that is not exactly two
    decimal digits (e.g. "1/1/1800", "nan").
    """
    if not date_str:
        return date_str
    parts = date_str.split('/')
    if len(parts) != 3:
        return date_str
    day, month, year = parts
    # Only two-digit years need a century. Previously int() raised ValueError
    # here for non-numeric years, and 4-digit years became e.g. "191800".
    if len(year) != 2 or not year.isdecimal():
        return date_str
    century = "19" if int(year) > pivot else "20"
    return f"{day}/{month}/{century}{year}"

# Wrap convert_year_format as a Spark UDF that returns strings, for use in column expressions.
convert_year_format_udf = udf(f=convert_year_format, returnType=StringType())

In [None]:
# deep  (scratch label kept as a comment; as a bare name it raised NameError on Run All)

In [209]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, to_date, concat_ws, substring, when, lit

def convert_date_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to convert date columns based on table name.

    Each mapped source column holds a "d/M/yy" string. It is split on "/",
    the two-digit year is expanded (greater than 25 -> 19yy, otherwise 20yy;
    a year that is not two characters long is kept as-is), and the result is
    parsed with "d/M/yyyy" into a new DATE column named by the mapping.
    Unknown table names are returned unchanged.
    """
    from pyspark.sql.functions import length, split

    date_mappings = {
        "customers": {"CustomerDOB": "birth_date"},
        "transactions": {"TransactionDate": "transaction_date"}
    }

    for old_col, new_col in date_mappings.get(table_name, {}).items():
        # Split on "/" instead of fixed offsets: day and month are not
        # zero-padded ("2/3/92"), so substring(1,2)/(4,2)/(7,2) misreads them.
        parts = split(col(old_col), "/")
        year = parts.getItem(2)
        full_year = when(
            length(year) == 2,
            concat_ws(
                '',
                when(year.cast('int') > 25, lit('19')).otherwise(lit('20')),
                year,
            ),
        ).otherwise(year)
        df = df.withColumn(
            new_col,
            to_date(
                concat_ws('/', parts.getItem(0), parts.getItem(1), full_year),
                "d/M/yyyy",
            ),
        )

    return df

def convert_transaction_time(df: DataFrame) -> DataFrame:
    """
    Function to convert TransactionTime to HH:MM:SS format.

    The raw value is an HHMMSS number with leading zeros dropped, e.g.
    "95212" -> "09:52:12" and "212" -> "00:02:12". It is left-padded with
    zeros to 6 characters before being split into HH, MM and SS, so every
    length up to 6 is handled. A NULL time yields an empty string, because
    concat_ws skips NULL parts.
    """
    from pyspark.sql.functions import lpad

    padded = lpad(col("TransactionTime"), 6, "0")
    df = df.withColumn(
        "TransactionTime",
        concat_ws(
            ':',
            substring(padded, 1, 2),  # HH
            substring(padded, 3, 2),  # MM
            substring(padded, 5, 2),  # SS
        )
    )
    return df

In [None]:
# claude  (scratch label kept as a comment; as a bare name it raised NameError on Run All)

In [367]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, when, concat, substring, lit, regexp_extract

def convert_date_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to convert date and time columns based on table name.

    Args:
        df: Input DataFrame
        table_name: Name of the table being processed. "transactions" adds
            transaction_date and transaction_time; "customers" adds
            birth_date; any other name returns df unchanged.

    Returns:
        DataFrame with converted date and time columns
    """
    # lpad is not among the functions imported for this cell; without this
    # import the "transactions" branch fails with NameError.
    from pyspark.sql.functions import lpad

    if table_name == "transactions":
        # Parse the "d/M/yy" TransactionDate string into a DATE column
        # (Spark resolves the two-digit "yy" into 2000-2099).
        df = df.withColumn("transaction_date", to_date(col("TransactionDate"), "d/M/yy"))

        # TransactionTime is HHMMSS with leading zeros dropped (e.g. "95212");
        # left-pad to 6 characters so fixed positions can be sliced.
        df = df.withColumn("padded_time", lpad(col("TransactionTime"), 6, "0"))

        # Then convert to HH:MM:SS format
        df = df.withColumn(
            "transaction_time",
            concat(
                substring(col("padded_time"), 1, 2), lit(":"),
                substring(col("padded_time"), 3, 2), lit(":"),
                substring(col("padded_time"), 5, 2)
            )
        )

        # Drop the temporary column
        df = df.drop("padded_time")

    elif table_name == "customers":
        # Two-digit years greater than 25 are rebuilt as 19yy and parsed with
        # "d/M/yyyy"; everything else is parsed directly with "d/M/yy".
        df = df.withColumn(
            "birth_date",
            when(
                regexp_extract(col("CustomerDOB"), "(\\d{1,2})/(\\d{1,2})/(\\d{2})", 3).cast("int") > 25,
                to_date(
                    concat(
                        regexp_extract(col("CustomerDOB"), "(\\d{1,2})/(\\d{1,2})/", 1),
                        lit("/"),
                        regexp_extract(col("CustomerDOB"), "\\d{1,2}/(\\d{1,2})/", 1),
                        lit("/19"),
                        regexp_extract(col("CustomerDOB"), "\\d{1,2}/\\d{1,2}/(\\d{2})", 1)
                    ),
                    "d/M/yyyy"
                )
            ).otherwise(to_date(col("CustomerDOB"), "d/M/yy"))
        )

    return df

In [339]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, when, concat, substring, lit, regexp_extract

def convert_date_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Function to convert date and time columns based on table name.

    Args:
        df: Input DataFrame
        table_name: Name of the table being processed. "transactions" adds
            transaction_date and transaction_time; "customers" adds
            birth_date; any other name returns df unchanged.

    Returns:
        DataFrame with converted date and time columns
    """
    # length is not among the functions imported for this cell.
    from pyspark.sql.functions import length

    if table_name == "transactions":
        # Parse the "d/M/yy" TransactionDate string into a DATE column
        df = df.withColumn("transaction_date", to_date(col("TransactionDate"), "d/M/yy"))

        # Column the time branches read from. It was referenced but never
        # defined (NameError). The branches dispatch on the raw, unpadded
        # length, so they read TransactionTime directly; the unused padded
        # helper column (built with an unimported lpad) is no longer created.
        time_col = "TransactionTime"

        # Convert HHMMSS (leading zeros dropped) to HH:MM:SS by raw length;
        # lengths outside 1-6 (and NULL) fall back to "00:00:00".
        df = df.withColumn(
            "transaction_time",
            when(length(col(time_col)) == 6,
                 concat(
                     regexp_extract(col(time_col), "^(\\d{2})", 1), lit(":"),
                     regexp_extract(col(time_col), "^\\d{2}(\\d{2})", 1), lit(":"),
                     regexp_extract(col(time_col), "^\\d{4}(\\d{2})", 1)
                 ))
            .when(length(col(time_col)) == 5,
                  concat(
                      lit("0"), regexp_extract(col(time_col), "^(\\d{1})", 1), lit(":"),
                      regexp_extract(col(time_col), "^\\d{1}(\\d{2})", 1), lit(":"),
                      regexp_extract(col(time_col), "^\\d{3}(\\d{2})", 1)
                  ))
            .when(length(col(time_col)) == 4,
                  concat(
                      lit("00"), lit(":"), regexp_extract(col(time_col), "^(\\d{2})", 1), lit(":"),
                      regexp_extract(col(time_col), "^\\d{2}(\\d{2})", 1)
                  ))
            .when(length(col(time_col)) == 3,
                  concat(
                      lit("00"), lit(":"), lit("0"), regexp_extract(col(time_col), "^(\\d{1})", 1), lit(":"),
                      regexp_extract(col(time_col), "^\\d{1}(\\d{2})", 1)
                  ))
            .when(length(col(time_col)) == 2,
                  concat(
                      lit("00"), lit(":"), lit("00"), lit(":"), regexp_extract(col(time_col), "^(\\d{2})", 1)
                  ))
            .when(length(col(time_col)) == 1,
                  concat(
                      lit("00"), lit(":"), lit("00"), lit(":"), lit("0"), regexp_extract(col(time_col), "^(\\d{1})", 1)
                  ))
            .otherwise(lit("00:00:00"))
        )

    elif table_name == "customers":
        # Two-digit years greater than 25 are rebuilt as 19yy and parsed with
        # "d/M/yyyy"; everything else is parsed directly with "d/M/yy".
        df = df.withColumn(
            "birth_date",
            when(
                regexp_extract(col("CustomerDOB"), "(\\d{1,2})/(\\d{1,2})/(\\d{2})", 3).cast("int") > 25,
                to_date(
                    concat(
                        regexp_extract(col("CustomerDOB"), "(\\d{1,2})/(\\d{1,2})/", 1),
                        lit("/"),
                        regexp_extract(col("CustomerDOB"), "\\d{1,2}/(\\d{1,2})/", 1),
                        lit("/19"),
                        regexp_extract(col("CustomerDOB"), "\\d{1,2}/\\d{1,2}/(\\d{2})", 1)
                    ),
                    "d/M/yyyy"
                )
            ).otherwise(to_date(col("CustomerDOB"), "d/M/yy"))
        )

    return df

In [322]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_date, col, when, concat, substring, lit, regexp_extract

def convert_date_columns(df: DataFrame, table_name: str, dob_century_pivot: int = 25) -> DataFrame:
    """
    Function to convert date and time columns based on table name.

    * ``"transactions"``:
        - ``transaction_date``: ``TransactionDate`` (``d/M/yy`` text) parsed
          with ``to_date`` into a Spark DateType column.
        - ``transaction_time``: ``TransactionTime`` (an ``HHMMSS`` string;
          shorter values are treated as having lost leading zeros) rendered
          as ``HH:MM:SS``, e.g. ``"5919"`` -> ``"00:59:19"``. Empty, null,
          or longer-than-six-character values become ``"00:00:00"``.
    * ``"customers"``:
        - ``birth_date``: ``CustomerDOB`` (``d/M/yy`` text) parsed into a
          DateType. Two-digit years strictly greater than
          ``dob_century_pivot`` are placed in the 1900s; every other value
          is parsed with ``to_date(..., "d/M/yy")``, whose century choice
          depends on ``spark.sql.legacy.timeParserPolicy``.
    * Any other ``table_name``: ``df`` is returned unchanged.

    Args:
        df: Input DataFrame
        table_name: Name of the table being processed
        dob_century_pivot: Two-digit birth years above this value are read
            as 19yy (default 25, the previously hard-coded cutoff).

    Returns:
        DataFrame with converted date and time columns
    """
    if table_name == "transactions":
        # Imported here so this function does not depend on another cell
        # having imported `length` (this cell's import line does not).
        from pyspark.sql.functions import length, lpad

        # to_date yields a DateType column (displayed as yyyy-MM-dd).
        df = df.withColumn("transaction_date", to_date(col("TransactionDate"), "d/M/yy"))

        raw_time = col("TransactionTime")
        # Left-pad to six characters so every accepted length (0-6) is handled
        # by one slicing rule: "5919" -> "005919" -> "00:59:19".
        padded_time = lpad(raw_time, 6, "0")
        formatted_time = concat(
            substring(padded_time, 1, 2), lit(":"),
            substring(padded_time, 3, 2), lit(":"),
            substring(padded_time, 5, 2),
        )
        # length(null) is null, so null inputs take the otherwise() branch,
        # as do values longer than six characters.
        df = df.withColumn(
            "transaction_time",
            when(length(raw_time) <= 6, formatted_time).otherwise(lit("00:00:00")),
        )

    elif table_name == "customers":
        dob = col("CustomerDOB")
        # Each date part is extracted with its own regex, so the behaviour on
        # irregular strings matches the original extraction.
        dob_day = regexp_extract(dob, "(\\d{1,2})/(\\d{1,2})/", 1)
        dob_month = regexp_extract(dob, "\\d{1,2}/(\\d{1,2})/", 1)
        dob_year_2d = regexp_extract(dob, "\\d{1,2}/\\d{1,2}/(\\d{2})", 1)

        # Years above the pivot are forced into the 1900s explicitly instead of
        # relying on the parser's two-digit-year window.
        last_century_dob = to_date(
            concat(dob_day, lit("/"), dob_month, lit("/19"), dob_year_2d),
            "d/M/yyyy",
        )
        df = df.withColumn(
            "birth_date",
            when(dob_year_2d.cast("int") > dob_century_pivot, last_century_dob)
            .otherwise(to_date(dob, "d/M/yy")),
        )

    return df

In [93]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import when, col

def feature_engineering(df: DataFrame, table_name: str) -> DataFrame:
    """
    Derive extra feature columns for the given table.

    For ``"customers"`` a ``gender`` column is added from ``CustGender``:
    ``"M"`` -> ``"Male"``, ``"F"`` -> ``"Female"``, anything else (including
    null) -> ``"Other"``. Every other table is returned unchanged.
    """
    if table_name != "customers":
        return df

    gender_code = col("CustGender")
    gender_label = (
        when(gender_code == "M", "Male")
        .when(gender_code == "F", "Female")
        .otherwise("Other")
    )
    return df.withColumn("gender", gender_label)


In [371]:
from pyspark.sql import DataFrame

def rename_columns(df: DataFrame, table_name: str) -> DataFrame:
    """
    Rename a table's source columns to their warehouse names.

    Tables with no entry in the mapping are returned unchanged. Spark's
    ``withColumnRenamed`` is a no-op for a column the DataFrame does not
    have, so listing a column that is absent is harmless.
    """
    column_renames = {
        "marketing_campaign_deposit": {
            "pdays": "days_since_last_campaign",
            "previous": "previous_campaign_contacts",
            "poutcome": "previous_campaign_outcome"
        },
        "customers": {
            "CustomerID": "customer_id",
            "CustLocation": "location",
            "CustAccountBalance": "account_balance"
        },
        # TransactionTime is not renamed here: convert_date_columns (run after
        # this step in transform_data) reads it by its original name.
        "transactions": {
            "TransactionID": "transaction_id",
            "TransactionAmount (INR)": "transaction_amount"
        }
    }

    renamed = df
    for source_name, target_name in column_renames.get(table_name, {}).items():
        renamed = renamed.withColumnRenamed(source_name, target_name)
    return renamed

In [363]:
import logging
from pyspark.sql import DataFrame

logging_process()

def transform_data(df: DataFrame, table_name: str) -> DataFrame:
    """
    Run every transformation stage for ``table_name`` and return the result.

    Stages, in order: rename_columns -> convert_date_columns ->
    casting_data_types -> feature_engineering -> clean_data. Each stage gets
    the previous stage's output together with ``table_name``.

    Any exception raised along the way is logged and re-raised unchanged.
    """
    try:
        logging.info(f"===== Start Transforming Data for {table_name} =====")
        transformed = rename_columns(df, table_name)
        transformed = convert_date_columns(transformed, table_name)
        transformed = casting_data_types(transformed, table_name)
        transformed = feature_engineering(transformed, table_name)
        transformed = clean_data(transformed, table_name)
        logging.info(f"===== Finished Transforming Data for {table_name} =====")
        return transformed
    except Exception as exc:
        logging.error(f"===== Failed to Transform Data for {table_name} =====")
        logging.error(exc)
        raise

In [334]:
import logging
import pyspark

logging_process()


def load_data(df_result: pyspark.sql.DataFrame, table_name: str, mode: str = "overwrite") -> None:
    """
    Write a DataFrame to a table in the data-warehouse PostgreSQL database via JDBC.

    Parameters
    ----------
    df_result (pyspark.sql.DataFrame): transformed DataFrame to persist
    table_name (str): destination table name in the warehouse
    mode (str): Spark save mode passed to ``DataFrameWriter.jdbc``
        (default ``"overwrite"``). Note that plain ``"overwrite"`` makes
        Spark's JDBC writer drop and recreate the table; it only truncates
        instead when the JDBC ``truncate`` option is enabled.

    Connection settings are read from the environment variables
    ``DW_DB_URL``, ``DW_DB_USER`` and ``DW_DB_PASS``, falling back to the
    local development defaults below when they are not set.

    Raises
    ------
    Exception
        Whatever the JDBC write raised, re-raised with its original type and
        traceback after being logged.
    """
    import os

    try:
        # Prefer environment configuration over credentials baked into the notebook.
        jdbc_url = os.environ.get("DW_DB_URL", "jdbc:postgresql://data_warehouse:5432/data_warehouse")
        connection_properties = {
            "user": os.environ.get("DW_DB_USER", "postgres"),
            "password": os.environ.get("DW_DB_PASS", "postgres"),
            "driver": "org.postgresql.Driver"  # PostgreSQL JDBC driver
        }

        logging.info("===== Start Load data to the database =====")

        df_result.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode=mode,
            properties=connection_properties,
        )

        logging.info("===== Finish Load data to the database =====")

    except Exception as e:
        logging.error("===== Failed Load data to the database =====")
        logging.error(e)
        # Bare raise keeps the original exception type (e.g. a Py4J/JDBC error)
        # and traceback; callers catching Exception still catch it.
        raise

In [None]:
import logging


# Initialize logging
logging_process()

if __name__ == "__main__":
    logging.info("===== Start Banking Data Pipeline =====")

    try:
        # Extract. The transactions and customers tables are both derived from
        # the same bank-transaction CSV, so it is read once. Spark DataFrames are
        # immutable, so feeding the same frame to both transforms is safe.
        df_bank_raw = extract_data(data_name="new_bank_transaction", format_data="csv")
        df_marketing = extract_data(data_name="marketing_campaign_deposit", format_data="db")
        df_education = extract_data(data_name="education_status", format_data="db")
        df_marital = extract_data(data_name="marital_status", format_data="db")

        # Transform each dataset separately
        df_transactions = transform_data(df_bank_raw, "transactions")
        df_customers = transform_data(df_bank_raw, "customers")
        df_marketing = transform_data(df_marketing, "marketing_campaign_deposit")
        df_education = transform_data(df_education, "education_status")
        df_marital = transform_data(df_marital, "marital_status")

        # Load each transformed dataset into the data warehouse
        load_data(df_transactions, table_name="transactions")
        load_data(df_customers, table_name="customers")
        load_data(df_marketing, table_name="marketing_campaign_deposit")
        load_data(df_education, table_name="education_status")
        load_data(df_marital, table_name="marital_status")

        logging.info("===== Finish Banking Data Pipeline =====")

    except Exception as e:
        logging.error("===== Data Pipeline Failed =====")
        logging.error(e)
        raise


In [372]:
# Re-read the raw bank-transaction CSV into df_customers so the customers
# transform in the next cell starts from untransformed data.
df_customers = extract_data(data_name="new_bank_transaction", format_data="csv")

In [373]:
# NOTE(review): this overwrites its own input, so re-running this cell without
# re-running the extract cell above transforms an already-transformed frame.
df_customers = transform_data(df=df_customers, table_name="customers")

In [135]:
# Bare expression: displays the DataFrame's schema repr (column names and types).
df_customers

DataFrame[TransactionID: string, customer_id: string, CustomerDOB: string, CustGender: string, location: string, account_balance: string, TransactionDate: string, TransactionTime: string, TransactionAmount (INR): string, gender: string]

In [130]:
# Use Spark's legacy (pre-3.0) datetime parser. This changes how the
# to_date(..., "d/M/yy") calls in convert_date_columns interpret patterns,
# including the century chosen for two-digit years.
# NOTE(review): in notebook order this cell comes after the transform cells;
# consider setting it in the SparkSession setup cell so it is in effect before
# any date parsing runs, regardless of execution order.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [295]:
# Write the transformed customers frame to the warehouse "customers" table.
load_data(df_result=df_customers, table_name="customers")

In [None]:
# Vertical dump of the first 83 rows (large output; trim before sharing).
df_customers.show(n=83, truncate=False, vertical=True)

In [352]:
# Spot-check a single transformed customer row, all columns untruncated.
df_customers.show(n=1, truncate=False, vertical=True)

-RECORD 0-----------------------------
 TransactionID           | T642232    
 customer_id             | C1010028   
 CustomerDOB             | 25/8/88    
 CustGender              | F          
 location                | DELHI      
 account_balance         | 296828.37  
 TransactionDate         | 29/8/16    
 TransactionTime         | 09:52:12   
 TransactionAmount (INR) | 557        
 birth_date              | 1988-08-25 
 gender                  | Female     
only showing top 1 row



In [374]:
# Inspect one specific transaction (T986198) after the customers transform.
df_customers.where(col("TransactionID") == "T986198").show(truncate=False, vertical=True)

-RECORD 0-----------------------------
 TransactionID           | T986198    
 customer_id             | C1010372_2 
 CustomerDOB             | 29/5/92    
 CustGender              | F          
 location                | GURGAON    
 account_balance         | 3762.92    
 TransactionDate         | 15/9/16    
 TransactionTime         | 5919       
 TransactionAmount (INR) | 94         
 birth_date              | 1992-05-29 
 gender                  | Female     



In [353]:
# Same T986198 lookup as the earlier inspection cell; the output below comes
# from an earlier run (TransactionTime shows "59:19:").
df_customers.where(col("TransactionID") == "T986198").show(truncate=False, vertical=True)


-RECORD 0-----------------------------
 TransactionID           | T986198    
 customer_id             | C1010372_2 
 CustomerDOB             | 29/5/92    
 CustGender              | F          
 location                | GURGAON    
 account_balance         | 3762.92    
 TransactionDate         | 15/9/16    
 TransactionTime         | 59:19:     
 TransactionAmount (INR) | 94         
 birth_date              | 1992-05-29 
 gender                  | Female     



In [157]:
# NOTE(review): `df_customers2` is not assigned in any cell shown here, so this
# raises NameError on a fresh kernel. The output below is stale: it shows
# birth_date 8208-08-25 for CustomerDOB 25/8/88, whereas the current
# convert_date_columns output above gives 1988-08-25.
df_customers2.show(3, truncate = False, vertical = True)

-RECORD 0------------------------------
 TransactionID           | T642232     
 customer_id             | C1010028    
 CustomerDOB             | 25/8/88     
 CustGender              | F           
 location                | DELHI       
 account_balance         | 296828.37   
 TransactionDate         | 29/8/16     
 TransactionTime         | 95212       
 TransactionAmount (INR) | 557         
 birth_date              | 8208-08-25  
 gender                  | Female      
-RECORD 1------------------------------
 TransactionID           | T87414      
 customer_id             | C1010035    
 CustomerDOB             | 2/3/92      
 CustGender              | M           
 location                | MUMBAI      
 account_balance         | 7284.42     
 TransactionDate         | 1/8/16      
 TransactionTime         | 111917      
 TransactionAmount (INR) | 50          
 birth_date              | 9220-03-02  
 gender                  | Male        
-RECORD 2------------------------------
