#Data cleanig using pyspark

creating Spark session 

In [19]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.types import IntegerType

sparkSession = SparkSession.builder.master("local").appName('EDA').getOrCreate()

print(sparkSession)

if SparkSession.sparkContext:
    print('===============')
    print(f'AppName: {sparkSession.sparkContext.appName}')
    print(f'Master: {sparkSession.sparkContext.master}')
    print('===============')
else:
    print('Could not initialise pyspark session')


<pyspark.sql.session.SparkSession object at 0x7f3ea043e070>
AppName: EDA
Master: local


Getting data from hdfs using spark hdfs connectivity

In [20]:

Credit_raw_data= sparkSession.read.format('csv').option("header", "true").option("inferSchema", "true").load('hdfs://localhost:9000/train.csv')

Credit_raw_data.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- owns_car: string (nullable = true)
 |-- owns_house: string (nullable = true)
 |-- no_of_children: double (nullable = true)
 |-- net_yearly_income: double (nullable = true)
 |-- no_of_days_employed: double (nullable = true)
 |-- occupation_type: string (nullable = true)
 |-- total_family_members: double (nullable = true)
 |-- migrant_worker: double (nullable = true)
 |-- yearly_debt_payments: double (nullable = true)
 |-- credit_limit: double (nullable = true)
 |-- credit_limit_used(%): integer (nullable = true)
 |-- credit_score: double (nullable = true)
 |-- prev_defaults: integer (nullable = true)
 |-- default_in_last_6months: integer (nullable = true)
 |-- credit_card_default: integer (nullable = true)



Count of  nan and null values

In [21]:

Credit_raw_data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in Credit_raw_data.columns]).show()




[Stage 117:>                                                        (0 + 1) / 1]

+-----------+----+---+------+--------+----------+--------------+-----------------+-------------------+---------------+--------------------+--------------+--------------------+------------+--------------------+------------+-------------+-----------------------+-------------------+
|customer_id|name|age|gender|owns_car|owns_house|no_of_children|net_yearly_income|no_of_days_employed|occupation_type|total_family_members|migrant_worker|yearly_debt_payments|credit_limit|credit_limit_used(%)|credit_score|prev_defaults|default_in_last_6months|credit_card_default|
+-----------+----+---+------+--------+----------+--------------+-----------------+-------------------+---------------+--------------------+--------------+--------------------+------------+--------------------+------------+-------------+-----------------------+-------------------+
|          0|   0|  0|     0|     547|         0|           774|                0|                463|              0|                  83|            87|   


                                                                                

Imputing missing values with median and mode for categorical data

In [22]:

numerical = ['no_of_children','no_of_days_employed','total_family_members','yearly_debt_payments','credit_score']

from pyspark.ml.feature import Imputer

def imputed(df):
    imputer = Imputer(
    inputCols=numerical,
    outputCols=["{}_".format(c) for c in numerical]
    )
    out=imputer.setStrategy("median").fit(df).transform(df)
    return out.drop(*numerical)


Credit_data_imputed=imputed(Credit_raw_data)

Credit_data_imputed=Credit_data_imputed.na.fill(({'owns_car':Credit_data_imputed.groupby('owns_car').count().orderBy("count", ascending=False).first()[0],
             'migrant_worker':Credit_data_imputed.groupby('migrant_worker').count().orderBy("count", ascending=False).first()[0]}))

Credit_data_imputed.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in Credit_data_imputed.columns]).show()



[Stage 128:>                                                        (0 + 1) / 1]

+-----------+----+---+------+--------+----------+-----------------+---------------+--------------+------------+--------------------+-------------+-----------------------+-------------------+---------------+--------------------+---------------------+---------------------+-------------+
|customer_id|name|age|gender|owns_car|owns_house|net_yearly_income|occupation_type|migrant_worker|credit_limit|credit_limit_used(%)|prev_defaults|default_in_last_6months|credit_card_default|no_of_children_|no_of_days_employed_|total_family_members_|yearly_debt_payments_|credit_score_|
+-----------+----+---+------+--------+----------+-----------------+---------------+--------------+------------+--------------------+-------------+-----------------------+-------------------+---------------+--------------------+---------------------+---------------------+-------------+
|          0|   0|  0|     0|       0|         0|                0|              0|             0|           0|                   0|          


                                                                                

Descriptive data analysis

In [23]:

for column in Credit_data_imputed.columns:
    Credit_data_imputed.describe([column]).show()

+-------+-----------+
|summary|customer_id|
+-------+-----------+
|  count|      45528|
|   mean|       null|
| stddev|       null|
|    min| CST_100002|
|    max| CST_165962|
+-------+-----------+

+-------+------+
|summary|  name|
+-------+------+
|  count| 45528|
|   mean|  null|
| stddev|  null|
|    min|  Axel|
|    max|y Fabi|
+-------+------+

+-------+-----------------+
|summary|              age|
+-------+-----------------+
|  count|            45528|
|   mean| 38.9934106483922|
| stddev|9.543990288960151|
|    min|               23|
|    max|               55|
+-------+-----------------+

