In [1]:
import math
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [2]:
spSession = SparkSession.builder.master("local").appName("credit-classifier").getOrCreate()

In [3]:
bankRDD = sc.textFile("data/bank.csv")

In [4]:
bankRDD.cache()

data/bank.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [5]:
bankRDD.count()

542

In [6]:
bankRDD.take(5)

['"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"',
 '30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"',
 '33;"services";"married";"secondary";"no";4789;"yes";"yes";"cellular";11;"may";220;1;339;4;"failure";"yes"',
 '35;"management";"single";"tertiary";"no";1350;"yes";"no";"cellular";16;"apr";185;1;330;1;"failure";"yes"',
 '30;"management";"married";"tertiary";"no";1476;"yes";"yes";"unknown";3;"jun";199;4;-1;0;"unknown";"yes"']

In [7]:
header = bankRDD.first()
bankRDD2 = bankRDD.filter(lambda line: line != header)

In [8]:
bankRDD2.first()

'30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"'

In [9]:
header

'"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"'

In [10]:
i_column= 0
for col in header.replace("\"", "").split(";"):    
    print("{} = lineList[{}]".format(col, i_column))
    i_column = i_column + 1

age = lineList[0]
job = lineList[1]
marital = lineList[2]
education = lineList[3]
default = lineList[4]
balance = lineList[5]
housing = lineList[6]
loan = lineList[7]
contact = lineList[8]
day = lineList[9]
month = lineList[10]
duration = lineList[11]
campaign = lineList[12]
pdays = lineList[13]
previous = lineList[14]
poutcome = lineList[15]
y = lineList[16]


### Limpeza de Dados

In [11]:
bankRDD2.first()

'30;"unemployed";"married";"primary";"no";1787;"no";"no";"cellular";19;"oct";79;1;-1;0;"unknown";"no"'

In [12]:
def transformToNumber(line):
    lineList = line.replace("\"","").split(";")
    
    age = float(lineList[0])
    outcome = 0.0 if lineList[16] == "no" else 1.0
    single = 1.0 if lineList[2] == "single" else 0.0
    married = 1.0 if lineList[2] == "married" else 0.0
    divorced = 1.0 if lineList[2] == "divorced" else 0.0
    primary = 1.0 if lineList[3] == "primary" else 0.0
    secondary = 1.0 if lineList[3] == "secondary" else 0.0
    tertiary = 1.0 if lineList[3] == "tertiary" else 0.0
    default = 1.0 if lineList[4] == "no" else 0.0
    balance = float(lineList[5])
    loan = 0.0 if lineList[7] == "no" else 1.0

    row = Row(age = age, outcome = outcome, single = single,
             married = married, divorced = divorced, primary =primary,
             secondary =secondary, tertiary = tertiary, default = default,
             balance = balance, loan = loan)
    return row

In [13]:
bankRDD3 = bankRDD2.map(transformToNumber)
bankRDD3.collect()[:1]

[Row(age=30.0, balance=1787.0, default=1.0, divorced=0.0, loan=0.0, married=1.0, outcome=0.0, primary=1.0, secondary=0.0, single=0.0, tertiary=0.0)]

### Análise Exploratória de Dados


In [14]:
df = spSession.createDataFrame(bankRDD3)
df.cache()


DataFrame[age: double, balance: double, default: double, divorced: double, loan: double, married: double, outcome: double, primary: double, secondary: double, single: double, tertiary: double]

In [15]:
df.columns

['age',
 'balance',
 'default',
 'divorced',
 'loan',
 'married',
 'outcome',
 'primary',
 'secondary',
 'single',
 'tertiary']

In [16]:
df.describe().show()

+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|summary|               age|           balance|           default|           divorced|               loan|           married|            outcome|           primary|         secondary|            single|          tertiary|
+-------+------------------+------------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+
|  count|               541|               541|               541|                541|                541|               541|                541|               541|               541|               541|               541|
|   mean| 41.26987060998152|1444.7818853974122| 0.977818853974122|0.10905730129390019|0.16266173752310537|0.6155

