In [0]:
# Raw accepted-loans export: a gzip-compressed CSV stored under a /Volumes path.
file_path = "dbfs:/Volumes/workspace/data/data/accepted_2007_to_2018Q4.csv.gz"

# header: first line holds column names.
# inferSchema: Spark scans the data to assign column types (an extra pass over the file).
spark_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Row count, a small preview, and the inferred schema.
print(spark_df.count())
spark_df.show(5)
spark_df.printSchema()

2260701
+--------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+--------+-----------+----------+--------------------+----+------------------+------------------+--------+----------+-----+-----------+----------------+--------------+---------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+------------------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------+-------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+--------

In [0]:
import os

# PostgreSQL connection settings for the rejected-applications table.
# SECURITY: a plaintext password was previously committed in this cell; treat it as
# leaked and rotate it. Credentials are now taken from the environment (for example
# a Databricks secret exposed to the cluster as an environment variable).
JDBC_USER = os.environ.get("JDBC_USER", "diakiv")
JDBC_PASSWORD = os.environ.get("JDBC_PASSWORD")
if not JDBC_PASSWORD:
    # Fail here with a clear message instead of later with an opaque JDBC auth error.
    raise RuntimeError(
        "JDBC_PASSWORD is not set. Provide it via an environment variable "
        "or load it with dbutils.secrets.get(scope, key) before running this cell."
    )
JDBC_HOST = "postgresql-diakiv.alwaysdata.net"
JDBC_PORT = "5432"
DB_NAME = "diakiv_project"

TABLE_NAME = "rejected_loans_data"

JDBC_URL = f"jdbc:postgresql://{JDBC_HOST}:{JDBC_PORT}/{DB_NAME}"
JDBC_DRIVER = "org.postgresql.Driver"

In [0]:
# Load the rejected-applications table from PostgreSQL over JDBC.
# On failure a troubleshooting hint is printed and the exception is re-raised, so
# the notebook stops here rather than failing later with a NameError on
# `df_from_postgres` (which is never assigned when the read fails).
try:
    df_from_postgres = (
        spark.read
        .format("jdbc")
        .option("url", JDBC_URL)
        .option("dbtable", TABLE_NAME)
        .option("user", JDBC_USER)
        .option("password", JDBC_PASSWORD)
        .option("driver", JDBC_DRIVER)
        .load()
    )

    print(f"Data successfully read from table '{TABLE_NAME}' into Spark DataFrame.")

    # The DataFrame is not cached, so count() and show() each query the database.
    print(f"Record count: {df_from_postgres.count()}")
    df_from_postgres.show(5)
    df_from_postgres.printSchema()

except Exception as e:
    print("Error while connecting or reading data from PostgreSQL:")
    print(f"Error details: {e}")
    print("Check if the credentials are correct, if your Databricks cluster is running, and if Alwaysdata allows remote connections.")
    raise

Data successfully read from table 'rejected_loans_data' into Spark DataFrame.
Record count: 318370
+----------------+----------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+
|Amount Requested|Application Date|          Loan Title|Risk_Score|Debt-To-Income Ratio|Zip Code|State|Employment Length|Policy Code|
+----------------+----------------+--------------------+----------+--------------------+--------+-----+-----------------+-----------+
|          1000.0|      2007-05-26|Wedding Covered b...|     693.0|                 10%|   481xx|   NM|          4 years|        0.0|
|          1000.0|      2007-05-26|  Consolidating Debt|     703.0|                 10%|   010xx|   MA|         < 1 year|        0.0|
|         11000.0|      2007-05-27|Want to consolida...|     715.0|                 10%|   212xx|   MD|           1 year|        0.0|
|          6000.0|      2007-05-27|             waksman|     698.0|              38.64%|   017xx|

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType, DateType
from pyspark.sql import SparkSession


# `spark` is already used by the cells above (Databricks provides it); getOrCreate()
# hands back the active session instead of constructing a second one.
spark = (
    SparkSession.builder
    .appName("GradualTransformation")
    .getOrCreate()
)

# Stage aliases: raw accepted loans (CSV) and raw rejected applications (PostgreSQL).
df_accepted, df_rejected = spark_df, df_from_postgres

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, IntegerType

# Three-letter English month abbreviation -> two-digit month number ('Jan' -> '01').
month_map = {
    abbr: f"{position:02d}"
    for position, abbr in enumerate(
        ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
         'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'],
        start=1,
    )
}

