# Build Directory

In [None]:
import os

BASE_DIR = "/content/heart_bigdata"
RAW_DIR  = f"{BASE_DIR}/raw"
WH_DIR   = f"{BASE_DIR}/warehouse"

os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(WH_DIR, exist_ok=True)

print("Base Dir :", BASE_DIR)
print("Raw Dir  :", RAW_DIR)
print("Warehouse:", WH_DIR)


Base Dir : /content/heart_bigdata
Raw Dir  : /content/heart_bigdata/raw
Warehouse: /content/heart_bigdata/warehouse


Kode ini membuat struktur direktori dasar untuk proyek data. Ini mendefinisikan lokasi BASE_DIR, RAW_DIR (untuk data mentah), dan WH_DIR (untuk data olahan/warehouse) di /content/heart_bigdata. Kemudian, ia membuat direktori-direktori ini jika belum ada (os.makedirs(..., exist_ok=True)).

## Save data to Warehouse

In [None]:
import shutil

src_path = "/content/synthetic_heart_data.csv"
dst_path = f"{RAW_DIR}/synthetic_heart_data.csv"

shutil.copy(src_path, dst_path)

print("Dataset berhasil disimpan ke RAW storage")

Dataset berhasil disimpan ke RAW storage


kode ini menyalin (copy) sebuah file dataset (synthetic_heart_data.csv) dari lokasi sumber (/content/synthetic_heart_data.csv) ke direktori penyimpanan data mentah (RAW_DIR) yang telah ditentukan sebelumnya

## Read Data use Spark

In [None]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("HeartDisease_BigData_UAS")
    .getOrCreate()
)

df_raw = spark.read.option("header", True).csv(dst_path)

df_raw.printSchema()
df_raw.show(5)


root
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- cp: string (nullable = true)
 |-- trestbps: string (nullable = true)
 |-- chol: string (nullable = true)
 |-- fbs: string (nullable = true)
 |-- restecg: string (nullable = true)
 |-- thalach: string (nullable = true)
 |-- exang: string (nullable = true)
 |-- oldpeak: string (nullable = true)
 |-- slope: string (nullable = true)
 |-- ca: string (nullable = true)
 |-- thal: string (nullable = true)
 |-- target: string (nullable = true)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  0|     136| 233|  0|      1|    125|    1|    1.6|    0|  1|   3|     0|
| 55|  1|  0|     130| 234|  0|      1|    115|    1|    1.2|    1|  0|   3|     0|
| 67|  1|  2|     139| 220|  0|      0|    164|    0|    0.4

Sesi Spark diinisialisasi untuk aplikasi 'HeartDisease_BigData_UAS'. Kemudian, ia membaca data dari file CSV yang berlokasi di dst_path ke dalam PySpark DataFrame df_raw, dengan menganggap baris pertama sebagai header

## Save into Parquet

In [None]:
PARQUET_PATH = f"{WH_DIR}/heart_data.parquet"

df_raw.write.mode("overwrite").parquet(PARQUET_PATH)

print("Data berhasil disimpan dalam format Parquet")

Data berhasil disimpan dalam format Parquet


In [None]:
df = spark.read.parquet(PARQUET_PATH)

print("Jumlah data:", df.count())
df.show(5)

Jumlah data: 30000
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  0|     136| 233|  0|      1|    125|    1|    1.6|    0|  1|   3|     0|
| 55|  1|  0|     130| 234|  0|      1|    115|    1|    1.2|    1|  0|   3|     0|
| 67|  1|  2|     139| 220|  0|      0|    164|    0|    0.4|    2|  0|   2|     1|
| 60|  1|  2|     134| 241|  0|      1|    153|    0|    1.8|    2|  0|   3|     1|
| 40|  0|  0|     136| 220|  1|      1|    151|    1|    0.1|    1|  1|   2|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows


Kode ini berfungsi untuk menyimpan data dari DataFrame PySpark df_raw ke dalam format file Parquet. Pertama, ia mendefinisikan jalur penyimpanan untuk file Parquet menggunakan variabel PARQUET_PATH, yang menggabungkan direktori WH_DIR dengan nama file heart_data.parquet. Kemudian, data dari df_raw ditulis ke jalur tersebut. Penggunaan .mode("overwrite") memastikan bahwa jika file Parquet dengan nama yang sama sudah ada, file lama akan diganti dengan data yang baru.

