

<div style="padding: 60px;
  text-align: center;
  background: #d4afb9;
  color: #003049;
  font-size: 20px;">
  <h2>Inclass: Spark Integration for Streaming Data</h2>
   <hr>
</div>



# 🌟 Introduction to Data Streaming  

📈 **Real-time data** adalah kunci di dunia keuangan yang bergerak cepat. **Data streaming** memungkinkan lembaga keuangan menganalisis data besar seperti harga saham, tren pasar, atau transaksi secara langsung. Dengan ini, keputusan dapat diambil cepat, risiko dimonitor, dan kecurangan dideteksi.  

🎯 **Contoh Penggunaan Data Streaming**:  

- 💹 **Analisis Pasar Saham**: Memantau harga saham dan membuat keputusan trading secara real-time.  
- 🚨 **Deteksi Kecurangan**: Identifikasi aktivitas mencurigakan untuk mengurangi risiko.  
- 🛒 **Pantauan Transaksi Pelanggan**: Tawarkan layanan personal berdasarkan pola transaksi langsung.  

---

# 🚀 Spark Structured Streaming  

**Spark Structured Streaming** adalah ekstensi dari Apache Spark yang memproses data **real-time** menggunakan API sederhana yang sama dengan batch processing.  

✨ **Kenapa Pilih Spark Structured Streaming?**  
- 🧑‍💻 **Skalabilitas**: Mampu menangani data dalam jumlah besar.  
- 🛡️ **Toleransi Kesalahan**: Data aman dengan fitur checkpointing.  
- ⏱️ **Analisis Real-Time**: Query data langsung dengan sintaks SQL-like.  
- 🔄 **Kode Terpadu**: Gunakan kode yang sama untuk batch dan streaming.  

💡 Spark Structured Streaming memproses data dalam **micro-batches**, yaitu membagi aliran data menjadi potongan kecil agar lebih mudah dikelola.  

📘 Lihat panduan lengkapnya di [Spark Documentation](https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html).  

---

## 🧠 Basic Concept  

### 🔄 Stream Processing  

🏦 Di industri keuangan, stream processing membantu menganalisis data sensitif waktu seperti pergerakan harga saham atau log transaksi pelanggan. Dengan **Structured Streaming**, respons cepat terhadap perubahan pasar atau deteksi fraud menjadi mungkin.  

💡 Bayangkan data stream sebagai **“Input Table”**: Setiap data baru adalah baris baru dalam tabel. Bedanya, pada batch processing, semua data diproses sekaligus, sedangkan pada streaming, data diproses terus-menerus saat datang.  

---

### ⚙️ Cara Kerja:  

1. **Data sebagai Tabel**:  
   Bayangkan data saham masuk setiap detik. Setiap harga baru jadi baris baru di tabel yang dapat langsung di-query.  

2. **Micro-Batching**:  
   Alih-alih memproses satu per satu, data dikumpulkan dalam batch kecil untuk efisiensi.  

3. **Proses Real-Time**:  
   Data langsung dianalisis, diubah, atau dihitung layaknya batch processing.  

---

### 🧾 Contoh Real-Time Table  

![](assets/structured-streaming-model.png)

1. **Input Table**:  
   Kosong di awal, lalu tumbuh saat data baru masuk (contoh: harga saham).  

2. **Proses**:  
   - 📊 Hitung rata-rata pergerakan harga saham.  
   - 📈 Analisis tren perubahan harga mendadak.  
   - 🕒 Agregasi data dalam rentang waktu (contoh: rata-rata harga 5 menit terakhir).  

3. **Result Table**:  
   Hasil di-update terus ke tabel, layar, atau database untuk mendukung **pengambilan keputusan real-time**.  

---


<div class="alert alert-info"><br>
  <center><h2>🔄 Workflow Spark Structured Streaming  </h2></center>
</div> 

# 🔄 Workflow Spark Structured Streaming  

## 💻 Contoh: Pantauan Transaksi Real-Time  

💡 Bayangkan data transaksi pelanggan dalam format **JSON**. Kita bisa:  
- Menghitung total transaksi per pelanggan.  
- Mendeteksi pola perilaku secara langsung.  

###  1️⃣ **Impor Library dan Buat `SparkSession`**  

Untuk memulai:  
1. 🛠️ Impor library:  
   - `StructType` & `StructField`: Mendefinisikan skema JSON.  
   - `from_json` & `col`: Parsing dan interaksi dengan data JSON.  
   - `sum`: Untuk operasi agregasi.  
2. 🚀 Buat `SparkSession`: Pusat operasi Spark, termasuk untuk Structured Streaming.  

✨ **SparkSession** adalah langkah awal untuk memproses aliran data dengan Spark.  

--- 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import from_json, col, sum

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TransactionStreaming") \
    .getOrCreate()

### 2️⃣ **Tentukan Schema Data**  

Schema mendefinisikan struktur data yang akan diterima. Gunakan `StructType` dan `StructField`.


In [None]:
# Define the schema for the transaction data
transaction_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("transaction_amount", DoubleType(), True)
])

