In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
sp=SparkSession.builder.appName('toddlerapp').getOrCreate()

In [22]:
dtitanic=sp.read.csv('d:\\titanic.csv',header=True,inferSchema=True)
dtitanic.toPandas()

Unnamed: 0,Survived,Pclass,Name,Sex,Age,SiblingsSpouses,ParentsChildren,Fare
0,0,3,Mr. Owen Harris Braund,male,22.0,1,0,7.2500
1,1,1,Mrs. John Bradley (Florence Briggs Thayer) Cum...,female,38.0,1,0,71.2833
2,1,3,Miss. Laina Heikkinen,female,26.0,0,0,7.9250
3,1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35.0,1,0,53.1000
4,0,3,Mr. William Henry Allen,male,35.0,0,0,8.0500
...,...,...,...,...,...,...,...,...
882,0,2,Rev. Juozas Montvila,male,27.0,0,0,13.0000
883,1,1,Miss. Margaret Edith Graham,female,19.0,0,0,30.0000
884,0,3,Miss. Catherine Helen Johnston,female,7.0,1,2,23.4500
885,1,1,Mr. Karl Howell Behr,male,26.0,0,0,30.0000


In [23]:
dtitanic.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SiblingsSpouses: integer (nullable = true)
 |-- ParentsChildren: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [24]:
dtitanic.groupBy("SiblingsSpouses").count().show()

+---------------+-----+
|SiblingsSpouses|count|
+---------------+-----+
|              1|  209|
|              3|   16|
|              5|    5|
|              4|   18|
|              8|    7|
|              2|   28|
|              0|  604|
+---------------+-----+



In [25]:
input_columns=dtitanic.columns
input_columns=input_columns[4:6:1]
dependent_var='Survived'
print(input_columns)
print(dependent_var)

['Age', 'SiblingsSpouses']
Survived


In [26]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.classification import NaiveBayes

numeric_inputs=[]
string_inputs=[]
for column in input_columns:
    if str(indexed.schema[column].dataType)=='StringType':
        indexer=StringIndexer(inputCol=column,outputCol=column+"_num")
        indexed=indexer.fit(indexed).transform(indexed)
        new_col_name=column+"_num"
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)
print('numeric input',numeric_inputs)
print('String_inputs',string_inputs)

d={}
for col in numeric_inputs:
    d[col]=indexed.approxQuantile(col,[0.01,0.99],0.25)
for col in numeric_inputs:
    skew=indexed.agg(skewness(indexed[col])).collect()
    skew=skew[0][0]
    if skew > 1: # If right skew, floor, cap and log(x+1)
        indexed = indexed.withColumn(col, \
        log(when(dtitanic[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] ) +1).alias(col))
        print(col+" has been treated for positive (right) skewness. (skew =)",skew,")")
    elif skew < -1: # If left skew floor, cap and exp(x)
        indexed = indexed.withColumn(col, \
        exp(when(dtitanic[col] < d[col][0],d[col][0])\
        .when(indexed[col] > d[col][1], d[col][1])\
        .otherwise(indexed[col] )).alias(col))
        print(col+" has been treated for negative (left) skewness. (skew =",skew,")")        
print(skew)  

In [27]:
features_list = numeric_inputs + string_inputs
assembler = VectorAssembler(inputCols=features_list,outputCol='features')
output = assembler.transform(indexed).select('features','label')
output.show(5,False)

+-------------------------+-----+
|features                 |label|
+-------------------------+-----+
|[22.0,0.6931471805599453]|0.0  |
|[38.0,0.6931471805599453]|1.0  |
|[26.0,0.0]               |1.0  |
|[35.0,0.6931471805599453]|1.0  |
|[35.0,0.0]               |0.0  |
+-------------------------+-----+
only showing top 5 rows



In [28]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures",min=0,max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scalerModel = scaler.fit(output)
scaled_data = scalerModel.transform(output)
final_data = scaled_data.select('label','scaledFeatures')
final_data = final_data.withColumnRenamed("scaledFeatures","features")
final_data.show()

Features scaled to range: [0.000000, 1000.000000]
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[271.173661724051...|
|  1.0|[472.229203317416...|
|  1.0|[321.437547122392...|
|  1.0|[434.531289268660...|
|  0.0|[434.531289268660...|
|  0.0|[334.003518471977...|
|  0.0|[673.284744910781...|
|  0.0|[19.8542347323448...|
|  1.0|[334.003518471977...|
|  1.0|[170.645890927368...|
|  1.0|[44.9861774315154...|
|  1.0|[723.548630309122...|
|  0.0|[246.041719024880...|
|  0.0|[484.795174667001...|
|  0.0|[170.645890927368...|
|  1.0|[685.850716260367...|
|  0.0|[19.8542347323448...|
|  1.0|[283.739633073636...|
|  0.0|[384.267403870319...|
|  1.0|[271.173661724051...|
+-----+--------------------+
only showing top 20 rows



In [29]:
train,test = final_data.randomSplit([0.70,0.30])

In [30]:
nbclassifier=NaiveBayes()

In [31]:
nbcModel = nbclassifier.fit(train)

In [32]:
predictions = nbcModel.transform(test)

In [19]:
predictions.printSchema()
predictions.select('label','rawPrediction','probability','prediction').show()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+-----+--------------------+--------------------+----------+
|label|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+----------+
|  0.0|[-1040.3583147057...|[1.00914021129105...|       1.0|
|  0.0|[-408.37933427671...|[0.00120863596018...|       1.0|
|  0.0|[-938.92340971019...|[7.73617545833334...|       1.0|
|  0.0|[-813.85204677977...|[9.11931629727032...|       1.0|
|  0.0|[-834.51011039012...|[1.62172713589367...|       1.0|
|  0.0|[-851.03656127841...|[2.57033672775070...|       1.0|
|  0.0|[-56.617587160572...|[0.87765062316954...|       0.0|
|  0.0|[-64.880812604714...|[0.90030705523580...|       0.0|
|  0.0|[-64.880812604714...|[0.90030705523580...|       0.0|
|  0.0|[-466.22191238570...|[0.00602890946112...|    

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator();
accuracy = evaluator.evaluate(predictions)
print("Accuracy of Model :" , accuracy)
print("Test Error of Model :" , 1-accuracy)

Accuracy of Model : 0.46643626991565124
Test Error of Model : 0.5335637300843488


In [34]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator=ClusteringEvaluator();
accuracy = evaluator.evaluate(predictions)
print("Accuracy of Model :" , accuracy)
print("Test Error of Model :" , 1-accuracy)

Accuracy of Model : 0.6801775910327561
Test Error of Model : 0.3198224089672439