+-------+------+
|summary|gender|
+-------+------+
|  count| 45528|
|   mean|  null|
| stddev|  null|
|    min|     F|
|    max|   XNA|
+-------+------+

+-------+--------+
|summary|owns_car|
+-------+--------+
|  count|   45528|
|   mean|    null|
| stddev|    null|
|    min|       N|
|    max|       Y|
+-------+--------+

+-------+----------+
|summary|owns_house|
+-------+----------+
|  co

Droping irrelevant columns

In [25]:
Credit_data_imputed= Credit_data_imputed.withColumn('credit_score_', when(Credit_data_imputed['credit_score_']>900,900).otherwise(Credit_data_imputed['credit_score_']))


Credit_data_imputed=Credit_data_imputed.drop(*['customer_id','name'])

Credit_data_imputed=Credit_data_imputed.where(Credit_data_imputed['gender']!='XNA')


In [34]:

columns=[ "net_yearly_income", "credit_limit", "credit_limit_used(%)", "no_of_days_employed_", "yearly_debt_payments_"]


import pyspark.pandas as ps

df = ps.DataFrame(Credit_data_imputed)

for col in columns:
    df[col].plot.box().show()

In [49]:
Credit_data_imputed = Credit_data_imputed.drop(*['no_of_days_employed_'])

In [55]:
print(Credit_data_clean.columns)

['age', 'gender', 'owns_car', 'owns_house', 'net_yearly_income', 'occupation_type', 'migrant_worker', 'credit_limit', 'credit_limit_used(%)', 'prev_defaults', 'default_in_last_6months', 'credit_card_default', 'no_of_children_', 'total_family_members_', 'yearly_debt_payments_', 'credit_score_']


In [53]:

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col


columns=[ "net_yearly_income", "credit_limit", "credit_limit_used(%)", "yearly_debt_payments_"]

for c in columns:
    df_stats = Credit_data_imputed.select(
                _mean(col(c)).alias('mean'),
                _stddev(col(c)).alias('std')
            ).collect()

    mean = df_stats[0]['mean']
    std = df_stats[0]['std']

    Credit_data_clean = Credit_data_imputed.withColumn(c, (when(Credit_data_imputed[c]>(mean+(3*std)),mean).otherwise(Credit_data_imputed[c])))


In [57]:
df.corr()


                                                                                

Unnamed: 0,age,net_yearly_income,migrant_worker,credit_limit,credit_limit_used(%),prev_defaults,default_in_last_6months,credit_card_default,no_of_children_,total_family_members_,yearly_debt_payments_,credit_score_
age,1.0,0.004079,-0.005562,0.004468,-0.005517,0.0014,-0.001399,-0.000974,-0.008406,-0.010704,-0.001782,0.000913
net_yearly_income,0.004079,1.0,0.001501,0.993378,0.002696,-0.004696,0.015092,0.011508,0.009006,0.010442,0.069838,-0.010003
migrant_worker,-0.005562,0.001501,1.0,-9.4e-05,0.010348,0.029217,0.030001,0.034015,0.070531,0.080699,0.020437,-0.013764
credit_limit,0.004468,0.993378,-9.4e-05,1.0,0.00311,-0.004301,0.015759,0.012251,0.009421,0.010255,0.067104,-0.010086
credit_limit_used(%),-0.005517,0.002696,0.010348,0.00311,1.0,0.252504,0.253683,0.32664,0.006804,0.001994,-0.002113,-0.180382
prev_defaults,0.0014,-0.004696,0.029217,-0.004301,0.252504,1.0,0.811352,0.771703,0.019043,0.01057,-0.005726,-0.487702
default_in_last_6months,-0.001399,0.015092,0.030001,0.015759,0.253683,0.811352,1.0,0.776077,0.021394,0.013433,-0.004647,-0.465804
credit_card_default,-0.000974,0.011508,0.034015,0.012251,0.32664,0.771703,0.776077,1.0,0.023278,0.010754,-0.004251,-0.560656
no_of_children_,-0.008406,0.009006,0.070531,0.009421,0.006804,0.019043,0.021394,0.023278,1.0,0.869956,0.03008,-0.015233
total_family_members_,-0.010704,0.010442,0.080699,0.010255,0.001994,0.01057,0.013433,0.010754,0.869956,1.0,0.08241,-0.011448


In [81]:
columns=['age', 'net_yearly_income', 'migrant_worker', 'credit_limit', 'credit_limit_used(%)', 'prev_defaults', 'default_in_last_6months', 'no_of_children_', 'total_family_members_', 'yearly_debt_payments_', 'credit_score_']
corr_col=[]
for i in columns:
    for j in columns:
        if i!= j:
            if Credit_data_clean.stat.corr(i,j) >0.9:
                corr_col.append((i,j))
            

In [82]:
corr_col

[('net_yearly_income', 'credit_limit'), ('credit_limit', 'net_yearly_income')]

In [84]:
Credit_data_clean=Credit_data_clean.drop(*['net_yearly_income'])

In [85]:

Credit_data_clean.write.option("header", "true").mode('overwrite').csv('hdfs://localhost:9000/data_with_no_outliers')
