<a href="https://colab.research.google.com/github/gryffindour/Journal-Idea/blob/main/Nazwa_Destia_Praktikum_4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================
# SEL 1: INSTALL & SETUP LENGKAP
# ==============================
# Instal Java 11 (wajib untuk PySpark 3.5+)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Set JAVA_HOME
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

# Instal PySpark dan findspark
!pip install pyspark findspark

# Inisialisasi PySpark
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StandardScaler, MinMaxScaler, Bucketizer, StringIndexer, OneHotEncoder, Tokenizer, HashingTF, IDF

# Buat SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PraktikumPreprocessing") \
    .getOrCreate()

print("✅ SparkSession berhasil dibuat!")

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
✅ SparkSession berhasil dibuat!


# ==============================
# CELL 2: DATA CLEANING
# ==============================
# Deliberately "dirty" sample data: one missing age, one missing review,
# one repeated customer, one negative salary, one inconsistent city code
# and one implausible age.
data_kotor = [
    (1, 'Budi Susanto', 25, 5500000, 'L', '2022-01-15', 'Jakarta', 'Transaksi berhasil, barang bagus'),
    (2, 'Ani Lestari', None, 8000000, 'P', '2022-02-20', 'Bandung', 'Pengiriman cepat dan barang sesuai'),
    (3, 'Candra Wijaya', 35, 12000000, 'L', '2022-01-18', 'Surabaya', 'Sangat puas dengan pelayanannya'),
    (4, 'Dewi Anggraini', 22, 4800000, 'P', '2022-03-10', 'JKT', 'Barang diterima dalam kondisi baik'),
    (5, 'Eka Prasetyo', 45, 15000000, 'L', '2022-04-01', 'Jakarta', 'Transaksi gagal, mohon diperiksa'),
    (6, 'Budi Susanto', 25, 5500000, 'L', '2022-01-15', 'Jakarta', 'Transaksi berhasil, barang bagus'),  # Duplicate of id 1
    (7, 'Fina Rahmawati', 29, 9500000, 'P', '2022-05-12', 'Bandung', None),  # Missing review
    (8, 'Galih Nugroho', 31, -7500000, 'L', '2022-06-25', 'Surabaya', 'Barang oke'),  # Negative salary
    (9, 'Hesti Wulandari', 55, 25000000, 'P', '2022-07-30', 'Jakarta', 'Pelayanan ramah dan cepat'),
    (10, 'Indra Maulana', 150, 6200000, 'L', '2022-08-05', 'Medan', 'Produk original')  # Age outlier
]

# Schema (every column nullable; the registration date stays a string here)
skema = StructType([
    StructField("id_pelanggan", IntegerType(), True),
    StructField("nama", StringType(), True),
    StructField("usia", IntegerType(), True),
    StructField("gaji", IntegerType(), True),
    StructField("jenis_kelamin", StringType(), True),
    StructField("tgl_registrasi", StringType(), True),
    StructField("kota", StringType(), True),
    StructField("ulasan", StringType(), True)
])

# Build the DataFrame
df = spark.createDataFrame(data=data_kotor, schema=skema)
print("Dataset Awal:")
df.show(truncate=False)

# 1.1 Fix noise & inconsistencies first, so that rows differing only by a
#     city alias or a salary sign become identical before de-duplication.
df_norm = df.withColumn("kota", when(col("kota") == "JKT", "Jakarta").otherwise(col("kota"))) \
            .withColumn("gaji", abs(col("gaji")))

# 1.2 Remove duplicates, comparing every column EXCEPT the surrogate key.
#     A plain dropDuplicates() compares id_pelanggan too, so the repeated
#     customer (ids 1 and 6) would never be detected. Which of the two ids
#     survives is not something Spark promises.
kolom_konten = [c for c in df_norm.columns if c != "id_pelanggan"]
df_unik = df_norm.dropDuplicates(kolom_konten)

# 1.3 Impute missing values. The mean age is taken from plausible ages only,
#     after de-duplication, so neither the 150-year outlier nor the repeated
#     row skews the imputed value.
usia_valid = col("usia") <= 100
mean_usia = df_unik.filter(usia_valid).select(avg("usia")).collect()[0][0]
nilai_isi = {'ulasan': 'Tidak ada ulasan'}
if mean_usia is not None:  # avg() yields None when there is no valid age
    nilai_isi['usia'] = int(mean_usia)
df_filled = df_unik.na.fill(nilai_isi)

# 1.4 Drop rows whose age is an outlier (rows with a still-null age are
#     dropped as well, because the comparison evaluates to null).
df_bersih = df_filled.filter(usia_valid)

