1. simulasi Data Warehouse/Batch Job

In [10]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# Tambahkan import multiprocessing jika ini Soal 3
# import multiprocessing 

# Generasi 50000 baris data simulasi (Menggantikan pd.read_csv)
N = 50000
start_date = datetime(2024, 1, 1)

data = {
    'Transaction_ID': np.arange(1, N + 1),
    'Product_Category': np.random.choice(['Electronics', 'Clothing', 'Books', 'Beauty'], N),
    'Purchase_Amount': np.round(np.random.uniform(10, 1000, N), 2),
    'Country': np.random.choice(['Indonesia', 'Singapura', 'Malaysia', 'Thailand'], N),
    'Transaction_Date': [start_date + timedelta(days=np.random.randint(0, 365)) for _ in range(N)]
}

df = pd.DataFrame(data)
df['Transaction_Date'] = pd.to_datetime(df['Transaction_Date'])
print(f"Data simulasi {N} baris berhasil dibuat.")

Data simulasi 50000 baris berhasil dibuat.


In [25]:
# TUGAS: Hitung Total Revenue dan Transaksi per Kategori

batch_analysis = df.groupby('Product_Category').agg(
Total_Revenue=('Purchase_Amount', 'sum'),
Total_Transactions=('Transaction_ID', 'count')
).sort_values(by='Total_Revenue', ascending=False).reset_index()
print("Pemrosesan Batch Selesai.")

Pemrosesan Batch Selesai.


In [12]:
print("--- Hasil Analisis Batch Historis ---")
print(batch_analysis.to_markdown(index=False))

--- Hasil Analisis Batch Historis ---
| Product_Category   |   Total_Revenue |   Total_Transactions |
|:-------------------|----------------:|---------------------:|
| Clothing           |     6.37022e+06 |                12607 |
| Beauty             |     6.31121e+06 |                12439 |
| Electronics        |     6.30604e+06 |                12560 |
| Books              |     6.20905e+06 |                12394 |


In [13]:
# Simpan hasil Batch Processing (Total Revenue per Kategori)
batch_analysis.to_csv('output_batch_analysis.csv', index=False)
print("File output_batch_analysis.csv berhasil dibuat.")

File output_batch_analysis.csv berhasil dibuat.


2. simulasi Kafka Producer

In [21]:
df['Transaction_Date'] = pd.to_datetime(df['Transaction_Date'])

latest_date = df['Transaction_Date'].max()

print("Tanggal Transaksi Terbaru (Simulasi Stream):", latest_date.strftime('%Y-%m-%d'))

Tanggal Transaksi Terbaru (Simulasi Stream): 2024-12-30


In [15]:
# Simulasi Producer: Hanya ambil data hari terbaru
real_time_stream = df[df['Transaction_Date'] == latest_date].copy()
print("Jumlah Transaksi yang akan dikirim ke Kafka:", len(real_time_stream))

Jumlah Transaksi yang akan dikirim ke Kafka: 118


In [16]:
print("--- Data Terbaru (Simulasi Payload Kafka) ---")
# Tampilkan 5 data pertama yang akan 'dikirim'
print(real_time_stream.head().to_markdown(index=False))

--- Data Terbaru (Simulasi Payload Kafka) ---
|   Transaction_ID | Product_Category   |   Purchase_Amount | Country   | Transaction_Date    |
|-----------------:|:-------------------|------------------:|:----------|:--------------------|
|               24 | Books              |            934.71 | Malaysia  | 2024-12-30 00:00:00 |
|               54 | Clothing           |            673.22 | Singapura | 2024-12-30 00:00:00 |
|              233 | Electronics        |             58.09 | Malaysia  | 2024-12-30 00:00:00 |
|              997 | Beauty             |            537.19 | Thailand  | 2024-12-30 00:00:00 |
|             1403 | Books              |            540.26 | Thailand  | 2024-12-30 00:00:00 |


3. simulasi Spark Streaming Consumer

In [22]:
df['Transaction_Date'] = pd.to_datetime(df['Transaction_Date'])
latest_date = df['Transaction_Date'].max()

# Consumer menerima data terbaru dari Producer
real_time_data = df[df['Transaction_Date'] == latest_date].copy()
print("Consumer siap memproses data dari:", latest_date.strftime('%Y-%m-%d'))

Consumer siap memproses data dari: 2024-12-30


In [19]:
# TUGAS: Hitung Total Revenue Real-time
revenue_today = real_time_data['Purchase_Amount'].sum()
transactions_today = len(real_time_data)
print("Pemrosesan Real-time Selesai.")

Pemrosesan Real-time Selesai.


In [20]:
print("--- Metrik Real-Time Hari Ini ---")
print(f"Total Transaksi: {transactions_today}")
print(f"Total Pendapatan: $ {revenue_today:,.2f}")

--- Metrik Real-Time Hari Ini ---
Total Transaksi: 118
Total Pendapatan: $ 56,660.50
