<a href="https://colab.research.google.com/github/hyulianton/BigData/blob/main/Klasifikasi_dengan_Apache_Spark_MLlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Contoh 1: Klasifikasi dengan Apache Spark MLlib (PySpark)

**Tujuan:** Mendemonstrasikan bagaimana melakukan klasifikasi menggunakan Logistic Regression di Spark MLlib (PySpark) pada dataset Iris.

**Dataset:** Iris Dataset (dataset klasik untuk klasifikasi, tersedia di scikit-learn).

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz

In [3]:
!tar -xf spark-3.5.5-bin-hadoop3.tgz

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

In [5]:
!pip install findspark -q -q -q
!pip install pyspark -q -q -q

In [6]:
# --- Langkah 1: Inisialisasi SparkSession ---
# SparkSession adalah entry point untuk memprogram Spark dengan Dataset dan DataFrame API.
# Ini ibarat pintu gerbang ke cluster Spark kita.
from pyspark.sql import SparkSession

# Membuat SparkSession.
# .builder: Memulai konfigurasi SparkSession.
# .appName("IrisClassification"): Memberi nama aplikasi Spark kita.
# .master("local[*]"): Menentukan mode Spark. "local[*]" berarti Spark akan berjalan secara lokal
#                      menggunakan semua core CPU yang tersedia. Untuk cluster, ini akan diganti
#                      dengan URL master Spark (misal: "spark://<master_ip>:7077").
# .getOrCreate(): Mengembalikan SparkSession yang sudah ada atau membuat yang baru.
spark = SparkSession.builder \
    .appName("IrisClassification") \
    .master("local[*]") \
    .getOrCreate()

print("SparkSession berhasil diinisialisasi.")
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")

# --- Langkah 2: Muat Dataset (Simulasi Data Big Data) ---
# Karena dataset Iris kecil, kita akan memuatnya melalui scikit-learn,
# lalu mengkonversinya menjadi Spark DataFrame.
# Dalam skenario Big Data sesungguhnya, data akan dimuat langsung dari HDFS, S3, dll.

from sklearn.datasets import load_iris
import pandas as pd

# Muat dataset Iris dari scikit-learn
iris = load_iris()
# Konversi ke Pandas DataFrame untuk kemudahan
iris_df_pandas = pd.DataFrame(data=iris.data, columns=iris.feature_names)
# Tambahkan kolom target (label)
iris_df_pandas['target'] = iris.target
iris_df_pandas['target_name'] = iris.target_names[iris.target]

print("\nDataset Iris (Pandas DataFrame):")
print(iris_df_pandas.head())
print(f"Ukuran dataset: {iris_df_pandas.shape[0]} baris, {iris_df_pandas.shape[1]} kolom.")

# Konversi Pandas DataFrame ke Spark DataFrame
# Ini adalah langkah krusial untuk membawa data ke ekosistem Spark
iris_df_spark = spark.createDataFrame(iris_df_pandas)

print("\nDataset Iris (Spark DataFrame - Schema):")
iris_df_spark.printSchema()
print("\nDataset Iris (Spark DataFrame - Contoh Data):")
iris_df_spark.show(5)

# --- Langkah 3: Persiapan Data untuk MLlib (Feature Engineering) ---
# MLlib membutuhkan fitur-fitur (input untuk model) dalam satu kolom bertipe Vector.
# Ini adalah konsep `VectorAssembler`.

from pyspark.ml.feature import VectorAssembler

# Tentukan kolom fitur (input untuk model) dan kolom target (label)
feature_cols = ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']
label_col = 'target' # Menggunakan kolom target numerik

# Inisialisasi VectorAssembler:
# inputCols: Daftar kolom yang akan digabungkan menjadi satu vektor fitur.
# outputCol: Nama kolom baru yang akan berisi vektor fitur.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Terapkan transformasi assembler ke Spark DataFrame
# Ini akan menghasilkan DataFrame baru dengan kolom 'features' yang berisi vektor fitur.
data = assembler.transform(iris_df_spark)

print("\nSpark DataFrame setelah VectorAssembler (kolom 'features' ditambahkan):")
data.select("features", label_col).show(5, truncate=False) # truncate=False agar vektor fitur tidak terpotong

# --- Langkah 4: Pembagian Data (Training dan Testing) ---
# Kita perlu membagi data menjadi set pelatihan (training) dan set pengujian (testing)
# untuk mengevaluasi kinerja model.

# randomSplit: Membagi DataFrame secara acak berdasarkan bobot yang diberikan.
# [0.7, 0.3]: 70% untuk training, 30% untuk testing.
# seed: Untuk reproduksibilitas (agar hasil pembagian selalu sama).
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)

print(f"\nUkuran data training: {train_data.count()} baris.")
print(f"Ukuran data testing: {test_data.count()} baris.")

# --- Langkah 5: Membangun dan Melatih Model Klasifikasi (Logistic Regression) ---
# Menggunakan algoritma Logistic Regression dari MLlib.

