# Project 1: Machine Learning Project without the MLlib Pipeline


# Initialize SparkSession & Reading Data


In [51]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start a new one
spark = SparkSession.builder.appName("Titanic Data").getOrCreate()

# CSV reader that treats the first line of the file as the column names
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("./data/titanic/train.csv")

# Preview the first rows of the raw data
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

# Selecting some columns (if needed)


In [52]:
from pyspark.sql.functions import col

# Columns to keep, in output order
selected_columns = ["Survived", "Pclass", "Sex", "Age", "Fare", "Embarked"]
# Subset of the kept columns that is cast to float; the rest pass through unchanged
float_columns = {"Survived", "Pclass", "Age", "Fare"}

dataset = df.select(
    *[
        col(name).cast("float") if name in float_columns else col(name)
        for name in selected_columns
    ]
)

dataset.show(5)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
|     1.0|   1.0|female|35.0|   53.1|       S|
|     0.0|   3.0|  male|35.0|   8.05|       S|
+--------+------+------+----+-------+--------+
only showing top 5 rows



# Removing null values (if needed)


In [53]:
from pyspark.sql.functions import isnull, when, count, col

# One null counter per column: when() without otherwise() yields null unless the
# column itself is null, and count() only counts non-null values
null_counters = [
    count(when(isnull(name), name)).alias(name) for name in dataset.columns
]

# Null counts before cleaning
dataset.select(null_counters).show()

# Turn '?' placeholders into nulls, then discard every row that has at least one null
dataset = dataset.replace("?", None).dropna(how="any")

# Null counts after cleaning (the column set is unchanged, so the counters are reused)
dataset.select(null_counters).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



# Converting categorical variables to numeric values

Spark ML estimators only accept numeric feature values, so string-typed categorical columns cannot be fed to a model directly.

For modeling, all categorical variables must be converted to numeric values. To achieve this, StringIndexer is employed.


In [54]:
from pyspark.ml.feature import StringIndexer

# (string column, numeric index column) pairs, indexed one after the other
index_mappings = (("Sex", "Gender"), ("Embarked", "Boarded"))

for source_column, indexed_column in index_mappings:
    indexer = StringIndexer(
        inputCol=source_column,
        outputCol=indexed_column,
        handleInvalid="keep",
    )
    # Fit the label-to-index mapping on the data, then append the indexed column
    dataset = indexer.fit(dataset).transform(dataset)

dataset.show(2)

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|     0.0|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|
|     1.0|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|
+--------+------+------+----+-------+--------+------+-------+
only showing top 2 rows



In [55]:
# The original string columns are dropped; their indexed counterparts remain
dataset = dataset.drop("Sex", "Embarked")

dataset.show(2)

+--------+------+----+-------+------+-------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|
+--------+------+----+-------+------+-------+
|     0.0|   3.0|22.0|   7.25|   0.0|    0.0|
|     1.0|   1.0|38.0|71.2833|   1.0|    1.0|
+--------+------+----+-------+------+-------+
only showing top 2 rows



# Feature engineering

Spark learns through two columns, <span style="color:green">label</span> and <span style="color:green">feature</span>.
<span style="color:red">Therefore, all columns except the target column must be combined into a single column.</span> This is accomplished via <span style="color:green">VectorAssembler</span>.


In [56]:
from pyspark.ml.feature import VectorAssembler as va

# Build the single vector column the classifier reads its inputs from

# Input columns, in the order they appear inside the assembled vector
original_feature_columns = ["Pclass", "Age", "Fare", "Gender", "Boarded"]

# The assembler concatenates the input columns into one vector column named "feature"
assembler = va(outputCol="feature", inputCols=original_feature_columns)

# Append the "feature" column; the existing columns are kept alongside it
spark_required_formatted_data = assembler.transform(dataset)

spark_required_formatted_data.show(5)

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|             feature|
+--------+------+----+-------+------+-------+--------------------+
|     0.0|   3.0|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
|     1.0|   3.0|26.0|  7.925|   1.0|    0.0|[3.0,26.0,7.92500...|
|     1.0|   1.0|35.0|   53.1|   1.0|    0.0|[1.0,35.0,53.0999...|
|     0.0|   3.0|35.0|   8.05|   0.0|    0.0|[3.0,35.0,8.05000...|
+--------+------+----+-------+------+-------+--------------------+
only showing top 5 rows



# Modeling


In [57]:
from pyspark.ml.classification import RandomForestClassifier as rfc

# Fixed seed shared by the split and the forest so that re-running the notebook
# on the same input data gives the same partitions, model, and metrics
SEED = 42

# Split the assembled dataset into train (80%) and test (20%) subsets
training_data, testing_data = spark_required_formatted_data.randomSplit(
    [0.8, 0.2], seed=SEED
)

print(f"Number of train samples: {training_data.count()}")
print(f"Number of test samples: {testing_data.count()}")

# Random forest reading labels from "Survived" and inputs from the "feature" vector
rf = rfc(labelCol="Survived", featuresCol="feature", maxDepth=5, seed=SEED)

# Fit on the training subset only
model = rf.fit(training_data)

# Score the held-out test subset; adds the model's output columns (incl. "prediction")
predictions = model.transform(testing_data)

Number of train samples: 565
Number of test samples: 147


# Evaluation


In [58]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator as mcv

# Accuracy evaluator comparing the "prediction" column against the "Survived" label
evaluator = mcv(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

# `predictions` was produced from testing_data, so this is the test-set accuracy
print(f"Test set accuracy: {accuracy:.4f}")

Training set accuracy: 0.7755


# Checking Spark jobs

## The Jobs

<img src="./images/spark-jobs-check.png" style="width: 500px">
<img src="./images/spark-jobs-check-2.png" style="width: 500px">

## The Stages

<img src="./images/spark-stages-check.png" style="width: 500px">


# Stop the Spark Session


In [59]:
# Stop the session created at the top of the notebook; `spark` must be
# re-created (re-run the first cell) before any later Spark call
spark.stop()