In [1]:
#import the necessary libs

from pyspark.sql import SparkSession
import getpass

username=getpass.getuser()
spark=SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.suffle.useOldFetchProtocol",'true'). \
    config("spark.dql.warehouse.dir",f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
loans_raw_df=spark.read \
    .format("csv") \
    .option("inferSchema", "True") \
    .option("header", "True") \
    .load("/user/itv014478/lendingclubproject/raw/loans_data_csv")

In [3]:
loans_raw_df.show()

+--------+--------------------+---------+-----------+---------+--------+-----------+--------+-----------+------------------+--------------------+
| loan_id|           member_id|loan_amnt|funded_amnt|     term|int_rate|installment| issue_d|loan_status|           purpose|               title|
+--------+--------------------+---------+-----------+---------+--------+-----------+--------+-----------+------------------+--------------------+
|68407277|6d5091b3fcaaeb4ea...|   3600.0|     3600.0|36 months|   13.99|     123.03|Dec-2015| Fully Paid|debt_consolidation|  Debt consolidation|
|68355089|b5e7938b0a2da4cea...|  24700.0|    24700.0|36 months|   11.99|     820.28|Dec-2015| Fully Paid|    small_business|            Business|
|68341763|91060b858433e8a61...|  20000.0|    20000.0|60 months|   10.78|     432.66|Dec-2015| Fully Paid|  home_improvement|                null|
|66310712|cab1fa9f533688b0a...|  35000.0|    35000.0|60 months|   14.85|      829.9|Dec-2015|    Current|debt_consolidation|

In [4]:
loans_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- funded_amnt: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- issue_d: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- title: string (nullable = true)



In [5]:
loan_schema='loan_id string  ,member_id  string  ,loan_amount  float  ,funded_amount  float  ,loan_term_months  string,interest_rate  float  ,monthly_installment  float  ,issue_ddate  string  ,loan_status  string  ,loan_purpose  string  ,loan_title  string '

In [6]:
loans_raw_df=spark.read \
    .format("csv") \
    .option("inferSchema", "True") \
    .schema(loan_schema) \
    .load("/user/itv014478/lendingclubproject/raw/loans_data_csv")

In [7]:
loans_raw_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- funded_amount: float (nullable = true)
 |-- loan_term_months: string (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- monthly_installment: float (nullable = true)
 |-- issue_ddate: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- loan_title: string (nullable = true)



In [8]:
loans_raw_df.createOrReplaceTempView('loans')

In [9]:
spark.sql("SELECT COUNT(*) FROM loans")

count(1)
462495


In [10]:
spark.sql("SELECT * FROM loans WHERE loan_amount is null")

loan_id,member_id,loan_amount,funded_amount,loan_term_months,interest_rate,monthly_installment,issue_ddate,loan_status,loan_purpose,loan_title
loan_id,member_id,,,term,,,issue_d,loan_status,purpose,title
Total amount fund...,e3b0c44298fc1c149...,,,,,,,,,
Total amount fund...,e3b0c44298fc1c149...,,,,,,,,,


In [11]:
col_to_check=["loan_amount",	"funded_amount"	,"loan_term_months"	,"interest_rate",	"monthly_installment",	"issue_ddate"	,"loan_status"	,"loan_purpose"	]
loans_filterd_df=loans_raw_df.na.drop(subset=col_to_check)

In [12]:
loans_filterd_df.count()

462491

In [13]:
loans_filterd_df.createOrReplaceTempView("loans")

In [14]:
from pyspark.sql.functions import regexp_replace, col


In [15]:
loan_term_modified=loans_filterd_df.withColumn("loan_term_months", (regexp_replace(col("loan_term_months")," months","") \
                                                 .cast("int")/12).cast("int")) \
.withColumnRenamed("loan_term_months","loan_term_years")

In [16]:
loan_term_modified.createOrReplaceTempView("loans")

In [17]:
spark.sql("SELECT DISTINCT(loan_purpose) FROM loans")

loan_purpose
wedding
educational
other
small_business
debt_consolidation
credit_card
moving
vacation
renewable_energy
house


In [18]:
spark.sql("SELECT loan_purpose,count(*)AS total FROM loans GROUP BY loan_purpose ORDER BY total DESC")

loan_purpose,total
debt_consolidation,270942
credit_card,111282
home_improvement,28448
other,22983
major_purchase,8724
medical,4615
car,4041
small_business,3835
moving,2704
vacation,2570


In [19]:
loan_purpose_lookup = ["debt_consolidation", "credit_card", "home_improvement", "other", "major_purchase", "medical", "small_business", "car", "vacation", "moving", "house", "wedding", "renewable_energy", "educational"]

In [20]:
from pyspark.sql.functions import when

In [22]:
loans_purpose_modified = loan_term_modified.withColumn("loan_purpose", when(col("loan_purpose").isin(loan_purpose_lookup), col("loan_purpose")).otherwise("other"))

In [23]:
loans_purpose_modified.createOrReplaceTempView("loans")

In [24]:
spark.sql("select loan_purpose, count(*) as total from loans group by loan_purpose order by total desc")

loan_purpose,total
debt_consolidation,270942
credit_card,111282
home_improvement,28448
other,22983
major_purchase,8724
medical,4615
car,4041
small_business,3835
moving,2704
vacation,2570


In [25]:
from pyspark.sql.functions import count

In [26]:
loans_purpose_modified.groupBy("loan_purpose").agg(count("*").alias("total")).orderBy(col("total").desc())

loan_purpose,total
debt_consolidation,270942
credit_card,111282
home_improvement,28448
other,22983
major_purchase,8724
medical,4615
car,4041
small_business,3835
moving,2704
vacation,2570


In [27]:
loans_purpose_modified.write \
.format("parquet") \
.mode("overwrite") \
.option("path", "/user/itv014478/lendingclubproject/cleaned//loans_parquet") \
.save()

In [28]:
loans_purpose_modified.write \
.format("csv") \
.mode("overwrite") \
.option("path", "/user/itv014478/lendingclubproject/cleaned/loans_csv") \
.save()