📋 **Penjelasan**:  
   - **`customer_id`**: ID unik pelanggan (String).  
   - **`transaction_amount`**: Jumlah transaksi (Double).  
   - **Nullable (`True`)**: Kolom dapat berisi nilai kosong.

### **3️⃣ Baca Data Streaming dengan Spark**

Kita menggunakan **`readStream`** untuk membaca data dari sumber streaming. Dalam contoh ini, data real-time disimulasikan melalui **socket**.

**🔌 Sumber Data: Socket**

Socket digunakan untuk mengirim data real-time.  
Konfigurasi yang diperlukan:  

- **`host`**: Lokasi server data (contoh: `localhost`).  
- **`port`**: Nomor port untuk koneksi (contoh: `9999`).  



In [None]:
# Read the stream of transaction data from a socket
raw_stream_df = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

**📦 Parsing Data JSON**

Data mentah dalam format JSON diubah menjadi kolom terstruktur dengan langkah berikut:  
1. **`from_json`**: Mengonversi string JSON menjadi kolom terstruktur sesuai **`transaction_schema`**.  
2. **`col("value")`**: Mengakses data mentah di stream.  
3. **`alias("data")` + `select("data.*")`**: Ekstrak kolom seperti **`customer_id`** dan **`transaction_amount`** untuk pemrosesan lebih mudah.

In [None]:
# Parse the JSON data
transaction_stream_df = raw_stream_df \
    .select(from_json(col("value"), transaction_schema).alias("data")) \
    .select("data.*")

### 4️⃣ **Proses Data**

Setelah data streaming dibaca, langkah selanjutnya adalah **mengolah data**. Pada contoh ini, kita ingin:  

- **Grouping**: Mengelompokkan data berdasarkan `customer_id` agar setiap pelanggan dapat dianalisis.  
- **Aggregation**:
  - **`sum("transaction_amount")`**: Menjumlahkan total transaksi setiap pelanggan.  
  - **`alias("total_amount")`**: Memberi nama baru pada kolom hasil agregasi agar lebih jelas.

**📊 Hitung Total Transaksi per Pelanggan**

In [None]:
# Calculate the total transaction amount for each customer
from pyspark.sql.functions import sum

customer_total_df = transaction_stream_df \
    .groupBy("customer_id") \
    .agg(sum("transaction_amount").alias("total_amount"))

### 5️⃣ **Tampilkan Hasil**

Setelah data diproses, hasilnya ditulis ke **console** agar bisa dimonitor.  

- **`writeStream`**: Menentukan tujuan keluaran data hasil proses.  
  - **`outputMode("complete")`**: Menampilkan hasil agregasi penuh setiap kali query dijalankan.  
  - **`format("console")`**: Menampilkan hasil di terminal.  
- **`start()`**: Memulai proses streaming secara real-time.  
- **`awaitTermination()`**: Menjaga aplikasi tetap berjalan hingga dihentikan manual.


**📤 Kode untuk Menulis Hasil ke Console**

In [None]:
# Output the result to the console
query = customer_total_df \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Wait for the termination of the query
query.awaitTermination()


<div class="alert alert-danger">
  <center>Kode ini akan error apabila langsung dijalankan tanpa menjalankan socket `nc` di terminal</center>
</div> 


### 6️⃣ **Simulasi Data Transaksi Real-Time**

Untuk menguji, gunakan **`nc` (netcat)** untuk mengirim data JSON ke socket.  
1. Jalankan perintah berikut di terminal:  
   ```bash
   nc -lk 9999
   ```  
   - **`-l`**: Mode listening untuk menerima koneksi.  
   - **`-k`**: Membuka koneksi terus-menerus.  

2. Masukkan data transaksi dalam format JSON:  
   ```json
   {"customer_id": "C001", "transaction_amount": 100.50}
   {"customer_id": "C002", "transaction_amount": 200.75}
   {"customer_id": "C001", "transaction_amount": 150.25}
   {"customer_id": "C003", "transaction_amount": 50.00}
   {"customer_id": "C002", "transaction_amount": 100.00}
   ```


**🖥️ Hasil di Console**

Spark akan memproses data dan menampilkan hasil agregasi secara real-time:

```plaintext
+-----------+------------+
|customer_id|total_amount|
+-----------+------------+
|       C001|      250.75|
|       C002|      300.75|
|       C003|       50.00|
+-----------+------------+
```


**📌 Penjelasan Hasil**
- **`C001`**: 100.50 + 150.25 = **250.75**  
- **`C002`**: 200.75 + 100.00 = **300.75**  
- **`C003`**: Hanya 50.00.

✨ Dengan simulasi ini, Anda bisa melihat bagaimana data streaming diproses secara real-time oleh Spark! 🚀

---

<div class="alert">

**Why Netcat? 🤔**

🔹 **Simulasi Mudah**: Uji aliran data real-time tanpa sistem kompleks.  
🔹 **Cocok untuk Pemula**: Belajar konsep streaming sebelum masuk ke tools produksi.

**Untuk Produksi** 

Ganti **Netcat** dengan sistem yang lebih andal, seperti: 
 