from pyspark.ml.classification import LogisticRegression

# Inisialisasi model LogisticRegression:
# labelCol: Nama kolom yang berisi label (target).
# featuresCol: Nama kolom yang berisi vektor fitur (hasil dari VectorAssembler).
# family: Tipe klasifikasi ('binomial' untuk 2 kelas, 'multinomial' untuk >2 kelas).
lr = LogisticRegression(labelCol=label_col, featuresCol="features", family="multinomial")

print("\nMemulai pelatihan model Logistic Regression...")
# Melatih model menggunakan data training. Ini adalah langkah komputasi utama yang terdistribusi.
lr_model = lr.fit(train_data)
print("Pelatihan model selesai.")

# --- Langkah 6: Evaluasi Model ---
# Menggunakan model yang sudah dilatih untuk membuat prediksi pada data testing
# dan mengevaluasi seberapa baik kinerja model.

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Melakukan prediksi pada data testing.
# Model akan menambahkan kolom 'prediction' dan 'probability' ke DataFrame.
predictions = lr_model.transform(test_data)

print("\nPrediksi pada data testing (kolom 'prediction' dan 'probability' ditambahkan):")
predictions.select(label_col, "prediction", "probability").show(5, truncate=False)

# Inisialisasi evaluator untuk klasifikasi multi-kelas
# labelCol: Nama kolom label sebenarnya.
# predictionCol: Nama kolom hasil prediksi model.
# metricName: Metrik evaluasi yang akan digunakan ('accuracy' adalah umum).
evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="accuracy")

# Hitung akurasi model
accuracy = evaluator.evaluate(predictions)

print(f"\nAkurasiasi Model pada data testing: {accuracy:.4f}")

# --- Langkah 7: Menghentikan SparkSession ---
# Penting untuk menghentikan SparkSession setelah selesai menggunakan sumber daya.
spark.stop()
print("\nSparkSession dihentikan.")

SparkSession berhasil diinisialisasi.
Spark UI available at: http://a7c41d6b328d:4040

Dataset Iris (Pandas DataFrame):
   sepal length (cm)  sepal width (cm)  petal length (cm)  petal width (cm)  \
0                5.1               3.5                1.4               0.2   
1                4.9               3.0                1.4               0.2   
2                4.7               3.2                1.3               0.2   
3                4.6               3.1                1.5               0.2   
4                5.0               3.6                1.4               0.2   

   target target_name  
0       0      setosa  
1       0      setosa  
2       0      setosa  
3       0      setosa  
4       0      setosa  
Ukuran dataset: 150 baris, 6 kolom.

Dataset Iris (Spark DataFrame - Schema):
root
 |-- sepal length (cm): double (nullable = true)
 |-- sepal width (cm): double (nullable = true)
 |-- petal length (cm): double (nullable = true)
 |-- petal width (cm): double (n

**Penjelasan Alur Cerita Program Spark MLlib:**

1.  **Inisialisasi SparkSession:** Ini seperti "menghidupkan" mesin pabrik Big Data kita. Tanpa ini, tidak ada yang bisa kita lakukan. `local[*]` berarti kita menggunakan semua inti CPU di mesin lokal kita sebagai "cluster" mini.
2.  **Muat Dataset:** Kita mengambil "bahan baku" (data Iris) dan mengubahnya menjadi format yang bisa dipahami dan diproses oleh Spark (Spark DataFrame). Dalam skenario nyata, ini akan melibatkan membaca data dari lokasi terdistribusi seperti HDFS atau S3.
3.  **Persiapan Data (Feature Engineering):** Ini adalah tahap "merakit" bahan baku menjadi bentuk yang bisa "dicerna" oleh algoritma ML. `VectorAssembler` mengambil kolom-kolom fitur individual dan menggabungkannya menjadi satu kolom vektor, yang merupakan format standar yang dibutuhkan oleh MLlib.
4.  **Pembagian Data:** Kita memisahkan sebagian data untuk "pelatihan" model (data _training_) dan sebagian lain untuk "menguji" seberapa baik model kita bekerja pada data yang belum pernah dilihatnya (data _testing_). Ini penting untuk menghindari _overfitting_.
5.  **Membangun & Melatih Model:** Di sinilah "otak" pabrik bekerja. Kita memilih algoritma `LogisticRegression` dan "mengajarkan" model menggunakan `train_data`. Seluruh proses pelatihan ini secara otomatis didistribusikan oleh Spark ke seluruh _node_ (atau _core_ lokal kita).
6.  **Evaluasi Model:** Setelah model dilatih, kita menguji kemampuannya. Kita membiarkan model membuat prediksi pada `test_data`, lalu membandingkan prediksi itu dengan label sebenarnya untuk melihat seberapa akurat model kita.
7.  **Menghentikan SparkSession:** Setelah pekerjaan selesai, kita "mematikan" mesin pabrik untuk membebaskan sumber daya.
