# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Example dataset
data = [(1, Vectors.dense([2.0, 3.0]), 0), (2, Vectors.dense([1.0, 5.0]), 1), (3, Vectors.dense([2.5, 4.5]), 1), (4, Vectors.dense([3.0, 6.0]), 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
# Practice: KMeans Clustering
from pyspark.ml.clustering import KMeans

# Example dataset
data = [(1, Vectors.dense([1.0, 1.0])), (2, Vectors.dense([5.0, 5.0])), (3, Vectors.dense([10.0, 10.0])), (4, Vectors.dense([15.0, 15.0]))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


## Inisialisasi

In [4]:
!pip install pyspark
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [21]:
from pyspark.sql import SparkSession

# Buat SparkSession
spark = SparkSession.builder \
    .appName("Bank Marketing Classification") \
    .getOrCreate()

# Path file
file_path = "/content/drive/My Drive/College/SMS3/BIGDATA/Prak14/bank-full.csv"

# Load dataset
data = spark.read.csv(file_path, sep=";", header=True, inferSchema=True)

# Tampilkan struktur dataset dan data sampel
data.printSchema()
data.show(5)


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+

In [22]:
from pyspark.sql.functions import when, col

# Tambahkan kolom label
data = data.withColumn("label", when(col("y") == "yes", 1).otherwise(0))
data = data.drop("y")  # Hapus kolom y asli


In [23]:
from pyspark.sql.functions import col, lit, when

# Tangani nilai null di kolom kategorikal dengan mengganti null menjadi "unknown"
for col_name in categorical_cols:
    data = data.withColumn(col_name, when(col(col_name).isNull(), lit("unknown")).otherwise(col(col_name)))


In [24]:
# Tampilkan tipe data kolom
data.printSchema()

# Tampilkan nilai unik setiap kolom (opsional untuk debug)
for col_name in categorical_cols:
    data.select(col_name).distinct().show()


root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- label: integer (nullable = false)

+-------------+
|          job|
+-------------+
|   management|
|      retired|
|      unknown|
|self-employed|
|      student|
|  blue-collar|
| entrepreneur|
|       admin.|
|   technician|
|     services|
|    housemaid|
|   unemployed|
+-------------+

+--------+
| marital|
+--------+
|divorced|
| married|
|  single|
+--------+

+-

In [25]:
from pyspark.sql.functions import col, lit, when

# Ganti null di kolom string dengan "unknown"
categorical_cols = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'poutcome']

for col_name in categorical_cols:
    data = data.withColumn(col_name, when(col(col_name).isNull(), lit("unknown")).otherwise(col(col_name)))


In [26]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

# Buat StringIndexer dan OneHotEncoder
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_indexed", handleInvalid="skip") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=f"{col}_indexed", outputCol=f"{col}_encoded") for col in categorical_cols]

# Tentukan kolom fitur
feature_cols = [f"{col}_encoded" for col in categorical_cols] + \
               ['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous']

# Gabungkan fitur
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Buat pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit dan transformasi data
processed_data = pipeline.fit(data).transform(data)

# Pilih kolom 'features' dan 'label'
final_data = processed_data.select("features", "label")
final_data.show(5)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|(42,[1,11,14,16,1...|    0|
|(42,[2,12,13,16,1...|    0|
|(42,[7,11,13,16,1...|    0|
|(42,[0,11,16,17,1...|    0|
|(42,[12,16,18,20,...|    0|
+--------------------+-----+
only showing top 5 rows



In [27]:
train_data, test_data = final_data.randomSplit([0.8, 0.2], seed=42)

In [28]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)

# Prediksi
predictions = lr_model.transform(test_data)
predictions.select("label", "prediction", "probability").show(5)


+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|    0|       0.0|[0.95806866124047...|
|    0|       0.0|[0.95034026133071...|
|    0|       0.0|[0.93166535904926...|
|    0|       0.0|[0.94675900602166...|
|    0|       0.0|[0.92815044272412...|
+-----+----------+--------------------+
only showing top 5 rows



In [29]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
auc = evaluator.evaluate(predictions)
print(f"AUC Model: {auc}")


AUC Model: 0.6595512567719696


In [30]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Grid parameter
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# CrossValidator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

# Fit cross-validation model
cv_model = crossval.fit(train_data)

# Evaluasi model terbaik
best_model = cv_model.bestModel
best_predictions = best_model.transform(test_data)
final_auc = evaluator.evaluate(best_predictions)
print(f"AUC Model Terbaik: {final_auc}")


AUC Model Terbaik: 0.6314645273122867
