# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Install PySpark into the notebook environment. The leading "!" is an
# IPython/Jupyter shell escape, so this line only runs inside a notebook.
!pip install pyspark



In [None]:
# Worked example: fit a one-feature linear regression with Spark MLlib.
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) the Spark session for this example.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one tuple per row, laid out as (ID, Feature, Target).
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# The regression below reads its inputs from the 'Features' vector column,
# so pack the scalar 'Feature' column into it first.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit the model against the 'Target' column.
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted slope(s) and intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

# Start (or reuse) the Spark session for this example.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)

# Four labelled points. Each raw tuple is (ID, feature values, label); the
# feature values are wrapped in a dense MLlib vector while building the rows.
columns = ['ID', 'Features', 'Label']
data = [
    (row_id, Vectors.dense(values), label)
    for row_id, values, label in (
        (1, [2.0, 3.0], 0),
        (2, [1.0, 5.0], 1),
        (3, [2.5, 4.5], 1),
        (4, [3.0, 6.0], 0),
    )
]

df = spark.createDataFrame(data, schema=columns)

# Fit a logistic regression on the 'Features' vectors against 'Label'.
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Report the fitted coefficients and intercept.
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession for this example.
spark = SparkSession.builder.appName("KMeansClusteringExample").getOrCreate()

# Example dataset: each row is (ID, 2-D dense feature vector).
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0]))
]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train a 2-cluster KMeans model.
# K-means starts from randomly chosen centers, so the seed is pinned to make
# the fitted centers (and the order they are listed in) repeatable across runs.
kmeans = KMeans(featuresCol='Features', k=2, seed=42)
model = kmeans.fit(df)

# Show the cluster centers found by the model.
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used for the taxi-data homework.
spark = SparkSession.builder.appName("Taxi Data ML").getOrCreate()

# Read the CSV, taking column names from the header row and letting Spark
# infer each column's type from the data.
data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("cleaned_taxi_data.csv")
)

# Print the leading rows as a quick sanity check of the load.
data.show()

+--------------------+-----------+-----------+---------------+------------------+-------+-------------------+-------------------+--------------------+---------------------+-------------------+
|    Trip_Distance_km|Time_of_Day|Day_of_Week|Passenger_Count|Traffic_Conditions|Weather|          Base_Fare|        Per_Km_Rate|     Per_Minute_Rate|Trip_Duration_Minutes|         Trip_Price|
+--------------------+-----------+-----------+---------------+------------------+-------+-------------------+-------------------+--------------------+---------------------+-------------------+
| 0.12498528560675952|          2|          0|            3.0|                 1|      0| 0.5183946488294316|                0.2|                0.55|   0.4250631368109379|0.09246378537997323|
|   0.319989952210897|          0|          0|            1.0|                 0|      0|  0.505016722408027|0.08000000000000002|               0.825|   0.3096751719933815|  0.134843007561345|
| 0.24596551786258417|          1| 

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Create (or reuse) the Spark session.
spark = SparkSession.builder \
    .appName("Taxi Data ML") \
    .getOrCreate()

# Load the dataset.
data = spark.read.csv("cleaned_taxi_data.csv", header=True, inferSchema=True)

# Convert the categorical columns to numeric indices.
# NOTE(review): if these columns are already integer-coded, StringIndexer
# re-codes them (by label frequency under its default ordering) -- confirm
# that this re-coding is intended.
indexer = StringIndexer(
    inputCols=["Traffic_Conditions", "Weather", "Day_of_Week"],
    outputCols=["Traffic_Idx", "Weather_Idx", "Day_Idx"],
)
data_indexed = indexer.fit(data).transform(data)

# Select the feature columns and pack them into a single vector column.
feature_cols = ["Trip_Distance_km", "Time_of_Day", "Passenger_Count", "Traffic_Idx", "Weather_Idx", "Day_Idx"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data_prepared = assembler.transform(data_indexed)

# Use Trip_Price as the label. It is continuous, so it is turned into a
# binary class here: 1 when the price is above 0.2, otherwise 0
# (0.2 is an example cutoff).
data_final = data_prepared.withColumn("label", (data_prepared.Trip_Price > 0.2).cast("integer"))

# Keep only the columns the model needs.
data_final = data_final.select("features", "label")

# Show the final modelling table.
data_final.show()

# Split the data into training and test sets (seeded for repeatability).
train_data, test_data = data_final.randomSplit([0.8, 0.2], seed=42)

# Create the logistic regression estimator.
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Build the hyperparameter grid: 3 regParam values x 2 maxIter values.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.maxIter, [10, 20]) \
    .build()

# One accuracy evaluator, shared by model selection and the final test score
# so both are guaranteed to measure the same metric.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# 3-fold cross-validation over the grid. The seed fixes the fold assignment
# so the selected hyperparameters are repeatable, matching the seeded
# train/test split above.
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3,
                          seed=42)

# Train with cross-validation.
cv_model = crossval.fit(train_data)

# Evaluate the best model on the held-out test set.
cv_predictions = cv_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)

# Output text is intentionally unchanged ("Accuracy after Cross-Validation").
print(f"Akurasi setelah Cross-Validation: {cv_accuracy}")

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.12498528560675...|    0|
|[0.31998995221089...|    0|
|[0.24596551786258...|    0|
|[0.16973139890685...|    0|
|[0.05102990390242...|    0|
|[0.01795368743522...|    0|
|[0.20163372042637...|    0|
|[0.23788635851673...|    1|
|[0.32648090074517...|    0|
|[0.27993941870154...|    1|
|[0.05979958968809...|    0|
|[0.06035201083994...|    0|
|[0.10123117607707...|    0|
|[0.17580803157724...|    0|
|[0.14445813120958...|    0|
|[0.09681180686225...|    0|
|[0.20529351055740...|    0|
|[0.12222317984749...|    0|
|[0.15260634319941...|    0|
|(6,[0,2],[0.26391...|    1|
+--------------------+-----+
only showing top 20 rows

Akurasi setelah Cross-Validation: 0.8071428571428572