print("✅ Data setelah cleaning:")
df_bersih.show()

# ==============================
# CELL 3: TRANSFORMATION & FEATURE ENGINEERING
# ==============================
# 2.1 Standardisation: pack age and salary into one vector column, then
#     centre and scale it.
assembler = VectorAssembler(outputCol="fitur_numerik", inputCols=["usia", "gaji"])
df_vec = assembler.transform(df_bersih)
scaler = StandardScaler(
    inputCol="fitur_numerik",
    outputCol="fitur_standar",
    withStd=True,
    withMean=True,
)
scaler_model = scaler.fit(df_vec)
df_scaled = scaler_model.transform(df_vec)

# 2.2 Aggregation: customer count and average salary per city.
customer_count = count("id_pelanggan").alias("jumlah")
average_salary = avg("gaji").alias("rata_gaji")
df_agg = df_bersih.groupBy("kota").agg(customer_count, average_salary)

# 2.3 Age discretisation into three buckets: [0, 20), [20, 40), [40, +inf).
age_splits = [0, 20, 40, float('Inf')]
bucketizer = Bucketizer(inputCol="usia", outputCol="kelompok_usia", splits=age_splits)
df_binned = bucketizer.transform(df_bersih)

# 3.1 Date extraction: parse the registration string, then derive month/year.
df_eng = df_bersih.withColumn("tgl_ts", to_date("tgl_registrasi", "yyyy-MM-dd"))
df_eng = df_eng.withColumn("bulan_reg", month("tgl_ts"))
df_eng = df_eng.withColumn("tahun_reg", year("tgl_ts"))

# 3.2 Categorical encoding: index gender and city, then one-hot both indices.
indexer_jk = StringIndexer(outputCol="jk_idx", inputCol="jenis_kelamin")
indexer_kota = StringIndexer(outputCol="kota_idx", inputCol="kota")
gender_model = indexer_jk.fit(df_eng)
df_idx = gender_model.transform(df_eng)
city_model = indexer_kota.fit(df_idx)
df_idx = city_model.transform(df_idx)

ohe = OneHotEncoder(
    inputCols=["jk_idx", "kota_idx"],
    outputCols=["jk_ohe", "kota_ohe"],
)
ohe_model = ohe.fit(df_idx)
df_final = ohe_model.transform(df_idx)

# 3.3 TF-IDF of the review text: tokenise -> hashed term frequencies -> IDF.
tokenizer = Tokenizer(outputCol="kata", inputCol="ulasan")
df_token = tokenizer.transform(df_final)
hashingTF = HashingTF(numFeatures=20, inputCol="kata", outputCol="tf")
df_tf = hashingTF.transform(df_token)
idf = IDF(outputCol="tfidf", inputCol="tf")
idf_model = idf.fit(df_tf)
df_tfidf = idf_model.transform(df_tf)

print("✅ Feature engineering selesai. Contoh hasil:")
preview_columns = ["nama", "bulan_reg", "jk_ohe", "kota_ohe", "tfidf"]
df_tfidf.select(*preview_columns).show(truncate=False)

# ==============================
# CELL 4: EXERCISE 4.1
# ==============================
# 1. Advanced aggregation: highest salary and youngest age for every
#    (gender, city) pair. max/min here are the Spark column functions
#    brought in by the notebook's star import, not the Python builtins.
df_lat1 = df_bersih.groupBy("jenis_kelamin", "kota").agg(max("gaji").alias("gaji_max"), min("usia").alias("usia_min"))
print("1. Agregasi Lanjutan:")
df_lat1.show()

# 2. Salary discretisation: [0, 7M) -> bucket 0, [7M, 15M) -> bucket 1,
#    [15M, +inf) -> bucket 2, then map each bucket index to a readable label.
bucket_gaji = Bucketizer(splits=[0, 7000000, 15000000, float('Inf')], inputCol="gaji", outputCol="level_idx")
df_level = bucket_gaji.transform(df_bersih)
df_level = df_level.withColumn("level_gaji",
    when(col("level_idx") == 0, "Rendah")
    .when(col("level_idx") == 1, "Menengah")
    .otherwise("Tinggi"))
print("2. Level Gaji:")
df_level.select("gaji", "level_gaji").show()

# 3. Interaction feature: age x salary. Both inputs are 32-bit integer
#    columns, so their product is also 32-bit and can overflow for values the
#    cleaning step still allows (e.g. 100 * 25,000,000 > 2^31 - 1). Widening
#    one operand to long makes the multiplication 64-bit.
df_inter = df_bersih.withColumn("usia_x_gaji", col("usia").cast("long") * col("gaji"))
print("3. Fitur Interaksi:")
df_inter.select("id_pelanggan", "usia", "gaji", "usia_x_gaji").show(5)

