# Hands-On Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Objectives:
- Understand and implement advanced machine learning tasks using Spark MLlib.
- Build and evaluate models using real-world datasets.
- Explore techniques like feature engineering and hyperparameter tuning.


## Introduction to Spark MLlib
Spark MLlib is a scalable library for machine learning that integrates seamlessly with the Spark ecosystem. It supports a wide range of tasks, including regression, classification, clustering, and collaborative filtering.

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Obtain (or reuse) a Spark session named 'MLlib Example'
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Four in-memory sample rows laid out as (ID, Feature, Target)
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# Assemble the 'Feature' column into the vector column 'Features',
# which is the column the regressor below reads via featuresCol
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled vector
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

# Obtain (or reuse) a Spark session for the logistic regression demo
spark = (
    SparkSession.builder
    .appName('Logistic Regression Example')
    .getOrCreate()
)

# Raw sample rows as (ID, (feature values), Label); the feature values are
# wrapped in dense vectors below so no VectorAssembler step is needed
raw_rows = [
    (1, (3.0, 5.0), 0),
    (2, (1.5, 2.0), 1),
    (3, (4.0, 6.0), 1),
    (4, (2.5, 3.5), 0),
    (5, (6.0, 8.0), 1),
    (6, (7.5, 9.0), 1),
    (7, (2.0, 1.0), 0),
    (8, (5.5, 7.5), 1),
    (9, (6.5, 4.5), 1),
    (10, (1.0, 4.0), 0),
]
data = [
    (row_id, Vectors.dense(*values), label)
    for row_id, values, label in raw_rows
]
columns = ['ID', 'Features', 'Label']

# Build the DataFrame and display it
df = spark.createDataFrame(data, schema=columns)
df.show()

# Configure the classifier to read the vector column 'Features' directly
lr = LogisticRegression(featuresCol='Features', labelCol='Label')

# Fit on the DataFrame as created (no further transformation applied)
model = lr.fit(df)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Shut the Spark session down
spark.stop()

+---+---------+-----+
| ID| Features|Label|
+---+---------+-----+
|  1|[3.0,5.0]|    0|
|  2|[1.5,2.0]|    1|
|  3|[4.0,6.0]|    1|
|  4|[2.5,3.5]|    0|
|  5|[6.0,8.0]|    1|
|  6|[7.5,9.0]|    1|
|  7|[2.0,1.0]|    0|
|  8|[5.5,7.5]|    1|
|  9|[6.5,4.5]|    1|
| 10|[1.0,4.0]|    0|
+---+---------+-----+

Coefficients: [1.3449795372823035,-0.19926264994091672]
Intercept: -3.2070200224753975


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Obtain (or reuse) a Spark session shared by both examples in this cell
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# --- Part 1: linear regression -------------------------------------------

# Sample rows laid out as (ID, Feature, Target)
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# Assemble 'Feature' into the vector column 'Features' read by the regressor
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit the linear model and report its parameters
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# --- Part 2: logistic regression -----------------------------------------
# NOTE: data, columns, df, lr and model are rebound from here on, so the
# linear regression objects above are no longer reachable by those names.

# Sample rows as (ID, dense feature vector, Label)
columns = ['ID', 'Features', 'Label']
data = [
    (1, Vectors.dense(3.0, 5.0), 0),
    (2, Vectors.dense(1.5, 2.0), 1),
    (3, Vectors.dense(4.0, 6.0), 1),
    (4, Vectors.dense(2.5, 3.5), 0),
    (5, Vectors.dense(6.0, 8.0), 1),
    (6, Vectors.dense(7.5, 9.0), 1),
    (7, Vectors.dense(2.0, 1.0), 0),
    (8, Vectors.dense(5.5, 7.5), 1),
    (9, Vectors.dense(6.5, 4.5), 1),
    (10, Vectors.dense(1.0, 4.0), 0),
]

# Build the DataFrame and display it
df = spark.createDataFrame(data, schema=columns)
df.show()

# Fit the classifier directly on the vector column 'Features'
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Report the fitted parameters
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Shut the Spark session down
spark.stop()

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009
+---+---------+-----+
| ID| Features|Label|
+---+---------+-----+
|  1|[3.0,5.0]|    0|
|  2|[1.5,2.0]|    1|
|  3|[4.0,6.0]|    1|
|  4|[2.5,3.5]|    0|
|  5|[6.0,8.0]|    1|
|  6|[7.5,9.0]|    1|
|  7|[2.0,1.0]|    0|
|  8|[5.5,7.5]|    1|
|  9|[6.5,4.5]|    1|
| 10|[1.0,4.0]|    0|
+---+---------+-----+

Coefficients: [1.3449795372823035,-0.19926264994091672]
Intercept: -3.2070200224753975


## Homework
- Load a real-world dataset into Spark and prepare it for machine learning tasks.
- Build a classification model using Spark MLlib and evaluate its performance.
- Explore hyperparameter tuning using cross-validation.


**1. Load a real-world dataset into Spark and prepare it for machine learning tasks.**

In [None]:
import os
import shutil

