In [1]:
import sys
import pandas as pd
import sqlite3 as sql
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType, StructType, StructField
from pyspark.sql.functions import to_date, rank, col, mean, datediff, current_date
from pyspark.sql.window import Window
spark = SparkSession.builder.appName('ML_Challenge').getOrCreate()

schema = StructType([StructField('loan_id', IntegerType(), True),
                     StructField('id', IntegerType(), True),
                     StructField('code_gender', StringType(), True),
                     StructField('flag_own_car', StringType(), True),
                     StructField('flag_own_realty', StringType(), True),
                     StructField('cnt_children', IntegerType(), True),
                     StructField('amt_income_total', FloatType(), True),
                     StructField('name_income_type', StringType(), True),
                     StructField('name_education_type', StringType(), True),
                     StructField('name_family_status', StringType(), True),
                     StructField('name_housing_type', StringType(), True),
                     StructField('days_birth', IntegerType(), True),
                     StructField('days_employed', IntegerType(), True),
                     StructField('flag_mobil', IntegerType(), True),
                     StructField('flag_work_phone', IntegerType(), True),
                     StructField('flag_phone', IntegerType(), True),
                     StructField('flag_email', IntegerType(), True),
                     StructField('occupation_type', StringType(), True),
                     StructField('cnt_fam_members', FloatType(), True),
                     StructField('status', IntegerType(), True),
                     StructField('birthday', StringType(), True),
                     StructField('job_start_date', StringType(), True),
                     StructField('loan_date', StringType(), True),
                     StructField('loan_amount', DoubleType(), True)])

df = spark.read.option("sep", ",").option("header", True).schema(schema).csv("dataset_credit_risk.csv")

In [2]:
print((df.count(), len(df.columns)))

(777715, 24)


In [3]:
df = df.withColumn("loan_date", to_date(df.loan_date, 'yyyy-MM-dd'))
df = df.orderBy("id", "loan_date")

In [4]:
# Feature nb_previous_loans
df = df.withColumn("nb_previous_loans", (rank().over(Window.partitionBy("id").orderBy("loan_date"))) - 1)

In [5]:
# Feature avg_amount_loans_previous
df = df.withColumn('avg_amount_loans_previous', mean(df.loan_amount).over(Window.partitionBy('id').orderBy("loan_date").rowsBetween(-sys.maxsize, 0)))

In [6]:
# Feature age
df = df.withColumn('birthday', to_date(df.birthday, 'yyyy-MM-dd'))
df = df.withColumn('age', (datediff(current_date(), df.birthday) / 365).cast('int'))

In [7]:
# Feature years_on_the_job
df = df.withColumn('job_start_date', to_date(df.job_start_date, 'yyyy-MM-dd'))
df = df.withColumn('years_on_the_job', (datediff(current_date(), df.job_start_date) / 365).cast('int'))

In [8]:
# Feature flag_own_car
df = df.na.replace(['N', 'Y'], ['0', '1'], 'flag_own_car')
df = df.withColumn('flag_own_car', df["flag_own_car"].cast(IntegerType()))

In [9]:
# Save dataset for model training
features = df[['id', 'age', 'years_on_the_job', 'nb_previous_loans', 'avg_amount_loans_previous', 'flag_own_car', 'status']]
features.toPandas().to_csv('train_model.csv', index=False)

In [10]:
print((features.count(), len(features.columns)))

(777715, 7)


In [11]:
features = pd.read_csv('train_model.csv')
#features = features.drop("status", axis=1)
#Filtering records by the most recent (the row with the higer n of previous loans per id
filtered = features.loc[features.reset_index().groupby(['id'])['nb_previous_loans'].idxmax()]
display(filtered)

Unnamed: 0,id,age,years_on_the_job,nb_previous_loans,avg_amount_loans_previous,flag_own_car,status
424453,5008804,33,12,15,128.735464,1,0
486436,5008805,33,12,14,125.566303,1,0
61480,5008806,59,3,29,135.770224,1,0
300766,5008808,52,8,4,125.477994,0,0
300771,5008809,52,8,4,111.980052,0,0
...,...,...,...,...,...,...,...
424437,5150482,30,5,17,120.127777,1,0
181150,5150483,30,5,17,139.249856,1,0
777714,5150484,30,5,12,123.944622,1,0
120584,5150485,30,5,1,123.102183,1,0


In [14]:
conn = sql.connect('features.db')
filtered.to_sql('features', conn, if_exists='replace')