## Part 1: Loading the dataset

In [60]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Milestone3").getOrCreate()
lookup=[]

In [61]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, regexp_replace,when,min,max,to_date,datediff
from pyspark.ml.feature import  StringIndexer



In [62]:
df = spark.read.parquet('data/fintech_data_31_52_0324.parquet')


In [63]:
df.show(20)

                                                                                

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         Customer Id|           Emp Title|Emp Length|Home Ownership|Annual Inc|Annual Inc Joint|Verification Status|Zip Code|Addr State|Avg Cur Bal|Tot Cur Bal|Loan Id|    Loan Status|Loan Amount|State|Funded Amount|      Term|Int Rate|Grade|       Issue Date|Pymnt Plan|      Type|           Purpose|         Description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+---------------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YicyXHhkMVx4ODVce...|     P

In [64]:
print(df.rdd.getNumPartitions())

1


In [65]:
import psutil
logical_cores = psutil.cpu_count(logical=True)
df = df.repartition(logical_cores)

In [66]:
print(df.rdd.getNumPartitions())

[Stage 676:>                                                        (0 + 1) / 1]

4


## Part 2: Cleaning

In [67]:
df = df.toDF(*(col.lower() for col in df.columns))
df = df.toDF(*(col.replace(" ", "_") for col in df.columns))

df.show(20)


[Stage 677:>                                                        (0 + 1) / 1]

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceDlhQCNceGMzX...|            Direct

                                                                                

In [68]:
def detect_missing(df):
    missing_info = {col: (df[df[col].isNull()].count() / df.count()) * 100 for col in df.columns}
    return missing_info

missing_info = detect_missing(df)
print("Missing Information:", missing_info)

                                                                                

Missing Information: {'customer_id': 0.0, 'emp_title': 8.75323714391417, 'emp_length': 6.918238993710692, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 92.97817240103589, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 4.554199038105809, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.8953015168331484}


In [69]:


num_col=['annual_inc_joint','int_rate']
string_col=['emp_title','emp_length','description']
df=df.na.fill(value=0,subset=num_col)

for col_name in string_col:
    mode= df[df[col_name].isNull()==False].groupBy(col_name).count()\
                      .orderBy(col('count').desc())\
                      .limit(1).select(col_name).collect()[0][0]

   
    # print(mode)
    df=df.na.fill(value=mode,subset=[col_name])
    
df.show(20)    

[Stage 929:>                                                        (0 + 1) / 1]

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceDlhQCNceGMzX...|            Direct

                                                                                

In [70]:
missing_info = detect_missing(df)
print("Missing Information:", missing_info)

Missing Information: {'customer_id': 0.0, 'emp_title': 0.0, 'emp_length': 0.0, 'home_ownership': 0.0, 'annual_inc': 0.0, 'annual_inc_joint': 0.0, 'verification_status': 0.0, 'zip_code': 0.0, 'addr_state': 0.0, 'avg_cur_bal': 0.0, 'tot_cur_bal': 0.0, 'loan_id': 0.0, 'loan_status': 0.0, 'loan_amount': 0.0, 'state': 0.0, 'funded_amount': 0.0, 'term': 0.0, 'int_rate': 0.0, 'grade': 0.0, 'issue_date': 0.0, 'pymnt_plan': 0.0, 'type': 0.0, 'purpose': 0.0, 'description': 0.0}


## Part 3: Encoding

In [71]:


df_encoded_emp = df.withColumn(
    "emp_length",
    regexp_replace(
        regexp_replace(col("emp_length"), "< 1 year", "0.5 years"), 
        "10\\+ years", "10 years" 
    )
)
df_encoded_emp=df_encoded_emp.withColumn("emp_length",regexp_replace(col("emp_length")," year",""))
df_encoded_emp=df_encoded_emp.withColumn("emp_length",regexp_replace(col("emp_length"),"s","").cast("float"))

df_encoded_emp.show()


[Stage 1143:>                                                       (0 + 1) / 1]

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|
+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+
|YidceDlhQCNceGMzX...|            Direct

                                                                                