def convert_mmm_yyyy(col):
    """Rewrite a 'Mon-YYYY' string column as a 'YYYY-MM-01' string column.

    Each month abbreviation from `month_map` is substituted with its number
    (so 'Dec-2015' becomes '12-2015'). The year (characters 4-7) and month
    (characters 1-2) are then reassembled in ISO order with day '01'.

    Args:
        col: Spark string Column, expected to already look like 'Mon-YYYY'
            with a capitalized abbreviation; other text yields a malformed string.

    Returns:
        Spark string Column suitable for F.to_date(..., "yyyy-MM-dd").
    """
    numbered = col
    for abbr in month_map:
        numbered = F.regexp_replace(numbered, abbr, month_map[abbr])
    year_part = F.substring(numbered, 4, 4)
    month_part = F.substring(numbered, 1, 2)
    return F.concat_ws("-", year_part, month_part, F.lit("01"))

df_accepted_temp = df_accepted

# Columns converted below; each is only touched if it is present.
date_cols = ['issue_d', 'earliest_cr_line', 'last_pymnt_d', 'next_pymnt_d', 'last_credit_pull_d']
numeric_cleaned_cols = [
    'int_rate', 'dti', 'revol_util', 'annual_inc', 'total_rev_hi_lim',
    'tot_hi_cred_lim', 'total_bal_ex_mort', 'total_bc_limit',
    'total_il_high_credit_limit'
]

# Plain decimal text with an optional sign and trailing '%' (e.g. "13.5%", "-1").
numeric_check_regex = r'^\s*-?\d+(\.\d+)?%?\s*$'
# Characters stripped from numeric text before casting.
regex_cleaner = r'[% ]'
# FICO text such as "675" or "675.0" (the old r'^\d+$' rejected "675.0").
fico_numeric_regex = r'^\s*\d+(\.\d+)?\s*$'
# Employment length such as "1 year", "4 years" or "10+ years".
emp_length_check_regex = r'^\d+\+?\s*years?$'


def _to_double(df, col_name, check_regex=numeric_check_regex, cleaner=regex_cleaner):
    """Return `df` with `col_name` converted to DoubleType.

    Args:
        df: Spark DataFrame containing `col_name`.
        col_name: column to convert.
        check_regex: pattern a string value must match to be kept.
        cleaner: character class removed from matching strings before the cast.

    Non-string columns (already numeric after inferSchema / JDBC) are cast
    directly. Routing them through a string cast, as the previous code did,
    can render large doubles in scientific notation (e.g. "1.0E7"); that text
    fails `check_regex`, so the value became null and was later filled with 0.0.
    String values that do not match `check_regex` become null.
    """
    if dict(df.dtypes)[col_name] != "string":
        return df.withColumn(col_name, F.col(col_name).cast(DoubleType()))
    return df.withColumn(
        col_name,
        F.when(
            F.col(col_name).rlike(check_regex),
            F.regexp_replace(F.col(col_name), cleaner, "").cast(DoubleType()),
        ).otherwise(None),
    )


def _parse_date(column):
    """Parse 'Mon-YYYY' (as the first of the month) or 'YYYY-MM-DD' into a DateType column.

    The column is cast to string first, so values that are already DATE
    (as the JDBC column may be) take the ISO branch. Any other text becomes null.
    """
    as_text = column.cast("string")
    return (
        F.when(
            as_text.rlike(r'^[A-Za-z]{3}-\d{4}$'),
            F.to_date(convert_mmm_yyyy(as_text), "yyyy-MM-dd"),
        )
        .when(
            as_text.rlike(r'^\d{4}-\d{2}-\d{2}$'),
            F.to_date(as_text, "yyyy-MM-dd"),
        )
        .otherwise(None)
    )


def _emp_length_years(column):
    """Convert employment-length text to years as a DoubleType column.

    Mappings:
        "< 1 year" -> 0.0 (kept distinct from "1 year").
        "N year(s)" and "N+ years" -> N.
        Bare numbers are kept as-is.
        Anything else (e.g. "n/a", null) -> null.

    The previous character-class checks ([years|year] and a digits-only class)
    rejected every "N years" string, so the whole column was nulled and then
    filled with 0.0.
    """
    text = F.trim(column.cast("string"))
    return (
        F.when(text.rlike(r'^\d+(\.\d+)?$'), text.cast(DoubleType()))
        .when(text.rlike(r'^<\s*1\s*years?$'), F.lit(0.0))
        .when(
            text.rlike(emp_length_check_regex),
            F.regexp_extract(text, r'^(\d+)', 1).cast(DoubleType()),
        )
        .otherwise(None)
    )


# ---- Accepted loans -------------------------------------------------------------
for col_name in numeric_cleaned_cols:
    if col_name in df_accepted_temp.columns:
        df_accepted_temp = _to_double(df_accepted_temp, col_name)

for col_name in date_cols:
    if col_name in df_accepted_temp.columns:
        df_accepted_temp = df_accepted_temp.withColumn(col_name, _parse_date(F.col(col_name)))

