## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName("MLlib Example").getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ["ID", "Feature", "Target"]
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=["Feature"], outputCol="Features")
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol="Features", labelCol="Target")
model = lr.fit(df_transformed)

# Print model coefficients
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/02 10:40:00 WARN Utils: Your hostname, user, resolves to a loopback address: 127.0.1.1; using 192.168.1.6 instead (on interface wlan0)
25/12/02 10:40:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 10:40:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/02 10:40:32 WARN Instrumentation: [f6a7585d] regParam is zero, which might cause numerical instability and overfitting.
25/12/02 10:40:36 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/02 10:40:37 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JN

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [2]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense([2.0, 3.0]), 0), (2, Vectors.dense([1.0, 5.0]), 1), (3, Vectors.dense([2.5, 4.5]), 1), (4, Vectors.dense([3.0, 6.0]), 0)]
columns = ["ID", "Features", "Label"]
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(df)

# Display coefficients and summary
print(f"Coefficients: {model.coefficients}")
print(f"Intercept: {model.intercept}")

Coefficients: [-12.262057925078082,4.087352265122887]
Intercept: 11.568912722532378


In [3]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ["ID", "Features"]
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol="Features", k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f"Cluster Centers: {centers}")

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.

### Load a Real-World Dataset
https://www.kaggle.com/datasets/yasserh/titanic-dataset

In [4]:
import kagglehub
from kagglehub import KaggleDatasetAdapter

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("MLlib Example").getOrCreate()

dataset_path = kagglehub.dataset_download("yasserh/titanic-dataset")

df = spark.read.csv(f"{dataset_path}/Titanic-Dataset.csv", header=True, inferSchema=True)

### Clean the Dataset

In [5]:
age_median = df.approxQuantile("Age", [0.5], 0)[0]
df = df.fillna({"Age": age_median})

mode_embarked = df.groupby("Embarked").count().orderBy("count", ascending=False).first()[0]
df = df.fillna({"Embarked": mode_embarked})

df = df.drop("Cabin")

indexer_sex = StringIndexer(inputCol="Sex", outputCol="Sex_Index")
df = indexer_sex.fit(df).transform(df).drop("Sex").withColumnRenamed("Sex_Index", "Sex")

indexer_embarked = StringIndexer(inputCol="Embarked", outputCol="Embarked_Index")
df = indexer_embarked.fit(df).transform(df).drop("Embarked").withColumnRenamed("Embarked_Index", "Embarked")

df = df.drop("Name")
df = df.drop("Ticket")
df = df.drop("Cabin")
df = df.drop("PassengerId")

df = df.dropDuplicates()

In [6]:
df.show(5, truncate=False)

+--------+------+----+-----+-----+-------+---+--------+
|Survived|Pclass|Age |SibSp|Parch|Fare   |Sex|Embarked|
+--------+------+----+-----+-----+-------+---+--------+
|0       |3     |50.0|0    |0    |8.05   |0.0|0.0     |
|0       |2     |25.0|1    |0    |26.0   |0.0|0.0     |
|0       |3     |22.0|0    |0    |10.5167|1.0|0.0     |
|0       |3     |28.0|0    |0    |7.7292 |0.0|2.0     |
|0       |3     |28.0|0    |0    |8.05   |1.0|0.0     |
+--------+------+----+-----+-----+-------+---+--------+
only showing top 5 rows


### Split the Training and Test Dataset

In [7]:
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

features = ["Pclass", "Age", "SibSp", "Parch", "Fare", "Sex", "Embarked"]
assembler = VectorAssembler(inputCols=features, outputCol="Features")

train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

In [8]:
train_df.show(5, truncate=False)

