# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.

## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [45]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

25/11/22 08:22:42 WARN Instrumentation: [1a46a8ba] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [49]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('MLlib Example2').getOrCreate()

# Example dataset
data = [(1, [2.0, 3.0], 0), (2, [1.0, 5.0], 1), (3, [2.5, 4.5], 1), (4, [3.0, 6.0], 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Instead of using 'Features' directly, we need to access the elements within the array
# Create new columns for 'Features[0]' and 'Features[1]' using Spark functions
df = df.withColumn('Feature0', col('Features').getItem(0)) \
       .withColumn('Feature1', col('Features').getItem(1))

# Now use VectorAssembler with the new columns
assembler = VectorAssembler(inputCols=['Feature0', 'Feature1'], outputCol='FeaturesVector')
df = assembler.transform(df)

# Train logistic regression model using the 'FeaturesVector' column
lr = LogisticRegression(featuresCol='FeaturesVector', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

                                                                                

Coefficients: [-12.262057924868083,4.0873522650485175]
Intercept: 11.568912722344368


In [48]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Example dataset
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

from pyspark.sql.functions import col
df = df.withColumn('Feature1', col('Features').getItem(0))

assembler = VectorAssembler(inputCols=['Feature1'], outputCol='Features_vec')
df = assembler.transform(df)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features_vec', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

                                                                                

Cluster Centers: [array([12.5]), array([3.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.

In [44]:
# Import libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import pandas as pd
import requests
from io import StringIO

# Initialize Spark session
spark = SparkSession.builder.appName('MLlib Homework').getOrCreate()

# Load dataset
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "species"]

# Download the dataset
response = requests.get(url)
iris_data = pd.read_csv(StringIO(response.text), header=None, names=columns)

# Create a Spark DataFrame from the pandas DataFrame
data = spark.createDataFrame(iris_data)

# Inspect the data
data.show(5)
data.printSchema()

# Data preparation
# Encode categorical target column (species) to numerical
indexer = StringIndexer(inputCol="species", outputCol="label")
data = indexer.fit(data).transform(data)

# Assemble feature columns into a single vector
feature_cols = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
data = assembler.transform(data)

# Split the data into training and test sets
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Build a classification model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Hyperparameter tuning with Cross-Validation
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Define evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Perform cross-validation
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)
cv_model = crossval.fit(train_data)

# Evaluate the model on test data
predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

# Show results
print(f"Test Set Accuracy: {accuracy:.2f}")
predictions.select("features", "label", "prediction").show()

# Best model hyperparameters
best_model = cv_model.bestModel
print("Best Model Parameters:")
print(f"  - regParam: {best_model._java_obj.getRegParam()}")
print(f"  - elasticNetParam: {best_model._java_obj.getElasticNetParam()}")

# Stop Spark session
spark.stop()

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|    species|
+------------+-----------+------------+-----------+-----------+
|         5.1|        3.5|         1.4|        0.2|Iris-setosa|
|         4.9|        3.0|         1.4|        0.2|Iris-setosa|
|         4.7|        3.2|         1.3|        0.2|Iris-setosa|
|         4.6|        3.1|         1.5|        0.2|Iris-setosa|
|         5.0|        3.6|         1.4|        0.2|Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows
root
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



                                                                                

Test Set Accuracy: 1.00
+-----------------+-----+----------+
|         features|label|prediction|
+-----------------+-----+----------+
|[4.6,3.1,1.5,0.2]|  0.0|       0.0|
|[4.7,3.2,1.6,0.2]|  0.0|       0.0|
|[4.8,3.1,1.6,0.2]|  0.0|       0.0|
|[4.9,3.1,1.5,0.1]|  0.0|       0.0|
|[5.1,3.3,1.7,0.5]|  0.0|       0.0|
|[5.1,3.8,1.5,0.3]|  0.0|       0.0|
|[5.4,3.7,1.5,0.2]|  0.0|       0.0|
|[5.7,4.4,1.5,0.4]|  0.0|       0.0|
|[4.4,3.0,1.3,0.2]|  0.0|       0.0|
|[4.9,2.4,3.3,1.0]|  1.0|       1.0|
|[4.9,3.1,1.5,0.1]|  0.0|       0.0|
|[5.2,2.7,3.9,1.4]|  1.0|       1.0|
|[5.6,2.9,3.6,1.3]|  1.0|       1.0|
|[5.8,2.7,4.1,1.0]|  1.0|       1.0|
|[6.5,2.8,4.6,1.5]|  1.0|       1.0|
|[6.9,3.1,4.9,1.5]|  1.0|       1.0|
|[5.0,2.3,3.3,1.0]|  1.0|       1.0|
|[5.5,2.6,4.4,1.2]|  1.0|       1.0|
|[5.6,3.0,4.1,1.3]|  1.0|       1.0|
|[5.7,2.6,3.5,1.0]|  1.0|       1.0|
+-----------------+-----+----------+
only showing top 20 rows
Best Model Parameters:
  - regParam: 0.01
  - elasticNetParam: 