In [72]:
lookup.append({'Column':'emp_length','Original': '< 1 year', 'Encoded': '0.5'})
lookup.append({'Column':'emp_length','Original': '1 year', 'Encoded': '1'})
lookup.append({'Column':'emp_length','Original': '2 years', 'Encoded': '2'})
lookup.append({'Column':'emp_length','Original': '3 years', 'Encoded': '3'})
lookup.append({'Column':'emp_length','Original': '4 years', 'Encoded': '4'})
lookup.append({'Column':'emp_length','Original': '5 years', 'Encoded': '5'})
lookup.append({'Column':'emp_length','Original': '6 years', 'Encoded': '6'})
lookup.append({'Column':'emp_length','Original': '7 years', 'Encoded': '7'})
lookup.append({'Column':'emp_length','Original': '8 years', 'Encoded': '8'})
lookup.append({'Column':'emp_length','Original': '9 years', 'Encoded': '9'})
lookup.append({'Column':'emp_length','Original': '10+ years', 'Encoded': '10'})

In [73]:



unique_values = df_encoded_emp.select("home_ownership").distinct().rdd.flatMap(lambda x: x).collect()
df_encoded2=df_encoded_emp
for value in unique_values:
    df_encoded2 = df_encoded2.withColumn("home_ownership_"+value, when(col("home_ownership") == value, 1).otherwise(0))
    
# df_encoded2.show()    

                                                                                

In [74]:
# stringIndexer = StringIndexer(inputCol="verification_status", outputCol="verification_status_index")
# model = stringIndexer.fit(df_encoded2)
# df_encoded2 = model.transform(df_encoded2)
# encoder = OneHotEncoder(inputCol="verification_status_index", outputCol="verification_status_encoded")
# df_encoded2 = encoder.fit(df_encoded2).transform(df_encoded2)


unique_values = df_encoded2.select("verification_status").distinct().rdd.flatMap(lambda x: x).collect()
for value in unique_values:
    df_encoded2 = df_encoded2.withColumn("verification_status_" + value, when(col("verification_status") == value, 1).otherwise(0))

In [75]:
# stringIndexer = StringIndexer(inputCol="type", outputCol="type_index")
# model = stringIndexer.fit(df_encoded2)
# df_encoded2 = model.transform(df_encoded2)
# encoder = OneHotEncoder(inputCol="type_index", outputCol="type_encoded")
# df_encoded2 = encoder.fit(df_encoded2).transform(df_encoded2)

unique_values = df_encoded2.select("type").distinct().rdd.flatMap(lambda x: x).collect()
for value in unique_values:
    df_encoded2 = df_encoded2.withColumn("type_"+value, when(col("type") == value, 1).otherwise(0))

In [76]:
df_encoded2.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|verification_status_Verified|verification

In [77]:
state_indexer = StringIndexer(inputCol="state", outputCol="state_encoded")

state_indexer_model = state_indexer.fit(df_encoded2)

df_label_encoded = state_indexer_model.transform(df_encoded2)


for index, label in enumerate(state_indexer_model.labels):
    lookup.append({'Column': 'state', 'Original': label, 'Encoded': index})
    
    
df_label_encoded.show(20)




+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|verification_status_Verifie

In [78]:

state_indexer = StringIndexer(inputCol="purpose", outputCol="purpose_encoded")

state_indexer_model = state_indexer.fit(df_label_encoded)

df_label_encoded = state_indexer_model.transform(df_label_encoded)


for index, label in enumerate(state_indexer_model.labels):
    lookup.append({'Column': 'purpose', 'Original': label, 'Encoded': index})
df_label_encoded.show(20)
    

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_ANY|verificatio

In [79]:

df_encoded = df_label_encoded.withColumn(
    "letter_grade",
    when((col("grade") >= 1) & (col("grade") <= 5), "A")
    .when((col("grade") >= 6) & (col("grade") <= 10), "B")
    .when((col("grade") >= 11) & (col("grade") <= 15), "C")
    .when((col("grade") >= 16) & (col("grade") <= 20), "D")
    .when((col("grade") >= 21) & (col("grade") <= 25), "E")
    .when((col("grade") >= 26) & (col("grade") <= 30), "F")
    .when((col("grade") >= 31) & (col("grade") <= 35), "G")
    .otherwise(None)
)

