## 1. Extracting Data from a Data Source.

In [1]:
! rm -rf LoanStats_web.csv

In [2]:
! wget https://storage.googleapis.com/aekanunlab/LoanStats_web.csv

--2020-08-23 02:35:55--  https://storage.googleapis.com/aekanunlab/LoanStats_web.csv
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.212.128, 172.217.214.128, 108.177.111.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.212.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1160157683 (1.1G) [text/csv]
Saving to: ‘LoanStats_web.csv’


2020-08-23 02:36:03 (134 MB/s) - ‘LoanStats_web.csv’ saved [1160157683/1160157683]



In [3]:
! wc -l LoanStats_web.csv

1432493 LoanStats_web.csv


In [4]:
! head -2 LoanStats_web.csv

"id","member_id","loan_amnt","funded_amnt","funded_amnt_inv","term","int_rate","installment","grade","sub_grade","emp_title","emp_length","home_ownership","annual_inc","verification_status","issue_d","loan_status","pymnt_plan","url","desc","purpose","title","zip_code","addr_state","dti","delinq_2yrs","earliest_cr_line","inq_last_6mths","mths_since_last_delinq","mths_since_last_record","open_acc","pub_rec","revol_bal","revol_util","total_acc","initial_list_status","out_prncp","out_prncp_inv","total_pymnt","total_pymnt_inv","total_rec_prncp","total_rec_int","total_rec_late_fee","recoveries","collection_recovery_fee","last_pymnt_d","last_pymnt_amnt","next_pymnt_d","last_credit_pull_d","collections_12_mths_ex_med","mths_since_last_major_derog","policy_code","application_type","annual_inc_joint","dti_joint","verification_status_joint","acc_now_delinq","tot_coll_amt","tot_cur_bal","open_acc_6m","open_act_il","open_il_12m","open_il_24m","mths_since_rcnt_il","total_bal_il","il_util","open_rv_1

## 2. Loading the data into the HDFS (Raw Zone).

In [5]:
! hdfs dfs -rm /user/root/LoanStats_web.csv

Deleted /user/root/LoanStats_web.csv


In [6]:
! hdfs dfs -put LoanStats_web.csv /user/root/

In [7]:
! hdfs dfs -ls /user/root/

Found 3 items
drwxr-xr-x   - root hadoop          0 2020-08-23 02:35 /user/root/.sparkStaging
-rw-r--r--   2 root hadoop 1160157683 2020-08-23 02:36 /user/root/LoanStats_web.csv
drwxr-xr-x   - root hadoop          0 2020-08-23 02:12 /user/root/crunched_data


## 3. Transformation the data using Apache Spark and Save them back to HDFS (Trusted Zone).

In [8]:
sc

In [9]:
raw_df = spark.read.format('csv').option('header','true').option('mode','DROPMALFORMED')\
.load('/user/root/LoanStats_web.csv')

In [10]:
! pip install pandas



In [11]:
raw_pandas = raw_df.limit(10).toPandas()

In [12]:
import pandas as pd

In [13]:
raw_pandas.shape[1]+1

145

In [14]:
pd.set_option('display.max_rows', raw_pandas.shape[1]+1)

In [15]:
raw_pandas.transpose()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
id,,,,,,,,,,
member_id,,,,,,,,,,
loan_amnt,12800,9000,8000,25000,35000,20000,15000,6000,5625,16000
funded_amnt,12800,9000,8000,25000,35000,20000,15000,6000,5625,16000
funded_amnt_inv,12800,9000,8000,25000,35000,20000,15000,6000,5625,16000
term,36 months,36 months,36 months,60 months,60 months,36 months,60 months,36 months,36 months,60 months
int_rate,11.99%,15.31%,10.75%,12.99%,18.25%,12.99%,9.16%,7.39%,18.25%,9.75%
installment,425.09,313.36,260.97,568.7,893.54,673.79,312.55,186.34,204.07,337.99
grade,C,C,B,C,D,C,B,A,D,B
sub_grade,C1,C5,B4,C2,D3,C2,B2,A4,D3,B3


In [16]:
from pyspark.sql import functions as sparkf

In [17]:
from pyspark.sql.types import *

In [18]:
clean_symbol = sparkf.udf(lambda x: x.replace('%',''))

In [19]:
clean_text = sparkf.udf(lambda x:x.replace('years','').replace('+',''))

In [20]:
crunched_raw_df = raw_df.withColumn('loan_amnt',sparkf.col('loan_amnt').cast(FloatType()))\
.withColumn('int_rate',clean_symbol('int_rate'))\
.withColumn('int_rate',sparkf.col('int_rate').cast(FloatType()))\
.withColumn('emp_length',clean_text('emp_length'))\
.withColumn('emp_length',sparkf.col('emp_length').cast(FloatType()))

