# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [1]:
# Linear regression with Spark MLlib on a tiny in-memory dataset
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start a Spark session (or pick up the one that is already running)
spark = SparkSession.builder.appName('Contoh MLlib').getOrCreate()

# Sample rows laid out as (ID, Feature, Target)
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# Pack the single predictor into the vector column the estimator reads
assembler = VectorAssembler(outputCol='features', inputCols=['Feature'])
df_transformed = assembler.transform(df)

# Fit the linear regression model against the 'Target' column
lr = LinearRegression(labelCol='Target', featuresCol='features')
model = lr.fit(df_transformed)

# Print the fitted coefficient vector and the intercept
print(f'Koefisien: {model.coefficients}')
print(f'Intersep: {model.intercept}')

Koefisien: [0.9999999999999992]
Intersep: 15.000000000000009


In [2]:
# Practice: logistic regression on a four-row, two-feature dataset
from pyspark.ml.classification import LogisticRegression

# getOrCreate() returns the already-active session when there is one,
# so this mainly guarantees that `spark` is bound before it is used below.
spark = SparkSession.builder.appName('LogisticRegressionExample').getOrCreate()

# Sample rows laid out as (ID, Feature1, Feature2, Label)
columns = ['ID', 'Feature1', 'Feature2', 'Label']
data = [
    (1, 2.0, 3.0, 0),
    (2, 1.0, 5.0, 1),
    (3, 2.5, 4.5, 1),
    (4, 3.0, 6.0, 0),
]
df = spark.createDataFrame(data, schema=columns)

# Combine both predictors into the single vector column 'Features'
assembler = VectorAssembler(
    outputCol='Features',
    inputCols=['Feature1', 'Feature2'],
)
df_transformed = assembler.transform(df)

# Optional sanity check: show the assembled vectors next to their inputs
df_transformed.select("Feature1", "Feature2", "Features", "Label").show()

# Fit the classifier on the assembled features
lr = LogisticRegression(labelCol='Label', featuresCol='Features')
model = lr.fit(df_transformed)

# Print the fitted coefficient vector and the intercept
print(f'Koefisien: {model.coefficients}')
print(f'Intersep: {model.intercept}')

+--------+--------+---------+-----+
|Feature1|Feature2| Features|Label|
+--------+--------+---------+-----+
|     2.0|     3.0|[2.0,3.0]|    0|
|     1.0|     5.0|[1.0,5.0]|    1|
|     2.5|     4.5|[2.5,4.5]|    1|
|     3.0|     6.0|[3.0,6.0]|    0|
+--------+--------+---------+-----+

Koefisien: [-12.26205792372122,4.087352264669246]
Intersep: 11.568912721182315


In [3]:
# Practice: KMeans clustering on four 2-D points lying on the diagonal
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Each row is (ID, dense feature vector); Vectors.dense produces the vector
# type that the 'Features' column must hold for KMeans.
columns = ['ID', 'Features']
data = [
    (row_id, Vectors.dense([coord, coord]))
    for row_id, coord in enumerate((1.0, 5.0, 10.0, 15.0), start=1)
]
df = spark.createDataFrame(data, schema=columns)

# Fit a two-cluster KMeans model
kmeans = KMeans(k=2, featuresCol='Features')
model = kmeans.fit(df)

# Print the centre of each fitted cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
# Homework: load the AQI CSV file into a Spark DataFrame and preview it
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AQI").getOrCreate()

# Machine-specific location of the dataset; adjust when running elsewhere
local = "C:/Users/ASUS/AQI (1).csv"

# The first line supplies the column names; column types are inferred from the data
df = spark.read.options(header=True, inferSchema=True).csv(local)
df.show()