- **Apache Kafka**  
- **Amazon Kinesis**  
- **Google Pub/Sub**
</div> 

 


# 🚀 Enhancing Real-Time Streaming with Messaging Systems  

Dalam dunia keuangan, seperti **monitoring transaksi**, **deteksi penipuan**, atau **analisis harga saham**, kita perlu alat andal untuk menangani data real-time. Di sinilah **Apache Kafka** hadir! Berbeda dengan server sederhana seperti `nc`, Kafka adalah platform terdistribusi yang dirancang khusus untuk mengelola data aliran besar secara efisien. 

## 🧐 Apa itu Apache Kafka?  

**Apache Kafka** adalah sistem pesan terdistribusi open-source untuk menangani aliran data secara real-time. Dibuat oleh LinkedIn, Kafka kini menjadi alat penting dalam industri, termasuk keuangan, untuk mengolah data dengan cepat dan skalabel.  

### 🌟 *Kenapa Kafka Penting untuk Keuangan?*  
Kafka membantu bisnis:  
1. 💸 **Memproses Transaksi**: Tangani jutaan transaksi per detik dengan latensi rendah.  
2. 🔒 **Deteksi Penipuan**: Identifikasi pola mencurigakan secara real-time.  
3. 📈 **Analisis Saham**: Pantau data pasar langsung untuk keputusan cepat.  
4. 👥 **Wawasan Pelanggan**: Personalisasi layanan berdasarkan data perilaku real-time.  

---

## 🧩 Komponen Utama Kafka  

Mari kita kenali bagian-bagian inti Kafka:  

1. **📌 Topics**:  
   - Saluran untuk pesan, seperti `"transactions"` atau `"stock_prices"`.  
   - Semua pesan terkait disimpan di topik untuk akses real-time.  

2. **✉️ Producers**:  
   - Sistem yang mengirim pesan ke topik Kafka. Contoh: Platform trading mengirim data ke `"transactions"`.  

3. **🛠️ Consumers**:  
   - Aplikasi yang membaca pesan dari topik. Misalnya, aplikasi deteksi penipuan membaca dari `"transactions"`.  

4. **🔗 Brokers**:  
   - Server yang menyimpan dan mengelola pesan. Kafka mendistribusikan dan mereplikasi data untuk ketahanan.  

5. **⚙️ Partitions**:  
   - Membagi topik menjadi bagian lebih kecil, misalnya berdasarkan wilayah: `"Jakarta"`, `"Bogor"`.  

6. **🔢 Offset**:  
   - ID unik untuk setiap pesan dalam partisi. Konsumen melacak pesan dengan offset ini.  


---

## 🔄 Bagaimana Kafka Mengelola Data Real-Time?  

1. **📤 Mengirim Data ke Kafka**  
   - **Producers** mengirim pesan ke topik. Contoh:  
     ```json  
     {"transaction_id": "T12345", "amount": 500.0, "currency": "USD"}  
     ```  

2. **📂 Partisi dan Penyimpanan**  
   - Kafka mempartisi data berdasarkan kunci (misalnya ID transaksi).  
   - Pesan disimpan berurutan untuk proses real-time.  

3. **📥 Mengonsumsi Data**  
   - **Consumers** membaca pesan dari topik dan memprosesnya (e.g., model deteksi penipuan).  

4. **⏳ Penyimpanan dan Retensi**  
   - Kafka menyimpan pesan untuk durasi tertentu, penting untuk audit & kepatuhan.  

5. **📈 Skalabilitas dan Ketahanan**  
   - Kafka tetap cepat meski volume data besar. Jika broker gagal, replika mengambil alih otomatis.  

---

## 📈 Finance Use Case: Real-Time Loan Monitoring with Kafka  

### **📝 Scenario**  

Sebuah bank ingin memproses aplikasi pinjaman secara **real-time** untuk memberikan keputusan cepat: **disetujui** atau **ditolak**. Dengan **Apache Kafka**, bank dapat meningkatkan efisiensi dan keandalan proses ini. 


---

### **🔄 Alur Proses**  

![](assets/loan_applications_kafka.png)


#### **1️⃣ Producer: Mengirim Data Aplikasi Pinjaman**  

- **Apa yang dilakukan**:  
  Aplikasi perbankan online mengirim data pelanggan (misalnya pendapatan, skor kredit, jumlah pinjaman) ke topik Kafka bernama `"loan_applications"`.  


#### **2️⃣ Broker: Mengelola Aliran Data**  

- **Peran Kafka**:  
  - Membagi topik `"loan_applications"` menjadi partisi (misalnya berdasarkan wilayah).  
  - Mereplikasi data untuk memastikan ketersediaan tinggi, meskipun server gagal.  
  - Menyimpan data untuk durasi tertentu guna keperluan reprocessing.  
- **Manfaat**:  
  Kafka memastikan **tidak ada data yang hilang** dan memungkinkan proses paralel untuk keputusan cepat.  

#### **3️⃣ Consumer: Prediksi Keputusan Pinjaman**  

