In [1]:
! pip install pandas

[33mYou are using pip version 9.0.1, however version 22.0.4 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
import pandas as pd

In [3]:
# Show every column when a pandas DataFrame is rendered (no column truncation).
pd.options.display.max_columns = None

In [4]:
# Show every row when a pandas DataFrame is rendered (no row truncation).
pd.options.display.max_rows = None

In [5]:
import time as t

In [6]:
# Wall-clock timestamp (seconds since the epoch) taken before the pipeline
# starts; later cells subtract it from t.time() to report elapsed time.
start_time = t.time()

In [7]:
# Display the recorded start timestamp.
start_time

1650075578.6547756

In [8]:
from pyspark.sql import functions as sparkf
from pyspark.sql.types import *

# 1. Data Profiling (EDA)

In [9]:
! hdfs dfs -ls -h /rawzone/example/*

Picked up JAVA_TOOL_OPTIONS: -Dhttps.protocols=TLSv1.2
-rw-r--r--   2 hadoopuser supergroup      1.1 G 2022-04-15 15:51 /rawzone/example/LoanStats_web.csv
-rw-r--r--   2 hadoopuser supergroup     45.1 M 2021-12-25 08:59 /rawzone/example/corrected
-rw-r--r--   2 hadoopuser supergroup    708.2 M 2021-12-28 09:34 /rawzone/example/kddcup.data
-rw-r--r--   2 hadoopuser supergroup      1.3 K 2021-12-28 09:34 /rawzone/example/kddcup.names


In [10]:
# Load the raw loan CSV from HDFS. The first line supplies the column names,
# column types are inferred from the data, and rows that fail to parse are
# dropped (mode=DROPMALFORMED).
raw_df = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true', mode='DROPMALFORMED')
    .load('hdfs://aekanun-hadoop-master:9000/rawzone/example/LoanStats_web.csv')
)

In [11]:
# Number of partitions of the RDD underlying raw_df.
raw_df.rdd.getNumPartitions()

9

#### นับจำนวน Attributes ด้วย .columns

In [12]:
# Number of columns (attributes) in the raw DataFrame.
len(raw_df.columns)

144

#### นับจำนวน row ด้วย .count()

In [13]:
# Number of rows in raw_df. count() is a Spark action, so this triggers a job
# over the source file.
raw_df.count()

1432466

#### ทำ Sub-Columns ด้วย .select()

In [14]:
# Project the raw data down to the 22 loan attributes used in the rest of the
# notebook.
selectedAttr_df = raw_df.select([
    "loan_amnt",
    "term",
    "int_rate",
    "installment",
    "grade",
    "emp_length",
    "home_ownership",
    "annual_inc",
    "verification_status",
    "loan_status",
    "purpose",
    "addr_state",
    "dti",
    "delinq_2yrs",
    "earliest_cr_line",
    "inq_last_6mths",
    "open_acc",
    "pub_rec",
    "revol_bal",
    "revol_util",
    "total_acc",
    "last_credit_pull_d",
])

#### Explore data ด้วย Basic Stats โดยใช้ .describe()

In [15]:
# Basic statistics (count, mean, stddev, min, max) per column, pulled into
# pandas and transposed so that each attribute becomes one row.
selectedAttr_df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,1432440,15370.388358325654,9646.026272413876,1000,40000
term,1432440,,,36 months,60 months
int_rate,1432440,,,5.31%,30.99%
installment,1432440,456.68781535698236,281.71344223131814,19.4,1715.42
grade,1432440,,,A,G
emp_length,1432440,,,1 year,
home_ownership,1432440,,,ANY,RENT
annual_inc,1432440,81034.58293296749,134183.35696714345,0.0,6.1E7
verification_status,1432440,,,Not Verified,Verified


#### ดู Data types ด้วย .printSchema()

In [16]:
# Print the column names and inferred data types of the selected attributes.
selectedAttr_df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: string (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: integer (nullable = true)
 |-- revol_util: string (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



# 2. Cleaning and Transformation (Data Prep.)

#### ลบทิ้งทั้งบรรทัด หาก attributes ใดมีค่า null ด้วย .dropna()

In [17]:
# Discard every row in which at least one of the selected attributes is null.
noMissing_df = selectedAttr_df.na.drop(how='any')

#### ประกาศ Python & Spark Function สำหรับเอาเครื่องหมาย % ออก

In [18]:
def f_removepercentsign(origin):
    """Strip trailing percent signs from a string value.

    Args:
        origin: Text such as ``'13.56%'``. ``None`` is accepted.

    Returns:
        ``origin`` with any trailing ``'%'`` characters removed, or ``None``
        when ``origin`` is ``None`` so that a null cell passes through as null
        instead of raising ``AttributeError`` inside the Spark UDF.
    """
    if origin is None:
        return None
    return origin.rstrip('%')

In [19]:
# Spark UDF wrapper around f_removepercentsign; returns a string column.
# The function is passed directly (no lambda) so the UDF keeps a readable name.
removepercentsign = sparkf.udf(f_removepercentsign, StringType())

#### ประกาศ Python & Spark Function สำหรับตัดเครื่องหมาย - และข้อความที่ตามหลังออก (เก็บเฉพาะส่วนที่อยู่หน้าเครื่องหมาย -)

In [20]:
def f_extractmonth(origin):
    """Return the part of a string that precedes its first ``'-'``.

    Args:
        origin: Text such as ``'Dec-2015'``. ``None`` is accepted.

    Returns:
        The substring before the first ``'-'`` (the whole string when it
        contains no ``'-'``), or ``None`` when ``origin`` is ``None`` so that a
        null cell passes through as null instead of raising ``AttributeError``
        inside the Spark UDF.
    """
    if origin is None:
        return None
    return origin.split('-')[0]

In [21]:
# Spark UDF wrapper around f_extractmonth; returns a string column.
# The function is passed directly (no lambda) so the UDF keeps a readable name.
extractmonth = sparkf.udf(f_extractmonth, StringType())

#### เรียกใช้ Spark Functions ผ่านทาง .withColumn()

In [22]:
# Apply the UDFs column by column:
#   * revol_util / int_rate: drop the trailing '%' and cast the text to double
#   * earliest_cr_line / last_credit_pull_d: keep only the text before the '-'
crunched_df = (
    noMissing_df
    .withColumn(
        'revol_util',
        removepercentsign(noMissing_df['revol_util']).cast('double'),
    )
    .withColumn(
        'int_rate',
        removepercentsign(noMissing_df['int_rate']).cast('double'),
    )
    .withColumn(
        'earliest_cr_line',
        extractmonth(noMissing_df['earliest_cr_line']).cast('string'),
    )
    .withColumn(
        'last_credit_pull_d',
        extractmonth(noMissing_df['last_credit_pull_d']).cast('string'),
    )
)

#### ทำ sub-Rows ด้วย .filter() เลือกมาเฉพาะ Row ที่มี Loan Status เป็น Fully Paid และ Charged Off

In [23]:
# Keep only loans whose status is one of the two final outcomes.
relevant_df = crunched_df.where(
    sparkf.col('loan_status').isin('Fully Paid', 'Charged Off')
)

In [24]:
# Drop rows that contain a null in any column after the transformations.
final_df = relevant_df.na.drop(how='any')

In [25]:
# Elapsed wall-clock seconds since start_time was recorded.
# NOTE(review): Spark transformations are lazy, so this presumably reflects
# mostly the earlier actions (schema inference, count, describe) rather than
# the cleaning steps themselves - confirm before quoting it as a benchmark.
upstreamProcess_duration = t.time()-start_time

In [26]:
# Print upstreamProcess_duration (elapsed seconds).
print(upstreamProcess_duration)

100.55839824676514


In [27]:
# Print the schema of the cleaned DataFrame to check the column types after
# the casts (int_rate and revol_util are cast to double).
final_df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: integer (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



# 3. Load Cleaned Data to Database/Data Warehouse

In [28]:
import os
from getpass import getpass

# JDBC connection settings for the target SQL Server database.
server_name = "jdbc:sqlserver://mssql-container"
database_name = "TestDB"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "spark_loan_stat_10000"
username = "SA"
# Never keep the password in the notebook: read it from the MSSQL_SA_PASSWORD
# environment variable, or prompt for it interactively when the variable is
# unset or empty.
password = os.environ.get("MSSQL_SA_PASSWORD") or getpass("SQL Server password for user SA: ")

In [29]:
# Write the cleaned DataFrame to SQL Server over JDBC, replacing the target
# table if it already exists.
(
    final_df.write
    .format("jdbc")
    .mode('overwrite')
    .options(
        url=url,
        dbtable=table_name,
        user=username,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    .save()
)

In [30]:
# Elapsed wall-clock seconds since start_time. This is cumulative from the
# start of the run, not the duration of the JDBC write alone.
writeProcess_duration = t.time()-start_time

In [31]:
# Print writeProcess_duration (elapsed seconds since start_time).
print(writeProcess_duration)

146.6444342136383


In [32]:
# Row count of final_df, used as the reference when checking the table that is
# read back from the database.
final_df.count()

735740

# 4. ทดสอบการอ่านจาก MSSQL มาเป็น Spark DataFrame เพื่อเปรียบเทียบ

In [33]:
# Read the table back from SQL Server over JDBC into a Spark DataFrame so it
# can be compared with final_df.
read_result_df = (
    spark.read
    .format("jdbc")
    .options(
        url=url,
        dbtable=table_name,
        user=username,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    .load()
)

In [34]:
# Row count of the table read back from the database; compare it with
# final_df.count().
read_result_df.count()

735740

In [35]:
# Summary statistics of the data read back from the database, one row per
# attribute.
read_result_df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,735740,14503.327024492348,9240.777415179264,1000,40000
term,735740,,,36 months,60 months
int_rate,735740,13.208798923640442,5.157202466162041,5.31,30.99
installment,735740,446.5535269525436,282.87812370846495,30.12,1715.42
grade,735740,,,A,G
emp_length,735740,,,1 year,
home_ownership,735740,,,ANY,RENT
annual_inc,735740,79534.12686194837,77998.95354640124,0.0,9573072.0
verification_status,735740,,,Not Verified,Verified


In [36]:
# Summary statistics of the cleaned DataFrame, one row per attribute, for
# comparison with the statistics of the data read back from the database.
final_df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
loan_amnt,735740,14503.327024492348,9240.777415179347,1000,40000
term,735740,,,36 months,60 months
int_rate,735740,13.208798923527274,5.1572024661618965,5.31,30.99
installment,735740,446.5535269524311,282.878123708465,30.12,1715.42
grade,735740,,,A,G
emp_length,735740,,,1 year,
home_ownership,735740,,,ANY,RENT
annual_inc,735740,79534.1268619485,77998.95354640132,0.0,9573072.0
verification_status,735740,,,Not Verified,Verified


In [37]:
# Print the schema of the DataFrame read back from the database.
read_result_df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: integer (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)



In [38]:
# Print the schema of final_df so it can be compared with the schema of the
# DataFrame read back from the database.
final_df.printSchema()

root
 |-- loan_amnt: integer (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- installment: double (nullable = true)
 |-- grade: string (nullable = true)
 |-- emp_length: string (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- loan_status: string (nullable = true)
 |-- purpose: string (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- dti: string (nullable = true)
 |-- delinq_2yrs: string (nullable = true)
 |-- earliest_cr_line: string (nullable = true)
 |-- inq_last_6mths: double (nullable = true)
 |-- open_acc: integer (nullable = true)
 |-- pub_rec: integer (nullable = true)
 |-- revol_bal: integer (nullable = true)
 |-- revol_util: double (nullable = true)
 |-- total_acc: integer (nullable = true)
 |-- last_credit_pull_d: string (nullable = true)

