In [20]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan, when, count, col, lit, udf
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd 
import numpy as np 
import re
import scipy.stats as stats
from itertools import combinations

spark = SparkSession.builder.appName("CSVLoad").getOrCreate()

## Prepare Schema of data and read CSV

In [2]:
def format_schema_expression(path,delim=',',samp_nrows=100):
    int_types = [np.int64,int]
    float_types = [np.float64,float]

    df = pd.read_csv(path,delimiter=delim,nrows=samp_nrows)
    cols_og = [i for i in df.columns.values]
    cols_edited = [re.sub('[^a-zA-Z0-9_.]', '',i).replace('.','_') for i in cols_og]

    schema_list = []
    col_list = []
    for col in cols_og:
        if df[col].dtype in int_types:
            schema_str = str(col)+' int'
        elif df[col].dtype in float_types:
            schema_str = str(col)+' double'
        else:
            schema_str = str(col)+' string'
        col_str = str(col) +' as '+re.sub('[^a-zA-Z0-9_.]', '',col).replace('.','_') 
        schema_list.append(schema_str)
        col_list.append(col_str)
    schema_statement = ','.join(schema_list)
    col_convert_dict = dict(list(zip(cols_og,cols_edited)))

    return col_convert_dict,schema_statement

cols_dict,schema = format_schema_expression('bank-additional-full.csv',delim=';')

In [3]:
print(cols_dict)

{'age': 'age', 'job': 'job', 'marital': 'marital', 'education': 'education', 'default': 'default', 'housing': 'housing', 'loan': 'loan', 'contact': 'contact', 'month': 'month', 'day_of_week': 'day_of_week', 'duration': 'duration', 'campaign': 'campaign', 'pdays': 'pdays', 'previous': 'previous', 'poutcome': 'poutcome', 'emp.var.rate': 'emp_var_rate', 'cons.price.idx': 'cons_price_idx', 'cons.conf.idx': 'cons_conf_idx', 'euribor3m': 'euribor3m', 'nr.employed': 'nr_employed', 'y': 'y'}


## Reading CSV

In [4]:
df_bank_full=spark.read.csv('bank-additional-full.csv',
                            sep = ';',
                            header=True,
                            inferSchema=True)
for key,val in cols_dict.items():
    df_bank_full = df_bank_full.withColumnRenamed(key,val)



In [5]:
df_bank_full.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- emp_var_rate: double (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- euribor3m: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: string (nullable = true)



## Data Sanity Check

In [6]:
## 1) row count: 41188

total_rows = df_bank_full.count()
print(f"total_rows:{total_rows}")

## 2) check null values: no rows with null values
# |age|job|marital|education|default|housing|loan|contact|month|day_of_week|duration|campaign|pdays|previous|poutcome|emp_var_rate|cons_price_idx|cons_conf_idx|euribor3m|nr_employed|  y|
# +---+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
# |  0|  0|      0|        0|      0|      0|   0|      0|    0|          0|       0|       0|    0|       0|       0|           0|             0|            0|        0|          0|  0|
# +---+---+-------+---------+-------+-------+----+-------+-----+-----------+--------+--------+-----+--------+--------+------------+--------------+-------------+---------+-----------+---+
df_bank_full.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_bank_full.columns]).show()


## 3) check continuous variables
df_bank_full\
    .select(F.percentile_approx('nr_employed', [0.0,0.01,0.05,0.1,0.25, 0.5, 0.75,0.9,0.95,0.99,1.0], 1000000).alias('quantiles')).collect()[0].quantiles
#----------------------------------------------------------------------------------
# percentile [0,1,5,10,25,50,75,90,95,99,100]
#
# A. customer demographics
# age: [17, 23, 26, 28, 32, 38, 47, 55, 58, 71, 98]
#
# B. behavioral history prior to this campaign
# pdays: [0, 3, 999, 999, 999, 999, 999, 999, 999, 999, 999]
# previous: [0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 7]
#
# C. respose to this campaign
# duration: [0, 11, 36, 59, 102, 180, 319, 551, 753, 1272, 4918]
# campaign: [1, 1, 1, 1, 1, 2, 3, 5, 7, 14, 56]
#
# D. economics indicators
# emp_var_rate: [-3.4, -3.4, -2.9, -1.8, -1.8, 1.1, 1.4, 1.4, 1.4, 1.4, 1.4]
# cons_price_idx: [92.201,92.201,92.713,92.893,93.075,93.749,93.994,94.465,94.465,94.465,94.767]
# cons_conf_idx: [-50.8, -49.5, -47.1, -46.2, -42.7, -41.8, -36.4, -36.1, -33.6, -26.9, -26.9]
# euribor3m: [0.634, 0.655, 0.797, 1.046, 1.344, 4.857, 4.961, 4.964, 4.966, 4.968, 5.045]
# nr_employed: [4963.6,4963.6,5017.5,5076.2,5099.1,5191.0,5228.1,5228.1,5228.1,5228.1,5228.1]
#
#----------------------------------------------------------------------------------