##### วาง data ใน Trusted Zone เพื่อ Support -> (1) Descriptive Analytics with Interactive Query (2) ELT ลดบวมของ DW

In [21]:
! hdfs dfs -rm -r "/user/root/crunched_data/"

Deleted /user/root/crunched_data


In [22]:
crunched_raw_df.count()

1432439

In [23]:
crunched_raw_df.write.mode('overwrite').parquet('/user/root/crunched_data/')

### 4. Data Analytics with Various SQL Queries.

#### สร้าง table ชื่อ analytics_loan เพื่อ Summarized ทางด้านการเงิน

In [24]:
! hive -e "DROP TABLE IF EXISTS analytics_loan"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
Time taken: 2.131 seconds


In [25]:
! hive -e "CREATE EXTERNAL TABLE IF NOT EXISTS analytics_loan (grade string, loan_amnt float, int_rate float) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS PARQUET LOCATION '/user/root/crunched_data/' tblproperties ('skip.header.line.count'='1')"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
Time taken: 2.135 seconds


In [26]:
! hive -e "select count(*) from analytics_loan"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Query ID = root_20200823023838_04214700-a180-4958-a58c-c06ab920ef0b
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1598147476368_0054)

OK
1432439
Time taken: 31.682 seconds, Fetched: 1 row(s)


In [27]:
! hive -e "show columns from analytics_loan"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
grade               
loan_amnt           
int_rate            
Time taken: 1.812 seconds, Fetched: 3 row(s)


In [28]:
! hive -e "select grade, count(*) as num from analytics_loan group by grade order by num DESC"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Query ID = root_20200823023932_817f9fbe-bec5-42cb-be86-165ed640da6f
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1598147476368_0057)

OK
B	424110
C	421101
A	302988
D	191136
E	68353
F	19480
G	5271
Time taken: 26.112 seconds, Fetched: 7 row(s)


In [29]:
! hive -e "select grade, int_rate, avg(loan_amnt) from analytics_loan \
group by grade, int_rate order by int_rate ASC"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Query ID = root_20200823024008_69a35a17-399e-45b4-8845-1ed2d5377fcb
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1598147476368_0059)

OK
A	5.31	17268.11796122141
A	5.32	14450.231374989851
G	6.0	18410.0
A	6.0	16021.42857142857
B	6.0	15058.870967741936
C	6.0	15207.24043715847
D	6.0	14717.857142857143
E	6.0	25147.727272727272
F	6.0	24450.96153846154
A	6.07	15201.17055190277
A	6.08	15591.83793800539
A	6.11	18153.841525670945
A	6.19	15080.01512859304
A	6.24	12722.222222222223
A	6.46	17008.949089623475
A	6.49	14254.788809226933
A	6.67	15889.998285910182
A	6.71	14697.217153284671
A	6.72	15054.450261780104
A	6.83	14764.413722478239
A	6.89	15700.0
A	6.97	14107.375696306994
A	6.99	13424.824452027826
A	7.02	15187.031381003713
A	7.21	15023.153006611397
A	7.24	13945.10055816426
A	7.26	12930.76923076923
A	7.34	15537.665911298154
A	7.35	

#### สร้าง table ชื่อ analytics_borrower เพื่อ Summarized ทางด้าน Profile ผู้กู้

In [30]:
! hive -e "DROP TABLE IF EXISTS analytics_borrower"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
Time taken: 2.066 seconds


In [31]:
! hive -e "CREATE EXTERNAL TABLE IF NOT EXISTS analytics_borrower (id string, loan_amnt float, int_rate float, emp_length float) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS PARQUET LOCATION '/user/root/crunched_data/' tblproperties ('skip.header.line.count'='1')"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
Time taken: 1.944 seconds


In [32]:
! hive -e "select emp_length, avg(loan_amnt) from analytics_borrower group by emp_length order by emp_length ASC"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
Query ID = root_20200823024112_2b07df6b-8bd8-4f02-a475-ae31e0216a4e
Total jobs = 1
Launching Job 1 out of 1
Status: Running (Executing on YARN cluster with App id application_1598147476368_0063)

OK
NULL	14035.318427827466
2.0	14808.726487523993
3.0	14914.434898375028
4.0	14950.930941486971
5.0	15249.950950544619
6.0	15404.563049099757
7.0	15698.691922802001
8.0	15829.652785912303
9.0	15702.363503114693
10.0	16514.665777413527
Time taken: 30.082 seconds, Fetched: 10 row(s)


In [33]:
! hive -e "show tables"


Logging initialized using configuration in file:/etc/hive/conf.dist/hive-log4j2.properties Async: true
OK
analytics_borrower
analytics_loan
Time taken: 1.763 seconds, Fetched: 2 row(s)


In [35]:
len(raw_df.columns)

144