# Project 1: Machine Learning Project without MLlib Pipeline
Name: Awara Pirkhdrie, 
Date: 2024-02-29

In [2]:
import findspark

# Must run before importing pyspark: init() makes the local Spark
# installation importable from this Python environment.
findspark.init()

import pyspark

# Last expression of the cell: displays the Spark home directory in use.
findspark.find()

'C:\\Spark\\spark-3.5.1-bin-hadoop3'

# Initialize SparkSession

In [3]:
from pyspark.sql import SparkSession

In [7]:
# Configure the application name, then reuse an existing session if one is
# already running or create a new one otherwise.
session_builder = SparkSession.builder.appName("Titanic Data")
spark = session_builder.getOrCreate()

In [8]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

# Reading Data
• Dataset (Titanic): https://www.kaggle.com/c/titanic/data

In [72]:
# Read the Titanic training CSV into the DataFrame `df`.
# The "header" option makes Spark take column names from the first line of the
# file; no schema is supplied, so columns are cast explicitly later on.
csv_reader = spark.read.format("csv").option("header", "true")
df = csv_reader.load("./Dataset/titanic/train.csv")

In [73]:
# Preview the first 5 rows of the raw DataFrame.
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

# Selecting some columns (if needed)
• Import `col` from `pyspark.sql.functions`, then use it to select (and cast) the columns needed for modeling.

In [74]:
from pyspark.sql.functions import col

In [75]:
# Keep only the columns used for modeling, in a fixed order, and cast the
# numeric ones to float. "Sex" and "Embarked" stay as strings for now.
float_columns = {"Survived", "Pclass", "Age", "Fare"}
column_order = ["Survived", "Pclass", "Sex", "Age", "Fare", "Embarked"]

dataset = df.select(
    *[
        col(name).cast("float") if name in float_columns else col(name)
        for name in column_order
    ]
)

In [76]:
# Preview the first 4 rows of the selected, type-cast columns.
dataset.show(4)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
|     1.0|   1.0|female|35.0|   53.1|       S|
+--------+------+------+----+-------+--------+
only showing top 4 rows



# Removing null values (if needed)

In [77]:
from pyspark.sql.functions import isnull, when, count, col

In [78]:
# Build one aggregate expression per column that counts its null entries,
# then display all counts as a single-row table.
null_count_exprs = [count(when(isnull(c), c)).alias(c) for c in dataset.columns]
dataset.select(null_count_exprs).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+



In [79]:
# Treat the placeholder "?" as a missing value, then drop every row that
# has a null in any column.
placeholders_as_null = dataset.replace("?", None)
dataset = placeholders_as_null.dropna(how="any")

In [80]:
# Re-run the per-column null count to confirm that cleaning removed all nulls.
remaining_null_exprs = [count(when(isnull(c), c)).alias(c) for c in dataset.columns]
dataset.select(remaining_null_exprs).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



# Converting categorical variables to numeric values
• Spark ML algorithms operate on numeric feature values and cannot consume string-valued categorical columns directly. For modeling, all categorical
variables must be converted to numeric values. To achieve this, StringIndexer is employed.

In [81]:
# Preview 3 rows of the cleaned data; "Sex" and "Embarked" are still strings here.
dataset.show(3)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
+--------+------+------+----+-------+--------+
only showing top 3 rows



In [82]:
from pyspark.ml.feature import StringIndexer

In [83]:
# Encode the string column "Sex" as a numeric index in the new column "Gender".
# The indexer is fitted on the same data it then transforms.
gender_indexer = StringIndexer(inputCol="Sex", outputCol="Gender", handleInvalid="keep")
gender_model = gender_indexer.fit(dataset)
dataset = gender_model.transform(dataset)

In [84]:
# Encode the string column "Embarked" as a numeric index in the new column
# "Boarded", using the "keep" policy for invalid values.
boarded_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarded", handleInvalid="keep")
boarded_model = boarded_indexer.fit(dataset)
dataset = boarded_model.transform(dataset)

In [85]:
# Check that the numeric "Gender" and "Boarded" columns were added.
dataset.show(2)

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|     0.0|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|
|     1.0|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|
+--------+------+------+----+-------+--------+------+-------+
only showing top 2 rows



##### Finally, we can drop the `Sex` and `Embarked` columns

In [86]:
# The original string columns are no longer needed now that their numeric
# counterparts ("Gender", "Boarded") exist; remove both in one call.
dataset = dataset.drop('Sex', 'Embarked')

# Feature engineering
• Spark ML estimators learn from two columns: a label column and a features column. Therefore, all columns except the target column must be combined
into a single vector column. This is accomplished via VectorAssembler.

In [87]:
from pyspark.ml.feature import VectorAssembler as va

In [88]:
# Combine the predictor columns into a single vector column named 'features';
# the label column 'Survived' is deliberately left out of the vector.
require_featured = [
    'Pclass',
    'Age',
    'Fare',
    'Gender',
    'Boarded',
]
assembler = va(outputCol='features', inputCols=require_featured)
transformed_data = assembler.transform(dataset)

In [89]:
# Inspect the assembled 'features' vector alongside its source columns.
transformed_data.show(5)

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+------+----+-------+------+-------+--------------------+
|     0.0|   3.0|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
|     1.0|   3.0|26.0|  7.925|   1.0|    0.0|[3.0,26.0,7.92500...|
|     1.0|   1.0|35.0|   53.1|   1.0|    0.0|[1.0,35.0,53.0999...|
|     0.0|   3.0|35.0|   8.05|   0.0|    0.0|[3.0,35.0,8.05000...|
+--------+------+----+-------+------+-------+--------------------+
only showing top 5 rows



# Modeling

In [131]:
# First, the transformed dataset is split into training and test sets with a
# distribution of 80% for training and 20% for testing. Then a RandomForest
# classifier is created with the target column ('Survived') and the feature
# column ('features'). The model is trained on the training data and used to
# make predictions on the test data, resulting in a new DataFrame that
# includes the predictions.
from pyspark.ml.classification import RandomForestClassifier

# Fixed seed so the split (and therefore the sample counts and the reported
# accuracy) does not change from one run of the notebook to the next.
SEED = 42

# Splitting the dataset into train and test
(training_data, testing_data) = transformed_data.randomSplit([0.8, 0.2], seed=SEED)

print("Number of train samples: " + str(training_data.count()))
print("Number of test samples: " + str(testing_data.count()))

# The seed is also passed to the forest so the training randomness is pinned
# explicitly alongside the split.
rf = RandomForestClassifier(labelCol='Survived',
                            featuresCol='features',
                            maxDepth=5,
                            seed=SEED)

model = rf.fit(training_data)
predictions = model.transform(testing_data)


Number of train samples: 583
Number of test samples: 129


# Evaluation

In [132]:
# Multi-class classification evaluator from PySpark's ML library, configured
# to compute accuracy by comparing the actual survival labels ('Survived')
# with the model's predicted labels ('prediction').
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
                                              predictionCol='prediction',
                                              metricName='accuracy')

accuracy = evaluator.evaluate(predictions)

# `predictions` was produced from the held-out test split, so this figure is
# the test accuracy, not the training accuracy.
print("Test Accuracy: ", accuracy)

Training Accuracy:  0.7596899224806202


# Checking Spark jobs
Finally, you can inspect and manage the Spark jobs started by this notebook in the Spark web UI on your local machine (by default at http://localhost:4040).

![Spark jobs screenshot 1](images/2.png)

![Spark jobs screenshot 2](images/3.png)