## 4) check discreet/categorical variations
df_bank_full.groupBy('y').count()\
        .withColumn('percentage',F.round(F.round(col('count')/total_rows,2)*100,2))\
            .sort(F.desc('percentage')).show()
#
# A. customer demographics
# job
# +-------------+-----+----------+
# |          job|count|percentage|
# +-------------+-----+----------+
# |       admin.|10422|      25.0|
# |  blue-collar| 9254|      22.0|
# |   technician| 6743|      16.0|
# |     services| 3969|      10.0|
# |   management| 2924|       7.0|
# |      retired| 1720|       4.0|
# | entrepreneur| 1456|       4.0|
# |self-employed| 1421|       3.0|
# |    housemaid| 1060|       3.0|
# |      student|  875|       2.0|
# |   unemployed| 1014|       2.0|
# |      unknown|  330|       1.0|
# +-------------+-----+----------+
#
# marital
# +--------+-----+----------+
# | marital|count|percentage|
# +--------+-----+----------+
# | married|24928|      61.0|
# |  single|11568|      28.0|
# |divorced| 4612|      11.0|
# | unknown|   80|       0.0|
# +--------+-----+----------+
#
# education
# +-------------------+-----+----------+
# |          education|count|percentage|
# +-------------------+-----+----------+
# |  university.degree|12168|      30.0|
# |        high.school| 9515|      23.0|
# |           basic.9y| 6045|      15.0|
# |professional.course| 5243|      13.0|
# |           basic.4y| 4176|      10.0|
# |           basic.6y| 2292|       6.0|
# |            unknown| 1731|       4.0|
# |         illiterate|   18|       0.0|
# +-------------------+-----+----------+

# B. behavioral history prior to this campaign
# default
# +-------+-----+----------+
# |default|count|percentage|
# +-------+-----+----------+
# |     no|32588|      79.0|
# |unknown| 8597|      21.0|
# |    yes|    3|       0.0|
# +-------+-----+----------+
#
# housing
# +-------+-----+----------+
# |housing|count|percentage|
# +-------+-----+----------+
# |    yes|21576|      52.0|
# |     no|18622|      45.0|
# |unknown|  990|       2.0|
# +-------+-----+----------+
#
# loan
# +-------+-----+----------+
# |   loan|count|percentage|
# +-------+-----+----------+
# |     no|33950|      82.0|
# |    yes| 6248|      15.0|
# |unknown|  990|       2.0|
# +-------+-----+----------+
#
# contact
# +---------+-----+----------+
# |  contact|count|percentage|
# +---------+-----+----------+
# | cellular|26144|      63.0|
# |telephone|15044|      37.0|
# +---------+-----+----------+
#
# month 
# +-----+-----+----------+
# |month|count|percentage|
# +-----+-----+----------+
# |  may|13769|      33.0|
# |  jul| 7174|      17.0|
# |  aug| 6178|      15.0|
# |  jun| 5318|      13.0|
# |  nov| 4101|      10.0|
# |  apr| 2632|       6.0|
# |  oct|  718|       2.0|
# |  mar|  546|       1.0|
# |  sep|  570|       1.0|
# |  dec|  182|       0.0|
# +-----+-----+----------+
#
# day_of_week
# +-----------+-----+----------+
# |day_of_week|count|percentage|
# +-----------+-----+----------+
# |        mon| 8514|      21.0|
# |        thu| 8623|      21.0|
# |        tue| 8090|      20.0|
# |        wed| 8134|      20.0|
# |        fri| 7827|      19.0|
# +-----------+-----+----------+
#
# poutcome
# +-----------+-----+----------+
# |   poutcome|count|percentage|
# +-----------+-----+----------+
# |nonexistent|35563|      86.0|
# |    failure| 4252|      10.0|
# |    success| 1373|       3.0|
# +-----------+-----+----------+
#
# C. respose to this campaign
# y
# +---+-----+----------+
# |  y|count|percentage|
# +---+-----+----------+
# | no|36548|      89.0|
# |yes| 4640|      11.0|
# +---+-----+----------+

