In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('coding_challenge').getOrCreate()
loan = spark.read.csv("/FileStore/tables/loan.csv", header=True, inferSchema= True)   #original table name = loan_csv
 
loan.createOrReplaceTempView("loan")

In [0]:
#Filter with pyspark and spark sql

#pyspark
onelakh_loan= loan.filter(loan["Loan Amount"] > '100000').count()
print('people who took loan greather than 1 lakh (pyspark): ', onelakh_loan)

#spark sql
print('\npeople who took loan greather than 1 lakh (spark sql): ')
spark.sql("select count(*) as Count from loan where `Loan Amount` >'100000'").show()

people who took loan greather than 1 lakh (pyspark):  279

people who took loan greather than 1 lakh (spark sql): 
+-----+
|Count|
+-----+
|  279|
+-----+



In [0]:
 from pyspark.sql.functions import sum,max,min,avg,count, mean, count

# Calculate total income amount
income = loan.agg(sum("Income").alias("total_income"))
print("Total income (pyspark): ",income.show())

print("\nTotal income (spark sql):")
spark.sql("select sum(Income)  from loan").show()
 



+------------+
|total_income|
+------------+
|    31982882|
+------------+

Total income (pyspark):  None

Total income (spark sql):
+-----------+
|sum(Income)|
+-----------+
|   31982882|
+-----------+



In [0]:
#max income
income = loan.agg(max("Income").alias("max_income"))
print("Max income (pyspark): ",income.show())

print("\nMax income (spark sql):")
spark.sql("select max(Income)  from loan").show()

+----------+
|max_income|
+----------+
|    930000|
+----------+

Max income (pyspark):  None

Max income (spark sql):
+-----------+
|max(Income)|
+-----------+
|     930000|
+-----------+



In [0]:
#min income
income = loan.agg(min("Income").alias("min_income"))
print("Minincome (pyspark): ",income.show())

print("\nMin income (spark sql):")
spark.sql("select min(Income)  from loan").show()

+----------+
|min_income|
+----------+
|     28366|
+----------+

Minincome (pyspark):  None

Min income (spark sql):
+-----------+
|min(Income)|
+-----------+
|      28366|
+-----------+



In [0]:
#avg income
income = loan.agg(avg("Income").alias("avg_incomet"))
print("avg income (pyspark): ",income.show())

print("\nAvg income (spark sql):")
spark.sql("select avg(Income)  from loan").show()

+-----------------+
|      avg_incomet|
+-----------------+
|68339.49145299145|
+-----------------+

avg income (pyspark):  None

Avg income (spark sql):
+-----------------+
|      avg(Income)|
+-----------------+
|68339.49145299145|
+-----------------+



In [0]:
#mean income
income = loan.agg(mean("Income").alias("Mean income"))
print("Mean income (pyspark): ",income.show())

print("\nMean income (spark sql)")
spark.sql("select mean(Income)  from loan").show()

+-----------------+
|      Mean income|
+-----------------+
|68339.49145299145|
+-----------------+

Mean income (pyspark):  None

Mean income (spark sql)
+-----------------+
|     mean(Income)|
+-----------------+
|68339.49145299145|
+-----------------+



In [0]:
from pyspark.sql.functions import count

# Count the number of non-null entries in the 'Income' column
income_count = loan.agg(count("Income").alias("Income_Count"))

print("Count of Income (PySpark):")
income_count.show()

print("\nCount of Income (Spark SQL):")
spark.sql("SELECT count(Income) AS Income_Count FROM loan").show()


Count of Income (PySpark):
+------------+
|Income_Count|
+------------+
|         468|
+------------+


Count of Income (Spark SQL):
+------------+
|Income_Count|
+------------+
|         468|
+------------+



In [0]:
#Group By
a=loan.groupBy("Loan Category").min("Income")
print('Minimun income in each loan category(py spark):', a.show())

spark.sql("select `Loan Category`, min(Income) as min_income from loan GROUP BY `Loan Category`").show()

+------------------+-----------+
|     Loan Category|min(Income)|
+------------------+-----------+
|           HOUSING|      29565|
|        TRAVELLING|      30000|
|       BOOK STORES|      34275|
|       AGRICULTURE|      32571|
|         GOLD LOAN|      28366|
|  EDUCATIONAL LOAN|      35247|
|        AUTOMOBILE|      30000|
|          BUSINESS|      40883|
|COMPUTER SOFTWARES|      31747|
|           DINNING|      33312|
|          SHOPPING|      30454|
|       RESTAURANTS|      30000|
|       ELECTRONICS|      34836|
|          BUILDING|      48613|
|        RESTAURANT|      35735|
|   HOME APPLIANCES|      45389|
+------------------+-----------+

Minimun income in each loan category(py spark): None
+------------------+----------+
|     Loan Category|min_income|
+------------------+----------+
|           HOUSING|     29565|
|        TRAVELLING|     30000|
|       BOOK STORES|     34275|
|       AGRICULTURE|     32571|
|         GOLD LOAN|     28366|
|  EDUCATIONAL LOAN|     35247

