# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
pip install pyspark

In [1]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [3]:
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# Use VectorAssembler to change features columns to a vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')

# Transform the data
df = assembler.transform(df)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [6]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


### Homework 1

In [39]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# Initialize a Spark session
spark = SparkSession.builder.appName("StockPriceClassification").getOrCreate()

# Load the dataset
file_path = "Uniqlo.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)


# Show the schema to understand the structure of the data
data.printSchema()


root
 |-- Date: date (nullable = true)
 |-- Open: integer (nullable = true)
 |-- High: integer (nullable = true)
 |-- Low: integer (nullable = true)
 |-- Close: integer (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Stock Trading: long (nullable = true)



### Homework 2

In [43]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

# Mendefinisikan window spec untuk mendapatkan nilai lag dari kolom Close
windowSpec = Window.orderBy("Date")

# Membuat kolom Label berdasarkan harga penutupan hari sebelumnya
data = data.withColumn("prev_Close", lag("Close").over(windowSpec))
data = data.withColumn("Label", (col("Close") > col("prev_Close")).cast("integer"))


In [44]:
from pyspark.ml.feature import VectorAssembler

# Menggunakan VectorAssembler untuk menggabungkan kolom yang diinginkan menjadi kolom features
assembler = VectorAssembler(inputCols=["Open", "High", "Low", "Volume"], outputCol="features")
data = assembler.transform(data)


In [46]:
# Menghapus baris yang memiliki nilai null di kolom Label dan features
data = data.filter(data['Label'].isNotNull() & data['features'].isNotNull())


In [48]:
# Membagi data menjadi data pelatihan dan pengujian (80% train, 20% test)
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

In [49]:
# Membuat model klasifikasi dengan Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="Label")

# Melatih model dengan data pelatihan
model = lr.fit(train_data)


In [50]:
# Membuat prediksi menggunakan data uji
predictions = model.transform(test_data)

# Menampilkan beberapa hasil prediksi
predictions.select("Date", "Label", "prediction").show(5)


+----------+-----+----------+
|      Date|Label|prediction|
+----------+-----+----------+
|2012-01-06|    1|       0.0|
|2012-01-12|    0|       0.0|
|2012-02-06|    1|       1.0|
|2012-02-09|    0|       0.0|
|2012-02-10|    0|       0.0|
+----------+-----+----------+
only showing top 5 rows



In [51]:
# Evaluasi model
evaluator = BinaryClassificationEvaluator(labelCol="Label")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.4f}")

Model Accuracy: 0.7779


### Homework 3

In [54]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [52]:
# Membagi data menjadi training dan test set
train_data, test_data = data.randomSplit([0.8, 0.2], seed=1234)

# Memeriksa ukuran data training dan testing
print(f"Training Data: {train_data.count()} rows")
print(f"Test Data: {test_data.count()} rows")


Training Data: 981 rows
Test Data: 244 rows


In [55]:
# Membuat model Logistic Regression
lr = LogisticRegression(featuresCol='features', labelCol='Label')

# Mendefinisikan evaluator untuk model klasifikasi
evaluator = BinaryClassificationEvaluator(labelCol="Label")

# Membuat parameter grid untuk tuning hyperparameter
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.1, 0.01])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

# Membuat cross-validation untuk tuning model
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=3)  # 3-fold cross-validation


In [60]:
# Melatih model dengan data pelatihan
cv_model = cv.fit(train_data)

# Menyimpan model hasil cross-validation
cv_model.save("model")

# Membuat prediksi menggunakan model terbaik
predictions = cv_model.transform(test_data)

# Menampilkan beberapa prediksi
predictions.select("features", "Label", "prediction").show(5)


+--------------------+-----+----------+
|            features|Label|prediction|
+--------------------+-----+----------+
|[13990.0,14030.0,...|    1|       1.0|
|[14520.0,14600.0,...|    0|       1.0|
|[15600.0,15830.0,...|    1|       1.0|
|[15900.0,15910.0,...|    0|       1.0|
|[15680.0,15680.0,...|    0|       1.0|
+--------------------+-----+----------+
only showing top 5 rows



In [61]:
# Evaluasi model menggunakan AUC (Area Under Curve)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")


AUC: 0.5394383230314432