+--------------------+----------------+---------+--------------------+------------+---------------+---------------+--------------------+-------------+----------------+---------------+--------------------+--------------------+-----------------------+--------------------------+------------------------+--------------------------+
|             Country|            City|AQI Value|        AQI Category|CO AQI Value|CO AQI Category|Ozone AQI Value|  Ozone AQI Category|NO2 AQI Value|NO2 AQI Category|PM2.5 AQI Value|  PM2.5 AQI Category|AQI Value_Normalized|CO AQI Value_Normalized|Ozone AQI Value_Normalized|NO2 AQI Value_Normalized|PM2.5 AQI Value_Normalized|
+--------------------+----------------+---------+--------------------+------------+---------------+---------------+--------------------+-------------+----------------+---------------+--------------------+--------------------+-----------------------+--------------------------+------------------------+--------------------------+
|  russian fe

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import col

In [None]:
# Pipeline stage that encodes the textual "AQI Category" as a numeric "label"
label = StringIndexer(outputCol="label", inputCol="AQI Category")

# Swap the '.' in the PM2.5 column names for '_' before they are referenced
# below (presumably because Spark reads a dot in a column name as
# nested-field access -- confirm if the rename is ever removed).
df = (
    df.withColumnRenamed("PM2.5 AQI Value", "PM2_5_AQI_Value")
      .withColumnRenamed("PM2.5 AQI Category", "PM2_5_AQI_Category")
)

# Pipeline stage that packs the four pollutant readings into "features"
vector = VectorAssembler(
    outputCol="features",
    inputCols=[
        "CO AQI Value",
        "Ozone AQI Value",
        "NO2 AQI Value",
        "PM2_5_AQI_Value",
    ],
)

In [None]:
# Hold out 20% of the rows for the final evaluation; the seed pins the split
train, test = df.randomSplit([0.8, 0.2], seed=13)

# Label indexing -> feature assembly -> multinomial logistic regression
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[label, vector, lr])

# 2 x 3 x 2 = 12 hyperparameter combinations to compare
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [50, 100])
             .build())

# Model selection is driven by the F1 score
evaluator = MulticlassClassificationEvaluator(metricName="f1")

# 3-fold cross-validation over the grid, fitting two models at a time.
# The seed pins the fold assignment so that repeated runs select the same
# model, in line with the seeded train/test split above.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=13)

cv_model = cv.fit(train)

# Score the held-out rows with the best pipeline found by cross-validation
predictions = cv_model.transform(test)
predictions.select("AQI Category", "label", "prediction", "probability").show(10, truncate=False)

eval1 = MulticlassClassificationEvaluator(metricName="accuracy")
eval2 = MulticlassClassificationEvaluator(metricName="f1")

accuracy = eval1.evaluate(predictions)
f1 = eval2.evaluate(predictions)

# bestModel is the fitted pipeline; its last stage is the fitted classifier
best = cv_model.bestModel
best_lr = best.stages[-1]

print("Evaluasi")
print("Accuracy :", accuracy)
print("F1 Score :", f1)

# Read the selected hyperparameters through the model's public getters
# rather than the private Py4J handle (_java_obj).
print("Parameter")
print("RegParam : ", best_lr.getRegParam())
print("ElasticNetParam : ", best_lr.getElasticNetParam())
print("MaxIter : ", best_lr.getMaxIter())

+------------------------------+-----+----------+----------------------------------------------------------------------------------------------------------------------------+
|AQI Category                  |label|prediction|probability                                                                                                                 |
+------------------------------+-----+----------+----------------------------------------------------------------------------------------------------------------------------+
|unhealthy for sensitive groups|3.0  |2.0       |[7.607263156127664E-7,0.24228524132710497,0.3619431032487296,0.3532474687579035,0.03586466472892443,0.00665876121102213]    |
|moderate                      |1.0  |1.0       |[0.012928446217133844,0.8855562241229984,0.0314156044600294,0.06746389393556755,0.0024170912166985767,2.1874004757223964E-4]|
|moderate                      |1.0  |1.0       |[0.0010284753506607776,0.7739883220177988,0.07531562583766323,0.142776045057