In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Financial Data Analysis") \
    .getOrCreate()

In [0]:
file_path = "dbfs:/FileStore/tables/financial_loan-2.csv"

In [0]:
file_path

Out[11]: 'dbfs:/FileStore/tables/financial_loan-2.csv'

In [0]:
df = spark.read.format('csv') \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

In [0]:
df.printSchema()
df.describe().show()


root
 |-- id: integer (nullable = true)
 |-- address_state: string (nullable = true)
 |-- application_type: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- issue_date: date (nullable = true)
 |-- last_credit_pull_date: date (nullable = true)
 |-- last_payment_date: date (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- next_payment_date: date (nullable = true)
 |-- member_id: integer (nullable = true)
 |-- purpose: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- term: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- annual_income: double (nullable = true)
 |-- dti: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- total_paym

In [0]:
df = df.na.drop()


In [0]:
from pyspark.sql.functions import to_date

df = df.withColumn("issue_date", to_date(df["issue_date"], "dd-MM-yyyy"))
df = df.withColumn("last_credit_pull_date", to_date(df["last_credit_pull_date"], "dd-MM-yyyy"))
df = df.withColumn("last_payment_date", to_date(df["last_payment_date"], "dd-MM-yyyy"))
df = df.withColumn("next_payment_date", to_date(df["next_payment_date"], "dd-MM-yyyy"))

In [0]:
from pyspark.sql.functions import avg
df.groupBy("address_state").agg(avg("loan_amount").alias("avg_loan")).show()


+-------------+------------------+
|address_state|          avg_loan|
+-------------+------------------+
|           AZ|11041.677135678392|
|           SC|10858.484162895928|
|           LA|10648.832923832924|
|           MN| 10553.70855148342|
|           NJ|11852.796706416808|
|           DC|12392.142857142857|
|           OR|10908.851674641148|
|           VA| 11634.41265060241|
|           RI| 9677.617801047121|
|           KY| 10990.90909090909|
|           WY|11275.316455696202|
|           NH|11898.051948051949|
|           MI|11412.481203007519|
|           NV|10916.109913793103|
|           WI| 11271.92671394799|
|           ID|            6950.0|
|           CA|11333.729619565218|
|           CT| 11517.80523255814|
|           NE|            6340.0|
|           MT|10033.219178082192|
+-------------+------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col
df.select("grade", "int_rate").groupBy("grade").agg(avg("int_rate").alias("avg_int_rate")) \
  .orderBy(col("avg_int_rate").desc()).show()


+-----+-------------------+
|grade|       avg_int_rate|
+-----+-------------------+
|    G|0.21403145695364234|
|    F|0.19734083080040535|
|    E|0.17700876020786938|
|    D| 0.1571120951619351|
|    C|0.13550794067463046|
|    B|0.11027036476868317|
|    A|0.07347008280460453|
+-----+-------------------+



In [0]:
df.groupBy("verification_status").count().show()

+-------------------+-----+
|verification_status|count|
+-------------------+-----+
|           Verified|11937|
|    Source Verified| 9468|
|       Not Verified|15738|
+-------------------+-----+



In [0]:
df.orderBy(col("dti").desc()).select("member_id", "annual_income", "dti").show(10)

+---------+-------------+------+
|member_id|annual_income|   dti|
+---------+-------------+------+
|   998477|      51500.0|0.2999|
|   954624|      44400.0|0.2995|
|  1041264|      52000.0|0.2993|
|   931909|      35004.0|0.2993|
|   914220|      33000.0|0.2993|
|  1044119|      75000.0|0.2992|
|   934905|      34224.0|0.2989|
|  1066306|      20000.0|0.2988|
|   965506|      50000.0|0.2986|
|  1302472|      41000.0|0.2985|
+---------+-------------+------+
only showing top 10 rows



In [0]:
df.filter((col("int_rate") > 0.2) & (col("dti") > 0.4)).show()

+---+-------------+----------------+----------+---------+-----+--------------+----------+---------------------+-----------------+-----------+-----------------+---------+-------+---------+----+-------------------+-------------+---+-----------+--------+-----------+---------+-------------+
| id|address_state|application_type|emp_length|emp_title|grade|home_ownership|issue_date|last_credit_pull_date|last_payment_date|loan_status|next_payment_date|member_id|purpose|sub_grade|term|verification_status|annual_income|dti|installment|int_rate|loan_amount|total_acc|total_payment|
+---+-------------+----------------+----------+---------+-----+--------------+----------+---------------------+-----------------+-----------+-----------------+---------+-------+---------+----+-------------------+-------------+---+-----------+--------+-----------+---------+-------------+
+---+-------------+----------------+----------+---------+-----+--------------+----------+---------------------+-----------------+-------

In [0]:
df = df.withColumn("int_rate", (col("int_rate") * 100).alias("int_rate_percentage"))
df.select("int_rate").show(5)

+------------------+
|          int_rate|
+------------------+
|             15.27|
|             18.64|
|15.959999999999999|
|             10.65|
|              6.03|
+------------------+
only showing top 5 rows



In [0]:
df = df.withColumn("term", regexp_replace(col("term"), " months", "").cast("int"))
df.select("term").distinct().show()

+----+
|term|
+----+
|  60|
|  36|
+----+



In [0]:
df = df.withColumn("loan_to_income", col("loan_amount") / col("annual_income"))
df.select("loan_to_income").show(5)

+-------------------+
|     loan_to_income|
+-------------------+
|0.08333333333333333|
|             0.0625|
|               0.24|
|0.10714285714285714|
|0.04216867469879518|
+-------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import when

df = df.withColumn(
    "risk_score",
    when((col("int_rate") > 15) & (col("dti") > 0.4) & (col("loan_amount") > 50000), "High Risk")
    .when((col("int_rate") > 10) & (col("dti") > 0.3), "Medium Risk")
    .otherwise("Low Risk")
)

df.select("loan_amount", "int_rate", "dti", "risk_score").show(10)


+-----------+------------------+------+----------+
|loan_amount|          int_rate|   dti|risk_score|
+-----------+------------------+------+----------+
|       2500|             15.27|  0.01|  Low Risk|
|       3000|             18.64|0.0535|  Low Risk|
|      12000|15.959999999999999|0.2088|  Low Risk|
|       4500|             10.65| 0.054|  Low Risk|
|       3500|              6.03|0.0231|  Low Risk|
|       8000|14.649999999999999| 0.126|  Low Risk|
|       6000|             14.27|0.2453|  Low Risk|
|       5500|               7.9|0.0665|  Low Risk|
|      24000|               8.9|0.0394|  Low Risk|
|       4125|12.690000000000001|0.1773|  Low Risk|
+-----------+------------------+------+----------+
only showing top 10 rows



In [0]:
df.groupBy("address_state") \
  .agg({"loan_amount": "sum"}) \
  .orderBy(col("sum(loan_amount)").desc()) \
  .show(5)


+-------------+----------------+
|address_state|sum(loan_amount)|
+-------------+----------------+
|           CA|        75074625|
|           NY|        40637625|
|           TX|        29795825|
|           FL|        28942775|
|           NJ|        20872775|
+-------------+----------------+
only showing top 5 rows



In [0]:
df = df.withColumn(
    "emp_length_category",
    when(col("emp_length").like("%10+%"), "10+ Years")
    .when(col("emp_length").like("%< 1%"), "Less than 1 Year")
    .when(col("emp_length").like("%1 year%"), "1 Year")
    .otherwise("2-9 Years")
)

df.select("emp_length", "emp_length_category").distinct().show()


+----------+-------------------+
|emp_length|emp_length_category|
+----------+-------------------+
|   8 years|          2-9 Years|
|   3 years|          2-9 Years|
|  < 1 year|   Less than 1 Year|
|   4 years|          2-9 Years|
|   6 years|          2-9 Years|
| 10+ years|          10+ Years|
|   5 years|          2-9 Years|
|    1 year|             1 Year|
|   9 years|          2-9 Years|
|   2 years|          2-9 Years|
|   7 years|          2-9 Years|
+----------+-------------------+



In [0]:
output_path = "dbfs:/FileStore/Financialdataprocessed.csv"
df.write.format('csv') \
    .option("header", "true") \
    .save(output_path)
