<a href="https://colab.research.google.com/github/davisdw/Lending_Tree_Loan_Prediction_Analysis/blob/main/pyspark_data_load.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'

import os

spark_version = 'spark-3.5.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
%pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

import findspark
findspark.init()

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Connected to cloud.r-project.org (108.157.162.103)] [Connecting to ppa.lau                                                                                                    Get:2 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:4 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Get:10 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204

In [2]:
# Mounted Google Drive to access Data Source *DO NOT RERUN*
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import pyspark
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

Import accepted dataset, cache temp table and partitioned/export the data into CSV

In [4]:
from pyspark import SparkFiles

url = "/content/drive/MyDrive/Lending_Club_Data_Source/accepted_2007_to_2018Q4.csv"

spark.sparkContext.addFile(url)
loan_accept_df = spark.read.csv(SparkFiles.get("accepted_2007_to_2018Q4.csv"), sep=",", header=True)

# Display the Data
loan_accept_df.show()

+--------+---------+---------+-----------+---------------+----------+--------+-----------+-----+---------+--------------------+----------+--------------+----------+-------------------+--------+-----------+----------+--------------------+----+------------------+--------------------+--------+----------+-----+-----------+----------------+--------------+---------------+--------------+----------------------+----------------------+--------+-------+---------+----------+---------+-------------------+---------+-------------+------------------+---------------+---------------+-------------+------------------+----------+-----------------------+------------+---------------+------------+------------------+--------------------+-------------------+--------------------------+---------------------------+-----------+----------------+----------------+---------+-------------------------+--------------+------------+-----------+-----------+-----------+-----------+-----------+------------------+------------+-

In [5]:
# Create Temp View Tables
loan_accept_df.createOrReplaceTempView("accepted")

In [6]:
# Cache the accepted dataset table
spark.sql("cache table accepted")

DataFrame[]

In [18]:
# Review the accepted table to determine it's use of columns
# Tables that we're keeping -->>:loan_amnt, int_rate, emp_title, emp_length,
# home_ownership, annual_inc, issue_d, loan_status, purpose, addr_state, dti,fico_range_low, fico_range_high

accept_df = spark.sql( """ SELECT
        loan_amnt,
        int_rate,
        emp_title,
        emp_length,
        home_ownership,
        annual_inc,
        issue_d,
        loan_status,
        purpose,
        addr_state,
        dti,fico_range_low,
        fico_range_high
        FROM accepted
        ORDER BY issue_d DESC
    """)
accept_df.show()


+---------+--------+--------------------+----------+--------------+----------+---------------+-----------+------------------+----------+-----+--------------+---------------+
|loan_amnt|int_rate|           emp_title|emp_length|home_ownership|annual_inc|        issue_d|loan_status|           purpose|addr_state|  dti|fico_range_low|fico_range_high|
+---------+--------+--------------------+----------+--------------+----------+---------------+-----------+------------------+----------+-----+--------------+---------------+
|  10500.0|   10.99|  "Coil Winder ""B""| reactors"|       2 years|  MORTGAGE|Source Verified|   Oct-2015|              NULL|     531xx|   WI|      Jan-2002|          710.0|
|  10000.0|    6.67|Customer Service Rep| 10+ years|      MORTGAGE|   55000.0|       Sep-2018|    Current|       credit_card|        NJ|19.22|         705.0|          709.0|
|  14000.0|   10.08|  Associate director| 10+ years|      MORTGAGE|  191000.0|       Sep-2018|    Current|debt_consolidation|     

In [19]:
# Partitioning into smaller datasets by year and output the data into multiple folders
# Saved into my Google Drive
accept_df.write.partitionBy("issue_d").mode("overwrite")\
.csv("/content/drive/MyDrive/Lending_Club_Data_Source/accepted_partition_csv")


In [20]:
#Test by reading the partioned data
readPartitionData = spark.read.option("header", True)\
.csv("/content/drive/MyDrive/Lending_Club_Data_Source/accepted_partition_csv/issue_d=Apr-2008")
readPartitionData.printSchema()
readPartitionData.show()

root
 |-- 7500.0: string (nullable = true)
 |-- 8.0: string (nullable = true)
 |-- JEK: string (nullable = true)
 |-- 7 years: string (nullable = true)
 |-- MORTGAGE: string (nullable = true)
 |-- 45000.0: string (nullable = true)
 |-- Fully Paid: string (nullable = true)
 |-- other: string (nullable = true)
 |-- KS: string (nullable = true)
 |-- 7.17: string (nullable = true)
 |-- 795.0: string (nullable = true)
 |-- 799.0: string (nullable = true)

+-------+-----+--------------------+---------+--------+--------+-----------+------------------+---+-----+-----+-----+
| 7500.0|  8.0|                 JEK|  7 years|MORTGAGE| 45000.0| Fully Paid|             other| KS| 7.17|795.0|799.0|
+-------+-----+--------------------+---------+--------+--------+-----------+------------------+---+-----+-----+-----+
|25000.0|13.55|Catherine Reitmye...|   1 year|MORTGAGE| 64000.0|Charged Off|    major_purchase| CT| 13.5|695.0|699.0|
| 6700.0|12.92|                NULL| < 1 year|    RENT| 19368.0| Fully Pa

In [22]:
# Read the entire csv partition collectively
partition_accepted_df = spark.read.format("csv").option("header", True)\
.load("/content/drive/MyDrive/Lending_Club_Data_Source/accepted_partition_csv")

In [23]:
# Display the partitions csv
partition_accepted_df.show()

+-------+-----+--------------------+---------+--------+--------+-----------+------------------+---+-----+-----+-----+--------+
| 5000.0|17.27|          Bus driver|10+ years|    RENT| 62000.0| Fully Paid|       credit_card| WI|10.28|660.0|664.0| issue_d|
+-------+-----+--------------------+---------+--------+--------+-----------+------------------+---+-----+-----+-----+--------+
|22000.0| 6.49| Key Account Manager|10+ years|MORTGAGE|134000.0|    Current|debt_consolidation| FL|26.33|715.0|719.0|Mar-2016|
|30000.0|10.75|             Manager|  2 years|MORTGAGE|125000.0| Fully Paid|debt_consolidation| TX|22.14|730.0|734.0|Mar-2016|
|10000.0|16.29|           Front end|  3 years|    RENT| 40000.0|Charged Off|debt_consolidation| NY|12.42|715.0|719.0|Mar-2016|
|12000.0| 9.75|     Project Manager| < 1 year|    RENT|120000.0|    Current|debt_consolidation| VA|11.46|730.0|734.0|Mar-2016|
|30000.0|12.99|               sales|  9 years|    RENT| 70000.0|    Current|debt_consolidation| WI|19.44|700.0|

In [24]:
#Un-cached the accepted table
spark.sql("uncache table accepted")

DataFrame[]

In [25]:
# Check to see if the table is cached
if spark.catalog.isCached('accepted'):
  print('accepted dataset table is cached')
else:
  print('accepted dataset table is not there')

accepted dataset table is not there


Import rejected dataset, cache temp table and partitioned/export the data into CSV

In [26]:
# Read in the loan rejection data csv

url_2 = "/content/drive/MyDrive/Lending_Club_Data_Source/rejected_2007_to_2018Q4.csv"

spark.sparkContext.addFile(url_2)
loan_reject_df = spark.read.csv(SparkFiles.get("rejected_2007_to_2018Q4.csv"), sep=',', header=True)

loan_reject_df.show()

+-------------+----------+--------------------+----------+------+--------+-----+-----------------+-----------+
|amt_requested|      date|             purpose|risk_score|   dti|zip_code|state|employment_length|policy_code|
+-------------+----------+--------------------+----------+------+--------+-----+-----------------+-----------+
|       1000.0|2007-05-26|Wedding Covered b...|     693.0|   10%|   481xx|   NM|          4 years|        0.0|
|       1000.0|2007-05-26|  Consolidating Debt|     703.0|   10%|   010xx|   MA|         < 1 year|        0.0|
|      11000.0|2007-05-27|Want to consolida...|     715.0|   10%|   212xx|   MD|           1 year|        0.0|
|       6000.0|2007-05-27|             waksman|     698.0|38.64%|   017xx|   MA|         < 1 year|        0.0|
|       1500.0|2007-05-27|              mdrigo|     509.0| 9.43%|   209xx|   MD|         < 1 year|        0.0|
|      15000.0|2007-05-27|          Trinfiniti|     645.0|    0%|   105xx|   NY|          3 years|        0.0|
|

In [27]:
loan_reject_df.createOrReplaceTempView("rejected")

In [28]:
# Cache the Rejected dataset table
spark.sql("cache table rejected")

DataFrame[]

In [29]:
# Review the rejected table to determine it's use of columns
# Tables that we're keeping -->>: amt_requested|date|purpose|risk_score|dti|zip_code|state|employment_length|policy_code

reject_df = spark.sql(""" SELECT
        amt_requested,
        YEAR(date) AS YEAR,
        purpose,
        risk_score,
        dti,
        zip_code,
        state,
        employment_length
        FROM rejected
        ORDER BY YEAR
    """)

reject_df.show()

+-------------+----+--------------------+----------+------+--------+-----+-----------------+
|amt_requested|YEAR|             purpose|risk_score|   dti|zip_code|state|employment_length|
+-------------+----+--------------------+----------+------+--------+-----+-----------------+
|       1000.0|2007|Wedding Covered b...|     693.0|   10%|   481xx|   NM|          4 years|
|       1000.0|2007|  Consolidating Debt|     703.0|   10%|   010xx|   MA|         < 1 year|
|      11000.0|2007|Want to consolida...|     715.0|   10%|   212xx|   MD|           1 year|
|       6000.0|2007|             waksman|     698.0|38.64%|   017xx|   MA|         < 1 year|
|       1500.0|2007|              mdrigo|     509.0| 9.43%|   209xx|   MD|         < 1 year|
|      15000.0|2007|          Trinfiniti|     645.0|    0%|   105xx|   NY|          3 years|
|      10000.0|2007|         NOTIFYi Inc|     693.0|   10%|   210xx|   MD|         < 1 year|
|       3900.0|2007|         For Justin.|     700.0|   10%|   469xx|  

In [30]:
reject_df.write.partitionBy("YEAR").mode("overwrite")\
.csv("/content/drive/MyDrive/Lending_Club_Data_Source/rejected_partition_csv")

In [31]:
#Test by reading the partioned data
readPartitionData = spark.read.option("header", True)\
.csv("/content/drive/MyDrive/Lending_Club_Data_Source/rejected_partition_csv/YEAR=2007")
readPartitionData.printSchema()
readPartitionData.show()

root
 |-- 1000.0: string (nullable = true)
 |-- Wedding Covered but No Honeymoon: string (nullable = true)
 |-- 693.0: string (nullable = true)
 |-- 10%: string (nullable = true)
 |-- 481xx: string (nullable = true)
 |-- NM: string (nullable = true)
 |-- 4 years: string (nullable = true)

+-------+--------------------------------+-----+------+-----+---+---------+
| 1000.0|Wedding Covered but No Honeymoon|693.0|   10%|481xx| NM|  4 years|
+-------+--------------------------------+-----+------+-----+---+---------+
| 1000.0|              Consolidating Debt|703.0|   10%|010xx| MA| < 1 year|
|11000.0|            Want to consolida...|715.0|   10%|212xx| MD|   1 year|
| 6000.0|                         waksman|698.0|38.64%|017xx| MA| < 1 year|
| 1500.0|                          mdrigo|509.0| 9.43%|209xx| MD| < 1 year|
|15000.0|                      Trinfiniti|645.0|    0%|105xx| NY|  3 years|
|10000.0|                     NOTIFYi Inc|693.0|   10%|210xx| MD| < 1 year|
| 3900.0|                 

In [32]:
# Read the entire csv partition collectively
partition_rejected_df = spark.read.format("csv").option("header", True)\
.load("/content/drive/MyDrive/Lending_Club_Data_Source/rejected_partition_csv")

In [33]:
# display the full partitioned data table
partition_rejected_df.show()

+-------+------------------+-----+------+-----+---+---------+----+
|30000.0|debt_consolidation|681.0|35.65%|958xx| CA| < 1 year|YEAR|
+-------+------------------+-----+------+-----+---+---------+----+
|30000.0|       credit_card| NULL|16.36%|802xx| CO| < 1 year|2015|
| 5000.0|debt_consolidation|648.0|10.62%|945xx| CA| < 1 year|2015|
|15000.0|debt_consolidation| NULL|20.13%|117xx| NY| < 1 year|2015|
|10000.0|Debt consolidation|721.0|10.02%|750xx| TX|  7 years|2015|
| 1500.0|          Business| NULL|21.06%|472xx| IN|10+ years|2015|
|10000.0|    major_purchase|659.0|19.05%|853xx| AZ| < 1 year|2015|
| 1000.0|             other| NULL| 15.6%|891xx| NV|  5 years|2015|
| 5000.0|debt_consolidation|501.0|10.73%|475xx| IN| < 1 year|2015|
|35000.0|          Business| NULL|26.53%|780xx| TX|10+ years|2015|
| 6000.0|debt_consolidation|551.0| 8.71%|019xx| MA| < 1 year|2015|
|10000.0|               car| NULL| 7.35%|906xx| CA| < 1 year|2015|
|10000.0|debt_consolidation|633.0|12.57%|840xx| UT| < 1 year|2

In [34]:
# Uncache the rejected dataset table
spark.sql("uncache table rejected")

DataFrame[]

In [35]:
# Check to see if the table is cached
if spark.catalog.isCached('rejected'):
  print('rejected dataset table is cached')
else:
  print('rejected dataset table is not there')

rejected dataset table is not there