In [80]:
lookup.append({'Column': 'grade', 'Original': '1', 'Encoded': 'A'})
lookup.append({'Column': 'grade', 'Original': '2', 'Encoded': 'A'})
lookup.append({'Column': 'grade', 'Original': '3', 'Encoded': 'A'})
lookup.append({'Column': 'grade', 'Original': '4', 'Encoded': 'A'})
lookup.append({'Column': 'grade', 'Original': '5', 'Encoded': 'A'})
lookup.append({'Column': 'grade', 'Original': '6', 'Encoded': 'B'})
lookup.append({'Column': 'grade', 'Original': '7', 'Encoded': 'B'})
lookup.append({'Column': 'grade', 'Original': '8', 'Encoded': 'B'})
lookup.append({'Column': 'grade', 'Original': '9', 'Encoded': 'B'})
lookup.append({'Column': 'grade', 'Original': '10', 'Encoded': 'B'})
lookup.append({'Column': 'grade', 'Original': '11', 'Encoded': 'C'})
lookup.append({'Column': 'grade', 'Original': '12', 'Encoded': 'C'})
lookup.append({'Column': 'grade', 'Original': '13', 'Encoded': 'C'})
lookup.append({'Column': 'grade', 'Original': '14', 'Encoded': 'C'})
lookup.append({'Column': 'grade', 'Original': '15', 'Encoded': 'C'})
lookup.append({'Column': 'grade', 'Original': '16', 'Encoded': 'D'})
lookup.append({'Column': 'grade', 'Original': '17', 'Encoded': 'D'})
lookup.append({'Column': 'grade', 'Original': '18', 'Encoded': 'D'})
lookup.append({'Column': 'grade', 'Original': '19', 'Encoded': 'D'})
lookup.append({'Column': 'grade', 'Original': '20', 'Encoded': 'D'})
lookup.append({'Column': 'grade', 'Original': '21', 'Encoded': 'E'})
lookup.append({'Column': 'grade', 'Original': '22', 'Encoded': 'E'})
lookup.append({'Column': 'grade', 'Original': '23', 'Encoded': 'E'})
lookup.append({'Column': 'grade', 'Original': '24', 'Encoded': 'E'})
lookup.append({'Column': 'grade', 'Original': '25', 'Encoded': 'E'})
lookup.append({'Column': 'grade', 'Original': '26', 'Encoded': 'F'})
lookup.append({'Column': 'grade', 'Original': '27', 'Encoded': 'F'})
lookup.append({'Column': 'grade', 'Original': '28', 'Encoded': 'F'})
lookup.append({'Column': 'grade', 'Original': '29', 'Encoded': 'F'})
lookup.append({'Column': 'grade', 'Original': '30', 'Encoded': 'F'})
lookup.append({'Column': 'grade', 'Original': '31', 'Encoded': 'G'})
lookup.append({'Column': 'grade', 'Original': '32', 'Encoded': 'G'})
lookup.append({'Column': 'grade', 'Original': '33', 'Encoded': 'G'})
lookup.append({'Column': 'grade', 'Original': '34', 'Encoded': 'G'})
lookup.append({'Column': 'grade', 'Original': '35', 'Encoded': 'G'})

In [81]:
df_encoded.show()

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+-----------------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|       issue_date|pymnt_plan|      type|           purpose|         description|home_ownership_OWN|home_ownership_RENT|home_ownership_MORTGAGE|home_ownership_AN

## Part 4: Feature Engineering

In [82]:
df_feat_eng = df_encoded.withColumn(
    "issue_date",
    to_date("issue_date", "dd MMMM yyyy") 
)

In [83]:


window = Window.partitionBy("letter_grade").orderBy("issue_date")

##1
df_feat_eng = df_feat_eng.withColumn(
    "prev_issue_date_same_grade",
    lag("issue_date", 1).over(window)
)
##2
df_feat_eng = df_feat_eng.withColumn(
    "prev_loan_amount_same_grade",
    lag("loan_amount", 1).over(window)
)


In [84]:

window = Window.partitionBy("state", "letter_grade").orderBy("issue_date")
##3
df_feat_eng = df_feat_eng.withColumn(
    "prev_issue_date_same_state_grade",
    lag("issue_date", 1).over(window)
)

##4
df_feat_eng = df_feat_eng.withColumn(
    "prev_loan_amount_same_state_grade",
    lag("loan_amount", 1).over(window)
)

df_feat_eng.show()


+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+--------------------------+---------------------------+--------------------------------+---------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           p