# Move a kaggle.json from the working directory into /root/.kaggle and
# restrict its permissions. Plain Python replaces the `!mv` / `!chmod`
# shell magics so the cell also runs outside IPython and does not emit a
# shell error when kaggle.json is absent.
kaggle_dir = '/root/.kaggle'
token_source = 'kaggle.json'
token_target = os.path.join(kaggle_dir, 'kaggle.json')

# Create the .kaggle folder if it does not exist yet
os.makedirs(kaggle_dir, exist_ok=True)

# Move kaggle.json into the .kaggle folder (only if it is present here);
# the full target file path is given so an existing file is replaced, as
# `mv` would do
if os.path.exists(token_source):
    shutil.move(token_source, token_target)

# Restrict the file to owner read/write (the equivalent of chmod 600)
if os.path.exists(token_target):
    os.chmod(token_target, 0o600)
else:
    print(
        f"kaggle.json not found in {os.getcwd()} or {kaggle_dir}; "
        "skipping credential setup"
    )

mv: cannot stat 'kaggle.json': No such file or directory


In [None]:
import kagglehub

# Download the dataset through kagglehub (no version is pinned in the
# handle) and keep the local directory it returns in `path`
dataset_handle = "aparnashastry/building-permit-applications-data"
path = kagglehub.dataset_download(dataset_handle)

# Show where the files were stored
print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/aparnashastry/building-permit-applications-data?dataset_version_number=1...