- **Peran Consumer**:  
  - Membaca data aplikasi dari topik Kafka.  
  - **Model pembelajaran mesin** menganalisis fitur seperti pendapatan dan skor kredit untuk memutuskan:  
    - ✅ **Disetujui**: Skor kredit tinggi dan pendapatan memadai.  
    - ❌ **Ditolak**: Skor kredit rendah atau jumlah pinjaman terlalu besar.  
- **Output**: Hasil dikirim ke topik baru, namun pada pembelajaran kali ini kita hanya menampilkan pada jupyter notebook console 




## ⚙️ Langkah Persiapan Kafka  

### **🖥️ Instalasi di Mac**  

1. **📥 Unduh Kafka**  
   - Kunjungi [Apache Kafka Download Page](https://kafka.apache.org/downloads).  
   - Unduh versi binari Kafka (misalnya `kafka_2.13-3.8.1.tgz`).  

2. **🗂️ Ekstraksi File Kafka**  
   - Ekstrak file ke direktori seperti `Tools/kafka`.  

3. **🔧 Konfigurasi Direktori Data Kafka**  
   - Buat folder `data` di dalam direktori Kafka.  
   - Edit file `config/kraft/server.properties` dan ubah:  
     ```properties
     log.dirs=data
     ```
   - Simpan perubahan.  

4. **🆔 Format Penyimpanan Kafka**  
   - Hasilkan UUID untuk cluster:  
     ```bash
     ./bin/kafka-storage.sh random-uuid
     ```  
   - Gunakan UUID ini untuk memformat penyimpanan Kafka:  
     ```bash
     ./bin/kafka-storage.sh format --cluster-id <UUID> --config config/kraft/server.properties
     ```  

5. **▶️ Menjalankan Kafka**  
   - Mulai Kafka:  
     ```bash
     ./bin/kafka-server-start.sh config/kraft/server.properties
     ```  
   - Hentikan Kafka dengan `CTRL+C`.  

---

### **🖥️ Instalasi di Windows**  

1. **📥 Unduh Kafka**  
   - Unduh file Kafka dan ekstrak menggunakan **WinRAR** atau alat serupa.  
   - Rename folder ke `kafka` untuk kemudahan.  

2. **🔧 Konfigurasi ZooKeeper dan Kafka**  
   - Ubah lokasi penyimpanan data di `config/zookeeper.properties` dan `config/kraft/server.properties`:  
     ```properties
     dataDir=data/zookeeper
     log.dirs=data/kafka-logs
     ```  

3. **🆔 Format Penyimpanan Kafka**  
   - Hasilkan UUID dengan:  
     ```bash
     bin\windows\kafka-storage.bat random-uuid
     ```  
   - Format penyimpanan Kafka:  
     ```bash
     bin\windows\kafka-storage.bat format --cluster-id <UUID> --config config\kraft\server.properties
     ```  

4. **▶️ Menjalankan Kafka**  
   - Jalankan ZooKeeper:  
     ```bash
     bin\windows\zookeeper-server-start.bat config\zookeeper.properties
     ```  
   - Jalankan Kafka:  
     ```bash
     bin\windows\kafka-server-start.bat config\kraft\server.properties
     ```  


## Step 2: Membuat Kafka Producer

![](assets/st_UI_producer.png)

Kafka producer digunakan untuk mengirim pesan ke topik Kafka agar dapat diproses lebih lanjut. Dalam langkah ini, kita akan:  

1. Mengkonfigurasi Kafka producer menggunakan pustaka `confluent_kafka`.  
2. Membangun antarmuka pengguna (UI) dengan Streamlit untuk input data aplikasi pinjaman.  
3. Mengirim data yang dikumpulkan ke topik Kafka bernama `"loan_applications"`.  

### **1️⃣ Kode Kafka Producer**

Berikut adalah kode Python untuk mengatur Kafka producer:


```python
from confluent_kafka import Producer
import json

# Konfigurasi Kafka producer
producer_config = {
    'bootstrap.servers': 'localhost:9092'  # Alamat Kafka broker
}

# Membuat instance Kafka producer
producer = Producer(producer_config)

def kirim_pesan(topik, nilai):
    """
    Mengirim pesan ke topik Kafka yang ditentukan.
    """
    try:
        # Kirim data ke topik Kafka dalam format JSON
        producer.produce(topik, value=json.dumps(nilai))
        producer.flush()  # Pastikan pesan terkirim
        print(f"Pesan berhasil dikirim ke topik '{topik}': {nilai}")
    except Exception as e:
        print(f"Kesalahan saat mengirim pesan: {e}")
```

**🔑 Poin Penting**:  

- **`bootstrap.servers`**: Alamat Kafka broker. Secara default, menggunakan `localhost:9092` untuk pengaturan lokal.  
- **`produce()`**: Mengirim data ke topik Kafka yang ditentukan.  
- **`flush()`**: Memastikan semua pesan yang tersimpan dalam buffer terkirim sebelum melanjutkan proses.  


### **2️⃣ Membuat Antarmuka dengan Streamlit**

Berikut adalah cara membuat antarmuka web interaktif menggunakan Streamlit untuk pengumpulan data aplikasi pinjaman:

```python
# Streamlit UI for loan applications
st.title("Credit Risk Prediction")

with st.form("credit_risk_form"):
    person_home_ownership = st.selectbox("Home Ownership", ["RENT", "OWN", "MORTGAGE"])
    loan_intent = st.selectbox("Loan Intent", ["PERSONAL", "EDUCATION", "DEBT CONSOLIDATION"])
    loan_grade = st.selectbox("Loan Grade", ["A", "B", "C", "D", "E", "F", "G"])
    person_age = st.number_input("Age", min_value=0, step=1)
    person_income = st.number_input("Income", min_value=0.0, step=0.1)
    person_emp_length = st.number_input("Employment Length (in years)", min_value=0, step=1)
    loan_amnt = st.number_input("Loan Amount", min_value=0.0, step=0.1)
    loan_int_rate = st.number_input("Interest Rate", min_value=0.0, step=0.1)
    cb_person_cred_hist_length = st.number_input("Credit History Length (in years)", min_value=0, step=1)

    submitted = st.form_submit_button("Submit")

# Menangani pengiriman formulir
if submitted:
    # Mengumpulkan data dari formulir ke dalam dictionary
    data_pinjaman = {
        "person_home_ownership": person_home_ownership,
        "loan_intent": loan_intent,
        "loan_grade": loan_grade,
        "person_age": person_age,
        "person_income": person_income,
        "person_emp_length": person_emp_length,
        "loan_amnt": loan_amnt,
        "loan_int_rate": loan_int_rate,
        "cb_person_cred_hist_length": cb_person_cred_hist_length
    }
    
    try:
        # Mengirim data ke Kafka
        kirim_pesan("loan_applications", data_pinjaman)
        st.success("✅ Aplikasi pinjaman berhasil dikirim ke Kafka!")
    except Exception as e:
        st.error(f"❌ Kesalahan saat mengirim data ke Kafka: {e}")
```





### **How This Works:**

- **Input Pengguna**:  
  Pengguna memasukkan detail pinjaman, seperti **usia**, **pendapatan**, **jumlah pinjaman**, dll., ke dalam formulir.  

- **Transformasi Data**:  
  Data dari formulir dikumpulkan ke dalam dictionary dan diubah menjadi format JSON sebelum dikirim ke Kafka.  

- **Pengiriman Pesan**:  
  Kafka producer mempublikasikan pesan JSON ke topik `"loan_applications"`.  


### 3️⃣ **Menjalankan Aplikasi Streamlit**

1. Simpan kode ke file, misalnya `streamlit_kafka_producer.py`.  
2. Jalankan aplikasi Streamlit:  
   ```bash
   streamlit run streamlit_kafka_producer.py
   ```  
3. Buka browser (Streamlit akan menampilkan tautannya di terminal). 


<div class="alert alert-info"><br>
  <center><h2>Read Data From Kafka</h2></center>
</div> 

## Step 3: Kafka Consumer untuk Membaca Data dan Mengubahnya ke DataFrame

### Membaca Data dari Kafka

Pada langkah ini, kita akan membaca pesan yang dikirim oleh Kafka Producer dan mengubahnya ke format terstruktur (seperti DataFrame). Untuk itu, kita akan menggunakan **PySpark** sebagai alat untuk memproses dan menganalisis data Kafka.

**1️⃣ Mengatur PySpark dengan Kafka**

Langkah pertama adalah membuat **Spark session** yang memungkinkan kita berinteraksi dengan Spark dan Kafka.

Namun ada beberapa configurasi yang peru kita atur:

- **`spark.jars.packages`**: Menentukan pustaka Kafka dan Spark SQL yang diperlukan untuk membaca data dari Kafka.
- **`master("local[*]")`**: Memberitahu Spark untuk berjalan secara lokal dengan memanfaatkan semua core CPU yang tersedia.

In [None]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("Credit Data Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,org.apache.kafka:kafka-clients:3.8.1")\
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()  # Create the session

spark

**2️⃣ Membaca Data dari Kafka**

Setelah Spark session dibuat, kita bisa membaca data dari topik Kafka menggunakan fungsi `spark.read.format("kafka")`.

Ada beberapa `option` (pengaturan) yang perlu didefinisikan:

- **`kafka.bootstrap.servers`**: Alamat Kafka broker, dalam hal ini `localhost:9092` untuk server lokal.
- **`subscribe`**: Nama topik Kafka yang akan dibaca, misalnya `loan_applications`.
- **`startingOffsets`**: Mengatur agar pembacaan dimulai dari data awal (earliest) yang tersedia.

In [None]:
kafka_df = (
    spark
    .read
    .format("kafka")  # Gunakan Kafka sebagai sumber data
    .option("kafka.bootstrap.servers", "localhost:9092")  # Alamat Kafka broker
    .option("subscribe", "loan_applications")  # Nama topik Kafka
    .option("startingOffsets", "earliest")  # Mulai membaca dari offset paling awal
    .load()  # Memuat data ke dalam DataFrame
)

In [None]:
# Menampilkan struktur data
kafka_df.printSchema()  # Print schema to see the structure of the data

In [None]:
kafka_df.show()  # Display the first few rows of the data


**3️⃣ Mendefinisikan Skema untuk Data**

Pesan dari Kafka disimpan dalam format **binary**, sehingga kita perlu mengubahnya menjadi format terstruktur. Untuk itu, kita mendefinisikan **schema** yang sesuai dengan data.


In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define the schema for the data inside the 'value' column
schema = StructType([
    StructField("person_home_ownership", StringType(), True),
    StructField("loan_intent", StringType(), True),
    StructField("loan_grade", StringType(), True),
    StructField("person_age", IntegerType(), True),
    StructField("person_income", FloatType(), True),
    StructField("person_emp_length", IntegerType(), True),
    StructField("loan_amnt", FloatType(), True),
    StructField("loan_int_rate", FloatType(), True),
    StructField("cb_person_cred_hist_length", IntegerType(), True)
])

- **`StringType`** digunakan untuk data teks (misalnya `person_home_ownership`).
- **`IntegerType`** dan **`FloatType`** digunakan untuk data numerik (misalnya `person_age`, `loan_amnt`).

**4️⃣ Mengubah Pesan Kafka ke Format String dan Parsing JSON**

Pesan Kafka dalam kolom **`value`** perlu diubah dari format binary menjadi string, lalu diparsing ke format JSON menggunakan schema yang sudah didefinisikan.

In [None]:
# Ubah 'value' dari binary ke string dan parsing ke format JSON

# Konversi binary ke string
df = kafka_df.withColumn("value_str", col("value").cast("string"))

# Parsing JSON berdasarkan schema
df_parsed = df.withColumn("data", from_json(col("value_str"), schema)) 

- **`col("value").cast("string")`**: Mengonversi data Kafka dari format binary ke string.
- **`from_json(col("value_str"), schema)`**: Memparsing string JSON ke DataFrame terstruktur berdasarkan schema.


In [None]:
df_parsed.show()

**5️⃣ Memilih dan Menampilkan Data**

Setelah parsing, kita dapat memilih kolom yang relevan dan menampilkan hasilnya.

In [None]:
# Memilih kolom yang relevan
df_result = df_parsed.select(
    "data.*",  # Semua kolom dari data yang diparsing
    "key", "topic", "partition", "offset", "timestamp", "timestampType"  # Metadata Kafka
)



- **`select("data.*", ...)`**: This extracts all the fields from the parsed JSON data and adds Kafka metadata (like `key`, `topic`, `partition`, etc.).
- **`df_result.show()`**: This displays the DataFrame content, allowing you to view the structured data from Kafka.

**6️⃣ Hasil Akhir**

Setelah kode di atas dijalankan, data yang diterima dari Kafka akan tampil dalam format terstruktur seperti berikut:

In [None]:
# Menampilkan hasil
df_result.show()


<div class="alert alert-info"><br>
  <center><h2>Streaming Data From Kafka</h2></center>
</div> 

## Streaming Data dari Kafka

Pada langkah ini, kita akan menggunakan data **streaming** dari Kafka, yang memungkinkan kita membaca data secara real-time dan memprosesnya secara langsung. Tujuannya adalah membaca data dari topik Kafka dan mengubahnya menjadi format terstruktur menggunakan **PySpark**.


**1️⃣ Mengatur Spark Session**



In [None]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("Credit Data Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,org.apache.kafka:kafka-clients:3.8.1")\
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()  # Create the session

spark  # Display the Spark session

**2️⃣ Membaca Data Streaming dari Kafka**

Setelah Spark session dibuat, kita dapat menggunakan fungsi **`readStream`** untuk terus membaca pesan dari topik Kafka secara real-time.

In [None]:
stream_kafka_df = (
    spark
    .readStream
    .format("kafka")  # Membaca data dari Kafka
    .option("kafka.bootstrap.servers", "localhost:9092")  # Alamat server Kafka
    .option("subscribe", "loan_applications")  # Nama topik Kafka
    .option("startingOffsets", "earliest")  # Mulai membaca dari data awal
    .load()  # Memuat data ke dalam DataFrame
)

**3️⃣ Mendefinisikan Skema untuk Data**

Pesan dari Kafka biasanya berupa format **binary**, sehingga kita perlu mendefinisikan skema untuk mengubah data mentah menjadi format terstruktur.

In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Definisi skema untuk kolom 'value'
schema = StructType([
    StructField("person_home_ownership", StringType(), True),
    StructField("loan_intent", StringType(), True),
    StructField("loan_grade", StringType(), True),
    StructField("person_age", IntegerType(), True),
    StructField("person_income", FloatType(), True),
    StructField("person_emp_length", IntegerType(), True),
    StructField("loan_amnt", FloatType(), True),
    StructField("loan_int_rate", FloatType(), True),
    StructField("cb_person_cred_hist_length", IntegerType(), True)
])

**4️⃣ Parsing Data dari Format Binary ke JSON**

Setelah skema didefinisikan, kita dapat mengonversi kolom **`value`** (yang berisi data binary) menjadi string yang dapat dibaca. Kemudian, kita parsing data JSON menggunakan skema yang sudah dibuat.

In [None]:
# Ubah 'value' dari binary ke string, lalu parsing ke format JSON
df_parsed = stream_kafka_df.withColumn("value_str", col("value").cast("string"))
df_data = df_parsed.withColumn("data", from_json(col("value_str"), schema))

# Memilih kolom yang diinginkan
df_result = df_data.select(
    "data.*",  # Semua kolom dari data yang diparsing
    "key", "topic", "partition", "offset", "timestamp", "timestampType"  # Metadata Kafka
)

**5️⃣ Menampilkan Data Secara Streaming**

Sekarang, kita akan menampilkan hasil streaming. Karena ini adalah operasi **streaming**, data akan terus diproses dan ditampilkan secara real-time.

Beberapa parameter yang bisa digunakan untuk streaming data:

- **`outputMode("append")`**: Mode ini menambahkan baris baru ke output tanpa mengubah data yang sudah ada.
- **`format("console")`**: Menulis output stream ke konsol. Alternatif lain adalah `"memory"` untuk menyimpan data di memori.
- **`awaitTermination()`**: Menjaga query tetap berjalan tanpa batas waktu, menunggu data baru untuk diproses.

In [None]:
# Tulis hasil stream ke konsol dalam mode append
query = (
    df_result
    .writeStream
    .outputMode("append")  # Menambahkan data baru ke output
    .format("console")  # Menampilkan hasil di konsol (bisa juga 'memory' untuk query di memori)
    .start()  # Memulai streaming query
)

# Menunggu stream berjalan
query.awaitTermination()


<div class="alert">
<p>

Pada langkah ini, kita telah membuat **Kafka consumer** yang membaca data secara terus-menerus dari topik Kafka. Dengan **PySpark**, kita berhasil:
1. Membaca data streaming dari Kafka.
2. Memparsing pesan JSON dari format binary menjadi terstruktur.
3. Menampilkan hasil data secara real-time di konsol.
</p>
</div> 


<div class="alert alert-info"><br>
  <center><h2>Streaming Data From Kafka + PySpark for Prediction</h2></center>
</div> 

## Streaming Data dari Kafka + PySpark untuk Prediksi

Pada bagian ini kita akan menerapkan **model machine learning** yang telah dilatih sebelumnya dalam skenario streaming, dengan memanfaatkan **Kafka** untuk menerima data secara real-time, **PySpark** untuk pemrosesan data, dan **model prediksi** untuk memberikan hasil secara langsung. Berikut langkah-langkahnya:

**1️⃣ Membuat Spark Session**

In [None]:
from pyspark.sql import SparkSession

# Membuat Spark session untuk menghubungkan ke cluster Spark
spark = SparkSession.builder \
    .appName("Credit Data Kafka Predict") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3,org.apache.kafka:kafka-clients:3.8.1")\
    .config("spark.sql.shuffle.partitions", 4)\
    .master("local[*]")\
    .getOrCreate()

spark

**2️⃣ Mendefinisikan Skema untuk Data Masuk**

In [None]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

# Define schema for the data in Kafka messages
schema = StructType([
    StructField("person_home_ownership", StringType(), True),
    StructField("loan_intent", StringType(), True),
    StructField("loan_grade", StringType(), True),
    StructField("person_age", IntegerType(), True),
    StructField("person_income", FloatType(), True),
    StructField("person_emp_length", IntegerType(), True),
    StructField("loan_amnt", FloatType(), True),
    StructField("loan_int_rate", FloatType(), True),
    StructField("cb_person_cred_hist_length", IntegerType(), True)
])

**3️⃣ Membaca Data Streaming dari Kafka**

In [None]:
stream_kafka_df_predict = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "loan_applications")
    .option("startingOffsets", "earliest")  # Start reading from the earliest message
    .load()
)

