### create spark session connecting to standalone cluster

In [1]:
from pyspark.sql import SparkSession
# Build (or reuse) a session named "demo" against the standalone master.
spark = (
    SparkSession.builder
    .master("spark://master0.ddoc.os.fyre.ibm.com:31505")
    .appName("demo")
    .getOrCreate()
)

### import spark sql and ml for data preparation

In [2]:
from pyspark.ml import Pipeline
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer

### read in dataset

In [3]:
import pandas as pd
# Read every column as a string; numeric casts are applied later on the Spark side.
csv = pd.read_csv(
    "http://oc-minio-default.apps.ddoc.os.fyre.ibm.com/titanic/train.orig.csv",
    dtype=str,
)
# Replace pandas' missing-value markers with None (presumably so Spark stores
# them as nulls -- the later null counts rely on that).
csv = csv.where(csv.notnull(), None)

In [4]:
from pyspark.sql.types import *
# One nullable string field per CSV column, in the CSV's column order.
columns_struct_fields = []
for field_name in csv.columns:
    columns_struct_fields.append(StructField(field_name, StringType(), True))
schema = StructType(columns_struct_fields)

In [74]:
# Turn the pandas frame into a Spark DataFrame using the all-string schema.
df = spark.createDataFrame(csv, schema=schema)

In [75]:
# Columns to keep, in output order, paired with the type each is cast to
# (None leaves the column as the string it was read as).
column_casts = [
    ("Survived", "int"),
    ("PassengerId", "int"),
    ("Name", None),
    ("Parch", "int"),
    ("Sex", None),
    ("Embarked", None),
    ("Pclass", "int"),
    ("Age", "double"),
    ("SibSp", "int"),
    ("Fare", "double"),
]
df = df.select(
    [col(name).cast(dtype) if dtype else col(name) for name, dtype in column_casts]
)

In [76]:
# Check the column types produced by the casts.
df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Fare: double (nullable = true)



In [77]:
# Preview the first rows of the typed DataFrame.
df.show()

+--------+-----------+--------------------+-----+------+--------+------+----+-----+-------+
|Survived|PassengerId|                Name|Parch|   Sex|Embarked|Pclass| Age|SibSp|   Fare|
+--------+-----------+--------------------+-----+------+--------+------+----+-----+-------+
|       0|          1|Braund, Mr. Owen ...|    0|  male|       S|     3|22.0|    1|   7.25|
|       1|          2|Cumings, Mrs. Joh...|    0|female|       C|     1|38.0|    1|71.2833|
|       1|          3|Heikkinen, Miss. ...|    0|female|       S|     3|26.0|    0|  7.925|
|       1|          4|Futrelle, Mrs. Ja...|    0|female|       S|     1|35.0|    1|   53.1|
|       0|          5|Allen, Mr. Willia...|    0|  male|       S|     3|35.0|    0|   8.05|
|       0|          6|    Moran, Mr. James|    0|  male|       Q|     3|null|    0| 8.4583|
|       0|          7|McCarthy, Mr. Tim...|    0|  male|       S|     1|54.0|    0|51.8625|
|       0|          8|Palsson, Master. ...|    1|  male|       S|     3| 2.0|   

### data explore

In [78]:
# Label balance: number of passengers per Survived value.
df.groupBy("Survived").count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [79]:
# Passenger counts for each (Pclass, Survived) combination.
df.groupBy("Pclass","Survived").count().show()

+------+--------+-----+
|Pclass|Survived|count|
+------+--------+-----+
|     1|       0|   80|
|     3|       1|  119|
|     1|       1|  136|
|     2|       1|   87|
|     2|       0|   97|
|     3|       0|  372|
+------+--------+-----+



### handle rows with null value

In [80]:
def null_value_count(df):
    """Return the columns of ``df`` that contain nulls, with their null counts.

    All columns are counted in a single aggregation, so only one Spark job
    runs regardless of how many columns there are.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        A list of ``(column_name, null_count)`` tuples, in ``df.columns``
        order, containing only the columns with at least one null.
    """
    # Local import keeps this cell self-contained; `count` is not among the
    # names imported at the top of the notebook.
    from pyspark.sql import functions as F

    if not df.columns:
        return []

    # when(...) without otherwise() yields null for non-matching rows, and
    # count() skips nulls, so each expression counts exactly the null rows.
    null_counters = [
        F.count(F.when(F.col(name).isNull(), 1)).alias(name)
        for name in df.columns
    ]
    counts_row = df.select(null_counters).first()

    # Pair by position so the result does not depend on name lookups.
    return [
        (name, null_rows)
        for name, null_rows in zip(df.columns, counts_row)
        if null_rows > 0
    ]