## Part 5: Analysis SQL vs Spark


Identify the average loan amount and interest rate for loans marked as
"Default" in the Loan Status, grouped by Emp Length and annual income ranges.
Hint: Use SQL Cases to bin Annual Income into Income Ranges

In [85]:


df_feat_eng.createOrReplaceTempView("loans")
inc_groups=df_feat_eng.approxQuantile('annual_inc', [0.25, 0.5, 0.75], 0)
min_value = df.select(min("annual_inc")).collect()[0][0]
max_value = df.select(max("annual_inc")).collect()[0][0]
# loan_labels = [f'{int(bins[i])}-{int(bins[i+1])}K' for i in range(len(bins)-1)]
query = f"""
SELECT 
    emp_length,
    CASE 
        WHEN annual_inc < {inc_groups[0]} THEN '{int(min_value/1000)}-{int(inc_groups[0]/1000)}K'
        WHEN annual_inc < {inc_groups[1]} THEN '{int(inc_groups[0]/1000)}-{int(inc_groups[1]/1000)}K'
        WHEN annual_inc < {inc_groups[2]} THEN '{int(inc_groups[1]/1000)}-{int(inc_groups[2]/1000)}K'
        ELSE '{int(inc_groups[2]/10000)}-{int(max_value/1000)}K' 
    END AS income_range,
    AVG(loan_amount) AS avg_loan_amount,
    AVG(int_rate) AS avg_int_rate
FROM loans
WHERE loan_status = 'Default'
GROUP BY emp_length, income_range
"""


sql_df = spark.sql(query)
sql_df.show()

+----------+------------+---------------+-------------------+
|emp_length|income_range|avg_loan_amount|       avg_int_rate|
+----------+------------+---------------+-------------------+
|       0.5|       0-47K|        14087.5|0.19285000000000002|
+----------+------------+---------------+-------------------+



In [86]:
# print(inc_groups)
# query = f"""SELECT * FROM loans WHERE loan_status = 'Default'"""
# sql_df = spark.sql(query)
# sql_df.show()


In [87]:

#  WHEN annual_inc < {inc_groups[0]} THEN '{int(min_value/1000)}-{int(inc_groups[0]/1000)}K'
#         WHEN annual_inc < {inc_groups[1]} THEN '{int(inc_groups[0]/1000)}-{int(inc_groups[1]/1000)}K'
#         WHEN annual_inc < {inc_groups[2]} THEN '{int(inc_groups[1]/1000)}-{int(inc_groups[2]/1000)}K'
#         ELSE '{int(inc_groups[2]/10000)}-{int(max_value/1000)}K' 
df_feat_eng_temp = df_feat_eng.withColumn(
    "income_range",
    when(df_feat_eng['annual_inc'] < inc_groups[0], f'{int(min_value/1000)}-{int(inc_groups[0]/1000)}K')
    .when((df_feat_eng['annual_inc'] >= inc_groups[0]) & (df_feat_eng['annual_inc'] < inc_groups[1]),f'{int(inc_groups[0]/1000)}-{int(inc_groups[1]/1000)}K')
        .when((df_feat_eng['annual_inc'] >= inc_groups[1]) & (df_feat_eng['annual_inc'] < inc_groups[2]),f'{int(inc_groups[1]/1000)}-{int(inc_groups[2]/1000)}K')
            .when((df_feat_eng['annual_inc'] >= inc_groups[2]),f'{int(inc_groups[2]/1000)}--{int(max_value/1000)}K')
)

pyspark_df = (
    df_feat_eng_temp.filter(df_feat_eng_temp['loan_status'] == "Default")
    .groupBy("emp_length", "income_range")
    .agg({"loan_amount": "avg", "int_rate": "avg"})
)

pyspark_df.show()


+----------+------------+----------------+-------------------+
|emp_length|income_range|avg(loan_amount)|      avg(int_rate)|
+----------+------------+----------------+-------------------+
|       0.5|       0-47K|         14087.5|0.19285000000000002|
+----------+------------+----------------+-------------------+



Calculate the average difference between Loan Amount and Funded Amount for each
loan Grade and sort by the grades with the largest differences.

In [88]:



query = f"""
SELECT 
    letter_grade,
    avg(loan_amount-funded_amount) as avg_amount_diff
From loans
GROUP by letter_grade
order by avg_amount_diff desc
"""


