In [1]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 67kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=58d69a435bd06b4fb08bd3501d525e052e05c782915da25d47cb9295ede0f407
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


**LOGISTIC REGRESSION OF TITANIC DATASET IN SPARK**

**Start a Spark Session**

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic').getOrCreate()

In [4]:
#Import the dataset

df = spark.read.csv('titanic.csv',inferSchema=True,header=True)
df.show(5)



+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

**Checking Information about the Data**

In [5]:
# Checking the number of rows in the dataset

df.count()

891

In [6]:
# What are the names of the columns

df.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [7]:
# Check descriptive statistics of the continous variables

df.describe().show()

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764705882|0.5230078563411896|0.38159371492704824|260318.54916792738| 32.20420

In [8]:
df.describe().toPandas()

Unnamed: 0,summary,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,count,891.0,891.0,891.0,891,891,714.0,891.0,891.0,891,891.0,204,889
1,mean,446.0,0.3838383838383838,2.308641975308642,,,29.69911764705882,0.5230078563411896,0.3815937149270482,260318.54916792738,32.2042079685746,,
2,stddev,257.3538420152301,0.4865924542648575,0.8360712409770491,,,14.526497332334037,1.1027434322934315,0.8060572211299488,471609.26868834975,49.69342859718089,,
3,min,1.0,0.0,1.0,"""Andersson, Mr. August Edvard (""""Wennerstrom"""")""",female,0.42,0.0,0.0,110152,0.0,A10,C
4,max,891.0,1.0,3.0,"van Melkebeke, Mr. Philemon",male,80.0,8.0,6.0,WE/P 5735,512.3292,T,S


In [9]:
# Check the column data types

df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

**Data Preparation and Feature Engineering**

In [10]:
#Select the necessary columns
from pyspark.sql.functions import isnull,when,count,col

In [11]:
dataset = df.select(['Survived','Pclass','Sex','Age','Fare','Embarked'])
dataset.show()

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|       0|     3|  male|22.0|   7.25|       S|
|       1|     1|female|38.0|71.2833|       C|
|       1|     3|female|26.0|  7.925|       S|
|       1|     1|female|35.0|   53.1|       S|
|       0|     3|  male|35.0|   8.05|       S|
|       0|     3|  male|null| 8.4583|       Q|
|       0|     1|  male|54.0|51.8625|       S|
|       0|     3|  male| 2.0| 21.075|       S|
|       1|     3|female|27.0|11.1333|       S|
|       1|     2|female|14.0|30.0708|       C|
|       1|     3|female| 4.0|   16.7|       S|
|       1|     1|female|58.0|  26.55|       S|
|       0|     3|  male|20.0|   8.05|       S|
|       0|     3|  male|39.0| 31.275|       S|
|       0|     3|female|14.0| 7.8542|       S|
|       1|     2|female|55.0|   16.0|       S|
|       0|     3|  male| 2.0| 29.125|       Q|
|       1|     2|  male|null|   13.0|       S|
|       0|   

In [12]:
#Check for the missing data

