In [76]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook via the builder's getOrCreate().
spark = (
    SparkSession.builder
    .getOrCreate()
)

## Importing the data

In [77]:
# Load the CSV file: the first row supplies the column names and
# Spark is asked to infer each column's type from the data.
infile = 'default.csv'
df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(infile)
)

In [86]:
# Print the column names, types and nullability of the loaded dataframe
# (types were inferred by the reader, since inferSchema=True was passed).
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- default payment next month: inte

## Data Preprocessing

In [78]:
from pyspark.sql import functions as F

# Checking for duplicates: compare the total row count with the number of
# distinct rows. In the recorded run both are 30000, i.e. no two rows are
# identical across all columns.
# NOTE(review): distinct() compares every column, including ID. If ID is unique
# per row this check cannot detect records duplicated under different IDs --
# consider comparing against df.drop('ID').distinct().count() as well.
total_rows = df.count()
distinct_rows = df.distinct().count()
print('rows={}'.format(total_rows))
print('rows={}'.format(distinct_rows))

rows=30000
rows=30000


In [79]:
# Checking for missing data (the author's run found none).
# For each column: fraction missing = 1 - (non-null count / total row count),
# reported in a single-row result with one '<column>_miss' field per column.
missing_fractions = [
    (1 - F.count(name) / F.count('*')).alias(name + '_miss')
    for name in df.columns
]
df.agg(*missing_fractions).show()

+-------+--------------+--------+--------------+-------------+--------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------------------------+
|ID_miss|LIMIT_BAL_miss|SEX_miss|EDUCATION_miss|MARRIAGE_miss|AGE_miss|PAY_0_miss|PAY_2_miss|PAY_3_miss|PAY_4_miss|PAY_5_miss|PAY_6_miss|BILL_AMT1_miss|BILL_AMT2_miss|BILL_AMT3_miss|BILL_AMT4_miss|BILL_AMT5_miss|BILL_AMT6_miss|PAY_AMT1_miss|PAY_AMT2_miss|PAY_AMT3_miss|PAY_AMT4_miss|PAY_AMT5_miss|PAY_AMT6_miss|default payment next month_miss|
+-------+--------------+--------+--------------+-------------+--------+----------+----------+----------+----------+----------+----------+--------------+--------------+--------------+--------------+--------------+--------------+-------------+-------------+-------------+-------------+-------------+-------------+-

In [80]:
# checking for outliers

# Excluding the identifier, the categorical features and the response:
# IQR-based bounds only make sense for the remaining numeric features.
excluded = {'ID', 'default payment next month', 'SEX', 'EDUCATION', 'MARRIAGE'}
cols = [c for c in df.columns if c not in excluded]

# Approximate first and third quartiles for every feature in ONE pass over the
# data: approxQuantile accepts a list of columns and returns one [q1, q3] list
# per column, in the same order. Calling it column-by-column would launch a
# separate Spark job for each of the features.
# The last argument is the relative error tolerated by the approximation.
quartiles = df.approxQuantile(cols, [0.25, 0.75], 0.05)

# Storing the lower and upper bounds for each feature:
#   lower = Q1 - 1.5 * IQR, upper = Q3 + 1.5 * IQR
bnds = {}
for name, (q1, q3) in zip(cols, quartiles):
    iqr = q3 - q1
    bnds[name] = [
        q1 - 1.5 * iqr,
        q3 + 1.5 * iqr
    ]
bnds

