In [1]:
from pyspark.sql import SparkSession
import getpass

# OS login of whoever runs the notebook; used below to give each user a
# private Hive warehouse directory on HDFS.
username = getpass.getuser()

# Build (or reuse, via getOrCreate) a YARN-backed Spark session with Hive support.
#   spark.ui.port = '0'                     -> presumably asks for an OS-assigned UI port so
#                                              several users on one gateway don't clash (confirm)
#   spark.shuffle.useOldFetchProtocol='true' -> NOTE(review): presumably needed for compatibility
#                                              with the cluster's shuffle service — confirm
#   spark.sql.warehouse.dir                  -> per-user warehouse under /user/<username>/warehouse
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config('spark.shuffle.useOldFetchProtocol', 'true')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# Evaluate the session as the cell's final expression so Jupyter displays its repr
# (a quick check that the session was created).
spark

In [3]:
# Root HDFS directory of the Lending Club project; the cleaned datasets read in
# this notebook live in sub-directories of it.
# NOTE(review): the user id "itv019490" is hard-coded, whereas the Spark warehouse
# dir is derived from getpass.getuser(); running as another user still reads this
# user's directory — confirm that is intended.
LENDING_CLUB_HDFS_PATH = "hdfs:/user/itv019490/lending_club_project_019490"


In [4]:
# Load the cleaned customers dataset (Parquet) from the project root.
# The path is derived from LENDING_CLUB_HDFS_PATH instead of repeating the literal,
# so the data location is configured in exactly one place.
customers_df = spark.read.parquet(f"{LENDING_CLUB_HDFS_PATH}/customers_cleaned")


In [27]:
# Print column names, types and nullability of the customers dataset.
customers_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- emp_title: string (nullable = true)
 |-- emp_length: integer (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- address_state: string (nullable = true)
 |-- address_zipcode: string (nullable = true)
 |-- address_country: string (nullable = true)
 |-- grade: string (nullable = true)
 |-- sub_grade: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- total_high_credit_limit: float (nullable = true)
 |-- application_type: string (nullable = true)
 |-- join_anual_income: float (nullable = true)
 |-- verification_status_joint: string (nullable = true)
 |-- ingest_date: timestamp (nullable = true)



In [5]:
# Load the cleaned loans dataset (Parquet) from the project root.
# Path built from LENDING_CLUB_HDFS_PATH rather than a duplicated literal.
loans_df = spark.read.parquet(f"{LENDING_CLUB_HDFS_PATH}/loans_cleaned")


In [6]:
# Print column names, types and nullability of the loans dataset.
loans_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- member_id: string (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- funded_amount: float (nullable = true)
 |-- interest_rate: float (nullable = true)
 |-- monthly_installment: float (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- loan_title: string (nullable = true)
 |-- ingest_date: timestamp (nullable = true)
 |-- loan_term_years: double (nullable = true)



In [7]:
# Load the cleaned delinquency records of defaulters (CSV with a header row).
# An explicit schema replaces inferSchema: inference costs an extra full pass
# over the data and lets column types drift if the data changes. The types below
# are exactly the ones inference previously produced for this dataset.
DELINQ_SCHEMA = (
    "member_id string, deliqn_2yrs integer, "
    "deliqn_amnt double, mths_since_last_deliqn integer"
)

loans_defaulters_delinq_df = (
    spark.read
    .option("header", "true")
    .schema(DELINQ_SCHEMA)
    .csv(f"{LENDING_CLUB_HDFS_PATH}/loans_defaulters_delinq_cleaned_csv")
)



In [8]:
# Print column names, types and nullability of the delinquency dataset.
loans_defaulters_delinq_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- deliqn_2yrs: integer (nullable = true)
 |-- deliqn_amnt: double (nullable = true)
 |-- mths_since_last_deliqn: integer (nullable = true)



In [9]:
# Load the cleaned public-record details of defaulters (CSV with a header row).
# An explicit schema replaces inferSchema to avoid the extra inference pass over
# the data and to pin the column types. The types below are exactly the ones
# inference previously produced for this dataset.
PUBREC_SCHEMA = (
    "member_id string, pub_rec double, "
    "pub_rec_bankruptcies double, inq_last_6mths double"
)

loans_defaulters_pubrec_df = (
    spark.read
    .option("header", "true")
    .schema(PUBREC_SCHEMA)
    .csv(f"{LENDING_CLUB_HDFS_PATH}/loans_defaulters_pubrec_cleaned_csv")
)


In [10]:
# Print column names, types and nullability of the public-records dataset.
loans_defaulters_pubrec_df.printSchema()

root
 |-- member_id: string (nullable = true)
 |-- pub_rec: double (nullable = true)
 |-- pub_rec_bankruptcies: double (nullable = true)
 |-- inq_last_6mths: double (nullable = true)



In [25]:
# Load the cleaned loan repayment dataset (Parquet) from the project root.
# Path built from LENDING_CLUB_HDFS_PATH rather than a duplicated literal.
loans_repayment_df = spark.read.parquet(f"{LENDING_CLUB_HDFS_PATH}/loans_repayment_cleaned")


In [26]:
# Print column names, types and nullability of the repayment dataset.
loans_repayment_df.printSchema()

root
 |-- loan_id: string (nullable = true)
 |-- total_principal_received: float (nullable = true)
 |-- total_interest_received: float (nullable = true)
 |-- total_late_fee_received: float (nullable = true)
 |-- total_payment_received: float (nullable = true)
 |-- last_payment_amount: float (nullable = true)
 |-- last_payment_date: string (nullable = true)
 |-- next_payment_date: string (nullable = true)
 |-- ingest_date: timestamp (nullable = true)



In [30]:
# Row count of every loaded dataset, printed as "<label>: <count>".
# Each .count() is a separate Spark job, so this cell scans all five datasets.
for label, frame in (
    ("customers", customers_df),
    ("loans", loans_df),
    ("loans_defaulters_delinq", loans_defaulters_delinq_df),
    ("defaulters_pubrec", loans_defaulters_pubrec_df),
    ("repayment", loans_repayment_df),
):
    print(f"{label}:", frame.count())


customers: 2260633
loans: 2260667
loans_defaulters_delinq: 1106163
defaulters_pubrec: 1070125
repayment: 2259549
