# Titanic using sql dataframe and ml module

Source: [link](https://creativedata.atlassian.net/wiki/spaces/SAP/pages/83237142/Pyspark+-+Tutorial+based+on+Titanic+Dataset)

In [1]:
# Import packages

from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *

In [2]:
# Creatingt Spark SQL environment
from pyspark.sql import SparkSession, HiveContext
SparkContext.setSystemProperty("local", "count app")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [3]:
# spark is an existing SparkSession
train = spark.read.csv("train.csv", header = True)
# Displays the content of the DataFrame to stdout
train.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [4]:
# String to float on some columns of the dataset : creates a new dataset
train = train.select(col("Survived"),col("Sex"),col("Embarked"),col("Pclass").cast("float"),col("Age").cast("float"),col("SibSp").cast("float"),col("Fare").cast("float"))

In [5]:
# dropping null values
train = train.dropna()

In [6]:
# Spliting in train and test set. Beware : It sorts the dataset
(traindf, testdf) = train.randomSplit([0.7,0.3])

### Without Pipeline

In [7]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
train = StringIndexer(inputCol="Sex", outputCol="indexedSex").fit(train).transform(train)
train = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked").fit(train).transform(train)
 
train = StringIndexer(inputCol="Survived", outputCol="indexedSurvived").fit(train).transform(train)

In [8]:
# One Hot Encoder on indexed features
train = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec").transform(train)
train = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec").transform(train)

In [9]:
# Feature assembler as a vector
train = VectorAssembler(inputCols=["Pclass","sexVec","embarkedVec", "Age","SibSp","Fare"],outputCol="features").transform(train)

In [10]:
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")
 
model = rf.fit(train)
 
predictions = model.transform(train)

In [11]:
# Select example rows to display.
predictions.select(col("prediction"),col("probability"),).show(5)

+----------+--------------------+
|prediction|         probability|
+----------+--------------------+
|       0.0|[0.86299638188778...|
|       1.0|[0.00964419693670...|
|       0.0|[0.52486340313811...|
|       1.0|[0.05882506560885...|
|       0.0|[0.89350245190954...|
+----------+--------------------+
only showing top 5 rows



### With Pipeline

In [12]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
 
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")

In [13]:
# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

In [14]:
# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features")

In [15]:
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

In [16]:
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder,embarkEncoder, assembler, rf]) # genderIndexer,embarkIndexer,genderEncoder,embarkEncoder,

In [17]:
# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)

In [18]:
# Predictions
predictions = model.transform(testdf)

In [19]:
# Select example rows to display.
predictions.columns 

['Survived',
 'Sex',
 'Embarked',
 'Pclass',
 'Age',
 'SibSp',
 'Fare',
 'indexedSurvived',
 'indexedSex',
 'indexedEmbarked',
 'sexVec',
 'embarkedVec',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [20]:
# Select example rows to display.
predictions.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[3.0,0.0,14.5,1.0...|
|       0.0|       0|[3.0,0.0,32.0,1.0...|
|       1.0|       0|[1.0,0.0,2.0,1.0,...|
|       1.0|       0|[1.0,0.0,25.0,1.0...|
|       0.0|       0|[3.0,0.0,2.0,4.0,...|
+----------+--------+--------------------+
only showing top 5 rows



In [21]:
# Select (prediction, true label) and compute test error
predictions = predictions.select(col("Survived").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.19469


In [22]:
rfModel = model.stages[6]
print(rfModel)  # summary only

RandomForestClassificationModel (uid=RandomForestClassifier_4fcfb2eff546658e7d42) with 20 trees


In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

Accuracy = 0.80531


In [24]:
evaluatorf1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)

f1 = 0.796292


In [25]:
evaluatorwp = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluatorwp.evaluate(predictions)
print("weightedPrecision = %g" % wp)

weightedPrecision = 0.829774


In [26]:
evaluatorwr = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedRecall")
wr = evaluatorwr.evaluate(predictions)
print("weightedRecall = %g" % wr)

weightedRecall = 0.80531
