In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.functions import isnan, when, count, col, avg
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [2]:
# Spark Environment
conf = SparkConf()
sc = SparkContext(conf=conf)
conf.getAll()
sqlContext = SQLContext(sc)

In [30]:
def loadData():
    
    trainSchema = StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Survived", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", FloatType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", FloatType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True)]
  )
    
    testSchema = StructType([
    StructField("PassengerId", IntegerType(), True),
    StructField("Pclass", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("Sex", StringType(), True),
    StructField("Age", FloatType(), True),
    StructField("SibSp", IntegerType(), True),
    StructField("Parch", IntegerType(), True),
    StructField("Ticket", StringType(), True),
    StructField("Fare", FloatType(), True),
    StructField("Cabin", StringType(), True),
    StructField("Embarked", StringType(), True)]
  )

    csvFormat = "com.databricks.spark.csv"

    trainDF = sqlContext.read.format(csvFormat).option("header", "true").schema(trainSchema).load("train.csv")
    
    testDF = sqlContext.read.format(csvFormat).option("header", "true").schema(testSchema).load("test.csv")
    
    true_labels = sqlContext.read.format(csvFormat).option("header", "true").load("true labels.csv")
    
    testDF = testDF.join(true_labels, "PassengerId").withColumnRenamed("Survived", "TrueSurvived")

    return trainDF, testDF


In [31]:
trainDF, testDF = loadData()

In [33]:
trainDF.show(6)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [19]:
# Get the number of missing data points per column
print("Missing Values in Training data",trainDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in trainDF.columns]).show())
print( "0 VAlues in Fare in Training data", trainDF.filter(col("Fare") == 0).show())

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

Missing Values in Training data None
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+------+----+-----+--------+
|        180|       0|     3| Leonard, Mr. Lionel|male|36.0|    0|    0|  LINE| 0.0| null|       S|
|        264|       0|     1|Harrison, Mr. Wil...|male|40.0|    0|    0|112059| 0.0|  B94|       S|
|        272|       1|     3|Tornquist, Mr. Wi...|ma

In [20]:
testDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in testDF.columns]).show()

+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|     0|   0|  0| 86|    0|    0|     0|   1|  327|       0|
+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



It can be seen that columns Age, Cabin and Embarked have null values. Even column Fare has rows with value 0. So, we will replace null values of Age and Embarked with average age and "S", respectively. 0 values of Fare will be replaced with average fare. Null values of Cabin cannot be replaced.

In [34]:
def fillNAValues(trainDF,testDF):
    
    # Fill null values of age with average age
    trainDF = trainDF.na.fill(trainDF.where((col("Age") > 0)).agg(avg("Age")).first()[0], ["Age"])
    trainDF = trainDF.withColumn("Age", f.round(trainDF["Age"], 1))
    
    # Fill 0 values of Fare with average fare
    trainDF = trainDF.na.fill(0.0,["Fare"]) # convert null values to 0
    avg_fare = trainDF.where((col("Fare") > 0)).agg(avg("Fare")).first()[0]
    trainDF = trainDF.withColumn("Fare", when((col("Fare") == 0.0) ,avg_fare).otherwise(col("Fare")))
    trainDF = trainDF.withColumn("Fare", f.round(trainDF["Fare"], 2))
    
    # Replace null values of Embarked with "S"
    trainDF = trainDF.withColumn("Embarked", when((col("Embarked").isNull()) , "S").otherwise(col("Embarked")))
    
    # Fill null values of age with average age
    testDF = testDF.na.fill(testDF.where((col("Age") > 0)).agg(avg("Age")).first()[0], ["Age"])
    testDF = testDF.withColumn("Age", f.round(testDF["Age"], 1))
    
    # Fill 0 values of Fare with average fare
    testDF = testDF.na.fill(0.0,["Fare"]) # convert null values to 0
    avg_fare_test = testDF.where((col("Fare") > 0)).agg(avg("Fare")).first()[0]
    testDF = testDF.withColumn("Fare", when((col("Fare") == 0) ,avg_fare_test).otherwise(col("Fare")))
    testDF = testDF.withColumn("Fare", f.round(testDF["Fare"], 2))
    
    # Replace null values of Embarked with "S"
    testDF = testDF.withColumn("Embarked", when((col("Embarked").isNull()) , "S").otherwise(col("Embarked")))
    
    return trainDF, testDF
    

In [35]:
newtrainDF, newtestDF = fillNAValues(trainDF, testDF)

In [37]:
newtrainDF.show(6)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-----+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket| Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-----+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171| 7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.28|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282| 7.93| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803| 53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450| 8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|29.7|    0|    0|          330877| 8.46

In [43]:
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
 
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")

In [39]:
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

In [40]:
# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features")
 
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")

In [44]:
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder,embarkEncoder, assembler, rf]) 

In [45]:
# Train model.  This also runs the indexers.
model = pipeline.fit(newtrainDF)
 
# Predictions
predictions = model.transform(newtestDF)

In [48]:
predictions.select("prediction", "TrueSurvived", "features").show(5)

+----------+------------+--------------------+
|prediction|TrueSurvived|            features|
+----------+------------+--------------------+
|       0.0|           0|[3.0,1.0,34.5,0.0...|
|       0.0|           1|[3.0,0.0,47.0,1.0...|
|       0.0|           0|[2.0,1.0,62.0,0.0...|
|       0.0|           0|[3.0,1.0,27.0,0.0...|
|       0.0|           1|[3.0,0.0,22.0,1.0...|
+----------+------------+--------------------+
only showing top 5 rows



In [49]:
predictions1 = predictions.select(col("TrueSurvived").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="TrueSurvived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions1)
print("Accuracy = %g" % accuracy)
print("Test Error = %g" % (1.0 - accuracy))

Accuracy = 0.91866
Test Error = 0.0813397


Compared the predictions for test set with the given true labels and the accuracy came out to be 91% with Random Forest Classifier.