**4️⃣ Memuat Model yang Telah Dilatih**

Kita akan memuat **model pre-trained** yang sebelumnya telah dilatih menggunakan data historis. Model ini akan digunakan untuk memprediksi hasil aplikasi pinjaman baru yang diterima melalui Kafka.

Model yang dimuat:

- **RandomForestClassificationModel**: Model untuk memprediksi apakah aplikasi pinjaman akan disetujui atau ditolak.
- **MinMaxScalerModel**: Model untuk **scaling** fitur ke dalam rentang standar.
- **PipelineModel**: Mengelola preprocessing seperti encoding, pengisian nilai yang hilang, dan transformasi fitur.

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel
from pyspark.ml.feature import MinMaxScalerModel, VectorAssembler
from pyspark.ml import PipelineModel

# Load pre-trained models
rf_model = RandomForestClassificationModel.load("models_new1/random_forest_credit_risk")
scaler_model = MinMaxScalerModel.load("models_new1/minmax_scaler")
pipeline_model = PipelineModel.load("models_new1/preprocessing_pipeline")

**5️⃣ Parsing dan Preprocessing Data**

Pesan dari Kafka berbentuk **binary**, sehingga kita perlu mengubahnya ke string, parsing JSON, lalu menyesuaikan dengan skema yang sudah dibuat.