In [81]:
# Collect (column name, null count) pairs for the columns of df that contain nulls.
null_columns_count_list = null_value_count(df)

In [82]:
# Display the null counts as a table. The schema is given explicitly because
# Spark cannot infer one from an empty list, which is what we get when no
# column contains nulls.
null_summary_schema = StructType([
    StructField('Column_With_Null_Value', StringType(), True),
    StructField('Null_Values_Count', LongType(), True),
])
spark.createDataFrame(null_columns_count_list, null_summary_schema).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|              Embarked|                2|
|                   Age|              177|
+----------------------+-----------------+



Extract each passenger's title (e.g. Mr, Mrs, Miss) from the Name column; it serves as a clue for imputing missing ages.

In [83]:
# Capture the word immediately before the first "." in Name (the title).
# Raw string: "\." in a normal string literal is an invalid escape sequence.
df = df.withColumn("Initial", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))

In [84]:
# List the distinct titles extracted from Name.
df.select("Initial").distinct().show()

+--------+
| Initial|
+--------+
|     Don|
|    Miss|
|Countess|
|     Col|
|     Rev|
|    Lady|
|  Master|
|    Capt|
|     Mme|
|      Mr|
|      Dr|
|     Mrs|
|     Sir|
|Jonkheer|
|    Mlle|
|   Major|
|      Ms|
+--------+



In [85]:
# Fold rare titles into the five groups used for age imputation.
title_groups = {
    'Mlle': 'Miss',
    'Mme': 'Miss',
    'Ms': 'Miss',
    'Dr': 'Mr',
    'Major': 'Mr',
    'Lady': 'Mrs',
    'Countess': 'Mrs',
    'Jonkheer': 'Other',
    'Col': 'Other',
    'Rev': 'Other',
    'Capt': 'Mr',
    'Sir': 'Mr',
    'Don': 'Mr',
}
# subset limits the replacement to Initial; without it every string column
# would be scanned and any exactly matching value rewritten.
df = df.replace(
    list(title_groups.keys()),
    list(title_groups.values()),
    subset=['Initial'],
)

In [86]:
# Confirm which titles remain after the replacement.
df.select("Initial").distinct().show()

+-------+
|Initial|
+-------+
|   Miss|
|  Other|
| Master|
|     Mr|
|    Mrs|
+-------+



In [87]:
# Average Age per title group; the aggregate column is named "avg(Age)".
mean_ages = df.groupBy('Initial').agg({'Age': 'avg'})

In [88]:
def get_mean_age(x):
    """Return the mean age of title group ``x``, rounded to the nearest integer.

    Reads the notebook-level ``mean_ages`` DataFrame (columns ``Initial`` and
    ``avg(Age)``) and runs one Spark job per call.

    Args:
        x: Title group to look up, e.g. ``'Miss'``.

    Returns:
        The rounded mean age for that group.

    Raises:
        ValueError: If ``x`` is not present in ``mean_ages`` or its mean is
            null (no known age in that group).
    """
    matches = mean_ages.filter(mean_ages.Initial == x).select('avg(Age)').collect()
    # Fail with a readable message instead of an IndexError / TypeError.
    if not matches or matches[0][0] is None:
        raise ValueError("no mean age available for title %r" % (x,))
    return round(matches[0][0])

Now impute the age by initial

In [89]:
# Fetch every group's mean age in one Spark job (instead of one job per
# title) and round it to a whole number of years.
title_mean_ages = {
    row["Initial"]: round(row["avg(Age)"])
    for row in mean_ages.collect()
    if row["avg(Age)"] is not None
}

# Fill a missing Age with the mean of the passenger's title group; rows that
# already have an Age, or belong to another group, are left untouched.
for title in ("Miss", "Other", "Master", "Mr", "Mrs"):
    df = df.withColumn(
        "Age",
        when(
            (df["Initial"] == title) & (df["Age"].isNull()),
            title_mean_ages[title],
        ).otherwise(df["Age"]),
    )

