### 1) import SparkSession

In [1]:
from pyspark.sql import SparkSession

### 2) read and inspect data

In [1]:
# read data
df = spark.read.csv('train.csv',inferSchema=True,header=True)

NameError: name 'spark' is not defined

In [3]:
# print schema
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [4]:
# num rows and num cols
print((df.count(), len(df.columns)))

(891, 12)


In [5]:
# show top 20 rows
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [6]:
# drop columns
df = df.drop('PassengerId','Name','Ticket','Cabin','Embarked','Sex')
df.show()

+--------+------+----+-----+-----+-------+
|Survived|Pclass| Age|SibSp|Parch|   Fare|
+--------+------+----+-----+-----+-------+
|       0|     3|22.0|    1|    0|   7.25|
|       1|     1|38.0|    1|    0|71.2833|
|       1|     3|26.0|    0|    0|  7.925|
|       1|     1|35.0|    1|    0|   53.1|
|       0|     3|35.0|    0|    0|   8.05|
|       0|     3|null|    0|    0| 8.4583|
|       0|     1|54.0|    0|    0|51.8625|
|       0|     3| 2.0|    3|    1| 21.075|
|       1|     3|27.0|    0|    2|11.1333|
|       1|     2|14.0|    1|    0|30.0708|
|       1|     3| 4.0|    1|    1|   16.7|
|       1|     1|58.0|    0|    0|  26.55|
|       0|     3|20.0|    0|    0|   8.05|
|       0|     3|39.0|    1|    5| 31.275|
|       0|     3|14.0|    0|    0| 7.8542|
|       1|     2|55.0|    0|    0|   16.0|
|       0|     3| 2.0|    4|    1| 29.125|
|       1|     2|null|    0|    0|   13.0|
|       0|     3|31.0|    1|    0|   18.0|
|       1|     3|null|    0|    0|  7.225|
+--------+-

# Method-1: Non-Pipeline Solution

### 4a) split data into train and test

In [7]:
# train test split the data
dftrain, dftest = df.randomSplit([0.8,0.2])

### 4b) fit and transform training data

In [8]:
# descriptive stats - note null vals (based on counts)
dftrain.describe().show() # df.describe('Age').show()

+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|          Survived|            Pclass|               Age|             SibSp|            Parch|             Fare|
+-------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|               725|               725|               572|               725|              725|              725|
|   mean|0.3917241379310345| 2.326896551724138|29.217954545454543|0.5296551724137931|0.383448275862069|31.68006220689653|
| stddev|0.4884725669440271|0.8289692038556871|14.127279327699085|1.1091111952192345|0.817117778007452|47.76285732711233|
|    min|                 0|                 1|              0.42|                 0|                0|              0.0|
|    max|                 1|                 3|              74.0|                 8|                6|         512.3292|
+-------+---------------

In [9]:
# select columns
dftrain.select('Age','Pclass').show()

+----+------+
| Age|Pclass|
+----+------+
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
|null|     1|
| 2.0|     1|
|18.0|     1|
|19.0|     1|
|19.0|     1|
|21.0|     1|
|24.0|     1|
+----+------+
only showing top 20 rows



In [10]:
# use groupby to get at the Pclass values
dftrain.groupby('Pclass').count().show()

+------+-----+
|Pclass|count|
+------+-----+
|     1|  169|
|     3|  406|
|     2|  150|
+------+-----+



In [11]:
# impute Age
from pyspark.ml.feature import Imputer
impage = Imputer(strategy='median', inputCols=['Age'],outputCols=['impAge']).fit(dftrain)
dftrain = impage.transform(dftrain)
dftrain = dftrain.drop('Age')
dftrain.show()

+--------+------+-----+-----+--------+------+
|Survived|Pclass|SibSp|Parch|    Fare|impAge|
+--------+------+-----+-----+--------+------+
|       0|     1|    0|    0|     0.0|  28.0|
|       0|     1|    0|    0|     0.0|  28.0|
|       0|     1|    0|    0|  25.925|  28.0|
|       0|     1|    0|    0|    26.0|  28.0|
|       0|     1|    0|    0| 27.7208|  28.0|
|       0|     1|    0|    0| 27.7208|  28.0|
|       0|     1|    0|    0| 30.6958|  28.0|
|       0|     1|    0|    0|    35.0|  28.0|
|       0|     1|    0|    0|    39.6|  28.0|
|       0|     1|    0|    0|    42.4|  28.0|
|       0|     1|    0|    0|    50.0|  28.0|
|       0|     1|    0|    0|    52.0|  28.0|
|       0|     1|    0|    0|221.7792|  28.0|
|       0|     1|    0|    0| 227.525|  28.0|
|       0|     1|    1|    2|  151.55|   2.0|
|       0|     1|    1|    0|   108.9|  18.0|
|       0|     1|    1|    0|    53.1|  19.0|
|       0|     1|    3|    2|   263.0|  19.0|
|       0|     1|    0|    1| 77.2