+--------+------+----+-----+-----+--------+---+--------+-----------------------------------+
|Survived|Pclass|Age |SibSp|Parch|Fare    |Sex|Embarked|Features                           |
+--------+------+----+-----+-----+--------+---+--------+-----------------------------------+
|0       |1     |2.0 |1    |2    |151.55  |1.0|0.0     |[1.0,2.0,1.0,2.0,151.55,1.0,0.0]   |
|0       |1     |18.0|1    |0    |108.9   |0.0|1.0     |[1.0,18.0,1.0,0.0,108.9,0.0,1.0]   |
|0       |1     |19.0|3    |2    |263.0   |0.0|0.0     |[1.0,19.0,3.0,2.0,263.0,0.0,0.0]   |
|0       |1     |21.0|0    |1    |77.2875 |0.0|0.0     |[1.0,21.0,0.0,1.0,77.2875,0.0,0.0] |
|0       |1     |22.0|0    |0    |135.6333|0.0|1.0     |[1.0,22.0,0.0,0.0,135.6333,0.0,1.0]|
+--------+------+----+-----+-----+--------+---+--------+-----------------------------------+
only showing top 5 rows


In [9]:
test_df.show(5, truncate=False)

+--------+------+----+-----+-----+------+---+--------+---------------------------------+
|Survived|Pclass|Age |SibSp|Parch|Fare  |Sex|Embarked|Features                         |
+--------+------+----+-----+-----+------+---+--------+---------------------------------+
|0       |1     |19.0|1    |0    |53.1  |0.0|0.0     |[1.0,19.0,1.0,0.0,53.1,0.0,0.0]  |
|0       |1     |24.0|0    |0    |79.2  |0.0|1.0     |[1.0,24.0,0.0,0.0,79.2,0.0,1.0]  |
|0       |1     |25.0|1    |2    |151.55|1.0|0.0     |[1.0,25.0,1.0,2.0,151.55,1.0,0.0]|
|0       |1     |28.0|0    |0    |26.55 |0.0|0.0     |(7,[0,1,4],[1.0,28.0,26.55])     |
|0       |1     |28.0|0    |0    |42.4  |0.0|0.0     |(7,[0,1,4],[1.0,28.0,42.4])      |
+--------+------+----+-----+-----+------+---+--------+---------------------------------+
only showing top 5 rows


### Train the LogisticRegression Model

In [10]:
lr = LogisticRegression(featuresCol="Features", labelCol="Survived")
model = lr.fit(train_df)

predictions = model.transform(test_df)
predictions.select("Survived", "prediction", "probability").show(50, truncate=False)

evaluator = BinaryClassificationEvaluator(labelCol="Survived")
accuracy = evaluator.evaluate(predictions)

+--------+----------+-----------------------------------------+
|Survived|prediction|probability                              |
+--------+----------+-----------------------------------------+
|0       |1.0       |[0.44415659912712796,0.5558434008728721] |
|0       |1.0       |[0.3689044414680602,0.6310955585319398]  |
|0       |1.0       |[0.0638702133835211,0.9361297866164789]  |
|0       |1.0       |[0.4639518799977746,0.5360481200022253]  |
|0       |1.0       |[0.45507557450082886,0.5449244254991712] |
|0       |1.0       |[0.35785878445314984,0.6421412155468502] |
|0       |1.0       |[0.4782961401296129,0.5217038598703871]  |
|0       |0.0       |[0.6094746603286376,0.3905253396713624]  |
|0       |0.0       |[0.6624269920490056,0.33757300795099443] |
|0       |0.0       |[0.6229242657250683,0.3770757342749317]  |
|0       |0.0       |[0.5698533666829934,0.43014663331700664] |
|0       |0.0       |[0.6374540215798941,0.36254597842010594] |
|0       |0.0       |[0.6306984093654345

In [11]:
print(f"Model Accuracy: {accuracy}")

Model Accuracy: 0.8087774294670851


### Explore Hyperparameter Tuning Using Cross-Validation

In [12]:
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5)

cv_model = cv.fit(train_df)

cv_predictions = cv_model.transform(test_df)

cv_accuracy = evaluator.evaluate(cv_predictions)

print(f"Cross-Validated Model Accuracy: {cv_accuracy}")

                                                                                

Cross-Validated Model Accuracy: 0.7920585161964476