In [None]:
from pyspark.sql.functions import col, from_json

# Parsing JSON dari kolom 'value' Kafka
df_parsed = stream_kafka_df_predict.withColumn("value_str", col("value").cast("string"))
df_data = df_parsed.withColumn("data", from_json(col("value_str"), schema)).select("data.*")

**6️⃣ Definisi Logika Prediksi dan Penerapan Pipeline**

Langkah ini mendefinisikan logika untuk memproses data streaming dan menggunakan model yang sudah dilatih untuk prediksi.


In [None]:
from pyspark.sql import functions as F

# Define function to apply the prediction and add the result column
def predict_and_add_column(input_data):
    # Apply preprocessing pipeline (e.g., encoding categorical variables)
    processed_data = pipeline_model.transform(input_data)
    
    # Assemble features into a single vector column
    assembler = VectorAssembler(
        inputCols=['person_age', 'person_income', 'person_emp_length', 'loan_amnt', 'loan_int_rate',
                   'cb_person_cred_hist_length', 'person_home_ownership_encoded', 'loan_intent_encoded', 'loan_grade_encoded'],
        outputCol="features_unscaled"
    )
    assembled_data = assembler.transform(processed_data)
    
    # Scale the features
    scaled_data = scaler_model.transform(assembled_data)
    
    # Predict the loan approval (1 = Approved, 0 = Denied)
    predictions = rf_model.transform(scaled_data)
    
    # Add 'approval_status' column based on prediction
    result_data = predictions.withColumn(
        "approval_status",
        F.when(col("prediction") == 1, "Approved").otherwise("Denied")
    )
    
    return result_data

