In [0]:
import pyspark

In [0]:
## Reading dataset with read.format... Function

df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/abhilash.jash@praxis.ac.in/Titanic_Dataset.csv")

In [0]:
## Reading the table uploaded in spark SQL

df1 = spark.sql("Select * FROM titanic_dataset_csv")
## Change this part and filename according to your filename and location of uploaded file.

In [0]:
# df has no schema inferred
print(df.printSchema())

#df1 has its schema inferred as during upload of table we inferred the schema.
print(df1.printSchema())


root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

None
root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

None


In [0]:
df1.columns

Out[10]: ['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [0]:
## Reading dataset with read.format but by inferring the schema.

df = spark.read.format("csv").option("header", "true").option("inferSchema","True").load("dbfs:/FileStore/shared_uploads/abhilash.jash@praxis.ac.in/Titanic_Dataset.csv")

In [0]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [0]:
## Looking at the spark context
spark

In [0]:
df.columns

Out[22]: ['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [0]:
df.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [0]:
#Not selecting 'PassengerID' and 'Cabin' as these don't provide any information about the analysis
my_cols = df.select(['Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Embarked'])

In [0]:
my_final_data = my_cols.na.drop()

In [0]:
## Importing 
## VectorAssembler : Assembles the independent variable inside vectors for feeding into ML model, 
## VectorIndexer : Indexes the vectors created,
## OneHotEncoder : Encodes the indexed categorical data,
## StringIndexer : Indexes the categorical data and prepares it for one hot encoding

from pyspark.ml.feature import (VectorAssembler, VectorIndexer,
                               OneHotEncoder, StringIndexer)

In [0]:
gender_indexer = StringIndexer(inputCol='Sex', outputCol= 'SexIndex')
# A B C
# 0 1 2
# ONE HOT ENCODE
# KEY A B C
# Example A
# [1, 0, 0]
gender_encoder = OneHotEncoder(inputCol='SexIndex', outputCol='SexVec')

In [0]:
## Encoding embarked similar to gender

embark_indexer = StringIndexer(inputCol='Embarked', outputCol='EmbarkIndex')
embark_encoder = OneHotEncoder(inputCol='EmbarkIndex', outputCol='EmbarkVec')

In [0]:
## Vector assembling the data and preparing the independent features to be able to fed into the ML model.

assembler = VectorAssembler(inputCols=['Pclass', 'SexVec', 'EmbarkVec', 'Age', 'Parch','Fare'], outputCol='features')

In [0]:
from pyspark.ml.classification import LogisticRegression

In [0]:
from pyspark.ml import Pipeline

In [0]:
log_reg = LogisticRegression(featuresCol='features', labelCol='Survived')

In [0]:
pipeline = Pipeline(stages = [gender_indexer, embark_indexer,
                             gender_encoder, embark_encoder,
                             assembler, log_reg])

## Creating the pipeline in stages : gender_indexer ---> embark_indexer ---> gender_encoder ---> embark encoder --->assembler (vector assembler) ---> log_reg (ML model)

In [0]:
train_data, test_data = my_final_data.randomSplit([0.7, 0.3])

In [0]:
train_data.head(5)

Out[45]: [Row(Survived=0, Pclass=1, Name='Allison, Miss. Helen Loraine', Sex='female', Age=2.0, SibSp=1, Parch=2, Ticket='113781', Fare=151.55, Embarked='S'),
 Row(Survived=0, Pclass=1, Name='Allison, Mrs. Hudson J C (Bessie Waldo Daniels)', Sex='female', Age=25.0, SibSp=1, Parch=2, Ticket='113781', Fare=151.55, Embarked='S'),
 Row(Survived=0, Pclass=1, Name='Artagaveytia, Mr. Ramon', Sex='male', Age=71.0, SibSp=0, Parch=0, Ticket='PC 17609', Fare=49.5042, Embarked='C'),
 Row(Survived=0, Pclass=1, Name='Baxter, Mr. Quigg Edmond', Sex='male', Age=24.0, SibSp=0, Parch=1, Ticket='PC 17558', Fare=247.5208, Embarked='C'),
 Row(Survived=0, Pclass=1, Name='Blackwell, Mr. Stephen Weart', Sex='male', Age=45.0, SibSp=0, Parch=0, Ticket='113784', Fare=35.5, Embarked='S')]

In [0]:
train_data.count()

Out[46]: 512

In [0]:
test_data.head(5)

Out[47]: [Row(Survived=0, Pclass=1, Name='Andrews, Mr. Thomas Jr', Sex='male', Age=39.0, SibSp=0, Parch=0, Ticket='112050', Fare=0.0, Embarked='S'),
 Row(Survived=0, Pclass=1, Name='Butt, Major. Archibald Willingham', Sex='male', Age=45.0, SibSp=0, Parch=0, Ticket='113050', Fare=26.55, Embarked='S'),
 Row(Survived=0, Pclass=1, Name='Davidson, Mr. Thornton', Sex='male', Age=31.0, SibSp=1, Parch=0, Ticket='F.C. 12750', Fare=52.0, Embarked='S'),
 Row(Survived=0, Pclass=1, Name='Douglas, Mr. Walter Donald', Sex='male', Age=50.0, SibSp=1, Parch=0, Ticket='PC 17761', Fare=106.425, Embarked='C'),
 Row(Survived=0, Pclass=1, Name='Fortune, Mr. Charles Alexander', Sex='male', Age=19.0, SibSp=3, Parch=2, Ticket='19950', Fare=263.0, Embarked='S')]

In [0]:
test_data.count()

Out[48]: 200

In [0]:
fit_model = pipeline.fit(train_data)

In [0]:
results = fit_model.transform(test_data)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [0]:
my_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Survived')

In [0]:
results.select('Survived', 'prediction').show()

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       1.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 20 rows



In [0]:
AUC = my_eval.evaluate(results)

In [0]:
AUC

Out[55]: 0.7592704250829062