#----------------------------------------------------------------------------------


total_rows:41188




+---+-----+----------+
|  y|count|percentage|
+---+-----+----------+
| no|36548|      89.0|
|yes| 4640|      11.0|
+---+-----+----------+





## Data Cleansing and Imputation

In [7]:
## continuous variables
#----------------------------------------------------------------------------------
# pdays: [0, 3, 999, 999, 999, 999, 999, 999, 999, 999, 999] ->
# valid 'pdays' recency represent less than 5%, hence not mneaningful. Can use 'previous' instead
# previous: [0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 7]

def imputePrevious(x):
    '''
    return 0 for missing data
    csp max at 5
    '''
    if x == None:
        return 0
    elif x<=5:
        return x
    else:
        return 5

impute_previous = udf(imputePrevious,IntegerType())

# C. respose to this campaign
# duration: [0, 11, 36, 59, 102, 180, 319, 551, 753, 1272, 4918] ->
# cannot be known prior to this campaign, hence need to drop
#
# campaign: [1, 1, 1, 1, 1, 2, 3, 5, 7, 14, 56] -> substract 1 (this campaign attempt)
#
def imputeCampaign(x):
    '''
    subtract this campaign's attempt (assume 1 attempt)
    return 0 for missing data
    csp max at 15
    '''
    if x == None:
        return 0
    elif x-1<=15:
        return x-1
    else:
        return 15

impute_campaign = udf(imputeCampaign,IntegerType())

#----------------------------------------------------------------------------------

df_bank_impute = df_bank_full.withColumn('previous',impute_previous(col('previous')))\
    .withColumn('campaign',impute_campaign(col('campaign')))\
        .drop('pdays','duration')

In [8]:
## impute education variables by mode of each age_job cohort
#
#
# df = df_bank_impute.select('education','y').toPandas()
# data_crosstab = pd.crosstab(df['education'],
#                             df['y'],
#                            margins=True, margins_name='total')
# data_crosstab

# y	no	yes	total
# unknown	1480	251	1731
# total	36548	4640	41188
# cannot drop 'unknown' as % yes case are high in this category
# try using age_job to impute

def ageGroup(x):
    ''' 
    bin age into broad group
    a.student_age : <23
    b.working_age : 23-60
    c.retired_age : >60
    if missing then impute with working_age
    '''
    if x == None:
        return 'b.working_age'
    elif x<23:
        return 'a.student_age'
    elif x<=60:
        return 'b.working_age'
    else:
        return 'c.retired_age'

