# Pertemuan 14: Advanced Machine Learning using Spark MLlib

## Introduction to Spark MLlib

In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Obtain the Spark session for this example.
spark = (
    SparkSession.builder
    .appName('MLlib Example')
    .getOrCreate()
)

# Toy dataset, one tuple per row: (ID, Feature, Target).
columns = ['ID', 'Feature', 'Target']
data = [
    (1, 5.0, 20.0),
    (2, 10.0, 25.0),
    (3, 15.0, 30.0),
    (4, 20.0, 35.0),
]
df = spark.createDataFrame(data, schema=columns)

# The regression below reads its inputs from one vector column, so the scalar
# 'Feature' column is packed into the vector column 'Features'.
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Fit a linear regression of 'Target' on the assembled feature vector.
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Report the fitted parameters.
print('Coefficients: {}'.format(model.coefficients))
print('Intercept: {}'.format(model.intercept))


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Obtain the Spark session for this example.
spark = (
    SparkSession.builder
    .appName("LogisticRegressionModel")
    .getOrCreate()
)

# Example dataset: each row is (ID, two-element feature list, label).
columns = ['ID', 'Features', 'Label']
data = [
    (1, [2.0, 3.0], 0),
    (2, [1.0, 5.0], 1),
    (3, [2.5, 4.5], 1),
    (4, [3.0, 6.0], 0),
]

# Turn the rows into a DataFrame.
df = spark.createDataFrame(data, schema=columns)

# Pull each array element out into its own scalar column with getItem().
df = df.withColumn('Feature1', col('Features').getItem(0))
df = df.withColumn('Feature2', col('Features').getItem(1))

# Pack Feature1 and Feature2 into the single vector column the classifier reads.
vector_assembler = VectorAssembler(
    inputCols=['Feature1', 'Feature2'],
    outputCol='FeatureVector',
)
df_transformed = vector_assembler.transform(df)

# Show the DataFrame once the features have been assembled into a vector.
df_transformed.select(
    ['ID', 'Feature1', 'Feature2', 'FeatureVector', 'Label']
).show()

# Build the logistic regression estimator and fit it on the prepared data.
logistic_regression = LogisticRegression(
    featuresCol='FeatureVector',
    labelCol='Label',
)
lr_model = logistic_regression.fit(df_transformed)

# Report the coefficients and intercept of the fitted model.
print('Model Coefficients: {}'.format(lr_model.coefficients))
print('Model Intercept: {}'.format(lr_model.intercept))


+---+--------+--------+-------------+-----+
| ID|Feature1|Feature2|FeatureVector|Label|
+---+--------+--------+-------------+-----+
|  1|     2.0|     3.0|    [2.0,3.0]|    0|
|  2|     1.0|     5.0|    [1.0,5.0]|    1|
|  3|     2.5|     4.5|    [2.5,4.5]|    1|
|  4|     3.0|     6.0|    [3.0,6.0]|    0|
+---+--------+--------+-------------+-----+

Model Coefficients: [-12.262057929180484,4.087352266486688]
Model Intercept: 11.56891272665312


In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain the Spark session for this example.
spark = SparkSession.builder.appName("KMeansClusteringExample").getOrCreate()

# Example dataset: each row is (ID, two-element feature list).
data = [(1, [1.0, 1.0]), (2, [5.0, 5.0]), (3, [10.0, 10.0]), (4, [15.0, 15.0])]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Pull each array element out into its own scalar column with getItem().
df = df.withColumn('Feature1', col('Features').getItem(0)) \
       .withColumn('Feature2', col('Features').getItem(1))

# Pack Feature1 and Feature2 into the single vector column KMeans reads.
vector_assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='FeatureVector')
df_transformed = vector_assembler.transform(df)

# Show the DataFrame once the features have been assembled into a vector.
df_transformed.select('ID', 'Feature1', 'Feature2', 'FeatureVector').show()