Embarked has a few null values; impute them with the most frequent value, 'S'.

In [90]:
# Count passengers per Embarked value, including the null group.
df.groupBy("Embarked").count().show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       Q|   77|
|    null|    2|
|       C|  168|
|       S|  644|
+--------+-----+



In [91]:
# Replace null Embarked values with 'S'; other columns are not touched.
df = df.fillna('S', subset=["Embarked"])

Create the Family_Size and Alone features for further analysis.

In [92]:
# Family_Size = SibSp + Parch.
df = df.withColumn("Family_Size", df['SibSp'] + df['Parch'])

In [93]:
# Distribution of passengers by family size.
df.groupBy("Family_Size").count().show()

+-----------+-----+
|Family_Size|count|
+-----------+-----+
|          1|  161|
|          6|   12|
|          3|   29|
|          5|   22|
|          4|   15|
|          7|    6|
|         10|    7|
|          2|  102|
|          0|  537|
+-----------+-----+



In [94]:
# Add an Alone column initialised to the constant 0 for every row.
df = df.withColumn('Alone',lit(0))

In [95]:
# Set Alone to 1 where Family_Size is 0; keep the existing value elsewhere.
travels_alone = col("Family_Size") == 0
df = df.withColumn("Alone", when(travels_alone, 1).otherwise(col("Alone")))

Let's convert the Sex, Embarked and Initial columns from strings to numbers using StringIndexer.

In [96]:
# One StringIndexer per categorical column; each writes "<column>_index".
# The indexers are passed to the Pipeline unfitted so that Pipeline.fit()
# does the single fitting pass, rather than fitting each one here and
# then fitting the pipeline again.
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in ["Sex", "Embarked", "Initial"]
]
pipeline = Pipeline(stages=indexers)
df = pipeline.fit(df).transform(df)

In [97]:
# Check that the *_index columns were added.
df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Initial: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- Initial_index: double (nullable = false)



drop columns not for features

In [98]:
# Remove the identifier, the free-text name, and the raw string columns.
non_feature_columns = ("PassengerId", "Name", "Embarked", "Sex", "Initial")
df = df.drop(*non_feature_columns)

In [99]:
# Check the columns that remain after the drop.
df.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = false)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- Initial_index: double (nullable = false)



create features vector

In [100]:
# Every column except the label becomes a feature. The label is excluded by
# name so the result does not depend on Survived being the first column.
feature_columns = [name for name in df.columns if name != "Survived"]
feature = VectorAssembler(inputCols=feature_columns, outputCol="features")
feature_vector = feature.transform(df)

In [101]:
# Preview the rows together with the assembled "features" vector column.
feature_vector.show()