{'LIMIT_BAL': [-180000.0, 460000.0],
 'AGE': [10.0, 58.0],
 'PAY_0': [-2.5, 1.5],
 'PAY_2': [-2.5, 1.5],
 'PAY_3': [-2.5, 1.5],
 'PAY_4': [-2.5, 1.5],
 'PAY_5': [-2.5, 1.5],
 'PAY_6': [-2.5, 1.5],
 'BILL_AMT1': [-75367.0, 136417.0],
 'BILL_AMT2': [-71850.0, 129038.0],
 'BILL_AMT3': [-68126.5, 122125.5],
 'BILL_AMT4': [-65346.5, 116137.5],
 'BILL_AMT5': [-61606.5, 108317.5],
 'BILL_AMT6': [-60093.0, 104155.0],
 'PAY_AMT1': [-4478.0, 10138.0],
 'PAY_AMT2': [-4118.0, 9530.0],
 'PAY_AMT3': [-4738.5, 9217.5],
 'PAY_AMT4': [-4410.0, 8246.0],
 'PAY_AMT5': [-4437.5, 8262.5],
 'PAY_AMT6': [-4697.5, 8418.5]}

In [97]:
# appending the outlier indicators to the dataframe
from functools import reduce
from operator import or_

# Columns carried through unchanged (identifier, response, categorical features).
passthrough = ['id', 'default payment next month', 'SEX', 'EDUCATION', 'MARRIAGE']

# One boolean '<feature>_outlier' column per numeric feature: True when the
# value lies below the lower bound or above the upper bound stored in bnds.
outliers = df.select(*passthrough + [
    ((df[c] < bnds[c][0]) | (df[c] > bnds[c][1])).alias(c + '_outlier')
    for c in cols
])

outliers.show(1)

# Counting the rows where at least one non-categorical feature is an outlier.
# The indicator columns are addressed by name and OR-ed together, so the
# condition stays correct if the list of features or passthrough columns
# changes (hard-coded positions would silently point at the wrong columns).
# The indicators are already boolean, so no comparison against 'true' is needed.
any_outlier = reduce(or_, [outliers[c + '_outlier'] for c in cols])
outliers.filter(any_outlier).count()

+---+--------------------------+---+---------+--------+-----------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+----------------+----------------+----------------+----------------+----------------+
| id|default payment next month|SEX|EDUCATION|MARRIAGE|LIMIT_BAL_outlier|AGE_outlier|PAY_0_outlier|PAY_2_outlier|PAY_3_outlier|PAY_4_outlier|PAY_5_outlier|PAY_6_outlier|BILL_AMT1_outlier|BILL_AMT2_outlier|BILL_AMT3_outlier|BILL_AMT4_outlier|BILL_AMT5_outlier|BILL_AMT6_outlier|PAY_AMT1_outlier|PAY_AMT2_outlier|PAY_AMT3_outlier|PAY_AMT4_outlier|PAY_AMT5_outlier|PAY_AMT6_outlier|
+---+--------------------------+---+---------+--------+-----------------+-----------+-------------+-------------+-------------+-------------+-------------+-------------+-----------------+-----------------+-----------------+-----------------

18052

Since over half of the rows (18,052 of 30,000) contain at least one outlier, it is probably best not to remove any outliers.

In [98]:
# assembling the features together into one vector

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

# Every column between the leading ID and the trailing response is a feature.
feature_cols = df.columns[1:-1]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
transformed = assembler.transform(df)
transformed.show(1)

# Keep only the response (renamed to 'label') and the assembled feature vector;
# all the individual input columns are left behind.
df2 = transformed.select("default payment next month", "features")\
 .withColumnRenamed("default payment next month", "label")

df2.show(1, False)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|            features|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+--------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|                         1|[20000.0,2.0,2.0,...|
+---+---------+---+---------+-------

In [96]:
# feature scaling

# Scale each feature to unit standard deviation; the mean is not subtracted
# (withMean=False).
standardScaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
    withMean=False,
    withStd=True,
)

# Fit the scaler on the assembled data, then apply it to the same data.
scaler = standardScaler.fit(df2)
scaled_df = scaler.transform(df2)

# Final dataframe: the label plus the scaled vector, exposed under the name 'features'.
final_df = scaled_df.select(
    "label",
    scaled_df["features_scaled"].alias("features"),
)
final_df.show(1, False)

+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                                                                                                               |
+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------