# Train the KMeans clustering model.
# The seed is pinned (42, matching the other estimators in this notebook) so
# that centroid initialisation -- and therefore the reported centres and their
# order -- is the same on every Restart & Run All.
kmeans = KMeans(featuresCol='FeatureVector', k=2, seed=42)
model = kmeans.fit(df_transformed)

# Show the cluster centers.
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')


+---+--------+--------+-------------+
| ID|Feature1|Feature2|FeatureVector|
+---+--------+--------+-------------+
|  1|     1.0|     1.0|    [1.0,1.0]|
|  2|     5.0|     5.0|    [5.0,5.0]|
|  3|    10.0|    10.0|  [10.0,10.0]|
|  4|    15.0|    15.0|  [15.0,15.0]|
+---+--------+--------+-------------+

Cluster Centers: [array([12.5, 12.5]), array([3., 3.])]


# Praktikum

## Load Dataset dan Handling Data

In [None]:
import pandas as pd
import numpy as np

# Read the raw dataset.
file_path = 'pestisida.csv'
data = pd.read_csv(file_path)

# Preview the dataset before cleaning.
print("Dataset sebelum pembersihan:")
print(data.head())

# Imputation values for the numeric columns, computed on the raw data before
# anything is filled in.
mean_low_estimate = data['LOW_ESTIMATE'].mean()
median_high_estimate = data['HIGH_ESTIMATE'].median()
median_county_code = data['COUNTY_CODE'].median()

# Fill missing values with one DataFrame-level call.
# The previous form, data[col].fillna(value, inplace=True), operates on an
# intermediate Series; pandas warns that this will never update the parent
# frame from pandas 3.0 onward, which would silently skip the imputation.
data = data.fillna({
    'LOW_ESTIMATE': mean_low_estimate,      # numeric: mean
    'HIGH_ESTIMATE': median_high_estimate,  # numeric: median
    'COUNTY_CODE': median_county_code,      # numeric: median
    'COMPOUND': 'Unknown',                  # categorical: placeholder value
})

# Drop rows that are missing a value in the key columns.
data = data.dropna(subset=['YEAR', 'STATE_CODE'])

# Drop rows that still have a missing value in any other column.
data = data.dropna()

# Preview the dataset after cleaning.
print("\nDataset setelah pembersihan:")
print(data.head())

# Count the remaining missing values per column.
missing_values = data.isnull().sum()
print("\nJumlah nilai yang hilang di setiap kolom:")
print(missing_values)

# Persist the cleaned dataset.
data.to_csv('pestisida_cleaned.csv', index=False)
print("\nDataset yang telah dibersihkan disimpan ke 'pestisida_cleaned.csv'")


Dataset sebelum pembersihan:
  COMPOUND  YEAR  STATE_CODE  COUNTY_CODE  LOW_ESTIMATE  HIGH_ESTIMATE
0      24D  2014           1          1.0        1698.6         1885.5
1      24D  2014           1          3.0        7513.6         8472.4
2      24D  2014           1          5.0        2613.6         2889.4
3      24D  2014           1          7.0        1259.2         1277.7
4      24D  2014           1          9.0        7590.5         7756.1

Dataset setelah pembersihan:
  COMPOUND  YEAR  STATE_CODE  COUNTY_CODE  LOW_ESTIMATE  HIGH_ESTIMATE
0      24D  2014           1          1.0        1698.6         1885.5
1      24D  2014           1          3.0        7513.6         8472.4
2      24D  2014           1          5.0        2613.6         2889.4
3      24D  2014           1          7.0        1259.2         1277.7
4      24D  2014           1          9.0        7590.5         7756.1