In [12]:
# one hot encode Pclass

# categorical values must first be converted to category indices
from pyspark.ml.feature import StringIndexer
sipclass = StringIndexer(handleInvalid='keep', 
                         inputCol='Pclass', outputCol='idxPclass').fit(dftrain)
dftrain = sipclass.transform(dftrain)
dftrain = dftrain.drop('Pclass')

# category indices can then be converted to OHE
# note: output represented in sparse format - so 0s not printed:
# so for instance, (3,[2],[1.0]) means OHE vector of length 3 with a 1.0 at position 2, and 0 elsewhere.
from pyspark.ml.feature import OneHotEncoderEstimator
ohe = OneHotEncoderEstimator(handleInvalid='keep', dropLast=True,
                             inputCols=['idxPclass'], 
                             outputCols=['ohePclass']).fit(dftrain)
dftrain = ohe.transform(dftrain)
dftrain = dftrain.drop('idxPclass')
dftrain.sample(withReplacement=False, fraction=0.1).limit(20).show()

+--------+-----+-----+--------+------+-------------+
|Survived|SibSp|Parch|    Fare|impAge|    ohePclass|
+--------+-----+-----+--------+------+-------------+
|       0|    0|    0|     0.0|  28.0|(4,[1],[1.0])|
|       0|    0|    1|247.5208|  24.0|(4,[1],[1.0])|
|       0|    1|    0|    52.0|  42.0|(4,[1],[1.0])|
|       0|    0|    0|    28.5|  45.5|(4,[1],[1.0])|
|       0|    0|    0| 28.7125|  50.0|(4,[1],[1.0])|
|       0|    1|    0| 106.425|  50.0|(4,[1],[1.0])|
|       0|    0|    0|   26.55|  56.0|(4,[1],[1.0])|
|       0|    0|    0|    11.5|  18.0|(4,[2],[1.0])|
|       0|    0|    0|    73.5|  18.0|(4,[2],[1.0])|
|       0|    0|    0|    10.5|  23.0|(4,[2],[1.0])|
|       0|    0|    0|    10.5|  24.0|(4,[2],[1.0])|
|       0|    0|    0|    13.0|  25.0|(4,[2],[1.0])|
|       0|    0|    0|    26.0|  27.0|(4,[2],[1.0])|
|       0|    1|    0|    21.0|  27.0|(4,[2],[1.0])|
|       0|    0|    0|    10.5|  31.0|(4,[2],[1.0])|
|       0|    1|    0| 30.0708|  32.5|(4,[2],[

In [13]:
# combine all feature columns into vector form to feed into ML model
from pyspark.ml.feature import VectorAssembler 
va = VectorAssembler(inputCols=['SibSp','Parch','Fare','impAge','ohePclass'],outputCol='features')
dftrain = va.transform(dftrain) # 
dftrain = dftrain.drop('SibSp','Parch','Fare','impAge','ohePclass')
dftrain.show()

+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|(8,[3,5],[28.0,1.0])|
|       0|(8,[3,5],[28.0,1.0])|
|       0|(8,[2,3,5],[25.92...|
|       0|(8,[2,3,5],[26.0,...|
|       0|(8,[2,3,5],[27.72...|
|       0|(8,[2,3,5],[27.72...|
|       0|(8,[2,3,5],[30.69...|
|       0|(8,[2,3,5],[35.0,...|
|       0|(8,[2,3,5],[39.6,...|
|       0|(8,[2,3,5],[42.4,...|
|       0|(8,[2,3,5],[50.0,...|
|       0|(8,[2,3,5],[52.0,...|
|       0|(8,[2,3,5],[221.7...|
|       0|(8,[2,3,5],[227.5...|
|       0|[1.0,2.0,151.55,2...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|[3.0,2.0,263.0,19...|
|       0|(8,[1,2,3,5],[1.0...|
|       0|(8,[2,3,5],[79.2,...|
+--------+--------------------+
only showing top 20 rows



In [14]:
# train random forest classifier model on training data
from pyspark.ml.classification import RandomForestClassifier
rfc = RandomForestClassifier(labelCol="Survived", featuresCol="features",
                            numTrees=100).fit(dftrain)

### 4c) tranform, predict and evaluate test data

In [15]:
dftest.show()

+--------+------+----+-----+-----+--------+
|Survived|Pclass| Age|SibSp|Parch|    Fare|
+--------+------+----+-----+-----+--------+
|       0|     1|null|    0|    0|   26.55|
|       0|     1|null|    0|    0|    31.0|
|       0|     1|22.0|    0|    0|135.6333|
|       0|     1|25.0|    1|    2|  151.55|
|       0|     1|27.0|    0|    2|   211.5|
|       0|     1|30.0|    0|    0|   27.75|
|       0|     1|36.0|    1|    0|   78.85|
|       0|     1|37.0|    1|    0|    53.1|
|       0|     1|38.0|    0|    0|     0.0|
|       0|     1|40.0|    0|    0|     0.0|
|       0|     1|40.0|    0|    0| 27.7208|
|       0|     1|45.0|    0|    0|    35.5|
|       0|     1|46.0|    1|    0|  61.175|
|       0|     1|50.0|    1|    0|    55.9|
|       0|     1|56.0|    0|    0| 30.6958|
|       0|     1|58.0|    0|    0|    29.7|
|       0|     1|58.0|    0|    2| 113.275|
|       0|     1|60.0|    0|    0|   26.55|
|       0|     1|64.0|    0|    0|    26.0|
|       0|     1|65.0|    0|    

In [16]:
# impute Age
dftest = impage.transform(dftest)
dftest = dftest.drop('Age')

# si and ohe Pclass, Sex, Embarked
dftest = sipclass.transform(dftest)
dftest = dftest.drop('Pclass')
dftest = ohe.transform(dftest)
dftest = dftest.drop('idxPclass')

# combine all feature columns into vector form to feed into ML model
dftest = va.transform(dftest)
dftest = dftest.drop('SibSp','Parch','Fare','impAge','ohePclass')
dftest.show()

+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|(8,[2,3,5],[26.55...|
|       0|(8,[2,3,5],[31.0,...|
|       0|(8,[2,3,5],[135.6...|
|       0|[1.0,2.0,151.55,2...|
|       0|(8,[1,2,3,5],[2.0...|
|       0|(8,[2,3,5],[27.75...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|(8,[3,5],[38.0,1.0])|
|       0|(8,[3,5],[40.0,1.0])|
|       0|(8,[2,3,5],[27.72...|
|       0|(8,[2,3,5],[35.5,...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|(8,[0,2,3,5],[1.0...|
|       0|(8,[2,3,5],[30.69...|
|       0|(8,[2,3,5],[29.7,...|
|       0|(8,[1,2,3,5],[2.0...|
|       0|(8,[2,3,5],[26.55...|
|       0|(8,[2,3,5],[26.0,...|
|       0|(8,[1,2,3,5],[1.0...|
+--------+--------------------+
only showing top 20 rows



In [17]:
# predict using random forest classifier on test data
predictions = rfc.transform(dftest)
predictions.show()

+--------+--------------------+--------------------+--------------------+----------+
|Survived|            features|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+--------------------+----------+
|       0|(8,[2,3,5],[26.55...|[45.3504588863735...|[0.45350458886373...|       1.0|
|       0|(8,[2,3,5],[31.0,...|[44.4172924206899...|[0.44417292420689...|       1.0|
|       0|(8,[2,3,5],[135.6...|[20.2622153518000...|[0.20262215351800...|       1.0|
|       0|[1.0,2.0,151.55,2...|[23.1149116045624...|[0.23114911604562...|       1.0|
|       0|(8,[1,2,3,5],[2.0...|[23.7277946209613...|[0.23727794620961...|       1.0|
|       0|(8,[2,3,5],[27.75...|[45.1903519440570...|[0.45190351944057...|       1.0|
|       0|(8,[0,2,3,5],[1.0...|[21.3991127327763...|[0.21399112732776...|       1.0|
|       0|(8,[0,2,3,5],[1.0...|[33.0898737409890...|[0.33089873740989...|       1.0|
|       0|(8,[3,5],[38.0,1.0])|[70.3179246693724...|[0.7031792466

In [18]:
# evaluate prediction results
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

0.7349397590361446

# Method-2: Pipeline Solution (instead of Method-1)

### 5a) split data into train and test

In [19]:
# train test split the data
dftrain, dftest = df.randomSplit([0.8,0.2])

### 5b) build pipeline and tuse it to fit/tranform on train and transform/predict/evaluate on test

In [21]:
# set up age imputation
impage = Imputer(strategy='median', inputCols=['Age'],outputCols=['impAge'])

# set up Pclass indexing
sipclass = StringIndexer(handleInvalid='keep', 
                        inputCol='Pclass', outputCol='idxPclass')

# set up Pclass OHE
ohe = OneHotEncoderEstimator(handleInvalid='keep', dropLast=True,
                             inputCols=['idxPclass'], 
                             outputCols=['ohePclass'])

# set up feature vector assembly
va = VectorAssembler(inputCols=['SibSp','Parch','Fare','impAge','ohePclass'],outputCol='features')

# set up classification model
rfc = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=100)

# create pipeline with proprocessing and classification steps
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[impage, sipclass, ohe, va, rfc]) 

# train model with training data
mdl = pipeline.fit(dftrain)

# predict on test data
pred = mdl.transform(dftest)

# evaluate predictions
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions)

0.7349397590361446