# Batch Processing mapreduce

In [None]:
heart_rdd = df.rdd
print("Contoh data RDD:")
heart_rdd.take(3)

Contoh data RDD:


[Row(age='63', sex='1', cp='0', trestbps='136', chol='233', fbs='0', restecg='1', thalach='125', exang='1', oldpeak='1.6', slope='0', ca='1', thal='3', target='0'),
 Row(age='55', sex='1', cp='0', trestbps='130', chol='234', fbs='0', restecg='1', thalach='115', exang='1', oldpeak='1.2', slope='1', ca='0', thal='3', target='0'),
 Row(age='67', sex='1', cp='2', trestbps='139', chol='220', fbs='0', restecg='0', thalach='164', exang='0', oldpeak='0.4', slope='2', ca='0', thal='2', target='1')]

PySpark DataFrame df diubah menjadi RDD  bernama heart_rdd untuk memungkinkan operasi pemrosesan data secara terdistribusi.

In [None]:
# (target, 1)
target_map = heart_rdd.map(lambda r: (r.target, 1))

target_map.take(5)

[('0', 1), ('0', 1), ('1', 1), ('1', 1), ('0', 1)]

In [None]:
target_count = target_map.reduceByKey(lambda a, b: a + b)

target_count.collect()

[('0', 14604), ('1', 15396)]

kode ini menghitung distribusi kolom target. Ini dilakukan dengan mengubah heart_rdd menjadi target_map yang berisi pasangan (target, 1) untuk setiap baris, lalu menggunakan reduceByKey untuk menjumlahkan semua '1' berdasarkan setiap nilai target yang unik, sehingga menghasilkan jumlah total untuk setiap target. Hasilnya kemudian dikumpulkan dan ditampilkan.

In [None]:
# (target, (age, 1))
age_map = heart_rdd.map(lambda r: (r.target, (float(r.age), 1)))

age_sum = age_map.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

age_avg = age_sum.mapValues(lambda x: round(x[0] / x[1], 2))

age_avg.collect()


[('0', 56.51), ('1', 52.33)]

 kode diatas menghitung rata-rata usia (age) untuk setiap nilai target. Ini dicapai dengan membuat age_map yang berisi pasangan (target, (age, 1)). Kemudian, reduceByKey digunakan untuk menjumlahkan total usia dan total hitungan untuk setiap target. Setelah itu, mapValues digunakan untuk menghitung rata-rata usia dengan membagi total usia dengan total hitungan, dan hasilnya dibulatkan menjadi dua angka di belakang koma.

In [None]:
cp_dist = (
    heart_rdd
    .flatMap(lambda r: [(r.cp, 1)])
    .reduceByKey(lambda a, b: a + b)
)

cp_dist.collect()

[('0', 14457), ('2', 8407), ('3', 2305), ('1', 4831)]

Kode diatas menganalisis distribusi kolom cp (chest pain) dan kemudian mengurutkannya berdasarkan jumlah kemunculan. Pertama, cp_dist dibuat dengan mengubah heart_rdd menjadi pasangan (cp, 1) menggunakan flatMap, lalu menjumlahkan kemunculan setiap nilai cp menggunakan reduceByKey

In [None]:
cp_sorted = cp_dist.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

cp_sorted.take(5)

[(14457, '0'), (8407, '2'), (4831, '1'), (2305, '3')]

Hasilnya adalah RDD yang berisi setiap nilai cp dan jumlahnya. Selanjutnya, cp_sorted mengurutkan hasil ini dalam urutan menurun (ascending=False) berdasarkan jumlah kemunculan, dengan memetakan pasangan menjadi (jumlah, cp) terlebih dahulu agar pengurutan berdasarkan jumlah dapat dilakukan.

# EDA

In [None]:
df.printSchema()
df.show(5)

root
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- cp: string (nullable = true)
 |-- trestbps: string (nullable = true)
 |-- chol: string (nullable = true)
 |-- fbs: string (nullable = true)
 |-- restecg: string (nullable = true)
 |-- thalach: string (nullable = true)
 |-- exang: string (nullable = true)
 |-- oldpeak: string (nullable = true)
 |-- slope: string (nullable = true)
 |-- ca: string (nullable = true)
 |-- thal: string (nullable = true)
 |-- target: string (nullable = true)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  0|     136| 233|  0|      1|    125|    1|    1.6|    0|  1|   3|     0|