Jumlah nilai yang hilang di setiap kolom:
COMPOUND         0
YEAR             0
STATE_CO

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data['LOW_ESTIMATE'].fillna(mean_low_estimate, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  data['HIGH_ESTIMATE'].fillna(median_high_estimate, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate objec


Dataset yang telah dibersihkan disimpan ke 'pestisida_cleaned.csv'


## Penggunaan Machine Learning

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when

# Obtain the Spark session for this example.
spark = SparkSession.builder.appName("RandomForestClassification").getOrCreate()

# Read the dataset.
file_path = 'pestisida_cleaned.csv'  # Replace with the path to your dataset
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the data and its inferred schema before processing.
print("Dataset sebelum diproses:")
data.show(5)
data.printSchema()

# Preprocessing
# Derive a two-class target from the spread between the high and low estimates:
# 'Tinggi' when the difference exceeds 10,000, otherwise 'Rendah'.
data = data.withColumn(
    "usage_category",
    when((data['HIGH_ESTIMATE'] - data['LOW_ESTIMATE']) > 10000, 'Tinggi')
    .otherwise('Rendah')
)

# Show sample rows after adding the usage category.
data.show(5)

# Target and feature columns.
# NOTE(review): the target is computed from HIGH_ESTIMATE and LOW_ESTIMATE, and
# both are also used as features, so the reported accuracy measures how well the
# forest recovers that rule rather than performance on an independent target.
label_column = 'usage_category'  # Derived target column
feature_columns = ['STATE_CODE', 'COUNTY_CODE', 'LOW_ESTIMATE', 'HIGH_ESTIMATE']  # Feature columns

# Encode the string label as a numeric index.
indexer = StringIndexer(inputCol=label_column, outputCol='labelIndex')
data = indexer.fit(data).transform(data)

# Assemble the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data = assembler.transform(data)

# Split into training and test sets (80/20, fixed seed).
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Configure the Random Forest classifier.
rf = RandomForestClassifier(featuresCol='features', labelCol='labelIndex', numTrees=100, seed=42)

# Train the model.
rf_model = rf.fit(train_data)

# Score the held-out test set.
predictions = rf_model.transform(test_data)

# Compute accuracy on the test predictions.
evaluator = MulticlassClassificationEvaluator(labelCol='labelIndex', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

# Report the evaluation result.
print(f"Akurasi pada data uji: {accuracy}")

# Show a few predictions.
print("Beberapa prediksi:")
predictions.select('features', 'labelIndex', 'prediction', 'probability').show(10)

# Persist the model.
# A plain save() raises when the target path already exists, which breaks every
# re-run of this cell; overwrite() makes the step idempotent.
rf_model.write().overwrite().save("random_forest_model")
print("\nModel Random Forest disimpan ke 'random_forest_model'")


Dataset sebelum diproses:
+--------+----+----------+-----------+------------+-------------+
|COMPOUND|YEAR|STATE_CODE|COUNTY_CODE|LOW_ESTIMATE|HIGH_ESTIMATE|
+--------+----+----------+-----------+------------+-------------+
|     24D|2014|         1|        1.0|      1698.6|       1885.5|
|     24D|2014|         1|        3.0|      7513.6|       8472.4|
|     24D|2014|         1|        5.0|      2613.6|       2889.4|
|     24D|2014|         1|        7.0|      1259.2|       1277.7|
|     24D|2014|         1|        9.0|      7590.5|       7756.1|
+--------+----+----------+-----------+------------+-------------+
only showing top 5 rows

root
 |-- COMPOUND: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- STATE_CODE: integer (nullable = true)
 |-- COUNTY_CODE: double (nullable = true)
 |-- LOW_ESTIMATE: double (nullable = true)
 |-- HIGH_ESTIMATE: double (nullable = true)

+--------+----+----------+-----------+------------+-------------+--------------+
|COMPOUND|YEAR|S

## Hyperparameter Tuning

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import when

# Obtain the Spark session for this example.
spark = SparkSession.builder.appName("RandomForestClassificationWithHyperparameterTuning").getOrCreate()

# Read the dataset.
file_path = 'pestisida_cleaned.csv'  # Replace with the path to your dataset
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the data and its inferred schema before processing.
print("Dataset sebelum diproses:")
data.show(5)
data.printSchema()

# Preprocessing
# Derive a two-class target from the spread between the high and low estimates:
# 'Tinggi' when the difference exceeds 10,000, otherwise 'Rendah'.
data = data.withColumn(
    "usage_category",
    when((data['HIGH_ESTIMATE'] - data['LOW_ESTIMATE']) > 10000, 'Tinggi')
    .otherwise('Rendah')
)

# Show sample rows after adding the usage category.
data.show(5)

# Target and feature columns.
# NOTE(review): the target is computed from HIGH_ESTIMATE and LOW_ESTIMATE, and
# both are also used as features, so the reported accuracy measures how well the
# forest recovers that rule rather than performance on an independent target.
label_column = 'usage_category'  # Derived target column
feature_columns = ['STATE_CODE', 'COUNTY_CODE', 'LOW_ESTIMATE', 'HIGH_ESTIMATE']  # Feature columns

# Encode the string label as a numeric index.
indexer = StringIndexer(inputCol=label_column, outputCol='labelIndex')
data = indexer.fit(data).transform(data)

# Assemble the feature columns into a single vector column.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
data = assembler.transform(data)

# Split into training and test sets (80/20, fixed seed).
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

# Base Random Forest estimator; the tuned parameters come from the grid below.
rf = RandomForestClassifier(featuresCol='features', labelCol='labelIndex', seed=42)

# Parameter grid for tuning: 3 x 3 x 2 = 18 combinations.
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 150]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.maxBins, [32, 64]) \
    .build()

# Evaluator used both for model selection and for the final test score.
evaluator = MulticlassClassificationEvaluator(labelCol='labelIndex', predictionCol='prediction', metricName='accuracy')

# Cross-validator over the grid.
# seed=42 pins the fold assignment so model selection is reproducible, in line
# with the seeded split and estimator above.
cross_validator = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=5,  # Number of folds for k-fold cross-validation
    parallelism=2,  # Run the fitting work in parallel
    seed=42
)