age_group = udf(ageGroup,StringType())
age_job = udf(lambda x: x[0]+'_'+x[1],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('age_job',age_job(F.array(age_group(col('age')),col('job'))))

total_valid_edu = df_bank_impute.filter(col('education')!='unknown').count()

edu_impute_dict =df_bank_impute.filter(col('education')!='unknown')\
        .select('age_job','education')\
            .groupBy('age_job','education').count()\
                .withColumn('percentage',F.round(F.round(col('count')/total_valid_edu,4)*100,2))\
                    .sort(F.asc('age_job'),F.desc('percentage')).toPandas()\
                        .drop_duplicates(subset='age_job',keep='first')\
                            .set_index('age_job').to_dict()['education']

impute_edu = udf(lambda x: edu_impute_dict[x[1]] if ((x[0]=='unknown') and
 (x[1] in edu_impute_dict.keys())) else x[0],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('education',impute_edu(F.array(col('education'),col('age_job'))))

df = df_bank_impute.select('education','y').toPandas()
data_crosstab = pd.crosstab(df['education'],
                            df['y'],
                           margins=True, margins_name='total')
data_crosstab

# y	no	yes	total
# education			
# basic.4y	3967	484	4451
# basic.6y	2104	188	2292
# basic.9y	5999	496	6495
# high.school	8724	1109	9833
# illiterate	14	4	18
# professional.course	4835	620	5455
# university.degree	10905	1739	12644
# total	36548	4640	41188




y,no,yes,total
education,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
basic.4y,3967,484,4451
basic.6y,2104,188,2292
basic.9y,5999,496,6495
high.school,8724,1109,9833
illiterate,14,4,18
professional.course,4835,620,5455
university.degree,10905,1739,12644
total,36548,4640,41188


In [9]:
## impute job variables by mode of each age_edu cohort
#
# df = df_bank_impute.select('job','y').toPandas()
# data_crosstab = pd.crosstab(df['job'],
#                             df['y'],
#                            margins=True, margins_name='total')
# data_crosstab

# y	no	yes	total
# unknown	293	37	330
# total	36548	4640	41188
# cannot drop 'unknown' as % yes case are high in this category
# try using age_education to impute

age_edu = udf(lambda x: x[0]+'_'+x[1],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('age_edu',age_job(F.array(age_group(col('age')),col('education'))))

total_valid_job = df_bank_impute.filter(col('job')!='unknown').count()

job_impute_dict =df_bank_impute.filter(col('job')!='unknown')\
        .select('age_edu','job')\
            .groupBy('age_edu','job').count()\
                .withColumn('percentage',F.round(F.round(col('count')/total_valid_job,4)*100,2))\
                    .sort(F.asc('age_edu'),F.desc('percentage')).toPandas()\
                        .drop_duplicates(subset='age_edu',keep='first')\
                            .set_index('age_edu').to_dict()['job']

impute_job = udf(lambda x: job_impute_dict[x[1]] if ((x[0]=='unknown') and
 (x[1] in job_impute_dict.keys())) else x[0],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('job',impute_job(F.array(col('job'),col('age_edu'))))

df = df_bank_impute.select('job','y').toPandas()
data_crosstab = pd.crosstab(df['job'],
                            df['y'],
                           margins=True, margins_name='total')
data_crosstab

# y	no	yes	total
# job			
# admin.	9138	1365	10503
# blue-collar	8819	653	9472
# entrepreneur	1332	124	1456
# housemaid	954	106	1060
# management	2596	328	2924
# retired	1299	442	1741
# self-employed	1272	149	1421
# services	3646	323	3969
# student	600	275	875
# technician	6022	731	6753
# unemployed	870	144	1014
# total	36548	4640	41188



y,no,yes,total
job,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
admin.,9138,1365,10503
blue-collar,8819,653,9472
entrepreneur,1332,124,1456
housemaid,954,106,1060
management,2596,328,2924
retired,1299,442,1741
self-employed,1272,149,1421
services,3646,323,3969
student,600,275,875
technician,6022,731,6753


In [10]:
## impute marital variables by mode of each age_job cohort
#
# df = df_bank_impute.select('marital','y').toPandas()
# data_crosstab = pd.crosstab(df['marital'],
#                             df['y'],
#                            margins=True, margins_name='total')
# data_crosstab

# y	no	yes	total
# unknown	1480	251	1731
# total	36548	4640	41188
# cannot drop 'unknown' as % yes case are high in this category
# try using age_job to impute

total_valid_marital = df_bank_impute.filter(col('marital')!='unknown').count()

marital_impute_dict =df_bank_impute.filter(col('marital')!='unknown')\
        .select('age_job','marital')\
            .groupBy('age_job','marital').count()\
                .withColumn('percentage',F.round(F.round(col('count')/total_valid_marital,4)*100,2))\
                    .sort(F.asc('age_job'),F.desc('percentage')).toPandas()\
                        .drop_duplicates(subset='age_job',keep='first')\
                            .set_index('age_job').to_dict()['marital']

impute_marital = udf(lambda x: marital_impute_dict[x[1]] if ((x[0]=='unknown') and
 (x[1] in marital_impute_dict.keys())) else x[0],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('marital',impute_marital(F.array(col('marital'),col('age_job'))))

df = df_bank_impute.select('marital','y').toPandas()
data_crosstab = pd.crosstab(df['marital'],
                            df['y'],
                           margins=True, margins_name='total')
data_crosstab

# y	no	yes	total
# marital			
# divorced	4136	476	4612
# married	22463	2544	25007
# single	9949	1620	11569
# total	36548	4640	41188




y,no,yes,total
marital,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
divorced,4136,476,4612
married,22463,2544,25007
single,9949,1620,11569
total,36548,4640,41188


In [11]:
### check'housing' chi-square

## with 'unknown': 
# chisquare-score is: 5.684494866173687  and p value is: 0.00011457832210592933
# Null Hypothesis is rejected.

## without 'unknown': 
# chisquare-score is: 5.467477030849473  and p value is: 3.957375316998579e-06
# Null Hypothesis is rejected.

# Conclusion: Impute with best guess from each age_job cohort

df = df_bank_impute\
    .filter(col('housing')!='unknown')\
        .select('housing','y').toPandas()
        
data_crosstab = pd.crosstab(df['housing'],
                            df['y'],
                           margins=True, margins_name='total')
print(data_crosstab)

# significance level
alpha = 0.05

# Calcualtion of Chisquare test statistics
chi_square = 0
rows = df['housing'].unique()
columns = df['y'].unique()
for i in columns:
    for j in rows:
        O = data_crosstab[i][j]
        E = round(data_crosstab[i]['total'] * data_crosstab['total'][j] / data_crosstab['total']['total'],4)
        chi_square += (O-E)**2/E

# The p-value approach
print("Approach 1: The p-value approach to hypothesis testing in the decision rule")
p_value = 1 - stats.norm.cdf(chi_square, (len(rows)-1)*(len(columns)-1))
conclusion = "Failed to reject the null hypothesis."
if p_value <= alpha:
    conclusion = "Null Hypothesis is rejected."
        
print("chisquare-score is:", chi_square, " and p value is:", p_value)
print(conclusion)

y           no   yes  total
housing                    
no       16596  2026  18622
yes      19069  2507  21576
total    35665  4533  40198
Approach 1: The p-value approach to hypothesis testing in the decision rule
chisquare-score is: 5.467477030849473  and p value is: 3.957375316998579e-06
Null Hypothesis is rejected.


In [12]:
## impute housing variables by mode of each age_job cohort

total_valid_housing = df_bank_impute.filter(col('housing')!='unknown').count()

housing_impute_dict =df_bank_impute.filter(col('housing')!='unknown')\
        .select('age_job','housing')\
            .groupBy('age_job','housing').count()\
                .withColumn('percentage',F.round(F.round(col('count')/total_valid_housing,4)*100,2))\
                    .sort(F.asc('age_job'),F.desc('percentage')).toPandas()\
                        .drop_duplicates(subset='age_job',keep='first')\
                            .set_index('age_job').to_dict()['housing']

impute_housing = udf(lambda x: housing_impute_dict[x[1]] if ((x[0]=='unknown') and
 (x[1] in housing_impute_dict.keys())) else x[0],StringType())

df_bank_impute = df_bank_impute\
    .withColumn('housing',impute_housing(F.array(col('housing'),col('age_job'))))

df = df_bank_impute.select('housing','y').toPandas()
data_crosstab = pd.crosstab(df['housing'],
                            df['y'],
                           margins=True, margins_name='total')
data_crosstab

# y	no	yes	total
# housing			
# no	16596	2027	18623
# yes	19952	2613	22565
# total	36548	4640	41188




y,no,yes,total
housing,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
no,16596,2027,18623
yes,19952,2613,22565
total,36548,4640,41188


In [13]:
### check'loan' chi-square

## with 'unknown': 
# chisquare-score is: 1.0940276122309665  and p value is: 0.817524766180606
# Failed to reject the null hypothesis.

## without 'unknown': 
# chisquare-score is: 0.8810165452595269  and p value is: 0.5473557680695367
# Failed to reject the null hypothesis.

# Conclusion: no need to impute. Can drop this feature

df = df_bank_impute\
    .filter(col('loan')!='unknown')\
        .select('loan','y').toPandas()
    
data_crosstab = pd.crosstab(df['loan'],
                            df['y'],
                           margins=True, margins_name='total')
print(data_crosstab)

# significance level
alpha = 0.05

# Calcualtion of Chisquare test statistics
chi_square = 0
rows = df['loan'].unique()
columns = df['y'].unique()
for i in columns:
    for j in rows:
        O = data_crosstab[i][j]
        E = round(data_crosstab[i]['total'] * data_crosstab['total'][j] / data_crosstab['total']['total'],4)
        chi_square += (O-E)**2/E

# The p-value approach
print("Approach 1: The p-value approach to hypothesis testing in the decision rule")
p_value = 1 - stats.norm.cdf(chi_square, (len(rows)-1)*(len(columns)-1))
conclusion = "Failed to reject the null hypothesis."
if p_value <= alpha:
    conclusion = "Null Hypothesis is rejected."
        
print("chisquare-score is:", chi_square, " and p value is:", p_value)
print(conclusion)

y         no   yes  total
loan                     
no     30100  3850  33950
yes     5565   683   6248
total  35665  4533  40198
Approach 1: The p-value approach to hypothesis testing in the decision rule
chisquare-score is: 0.8810165452595269  and p value is: 0.5473557680695367
Failed to reject the null hypothesis.


In [14]:
### 'month' and 'day_of_week' has no meaningful implications as 
### they cannot lead to actionable insights
### advices regarding which month of day_of_week to solicit product to customers
### are not practical

## drop 'loan','age_job','age_edu','month','day_of_week'

df_bank_impute = df_bank_impute\
    .drop('loan','age_job','age_edu','month','day_of_week')

In [16]:
# put 'yes' default under 'unknown'
# too small observations to have its own category
# +-------+-----+----------+
# |default|count|percentage|
# +-------+-----+----------+
# |     no|32588|      79.0|
# |unknown| 8597|      21.0|
# |    yes|    3|       0.0|
# +-------+-----+----------+

impute_default = udf(lambda x: 'unknown' if x=='yes' else x,StringType())

df_bank_impute = df_bank_impute\
    .withColumn('default',impute_default(col('default')))

In [17]:
### check'poutcome' chi-square

## with 'nonexistence': 
# chisquare-score is: 4230.523060237804  and p value is: 0.0
# Null Hypothesis is rejected.

## 'nonexistence' grouped to 'success': 
# chisquare-score is: 41.64765927513662  and p value is: 0.0
# Null Hypothesis is rejected.

## 'nonexistence' grouped to 'failure': 
# chisquare-score is: 4119.883422967389  and p value is: 0.0
# Null Hypothesis is rejected.

# Conclusion: no need to collapse variation. Current variation gives superior chi-square

collapse = udf(lambda x: 'failure' if x=='nonexistent' else x, StringType())
df = df_bank_impute\
    .withColumn('poutcome',collapse(col('poutcome')))\
        .select('poutcome','y').toPandas()
    
data_crosstab = pd.crosstab(df['poutcome'],
                            df['y'],
                           margins=True, margins_name='total')
print(data_crosstab)

# significance level
alpha = 0.05

# Calcualtion of Chisquare test statistics
chi_square = 0
rows = df['poutcome'].unique()
columns = df['y'].unique()
for i in columns:
    for j in rows:
        O = data_crosstab[i][j]
        E = round(data_crosstab[i]['total'] * data_crosstab['total'][j] / data_crosstab['total']['total'],4)
        chi_square += (O-E)**2/E

# The p-value approach
print("Approach 1: The p-value approach to hypothesis testing in the decision rule")
p_value = 1 - stats.norm.cdf(chi_square, (len(rows)-1)*(len(columns)-1))
conclusion = "Failed to reject the null hypothesis."
if p_value <= alpha:
    conclusion = "Null Hypothesis is rejected."
        
print("chisquare-score is:", chi_square, " and p value is:", p_value)
print(conclusion)

y            no   yes  total
poutcome                    
failure   36069  3746  39815
success     479   894   1373
total     36548  4640  41188
Approach 1: The p-value approach to hypothesis testing in the decision rule
chisquare-score is: 4119.883422967389  and p value is: 0.0
Null Hypothesis is rejected.


## correlation: 
#### Drop >80% correlated features
#### Keep ones with highest correlation to target variable
#### In Tree-based ensemble, we can keep correlated features but let's drop it here, 
#### giving options to use LogisticRegression or other formula-based classifier

In [44]:
# previous: [0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 7]
#
# C. respose to this campaign
# duration: [0, 11, 36, 59, 102, 180, 319, 551, 753, 1272, 4918]
# campaign: [1, 1, 1, 1, 1, 2, 3, 5, 7, 14, 56]
#
# D. economics indicators
# emp_var_rate: [-3.4, -3.4, -2.9, -1.8, -1.8, 1.1, 1.4, 1.4, 1.4, 1.4, 1.4]
# cons_price_idx: [92.201,92.201,92.713,92.893,93.075,93.749,93.994,94.465,94.465,94.465,94.767]
# cons_conf_idx: [-50.8, -49.5, -47.1, -46.2, -42.7, -41.8, -36.4, -36.1, -33.6, -26.9, -26.9]
# euribor3m: [0.634, 0.655, 0.797, 1.046, 1.344, 4.857, 4.961, 4.964, 4.966, 4.968, 5.045]
# nr_employed: [4963.6,4963.6,5017.5,5076.2,5099.1,5191.0,5228.1,5228.1,5228.1,5228.1,5228.1]
#
continuous_feats_list = ['previous','campaign','emp_var_rate','cons_price_idx',
                         'cons_conf_idx','euribor3m','nr_employed']

combo = list(combinations(continuous_feats_list,2))
target_combo = [(i,'y_int') for i in continuous_feats_list]
combined_combo = combo+target_combo

def get_corr(df_pyspark,combo,threshold=0.7):
    '''get correlation of all pairs'''
    y_int_func = udf(lambda x: 1 if x=='yes' else 0,IntegerType())
    df = df_pyspark.withColumn('y_int',y_int_func(col('y'))).toPandas()

    df_corr = pd.DataFrame(columns=['col1','col2','corr','abs_corr','to_compare'])
    for idx,(i,j) in enumerate(combo):
        if abs(np.corrcoef(df[i], df[j])[0, 1]) > threshold:
            to_compare = 1
        else:
            to_compare = 0
        df_corr.loc[idx] = [i,j,np.corrcoef(df[i], df[j])[0, 1],abs(np.corrcoef(df[i], df[j])[0, 1]),to_compare]

    return df_corr

### Assign 80% threshold

df_corr = get_corr(df_bank_impute,combo=combined_combo,threshold=0.8)

y_int_corr_dict = df_corr.iloc[np.where(df_corr['col2']=='y_int')][['col1','abs_corr']].set_index('col1').to_dict()['abs_corr']


conds = (df_corr['to_compare']==1)
df_corr_to_compare = df_corr.iloc[np.where(conds)]

initial_feats_to_compare = [i for i in df_corr_to_compare['col1'].unique()] +\
    [i for i in df_corr_to_compare['col2'].unique() if i not in df_corr_to_compare['col1'].unique()]
feats_to_compare = initial_feats_to_compare
latest_rows_to_compare = int(df_corr_to_compare.shape[0])
rows_to_compare = latest_rows_to_compare-1
while rows_to_compare < latest_rows_to_compare:
    latest_rows_to_compare = rows_to_compare
    print(f"feats_to_compare: {feats_to_compare}")
    conds_compare = (df_corr_to_compare['col1'].isin(feats_to_compare))|(df_corr_to_compare['col2'].isin(feats_to_compare))
    df_compare = df_corr_to_compare.iloc[np.where(conds_compare)]
    df_compare['winning_feat'] = df_compare.apply(lambda row: row['col1'] if y_int_corr_dict[row['col1']]>y_int_corr_dict[row['col2']] else row['col2'],axis=1)
    feats_to_compare = [i for i in df_compare['winning_feat'].unique()]
    rows_to_compare = int(df_corr_to_compare.iloc[np.where(conds_compare)].shape[0])

final_conds = (df_compare['col1'].isin(feats_to_compare))&(df_compare['col2'].isin(feats_to_compare))
winning_feats  = [i for i in df_compare.iloc[np.where(final_conds)]['winning_feat'].unique()]
cols_to_drop = [i for i in initial_feats_to_compare if i not in winning_feats]
print(f"winning_feats:{winning_feats}")
print(f"cols_to_drop:{cols_to_drop}")


### Drop correlated vars

df_bank_impute = df_bank_impute.drop(*cols_to_drop)
df_bank_impute.printSchema()



feats_to_compare: ['emp_var_rate', 'euribor3m', 'nr_employed']
winning_feats:['nr_employed']
cols_to_drop:['emp_var_rate', 'euribor3m']
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- cons_price_idx: double (nullable = true)
 |-- cons_conf_idx: double (nullable = true)
 |-- nr_employed: double (nullable = true)
 |-- y: string (nullable = true)



## Save as parquet

In [45]:

df_bank_impute.write.format('parquet').saveAsTable('bank_impute_clean')