dataset.select([count(when(isnull(c),c)).alias(c) for c in dataset.columns]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+



In [14]:
#Drop the missing values

data = dataset.replace('null',None).dropna(how='any')

In [15]:
data.select([count(when(isnull(c),c)).alias(c) for c in data.columns]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



**Index Categorical Variables with StringIndexer**

In [16]:
from pyspark.ml.feature import StringIndexer

In [18]:
data = StringIndexer(inputCol='Sex',outputCol='Gender',handleInvalid='keep').fit(data).transform(data)
data = StringIndexer(inputCol='Embarked',outputCol='Boarded',handleInvalid='keep').fit(data).transform(data)
data.show()

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|       0|     3|  male|22.0|   7.25|       S|   0.0|    0.0|
|       1|     1|female|38.0|71.2833|       C|   1.0|    1.0|
|       1|     3|female|26.0|  7.925|       S|   1.0|    0.0|
|       1|     1|female|35.0|   53.1|       S|   1.0|    0.0|
|       0|     3|  male|35.0|   8.05|       S|   0.0|    0.0|
|       0|     1|  male|54.0|51.8625|       S|   0.0|    0.0|
|       0|     3|  male| 2.0| 21.075|       S|   0.0|    0.0|
|       1|     3|female|27.0|11.1333|       S|   1.0|    0.0|
|       1|     2|female|14.0|30.0708|       C|   1.0|    1.0|
|       1|     3|female| 4.0|   16.7|       S|   1.0|    0.0|
|       1|     1|female|58.0|  26.55|       S|   1.0|    0.0|
|       0|     3|  male|20.0|   8.05|       S|   0.0|    0.0|
|       0|     3|  male|39.0| 31.275|       S|   0.0|    0.0|
|       

In [19]:
#Check data types

data.dtypes

[('Survived', 'int'),
 ('Pclass', 'int'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('Fare', 'double'),
 ('Embarked', 'string'),
 ('Gender', 'double'),
 ('Boarded', 'double')]

In [21]:
#Drop unnecessary columns

data = data.drop('Sex','Embarked')
data.show()

+--------+------+----+-------+------+-------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|
+--------+------+----+-------+------+-------+
|       0|     3|22.0|   7.25|   0.0|    0.0|
|       1|     1|38.0|71.2833|   1.0|    1.0|
|       1|     3|26.0|  7.925|   1.0|    0.0|
|       1|     1|35.0|   53.1|   1.0|    0.0|
|       0|     3|35.0|   8.05|   0.0|    0.0|
|       0|     1|54.0|51.8625|   0.0|    0.0|
|       0|     3| 2.0| 21.075|   0.0|    0.0|
|       1|     3|27.0|11.1333|   1.0|    0.0|
|       1|     2|14.0|30.0708|   1.0|    1.0|
|       1|     3| 4.0|   16.7|   1.0|    0.0|
|       1|     1|58.0|  26.55|   1.0|    0.0|
|       0|     3|20.0|   8.05|   0.0|    0.0|
|       0|     3|39.0| 31.275|   0.0|    0.0|
|       0|     3|14.0| 7.8542|   1.0|    0.0|
|       1|     2|55.0|   16.0|   1.0|    0.0|
|       0|     3| 2.0| 29.125|   0.0|    2.0|
|       0|     3|31.0|   18.0|   1.0|    0.0|
|       0|     2|35.0|   26.0|   0.0|    0.0|
|       1|     2|34.0|   13.0|   0

**Assemble all the features using Assembler object**

In [23]:
required_features = 'Pclass','Age','Fare','Gender','Boarded'
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=required_features,outputCol='features')
transformed_data = assembler.transform(data)

In [24]:
transformed_data.show()

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+------+----+-------+------+-------+--------------------+
|       0|     3|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|       1|     1|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
|       1|     3|26.0|  7.925|   1.0|    0.0|[3.0,26.0,7.925,1...|
|       1|     1|35.0|   53.1|   1.0|    0.0|[1.0,35.0,53.1,1....|
|       0|     3|35.0|   8.05|   0.0|    0.0|[3.0,35.0,8.05,0....|
|       0|     1|54.0|51.8625|   0.0|    0.0|[1.0,54.0,51.8625...|
|       0|     3| 2.0| 21.075|   0.0|    0.0|[3.0,2.0,21.075,0...|
|       1|     3|27.0|11.1333|   1.0|    0.0|[3.0,27.0,11.1333...|
|       1|     2|14.0|30.0708|   1.0|    1.0|[2.0,14.0,30.0708...|
|       1|     3| 4.0|   16.7|   1.0|    0.0|[3.0,4.0,16.7,1.0...|
|       1|     1|58.0|  26.55|   1.0|    0.0|[1.0,58.0,26.55,1...|
|       0|     3|20.0|   8.05|   0.0|    0.0|[3.0,20.0,8.05,0.

**Modelling**

In [25]:
#Split the data into train and test sets

train_df,test_df = transformed_data.randomSplit([0.7,0.3])

In [26]:
#Define the Model

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Survived',featuresCol='features')

In [27]:
#Fit the model

model = lr.fit(train_df)

In [28]:
#Predict with the test dataset

predictions = model.transform(test_df)

In [29]:
predictions.show()

+--------+------+----+-------+------+-------+--------------------+--------------------+--------------------+----------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|       rawPrediction|         probability|prediction|
+--------+------+----+-------+------+-------+--------------------+--------------------+--------------------+----------+
|       0|     1| 2.0| 151.55|   1.0|    0.0|[1.0,2.0,151.55,1...|[-3.9346068971753...|[0.01917838203319...|       1.0|
|       0|     1|18.0|  108.9|   0.0|    1.0|[1.0,18.0,108.9,0...|[-1.1731684861228...|[0.23628274310936...|       1.0|
|       0|     1|19.0|  263.0|   0.0|    0.0|[1.0,19.0,263.0,0...|[-0.7911731920810...|[0.31191681785515...|       1.0|
|       0|     1|29.0|   30.0|   0.0|    0.0|[1.0,29.0,30.0,0....|[-0.3122663739815...|[0.42256163930216...|       1.0|
|       0|     1|29.0|   66.6|   0.0|    0.0|[1.0,29.0,66.6,0....|[-0.3166126611876...|[0.42150148936271...|       1.0|
|       0|     1|30.0|  27.75|   0.0|   

In [31]:
#Evaluate the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol='Survived',rawPredictionCol='prediction')

In [32]:
predictions.select('Survived','prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       1.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [33]:
AUC = evaluator.evaluate(predictions)
AUC

0.80139853001225