# Fit with cross-validation (18 combinations x 5 folds, so this cell is slow).
cv_model = cross_validator.fit(train_data)

# Evaluate the best model on the held-out test set.
predictions = cv_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

# Report the evaluation result.
print(f"Akurasi terbaik pada data uji: {accuracy}")

# Show a few predictions.
print("Beberapa prediksi:")
predictions.select('features', 'labelIndex', 'prediction', 'probability').show(10)

# Persist the best model.
# A plain save() raises when the target path already exists, which breaks every
# re-run of this cell; overwrite() makes the step idempotent.
cv_model.bestModel.write().overwrite().save("best_random_forest_model")
print("\nModel Random Forest terbaik disimpan ke 'best_random_forest_model'")


Dataset sebelum diproses:
+--------+----+----------+-----------+------------+-------------+
|COMPOUND|YEAR|STATE_CODE|COUNTY_CODE|LOW_ESTIMATE|HIGH_ESTIMATE|
+--------+----+----------+-----------+------------+-------------+
|     24D|2014|         1|        1.0|      1698.6|       1885.5|
|     24D|2014|         1|        3.0|      7513.6|       8472.4|
|     24D|2014|         1|        5.0|      2613.6|       2889.4|
|     24D|2014|         1|        7.0|      1259.2|       1277.7|
|     24D|2014|         1|        9.0|      7590.5|       7756.1|
+--------+----+----------+-----------+------------+-------------+
only showing top 5 rows

root
 |-- COMPOUND: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- STATE_CODE: integer (nullable = true)
 |-- COUNTY_CODE: double (nullable = true)
 |-- LOW_ESTIMATE: double (nullable = true)
 |-- HIGH_ESTIMATE: double (nullable = true)

+--------+----+----------+-----------+------------+-------------+--------------+
|COMPOUND|YEAR|S