In [0]:
#joins
risk_data = [
    ("HOUSING", "High Risk"),
    ("SHOPPING", "Low Risk"),
    ("TRAVELLING", "Medium Risk"),
    ("GOLD LOAN", "Medium Risk"),
    ("AUTOMOBILE", "High Risk"),
]
risk_df = spark.createDataFrame(risk_data, ["Loan Category", "Risk Level"])

# Register risk DataFrame as a temporary SQL view
risk_df.createOrReplaceTempView("risk")

In [0]:
#Inner join
print("Inner Join (PySpark)")
loan.join(risk_df, on="Loan Category", how="inner").display(5)

# SparkSQL Joins
print("Inner Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Inner Join (PySpark)


Loan Category,Customer_ID,Age,Gender,Occupation,Marital Status,Family Size,Income,Expenditure,Use Frequency,Loan Amount,Overdue,Debt Record,Returned Cheque,Dishonour of Bill,Risk Level
HOUSING,IB15107,46,FEMALE,CHARTERED APPRAISER,MARRIED,2,85088.0,44256.0,5,347254,9,37966,1,5,High Risk
HOUSING,IB15095,53,FEMALE,SOFTWARE ENGINEER,SINGLE,4,75210.0,26027.0,7,740375,7,35419,2,4,High Risk
HOUSING,IB15091,42,MALE,AGRICULTURAL ENGINEER,MARRIED,2,66610.0,35744.0,4,582943,8,28192,0,8,High Risk
HOUSING,IB15084,36,FEMALE,CLERK,MARRIED,4,38000.0,19000.0,3,300000,2,5600,4,8,High Risk
HOUSING,IB15024,26,MALE,DIETICIAN,SINGLE,3,95425.0,53086.0,2,488076,4,61227,5,2,High Risk
HOUSING,IB14993,47,FEMALE,CLERK,MARRIED,3,41182.0,22773.0,6,855157,3,41755,0,6,High Risk
HOUSING,IB14991,52,MALE,PILOT,SINGLE,7,95853.0,49629.0,4,1392577,6,25269,4,8,High Risk
HOUSING,IB14974,33,FEMALE,BUSINESS,MARRIED,3,85562.0,49473.0,5,1178327,8,18187,5,3,High Risk
HOUSING,IB14971,38,FEMALE,TEACHER,MARRIED,5,52815.0,22751.0,9,605610,9,47828,4,4,High Risk
HOUSING,IB14947,32,FEMALE,DIETICIAN,SINGLE,3,50060.0,28989.0,6,600085,6,45436,4,9,High Risk


Inner Join (SparkSQL)
+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+----------+
|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|Risk Level|
+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+----------+
|    IB15107| 46|FEMALE| CHARTERED APPRAISER|       MARRIED|          2| 85088|      44256|            5|      HOUSING|    347,254|      9|       37966|               1|                 5| High Risk|
|    IB15095| 53|FEMALE|   SOFTWARE ENGINEER|        SINGLE|          4| 75210|      26027|            7|      HOUSING|    740,375|      7|       35419|               2|         

In [0]:
#outer join
print("Outer Join (PySpark)")
loan.join(risk_df, on="Loan Category", how="outer").show(5)

print("Outer Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    FULL OUTER JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Outer Join (PySpark)
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+----------+
|Loan Category|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|Risk Level|
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+----------+
|  AGRICULTURE|    IB14589| 56|  MALE|              FARMER|        SINGLE|          3| 51162|      12591|            9|    104,758|      3|       75535|               2|                 1|      null|
|  AGRICULTURE|    IB14604| 52|  MALE|          ACCOUNTANT|        SINGLE|          7| 46086|      27201|            9|  1,682,942|      7|       29654|               5|          

In [0]:
print("Left Join (PySpark)")
loan.join(risk_df, on="Loan Category", how="left").show(5)

print("Left Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    LEFT JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Left Join (PySpark)
+-------------+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+-----------+
|Loan Category|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill| Risk Level|
+-------------+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+-----------+
|      HOUSING|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6| 10,00,000 |      5|      42,898|               6|                 9|  High Risk|
|     SHOPPING|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|     50,000|      3|      33,999|               1|                 5|   Low Risk|
|   TRAVELLING|

In [0]:
print("Right Join (PySpark)")
loan.join(risk_df, on="Loan Category", how="right").show(5)

print("Right Join (SparkSQL)")
spark.sql("""
    SELECT l.*, r.`Risk Level`
    FROM loan l
    RIGHT JOIN risk r
    ON l.`Loan Category` = r.`Loan Category`
""").show(5)

Right Join (PySpark)
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+----------+
|Loan Category|Customer_ID|Age|Gender|          Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|Risk Level|
+-------------+-----------+---+------+--------------------+--------------+-----------+------+-----------+-------------+-----------+-------+------------+----------------+------------------+----------+
|      HOUSING|    IB15107| 46|FEMALE| CHARTERED APPRAISER|       MARRIED|          2| 85088|      44256|            5|    347,254|      9|       37966|               1|                 5| High Risk|
|      HOUSING|    IB15095| 53|FEMALE|   SOFTWARE ENGINEER|        SINGLE|          4| 75210|      26027|            7|    740,375|      7|       35419|               2|          