In [1]:
from pyspark.sql.functions import *


In [2]:
# Turn on Delta Lake automatic schema merging for this Spark session.
# NOTE(review): presumably set because the transforms below add derived columns to
# the silver Delta tables — confirm it is still required for overwrite-mode writes.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
               

In [None]:
# Define paths
# Omitting the container and storage account name for security reasons:
# "container_name" and "storage_account_name" below are placeholders that must be
# replaced with real values before this notebook can run.
# Both paths end with "/" because the transform functions append relative paths
# (e.g. "Accounts/Accounts.parquet") directly via f-strings.
# Root of the bronze-layer input files read by the transforms.
base_path = "abfss://container_name@storage_account_name.dfs.core.windows.net/bronze/fintech/"
# Root of the silver-layer Delta outputs written by the transforms.
output_base_path = "abfss://container_name@storage_account_name.dfs.core.windows.net/silver/fintech/"


In [25]:
'''
Transformation for Accounts dataset - 
    A new column, "AccountAgeYears", is added, which calculates how long each account has been active 
    by determining the difference between the current date and the account's open date, then converting it to years.
'''
def transform_accounts():
    """Write the silver Accounts table with an added ``AccountAgeYears`` column.

    Reads ``Accounts/Accounts.parquet`` under ``base_path``, derives the account
    age as the number of days between the current date and ``OpenDate`` divided
    by 365.25 and rounded to 2 decimals, then overwrites the Delta output at
    ``Accounts/`` under ``output_base_path``. Returns nothing.
    """
    accounts_bronze = spark.read.parquet(f"{base_path}Accounts/Accounts.parquet")

    # `round` is expected to resolve to the Spark SQL function brought in by the
    # notebook's star import from pyspark.sql.functions, not the Python builtin.
    days_since_open = datediff(current_date(), col("OpenDate"))
    account_age_years = round(days_since_open / 365.25, 2)

    accounts_silver = accounts_bronze.withColumn("AccountAgeYears", account_age_years)

    delta_writer = accounts_silver.write.format("delta").mode("overwrite")
    delta_writer.save(f"{output_base_path}Accounts/")
    

In [26]:
'''
Transformation for Customers dataset - 
    A new FullName column is created by concatenating FirstName and LastName.
    A MaskedEmail column is created, where the email address is partially hidden 
    (the part before the @ symbol is replaced by ***).
'''
def transform_customers():
    """Write the silver Customers table with a full name and a masked email.

    Reads ``Customers/Customers.parquet`` under ``base_path`` and adds:

    * ``FullName`` - ``FirstName`` and ``LastName`` joined by a single space.
    * ``MaskedEmail`` - ``***@<domain>``, where ``<domain>`` is the text after
      the last ``@`` in ``Email``. A non-null ``Email`` that contains no ``@``
      is masked entirely as ``***``; a null ``Email`` stays null.

    The result overwrites the Delta output at ``Customers/`` under
    ``output_base_path``. Returns nothing.
    """
    df = spark.read.parquet(f"{base_path}Customers/Customers.parquet")

    email = col("Email")
    # substring_index(email, "@", -1) returns the whole input when no "@" is
    # present, which would copy an unmasked value into MaskedEmail. Only build
    # the "***@domain" form when the delimiter actually exists.
    masked_email = (
        when(email.contains("@"), concat(lit("***@"), substring_index(email, "@", -1)))
        # Present but malformed: hide the entire value instead of leaking it.
        .when(email.isNotNull(), lit("***"))
        # No otherwise(): a null Email matches neither branch and stays null.
    )

    df_transformed = (
        df.withColumn("FullName", concat_ws(" ", col("FirstName"), col("LastName")))
          .withColumn("MaskedEmail", masked_email)
    )
    df_transformed.write.format("delta").mode("overwrite").save(f"{output_base_path}Customers/")
    