<div class="alert">
  <details>
    <summary><b>Penjelasan</b></summary> 
    
**1. Preprocessing Data**
Pertama-tama, kita menggunakan **pipeline model** yang sudah dilatih untuk memproses data yang masuk. Pipeline ini memastikan bahwa fitur kategori di-encode dengan benar dan data ditransformasikan ke dalam format yang dibutuhkan oleh model machine learning.

- **Preprocessing** di pipeline model dapat mencakup berbagai langkah, seperti mengisi nilai yang hilang, encoding variabel kategorikal (misalnya, `person_home_ownership`), dan mengubah format data agar sesuai untuk input model.

**2. Menyusun Fitur**
Selanjutnya, kita menyusun semua fitur individu menjadi satu vektor fitur yang tunggal. Proses ini dilakukan dengan menggunakan **VectorAssembler**, yang menggabungkan kolom-kolom fitur menjadi satu kolom vektor. Ini adalah langkah penting karena banyak model machine learning (termasuk Random Forest) membutuhkan input dalam bentuk vektor fitur.

**3. Scaling Fitur**
Fitur-fitur yang telah disusun dalam vektor kemudian akan **diskalakan** menggunakan **MinMaxScaler**. Scaling bertujuan untuk menormalkan nilai-nilai fitur sehingga berada dalam rentang yang sama. Hal ini penting karena banyak model machine learning, termasuk Random Forest, lebih efektif bila fitur-fitur memiliki skala yang seragam.

