In [None]:
!pip install pyspark

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit, avg
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

In [2]:
import random

In [3]:
random.seed(1234)

In [4]:
spark = SparkSession.builder.appName("PySparkTitanikJob").getOrCreate()

In [5]:
spark

In [6]:
titanic_df = spark.read.csv('train.csv', header = 'True', inferSchema='True')

In [7]:
display(titanic_df)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [8]:
titanic_df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [9]:
passengers_count = titanic_df.count()

In [10]:
titanic_df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [11]:
gropuBy_output = titanic_df.groupBy("Survived").count()

In [12]:
gropuBy_output.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [13]:
avg_age = round(titanic_df.select(avg(col('Age'))).collect()[0][0],0)

In [14]:
avg_age

30.0

In [15]:
titanic_df = titanic_df.fillna({'Age': avg_age})

In [16]:
titanic_df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|30.0|    0|    0|      

In [17]:
titanic_df = titanic_df.fillna({"Embarked" : 'S'})

In [18]:
titanic_df = titanic_df.drop("Cabin")

In [19]:
titanic_df = titanic_df.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [20]:
titanic_df = titanic_df.withColumn('Alone',lit(0))

In [21]:
titanic_df = titanic_df.withColumn("Alone",when(titanic_df["Family_Size"] == 0, 1).otherwise(titanic_df["Alone"]))

In [22]:
data_df = titanic_df

In [23]:
titanic_df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+--------+-----------+-----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Embarked|Family_Size|Alone|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+--------+-----------+-----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25|       S|          1|    0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|       C|          1|    0|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+--------+-----------+-----+
only showing top 2 rows



In [24]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanic_df) for column in ["Sex","Embarked"]]
pipeline = Pipeline(stages=indexers)
titanic_df = pipeline.fit(titanic_df).transform(titanic_df)

In [25]:
titanic_df = titanic_df.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex")

In [26]:
titanic_df.show()

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|
|       0|     3|30.0|    0|    0| 8.4583|          0|    1|      0.0|           2.0|
|       0|     1|54.0|    0|    0|51.8625|          0|    1|      0.0|           0.0|
|       0|     3| 2.0|    3|    1| 21.075|          4|    0|      0.0|           0.0|
|       1|     3|27.0|    0|    2|11.1333|          2|

In [27]:
feature = VectorAssembler(inputCols=titanic_df.columns[1:],outputCol="features")
feature_vector= feature.transform(titanic_df)

In [28]:
feature_vector.show()

