In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Initialize Spark session
spark = SparkSession.builder.appName("LoanDataTransformations").getOrCreate()

# Load loan dataset
loan_df = spark.read.csv("/FileStore/tables/loan.csv", header=True, inferSchema=True)

# Preprocess: Remove commas from "Loan Amount" and "Debt Record" for numerical operations
loan_df = loan_df.withColumn("Loan Amount", regexp_replace(col("Loan Amount"), ",", "").cast("double")) \
                 .withColumn("Debt Record", regexp_replace(col(" Debt Record"), ",", "").cast("double"))

# Register loan DataFrame as a temporary SQL view
loan_df.createOrReplaceTempView("loan")
# Show first few rows
loan_df.show(5)
display(loan_df)


+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+-----------+
|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|Debt Record|
+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+-----------+
|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|      HOUSING|  1000000.0|      5|      42,898|               6|                 9|    42898.0|
|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|     SHOPPING|    50000.0|      3|      33,999|               1|                 5|    33999.0|
|    IB14012| 30|FEMALE|     DENTIS

Customer_ID,Age,Gender,Occupation,Marital Status,Family Size,Income,Expenditure,Use Frequency,Loan Category,Loan Amount,Overdue,Debt Record,Returned Cheque,Dishonour of Bill,Debt Record.1
IB14001,30,MALE,BANK MANAGER,SINGLE,4,50000.0,22199.0,6,HOUSING,1000000.0,5,42898,6,9,42898.0
IB14008,44,MALE,PROFESSOR,MARRIED,6,51000.0,19999.0,4,SHOPPING,50000.0,3,33999,1,5,33999.0
IB14012,30,FEMALE,DENTIST,SINGLE,3,58450.0,27675.0,5,TRAVELLING,75000.0,6,20876,3,1,20876.0
IB14018,29,MALE,TEACHER,MARRIED,5,45767.0,12787.0,3,GOLD LOAN,600000.0,7,11000,0,4,11000.0
IB14022,34,MALE,POLICE,SINGLE,4,43521.0,11999.0,3,AUTOMOBILE,200000.0,2,43898,1,2,43898.0
IB14024,55,FEMALE,NURSE,MARRIED,6,34999.0,19888.0,4,AUTOMOBILE,47787.0,1,50000,0,3,50000.0
IB14025,39,FEMALE,TEACHER,MARRIED,6,46619.0,18675.0,4,HOUSING,1209867.0,8,29999,6,8,29999.0
IB14027,51,MALE,SYSTEM MANAGER,MARRIED,3,49999.0,19111.0,5,RESTAURANTS,60676.0,8,13000,2,5,13000.0
IB14029,24,FEMALE,TEACHER,SINGLE,3,45008.0,17454.0,4,AUTOMOBILE,399435.0,9,51987,4,7,51987.0
IB14031,37,FEMALE,SOFTWARE ENGINEER,MARRIED,5,55999.0,23999.0,5,AUTOMOBILE,60999.0,2,0,5,3,0.0


In [0]:
# PySpark
filtered_df = loan_df.filter(col("Income") > 50000)
print("Filter: Customers with Income > 50,000 (PySpark)")
filtered_df.show(5)

# SparkSQL
print("Filter: Customers with Income > 50,000 (SparkSQL)")
spark.sql("SELECT * FROM loan WHERE Income > 50000").show(5)

Filter: Customers with Income > 50,000 (PySpark)
+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+----------------+-----------+-------+------------+----------------+------------------+-----------+
|Customer_ID|Age|Gender|         Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|   Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|Debt Record|
+-----------+---+------+-------------------+--------------+-----------+------+-----------+-------------+----------------+-----------+-------+------------+----------------+------------------+-----------+
|    IB14008| 44|  MALE|          PROFESSOR|       MARRIED|          6| 51000|      19999|            4|        SHOPPING|    50000.0|      3|      33,999|               1|                 5|    33999.0|
|    IB14012| 30|FEMALE|            DENTIST|        SINGLE|          3| 58450|      27675|            5|      TRAVELLING|    75000.0|      

In [0]:
# 1. SUM (Total Loan Amount) in pyspark
loan_df.agg({"Loan Amount": "sum"}).show()
# 1. SUM (Total Loan Amount) in sparksql
print("SUM of Loan Amount (SparkSQL):")
spark.sql("SELECT SUM(`Loan Amount`) AS `Total Loan Amount` FROM loan").show()

+----------------+
|sum(Loan Amount)|
+----------------+
|    3.98526449E8|
+----------------+

SUM of Loan Amount (SparkSQL):
+-----------------+
|Total Loan Amount|
+-----------------+
|     3.98526449E8|
+-----------------+