**4. Prediksi dengan Model Random Forest**
Setelah fitur-fitur diskalakan, kita mengaplikasikan model **RandomForest** yang telah dilatih sebelumnya untuk melakukan prediksi status persetujuan pinjaman. Model ini akan menghasilkan prediksi dalam bentuk nilai numerik, yaitu `1` untuk disetujui (approved) dan `0` untuk ditolak (denied).

**5. Menambahkan Kolom Prediksi**
Setelah prediksi dihasilkan, kita menambahkan kolom baru, yaitu **approval_status**, yang berisi status persetujuan pinjaman berdasarkan hasil prediksi. Jika prediksi bernilai `1`, maka kolom `approval_status` akan berisi "Approved", dan jika bernilai `0`, kolom tersebut akan berisi "Denied".

    
</details>
</div> 

**7️⃣ Streaming Prediksi dan Menampilkan Hasil**

Setelah fungsi prediksi diterapkan pada data streaming, hasil prediksi akan ditampilkan secara real-time.

In [None]:
# Apply prediction logic on the streaming data
predicted_stream = predict_and_add_column(df_data)

# Output the results to the console (for debugging or display in Jupyter notebook)
query = (
    predicted_stream
    .writeStream
    .outputMode("append")  # Keep adding new predictions as data comes in
    .format("console")  # Display results in console
    .start()
)

# Wait for the stream to finish
query.awaitTermination()

<div class="alert">
<p>

Langkah ini mengintegrasikan Kafka, PySpark, dan model machine learning untuk memproses dan memprediksi data real-time. **Manfaatnya**:
1. **Real-time Processing**: Prediksi dilakukan segera setelah data diterima.
2. **Scalable Architecture**: Spark dan Kafka memungkinkan pengolahan data dalam skala besar.
3. **Efisiensi Model**: Memanfaatkan pipeline dan model pre-trained untuk alur kerja yang konsisten.

</p>
</div> 