# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Notebook shell escape: install the PySpark package that every cell below imports.
!pip install pyspark



In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Start (or reuse) the Spark session for this example
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset: one tuple per row, matching the column names below
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, columns)

# Pack the single 'Feature' column into the vector column the estimator reads
assembler = VectorAssembler(
    inputCols=['Feature'],
    outputCol='Features',
)
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled 'Features' vector
lr = LinearRegression(
    featuresCol='Features',
    labelCol='Target',
)
model = lr.fit(df_transformed)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

# Start (or reuse) the Spark session for this example
spark = (
    SparkSession.builder
    .appName("LogisticRegressionExample")
    .getOrCreate()
)

# Toy dataset: (ID, two-element dense feature vector, binary label)
columns = ['ID', 'Features', 'Label']
data = [
    (1, Vectors.dense([2.0, 3.0]), 0),
    (2, Vectors.dense([1.0, 5.0]), 1),
    (3, Vectors.dense([2.5, 4.5]), 1),
    (4, Vectors.dense([3.0, 6.0]), 0),
]
df = spark.createDataFrame(data, columns)

# Fit logistic regression directly on the pre-built vector column
lr = LogisticRegression(
    featuresCol='Features',
    labelCol='Label',
)
model = lr.fit(df)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this example
spark = (
    SparkSession.builder
    .appName("KMeansClusteringExample")
    .getOrCreate()
)

# Toy dataset: four 2-D points, each stored as a dense vector
columns = ['ID', 'Features']
data = [
    (1, Vectors.dense([1.0, 1.0])),
    (2, Vectors.dense([5.0, 5.0])),
    (3, Vectors.dense([10.0, 10.0])),
    (4, Vectors.dense([15.0, 15.0])),
]
df = spark.createDataFrame(data, columns)

# Fit k-means with two clusters on the 'Features' column
kmeans = KMeans(
    featuresCol='Features',
    k=2,
)
model = kmeans.fit(df)

# Print the centre of each fitted cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for the homework cells
spark = (
    SparkSession.builder
    .appName("Diabetes Dataset")
    .getOrCreate()
)

# Read the CSV using its first row as column names and letting Spark infer
# the column types, then preview the first five rows
data = spark.read.csv(
    "diabetes_dataset_cleaned.csv",
    header=True,
    inferSchema=True,
)
data.show(5)

+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+----------+-------------------+----------------------+------------------------------+
|year|gender| age|location|race:AfricanAmerican|race:Asian|race:Caucasian|race:Hispanic|race:Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|Is Outlier|     Normalized bmi|Normalized hbA1c_level|Normalized blood_glucose_level|
+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+----------+-------------------+----------------------+------------------------------+
|2020|Female|32.0| Alabama|                   0|         0|             0|            0|         1|           0|            0|          never|27.32|        5.0|               

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Create (or reuse) the SparkSession
spark = SparkSession.builder \
    .appName("Diabetes Dataset Analysis") \
    .getOrCreate()

# Load the dataset from CSV (first row is the header, column types are inferred)
dataset = spark.read.csv("diabetes_dataset_cleaned.csv", header=True, inferSchema=True)

# Show the first 5 rows to verify the data
dataset.show(5)

# Drop the rows flagged as outliers. After this filter 'Is Outlier' holds a
# single value for every remaining row, so it carries no information and must
# NOT be used as the classification target.
dataset = dataset.filter(dataset['Is Outlier'] == False)

# Convert the categorical 'smoking_history' column to a numeric index
smoking_indexer = StringIndexer(inputCol="smoking_history", outputCol="smoking_history_indexed")

# Feature columns for the classifier ('year' is excluded; 'diabetes' is the label)
feature_columns = [
    'age', 'race:AfricanAmerican', 'race:Asian', 'race:Caucasian', 'race:Hispanic',
    'race:Other', 'hypertension', 'heart_disease', 'bmi',
    'hbA1c_level', 'blood_glucose_level', 'smoking_history_indexed'
]

# Build the label from 'diabetes'. The previous version indexed 'Is Outlier',
# which is constant after the filter above and made every model score a
# trivial 100% accuracy. Alphabetical ordering makes the index assignment
# deterministic instead of depending on class frequencies.
label_indexer = StringIndexer(
    inputCol="diabetes",
    outputCol="label",
    stringOrderType="alphabetAsc",
)

# Assemble the feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Build the preprocessing pipeline
pipeline = Pipeline(stages=[smoking_indexer, label_indexer, assembler])

# Fit the pipeline and transform the data
model_data = pipeline.fit(dataset).transform(dataset)

# Show the processed features and label
model_data.select("features", "label").show(5, truncate=False)

# Optionally persist the processed data
# model_data.write.csv("processed_diabetes_data.csv", header=True)


+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+----------+-------------------+----------------------+------------------------------+
|year|gender| age|location|race:AfricanAmerican|race:Asian|race:Caucasian|race:Hispanic|race:Other|hypertension|heart_disease|smoking_history|  bmi|hbA1c_level|blood_glucose_level|diabetes|Is Outlier|     Normalized bmi|Normalized hbA1c_level|Normalized blood_glucose_level|
+----+------+----+--------+--------------------+----------+--------------+-------------+----------+------------+-------------+---------------+-----+-----------+-------------------+--------+----------+-------------------+----------------------+------------------------------+
|2020|Female|32.0| Alabama|                   0|         0|             0|            0|         1|           0|            0|          never|27.32|        5.0|               

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hold out 20% of the prepared rows for testing; the seed fixes the split
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=1234)

# Accuracy metric comparing the 'prediction' column against 'label'
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Fit a decision tree with default hyperparameters on the training split
dt_classifier = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
)
dt_model = dt_classifier.fit(train_data)

# Score the held-out split and report its accuracy
predictions = dt_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {accuracy:.2f}")

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Hyperparameter grid: 3 tree depths x 3 minimum node sizes = 9 candidates
paramGrid = (
    ParamGridBuilder()
    .addGrid(dt_classifier.maxDepth, [2, 5, 10])
    .addGrid(dt_classifier.minInstancesPerNode, [1, 2, 5])
    .build()
)

# 3-fold cross-validation over the grid, scored with the accuracy evaluator
crossval = CrossValidator(
    estimator=dt_classifier,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
)

# Fit on the training split and keep the best model found
cv_model = crossval.fit(train_data)
best_model = cv_model.bestModel

# Evaluate the selected model on the held-out test split
cv_predictions = best_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Best Model Accuracy: {cv_accuracy:.2f}")

Best Model Accuracy: 1.00
