In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import * 
import pyspark.sql.functions as F
from pyspark.sql.functions import col, asc,desc
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from pyspark.sql import SQLContext
from pyspark.mllib.stat import Statistics
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

spark=SparkSession.builder \
.master ("local[*]")\
.appName("part3")\
.getOrCreate()

In [3]:
sc=spark.sparkContext
sqlContext=SQLContext(sc)



In [18]:
#read file
df=spark.read \
 .option("header","True")\
 .option("inferSchema","True")\
 .option("sep",";")\
 .csv("C:\\Users\\sucha\\Desktop\\pyspark\\mini project\\XYZ_Bank_Deposit_Data_Classification.csv")
print("There are",df.count(),"rows",len(df2.columns),
      "columns" ,"in the data.") 

There are 41188 rows 22 columns in the data.


In [19]:
def udf_multiple(age):
    if (age <= 25):
        return 'Under 25'
    elif (age >= 25 and age <= 35):
        return 'Between 25 and 35'
    elif (age > 35 and age < 50):
        return 'Between 36 and 49'
    elif (age >= 50):
        return 'Over 50'
    else: return 'N/A'

education_udf = udf(udf_multiple)
df=df.withColumn("Age_udf", education_udf('Age'))

In [20]:
df = df.withColumnRenamed("emp.var.rate", "employment_variation_rate")\
       .withColumnRenamed("Cons.price.idx", "consumer_price_index")\
       .withColumnRenamed("Cons.conf.idx", "consumer_confidence_index")\
       .withColumnRenamed("euribor3m", "euribor_3_monthrate")\
       .withColumnRenamed("nr.employed", "number_of_employees")\
       .withColumnRenamed("y", "Outcome")



In [21]:

encoding_var = [i[0] for i in df.dtypes if (i[1]=='string') & (i[0]!='Outcome')]
num_var = [i[0] for i in df.dtypes if ((i[1]=='int') | (i[1]=='double')) & (i[0]!='Outcome')]

string_indexes = [StringIndexer(inputCol = c, outputCol = 'IDX_' + c, handleInvalid = 'keep') for c in encoding_var]
onehot_indexes = [OneHotEncoder(inputCols = ['IDX_' + c], outputCols = ['OHE_' + c]) for c in encoding_var]
label_indexes = StringIndexer(inputCol = 'Outcome', outputCol = 'label', handleInvalid = 'keep')
assembler = VectorAssembler(inputCols = num_var + ['OHE_' + c for c in encoding_var], outputCol = "features")


In [22]:
from pyspark.sql.functions import *
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorSlicer

rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed = 8464,
                            numTrees=10, cacheNodeIds = True, subsamplingRate = 0.7)

pipe = Pipeline(stages = string_indexes + onehot_indexes + [assembler, label_indexes, rf])


In [23]:
mod = pipe.fit(df)


In [24]:
df2 = mod.transform(df)


In [25]:
mod.stages[-1].featureImportances


SparseVector(67, {0: 0.0062, 1: 0.292, 2: 0.0019, 3: 0.0285, 4: 0.0255, 5: 0.1013, 6: 0.0104, 7: 0.045, 8: 0.0629, 9: 0.2053, 10: 0.0003, 11: 0.0009, 12: 0.0006, 14: 0.0001, 15: 0.0002, 19: 0.0002, 20: 0.0003, 21: 0.0007, 23: 0.0001, 24: 0.0001, 26: 0.0002, 28: 0.0003, 30: 0.0002, 31: 0.0002, 34: 0.0005, 35: 0.0003, 37: 0.0003, 38: 0.0001, 39: 0.0003, 42: 0.0001, 43: 0.0297, 44: 0.0026, 45: 0.0015, 46: 0.0014, 51: 0.0069, 52: 0.0058, 55: 0.0003, 56: 0.0009, 59: 0.0002, 60: 0.0091, 61: 0.0009, 62: 0.1547, 63: 0.0002, 64: 0.0002, 65: 0.0002, 66: 0.0003})

In [26]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))


In [27]:
ExtractFeatureImp(mod.stages[-1].featureImportances, df2, "features").head(10)


Unnamed: 0,idx,name,score
1,1,duration,0.292015
9,9,number_of_employees,0.205271
62,62,OHE_poutcome_success,0.154734
5,5,employment_variation_rate,0.101274
8,8,euribor_3_monthrate,0.06293
7,7,consumer_confidence_index,0.045009
43,43,OHE_contact_cellular,0.029651
3,3,pdays,0.028486
4,4,previous,0.025479
6,6,consumer_price_index,0.010437