| 55|  1|  0|     130| 234|  0|      1|    115|    1|    1.2|    1|  0|   3|     0|
| 67|  1|  2|     139| 220|  0|      0|    164|    0|    0.4

In [None]:
df.select(
    "age", "trestbps", "chol", "thalach", "oldpeak"
).describe().show()


+-------+------------------+------------------+------------------+------------------+------------------+
|summary|               age|          trestbps|              chol|           thalach|           oldpeak|
+-------+------------------+------------------+------------------+------------------+------------------+
|  count|             30000|             30000|             30000|             30000|             30000|
|   mean|54.365633333333335|131.67483333333334|246.03273333333334|          149.1529| 1.354053333333312|
| stddev|  8.64475004394768| 9.479087249065229|26.828096964844725|15.374043069299441|0.9892445892282998|
|    min|                29|               100|               130|               100|               0.0|
|    max|                77|                99|               341|                97|               6.1|
+-------+------------------+------------------+------------------+------------------+------------------+



Kode diatas memilih hanya kolom-kolom usia, tekanan darah, kolesterol, detak jantung maksimum, dan 'oldpeak'. Kemudian, .describe() menghitung statistik ringkasan seperti hitungan (count), rata-rata (mean), standar deviasi (stddev), nilai minimum (min), dan nilai maksimum (max) untuk kolom-kolom yang dipilih tersebut.

In [None]:
df.groupBy("target").count().show()
df.groupBy("sex").count().show()
df.groupBy("cp").count().show()

+------+-----+
|target|count|
+------+-----+
|     0|14604|
|     1|15396|
+------+-----+

+---+-----+
|sex|count|
+---+-----+
|  0| 9018|
|  1|20982|
+---+-----+

+---+-----+
| cp|count|
+---+-----+
|  3| 2305|
|  0|14457|
|  1| 4831|
|  2| 8407|
+---+-----+



 Kode diatas menghitung jumlah kemunculan (count) untuk setiap nilai unik di kolom-kolom yang diuji.

In [None]:
from pyspark.sql.functions import avg

df.groupBy("target").agg(
    avg("age").alias("avg_age")
).show()


+------+-----------------+
|target|          avg_age|
+------+-----------------+
|     0|56.51068200493015|
|     1|52.33093011171733|
+------+-----------------+



Kode diatas menghitung rata-rata usia (avg("age")) untuk setiap kelompok nilai unik di kolom target

# Preprocessing

Null Check

In [None]:
from pyspark.sql.functions import col, sum