In [17]:
for i in df.columns:   
    if not(isinstance(df.select(i).take(1)[0][0], str)):
        if True or abs(df.stat.corr("outcome", i)) > 0.2:            
            print("* ", i, df.stat.corr("outcome", i))
            print("outcome ", i, df.stat.corr("outcome", i))    

*  age -0.1823210432736525
outcome  age -0.18232104327365253
*  balance 0.036574866119976804
outcome  balance 0.036574866119976804
*  default 0.04536965206737373
outcome  default 0.04536965206737373
*  divorced -0.07812659940926987
outcome  divorced -0.07812659940926987
*  loan -0.030420586112717318
outcome  loan -0.030420586112717318
*  married -0.3753241299133561
outcome  married -0.3753241299133561
*  outcome 1.0
outcome  outcome 1.0
*  primary -0.12561548832677982
outcome  primary -0.12561548832677982
*  secondary 0.026392774894072973
outcome  secondary 0.026392774894072973
*  single 0.46323284934360515
outcome  single 0.46323284934360515
*  tertiary 0.08494840766635618
outcome  tertiary 0.08494840766635618


### Pré-processamento

In [18]:
def transformaVar(row):
    result = (row["outcome"], Vectors.dense([row["age"], 
                                            row["balance"],
                                            row["default"],
                                            row["divorced"],
                                            row["loan"],
                                            row["married"],
                                            row["primary"],
                                            row["secondary"],
                                            row["single"],
                                            row["tertiary"]
                                            ])) 
    return result

In [19]:
bankRDD4 = df.rdd.map(transformaVar)

In [20]:
bankRDD4.take(5)

[(0.0, DenseVector([30.0, 1787.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0])),
 (1.0, DenseVector([33.0, 4789.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0])),
 (1.0, DenseVector([35.0, 1350.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0])),
 (1.0, DenseVector([30.0, 1476.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0])),
 (0.0, DenseVector([59.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]))]