In [27]:
'''
Transformation for Loans dataset - 
    TotalInterest: The total interest paid on the loan.
    LoanDurationYears: The duration of the loan in years.

Total Interest Calculation:
    The code calculates the total interest on the loan using the formula:
            TotalInterest = (LoanAmount X InterestRate)/100 
    The result is explicitly cast to decimal(28,8) to ensure precise storage in the Delta table.

Loan Duration Calculation:
    The duration of the loan in years is calculated based on the difference between the LoanEndDate and LoanStartDate.
    The result is rounded to 2 decimal places for clarity.
'''
def transform_loans():
    """Write the silver Loans table with interest and duration columns added.

    Reads ``Loans/Loans.parquet`` under ``base_path`` and adds:

    * ``TotalInterest`` - ``LoanAmount * InterestRate / 100`` cast to
      ``decimal(28,8)``.
    * ``LoanDurationYears`` - days between ``LoanStartDate`` and ``LoanEndDate``
      divided by 365.25, rounded to 2 decimals.

    The result overwrites the Delta output at ``Loans/`` under
    ``output_base_path``. Returns nothing.

    NOTE(review): TotalInterest has no duration factor - confirm that this is
    the intended definition of "total" interest.
    """
    loans_bronze = spark.read.parquet(f"{base_path}Loans/Loans.parquet")

    # Explicit decimal cast so the column type matches the existing Delta table.
    interest_amount = col("LoanAmount") * col("InterestRate") / 100
    total_interest = interest_amount.cast("decimal(28,8)")

    # `round` is expected to be the Spark SQL function from the notebook's star
    # import of pyspark.sql.functions, not the Python builtin.
    loan_days = datediff(col("LoanEndDate"), col("LoanStartDate"))
    duration_years = round(loan_days / 365.25, 2)

    loans_with_interest = loans_bronze.withColumn("TotalInterest", total_interest)
    loans_silver = loans_with_interest.withColumn("LoanDurationYears", duration_years)

    delta_writer = loans_silver.write.format("delta").mode("overwrite")
    delta_writer.save(f"{output_base_path}Loans/")
    

In [28]:
'''
Transformation for Payments dataset - 
    Transform the dataset by adding a new column DaysSinceLastPayment, 
    which calculates, for each payment row, how many days have passed between its PaymentDate and the current date.
'''
def transform_payments():
    """Write the silver Payments table with a ``DaysSinceLastPayment`` column.

    Reads ``Payments/Payments.parquet`` under ``base_path`` and, for every row,
    stores the number of days between that row's ``PaymentDate`` and the current
    date. The result overwrites the Delta output at ``Payments/`` under
    ``output_base_path``. Returns nothing.
    """
    payments_bronze = spark.read.parquet(f"{base_path}Payments/Payments.parquet")

    # Computed per row from PaymentDate; no grouping or "latest payment" lookup.
    days_elapsed = datediff(current_date(), col("PaymentDate"))
    payments_silver = payments_bronze.withColumn("DaysSinceLastPayment", days_elapsed)

    delta_writer = payments_silver.write.format("delta").mode("overwrite")
    delta_writer.save(f"{output_base_path}Payments/")
    

In [29]:
'''
Transformation for Transactions dataset - 
    Transform the dataset by creating a new column TransactionCategory based on the value of the TransactionType column.
    If the transaction is a Deposit, it's categorized as Income.
    If it's a Withdrawal, it's categorized as Expense.
    Any other transaction type falls into the Other category.
'''
def transform_transactions():
    """Write the silver Transactions table with a ``TransactionCategory`` column.

    Reads ``Transactions/Transactions.parquet`` under ``base_path`` and maps
    ``TransactionType`` to a category: ``"Deposit"`` becomes ``"Income"``,
    ``"Withdrawal"`` becomes ``"Expense"``, and everything else becomes
    ``"Other"``. The result overwrites the Delta output at ``Transactions/``
    under ``output_base_path``. Returns nothing.
    """
    transactions_bronze = spark.read.parquet(f"{base_path}Transactions/Transactions.parquet")

    # Branches are evaluated top to bottom; the first matching one wins.
    category = (
        when(col("TransactionType") == "Deposit", "Income")
        .when(col("TransactionType") == "Withdrawal", "Expense")
        .otherwise("Other")
    )
    transactions_silver = transactions_bronze.withColumn("TransactionCategory", category)

    delta_writer = transactions_silver.write.format("delta").mode("overwrite")
    delta_writer.save(f"{output_base_path}Transactions/")
    

In [31]:
'''
Process each table - 
    transform_accounts(): Processes the Accounts dataset, typically applying transformations 
    like calculating account age.

    transform_customers(): Processes the Customers dataset, applying transformations such as 
    creating a full name and masking email addresses.

    transform_loans(): Processes the Loans dataset, calculating fields like total interest and loan duration.

    transform_payments(): Processes the Payments dataset, calculating how many days have passed
    since the last payment.

    transform_transactions(): Processes the Transactions dataset, categorizing transaction types 
    (e.g., Deposit -> Income, Withdrawal -> Expense).
'''
# Run every bronze-to-silver transform once, in this fixed order. Each function
# reads its own bronze parquet file and overwrites its own silver Delta output.
for run_transform in (
    transform_accounts,
    transform_customers,
    transform_loans,
    transform_payments,
    transform_transactions,
):
    run_transform()

# Reached only if none of the transforms raised.
print("Bronze To Silver Completed !!")