sql_df = spark.sql(query)
sql_df.show()

+------------+---------------+
|letter_grade|avg_amount_diff|
+------------+---------------+
|           F|            0.0|
|           E|            0.0|
|           B|            0.0|
|           D|            0.0|
|           C|            0.0|
|           A|            0.0|
|           G|            0.0|
+------------+---------------+



In [89]:

df_feat_eng_temp = df_feat_eng.withColumn(
                        "amount_diff", 
                        df_feat_eng["loan_amount"] - df_feat_eng["funded_amount"])

pyspark_df = (
    df_feat_eng_temp.groupBy("letter_grade")
    .agg({"amount_diff": "avg"})
    .orderBy("avg(amount_diff)", ascending=False)
)

pyspark_df.show()


+------------+----------------+
|letter_grade|avg(amount_diff)|
+------------+----------------+
|           F|             0.0|
|           E|             0.0|
|           B|             0.0|
|           D|             0.0|
|           C|             0.0|
|           A|             0.0|
|           G|             0.0|
+------------+----------------+



Compare the total Loan Amount for loans with "Verified" and "Not Verified"
Verification Status across each state (Addr State).

In [90]:



query = f"""
select state,sum(verified_loan_amount) AS total_verified_loan_amount,
    sum(not_verified_loan_amount) AS total_not_verified_loan_amount
from 
(
SELECT 
    state,
    CASE when verification_status = 'Verified' then loan_amount Else 0 END as verified_loan_amount,
    CASE when verification_status = 'Not Verified' then loan_amount Else 0 END as not_verified_loan_amount
    
From loans

) subquery
group by state
"""

sql_df = spark.sql(query)
sql_df.show()

+-----+--------------------------+------------------------------+
|state|total_verified_loan_amount|total_not_verified_loan_amount|
+-----+--------------------------+------------------------------+
|   SC|                 1295175.0|                     1793700.0|
|   AZ|                 2615100.0|                     3053825.0|
|   LA|                 1494225.0|                     1241750.0|
|   MN|                 2084925.0|                     2297375.0|
|   NJ|                 4928675.0|                     4515200.0|
|   DC|                  220500.0|                      238625.0|
|   OR|                 1325125.0|                     1461900.0|
|   VA|                 3952500.0|                     3397100.0|
|   RI|                  441025.0|                      432025.0|
|   WY|                  284125.0|                      248375.0|
|   KY|                  943800.0|                     1471350.0|
|   NH|                  505075.0|                      631325.0|
|   MI|   

In [91]:

df_feat_eng_temp = df_feat_eng.withColumn(
                        "verified_loan_amount", 
                        when (df_feat_eng['verification_status'] == 'Verified',df_feat_eng['loan_amount']).otherwise(0))
df_feat_eng_temp = df_feat_eng_temp.withColumn(
                        "not_verified_loan_amount", 
                        when (df_feat_eng['verification_status'] == 'Not Verified',df_feat_eng['loan_amount']).otherwise(0))

pyspark_df = (
    df_feat_eng_temp.groupBy("state")
    .agg({"verified_loan_amount": "sum","not_verified_loan_amount": "sum"})
    
)

pyspark_df.show()


+-----+-------------------------+-----------------------------+
|state|sum(verified_loan_amount)|sum(not_verified_loan_amount)|
+-----+-------------------------+-----------------------------+
|   SC|                1295175.0|                    1793700.0|
|   AZ|                2615100.0|                    3053825.0|
|   LA|                1494225.0|                    1241750.0|
|   MN|                2084925.0|                    2297375.0|
|   NJ|                4928675.0|                    4515200.0|
|   DC|                 220500.0|                     238625.0|
|   OR|                1325125.0|                    1461900.0|
|   VA|                3952500.0|                    3397100.0|
|   RI|                 441025.0|                     432025.0|
|   WY|                 284125.0|                     248375.0|
|   KY|                 943800.0|                    1471350.0|
|   NH|                 505075.0|                     631325.0|
|   MI|                3060200.0|       

Calculate the average time gap (in days) between consecutive loans for each
grade using the new features you added in the feature engineering phase

In [92]:
query = f"""
select letter_grade,
avg(DATEDIFF(day,prev_issue_date_same_grade,issue_date)) as avg_time_gap
from loans
group by letter_grade

"""
sql_df = spark.sql(query)
sql_df.show()

