# MLE challenge - Features engineering

### Notebook 1

In this notebook we compute five features for the **credit risk** dataset.
Each row in the dataset represents a loan that a user took on a given date.

These features are roughly defined as follows:

**nb_previous_loans:** number of loans granted to a given user, before the current loan.

**avg_amount_loans_previous:** average amount of the loans granted to a user, before the current loan.

**age:** user age in years.

**years_on_the_job:** years the user has been in employment.

**flag_own_car:** flag that indicates whether the user owns a car.

We have the following problem: the feature `avg_amount_loans_previous` takes too long to compute for every row of the dataset, at least the way it is currently implemented.
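
To make the two loan-history features concrete, here is a minimal plain-Python sketch (not Spark) that computes both in a single pass per user by keeping a running count and a running total. The function name `previous_loan_stats` and the `(user_id, loan_date, loan_amount)` tuple layout are assumptions made for illustration; they are not part of the dataset or of the original implementation.

```python
from collections import defaultdict

def previous_loan_stats(loans):
    """Return (user_id, loan_date, nb_previous, avg_previous) per loan.

    `loans` is an iterable of (user_id, loan_date, loan_amount) tuples
    (hypothetical layout). Dates are ISO strings, so they sort correctly.
    """
    running_total = defaultdict(float)  # sum of earlier loan amounts per user
    running_count = defaultdict(int)    # number of earlier loans per user
    out = []
    for user_id, loan_date, amount in sorted(loans, key=lambda r: (r[0], r[1])):
        n = running_count[user_id]
        # No earlier loans means there is nothing to average.
        avg = running_total[user_id] / n if n else None
        out.append((user_id, loan_date, n, avg))
        running_total[user_id] += amount
        running_count[user_id] += 1
    return out

rows = previous_loan_stats([
    (1, "2019-01-03", 300.0),
    (1, "2019-01-01", 100.0),
    (2, "2019-01-02", 50.0),
    (1, "2019-01-02", 200.0),
])
for row in rows:
    print(row)
```

After sorting, each loan is visited once, so the cost is dominated by the sort rather than by rescanning a user's history for every row. The window functions used later in this notebook express the same idea declaratively.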




In [1]:
#from pyspark.sql import SparkSession as ss
# Create the SparkContext (local mode, one worker thread)
import pyspark
from pyspark import SparkContext
from pyspark import SparkConf
sc = pyspark.SparkContext(master="local[1]",appName="CreditRisk_N1_czavalaj")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/18 03:31:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/18 03:31:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
%%time
# Create an RDD
data = sc.textFile('dataset_credit_risk.csv')

CPU times: user 1.76 ms, sys: 1.62 ms, total: 3.37 ms
Wall time: 1.28 s


In [3]:
# Show 5 elements of the RDD
data.take(5)

                                                                                

['loan_id,id,code_gender,flag_own_car,flag_own_realty,cnt_children,amt_income_total,name_income_type,name_education_type,name_family_status,name_housing_type,days_birth,days_employed,flag_mobil,flag_work_phone,flag_phone,flag_email,occupation_type,cnt_fam_members,status,birthday,job_start_date,loan_date,loan_amount',
 '208089,5044500,F,N,Y,0,45000.0,Pensioner,Secondary / secondary special,Widow,House / apartment,-24151,365243,1,0,0,0,,1.0,0,1955-08-04,3021-09-18,2019-01-01,133.714973572794',
 '112797,5026631,F,N,Y,0,99000.0,Working,Secondary / secondary special,Separated,House / apartment,-18068,-8870,1,0,0,0,Medicine staff,1.0,0,1972-03-30,1997-06-05,2019-01-01,158.80055787554005',
 '162434,5036645,M,Y,N,0,202500.0,Working,Incomplete higher,Married,With parents,-12596,-2399,1,0,0,0,Drivers,2.0,0,1987-03-24,2015-02-22,2019-01-01,203.60848690335118',
 '144343,5033584,F,N,Y,0,292500.0,Working,Higher education,Married,House / apartment,-17718,-4463,1,0,0,0,,2.0,0,1973-03-15,2009-06-29,201

In [4]:
# Convert the RDD into a suitable format
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType,BooleanType,DateType
sqlContext = SQLContext(sc)



In [5]:
%%time
cRiskDF= sqlContext.read.csv('dataset_credit_risk.csv', sep=',', header='true', inferSchema='true')
type(cRiskDF)



CPU times: user 9.77 ms, sys: 5.71 ms, total: 15.5 ms
Wall time: 12.4 s


                                                                                

pyspark.sql.dataframe.DataFrame

In [6]:
# sort() returns a new DataFrame, so the result must be assigned
cRiskDF = cRiskDF.sort('id','loan_date')
cRiskDF = cRiskDF.withColumn('loan_date',cRiskDF.loan_date.cast('date'))


In [7]:
cRiskDF.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- code_gender: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- flag_own_realty: string (nullable = true)
 |-- cnt_children: integer (nullable = true)
 |-- amt_income_total: double (nullable = true)
 |-- name_income_type: string (nullable = true)
 |-- name_education_type: string (nullable = true)
 |-- name_family_status: string (nullable = true)
 |-- name_housing_type: string (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- flag_mobil: integer (nullable = true)
 |-- flag_work_phone: integer (nullable = true)
 |-- flag_phone: integer (nullable = true)
 |-- flag_email: integer (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- cnt_fam_members: double (nullable = true)
 |-- status: integer (nullable = true)
 |-- birthday: string (nullable = true)
 |-- job_start_date: string (nullable = true)
 |-- loan_

#### Feature nb_previous_loans

In [8]:
%%time
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.sql.window import Window
window=Window.partitionBy('id').orderBy('loan_date')
cRiskDF = cRiskDF.withColumn('nb_previous_loans1', F.rank().over(window))
cRiskDF.show()


+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+------------

                                                                                

In [9]:
cRiskDF = cRiskDF.withColumn('nb_previous_loans',cRiskDF.nb_previous_loans1-1)
cRiskDF.show()


21/11/18 03:32:27 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+-----------------+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|nb_previous_loans|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+----------
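
One property of `rank()` worth keeping in mind: rows that tie on the ordering column receive the same rank, so `rank() - 1` counts only loans with a strictly earlier `loan_date`. The plain-Python sketch below mimics that behaviour next to a `row_number()`-style count. The helper names are hypothetical and the dates are made up for illustration.

```python
from bisect import bisect_left

def rank_minus_one(sorted_dates):
    """SQL RANK() - 1: number of entries strictly earlier than each date."""
    return [bisect_left(sorted_dates, d) for d in sorted_dates]

def row_number_minus_one(sorted_dates):
    """SQL ROW_NUMBER() - 1: position of each entry in the sorted order."""
    return list(range(len(sorted_dates)))

# Two loans share the same date (a tie on the ordering column).
dates = ["2019-01-01", "2019-01-05", "2019-01-05", "2019-02-01"]

print(rank_minus_one(dates))        # [0, 1, 1, 3]
print(row_number_minus_one(dates))  # [0, 1, 2, 3]
```

If a user can have two loans on the same date, the two definitions disagree, and which one is right depends on whether a same-day loan should count as "previous". Tied rows also have no guaranteed relative order inside a row-based window frame.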

                                                                                

#### Feature avg_amount_loans_previous

In [10]:
window1=Window.partitionBy('id').orderBy('nb_previous_loans').rowsBetween(Window.unboundedPreceding, -1)
cRiskDF = cRiskDF.withColumn('avg_amount_loans_previous', F.avg('loan_amount').over(window1))
cRiskDF.show()


+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+-----------------+-------------------------+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|nb_previous_loans|avg_amount_loans_previous|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+------------

                                                                                

#### Feature age

In [11]:
cRiskDF = cRiskDF.withColumn('birthday',cRiskDF.birthday.cast('date'))

In [12]:
cRiskDF.printSchema()

root
 |-- loan_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- code_gender: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- flag_own_realty: string (nullable = true)
 |-- cnt_children: integer (nullable = true)
 |-- amt_income_total: double (nullable = true)
 |-- name_income_type: string (nullable = true)
 |-- name_education_type: string (nullable = true)
 |-- name_family_status: string (nullable = true)
 |-- name_housing_type: string (nullable = true)
 |-- days_birth: integer (nullable = true)
 |-- days_employed: integer (nullable = true)
 |-- flag_mobil: integer (nullable = true)
 |-- flag_work_phone: integer (nullable = true)
 |-- flag_phone: integer (nullable = true)
 |-- flag_email: integer (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- cnt_fam_members: double (nullable = true)
 |-- status: integer (nullable = true)
 |-- birthday: date (nullable = true)
 |-- job_start_date: string (nullable = true)
 |-- loan_da

In [13]:
cRiskDF = cRiskDF.withColumn('age', (F.datediff(F.current_date(),'birthday'))/365)
cRiskDF = cRiskDF.withColumn('age', cRiskDF.age.cast('int'))
cRiskDF.show()


+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+-----------------+-------------------------+---+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|nb_previous_loans|avg_amount_loans_previous|age|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+----
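
Dividing a day count by 365 ignores leap days, so the result can be one year too high close to a birthday. As a hedged alternative, the sketch below computes age by comparing calendar fields against an explicit reference date. The function name `age_in_years` and the `as_of` parameter are assumptions for illustration; the notebook itself uses `current_date()`.

```python
from datetime import date

def age_in_years(birthday, as_of):
    """Completed years between `birthday` and `as_of` (calendar-exact)."""
    years = as_of.year - birthday.year
    # Subtract one if the birthday has not yet occurred in the as_of year.
    if (as_of.month, as_of.day) < (birthday.month, birthday.day):
        years -= 1
    return years

birthday = date(2000, 1, 1)
as_of = date(2019, 12, 28)

exact = age_in_years(birthday, as_of)
approx = (as_of - birthday).days // 365  # the days / 365 approach

print(exact, approx)  # 19 20
```

Passing a fixed `as_of` (for example the loan date) would also make the feature reproducible across runs, whereas a value derived from today's date changes every time the notebook is executed.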

                                                                                

#### Feature years_on_the_job

In [14]:
cRiskDF = cRiskDF.withColumn('job_start_date',cRiskDF.job_start_date.cast('date'))
cRiskDF = cRiskDF.withColumn('years_on_the_job', (F.datediff(F.current_date(),'job_start_date'))/365)
cRiskDF = cRiskDF.withColumn('years_on_the_job', cRiskDF.years_on_the_job.cast('int'))
cRiskDF.show()


+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+-----------------+-------------------------+---+----------------+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|nb_previous_loans|avg_amount_loans_previous|age|years_on_the_job|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+------
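
The sample rows printed earlier include a `job_start_date` of `3021-09-18` for a pensioner whose `days_employed` is `365243`, which looks like a placeholder rather than a real start date. Subtracting such a date from today's date yields a large negative tenure. The plain-Python sketch below shows one possible guard. Treating future start dates as missing is an assumption, and the function name and `as_of` parameter are hypothetical.

```python
from datetime import date

def years_on_the_job(job_start, as_of):
    """Completed years (days // 365) in employment, or None if unknown.

    Start dates after `as_of` are assumed to be placeholders and
    are reported as missing instead of as a negative tenure.
    """
    if job_start is None or job_start > as_of:
        return None
    return (as_of - job_start).days // 365

as_of = date(2021, 11, 18)

print(years_on_the_job(date(1997, 6, 5), as_of))   # 24
print(years_on_the_job(date(3021, 9, 18), as_of))  # None
```

Whether missing tenure should stay null, become 0, or be encoded as a separate flag is a modelling decision that this sketch does not settle.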

                                                                                

#### Feature flag_own_car

In [15]:
cRiskDF = cRiskDF.withColumn('flag_own_car', when(cRiskDF.flag_own_car == 'Y',1).when(cRiskDF.flag_own_car == 'N',0))
#cRiskDF.flag_own_car.map(lambda x : 0 if x == 'False' else 1)
cRiskDF.show()


+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+----------+-------------+----------+---------------+----------+----------+---------------+---------------+------+----------+--------------+----------+------------------+------------------+-----------------+-------------------------+---+----------------+
|loan_id|     id|code_gender|flag_own_car|flag_own_realty|cnt_children|amt_income_total|name_income_type|name_education_type|name_family_status|name_housing_type|days_birth|days_employed|flag_mobil|flag_work_phone|flag_phone|flag_email|occupation_type|cnt_fam_members|status|  birthday|job_start_date| loan_date|       loan_amount|nb_previous_loans1|nb_previous_loans|avg_amount_loans_previous|age|years_on_the_job|
+-------+-------+-----------+------------+---------------+------------+----------------+----------------+-------------------+------------------+-----------------+------

                                                                                

## Save dataset for model training

In [16]:
cRiskDF = cRiskDF.select('id', 'age', 'years_on_the_job', 'nb_previous_loans', 'avg_amount_loans_previous', 'flag_own_car', 'status')
cRiskDF.show()


+-------+---+----------------+-----------------+-------------------------+------------+------+
|     id|age|years_on_the_job|nb_previous_loans|avg_amount_loans_previous|flag_own_car|status|
+-------+---+----------------+-----------------+-------------------------+------------+------+
|5008804| 33|              12|                0|                     null|           1|     0|
|5008804| 33|              12|                1|       102.28336090329137|           1|     0|
|5008804| 33|              12|                2|       119.44270503521452|           1|     0|
|5008804| 33|              12|                3|        117.8730346375606|           1|     0|
|5008804| 33|              12|                4|       114.28953848655442|           1|     0|
|5008804| 33|              12|                5|       114.02126012338815|           1|     0|
|5008804| 33|              12|                6|       116.29139797883522|           1|     0|
|5008804| 33|              12|                7|  

                                                                                

In [17]:
#from pyspark.sql.avro.functions import from_avro, to_avro
#cRiskDF.write.avro('train_model')
cRiskDF.write.csv('train_model', header=True)

                                                                                

In [18]:
#cRiskDF = cRiskDF.na.fill(value=0,subset=['avg_amount_loans_previous']).show()