100%|██████████| 18.0M/18.0M [00:01<00:00, 13.6MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/aparnashastry/building-permit-applications-data/versions/1


In [None]:
# Import library yang diperlukan
import pandas as pd
import numpy as np

# Load the permits CSV from the downloaded dataset directory
csv_file = f"{path}/Building_Permits.csv"
df = pd.read_csv(csv_file)

# Print a general summary of the dataset (columns, non-null counts, dtypes)
print("Dataset Info:")
df.info()

  df = pd.read_csv(path + "/Building_Permits.csv")


Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 198900 entries, 0 to 198899
Data columns (total 43 columns):
 #   Column                                  Non-Null Count   Dtype  
---  ------                                  --------------   -----  
 0   Permit Number                           198900 non-null  object 
 1   Permit Type                             198900 non-null  int64  
 2   Permit Type Definition                  198900 non-null  object 
 3   Permit Creation Date                    198900 non-null  object 
 4   Block                                   198900 non-null  object 
 5   Lot                                     198900 non-null  object 
 6   Street Number                           198900 non-null  int64  
 7   Street Number Suffix                    2216 non-null    object 
 8   Street Name                             198900 non-null  object 
 9   Street Suffix                           196132 non-null  object 
 10  Unit                          

In [None]:
# Impute missing values, drop rows that lack key location/use fields, and
# remove columns that are not used further.
#
# The fill values are collected into one dict and applied with a single
# DataFrame-level fillna. The previous form, df[col].fillna(..., inplace=True),
# is a chained in-place call: pandas emits a FutureWarning for it and it will
# stop modifying df in pandas 3.0.

# Cost columns are filled with the column mean
mean_filled = ['Estimated Cost', 'Revised Cost']

# Count-like and district columns are filled with the column median
median_filled = [
    'Number of Existing Stories',
    'Number of Proposed Stories',
    'Existing Units',
    'Proposed Units',
    'Plansets',
    'Supervisor District',
]

# Categorical and date-string columns are filled with the most frequent value
mode_filled = [
    'Existing Use',
    'Existing Construction Type',
    'Proposed Construction Type',
    'Permit Expiration Date',
    'Issued Date',
    'First Construction Document Date',
    'Existing Construction Type Description',
    'Proposed Construction Type Description',
]

# Free-text column gets a fixed placeholder
fill_values = {'Description': 'No Description'}
fill_values.update({name: df[name].mean() for name in mean_filled})
fill_values.update({name: df[name].median() for name in median_filled})
fill_values.update({name: df[name].mode()[0] for name in mode_filled})

# All statistics above are computed before any row is dropped, matching the
# original statement order
df.fillna(fill_values, inplace=True)

# Drop rows that are missing any of these fields
df.dropna(
    subset=[
        'Street Suffix',
        'Neighborhoods - Analysis Boundaries',
        'Proposed Use',
        'Zipcode',
    ],
    inplace=True,
)

# Remove columns that are not carried forward
df.drop(
    columns=[
        'Street Number Suffix',
        'Unit',
        'Completed Date',
        'Voluntary Soft-Story Retrofit',
        'TIDF Compliance',
        'Unit Suffix',
        'Structural Notification',
        'Site Permit',
        'Fire Only Permit',
    ],
    inplace=True,
)


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Estimated Cost'].fillna(df['Estimated Cost'].mean(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df['Number of Existing Stories'].fillna(df['Number of Existing Stories'].median(), inplace=True)
The behavior will change in pandas 3.0. This inplace method will

In [None]:
# Convert permit numbers to a numeric dtype; values that cannot be parsed
# become NaN because of errors='coerce'
df['Permit Number'] = pd.to_numeric(df['Permit Number'], errors='coerce')

# Parse the date columns; values that cannot be parsed become NaT
date_columns = [
    'Permit Creation Date',
    'Current Status Date',
    'Filed Date',
    'Issued Date',
    'First Construction Document Date',
]
for col in date_columns:
    df[col] = pd.to_datetime(df[col], errors='coerce')

# Print the summary once, after every conversion has been applied
# (previously this call sat inside the loop and ran once per date column)
df.info()


<class 'pandas.core.frame.DataFrame'>
Index: 153202 entries, 2 to 198885
Data columns (total 34 columns):
 #   Column                                  Non-Null Count   Dtype         
---  ------                                  --------------   -----         
 0   Permit Number                           153202 non-null  int64         
 1   Permit Type                             153202 non-null  int64         
 2   Permit Type Definition                  153202 non-null  object        
 3   Permit Creation Date                    153202 non-null  datetime64[ns]
 4   Block                                   153202 non-null  object        
 5   Lot                                     153202 non-null  object        
 6   Street Number                           153202 non-null  int64         
 7   Street Name                             153202 non-null  object        
 8   Street Suffix                           153202 non-null  object        
 9   Description                             15

**2. Build a classification model using Spark MLlib and evaluate its performance.**

In [None]:
from pyspark.sql import SparkSession

# Obtain (or reuse) a Spark session named "BuildingPermitML"
spark = (
    SparkSession.builder
    .appName("BuildingPermitML")
    .getOrCreate()
)

# Convert the cleaned pandas DataFrame into a Spark DataFrame
spark_df = spark.createDataFrame(df)

# Inspect the result: print the schema, then the first five rows
spark_df.printSchema()
spark_df.show(5)

root
 |-- Permit Number: long (nullable = true)
 |-- Permit Type: long (nullable = true)
 |-- Permit Type Definition: string (nullable = true)
 |-- Permit Creation Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- Lot: string (nullable = true)
 |-- Street Number: long (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Street Suffix: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Current Status: string (nullable = true)
 |-- Current Status Date: timestamp (nullable = true)
 |-- Filed Date: timestamp (nullable = true)
 |-- Issued Date: timestamp (nullable = true)
 |-- First Construction Document Date: timestamp (nullable = true)
 |-- Number of Existing Stories: double (nullable = true)
 |-- Number of Proposed Stories: double (nullable = true)
 |-- Permit Expiration Date: string (nullable = true)
 |-- Estimated Cost: double (nullable = true)
 |-- Revised Cost: double (nullable = true)
 |-- Existing Use: string (nullable = 

**Classification using Random Forest Classifier**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline


# Stage 1: index 'Permit Type' into the numeric column 'label'
indexer = StringIndexer(inputCol="Permit Type", outputCol="label")

# Stage 2: assemble the four numeric predictors into one 'features' vector
feature_columns = [
    "Number of Existing Stories",
    "Number of Proposed Stories",
    "Estimated Cost",
    "Revised Cost",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Stage 3: random forest classifier reading 'features' and 'label'
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Chain the three stages into one pipeline
pipeline = Pipeline(stages=[indexer, assembler, rf])

# 80/20 train/test split with a fixed seed
split_frames = spark_df.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = split_frames

# Fit every stage on the training split, then score the held-out split
model = pipeline.fit(train_data)
predictions = model.transform(test_data)

# Accuracy of 'prediction' against 'label' on the test split
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy}")
print(f"Training Data Count: {train_data.count()}")
print(f"Test Data Count: {test_data.count()}")

# The fitted random forest is the last of the three pipeline stages
rf_model = model.stages[-1]

# Show the feature importances of the fitted forest
print("Feature Importances: ", rf_model.featureImportances)

Model Accuracy: 0.9302989968509561
Training Data Count: 122399
Test Data Count: 30803
Feature Importances:  (4,[0,1,2,3],[0.08252510489126044,0.03563458114463323,0.14611795823908227,0.7357223557250241])


**3. Explore hyperparameter tuning using cross-validation.**


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define the parameter grid to tune: 3 tree counts x 3 maximum depths
grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(rf.numTrees, [10, 20, 30])  # number of trees
grid_builder = grid_builder.addGrid(rf.maxDepth, [5, 10, 15])   # maximum tree depth
paramGrid = grid_builder.build()

# Evaluator used to score each candidate during cross-validation
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)

# Set up 5-fold cross-validation over the whole pipeline
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5,
)

# Run cross-validation on the training data
cvModel = crossval.fit(train_data)

# Predict on the test data with the selected model and evaluate it
cvPredictions = cvModel.transform(test_data)
cvAccuracy = evaluator.evaluate(cvPredictions)

# Show the accuracy obtained after hyperparameter tuning
print(f"Accuracy after Hyperparameter Tuning: {cvAccuracy}")

# Fitted random forest: the third stage of the best pipeline model
best_pipeline = cvModel.bestModel
rf_model2 = best_pipeline.stages[2]

# Show the feature importances of the tuned forest
print("Feature Importances: ", rf_model2.featureImportances)


Accuracy after Hyperparameter Tuning: 0.9365646203291887
Feature Importances:  (4,[0,1,2,3],[0.11845418281966119,0.07767704080658837,0.1686568032132273,0.6352119731605231])
