# PySpark Initilization and Data Observing

In [417]:
# Importing necessaru libraries.
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import mean
from pyspark.ml.feature import OneHotEncoder, StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

In [308]:
# Creating a PySpark session.
spark = pyspark.sql.SparkSession.builder.appName('PySpark Project 1').getOrCreate()

In [310]:
# Fetching the data.
data = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('titanic.csv')

In [311]:
# Observing it.
data.show(5)

+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|ticket|    fare|  cabin|embarked|boat|body|           home.dest|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+
|     1|       1|Allen, Miss. Elis...|female|    29|    0|    0| 24160|211.3375|     B5|       S|   2|   ?|        St Louis, MO|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|C22 C26|       S|  11|   ?|Montreal, PQ / Ch...|
|     1|       0|Allison, Miss. He...|female|     2|    1|    2|113781|  151.55|C22 C26|       S|   ?|   ?|Montreal, PQ / Ch...|
|     1|       0|Allison, Mr. Huds...|  male|    30|    1|    2|113781|  151.55|C22 C26|       S|   ?| 135|Montreal, PQ / Ch...|
|     1|       0|Allison, Mrs. Hud...|female|    25|    1|    2|113781|  151.55|C22 C26|       S|

In [313]:
# Getting information about the columns of our dataset.
data.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: string (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)



# Data Cleaning

In [317]:
# We have a lot of '?' string in our dataaet so I will replace it null.
data = data.replace('?', None)

In [319]:
# Observing unique values inside the 'age' column.
data.select('age').distinct().show()

+------+
|   age|
+------+
|  20.5|
|0.9167|
|  38.5|
|    51|
|     7|
|  26.5|
|  0.75|
|    54|
|    15|
|    11|
|0.4167|
|    29|
|  22.5|
|    42|
|0.3333|
|    64|
|     3|
|    30|
|    34|
|    59|
+------+
only showing top 20 rows


*This column should be numerical (double)*

In [322]:
# Converting age column to numeric data type.
data = data.withColumn('age', F.col('age').cast('double'))

In [324]:
data.select('age').show()

+------+
|   age|
+------+
|  29.0|
|0.9167|
|   2.0|
|  30.0|
|  25.0|
|  48.0|
|  63.0|
|  39.0|
|  53.0|
|  71.0|
|  47.0|
|  18.0|
|  24.0|
|  26.0|
|  80.0|
|  NULL|
|  24.0|
|  50.0|
|  32.0|
|  36.0|
+------+
only showing top 20 rows


In [325]:
# The 'fare' column should numeric as well.
data = data.withColumn('fare', F.col('age').cast('double'))

In [328]:
data.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)



In [330]:
data = data.drop('home.dest')

In [332]:
total_rows = data.count()

null_percentages = data.select([
    F.round(F.sum(F.when(F.col(f"`{c}`").isNull(), 1).otherwise(0)) / total_rows * 100, 2)
        .alias(c)
    for c in data.columns
])

null_percentages.show()

+------+--------+----+---+-----+-----+-----+------+-----+-----+--------+-----+-----+
|pclass|survived|name|sex|  age|sibsp|parch|ticket| fare|cabin|embarked| boat| body|
+------+--------+----+---+-----+-----+-----+------+-----+-----+--------+-----+-----+
|   0.0|     0.0| 0.0|0.0|20.09|  0.0|  0.0|   0.0|20.09|77.46|    0.15|62.87|90.76|
+------+--------+----+---+-----+-----+-----+------+-----+-----+--------+-----+-----+



#### Large proportions of columns like 'cabin', 'boat' and 'body' are missing so I will remove those columns since they will be problem in modeling phase.

In [335]:
data = data.drop('cabin', 'boat', 'body')

In [337]:
data.show(5)

+------+--------+--------------------+------+------+-----+-----+------+------+--------+
|pclass|survived|                name|   sex|   age|sibsp|parch|ticket|  fare|embarked|
+------+--------+--------------------+------+------+-----+-----+------+------+--------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|  29.0|       S|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|0.9167|       S|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|113781|   2.0|       S|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|113781|  30.0|       S|
|     1|       0|Allison, Mrs. Hud...|female|  25.0|    1|    2|113781|  25.0|       S|
+------+--------+--------------------+------+------+-----+-----+------+------+--------+
only showing top 5 rows


In [381]:
# I will fill missing values in 'embarked' column.
from pyspark.sql.functions import col
mode_value_em = (
    data.groupBy("embarked")
        .count()
        .orderBy(col("count").desc())
        .first()[0]
)
data = data.fillna({'embarked': mode_value_em})

In [419]:
# Now let's fill missing values in numerical columns.
numeric_cols = ['age', 'fare']

for c in numeric_cols:
    mean_value = data.select(mean(c)).first()[0]
    data = data.fillna({c: mean_value})

#### We do not need 'name' and 'ticket' columns which are unique and will add nothig to a model.

In [422]:
data = data.drop('name', 'ticket')
data.show(5)

+------+--------+------+------+-----+-----+------+--------+
|pclass|survived|   sex|   age|sibsp|parch|  fare|embarked|
+------+--------+------+------+-----+-----+------+--------+
|     1|       1|female|  29.0|    0|    0|  29.0|       S|
|     1|       1|  male|0.9167|    1|    2|0.9167|       S|
|     1|       0|female|   2.0|    1|    2|   2.0|       S|
|     1|       0|  male|  30.0|    1|    2|  30.0|       S|
|     1|       0|female|  25.0|    1|    2|  25.0|       S|
+------+--------+------+------+-----+-----+------+--------+
only showing top 5 rows