+------------+-------------------+
|letter_grade|       avg_time_gap|
+------------+-------------------+
|           A| 0.4764017033356991|
|           B|0.33782083543029695|
|           C| 0.3531036296685955|
|           D| 0.6761521027449006|
|           E|  1.824981301421092|
|           F|  5.480582524271845|
|           G|  19.36521739130435|
+------------+-------------------+



In [93]:

df_feat_eng_temp = df_feat_eng.withColumn(
                        "time_gap", 
                        datediff(df_feat_eng['issue_date'],df_feat_eng['prev_issue_date_same_grade']))

pyspark_df = (
    df_feat_eng_temp.groupBy("letter_grade")
    .agg({"time_gap": "avg"})
    
)

pyspark_df.show()


+------------+-------------------+
|letter_grade|      avg(time_gap)|
+------------+-------------------+
|           A| 0.4764017033356991|
|           B|0.33782083543029695|
|           C| 0.3531036296685955|
|           D| 0.6761521027449006|
|           E|  1.824981301421092|
|           F|  5.480582524271845|
|           G|  19.36521739130435|
+------------+-------------------+



Identify the average difference in loan amounts between consecutive loans
within the same state and grade combination.

In [94]:


query = f"""
select letter_grade,
state,
avg(prev_loan_amount_same_state_grade-loan_amount) as avg_amount_diff
from loans
group by letter_grade,state

"""
sql_df = spark.sql(query)
sql_df.show()

+------------+-----+-------------------+
|letter_grade|state|    avg_amount_diff|
+------------+-----+-------------------+
|           A|   AK|               50.0|
|           B|   AK|-1666.6666666666667|
|           C|   AK|           382.8125|
|           D|   AK| -661.3636363636364|
|           E|   AK|             6500.0|
|           F|   AK|               NULL|
|           A|   AL| -495.5357142857143|
|           B|   AL|-15.555555555555555|
|           C|   AL|  38.57142857142857|
|           D|   AL|-168.13725490196077|
|           E|   AL|-198.68421052631578|
|           F|   AL|            -2062.5|
|           G|   AL|               NULL|
|           A|   AR|             -400.0|
|           B|   AR| -88.13559322033899|
|           C|   AR|           -98.4375|
|           D|   AR|-192.36111111111111|
|           E|   AR|-486.53846153846155|
|           F|   AR| 2216.6666666666665|
|           G|   AR|               NULL|
+------------+-----+-------------------+
only showing top

In [95]:

df_feat_eng_temp = df_feat_eng.withColumn(
                        "amount_diff", 
                        df_feat_eng["prev_loan_amount_same_state_grade"] - df_feat_eng["loan_amount"])
                        

pyspark_df = (
    df_feat_eng_temp.groupBy("letter_grade","state")
    .agg({"amount_diff": "avg"})
    
)

pyspark_df.show()


+------------+-----+-------------------+
|letter_grade|state|   avg(amount_diff)|
+------------+-----+-------------------+
|           A|   AK|               50.0|
|           B|   AK|-1666.6666666666667|
|           C|   AK|           382.8125|
|           D|   AK| -661.3636363636364|
|           E|   AK|             6500.0|
|           F|   AK|               NULL|
|           A|   AL| -495.5357142857143|
|           B|   AL|-15.555555555555555|
|           C|   AL|  38.57142857142857|
|           D|   AL|-168.13725490196077|
|           E|   AL|-198.68421052631578|
|           F|   AL|            -2062.5|
|           G|   AL|               NULL|
|           A|   AR|             -400.0|
|           B|   AR| -88.13559322033899|
|           C|   AR|           -98.4375|
|           D|   AR|-192.36111111111111|
|           E|   AR|-486.53846153846155|
|           F|   AR| 2216.6666666666665|
|           G|   AR|               NULL|
+------------+-----+-------------------+
only showing top

## Part 6: Lookup Table & Saving the dataset

In [96]:
lookup

