In [67]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName('Titanic Data') \
    .getOrCreate()

In [68]:
spark

In [69]:
df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("train.csv"))

In [70]:
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [71]:
df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [72]:
from pyspark.sql.functions import mean, min, max

In [73]:
df.rdd.take(3)

[Row(PassengerId='1', Survived='0', Pclass='3', Name='Braund, Mr. Owen Harris', Sex='male', Age='22', SibSp='1', Parch='0', Ticket='A/5 21171', Fare='7.25', Cabin=None, Embarked='S'),
 Row(PassengerId='2', Survived='1', Pclass='1', Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age='38', SibSp='1', Parch='0', Ticket='PC 17599', Fare='71.2833', Cabin='C85', Embarked='C'),
 Row(PassengerId='3', Survived='1', Pclass='3', Name='Heikkinen, Miss. Laina', Sex='female', Age='26', SibSp='0', Parch='0', Ticket='STON/O2. 3101282', Fare='7.925', Cabin=None, Embarked='S')]

In [74]:
df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

### Conversion to Pandas dataframe

In [75]:
pdf=df.toPandas()
pdf.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S


### Print schemas

In [76]:
df.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [77]:
df.dtypes

[('PassengerId', 'string'),
 ('Survived', 'string'),
 ('Pclass', 'string'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'string'),
 ('SibSp', 'string'),
 ('Parch', 'string'),
 ('Ticket', 'string'),
 ('Fare', 'string'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

### Data manipulation
+ Selecting only few columns

In [78]:
df.select(['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Ticket', 'Fare', 'Cabin', 'Embarked']).show()

+------+------+----+-----+-----+----------------+-------+-----+--------+
|Pclass|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+------+------+----+-----+-----+----------------+-------+-----+--------+
|     3|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|     1|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|     3|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|     1|female|  35|    1|    0|          113803|   53.1| C123|       S|
|     3|  male|  35|    0|    0|          373450|   8.05| null|       S|
|     3|  male|null|    0|    0|          330877| 8.4583| null|       Q|
|     1|  male|  54|    0|    0|           17463|51.8625|  E46|       S|
|     3|  male|   2|    3|    1|          349909| 21.075| null|       S|
|     3|female|  27|    0|    2|          347742|11.1333| null|       S|
|     2|female|  14|    1|    0|          237736|30.0708| null|       C|
|     3|female|   4|    1|    1|         PP 9549|  

### Use groupby funtion

In [79]:
df.groupBy("Ticket").count().sort("Ticket",ascending=False).show()

+-----------------+-----+
|           Ticket|count|
+-----------------+-----+
|        WE/P 5735|    2|
|        W/C 14208|    1|
|      W.E.P. 5734|    1|
|       W./C. 6609|    1|
|       W./C. 6608|    4|
|       W./C. 6607|    2|
|      W./C. 14263|    1|
|      W./C. 14258|    1|
|        SW/PP 751|    1|
| STON/O2. 3101290|    1|
| STON/O2. 3101283|    1|
| STON/O2. 3101282|    1|
| STON/O2. 3101279|    2|
| STON/O2. 3101271|    1|
|STON/O 2. 3101294|    1|
|STON/O 2. 3101293|    1|
|STON/O 2. 3101292|    1|
|STON/O 2. 3101289|    1|
|STON/O 2. 3101288|    1|
|STON/O 2. 3101286|    1|
+-----------------+-----+
only showing top 20 rows



In [80]:
df.groupBy("Embarked").count().sort("Embarked",ascending=False).show()

+--------+-----+
|Embarked|count|
+--------+-----+
|       S|  644|
|       Q|   77|
|       C|  168|
|    null|    2|
+--------+-----+



### Filtering rows

In [81]:
df.filter(df['Age'] < 30).show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|      Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----------+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25|       null|       S|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925|       null|       S|
|          8|       0|     3|Palsson, Master. ...|  male|  2|    3|    1|          349909| 21.075|       null|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female| 27|    0|    2|          347742|11.1333|       null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female| 14|    1|    0|          237736|30.0708|       null|       C|
|         11|       1|     3|Sandstrom, 

### withColumn

In [82]:
df.withColumn('Age', df['Age']/2).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|11.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|19.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|13.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|17.5|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|17.5|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

### Sorting 

In [83]:
df.sort(df['Name']).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|    Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------+-------+-----+--------+
|        147|       1|     3|"Andersson, Mr. A...|  male|  27|    0|    0|    350043| 7.7958| null|       S|
|        519|       1|     2|"Angle, Mrs. Will...|female|  36|    1|    0|    226875|     26| null|       S|
|        291|       1|     1|"Barber, Miss. El...|female|  26|    0|    0|     19877|  78.85| null|       S|
|        625|       0|     3|"Bowen, Mr. David...|  male|  21|    0|    0|     54636|   16.1| null|       S|
|        508|       1|     1|"Bradley, Mr. Geo...|  male|null|    0|    0|    111427|  26.55| null|       S|
|        346|       1|     2|"Brown, Miss. Ame...|female|  24|    0|    0|    248733|     13|  F33|       S|
|        209|      

In [84]:
df.sort(df['Name'].desc()).show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|        869|       0|     3|van Melkebeke, Mr...|  male|null|    0|    0|          345777|     9.5| null|       S|
|        154|       0|     3|van Billiard, Mr....|  male|40.5|    0|    2|        A/5. 851|    14.5| null|       S|
|        362|       0|     2|del Carlo, Mr. Se...|  male|  29|    1|    0|   SC/PARIS 2167| 27.7208| null|       C|
|        283|       0|     3|de Pelsmaeker, Mr...|  male|  16|    0|    0|          345778|     9.5| null|       S|
|        287|       1|     3|de Mulder, Mr. Th...|  male|  30|    0|    0|          345774|     9.5| null|       S|
|        560|       1|     3|de Messemaeker, M...|female|  36|    1|    

### Summarizing

In [85]:
df.agg(
    {'Fare': 'mean'}
).show()

+----------------+
|       avg(Fare)|
+----------------+
|32.2042079685746|
+----------------+



In [86]:
df.agg(
    {'Survived': 'mean'}
).show()

+------------------+
|     avg(Survived)|
+------------------+
|0.3838383838383838|
+------------------+



In [87]:
import pyspark.sql.functions as F
df.agg(
    F.mean(df.Fare).alias('avg'),
    F.min(df.Fare).alias('min'),
    F.max(df.Fare).alias('max')
).show()

+----------------+---+----+
|             avg|min| max|
+----------------+---+----+
|32.2042079685746|  0|93.5|
+----------------+---+----+



### Split-Apply-Combine

In [88]:
df.groupby('Age').agg({'Fare': 'mean', 'Sex': 'count'}).show()

+----+----------+------------------+
| Age|count(Sex)|         avg(Fare)|
+----+----------+------------------+
|20.5|         1|              7.25|
|   7|         3|           31.6875|
|  51|         7|28.752385714285715|
|0.75|         2|           19.2583|
|  54|         8|44.477087499999996|
|  15|         5| 49.65501999999999|
|  11|         4|         54.240625|
|  29|        20|27.090825000000002|
|  42|        13|37.125646153846155|
|  64|         2|             144.5|
|   3|         6|          25.78195|
|  30|        25|         25.541668|
|  34|        15| 16.63638666666667|
|  59|         2|            10.375|
|24.5|         1|              8.05|
|   8|         4|              28.3|
|70.5|         1|              7.75|
|  22|        27| 25.50478148148148|
|  28|        25|21.020159999999997|
|  35|        18| 89.31249999999999|
+----+----------+------------------+
only showing top 20 rows



### Data preparation and feature engineering
+ Columns are all string types

In [89]:
from pyspark.sql.functions import col
data = df.select(col('Survived').cast('float'),
                         col('Pclass').cast('float'),
                         col('Sex'),
                         col('Age').cast('float'),
                         col('Fare').cast('float'),
                         col('Embarked')
                        )
data.show()

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
|     1.0|   1.0|female|35.0|   53.1|       S|
|     0.0|   3.0|  male|35.0|   8.05|       S|
|     0.0|   3.0|  male|null| 8.4583|       Q|
|     0.0|   1.0|  male|54.0|51.8625|       S|
|     0.0|   3.0|  male| 2.0| 21.075|       S|
|     1.0|   3.0|female|27.0|11.1333|       S|
|     1.0|   2.0|female|14.0|30.0708|       C|
|     1.0|   3.0|female| 4.0|   16.7|       S|
|     1.0|   1.0|female|58.0|  26.55|       S|
|     0.0|   3.0|  male|20.0|   8.05|       S|
|     0.0|   3.0|  male|39.0| 31.275|       S|
|     0.0|   3.0|female|14.0| 7.8542|       S|
|     1.0|   2.0|female|55.0|   16.0|       S|
|     0.0|   3.0|  male| 2.0| 29.125|       Q|
|     1.0|   2.0|  male|null|   13.0|       S|
|     0.0|   

### Check the null values in the datasets

In [90]:
from pyspark.sql.functions import isnull, when, count, col
data.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+



#### Replace the null values by mean

In [91]:
df.agg(
    {'Age': 'mean'}
).show()

+-----------------+
|         avg(Age)|
+-----------------+
|29.69911764705882|
+-----------------+



In [100]:
data = data.replace('?', None)\
        .dropna(how='any')


### Run the code again to check the null values in the Age column

In [101]:
data.select([count(when(isnull(c), c)).alias(c) for c in dataset.columns]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



### Handling of data types
+ The ML model is only working in numeric data.
+ So we have to change the datatypes of string and catagorical before moving ahead

In [102]:
from pyspark.ml.feature import StringIndexer
data = StringIndexer(
    inputCol='Sex', 
    outputCol='Gender', 
    handleInvalid='keep').fit(dataset).transform(data)
data= StringIndexer(
    inputCol='Embarked', 
    outputCol='Boarded', 
    handleInvalid='keep').fit(dataset).transform(data)
data.show()

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|     0.0|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|
|     1.0|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|
|     1.0|   3.0|female|26.0|  7.925|       S|   1.0|    0.0|
|     1.0|   1.0|female|35.0|   53.1|       S|   1.0|    0.0|
|     0.0|   3.0|  male|35.0|   8.05|       S|   0.0|    0.0|
|     0.0|   3.0|  male|29.7| 8.4583|       Q|   0.0|    2.0|
|     0.0|   1.0|  male|54.0|51.8625|       S|   0.0|    0.0|
|     0.0|   3.0|  male| 2.0| 21.075|       S|   0.0|    0.0|
|     1.0|   3.0|female|27.0|11.1333|       S|   1.0|    0.0|
|     1.0|   2.0|female|14.0|30.0708|       C|   1.0|    1.0|
|     1.0|   3.0|female| 4.0|   16.7|       S|   1.0|    0.0|
|     1.0|   1.0|female|58.0|  26.55|       S|   1.0|    0.0|
|     0.0|   3.0|  male|20.0|   8.05|       S|   0.0|    0.0|
|     0.

In [103]:
data.dtypes

[('Survived', 'float'),
 ('Pclass', 'float'),
 ('Sex', 'string'),
 ('Age', 'float'),
 ('Fare', 'float'),
 ('Embarked', 'string'),
 ('Gender', 'double'),
 ('Boarded', 'double')]

In [105]:
# Assemble all the features with VectorAssembler
required_features = ['Pclass',
                    'Age',
                    'Fare',
                    'Gender',
                    'Boarded'
                   ]
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
final_data = assembler.transform(data)

In [107]:
final_data.show()

+--------+------+------+----+-------+--------+------+-------+--------------------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|            features|
+--------+------+------+----+-------+--------+------+-------+--------------------+
|     0.0|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|[1.0,38.0,71.2833...|
|     1.0|   3.0|female|26.0|  7.925|       S|   1.0|    0.0|[3.0,26.0,7.92500...|
|     1.0|   1.0|female|35.0|   53.1|       S|   1.0|    0.0|[1.0,35.0,53.0999...|
|     0.0|   3.0|  male|35.0|   8.05|       S|   0.0|    0.0|[3.0,35.0,8.05000...|
|     0.0|   3.0|  male|29.7| 8.4583|       Q|   0.0|    2.0|[3.0,29.700000762...|
|     0.0|   1.0|  male|54.0|51.8625|       S|   0.0|    0.0|[1.0,54.0,51.8624...|
|     0.0|   3.0|  male| 2.0| 21.075|       S|   0.0|    0.0|[3.0,2.0,21.07500...|
|     1.0|   3.0|female|27.0|11.1333|       S|   1.0|    0.0|[3.0,27.0,11.1332...|
|   

In [110]:
final_data =final_data.drop('Sex')

In [111]:
final_data.show(2)

+--------+------+----+-------+--------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Embarked|Gender|Boarded|            features|
+--------+------+----+-------+--------+------+-------+--------------------+
|     0.0|   3.0|22.0|   7.25|       S|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|38.0|71.2833|       C|   1.0|    1.0|[1.0,38.0,71.2833...|
+--------+------+----+-------+--------+------+-------+--------------------+
only showing top 2 rows



In [112]:
final_data =final_data.drop('Embarked')

In [113]:
final_data.show(2)

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+------+----+-------+------+-------+--------------------+
|     0.0|   3.0|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
+--------+------+----+-------+------+-------+--------------------+
only showing top 2 rows



### Modeling

In [117]:
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2])

In [129]:
### Now develop a machine learning model
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='Survived', 
                            featuresCol='features',
                            maxDepth=6)

In [130]:
model = rf.fit(training_data)

In [131]:
predict = model.transform(test_data)

### Model Evaluation

In [132]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='Survived', 
    predictionCol='prediction', 
    metricName='accuracy')

In [133]:
accuracy = evaluator.evaluate(predict)
print('Test Accuracy = ', accuracy)

Test Accuracy =  0.8526315789473684


### References
+ http://people.duke.edu/~ccc14/sta-663-2018/notebooks/S15C_Spark_DataFrames.html
+ https://towardsdatascience.com/your-first-apache-spark-ml-model-d2bb82b599dd