In [424]:
data.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = false)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- fare: double (nullable = false)
 |-- embarked: string (nullable = false)



# Data Exploration

In [427]:
# Let's see average age of male and female passengers.
data.groupBy('sex').agg({'age': 'mean'}).show()

+------+------------------+
|   sex|          avg(age)|
+------+------------------+
|female|28.886935390492333|
|  male|30.430715521707075|
+------+------------------+



In [429]:
# Now let's if there is any relation between an age and being survived.
data.groupBy('survived').agg({'age': 'mean'}).show()

+--------+------------------+
|survived|          avg(age)|
+--------+------------------+
|       1|29.058812438814574|
|       0| 30.38936817968011|
+--------+------------------+



Not too much difference.

In [432]:
# Let's see if survivability has something to do with gender.
data.groupBy('sex').agg({'survived': 'mean'}).show()

+------+-------------------+
|   sex|      avg(survived)|
+------+-------------------+
|female| 0.7274678111587983|
|  male|0.19098457888493475|
+------+-------------------+



Yes there is a correlation.

In [435]:
# Now let's see if pclass has something to do with survivability.
data.groupBy('pclass').agg({'survived': 'mean'}).show()

+------+------------------+
|pclass|     avg(survived)|
+------+------------------+
|     1|0.6191950464396285|
|     3|0.2552891396332863|
|     2|0.4296028880866426|
+------+------------------+



Yes, there is a high correlation there as well people from upper class survived more on average.

# Encoding and Scaling

In [439]:
data.printSchema()

root
 |-- pclass: integer (nullable = true)
 |-- survived: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = false)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- fare: double (nullable = false)
 |-- embarked: string (nullable = false)



In [468]:
categorical_cols = ["sex", "embarked"]
indexed_cols = [c + "_indexed" for c in categorical_cols]
encoded_cols = [c + "_encoded" for c in categorical_cols]

indexers = [
    StringIndexer(inputCol=c, outputCol=ic, handleInvalid='keep') 
    for c, ic in zip(categorical_cols, indexed_cols)
]
encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols)

In [470]:
numeric_cols = ["pclass", "age", "sibsp", "parch", "fare"]

# First, assembling numeric features into one vector
numeric_assembler = VectorAssembler(inputCols=numeric_cols, outputCol="numeric_features")

# Scaling numeric features
scaler = StandardScaler(inputCol="numeric_features", outputCol="scaled_numeric_features")

In [472]:
# Combining scaled numeric + encoded categorical features
assembler_inputs = encoded_cols + ["scaled_numeric_features"]
final_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")

In [474]:
stages = indexers + [encoder, numeric_assembler, scaler, final_assembler]
pipeline = Pipeline(stages=stages)

# Fitting & transforming.
data_prepared = pipeline.fit(data).transform(data)

In [476]:
data_prepared.select("features", "survived").show(5, truncate=False)

+--------------------------------------------------------------------------------------------------------------------+--------+
|features                                                                                                            |survived|
+--------------------------------------------------------------------------------------------------------------------+--------+
|(10,[1,2,5,6,9],[1.0,1.0,1.1935509781844968,2.2509937091094283,2.2509937091094283])                                 |1       |
|[1.0,0.0,1.0,0.0,0.0,1.1935509781844968,0.07115468734967631,0.9600076272872315,2.31064208577779,0.07115468734967631]|1       |
|[0.0,1.0,1.0,0.0,0.0,1.1935509781844968,0.15524094545582265,0.9600076272872315,2.31064208577779,0.15524094545582265]|0       |
|[1.0,0.0,1.0,0.0,0.0,1.1935509781844968,2.32861418183734,0.9600076272872315,2.31064208577779,2.32861418183734]      |0       |
|[0.0,1.0,1.0,0.0,0.0,1.1935509781844968,1.9405118181977832,0.9600076272872315,2.31064208577779,1.940511

# Modeling

In [478]:
# Train Test split.
train_data, test_data = data_prepared.randomSplit([0.8, 0.2], seed=42)

In [482]:
lr = LogisticRegression(featuresCol="features", labelCol="survived")
lr_model = lr.fit(train_data)

In [484]:
predictions = lr_model.transform(test_data)
predictions.select("features", "survived", "prediction", "probability").show(5, truncate=False)

+------------------------------------------------------------------------------------------------------------------+--------+----------+----------------------------------------+
|features                                                                                                          |survived|prediction|probability                             |
+------------------------------------------------------------------------------------------------------------------+--------+----------+----------------------------------------+
|(10,[1,3,5,6,9],[1.0,1.0,1.1935509781844968,2.7943370182048075,2.7943370182048075])                               |0       |1.0       |[0.0505485294132934,0.9494514705867066] |
|[1.0,0.0,0.0,1.0,0.0,1.1935509781844968,1.3971685091024038,0.9600076272872315,0.0,1.3971685091024038]             |0       |1.0       |[0.37591972322532635,0.6240802767746736]|
|[1.0,0.0,1.0,0.0,0.0,1.1935509781844968,1.4747889818303153,2.8800228818616946,2.31064208577779,1.474788981830

In [488]:
evaluator = BinaryClassificationEvaluator(labelCol="survived", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print("AUC:", auc)

AUC: 0.8115033222591362


The model performed very well.