In [0]:
# 2. AVG (Average Loan Amount) in pyspark
loan_df.agg({"Loan Amount": "avg"}).show()
# 2. AVG (Average Loan Amount) in sparksql
print("AVG of Loan Amount (SparkSQL):")
spark.sql("SELECT AVG(`Loan Amount`) AS `Average Loan Amount` FROM loan").show()


+----------------+
|avg(Loan Amount)|
+----------------+
|      797052.898|
+----------------+

AVG of Loan Amount (SparkSQL):
+-------------------+
|Average Loan Amount|
+-------------------+
|         797052.898|
+-------------------+



In [0]:
# 3. MIN (Minimum Loan Amount) in pyspark
loan_df.agg({"Loan Amount": "min"}).show()
# 3. MIN (Minimum Loan Amount) in sparksql
print("MIN of Loan Amount (SparkSQL):")
spark.sql("SELECT MIN(`Loan Amount`) AS `Min Loan Amount` FROM loan").show()

+----------------+
|min(Loan Amount)|
+----------------+
|         15350.0|
+----------------+

MIN of Loan Amount (SparkSQL):
+---------------+
|Min Loan Amount|
+---------------+
|        15350.0|
+---------------+



In [0]:
# 4. MAX (Maximum Loan Amount) in pysaprk
loan_df.agg({"Loan Amount": "max"}).show()
# 4. MAX (Maximum Loan Amount) in sparksql
print("MAX of Loan Amount (SparkSQL):")
spark.sql("SELECT MAX(`Loan Amount`) AS `Max Loan Amount` FROM loan").show()

+----------------+
|max(Loan Amount)|
+----------------+
|       7541236.0|
+----------------+

MAX of Loan Amount (SparkSQL):
+---------------+
|Max Loan Amount|
+---------------+
|      7541236.0|
+---------------+



In [0]:
# 5. COUNT (Number of Records) in pyspark
loan_df.agg({"Loan Amount": "count"}).show()
# 5. COUNT (Number of Records) in sparksql
print("COUNT of Records (SparkSQL):")
spark.sql("SELECT COUNT(`Loan Amount`) AS `Record Count` FROM loan").show()

+------------------+
|count(Loan Amount)|
+------------------+
|               500|
+------------------+

COUNT of Records (SparkSQL):
+------------+
|Record Count|
+------------+
|         500|
+------------+



In [0]:
# 6. MEAN (Mean Loan Amount) in pyspark
loan_df.agg({"Loan Amount": "mean"}).show()
# 6. MEAN (Mean Loan Amount) in sparksql
print("MEAN of Loan Amount (SparkSQL):")
spark.sql("SELECT AVG(`Loan Amount`) AS `Mean Loan Amount` FROM loan").show()

+----------------+
|avg(Loan Amount)|
+----------------+
|      797052.898|
+----------------+

MEAN of Loan Amount (SparkSQL):
+----------------+
|Mean Loan Amount|
+----------------+
|      797052.898|
+----------------+



In [0]:
# 7. AGG (Multiple Aggregations) in pyspark
from pyspark.sql import functions as F

# Assuming 'loan_df' is the DataFrame
loan_df.agg(
    F.sum("Loan Amount").alias("Total Loan Amount"),
    F.avg("Loan Amount").alias("Average Loan Amount"),
    F.min("Loan Amount").alias("Min Loan Amount"),
    F.max("Loan Amount").alias("Max Loan Amount"),
    F.count("Loan Amount").alias("Loan Count")
).show()

# 7. AGG (Multiple Aggregations) in sparksql
print("Multiple Aggregations (SparkSQL):")
spark.sql("""
    SELECT 
        SUM(`Loan Amount`) AS `Total Loan Amount`,
        AVG(`Loan Amount`) AS `Average Loan Amount`,
        MIN(`Loan Amount`) AS `Min Loan Amount`,
        MAX(`Loan Amount`) AS `Max Loan Amount`,
        COUNT(`Loan Amount`) AS `Record Count`
    FROM loan
""").show()

+-----------------+-------------------+---------------+---------------+----------+
|Total Loan Amount|Average Loan Amount|Min Loan Amount|Max Loan Amount|Loan Count|
+-----------------+-------------------+---------------+---------------+----------+
|     3.98526449E8|         797052.898|        15350.0|      7541236.0|       500|
+-----------------+-------------------+---------------+---------------+----------+

Multiple Aggregations (SparkSQL):
+-----------------+-------------------+---------------+---------------+------------+
|Total Loan Amount|Average Loan Amount|Min Loan Amount|Max Loan Amount|Record Count|
+-----------------+-------------------+---------------+---------------+------------+
|     3.98526449E8|         797052.898|        15350.0|      7541236.0|         500|
+-----------------+-------------------+---------------+---------------+------------+



In [0]:
# Group by Loan Category and sum Loan Amount using Pyspark
loan_df.groupBy("Loan Category").sum("Loan Amount").show()