# ==============================
# CELL 5: ASSIGNMENT 4.2
# ==============================
# Product sample with the same kinds of defects as the customer data:
# a repeated product (ids 101/104), missing `terjual` and `rating` values,
# a negative rating and inconsistent casing in `status_stok`.
data_produk = [
    (101, 'Laptop A', 'Elektronik', 15000000, 4.5, 120, '2023-01-20', 'stok_tersedia'),
    (102, 'Smartphone B', 'Elektronik', 8000000, 4.7, 250, '2023-02-10', 'stok_tersedia'),
    (103, 'Headphone C', 'Aksesoris', 1200000, 4.2, None, '2023-02-15', 'stok_habis'),
    (104, 'Laptop A', 'Elektronik', 15000000, 4.5, 120, '2023-01-20', 'stok_tersedia'),
    (105, 'Tablet D', 'Elektronik', 6500000, None, 80, '2023-03-01', 'stok_tersedia'),
    (106, 'Charger E', 'Aksesoris', 250000, -4.0, 500, '2023-03-05', 'Stok_Tersedia'),
    (107, 'Smartwatch F', 'Elektronik', 3100000, 4.8, 150, '2023-04-12', 'stok_habis')
]

skema_produk = StructType([
    StructField("id_produk", IntegerType()),
    StructField("nama_produk", StringType()),
    StructField("kategori", StringType()),
    StructField("harga", IntegerType()),
    StructField("rating", FloatType()),
    StructField("terjual", IntegerType()),
    StructField("tgl_rilis", StringType()),
    StructField("status_stok", StringType())
])

df_tugas = spark.createDataFrame(data=data_produk, schema=skema_produk)

# Cleaning
# Step 1: normalise noisy values first (rating sign, status casing) so rows
#         that differ only by such noise compare equal afterwards.
df_norm_t = df_tugas.withColumn("rating", abs(col("rating"))) \
                    .withColumn("status_stok", lower(col("status_stok")))

# Step 2: de-duplicate on every column EXCEPT the surrogate key. With
#         id_produk included, the two 'Laptop A' rows (101 and 104) would
#         both be kept.
kolom_produk = [c for c in df_norm_t.columns if c != "id_produk"]
df_unik_t = df_norm_t.dropDuplicates(kolom_produk)

# Step 3: compute the fill statistics from the de-duplicated data so the
#         repeated product is not counted twice, then impute.
terjual_med = df_unik_t.approxQuantile("terjual", [0.5], 0.01)[0]
rating_mean = df_unik_t.filter(col("rating") > 0).select(avg("rating")).collect()[0][0]
df_t = df_unik_t.na.fill({'terjual': int(terjual_med), 'rating': float(rating_mean)})

# Transformation (standardisation of price, rating and units sold)
vec_asm = VectorAssembler(inputCols=["harga", "rating", "terjual"], outputCol="fitur")
df_v = vec_asm.transform(df_t)
df_std = StandardScaler(inputCol="fitur", outputCol="fitur_std", withMean=True, withStd=True).fit(df_v).transform(df_v)

# Feature Engineering
df_fe = df_std.withColumn("bulan_rilis", month(to_date("tgl_rilis", "yyyy-MM-dd")))
idx1 = StringIndexer(inputCol="kategori", outputCol="kat_idx")
idx2 = StringIndexer(inputCol="status_stok", outputCol="stok_idx")
# Apply the indexers one after another on the SAME growing frame. The second
# indexer must transform the output of the first; transforming df_fe again
# would drop kat_idx and the one-hot encoder below would fail on the missing
# column.
df_k = idx1.fit(df_fe).transform(df_fe)
df_i = idx2.fit(df_k).transform(df_k)
ohe2 = OneHotEncoder(inputCols=["kat_idx", "stok_idx"], outputCols=["kat_ohe", "stok_ohe"])
df_tugas_akhir = ohe2.fit(df_i).transform(df_i)

print("✅ Hasil Akhir Tugas (10 baris pertama):")
df_tugas_akhir.select(
    "id_produk", "nama_produk", "kategori", "status_stok",
    "bulan_rilis", "fitur_std", "kat_ohe", "stok_ohe"
).show(10, truncate=False)

# ==============================
# CELL 6: DONE
# ==============================
# Stop the SparkSession created in the setup cell, then confirm on stdout.
closing_message = "✅ Praktikum selesai. SparkSession dihentikan."
spark.stop()
print(closing_message)