# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [8]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [16]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


IllegalArgumentException: requirement failed: Column features must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<double>.

In [10]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


IllegalArgumentException: requirement failed: Column Features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type array<double>.

## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


In [1]:
from pyspark.sql import SparkSession

# Inisialisasi SparkSession
spark = SparkSession.builder.appName("WineQuality").getOrCreate()

# Membaca dataset dengan pemisah ';'
dataset = spark.read.csv("winequality-red.csv", header=True, inferSchema=True, sep=";")

# Menampilkan sampel data
dataset.show(5)


+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density|  pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|          7.4|             0.7|        0.0|           1.9|    0.076|               11.0|                34.0| 0.9978|3.51|     0.56|    9.4|      5|
|          7.8|            0.88|        0.0|           2.6|    0.098|               25.0|                67.0| 0.9968| 3.2|     0.68|    9.8|      5|
|          7.8|            0.76|       0.04|           2.3|    0.092|               15.0|                54.0|  0.997|3.26|     0.65|    9.8|      5|
|         11.2|            0.28|       0.56|           1.9|    0.075|               17.0|           

In [2]:
from pyspark.sql.functions import col, isnan, when, count

# Menampilkan jumlah nilai yang hilang di setiap kolom
dataset.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in dataset.columns]).show()


+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|fixed acidity|volatile acidity|citric acid|residual sugar|chlorides|free sulfur dioxide|total sulfur dioxide|density| pH|sulphates|alcohol|quality|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+
|            0|               0|          0|             0|        0|                  0|                   0|      0|  0|        0|      0|      0|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+---+---------+-------+-------+



In [3]:
from pyspark.ml.feature import VectorAssembler

# Membuat kolom fitur
feature_columns = dataset.columns[:-1]  # Semua kolom kecuali 'quality'
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
dataset = assembler.transform(dataset)

# Menampilkan data yang sudah digabung
dataset.select("features", "quality").show(5)


+--------------------+-------+
|            features|quality|
+--------------------+-------+
|[7.4,0.7,0.0,1.9,...|      5|
|[7.8,0.88,0.0,2.6...|      5|
|[7.8,0.76,0.04,2....|      5|
|[11.2,0.28,0.56,1...|      6|
|[7.4,0.7,0.0,1.9,...|      5|
+--------------------+-------+
only showing top 5 rows



In [4]:
from pyspark.ml.feature import StringIndexer

# Mengubah label (quality) menjadi numerik
indexer = StringIndexer(inputCol="quality", outputCol="label")
dataset = indexer.fit(dataset).transform(dataset)

# Menampilkan data dengan label numerik
dataset.select("features", "label").show(5)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[7.4,0.7,0.0,1.9,...|  0.0|
|[7.8,0.88,0.0,2.6...|  0.0|
|[7.8,0.76,0.04,2....|  0.0|
|[11.2,0.28,0.56,1...|  1.0|
|[7.4,0.7,0.0,1.9,...|  0.0|
+--------------------+-----+
only showing top 5 rows



In [5]:
from pyspark.ml.classification import RandomForestClassifier

# Membagi dataset menjadi training dan testing
train_data, test_data = dataset.randomSplit([0.8, 0.2], seed=42)

# Inisialisasi model Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="quality", numTrees=10)

# Melatih model
rf_model = rf.fit(train_data)

# Prediksi pada data testing
predictions = rf_model.transform(test_data)
predictions.select("features", "quality", "prediction").show(5)


+--------------------+-------+----------+
|            features|quality|prediction|
+--------------------+-------+----------+
|[4.9,0.42,0.0,2.1...|      7|       6.0|
|[5.0,0.74,0.0,1.2...|      6|       6.0|
|[5.0,1.04,0.24,1....|      5|       6.0|
|[5.2,0.32,0.25,1....|      5|       6.0|
|[5.3,0.47,0.11,2....|      7|       6.0|
+--------------------+-------+----------+
only showing top 5 rows



In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Mengukur akurasi
evaluator = MulticlassClassificationEvaluator(labelCol="quality", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")


Accuracy: 0.610909090909091


In [7]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Membuat grid parameter
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20, 50])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .build())

# CrossValidator
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Latih model dengan cross-validation
cv_model = crossval.fit(train_data)

# Evaluasi model
cv_predictions = cv_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Accuracy with Cross-Validation: {cv_accuracy}")


Accuracy with Cross-Validation: 0.6763636363636364