In [21]:
bankDF = spSession.createDataFrame(bankRDD4, ["label", "features"])
bankDF.cache()
bankDF.select("*").limit(10).show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[30.0,1787.0,1.0,...|
|  1.0|[33.0,4789.0,1.0,...|
|  1.0|[35.0,1350.0,1.0,...|
|  1.0|[30.0,1476.0,1.0,...|
|  0.0|[59.0,0.0,1.0,0.0...|
|  1.0|[35.0,747.0,1.0,0...|
|  1.0|[36.0,307.0,1.0,0...|
|  0.0|[39.0,147.0,1.0,0...|
|  0.0|[41.0,221.0,1.0,0...|
|  1.0|[43.0,-88.0,1.0,0...|
+-----+--------------------+



### Reducao de Dimensionalidade 

In [22]:
bankPCA = PCA(k=3, inputCol = "features", outputCol = "pcaFeatures")
pca_model = bankPCA.fit(bankDF)
pca_resul = pca_model.transform(bankDF);

In [23]:
pca_resul.show()

+-----+--------------------+--------------------+
|label|            features|         pcaFeatures|
+-----+--------------------+--------------------+
|  0.0|[30.0,1787.0,1.0,...|[-1787.0189037502...|
|  1.0|[33.0,4789.0,1.0,...|[-4789.0201836913...|
|  1.0|[35.0,1350.0,1.0,...|[-1350.0222197161...|
|  1.0|[30.0,1476.0,1.0,...|[-1476.0189582713...|
|  0.0|[59.0,0.0,1.0,0.0...|[-0.0378957382717...|
|  1.0|[35.0,747.0,1.0,0...|[-747.02234431639...|
|  1.0|[36.0,307.0,1.0,0...|[-307.02307565516...|
|  0.0|[39.0,147.0,1.0,0...|[-147.02501871466...|
|  0.0|[41.0,221.0,1.0,0...|[-221.02630508778...|
|  1.0|[43.0,-88.0,1.0,0...|[87.9723803239817...|
|  0.0|[39.0,9374.0,1.0,...|[-9374.0231121038...|
|  0.0|[43.0,264.0,1.0,0...|[-264.02756386818...|
|  0.0|[36.0,1109.0,1.0,...|[-1109.0229099347...|
|  1.0|[20.0,502.0,1.0,0...|[-502.01274295620...|
|  1.0|[31.0,360.0,1.0,0...|[-360.01981420801...|
|  0.0|[40.0,194.0,1.0,0...|[-194.02564650020...|
|  0.0|[56.0,4073.0,1.0,...|[-4073.0351271212...|


In [24]:
pca_resul.show(truncate = False)

+-----+---------------------------------------------+------------------------------------------------------------+
|label|features                                     |pcaFeatures                                                 |
+-----+---------------------------------------------+------------------------------------------------------------+
|0.0  |[30.0,1787.0,1.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0]|[-1787.0189037502862,28.861500049502038,0.08072442345904055]|
|1.0  |[33.0,4789.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0]|[-4789.0201836913975,29.921965848088032,0.9991489487199221] |
|1.0  |[35.0,1350.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0]|[-1350.0222197161672,34.100511309713724,-0.8790181194198832]|
|1.0  |[30.0,1476.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0]|[-1476.0189582713608,29.050737205343523,-0.3791477893919143]|
|0.0  |[59.0,0.0,1.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0]   |[-0.03789573827178863,58.98912141176485,0.7452038357764721] |
|1.0  |[35.0,747.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0] |[-747.0223443163976,34.4876

In [25]:
stringIndexer = StringIndexer(inputCol = "label", outputCol = "ind_label")

In [26]:
si_model = stringIndexer.fit(pca_resul)

In [27]:
obj_final = si_model.transform(pca_resul)
obj_final.collect()

[Row(label=0.0, features=DenseVector([30.0, 1787.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0]), pcaFeatures=DenseVector([-1787.0189, 28.8615, 0.0807]), ind_label=0.0),
 Row(label=1.0, features=DenseVector([33.0, 4789.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0]), pcaFeatures=DenseVector([-4789.0202, 29.922, 0.9991]), ind_label=1.0),
 Row(label=1.0, features=DenseVector([35.0, 1350.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]), pcaFeatures=DenseVector([-1350.0222, 34.1005, -0.879]), ind_label=1.0),
 Row(label=1.0, features=DenseVector([30.0, 1476.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0]), pcaFeatures=DenseVector([-1476.019, 29.0507, -0.3791]), ind_label=1.0),
 Row(label=0.0, features=DenseVector([59.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]), pcaFeatures=DenseVector([-0.0379, 58.9891, 0.7452]), ind_label=0.0),
 Row(label=1.0, features=DenseVector([35.0, 747.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]), pcaFeatures=DenseVector([-747.0223, 34.4877, -0.8884]), ind_label=1.0),
 Row(label=1

### Machine Learning

In [28]:
train, test = obj_final.randomSplit([0.7, 0.3])

In [29]:
classifier = RandomForestClassifier(labelCol="ind_label", featuresCol = "pcaFeatures")

In [30]:
model = classifier.fit(train)

In [31]:
predictions = model.transform(test)
predictions.select("prediction", "ind_label", "label", "pcaFeatures").collect()

[Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-0.0167, 26.0101, 0.0679])),
 Row(prediction=0.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-543.0166, 25.6506, -0.4595])),
 Row(prediction=0.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([246.9826, 27.1573, 0.9431])),
 Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-0.0173, 27.0098, 0.0641])),
 Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-444.0185, 28.7127, 0.8655])),
 Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-1372.019, 29.0878, -0.8598])),
 Row(prediction=0.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-1808.0189, 28.8365, 0.8831])),
 Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-3096.0186, 27.9802, 0.505])),
 Row(prediction=1.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-338.0198, 30.7801, 0.8563])),
 Row(prediction=0.0, ind_label=0.0, label=0.0, pcaFeatures=DenseVector([-39

In [32]:
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                             labelCol="ind_label",
                                             metricName="accuracy")
evaluator.evaluate(predictions)

0.7011494252873564

In [33]:
predictions.groupBy("prediction", "ind_label").count().show()

+----------+---------+-----+
|prediction|ind_label|count|
+----------+---------+-----+
|       1.0|      1.0|   36|
|       0.0|      1.0|   34|
|       1.0|      0.0|   18|
|       0.0|      0.0|   86|
+----------+---------+-----+