+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+----+-----+-----+-------+-----------+-----+---------+--------------+--------------------+
|       0|     3|22.0|    1|    0|   7.25|          1|    0|      0.0|           0.0|[3.0,22.0,1.0,0.0...|
|       1|     1|38.0|    1|    0|71.2833|          1|    0|      1.0|           1.0|[1.0,38.0,1.0,0.0...|
|       1|     3|26.0|    0|    0|  7.925|          0|    1|      1.0|           0.0|[3.0,26.0,0.0,0.0...|
|       1|     1|35.0|    1|    0|   53.1|          1|    0|      1.0|           0.0|[1.0,35.0,1.0,0.0...|
|       0|     3|35.0|    0|    0|   8.05|          0|    1|      0.0|           0.0|(9,[0,1,4,6],[3.0...|
|       0|     3|30.0|    0|    0| 8.4583|          0|    1|      0.0|           2.0|[3.0,30.0,0.0,0.0...|
|       0|     1|54.0|    0|    0|51.

In [29]:
(training_data, test_data) = feature_vector.randomSplit([0.8, 0.2],seed = 42)

In [30]:
training_data.show()

+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass| Age|SibSp|Parch|    Fare|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+----+-----+-----+--------+-----------+-----+---------+--------------+--------------------+
|       0|     1| 2.0|    1|    2|  151.55|          3|    0|      1.0|           0.0|[1.0,2.0,1.0,2.0,...|
|       0|     1|21.0|    0|    1| 77.2875|          1|    0|      0.0|           0.0|[1.0,21.0,0.0,1.0...|
|       0|     1|22.0|    0|    0|135.6333|          0|    1|      0.0|           1.0|[1.0,22.0,0.0,0.0...|
|       0|     1|24.0|    0|    0|    79.2|          0|    1|      0.0|           1.0|[1.0,24.0,0.0,0.0...|
|       0|     1|24.0|    0|    1|247.5208|          1|    0|      0.0|           1.0|[1.0,24.0,0.0,1.0...|
|       0|     1|25.0|    1|    2|  151.55|          3|    0|      1.0|           0.0|[1.0,25.0,1.0,2.0...|
|       0|     1|27.0|    0|

# ML models

# LogisticRegression

In [31]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(training_data)
lr_prediction = lrModel.transform(test_data)
lr_prediction.select("prediction", "Survived", "features").show(5)
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,18.0,1.0,0.0...|
|       1.0|       0|[1.0,19.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|(9,[0,1,4,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [32]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("LogisticRegression [Accuracy] = %g"% (lr_accuracy))
print("LogisticRegression [Error] = %g " % (1.0 - lr_accuracy))

LogisticRegression [Accuracy] = 0.793103
LogisticRegression [Error] = 0.206897 


# DecisionTreeClassifier

In [33]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(training_data)
dt_prediction = dt_model.transform(test_data)

dt_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,18.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       0.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|(9,[0,1,4,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [34]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("DecisionTreeClassifier [Accuracy] = %g"% (dt_accuracy))
print("DecisionTreeClassifier [Error] = %g " % (1.0 - dt_accuracy))

DecisionTreeClassifier [Accuracy] = 0.798851
DecisionTreeClassifier [Error] = 0.201149 


# RandomForestClassifier

In [35]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,18.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       0.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|(9,[0,1,4,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [36]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("RandomForestClassifier [Accuracy] = %g"% (rf_accuracy))
print("RandomForestClassifier [Error] = %g" % (1.0 - rf_accuracy))

RandomForestClassifier [Accuracy] = 0.821839
RandomForestClassifier [Error] = 0.178161


# Gradient-boosted tree classifier

In [37]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(training_data)
gbt_prediction = gbt_model.transform(test_data)
gbt_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,18.0,1.0,0.0...|
|       1.0|       0|[1.0,19.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       1.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|(9,[0,1,4,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [38]:
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Gradient-boosted [Accuracy] = %g"% (gbt_accuracy))
print("Gradient-boosted [Error] = %g"% (1.0 - gbt_accuracy))

Gradient-boosted [Accuracy] = 0.793103
Gradient-boosted [Error] = 0.206897


# Save & Load Model

In [39]:
rf_model.write().overwrite().save('rf_model')

In [40]:
from pyspark.ml.classification import RandomForestClassificationModel
type(RandomForestClassificationModel.load('rf_model'))

pyspark.ml.classification.RandomForestClassificationModel

# Pipeline

In [41]:
from pyspark.ml.pipeline import PipelineModel

In [42]:
data_df = data_df.drop("PassengerId","Name","Ticket","Cabin")

In [43]:
data_df.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|          4|    0|
|       1|     3|female|27.0|    0|    2|11.1333|       S|          2|    0|
|       1|     2|female|14.0|    1|    0|30.0708|       C|          1|    0|

In [44]:
train, validate = data_df.randomSplit([0.8, 0.2])

In [45]:
train.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|25.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
only showing top 5 rows



In [46]:
validate.show(5)

+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|Survived|Pclass| Sex| Age|SibSp|Parch|    Fare|Embarked|Family_Size|Alone|
+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|       0|     1|male|22.0|    0|    0|135.6333|       C|          0|    1|
|       0|     1|male|24.0|    0|    1|247.5208|       C|          1|    0|
|       0|     1|male|27.0|    0|    2|   211.5|       C|          2|    0|
|       0|     1|male|30.0|    0|    0|    31.0|       S|          0|    1|
|       0|     1|male|30.0|    0|    0|    35.0|       S|          0|    1|
+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
only showing top 5 rows



In [47]:
indexer_sex = StringIndexer(inputCol="Sex", outputCol="Sex_index")

In [48]:
indexer_sex.fit(train).transform(train).show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Sex_index|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|      1.0|
|       0|     1|female|25.0|    1|    2| 151.55|       S|          3|    0|      1.0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|      1.0|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|      0.0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|      0.0|
|       0|     1|  male|19.0|    3|    2|  263.0|       S|          5|    0|      0.0|
|       0|     1|  male|21.0|    0|    1|77.2875|       S|          1|    0|      0.0|
|       0|     1|  male|24.0|    0|    0|   79.2|       C|          0|    1|      0.0|
|       0|     1|  male|28.0|    0|    0|  

In [49]:
indexer_embarked = StringIndexer(inputCol="Embarked", outputCol="Embarked_index")

In [50]:
indexer_embarked.fit(train).transform(train).show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+--------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Embarked_index|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+--------------+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|           0.0|
|       0|     1|female|25.0|    1|    2| 151.55|       S|          3|    0|           0.0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|           1.0|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|           1.0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|           0.0|
|       0|     1|  male|19.0|    3|    2|  263.0|       S|          5|    0|           0.0|
|       0|     1|  male|21.0|    0|    1|77.2875|       S|          1|    0|           0.0|
|       0|     1|  male|24.0|    0|    0|   79.2|       C|          0|    1|    

In [51]:
feature = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index"],
    outputCol="features")


In [52]:
train_sex = indexer_sex.fit(train).transform(train)
train_embarked = indexer_embarked.fit(train).transform(train_sex)
feature = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index"],
    outputCol="features")
result = feature.transform(train_embarked)

In [53]:
result.select('features').show()

+--------------------+
|            features|
+--------------------+
|[1.0,2.0,1.0,2.0,...|
|[1.0,25.0,1.0,2.0...|
|[1.0,50.0,0.0,0.0...|
|[1.0,18.0,1.0,0.0...|
|[1.0,19.0,1.0,0.0...|
|[1.0,19.0,3.0,2.0...|
|[1.0,21.0,0.0,1.0...|
|(8,[0,1,4,6],[1.0...|
|(8,[0,1,4],[1.0,2...|
|[1.0,28.0,1.0,0.0...|
|(8,[0,1,4],[1.0,2...|
|[1.0,29.0,1.0,0.0...|
|(8,[0,1],[1.0,30.0])|
|(8,[0,1],[1.0,30.0])|
|(8,[0,1,4],[1.0,3...|
|(8,[0,1,4],[1.0,3...|
|(8,[0,1,4],[1.0,3...|
|(8,[0,1,4,6],[1.0...|
|(8,[0,1,4,6],[1.0...|
|(8,[0,1,4,6],[1.0...|
+--------------------+
only showing top 20 rows



In [54]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,18.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,1.0,0.0...|
|       0.0|       0|[1.0,19.0,3.0,2.0...|
|       0.0|       0|[1.0,29.0,1.0,0.0...|
|       0.0|       0|(9,[0,1,4,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [55]:
type(rf_model)

pyspark.ml.classification.RandomForestClassificationModel

In [56]:
rf_classifier = RandomForestClassifier(labelCol="Survived", featuresCol="features")

In [57]:
pipeline = Pipeline(stages=[indexer_sex, indexer_embarked, feature, rf_classifier])

In [58]:
type(pipeline)

pyspark.ml.pipeline.Pipeline

In [59]:
train.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|25.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|
|       0|     1|  male|19.0|    3|    2|  263.0|       S|          5|    0|
|       0|     1|  male|21.0|    0|    1|77.2875|       S|          1|    0|
|       0|     1|  male|24.0|    0|    0|   79.2|       C|          0|    1|
|       0|     1|  male|28.0|    0|    0|   47.1|       S|          0|    1|
|       0|     1|  male|28.0|    1|    0|82.1708|       C|          1|    0|

In [60]:
p_model = pipeline.fit(train)

In [61]:
type(p_model)

pyspark.ml.pipeline.PipelineModel

In [62]:
p_model.write().overwrite().save('p_model')

In [63]:
model = PipelineModel.load('p_model')

In [64]:
validate.show()

+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|Survived|Pclass| Sex| Age|SibSp|Parch|    Fare|Embarked|Family_Size|Alone|
+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|       0|     1|male|22.0|    0|    0|135.6333|       C|          0|    1|
|       0|     1|male|24.0|    0|    1|247.5208|       C|          1|    0|
|       0|     1|male|27.0|    0|    2|   211.5|       C|          2|    0|
|       0|     1|male|30.0|    0|    0|    31.0|       S|          0|    1|
|       0|     1|male|30.0|    0|    0|    35.0|       S|          0|    1|
|       0|     1|male|31.0|    0|    0| 50.4958|       S|          0|    1|
|       0|     1|male|36.0|    0|    0|  40.125|       C|          0|    1|
|       0|     1|male|36.0|    1|    0|   78.85|       S|          1|    0|
|       0|     1|male|38.0|    0|    0|     0.0|       S|          0|    1|
|       0|     1|male|45.0|    0|    0|    35.5|       S|          0|    1|
|       0|  

In [65]:
prediction = p_model.transform(validate)

In [66]:
prediction.toPandas().head()

Unnamed: 0,Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked,Family_Size,Alone,Sex_index,Embarked_index,features,rawPrediction,probability,prediction
0,0,1,male,22.0,0,0,135.6333,C,0,1,0.0,1.0,"(1.0, 22.0, 0.0, 0.0, 135.6333, 0.0, 1.0, 0.0)","[11.665096903435314, 8.334903096564682]","[0.5832548451717658, 0.4167451548282342]",0.0
1,0,1,male,24.0,0,1,247.5208,C,1,0,0.0,1.0,"[1.0, 24.0, 0.0, 1.0, 247.5208, 1.0, 1.0, 0.0]","[10.821004721107837, 9.17899527889216]","[0.5410502360553919, 0.45894976394460807]",0.0
2,0,1,male,27.0,0,2,211.5,C,2,0,0.0,1.0,"[1.0, 27.0, 0.0, 2.0, 211.5, 2.0, 1.0, 0.0]","[8.783132763059173, 11.216867236940828]","[0.4391566381529587, 0.5608433618470414]",1.0
3,0,1,male,30.0,0,0,31.0,S,0,1,0.0,0.0,"(1.0, 30.0, 0.0, 0.0, 31.0, 0.0, 0.0, 0.0)","[14.110720869143059, 5.889279130856943]","[0.7055360434571529, 0.2944639565428472]",0.0
4,0,1,male,30.0,0,0,35.0,S,0,1,0.0,0.0,"(1.0, 30.0, 0.0, 0.0, 35.0, 0.0, 0.0, 0.0)","[14.462028058685542, 5.537971941314459]","[0.723101402934277, 0.27689859706572295]",0.0


In [67]:
prediction.select(["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index"]).show(5)

+------+----+-----+-----+--------+-----------+--------------+---------+
|Pclass| Age|SibSp|Parch|    Fare|Family_Size|Embarked_index|Sex_index|
+------+----+-----+-----+--------+-----------+--------------+---------+
|     1|22.0|    0|    0|135.6333|          0|           1.0|      0.0|
|     1|24.0|    0|    1|247.5208|          1|           1.0|      0.0|
|     1|27.0|    0|    2|   211.5|          2|           1.0|      0.0|
|     1|30.0|    0|    0|    31.0|          0|           0.0|      0.0|
|     1|30.0|    0|    0|    35.0|          0|           0.0|      0.0|
+------+----+-----+-----+--------+-----------+--------------+---------+
only showing top 5 rows



In [68]:
prediction.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [69]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

In [70]:
p_accuracy = evaluator.evaluate(prediction)
print("Pipeline model [Accuracy] = %g"% (p_accuracy))
print("Pipeline model [Error] = %g " % (1.0 - p_accuracy))

Pipeline model [Accuracy] = 0.815789
Pipeline model [Error] = 0.184211 


# Hyperparameter tuning

In [71]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [72]:
paramGrid = ParamGridBuilder()\
                  .addGrid(rf_classifier.maxDepth, [2,3, 4])\
                  .addGrid(rf_classifier.maxBins, [2, 3, 4])\
                  .addGrid(rf_classifier.minInfoGain, [0.05, 0.1, 0.15])\
                  .build()

In [73]:
 tvs = TrainValidationSplit(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=evaluator,
                            trainRatio=0.8)

In [74]:
model = tvs.fit(train)

In [75]:
type(model)

pyspark.ml.tuning.TrainValidationSplitModel

In [76]:
model.bestModel

PipelineModel_5276b993fbf5

In [77]:
model.bestModel.stages[-1]._java_obj

JavaObject id=o15105

In [78]:
jo = model.bestModel.stages[-1]._java_obj
print('Max Depth: {}'.format(jo.getMaxDepth()))
print('Num Trees: {}'.format(jo.getMaxBins()))
print('Impurity: {}'.format(jo.getMinInfoGain()))

Max Depth: 2
Num Trees: 3
Impurity: 0.05