# FICO bounds: accept "675" or "675.0" (or an already-numeric column), store as integer.
for col_name in ('fico_range_low', 'fico_range_high'):
    if col_name in df_accepted_temp.columns:
        df_accepted_temp = _to_double(df_accepted_temp, col_name, check_regex=fico_numeric_regex)
        df_accepted_temp = df_accepted_temp.withColumn(
            col_name, F.col(col_name).cast(IntegerType())
        )

if 'emp_length' in df_accepted_temp.columns:
    df_accepted_temp = df_accepted_temp.withColumn(
        'emp_length', _emp_length_years(F.col('emp_length'))
    )

all_numeric_cols = numeric_cleaned_cols + ['fico_range_low', 'fico_range_high', 'emp_length']
# NOTE(review): filling with 0.0 makes missing values indistinguishable from real
# zeros (e.g. "n/a" and "< 1 year" both end up as 0 years) - consider keeping nulls.
df_accepted_temp = df_accepted_temp.fillna(
    0.0, subset=[c for c in all_numeric_cols if c in df_accepted_temp.columns]
)

df_accepted_temp = df_accepted_temp.dropDuplicates()
df_accepted_clean = df_accepted_temp.drop('id', 'member_id', 'url', 'desc', 'policy_code')


# ---- Rejected applications ------------------------------------------------------
# Rename the PostgreSQL columns to the accepted-loan names so the two can be unioned.
df_rejected_temp = df_rejected.selectExpr(
    "`Amount Requested` as loan_amnt",
    "`Application Date` as issue_d",
    "`Loan Title` as title",
    "`Risk_Score` as risk_score",
    "`Debt-To-Income Ratio` as dti",
    "`Zip Code` as zip_code",
    "`State` as addr_state",
    "`Employment Length` as emp_length"
)

df_rejected_temp = df_rejected_temp.withColumn("loan_status", F.lit("Rejected"))
df_rejected_temp = df_rejected_temp.withColumn("issue_d", _parse_date(F.col("issue_d")))
# "38.64%" -> 38.64
df_rejected_temp = _to_double(df_rejected_temp, "dti")
df_rejected_temp = df_rejected_temp.withColumn(
    "emp_length", _emp_length_years(F.col("emp_length"))
)

df_rejected_clean = df_rejected_temp.fillna(0.0, subset=['dti', 'risk_score', 'emp_length'])
df_rejected_clean = df_rejected_clean.dropDuplicates()

# Persist each cleaned dataset as its own Delta table, replacing it on every run.
accepted_output_path = "/Volumes/workspace/data/accepted_volume/accepted_clean_delta"
rejected_output_path = "/Volumes/workspace/data/accepted_volume/rejected_clean_delta"

for cleaned_df, target_path in (
    (df_accepted_clean, accepted_output_path),
    (df_rejected_clean, rejected_output_path),
):
    cleaned_df.write.format("delta").mode("overwrite").save(target_path)

# Columns shared by both sources; unionByName matches them by name, not position.
union_cols = [
    "loan_amnt", "issue_d", "title", "dti", "zip_code", "addr_state",
    "emp_length", "loan_status"
]

df_accepted_union = df_accepted_clean.select(*union_cols)
df_rejected_union = df_rejected_clean.select(*union_cols)

df_union = df_accepted_union.unionByName(df_rejected_union)

# Only rejected rows are labelled ("Rejected"); accepted rows keep whatever loan_status
# the CSV holds, since nothing in this notebook sets it to "Accepted". Pivoting on
# loan_status therefore never produced an "Accepted" column, and the rate below could
# not resolve. Collapse the status into a two-valued decision first. Listing the pivot
# values guarantees both columns exist and spares Spark a distinct-values pass.
decision_values = ["Accepted", "Rejected"]

df_agg_ratio = (
    df_union
    .withColumn(
        "decision",
        F.when(F.col("loan_status") == "Rejected", F.lit("Rejected")).otherwise(F.lit("Accepted")),
    )
    .groupBy("addr_state")
    .pivot("decision", decision_values)
    .agg(F.count("*"))
    # The pivot yields null where a state has no rows of one kind; count those as 0.
    .fillna(0, subset=decision_values)
    .withColumn(
        "acceptance_rate",
        F.col("Accepted") / (F.col("Accepted") + F.col("Rejected")),
    )
    .orderBy(F.col("acceptance_rate").desc())
)

output_path = "/Volumes/workspace/data/accepted_volume/loans_combined_delta"

# One Delta partition directory per state.
df_union.write \
    .mode("overwrite") \
    .partitionBy("addr_state") \
    .format("delta") \
    .save(output_path)

# The ratio table was previously built but never evaluated; show it as this cell's result.
df_agg_ratio.show(n=100, truncate=False)