# Group by Loan Category and sum Loan Amount using Spark SQL
print("Group By Loan Category and Sum Loan Amount (SparkSQL):")
spark.sql("""
    SELECT 
        `Loan Category`,
        SUM(`Loan Amount`) AS `Total Loan Amount`
    FROM loan
    GROUP BY `Loan Category`
""").show()



+------------------+----------------+
|     Loan Category|sum(Loan Amount)|
+------------------+----------------+
|           HOUSING|     6.0346129E7|
|        TRAVELLING|     3.6608973E7|
|       BOOK STORES|       3681651.0|
|       AGRICULTURE|     1.1590221E7|
|         GOLD LOAN|     7.0991425E7|
|  EDUCATIONAL LOAN|     1.8394223E7|
|        AUTOMOBILE|     5.6542964E7|
|          BUSINESS|     2.3368358E7|
|COMPUTER SOFTWARES|     3.4810861E7|
|           DINNING|       9167850.0|
|          SHOPPING|     1.5645414E7|
|       RESTAURANTS|     2.5006754E7|
|       ELECTRONICS|       8970419.0|
|          BUILDING|       3792037.0|
|        RESTAURANT|     1.1729243E7|
|   HOME APPLIANCES|       7879927.0|
+------------------+----------------+

Group By Loan Category and Sum Loan Amount (SparkSQL):
+------------------+-----------------+
|     Loan Category|Total Loan Amount|
+------------------+-----------------+
|           HOUSING|      6.0346129E7|
|        TRAVELLING|      3.

In [0]:
# JOINS (Sample Dataset Preparation)
# Create a sample DataFrame for risk levels
risk_data = [
    ("HOUSING", "High Risk"),
    ("SHOPPING", "Low Risk"),
    ("TRAVELLING", "Medium Risk"),
    ("GOLD LOAN", "Medium Risk"),
    ("AUTOMOBILE", "High Risk"),
]
risk_df = spark.createDataFrame(risk_data, ["Loan Category", "Risk Level"])

# Register risk DataFrame as a temporary SQL view
risk_df.createOrReplaceTempView("risk")

# PySpark Joins
print("Inner Join (PySpark)")
loan_df.join(risk_df, on="Loan Category", how="inner").show(5)

# SparkSQL Joins
print("Inner Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Inner Join (PySpark)
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|Loan Category|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue|Debt Record|Returned Cheque|Dishonour of Bill|Debt Record|Risk Level|
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|      HOUSING|    IB15107| 46|FEMALE| CHARTERED APPRAISER|       MARRIED|          2| 85088|      44256|            5|   347254.0|      9|      37966|              1|                5|    37966.0| High Risk|
|      HOUSING|    IB15095| 53|FEMALE|   SOFTWARE ENGINEER|        SINGLE|          4| 75210|      26027|            7|   740375.0|      7|    

In [0]:
print("Outer Join (PySpark)")
loan_df.join(risk_df, on="Loan Category", how="outer").show(5)

print("Outer Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    FULL OUTER JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Outer Join (PySpark)
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|Loan Category|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue|Debt Record|Returned Cheque|Dishonour of Bill|Debt Record|Risk Level|
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|  AGRICULTURE|    IB14589| 56|  MALE|              FARMER|        SINGLE|          3| 51162|      12591|            9|   104758.0|      3|      75535|              2|                1|    75535.0|      null|
|  AGRICULTURE|    IB14604| 52|  MALE|          ACCOUNTANT|        SINGLE|          7| 46086|      27201|            9|  1682942.0|      7|    

In [0]:
print("Left Join (PySpark)")
loan_df.join(risk_df, on="Loan Category", how="left").show(5)

print("Left Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    LEFT JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Left Join (PySpark)
+-------------+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+-----------+
|Loan Category|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue|Debt Record|Returned Cheque|Dishonour of Bill|Debt Record| Risk Level|
+-------------+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+-----------+
|      HOUSING|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|  1000000.0|      5|     42,898|              6|                9|    42898.0|  High Risk|
|     SHOPPING|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|    50000.0|      3|     33,999|              1|             

In [0]:
print("Right Join (PySpark)")
loan_df.join(risk_df, on="Loan Category", how="right").show(5)

print("Right Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    RIGHT JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Right Join (PySpark)
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|Loan Category|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue|Debt Record|Returned Cheque|Dishonour of Bill|Debt Record|Risk Level|
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+-----------+---------------+-----------------+-----------+----------+
|      HOUSING|    IB15107| 46|FEMALE| CHARTERED APPRAISER|       MARRIED|          2| 85088|      44256|            5|   347254.0|      9|      37966|              1|                5|    37966.0| High Risk|
|      HOUSING|    IB15095| 53|FEMALE|   SOFTWARE ENGINEER|        SINGLE|          4| 75210|      26027|            7|   740375.0|      7|    