df.select([
    sum(col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).show()


+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|  0|  0|  0|       0|   0|  0|      0|      0|    0|      0|    0|  0|   0|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+



karena tidak ada data yang kosong, maka tidak diperlukan handling missing value

Convert Type Data

In [None]:
num_cols = [
    "age","trestbps","chol","thalach","oldpeak",
    "cp","sex","fbs","restecg","exang","slope","ca","thal","target"
]

for c in num_cols:
    df = df.withColumn(c, col(c).cast("double"))


ode ini bertujuan untuk mengubah tipe data dari beberapa kolom yang spesifik dalam DataFrame PySpark df menjadi tipe data double.  Proses ini memastikan bahwa kolom-kolom numerik memiliki tipe data yang sesuai untuk analisis lebih lanjut atau pemodelan.

In [None]:
df.printSchema()

root
 |-- age: double (nullable = true)
 |-- sex: double (nullable = true)
 |-- cp: double (nullable = true)
 |-- trestbps: double (nullable = true)
 |-- chol: double (nullable = true)
 |-- fbs: double (nullable = true)
 |-- restecg: double (nullable = true)
 |-- thalach: double (nullable = true)
 |-- exang: double (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: double (nullable = true)
 |-- ca: double (nullable = true)
 |-- thal: double (nullable = true)
 |-- target: double (nullable = true)



## Feature Enggineering

In [None]:
# Target
target_col = "target"

numerical_features = [
    'age',
    'trestbps',
    'chol',
    'thalach',
    'oldpeak'
]

categorical_features = [
    'sex',
    'cp',
    'fbs',
    'restecg',
    'exang',
    'slope',
    'ca',
    'thal'
]


Kode diatas menentukan kolom-kolom apa saja yang merupakan fitur numerik dan kategorikal

One Hot Encoding

In [None]:
from pyspark.ml.feature import StringIndexer

indexers = [
    StringIndexer(
        inputCol=col,
        outputCol=f"{col}_idx",
        handleInvalid="keep"
    )
    for col in categorical_features
]


In [None]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=[f"{col}_idx" for col in categorical_features],
    outputCols=[f"{col}_ohe" for col in categorical_features]
)


Kode diatas mempersiapkan fitur kategorikal untuk pemodelan machine learning. StringIndexer mengubah kategori teks menjadi angka indeks, dan OneHotEncoder mengubah angka indeks tersebut menjadi format biner (one-hot encoding) agar tidak salah diinterpretasikan oleh model.

Scaling

In [None]:
from pyspark.ml.feature import VectorAssembler, MinMaxScaler

# Gabungkan fitur numerik sementara
num_assembler = VectorAssembler(
    inputCols=numerical_features,
    outputCol="num_features"
)

scaler = MinMaxScaler(
    inputCol="num_features",
    outputCol="num_scaled"
)


Kode diatas menggunakan VectorAssembler untuk menggabungkan fitur numerik menjadi satu vektor, kemudian MinMaxScaler untuk menskalakan fitur-fitur numerik tersebut agar nilainya berada di rentang 0-1

In [None]:
final_assembler = VectorAssembler(
    inputCols=["num_scaled"] + [f"{col}_ohe" for col in categorical_features],
    outputCol="features"
)


VectorAssembler digunakan lagi untuk menggabungkan semua fitur (numerik yang sudah diskalakan dan kategorikal yang sudah di-one-hot-encode) menjadi satu vektor fitur akhir untuk model.

In [None]:
from pyspark.ml import Pipeline

preprocess_pipeline = Pipeline(
    stages=[
        *indexers,        # StringIndexer
        encoder,          # OneHotEncoder
        num_assembler,    # Gabung numerik
        scaler,           # Scaling 0â€“1
        final_assembler   # Gabung semua fitur
    ]
)


Kode diatas membuat sebuah pipeline pra-pemrosesan yang mengintegrasikan semua langkah transformasi fitur: mengindeks fitur kategorikal, melakukan one-hot encoding, menggabungkan fitur numerik, menskalakannya, dan akhirnya menyatukan semua fitur menjadi satu vektor siap pakai untuk model.

In [None]:
preprocess_model = preprocess_pipeline.fit(df)
df_final = preprocess_model.transform(df)

df_final.select("features", "target").show(5, truncate=False)


+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|features                                                                                                                                                                  |target|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|(30,[0,1,2,3,4,5,7,11,13,17,20,22,27],[0.7083333333333333,0.5121951219512195,0.48815165876777256,0.29523809523809524,0.26229508196721313,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|0.0   |
|(30,[0,1,2,3,4,5,7,11,13,17,18,21,27],[0.5416666666666666,0.43902439024390244,0.49289099526066354,0.2,0.19672131147540986,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])               |0.0   |
|(30,[0,1,2,3,4,5,8,11,14,16,19,21,26],[0.7916666666666666,0.5487804878048781,0.4265402843601896,0.6

Kode diatas melatih (fit) pipeline pra-pemrosesan yang telah didefinisikan sebelumnya menggunakan DataFrame df untuk mempelajari parameter seperti skala min-max dan pemetaan StringIndexer. Kemudian, pipeline yang sudah terlatih (preprocess_model) digunakan untuk mengubah (transform) df menjadi df_final dengan menerapkan semua langkah pra-pemrosesan.

In [None]:
df_final.show(5)

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+-------+------+-------+-----------+---------+---------+------+--------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|sex_idx|cp_idx|fbs_idx|restecg_idx|exang_idx|slope_idx|ca_idx|thal_idx|      sex_ohe|       cp_ohe|      fbs_ohe|  restecg_ohe|    exang_ohe|    slope_ohe|       ca_ohe|     thal_ohe|        num_features|          num_scaled|            features|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+-------+------+-------+-----------+---------+---------+------+--------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+
|63.

# Spark SQL

In [None]:
df_final.createOrReplaceTempView("heart")

Kode ini mendaftarkan DataFrame df_final sebagai tabel SQL temporer bernama heart. Ini memungkinkan Anda untuk menjalankan kueri SQL langsung pada DataFrame menggunakan spark.sql()

In [None]:
spark.sql("""
SELECT target, COUNT(*) AS total
FROM heart
GROUP BY target
""").show()


+------+-----+
|target|total|
+------+-----+
|   0.0|14604|
|   1.0|15396|
+------+-----+



ini menghitung total jumlah data untuk setiap nilai unik di kolom target dalam tabel heart dan menampilkan hasilnya.

In [None]:
spark.sql("""
SELECT
    target,
    ROUND(AVG(age),2) AS avg_age,
    ROUND(AVG(chol),2) AS avg_chol
FROM heart
GROUP BY target
""").show()


+------+-------+--------+
|target|avg_age|avg_chol|
+------+-------+--------+
|   0.0|  56.51|  252.71|
|   1.0|  52.33|   239.7|
+------+-------+--------+



 ini menghitung rata-rata usia (avg_age) dan rata-rata kolesterol (avg_chol) untuk setiap nilai target, dengan hasilnya dibulatkan dua angka di belakang koma

In [None]:
spark.sql("""
SELECT *
FROM heart
WHERE age > (
    SELECT AVG(age) FROM heart
)
AND chol > (
    SELECT AVG(chol) FROM heart
)
""").show(5)


+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+-------+------+-------+-----------+---------+---------+------+--------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|sex_idx|cp_idx|fbs_idx|restecg_idx|exang_idx|slope_idx|ca_idx|thal_idx|      sex_ohe|       cp_ohe|      fbs_ohe|  restecg_ohe|    exang_ohe|    slope_ohe|       ca_ohe|     thal_ohe|        num_features|          num_scaled|            features|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+------+-------+------+-------+-----------+---------+---------+------+--------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+--------------------+--------------------+--------------------+
|61.

 ini memilih semua baris dari tabel heart di mana nilai age lebih besar dari rata-rata age keseluruhan DAN nilai chol lebih besar dari rata-rata chol keseluruhan

In [None]:
spark.sql("""
SELECT /*+ BROADCAST(h) */
    target, COUNT(*) AS total
FROM heart h
GROUP BY target
""").explain("formatted")


== Physical Plan ==
AdaptiveSparkPlan (6)
+- HashAggregate (5)
   +- Exchange (4)
      +- HashAggregate (3)
         +- Project (2)
            +- Scan parquet  (1)


(1) Scan parquet 
Output [1]: [target#116]
Batched: true
Location: InMemoryFileIndex [file:/content/heart_bigdata/warehouse/heart_data.parquet]
ReadSchema: struct<target:string>

(2) Project
Output [1]: [cast(target#116 as double) AS target#904]
Input [1]: [target#116]

(3) HashAggregate
Input [1]: [target#904]
Keys [1]: [knownfloatingpointnormalized(normalizenanandzero(target#904)) AS target#904]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#1864L]
Results [2]: [target#904, count#1865L]

(4) Exchange
Input [2]: [target#904, count#1865L]
Arguments: hashpartitioning(target#904, 200), ENSURE_REQUIREMENTS, [plan_id=1908]

(5) HashAggregate
Input [2]: [target#904, count#1865L]
Keys [1]: [target#904]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#1863L]
Results [2]: [target#904, count(1)#18

sama dengan kueri kedua yang menghitung total per target, namun ditambahkan hint BROADCAST(h) yang menyarankan Spark untuk menyiarkan tabel heart ke semua node pekerja untuk optimasi join (meskipun di sini tidak ada join eksplisit, ini menunjukkan bagaimana hint digunakan)

# RDD

map

In [None]:
pair_rdd = df_final.rdd.map(lambda r: (r.target, 1))

Flat Map

In [None]:
flat_rdd = df_final.rdd.flatMap(lambda r: [(r.sex, 1)])
flat_rdd.reduceByKey(lambda a,b: a+b).collect()

[(1.0, 20982), (0.0, 9018)]

map menghasilkan satu elemen output per elemen input (misalnya, membuat pasangan (target, 1) atau (sex, 1)), sedangkan flatMap dapat menghasilkan nol atau lebih elemen output per elemen input (digunakan untuk membuat pasangan (sex, 1) dari setiap baris).

reduceByKey menggabungkan nilai-nilai dengan kunci yang sama. Contohnya, digunakan untuk menghitung total (count) kemunculan setiap nilai target atau sex dengan menjumlahkan '1' yang berpasangan dengan kunci yang sama.

Partition

In [None]:
pair_rdd = df_final.rdd.map(lambda r: (r.target, 1))

partitioned_rdd = pair_rdd.partitionBy(2)

# Cek distribusi partisi
partitioned_rdd.glom().map(len).collect()

[14604, 15396]

Mendistribusikan elemen-elemen RDD ke sejumlah partisi tertentu berdasarkan kunci

ByKey

In [None]:
pair_rdd.reduceByKey(lambda a,b: a+b).collect()

[(0.0, 14604), (1.0, 15396)]

In [None]:
comb_rdd = df_final.rdd.map(lambda r: (r.target, r.age))

avg_age = comb_rdd.combineByKey(
    lambda v: (v,1),
    lambda acc,v: (acc[0]+v, acc[1]+1),
    lambda a,b: (a[0]+b[0], a[1]+b[1])
).mapValues(lambda x: round(x[0]/x[1],2))

avg_age.collect()

[(0.0, 56.51), (1.0, 52.33)]

combineByKey digunakan untuk menghitung rata-rata usia (avg_age) per target dengan menjaga jumlah usia dan hitungan untuk setiap kunci.

# Modelling

In [None]:
ml_df = df_final.select("features", "target")
ml_df.show(5)

+--------------------+------+
|            features|target|
+--------------------+------+
|(30,[0,1,2,3,4,5,...|   0.0|
|(30,[0,1,2,3,4,5,...|   0.0|
|(30,[0,1,2,3,4,5,...|   1.0|
|(30,[0,1,2,3,4,5,...|   1.0|
|(30,[0,1,2,3,4,6,...|   0.0|
+--------------------+------+
only showing top 5 rows


pertama-tama data disiapkan untuk pemodelan machine learning dengan memilih kolom features (yang sudah diproses) dan target dari df_final ke dalam DataFrame baru ml_df

Split Data

In [None]:
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

print("Train rows:", train_df.count())
print("Test rows :", test_df.count())

Train rows: 24032
Test rows : 5968


Selanjutnya, ml_df dibagi menjadi data pelatihan (train_df) dan data pengujian (test_df) menggunakan randomSplit dengan rasio 80:20, serta menampilkan jumlah baris untuk masing-masing set.

## Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="target"
)

lr_model = lr.fit(train_df)


model Logistic Regression (lr) diinisialisasi dengan menentukan kolom fitur (featuresCol) dan label (labelCol), lalu model tersebut dilatih (fit) menggunakan data pelatihan (train_df)

In [None]:
lr_preds = lr_model.transform(test_df)

lr_preds.select("target", "prediction", "probability").show(5, truncate=False)


+------+----------+-----------------------------------------+
|target|prediction|probability                              |
+------+----------+-----------------------------------------+
|1.0   |1.0       |[0.1348477693126808,0.8651522306873192]  |
|1.0   |1.0       |[0.004210194883808294,0.9957898051161918]|
|1.0   |1.0       |[0.4628853480584867,0.5371146519415133]  |
|1.0   |1.0       |[0.015158350913777142,0.9848416490862228]|
|1.0   |1.0       |[0.03540145058969744,0.9645985494103025] |
+------+----------+-----------------------------------------+
only showing top 5 rows


Setelah model terlatih (lr_model), ia digunakan untuk membuat prediksi (transform) pada data pengujian (test_df), menghasilkan lr_preds yang berisi kolom target, prediction, dan probability yang kemudian ditampilkan beberapa baris pertamanya.

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr_auc_eval = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

lr_auc = lr_auc_eval.evaluate(lr_preds)

print("Logistic Regression AUC:", lr_auc)


Logistic Regression AUC: 0.9923700694907579


performa model Logistic Regression dievaluasi menggunakan BinaryClassificationEvaluator untuk menghitung metrik Area Under the Receiver Operating Characteristic (AUC), yang menunjukkan nilai 0.99

## Random Forest Classification

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="target",
    featuresCol="features",
    numTrees=50,
    maxDepth=8,
    seed=42
)

rf_model = rf.fit(train_df)


model Random Forest Classifier diinisiaoisasi dengan parameter seperti numTrees = 50 dan maxDepth = 8

In [None]:
rf_preds = rf_model.transform(test_df)

rf_preds.select("target", "prediction", "probability").show(5, truncate=False)


+------+----------+----------------------------------------+
|target|prediction|probability                             |
+------+----------+----------------------------------------+
|1.0   |1.0       |[0.09014046614322498,0.9098595338567751]|
|1.0   |1.0       |[0.08816594251528052,0.9118340574847196]|
|1.0   |1.0       |[0.17444579769289192,0.825554202307108] |
|1.0   |1.0       |[0.09833680184610526,0.9016631981538947]|
|1.0   |1.0       |[0.08838271618639368,0.9116172838136063]|
+------+----------+----------------------------------------+
only showing top 5 rows


In [None]:
rf_auc = lr_auc_eval.evaluate(rf_preds)

print("Random Forest AUC:", rf_auc)


Random Forest AUC: 0.9882236074466955


Setelah model terlatih (rf_model), model membuat prediksi (rf_preds) pada data pengujian (test_df) dan menampilkan beberapa hasilnya. Terakhir, performa model Random Forest dievaluasi menggunakan metrik AUC (rf_auc), yang menunjukkan nilai 0.98

# HyperParameter Tuning

## Logistic Regression


In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(
    featuresCol="features",
    labelCol="target",
    maxIter=50
)


In [None]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.0, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

print("Total kombinasi grid:", len(paramGrid))


Total kombinasi grid: 9


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)


In [None]:
from pyspark.ml.tuning import CrossValidator

cv_lr = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)


In [None]:
cv_lr_model = cv_lr.fit(train_df)


In [None]:
best_lr_model = cv_lr_model.bestModel

lr_tuned_preds = best_lr_model.transform(test_df)

lr_tuned_auc = evaluator.evaluate(lr_tuned_preds)

Bagian kode ini melakukan hyperparameter tuning untuk model Logistic Regression. Pertama, model Logistic Regression diinisialisasi. Kemudian, ParamGridBuilder digunakan untuk membuat grid kombinasi hyperparameter (regParam dan elasticNetParam). Setelah itu, BinaryClassificationEvaluator disiapkan untuk mengukur performa model menggunakan metrik AUC. Semua ini digabungkan dalam CrossValidator untuk melatih dan mengevaluasi model pada grid parameter yang telah ditentukan. Akhirnya, model terbaik dipilih, digunakan untuk membuat prediksi pada data pengujian, dan performanya (AUC) dihitung.

## Random Forest Classification

In [None]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(
    labelCol="target",
    featuresCol="features",
    seed=42
)


In [None]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [50, 100])
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.maxBins, [32, 64])
    .build()
)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)


In [None]:
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=42
)


In [None]:
cv_model = cv.fit(train_df)

In [None]:
best_rf_model = cv_model.bestModel

rf_tuned_preds = best_rf_model.transform(test_df)

rf_tuned_auc = evaluator.evaluate(rf_tuned_preds)

Tuned Random Forest AUC: 0.9893805885320578


Sama seperti bagian kode diatas, bagian kode ini juga melakukan hyperparameter tuning, tetapi untuk model Random Forest

# Perbandingan Performa Akhir

In [None]:
print("Logistic Regression AUC:", lr_auc)
print("Tuned Logistic Regression AUC:", lr_tuned_auc)

Logistic Regression AUC: 0.9923700694907579
Tuned Logistic Regression AUC: 0.9923700694907579


In [None]:
print("Base Random Forest AUC:", rf_auc)
print("Tuned Random Forest AUC:", rf_tuned_auc)

Base Random Forest AUC: 0.9882236074466955
Tuned Random Forest AUC: 0.9893805885320578


Penjelasan:

Hyperparameter tuning pada Logistic Regression dilakukan menggunakan Grid Search dengan variasi parameter regularisasi dan elastic net. Hasil tuning menunjukkan bahwa Logistic Regression mampu memberikan performa yang lebih baik dibandingkan Random Forest, yang mengindikasikan bahwa hubungan antara fitur dan target bersifat lebih linear dan dapat dimodelkan secara efektif oleh model linier dengan regularisasi yang tepat