[{'Column': 'emp_length', 'Original': '< 1 year', 'Encoded': '0.5'},
 {'Column': 'emp_length', 'Original': '1 year', 'Encoded': '1'},
 {'Column': 'emp_length', 'Original': '2 years', 'Encoded': '2'},
 {'Column': 'emp_length', 'Original': '3 years', 'Encoded': '3'},
 {'Column': 'emp_length', 'Original': '4 years', 'Encoded': '4'},
 {'Column': 'emp_length', 'Original': '5 years', 'Encoded': '5'},
 {'Column': 'emp_length', 'Original': '6 years', 'Encoded': '6'},
 {'Column': 'emp_length', 'Original': '7 years', 'Encoded': '7'},
 {'Column': 'emp_length', 'Original': '8 years', 'Encoded': '8'},
 {'Column': 'emp_length', 'Original': '9 years', 'Encoded': '9'},
 {'Column': 'emp_length', 'Original': '10+ years', 'Encoded': '10'},
 {'Column': 'state', 'Original': 'CA', 'Encoded': 0},
 {'Column': 'state', 'Original': 'NY', 'Encoded': 1},
 {'Column': 'state', 'Original': 'TX', 'Encoded': 2},
 {'Column': 'state', 'Original': 'FL', 'Encoded': 3},
 {'Column': 'state', 'Original': 'IL', 'Encoded': 4},

In [97]:
df_lookup = spark.createDataFrame(lookup)

output_path = "data/lookup.parquet" 
df_lookup.coalesce(1).write.mode("overwrite").parquet(output_path)

df_lookup.show()


                                                                                

+----------+-------+---------+
|    Column|Encoded| Original|
+----------+-------+---------+
|emp_length|    0.5| < 1 year|
|emp_length|      1|   1 year|
|emp_length|      2|  2 years|
|emp_length|      3|  3 years|
|emp_length|      4|  4 years|
|emp_length|      5|  5 years|
|emp_length|      6|  6 years|
|emp_length|      7|  7 years|
|emp_length|      8|  8 years|
|emp_length|      9|  9 years|
|emp_length|     10|10+ years|
|     state|      0|       CA|
|     state|      1|       NY|
|     state|      2|       TX|
|     state|      3|       FL|
|     state|      4|       IL|
|     state|      5|       NJ|
|     state|      6|       PA|
|     state|      7|       OH|
|     state|      8|       GA|
+----------+-------+---------+
only showing top 20 rows



In [98]:
df_feat_eng.show()

[Stage 1271:>                                                       (0 + 1) / 1]

+--------------------+--------------------+----------+--------------+----------+----------------+-------------------+--------+----------+-----------+-----------+-------+-----------+-----------+-----+-------------+----------+--------+-----+----------+----------+----------+------------------+--------------------+------------------+-------------------+-----------------------+------------------+----------------------------+-----------------------------------+--------------------------------+--------------+---------------+---------------+----------+-------------+---------------+------------+--------------------------+---------------------------+--------------------------------+---------------------------------+
|         customer_id|           emp_title|emp_length|home_ownership|annual_inc|annual_inc_joint|verification_status|zip_code|addr_state|avg_cur_bal|tot_cur_bal|loan_id|loan_status|loan_amount|state|funded_amount|      term|int_rate|grade|issue_date|pymnt_plan|      type|           p

                                                                                

In [99]:
df_final = df_feat_eng
output_path = "data/fintech_data_MET_P2_52_0324_clean.parquet"
df_final.coalesce(1).write.mode("overwrite").parquet(output_path)

                                                                                

In [100]:
import pandas as pd
from sqlalchemy import create_engine

# Database configuration based on your Docker setup
DB_HOST = "localhost"
DB_PORT = "5432"  
DB_NAME = "testdb"
DB_USER = "root"
DB_PASSWORD = "root"

# Connection URI for SQLAlchemy
DATABASE_URI = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# Initialize the database connection using SQLAlchemy engine
engine = create_engine(DATABASE_URI)


def save_to_database(df, table_name="fintech_data"):
    try:
        if(engine.connect()):
            df.to_sql(table_name, engine, if_exists='replace', index=False)
            print(f"Data successfully saved to table '{table_name}'.")
        else:
            print("Error: Could not establish connection to the database")
    except Exception as e:
        print(f"Error saving data to database: {e}")
        
save_to_database(df_final.toPandas(), table_name="fintech_data")
save_to_database(df_lookup.toPandas(), table_name="lookup")        
        

                                                                                

Data successfully saved to table 'fintech_data'.
Data successfully saved to table 'lookup'.
