# MLE challenge - Features engineering

In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
import pyspark.sql.functions as F
from pyspark.sql.functions import datediff,col, current_date
from pyspark.ml.feature import StringIndexer

In [3]:
spark = SparkSession.builder.appName('Kueski').getOrCreate()

21/10/18 20:13:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark

In [5]:
df_pyspark = spark.read.option('header', 'true').csv('dataset_credit_risk.csv')

In [6]:
df = df_pyspark.select('loan_id','id', 'flag_own_car', 'birthday', 'loan_amount', 'loan_date', 'job_start_date','status')
df.show(5)

+-------+-------+------------+----------+------------------+----------+--------------+------+
|loan_id|     id|flag_own_car|  birthday|       loan_amount| loan_date|job_start_date|status|
+-------+-------+------------+----------+------------------+----------+--------------+------+
| 208089|5044500|           N|1955-08-04|  133.714973572794|2019-01-01|    3021-09-18|     0|
| 112797|5026631|           N|1972-03-30|158.80055787554005|2019-01-01|    1997-06-05|     0|
| 162434|5036645|           Y|1987-03-24|203.60848690335118|2019-01-01|    2015-02-22|     0|
| 144343|5033584|           N|1973-03-15|113.20496431707618|2019-01-01|    2009-06-29|     0|
| 409695|5085755|           Y|1989-10-15| 109.3762599318495|2019-01-01|    2019-07-03|     0|
+-------+-------+------------+----------+------------------+----------+--------------+------+
only showing top 5 rows



In [7]:
df = df.withColumn('loan_date', df.loan_date.cast('date'))
df = df.withColumn('loan_amount', df.loan_amount.cast('float'))
df = df.withColumn('loan_id', df.loan_amount.cast('int'))
df = df.orderBy('id','loan_date')
df.printSchema()


root
 |-- loan_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- flag_own_car: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- loan_amount: float (nullable = true)
 |-- loan_date: date (nullable = true)
 |-- job_start_date: string (nullable = true)
 |-- status: string (nullable = true)



#### Feature avg_amount_loans_previous

In [8]:

@F.pandas_udf('float', F.PandasUDFType.GROUPED_AGG)  
def mean_udf(s):
    return s[:-1].mean()

df2 = df.groupBy('id').agg(mean_udf('loan_amount'))
df2 = df2.withColumnRenamed('mean_udf(loan_amount)', 'avg_amount_loans_previous')
df2.printSchema()

root
 |-- id: string (nullable = true)
 |-- avg_amount_loans_previous: float (nullable = true)





#### Feature nb_previous_loans

In [9]:
df3 = (df
.groupBy(F.col('id'))
.agg(F.count('id').alias('nb_previous_loans')))
df3=df3.withColumn("nb_previous_loans", df3.nb_previous_loans - 1)

#### The last update of a user

In [10]:
df_last_loan = df.groupBy('id').max('loan_id') 
df_last_loan = df_last_loan.withColumnRenamed('max(loan_id)', 'loan_id')
df_last_loan.printSchema()

root
 |-- id: string (nullable = true)
 |-- loan_id: integer (nullable = true)



In [11]:
df_f = df.join(df_last_loan, on=['loan_id', 'id'], how='inner')
df_f = df_f.join(df2, on=['id'], how='inner')
df_f = df_f.join(df3, on=['id'], how='inner')

#### Feature age

In [12]:
df_f = df_f.withColumn("age", datediff(current_date(),col("birthday"))/365.25)
df_f = df_f.withColumn('age', df_f.age.cast('int'))

#### Feature years_on_the_job

In [13]:
df_f = df_f.withColumn("years_on_the_job", datediff(current_date(),col("job_start_date"))/365.25)
df_f = df_f.withColumn('years_on_the_job', df_f.years_on_the_job.cast('int'))

#### Feature flag_own_car

In [14]:
indexer = StringIndexer(inputCol="flag_own_car", outputCol="flag_own_car_2")
indexed = indexer.fit(df_f).transform(df_f)

df_f = indexed.withColumn('flag_own_car_2', indexed.flag_own_car_2.cast('int'))

                                                                                

In [15]:
df_train = df_f.select( 'id',
                        'avg_amount_loans_previous',
                        'nb_previous_loans',
                        'flag_own_car_2',
                        'age',
                        'years_on_the_job',
                        'status')
df_train = df_train.withColumnRenamed('flag_own_car_2', 'flag_own_car')

In [16]:
df_train.show()



+-------+-------------------------+-----------------+------------+---+----------------+------+
|     id|avg_amount_loans_previous|nb_previous_loans|flag_own_car|age|years_on_the_job|status|
+-------+-------------------------+-----------------+------------+---+----------------+------+
|5132977|                131.33649|               13|           0| 40|               3|     0|
|5126737|                140.70323|                3|           0| 45|               3|     0|
|5126852|                138.86935|               20|           1| 34|              10|     0|
|5041229|                131.68132|                6|           0| 46|              28|     0|
|5041228|                120.65634|                8|           0| 46|              28|     0|
|5089760|                139.43625|               11|           1| 22|               2|     0|
|5067022|                121.39591|               23|           1| 47|               3|     0|
|5095626|                127.54001|               

                                                                                

### There are outliers in the dataset that should be excluded before train the model 

In [24]:
df.describe(['job_start_date']).show()



+-------+--------------+
|summary|job_start_date|
+-------+--------------+
|  count|        500494|
|   mean|          null|
| stddev|          null|
|    min|    1978-09-10|
|    max|    3021-09-18|
+-------+--------------+



                                                                                

## Save dataset for model training

In [17]:
df_train.write.csv('train_model.csv')

                                                                                