+--------+-----+------+----+-----+-------+-----------+-----+---------+--------------+-------------+--------------------+
|Survived|Parch|Pclass| Age|SibSp|   Fare|Family_Size|Alone|Sex_index|Embarked_index|Initial_index|            features|
+--------+-----+------+----+-----+-------+-----------+-----+---------+--------------+-------------+--------------------+
|       0|    0|     3|22.0|    1|   7.25|          1|    0|      0.0|           0.0|          0.0|(10,[1,2,3,4,5],[...|
|       1|    0|     1|38.0|    1|71.2833|          1|    0|      1.0|           1.0|          2.0|[0.0,1.0,38.0,1.0...|
|       1|    0|     3|26.0|    0|  7.925|          0|    1|      1.0|           0.0|          1.0|[0.0,3.0,26.0,0.0...|
|       1|    0|     1|35.0|    1|   53.1|          1|    0|      1.0|           0.0|          2.0|[0.0,1.0,35.0,1.0...|
|       0|    0|     3|35.0|    0|   8.05|          0|    1|      0.0|           0.0|          0.0|(10,[1,2,4,6],[3....|
|       0|    0|     3|33.0|    

split data into train and test (80/20)

In [102]:
# Split with weights 0.8 / 0.2; the fixed seed keeps the split repeatable.
split_weights = [0.8, 0.2]
trainingData, testData = feature_vector.randomSplit(split_weights, seed=11)

run training GBT (Gradient-boosted tree classifier)

In [135]:
from pyspark.ml.classification import GBTClassifier
# Fit a gradient-boosted tree classifier on the training split, then
# predict on the held-out split and show predictions beside the labels.
gbt = GBTClassifier(
    labelCol="Survived",
    featuresCol="features",
    maxIter=10,
    maxDepth=4,
    seed=20,
)
gbt_model = gbt.fit(trainingData)
gbt_prediction = gbt_model.transform(testData)
gbt_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[0.0,1.0,28.0,1.0...|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,3,4,5],[...|
|       1.0|       0|[0.0,2.0,24.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|[0.0,2.0,29.0,1.0...|
|       0.0|       0|(10,[1,2,4,6,8],[...|
|       1.0|       0|[0.0,2.0,38.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       1.0|       0|[0.0,3.0,18.0,1.0...|
|       0.0|       0|(10,[1,2,6],[3.0,...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
+----------

run scoring

In [136]:
# Accuracy evaluator comparing "prediction" against the "Survived" label;
# the later scoring cells reuse this same evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol="Survived",
    predictionCol="prediction",
    metricName="accuracy",
)
gbt_accuracy = evaluator.evaluate(gbt_prediction)
# Messages fixed: "classifie" -> "classifier", and the error line now uses
# "=" like the other models' scoring output.
print("Accuracy of Gradient-boosted tree classifier is = %g" % (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifier = %g" % (1.0 - gbt_accuracy))

Accuracy of Gradient-boosted tree classifie is = 0.819767
Test Error of Gradient-boosted tree classifie 0.180233


run logistic regression

In [131]:
from pyspark.ml.classification import LogisticRegression
# Fit logistic regression on the training split, then predict on the
# held-out split and show predictions beside the labels.
lr = LogisticRegression(
    labelCol="Survived",
    featuresCol="features",
)
lrModel = lr.fit(trainingData)
lr_prediction = lrModel.transform(testData)
lr_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[0.0,1.0,28.0,1.0...|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,3,4,5],[...|
|       1.0|       0|[0.0,2.0,24.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|[0.0,2.0,29.0,1.0...|
|       0.0|       0|(10,[1,2,4,6,8],[...|
|       1.0|       0|[0.0,2.0,38.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       1.0|       0|[0.0,3.0,18.0,1.0...|
|       0.0|       0|(10,[1,2,6],[3.0,...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
+----------

run scoring

In [132]:
# Score the logistic-regression predictions and report accuracy and error.
lr_accuracy = evaluator.evaluate(lr_prediction)
for message, value in (
    ("Accuracy of LogisticRegression is = %g", lr_accuracy),
    ("Test Error of LogisticRegression = %g ", 1.0 - lr_accuracy),
):
    print(message % value)

Accuracy of LogisticRegression is = 0.790698
Test Error of LogisticRegression = 0.209302 


run SVM training

In [137]:
from pyspark.ml.classification import LinearSVC
# Fit a linear SVM on the training split, then predict on the held-out
# split and show predictions beside the labels.
svm = LinearSVC(
    labelCol="Survived",
    featuresCol="features",
)
svm_model = svm.fit(trainingData)
svm_prediction = svm_model.transform(testData)
svm_prediction.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[0.0,1.0,28.0,1.0...|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,4,6],[1....|
|       0.0|       0|(10,[1,2,3,4,5],[...|
|       1.0|       0|[0.0,2.0,24.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|(10,[1,2,4,6],[2....|
|       0.0|       0|[0.0,2.0,29.0,1.0...|
|       0.0|       0|(10,[1,2,4,6,8],[...|
|       1.0|       0|[0.0,2.0,38.0,0.0...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       1.0|       0|[0.0,3.0,18.0,1.0...|
|       0.0|       0|(10,[1,2,6],[3.0,...|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
|       0.0|       0|(10,[1,2,4,6],[3....|
+----------

run scoring

In [138]:
# Score the SVM predictions and report accuracy and error.
svm_accuracy = evaluator.evaluate(svm_prediction)
for message, value in (
    ("Accuracy of Support Vector Machine is = %g", svm_accuracy),
    ("Test Error of Support Vector Machine = %g ", 1.0 - svm_accuracy),
):
    print(message % value)

Accuracy of Support Vector Machine is = 0.80814
Test Error of Support Vector Machine = 0.19186 


In [None]:
# Shut down the Spark session now that the notebook is finished.
spark.stop()