# Hands-on: Batch Processing vs Stream Processing
## Course: Data Processing and Infrastructure

**Learning Objectives:**
- Understand the concepts of batch processing and stream processing
- Implement batch processing with Pandas and Apache Spark
- Understand and implement stream processing with Python and Kafka
- Compare the performance of batch and stream processing
- Build a complete data processing pipeline

**Prerequisites:**
- Python 3.8+
- Jupyter Notebook
- Basic understanding of data processing

---

## 1. Environment Setup and Library Imports

First, we install and import all the libraries needed for this hands-on session.

In [None]:
# Install required packages (uncomment if needed)
# !pip install pandas numpy matplotlib seaborn
# !pip install pyspark
# !pip install kafka-python
# !pip install faker
# !pip install psutil

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import threading
import queue
import json
import random
from datetime import datetime, timedelta
from faker import Faker
import psutil
import os
import warnings
warnings.filterwarnings('ignore')

# Set style for plots
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ All libraries imported successfully!")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")

## 2. Batch Processing with Pandas

Batch processing is a method in which data is collected first and then processed in batches (groups) at set time intervals.

### Batch Processing Characteristics:
- **High Throughput**: Can process large volumes of data
- **High Latency**: Results are available only after the batch runs, so there is a delay between data arriving and output being produced
- **Scheduled Processing**: Runs at set times (for example hourly or nightly)
- **Complete Data**: Processes the complete dataset
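
To make the latency trade-off concrete, here is a minimal sketch of a scheduled batch job: events are held until their hourly batch closes, and then the whole batch is aggregated at once. The hourly schedule, the `(timestamp, category, amount)` event shape, and the `run_hourly_batches` helper are illustrative assumptions, not part of the notebook's pipeline.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def run_hourly_batches(events):
    """Group (timestamp, category, amount) events into hourly batches.

    Hypothetical helper: each batch is processed only after its hour closes,
    so an event's result latency is (batch end - event time).
    """
    batches = defaultdict(list)
    for ts, category, amount in events:
        batch_start = ts.replace(minute=0, second=0, microsecond=0)
        batches[batch_start].append((ts, category, amount))

    reports = []
    for batch_start in sorted(batches):
        batch_end = batch_start + timedelta(hours=1)
        totals = defaultdict(float)
        for _, category, amount in batches[batch_start]:
            totals[category] += amount
        # Worst-case latency: the earliest event in the batch waited the longest
        max_wait = max(batch_end - ts for ts, _, _ in batches[batch_start])
        reports.append({'batch_start': batch_start,
                        'totals': dict(totals),
                        'max_wait': max_wait})
    return reports


events = [
    (datetime(2024, 1, 1, 9, 5), 'Books', 20.0),
    (datetime(2024, 1, 1, 9, 50), 'Books', 15.0),
    (datetime(2024, 1, 1, 10, 10), 'Home', 40.0),
]
for report in run_hourly_batches(events):
    print(report['batch_start'], report['totals'], report['max_wait'])
```

The 09:05 event waits 55 minutes for its result even though the computation itself is instant; that waiting time is the "High Latency" in the list above.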

In [None]:
# Generate sample e-commerce data for batch processing
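fake = Faker()
Faker.seed(42)      # make Faker's names, cities, and dates reproducible
np.random.seed(42)
random.seed(42)     # the generator below draws from Python's `random` module, so seed it too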

def generate_ecommerce_data(num_records=100000):
    """Generate sample e-commerce transaction data"""
    data = []
    categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Beauty']
    
    for i in range(num_records):
        record = {
            'transaction_id': f'TXN_{i+1:06d}',
            'customer_id': f'CUST_{random.randint(1, 10000):05d}',
            'product_category': random.choice(categories),
            'product_price': round(random.uniform(10, 1000), 2),
            'quantity': random.randint(1, 5),
            'timestamp': fake.date_time_between(start_date='-30d', end_date='now'),
            'customer_age': random.randint(18, 70),
            'customer_city': fake.city(),
            'payment_method': random.choice(['Credit Card', 'Debit Card', 'PayPal', 'Cash'])
        }
        record['total_amount'] = round(record['product_price'] * record['quantity'], 2)
        data.append(record)
    
    return pd.DataFrame(data)

# Generate data
print("🔄 Generating e-commerce dataset...")
start_time = time.time()
df = generate_ecommerce_data(100000)
generation_time = time.time() - start_time

print(f"✅ Dataset generated in {generation_time:.2f} seconds")
print(f"📊 Dataset shape: {df.shape}")
print(f"💾 Memory usage: {df.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

# Display sample data
print("\n📝 Sample data:")
print(df.head())

In [None]:
# Batch Processing Operations with Pandas

def batch_analytics(df):
    """Perform comprehensive batch analytics"""
    df = df.copy()  # work on a copy so the helper columns added below don't leak into the caller's df
    print("🔍 Starting Batch Analytics...")
    start_time = time.time()
    
    # 1. Sales by Category
    category_sales = df.groupby('product_category').agg({
        'total_amount': ['sum', 'mean', 'count'],
        'quantity': 'sum'
    }).round(2)
    
    # 2. Daily Sales Trend
    df['date'] = df['timestamp'].dt.date
    daily_sales = df.groupby('date')['total_amount'].sum().reset_index()
    
    # 3. Customer Segmentation
    customer_stats = df.groupby('customer_id').agg({
        'total_amount': 'sum',
        'transaction_id': 'count'
    }).reset_index()
    customer_stats.columns = ['customer_id', 'total_spent', 'transaction_count']
    
    # 4. Payment Method Analysis
    payment_analysis = df.groupby('payment_method')['total_amount'].agg(['sum', 'count']).reset_index()
    
    # 5. Age Group Analysis
    df['age_group'] = pd.cut(df['customer_age'], 
                           bins=[0, 25, 35, 45, 55, 100], 
                           labels=['18-25', '26-35', '36-45', '46-55', '56+'])
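    # observed=False keeps every age bin (even empty ones) and avoids pandas' FutureWarning for categorical keys
    age_analysis = df.groupby('age_group', observed=False)['total_amount'].agg(['sum', 'mean', 'count'])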
    
    processing_time = time.time() - start_time
    
    print(f"✅ Batch processing completed in {processing_time:.2f} seconds")
    
    return {
        'category_sales': category_sales,
        'daily_sales': daily_sales,
        'customer_stats': customer_stats,
        'payment_analysis': payment_analysis,
        'age_analysis': age_analysis,
        'processing_time': processing_time
    }

# Execute batch processing
results = batch_analytics(df)

# Display results
print("\n📊 Category Sales Summary:")
print(results['category_sales'])

print(f"\n💰 Total Revenue: ${df['total_amount'].sum():,.2f}")
print(f"🛒 Total Transactions: {len(df):,}")
print(f"👥 Unique Customers: {df['customer_id'].nunique():,}")

In [None]:
# Visualize the Batch Processing Results
fig, axes = plt.subplots(2, 2, figsize=(15, 12))

# 1. Sales by Category
category_sales_plot = results['category_sales']['total_amount']['sum'].sort_values(ascending=True)
axes[0, 0].barh(category_sales_plot.index, category_sales_plot.values)
axes[0, 0].set_title('Total Sales by Category')
axes[0, 0].set_xlabel('Total Sales ($)')

# 2. Daily Sales Trend
axes[0, 1].plot(results['daily_sales']['date'], results['daily_sales']['total_amount'])
axes[0, 1].set_title('Daily Sales Trend')
axes[0, 1].set_xlabel('Date')
axes[0, 1].set_ylabel('Sales ($)')
axes[0, 1].tick_params(axis='x', rotation=45)

# 3. Payment Method Distribution
payment_counts = results['payment_analysis']['count']
axes[1, 0].pie(payment_counts, labels=results['payment_analysis']['payment_method'], autopct='%1.1f%%')
axes[1, 0].set_title('Payment Method Distribution')

# 4. Age Group Analysis
age_sales = results['age_analysis']['sum']
axes[1, 1].bar(age_sales.index, age_sales.values)
axes[1, 1].set_title('Sales by Age Group')
axes[1, 1].set_xlabel('Age Group')
axes[1, 1].set_ylabel('Total Sales ($)')

plt.tight_layout()
plt.show()

# Performance metrics
print(f"\n⚡ Batch Processing Performance:")
print(f"   - Processing Time: {results['processing_time']:.2f} seconds")
print(f"   - Records Processed: {len(df):,}")
print(f"   - Processing Rate: {len(df)/results['processing_time']:,.0f} records/second")

## 3. Batch Processing with Apache Spark (PySpark)

Apache Spark is a framework for large-scale data processing that supports both batch and stream processing. Let's try an implementation with PySpark.
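
Spark parallelizes a `groupBy(...).agg(sum(...))` by splitting the data into partitions, aggregating each partition locally, and then shuffling and merging the partial results by key. The plain-Python sketch below imitates that two-phase idea for a sum per category. It is a conceptual model, not Spark's API: the round-robin partitioning, the partition count, and the helper names are assumptions, and the partitions here run sequentially rather than on parallel executors.

```python
from collections import defaultdict


def partition(records, num_partitions):
    """Split records round-robin into num_partitions lists (hypothetical helper)."""
    parts = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        parts[i % num_partitions].append(record)
    return parts


def local_aggregate(part):
    """Phase 1: sum amounts per key inside one partition (partial aggregation)."""
    partial = defaultdict(float)
    for key, amount in part:
        partial[key] += amount
    return partial


def merge_partials(partials):
    """Phase 2: merge the per-partition results by key (the shuffle + final reduce)."""
    merged = defaultdict(float)
    for partial in partials:
        for key, subtotal in partial.items():
            merged[key] += subtotal
    return dict(merged)


records = [('Books', 10.0), ('Home', 5.0), ('Books', 2.5), ('Home', 1.0), ('Books', 7.5)]
partials = [local_aggregate(p) for p in partition(records, num_partitions=2)]
print(merge_partials(partials))
```

Because each partition ships only one subtotal per key instead of every row, the amount of data moved between machines stays small, which is a large part of why this style scales.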

In [None]:
# PySpark Setup with fallback to Pandas simulation
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, sum as spark_sum, avg, count, max as spark_max
    from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
    
    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("BatchProcessingDemo") \
        .config("spark.driver.memory", "1g") \
        .config("spark.executor.memory", "1g") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()
    
    # Set log level to reduce verbose output
    spark.sparkContext.setLogLevel("WARN")
    
    print("✅ Spark Session created successfully!")
    print(f"🔥 Spark Version: {spark.version}")
    
    # Convert Pandas DataFrame to Spark DataFrame
    print("\n🔄 Converting Pandas DataFrame to Spark DataFrame...")
    spark_df = spark.createDataFrame(df)
    
    print(f"📊 Spark DataFrame shape: {spark_df.count()} rows, {len(spark_df.columns)} columns")
    print("\n📝 Spark DataFrame Schema:")
    spark_df.printSchema()
    
    SPARK_AVAILABLE = True
    
except Exception as e:
    print("⚠️  PySpark not available or Java Runtime missing")
    print(f"   Error: {type(e).__name__}")
    print("\n💡 Alternatives for running Spark:")
    print("   1. Install Java: brew install openjdk@11")
    print("   2. Install PySpark: pip install pyspark")
    print("   3. Use Docker: docker run -it apache/spark:latest")
    print("   4. Use cloud services: Databricks, EMR, Google Dataproc")
    print("\n🔄 Continuing with Pandas simulation of Spark operations...")
    
    SPARK_AVAILABLE = False

In [None]:
# Spark operations, with a Pandas simulation as the fallback when Spark is unavailable
if SPARK_AVAILABLE:
    print("🔍 Starting Spark Batch Analytics...")
    start_time = time.time()
    
    # 1. Sales by Category using Spark
    spark_category_sales = spark_df.groupBy("product_category") \
        .agg(spark_sum("total_amount").alias("total_sales"),
             avg("total_amount").alias("avg_sales"),
             count("*").alias("transaction_count")) \
        .orderBy(col("total_sales").desc())
    
    print("📊 Category Sales (Spark):")
    spark_category_sales.show()
    
    # 2. Top customers
    spark_top_customers = spark_df.groupBy("customer_id") \
        .agg(spark_sum("total_amount").alias("total_spent"),
             count("*").alias("transactions")) \
        .orderBy(col("total_spent").desc()) \
        .limit(10)
    
    print("🏆 Top 10 Customers (Spark):")
    spark_top_customers.show()
    
    # 3. Daily sales using Spark SQL
    spark_df.createOrReplaceTempView("transactions")
    
    daily_sales_spark = spark.sql("""
        SELECT DATE(timestamp) as date, 
               SUM(total_amount) as daily_total,
               COUNT(*) as daily_transactions
        FROM transactions 
        GROUP BY DATE(timestamp) 
        ORDER BY date
    """)
    
    print("📅 Daily Sales (Spark SQL):")
    daily_sales_spark.show(10)
    
    spark_processing_time = time.time() - start_time
    print(f"✅ Spark processing completed in {spark_processing_time:.2f} seconds")
    
    # Cache DataFrame for faster subsequent operations
    spark_df.cache()
    cached_count = spark_df.count()  # Trigger caching
    print(f"🚀 DataFrame cached with {cached_count:,} records")
    
else:
    print("🔍 Starting Spark-style Analytics with Pandas simulation...")
    start_time = time.time()
    
    # 1. Sales by Category (Spark-style with Pandas)
    print("📊 Category Sales (Pandas simulation of Spark):")
    pandas_category_sales = df.groupby('product_category').agg({
        'total_amount': ['sum', 'mean', 'count']
    }).round(2)
    pandas_category_sales.columns = ['total_sales', 'avg_sales', 'transaction_count']
    pandas_category_sales = pandas_category_sales.sort_values('total_sales', ascending=False)
    print(pandas_category_sales)
    
    # 2. Top customers (Spark-style with Pandas)
    print("\n🏆 Top 10 Customers (Pandas simulation of Spark):")
    pandas_top_customers = df.groupby('customer_id').agg({
        'total_amount': 'sum',
        'transaction_id': 'count'
    }).round(2)
    pandas_top_customers.columns = ['total_spent', 'transactions']
    pandas_top_customers = pandas_top_customers.sort_values('total_spent', ascending=False).head(10)
    print(pandas_top_customers)
    
    # 3. Daily sales (SQL-style with Pandas)
    print("\n📅 Daily Sales (Pandas SQL-style):")
    df['date'] = df['timestamp'].dt.date
    pandas_daily_sales = df.groupby('date').agg({
        'total_amount': 'sum',
        'transaction_id': 'count'
    }).round(2)
    pandas_daily_sales.columns = ['daily_total', 'daily_transactions']
    pandas_daily_sales = pandas_daily_sales.sort_index()
    print(pandas_daily_sales.head(10))
    
    spark_processing_time = time.time() - start_time
    print(f"\n✅ Pandas simulation completed in {spark_processing_time:.2f} seconds")
    
    # Memory usage instead of caching
    memory_usage = df.memory_usage(deep=True).sum() / 1024**2
    print(f"🧠 DataFrame memory usage: {memory_usage:.2f} MB")
    
    print("\n📈 Performance Insights:")
    print("   - Pandas works well when the data fits comfortably in a single machine's memory")
    print("   - Spark pays off when the data outgrows one machine or the work can be distributed across a cluster")
    print("   - This simulation mirrors the Spark operations above using Pandas")

## 4. Stream Processing Simulation with Python

Stream processing is a method of processing data in real time, where each record is processed as soon as it arrives.

### Stream Processing Characteristics:
- **Low Latency**: Minimal delay between input and output
- **Continuous Processing**: Data is processed continuously
- **Event-driven**: Reacts to incoming events
- **Windowing**: Aggregates data over specific time windows
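
The `StreamProcessor` class in the next cell keeps a sliding window. As a complementary sketch, a tumbling (fixed, non-overlapping) window can also be computed one event at a time, emitting each window's total as soon as the stream moves past it. Event times given as plain seconds and strictly in-order arrival (no late data) are simplifying assumptions, and `tumbling_window_sums` is a hypothetical name.

```python
def tumbling_window_sums(events, window_seconds):
    """Yield (window_start, total) for fixed, non-overlapping windows.

    events: iterable of (event_time_seconds, amount), consumed one at a time.
    Assumes events arrive in time order; a window is closed as soon as an
    event for a later window shows up.
    """
    current_window = None
    running_total = 0.0
    for event_time, amount in events:
        window_start = int(event_time // window_seconds) * window_seconds
        if current_window is not None and window_start != current_window:
            yield current_window, running_total  # close the previous window
            running_total = 0.0
        current_window = window_start
        running_total += amount
    if current_window is not None:
        yield current_window, running_total  # flush the last open window


stream = [(1, 10.0), (4, 5.0), (12, 7.0), (19, 3.0), (25, 1.0)]
for window_start, total in tumbling_window_sums(stream, window_seconds=10):
    print(f"[{window_start}, {window_start + 10}) -> {total}")
```

Only the current window's running total is kept in memory, which is what lets a stream processor aggregate an unbounded input.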

In [None]:
import time
from collections import defaultdict, deque
from threading import Thread
import queue

class StreamProcessor:
    """Real-time stream processor simulation"""
    
    def __init__(self, window_size=60):  # 60 seconds window
        self.window_size = window_size
        self.data_stream = queue.Queue()
        self.metrics = defaultdict(lambda: deque())
        self.running = False
        self.processed_count = 0
        
    def generate_stream_data(self, duration=30):
        """Generate streaming transaction data"""
        print(f"🌊 Starting data stream for {duration} seconds...")
        start_time = time.time()
        generated = 0  # producer-side counter; processed_count belongs to the consumer thread
        
        while time.time() - start_time < duration:
            generated += 1
            # Generate random transaction
            transaction = {
                'timestamp': datetime.now(),
                'transaction_id': f'STREAM_{generated:06d}',
                'customer_id': f'CUST_{random.randint(1, 1000):05d}',
                'product_category': random.choice(['Electronics', 'Clothing', 'Books', 'Home']),
                'amount': round(random.uniform(10, 500), 2),
                'quantity': random.randint(1, 3)
            }
            
            self.data_stream.put(transaction)
            time.sleep(random.uniform(0.1, 0.5))  # Random intervals
        
        # Signal end of stream
        self.data_stream.put(None)
        print("🔚 Stream generation completed")
    
    def process_stream(self):
        """Process incoming stream data with windowing"""
        print("🔄 Starting stream processing...")
        
        while self.running:
            try:
                # Get data from stream with timeout
                data = self.data_stream.get(timeout=1)
                
                if data is None:  # End of stream signal
                    break
                
                # Process the data
                self.process_transaction(data)
                self.processed_count += 1
                
                # Print real-time metrics every 10 transactions
                if self.processed_count % 10 == 0:
                    self.print_realtime_metrics()
                    
            except queue.Empty:
                continue
        
        print(f"✅ Stream processing completed. Processed {self.processed_count} transactions")
    
    def process_transaction(self, transaction):
        """Process individual transaction"""
        current_time = transaction['timestamp']
        
        # Add to sliding window
        category = transaction['product_category']
        
        # Store in time-based window
        self.metrics[f"{category}_amounts"].append((current_time, transaction['amount']))
        self.metrics[f"{category}_count"].append((current_time, 1))
        
        # Clean old data outside window
        self.clean_old_data(current_time)
    
    def clean_old_data(self, current_time):
        """Remove data outside the time window"""
        cutoff_time = current_time - timedelta(seconds=self.window_size)
        
        for key in list(self.metrics.keys()):
            while (self.metrics[key] and 
                   self.metrics[key][0][0] < cutoff_time):
                self.metrics[key].popleft()
    
    def print_realtime_metrics(self):
        """Print current window metrics"""
        print(f"\n📊 Real-time Metrics (Last {self.window_size}s window):")
        print(f"   Processed: {self.processed_count} transactions")
        
        # Calculate metrics for each category
        for category in ['Electronics', 'Clothing', 'Books', 'Home']:
            amounts_key = f"{category}_amounts"
            count_key = f"{category}_count"
            
            if amounts_key in self.metrics and self.metrics[amounts_key]:
                total_amount = sum(amount for _, amount in self.metrics[amounts_key])
                total_count = len(self.metrics[count_key])
                avg_amount = total_amount / total_count if total_count > 0 else 0
                
                print(f"   {category}: {total_count} txns, ${total_amount:.2f} total, ${avg_amount:.2f} avg")

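# Initialize stream processor (30-second sliding window)
processor = StreamProcessor(window_size=30)
STREAM_DURATION = 20  # seconds of simulated input

# Start processing in a separate thread
processor.running = True
processing_thread = Thread(target=processor.process_stream)
processing_thread.start()

# Generate stream data in another thread
stream_thread = Thread(target=processor.generate_stream_data, args=(STREAM_DURATION,))
stream_thread.start()

# Wait for completion: the consumer stops on the None end-of-stream sentinel,
# so join it before clearing `running` (clearing it first could drop queued events)
stream_thread.join()
processing_thread.join()
processor.running = False

print(f"\n🎯 Stream Processing Summary:")
print(f"   Total Transactions Processed: {processor.processed_count}")
# This is the arrival rate set by the generator's sleep, not the processor's maximum capacity
print(f"   Average Arrival Rate: {processor.processed_count/STREAM_DURATION:.1f} transactions/second")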

## 5. Stream Processing with Kafka and Python

Apache Kafka is a distributed streaming platform for real-time data streaming. Let's simulate how Kafka is used for stream processing.
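
Two Kafka ideas that the simulator in the next cell flattens are worth seeing explicitly: a topic is split into partitions, messages with the same key always land in the same partition (which preserves per-key ordering), and each consumer group tracks its own offset per partition. The sketch below models only those ideas in plain Python. Kafka's default partitioner hashes keys with murmur2; `zlib.crc32` is used here purely as a deterministic stand-in, committing on every poll is a simplification (real consumers commit on an interval or explicitly), and `MiniTopic` and its methods are hypothetical names.

```python
import zlib
from collections import defaultdict


class MiniTopic:
    """Toy partitioned log: each partition is an append-only list."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]
        # Committed offsets: (group_id, partition) -> next offset to read
        self.committed = defaultdict(int)

    def partition_for(self, key):
        # Same key -> same partition, so messages for one key stay in order
        return zlib.crc32(key.encode('utf-8')) % len(self.partitions)

    def send(self, key, value):
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1  # (partition, offset)

    def poll(self, group_id):
        """Return unread messages for a group and commit the new offsets."""
        batch = []
        for p, log in enumerate(self.partitions):
            start = self.committed[(group_id, p)]
            batch.extend(log[start:])
            self.committed[(group_id, p)] = len(log)
        return batch


topic = MiniTopic(num_partitions=3)
for i in range(6):
    topic.send(f"CUST_{i % 2}", {'seq': i})

print(len(topic.poll('analytics-group')))  # 6: the first poll sees everything
print(len(topic.poll('analytics-group')))  # 0: offsets were already committed
print(len(topic.poll('fraud-group')))      # 6: another group reads independently
```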

In [None]:
# Kafka Simulation (Mock implementation since actual Kafka requires server setup)
class KafkaSimulator:
    """Simulate Kafka producer and consumer for learning purposes"""
    
    def __init__(self):
        self.topics = defaultdict(list)
        self.consumers = {}
        
    def create_producer(self, topic):
        """Create a Kafka producer"""
        return KafkaProducerSim(self, topic)
        
    def create_consumer(self, topic, group_id):
        """Create a Kafka consumer"""
        return KafkaConsumerSim(self, topic, group_id)

class KafkaProducerSim:
    """Simulated Kafka Producer"""
    
    def __init__(self, kafka_sim, topic):
        self.kafka_sim = kafka_sim
        self.topic = topic
        self.message_count = 0
        
    def send(self, message):
        """Send message to topic"""
        timestamp = datetime.now()
        self.kafka_sim.topics[self.topic].append({
            'timestamp': timestamp,
            'message': message,
            'offset': len(self.kafka_sim.topics[self.topic])
        })
        self.message_count += 1
        
    def flush(self):
        """Flush any pending messages"""
        pass

class KafkaConsumerSim:
    """Simulated Kafka Consumer"""
    
    def __init__(self, kafka_sim, topic, group_id):
        self.kafka_sim = kafka_sim
        self.topic = topic
        self.group_id = group_id
        self.offset = 0
        
    def poll(self, timeout_ms=1000):
        """Poll for new messages"""
        messages = []
        topic_messages = self.kafka_sim.topics[self.topic]
        
        while self.offset < len(topic_messages):
            messages.append(topic_messages[self.offset])
            self.offset += 1
            
        return {self.topic: messages} if messages else {}
        
    def commit(self):
        """Commit current offset"""
        pass

# Initialize Kafka simulator
kafka_sim = KafkaSimulator()

print("🚀 Kafka Simulator initialized!")
print("📝 Note: This is a simulation. In production, you would:")
print("   1. Install kafka-python: pip install kafka-python")
print("   2. Start Kafka server")
print("   3. Create topics")
print("   4. Use actual KafkaProducer and KafkaConsumer classes")

In [None]:
# Implement Kafka-style streaming pipeline
def kafka_producer_demo():
    """Demonstrate Kafka producer sending e-commerce events"""
    print("🔄 Starting Kafka Producer Demo...")
    
    # Create producer for e-commerce events
    producer = kafka_sim.create_producer('ecommerce-events')
    
    # Generate and send events
    for i in range(50):
        event = {
            'event_id': f'EVENT_{i+1:03d}',
            'event_type': random.choice(['purchase', 'cart_add', 'view', 'search']),
            'customer_id': f'CUST_{random.randint(1, 100):03d}',
            'product_id': f'PROD_{random.randint(1, 500):03d}',
            'timestamp': datetime.now().isoformat(),
            'amount': round(random.uniform(10, 300), 2) if random.random() > 0.3 else None
        }
        
        producer.send(json.dumps(event))
        
        if (i + 1) % 10 == 0:
            print(f"   📤 Sent {i+1} events to topic 'ecommerce-events'")
            
        time.sleep(0.1)  # Simulate real-time streaming
    
    producer.flush()
    print(f"✅ Producer sent {producer.message_count} events")
    return producer.message_count

def kafka_consumer_demo():
    """Demonstrate Kafka consumer processing events"""
    print("\n🔄 Starting Kafka Consumer Demo...")
    
    # Create consumer for processing events
    consumer = kafka_sim.create_consumer('ecommerce-events', 'analytics-group')
    
    # Process events
    processed_events = 0
    event_types = defaultdict(int)
    total_revenue = 0
    
    while True:
        # Poll for new messages
        message_batch = consumer.poll(timeout_ms=1000)
        
        if not message_batch:
            break
            
        for topic, messages in message_batch.items():
            for msg in messages:
                event = json.loads(msg['message'])
                processed_events += 1
                
                # Process different event types
                event_types[event['event_type']] += 1
                
                if event['event_type'] == 'purchase' and event['amount']:
                    total_revenue += event['amount']
                
                # Real-time processing simulation
                if processed_events % 10 == 0:
                    print(f"   📥 Processed {processed_events} events")
    
    consumer.commit()
    
    print(f"✅ Consumer processed {processed_events} events")
    print(f"📊 Event Type Distribution:")
    for event_type, count in event_types.items():
        print(f"   {event_type}: {count}")
    print(f"💰 Total Revenue from purchases: ${total_revenue:.2f}")
    
    return processed_events, dict(event_types), total_revenue

# Run Kafka demo
producer_count = kafka_producer_demo()
consumer_count, event_distribution, revenue = kafka_consumer_demo()

# Verify message delivery
print(f"\n✅ Message Delivery Verification:")
print(f"   Messages Sent: {producer_count}")
print(f"   Messages Processed: {consumer_count}")
print(f"   Delivery Success: {'✅' if producer_count == consumer_count else '❌'}")

## 6. Real-time Data Processing Pipeline

Let's build a complete pipeline that combines stream ingestion, real-time processing, and output.
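
Before the class-based version, the same ingest → process → output shape can be sketched with chained Python generators, where each stage pulls one event at a time from the stage before it, so no stage waits for the full dataset. The stage names are hypothetical; the 500 high-value threshold mirrors `RealTimePipeline` in the next cell, and the sample transactions are made up for illustration.

```python
def ingest(raw_events):
    """Source stage: yield events one at a time (stands in for a queue or topic)."""
    for event in raw_events:
        yield event


def enrich(events, high_value_threshold=500):
    """Processing stage: tag each event as soon as it arrives."""
    for event in events:
        yield {**event, 'high_value': event['amount'] > high_value_threshold}


def sink(events):
    """Output stage: collect alert IDs; a real sink might write to a DB or dashboard."""
    alerts = []
    for event in events:
        if event['high_value']:
            alerts.append(event['transaction_id'])
    return alerts


raw = [
    {'transaction_id': 'RT_000001', 'amount': 120.0},
    {'transaction_id': 'RT_000002', 'amount': 750.0},
    {'transaction_id': 'RT_000003', 'amount': 501.0},
]
print(sink(enrich(ingest(raw))))  # ['RT_000002', 'RT_000003']
```

Each stage can be swapped independently, for example replacing `ingest` with a Kafka consumer loop, without touching the processing logic.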

In [None]:
class RealTimePipeline:
    """Complete real-time data processing pipeline"""
    
    def __init__(self):
        self.metrics = {
            'total_events': 0,
            'fraud_alerts': 0,
            'high_value_transactions': 0,
            'processing_latency': []
        }
        self.alerts = []
        self.running = False
        
    def fraud_detection(self, transaction):
        """Simple fraud detection rules"""
        # Rule 1: Unusually high amount
        if transaction['amount'] > 1000:
            return True, "High amount transaction"
            
        # Rule 2: Multiple transactions from the same customer in a short time.
        # Placeholder only: flags a random ~5% of transactions instead of
        # tracking each customer's recent history
        if random.random() < 0.05:
            return True, "Suspicious pattern detected"
            
        return False, None
        
    def process_transaction(self, transaction):
        """Process individual transaction with business logic"""
        start_time = time.time()
        
        # 1. Fraud Detection
        is_fraud, fraud_reason = self.fraud_detection(transaction)
        
        if is_fraud:
            self.metrics['fraud_alerts'] += 1
            alert = {
                'timestamp': datetime.now(),
                'transaction_id': transaction['transaction_id'],
                'reason': fraud_reason,
                'amount': transaction['amount']
            }
            self.alerts.append(alert)
            print(f"🚨 FRAUD ALERT: {fraud_reason} - Transaction {transaction['transaction_id']}")
        
        # 2. High-value transaction detection
        if transaction['amount'] > 500:
            self.metrics['high_value_transactions'] += 1
            print(f"💎 High-value transaction: ${transaction['amount']:.2f}")
        
        # 3. Real-time analytics update
        self.metrics['total_events'] += 1
        
        # 4. Calculate processing latency
        processing_time = (time.time() - start_time) * 1000  # ms
        self.metrics['processing_latency'].append(processing_time)
        
        return {
            'transaction_id': transaction['transaction_id'],
            'processed_at': datetime.now(),
            'is_fraud': is_fraud,
            'processing_time_ms': processing_time
        }
    
    def run_pipeline(self, duration=15):
        """Run the complete pipeline"""
        print(f"🚀 Starting Real-time Processing Pipeline for {duration} seconds...")
        self.running = True
        start_time = time.time()
        
        while time.time() - start_time < duration:
            # Simulate incoming transaction
            transaction = {
                'transaction_id': f'RT_{self.metrics["total_events"]+1:06d}',
                'customer_id': f'CUST_{random.randint(1, 1000):05d}',
                'amount': round(random.uniform(5, 1200), 2),
                'timestamp': datetime.now(),
                'merchant': random.choice(['Amazon', 'Walmart', 'Target', 'BestBuy', 'Costco'])
            }
            
            # Process transaction
            result = self.process_transaction(transaction)
            
            # Print metrics every 20 transactions
            if self.metrics['total_events'] % 20 == 0:
                self.print_metrics()
            
            # Simulate processing delay
            time.sleep(random.uniform(0.1, 0.3))
        
        self.running = False
        print("\n✅ Pipeline processing completed!")
        self.print_final_metrics()
    
    def print_metrics(self):
        """Print current metrics"""
        avg_latency = np.mean(self.metrics['processing_latency']) if self.metrics['processing_latency'] else 0
        print(f"📊 Current Metrics: {self.metrics['total_events']} events, "
              f"{self.metrics['fraud_alerts']} fraud alerts, "
              f"{avg_latency:.2f}ms avg latency")
    
    def print_final_metrics(self):
        """Print final pipeline metrics"""
        avg_latency = np.mean(self.metrics['processing_latency'])
        max_latency = np.max(self.metrics['processing_latency'])
        min_latency = np.min(self.metrics['processing_latency'])
        
        print(f"\n📈 Final Pipeline Metrics:")
        print(f"   Total Events Processed: {self.metrics['total_events']}")
        print(f"   Fraud Alerts Generated: {self.metrics['fraud_alerts']}")
        print(f"   High-Value Transactions: {self.metrics['high_value_transactions']}")
        print(f"   Processing Latency:")
        print(f"     - Average: {avg_latency:.2f}ms")
        print(f"     - Min: {min_latency:.2f}ms")
        print(f"     - Max: {max_latency:.2f}ms")
        print(f"   Fraud Rate: {(self.metrics['fraud_alerts']/self.metrics['total_events']*100):.1f}%")

# Run the real-time pipeline
pipeline = RealTimePipeline()
pipeline.run_pipeline(duration=20)

# Show fraud alerts
if pipeline.alerts:
    print(f"\n🚨 Fraud Alerts Summary ({len(pipeline.alerts)} total):")
    for alert in pipeline.alerts[-5:]:  # Show last 5 alerts
        print(f"   {alert['timestamp'].strftime('%H:%M:%S')} - "
              f"Transaction {alert['transaction_id']}: {alert['reason']} "
              f"(${alert['amount']:.2f})")
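
Rule 2 in `fraud_detection` is only a random 5% placeholder. An actual velocity rule ("multiple transactions from the same customer in a short time") needs per-customer state. A minimal sketch, assuming a limit of 3 transactions per 60 seconds (both numbers are arbitrary) and timestamps in plain seconds; `VelocityChecker` is a hypothetical name that could replace the random check inside `fraud_detection`.

```python
from collections import defaultdict, deque


class VelocityChecker:
    """Flag a customer who makes more than max_txns transactions within window_seconds."""

    def __init__(self, max_txns=3, window_seconds=60):
        self.max_txns = max_txns
        self.window_seconds = window_seconds
        self.history = defaultdict(deque)  # customer_id -> recent timestamps

    def check(self, customer_id, timestamp):
        """Record one transaction; return True if it exceeds the velocity limit."""
        recent = self.history[customer_id]
        # Evict timestamps that have fallen out of the sliding window
        while recent and recent[0] <= timestamp - self.window_seconds:
            recent.popleft()
        recent.append(timestamp)
        return len(recent) > self.max_txns


checker = VelocityChecker(max_txns=3, window_seconds=60)
flags = [checker.check('CUST_00001', t) for t in (0, 10, 20, 30, 100)]
print(flags)  # [False, False, False, True, False]
```

The fourth transaction within 60 seconds is flagged; by t=100 the older timestamps have been evicted, so the customer is clean again.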

## 7. Performance Comparison: Batch vs Stream

Let's compare the performance of batch processing and stream processing on the same dataset.
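
One subtlety in the comparison: a stream processor cannot call `.std()` on the full column, so it must keep running aggregates. The sum-of-squares shortcut used in the stream test is prone to floating-point cancellation when values are large relative to their spread; Welford's online algorithm is a standard, numerically stabler alternative. This sketch (the `RunningStats` class name is hypothetical) computes the sample standard deviation, which is what Pandas' `.std()` returns by default (`ddof=1`), and checks it against Python's `statistics` module.

```python
import math
import random
import statistics


class RunningStats:
    """Welford's online algorithm: update mean and variance one value at a time."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the current mean

    def update(self, x):
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)  # uses the *updated* mean

    def sample_std(self):
        # ddof=1, matching pandas Series.std() and statistics.stdev()
        return math.sqrt(self.m2 / (self.count - 1)) if self.count > 1 else float('nan')


rng = random.Random(0)  # fixed seed so the check is reproducible
values = [rng.uniform(1, 1000) for _ in range(10_000)]
stats = RunningStats()
for v in values:
    stats.update(v)

print(f"mean:  online={stats.mean:.6f}  exact={statistics.fmean(values):.6f}")
print(f"stdev: online={stats.sample_std():.6f}  exact={statistics.stdev(values):.6f}")
```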

In [None]:
def performance_comparison():
    """Compare batch vs stream processing performance"""
    
    # Generate test dataset
    test_size = 10000
    print(f"🧪 Performance Test - Processing {test_size:,} records")
    
    # Generate test data
    test_data = []
    for i in range(test_size):
        test_data.append({
            'id': i,
            'value': random.uniform(1, 1000),
            'category': random.choice(['A', 'B', 'C', 'D']),
            'timestamp': datetime.now() - timedelta(seconds=random.randint(0, 3600))
        })
    
    test_df = pd.DataFrame(test_data)
    
    # Batch Processing Test
    # time.perf_counter() is a high-resolution clock intended for measuring short intervals
    print("\n📊 Batch Processing Test:")
    batch_start = time.perf_counter()
    
    # Batch operations: vectorized aggregations over the whole DataFrame at once
    batch_results = {
        'total_value': test_df['value'].sum(),
        'avg_value': test_df['value'].mean(),
        'category_counts': test_df['category'].value_counts().to_dict(),
        'max_value': test_df['value'].max(),
        'std_value': test_df['value'].std()  # sample std (ddof=1)
    }
    
    batch_time = time.perf_counter() - batch_start
    # RSS of the whole Python process (MB), not the memory used by this step alone
    batch_memory = psutil.Process().memory_info().rss / 1024**2
    
    print(f"   ⏱️  Processing Time: {batch_time:.4f} seconds")
    print(f"   🧠 Process RSS: {batch_memory:.2f} MB")
    print(f"   🚀 Throughput: {test_size/batch_time:.0f} records/second")
    
    # Stream Processing Test
    print("\n🌊 Stream Processing Test:")
    stream_start = time.perf_counter()
    
    # Stream processing simulation: running aggregates updated one record at a time
    stream_results = {
        'total_value': 0.0,
        'count': 0,
        'category_counts': defaultdict(int),
        'max_value': float('-inf'),
        'values_sum_sq': 0.0  # running sum of squares, for the std calculation
    }
    
    latencies = []
    
    for record in test_data:
        record_start = time.perf_counter()
        
        # Process individual record
        stream_results['total_value'] += record['value']
        stream_results['count'] += 1
        stream_results['category_counts'][record['category']] += 1
        stream_results['max_value'] = max(stream_results['max_value'], record['value'])
        stream_results['values_sum_sq'] += record['value'] ** 2
        
        # Per-record latency in ms (includes the timing overhead itself)
        record_latency = (time.perf_counter() - record_start) * 1000
        latencies.append(record_latency)
    
    # Final aggregations; divide by (n - 1) so the std matches pandas' default sample std
    n = stream_results['count']
    stream_results['avg_value'] = stream_results['total_value'] / n
    variance = (stream_results['values_sum_sq'] - n * stream_results['avg_value'] ** 2) / (n - 1)
    stream_results['std_value'] = max(variance, 0.0) ** 0.5  # clamp tiny negative rounding errors
    
    stream_time = time.perf_counter() - stream_start
    stream_memory = psutil.Process().memory_info().rss / 1024**2  # whole-process RSS (MB)
    
    print(f"   ⏱️  Processing Time: {stream_time:.4f} seconds")
    print(f"   🧠 Process RSS: {stream_memory:.2f} MB")
    print(f"   🚀 Throughput: {test_size/stream_time:.0f} records/second")
    print(f"   📊 Avg Record Latency: {np.mean(latencies):.4f}ms")
    print(f"   📊 Max Record Latency: {np.max(latencies):.4f}ms")
    
    # Comparison Summary
    print(f"\n⚖️  Performance Comparison Summary:")
    print(f"   Processing Time - Batch: {batch_time:.4f}s, Stream: {stream_time:.4f}s")
    print(f"   Throughput - Batch: {test_size/batch_time:.0f} rec/s, Stream: {test_size/stream_time:.0f} rec/s")
    print(f"   Process RSS - Batch: {batch_memory:.1f}MB, Stream: {stream_memory:.1f}MB")
    
    # Accuracy comparison
    print(f"\n🎯 Result Accuracy Comparison:")
    print(f"   Total Value - Batch: {batch_results['total_value']:.2f}, Stream: {stream_results['total_value']:.2f}")
    print(f"   Average Value - Batch: {batch_results['avg_value']:.2f}, Stream: {stream_results['avg_value']:.2f}")
    print(f"   Max Value - Batch: {batch_results['max_value']:.2f}, Stream: {stream_results['max_value']:.2f}")
    print(f"   Std Dev - Batch: {batch_results['std_value']:.2f}, Stream: {stream_results['std_value']:.2f}")
    
    return {
        'batch': {'time': batch_time, 'memory': batch_memory, 'results': batch_results},
        'stream': {'time': stream_time, 'memory': stream_memory, 'results': stream_results, 'latencies': latencies}
    }

# Run performance comparison
comparison_results = performance_comparison()

In [None]:
# Visualize Performance Comparison
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 1. Processing Time Comparison
methods = ['Batch', 'Stream']
times = [comparison_results['batch']['time'], comparison_results['stream']['time']]
axes[0, 0].bar(methods, times, color=['blue', 'green'])
axes[0, 0].set_title('Processing Time Comparison')
axes[0, 0].set_ylabel('Time (seconds)')

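# 2. Throughput Comparison (one latency sample is recorded per record, so this equals the test size)
n_records = len(comparison_results['stream']['latencies'])
throughputs = [n_records/comparison_results['batch']['time'], n_records/comparison_results['stream']['time']]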
axes[0, 1].bar(methods, throughputs, color=['blue', 'green'])
axes[0, 1].set_title('Throughput Comparison')
axes[0, 1].set_ylabel('Records/Second')

# 3. Memory Usage Comparison
memory = [comparison_results['batch']['memory'], comparison_results['stream']['memory']]
axes[1, 0].bar(methods, memory, color=['blue', 'green'])
axes[1, 0].set_title('Memory Usage Comparison')
axes[1, 0].set_ylabel('Memory (MB)')

# 4. Stream Processing Latency Distribution
latencies = comparison_results['stream']['latencies']
axes[1, 1].hist(latencies, bins=50, color='green', alpha=0.7)
axes[1, 1].set_title('Stream Processing Latency Distribution')
axes[1, 1].set_xlabel('Latency (ms)')
axes[1, 1].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

# Create comparison table
comparison_df = pd.DataFrame({
    'Metric': ['Processing Time (s)', 'Throughput (rec/s)', 'Memory Usage (MB)', 'Latency (ms)'],
    'Batch Processing': [
        f"{comparison_results['batch']['time']:.4f}",
        f"{len(latencies)/comparison_results['batch']['time']:.0f}",
        f"{comparison_results['batch']['memory']:.2f}",
        'N/A (Batch)'
    ],
    'Stream Processing': [
        f"{comparison_results['stream']['time']:.4f}",
        f"{len(latencies)/comparison_results['stream']['time']:.0f}",
        f"{comparison_results['stream']['memory']:.2f}",
        f"{np.mean(latencies):.4f}"
    ]
})

print("\n📋 Performance Comparison Table:")
print(comparison_df.to_string(index=False))
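The stream test above derives the standard deviation from a running sum of squares (`E[x²] - E[x]²`). That formula can lose precision through cancellation when values are large relative to their spread. A common alternative is Welford's online algorithm, which updates the mean and the sum of squared deviations one record at a time. The sketch below is a minimal illustration; the class name `RunningStats` is hypothetical and not part of the notebook.

```python
import math


class RunningStats:
    """Hypothetical helper: Welford's online mean/variance, updated one value at a time."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # sum of squared deviations from the current mean
        self.max_value = -math.inf

    def update(self, x):
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        # Second factor uses the *updated* mean (Welford's recurrence)
        self.m2 += delta * (x - self.mean)
        self.max_value = max(self.max_value, x)

    @property
    def variance(self):
        # Population variance (ddof=0); use m2 / (count - 1) for the sample variance
        return self.m2 / self.count if self.count else 0.0

    @property
    def std(self):
        return math.sqrt(self.variance)


stats = RunningStats()
for value in [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]:
    stats.update(value)

print(stats.count, stats.mean, stats.std)
```

Note that this computes the population variance; pandas' `Series.std()` defaults to the sample variant (`ddof=1`), so a batch result computed that way can differ slightly from a streaming one.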

## 8. Hands-on Exercise: Log Processing System

As a final exercise, let's implement a complete log processing system that performs batch analysis on historical logs and real-time monitoring on incoming logs.

In [None]:
import re
from collections import Counter

class LogProcessor:
    """Complete log processing system for batch and stream processing"""
    
    def __init__(self):
        self.log_pattern = re.compile(
            r'(?P<ip>\d+\.\d+\.\d+\.\d+) - - \[(?P<timestamp>[^\]]+)\] '
            r'"(?P<method>\w+) (?P<url>[^\s]+) HTTP/[^"]*" '
            r'(?P<status>\d+) (?P<size>\d+|-)'
        )
        
    def generate_apache_logs(self, num_logs=5000):
        """Generate sample Apache access logs"""
        methods = ['GET', 'POST', 'PUT', 'DELETE']
        urls = ['/api/users', '/api/products', '/home', '/login', '/checkout', '/search']
        status_codes = [200, 301, 404, 500, 503]
        
        logs = []
        for i in range(num_logs):
            ip = f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
            timestamp = (datetime.now() - timedelta(hours=random.randint(0, 72))).strftime('%d/%b/%Y:%H:%M:%S +0000')
            method = random.choice(methods)
            url = random.choice(urls)
            status = random.choice(status_codes)
            size = random.randint(200, 5000) if status == 200 else '-'
            
            log_entry = f'{ip} - - [{timestamp}] "{method} {url} HTTP/1.1" {status} {size}'
            logs.append(log_entry)
            
        return logs
    
    def parse_log_entry(self, log_entry):
        """Parse individual log entry"""
        match = self.log_pattern.match(log_entry)
        if match:
            return match.groupdict()
        return None
    
    def batch_log_analysis(self, logs):
        """Comprehensive batch analysis of historical logs"""
        print("🔍 Starting Batch Log Analysis...")
        start_time = time.time()
        
        parsed_logs = []
        parse_errors = 0
        
        # Parse all logs
        for log in logs:
            parsed = self.parse_log_entry(log)
            if parsed:
                parsed_logs.append(parsed)
            else:
                parse_errors += 1
        
        # Create DataFrame for analysis
        df_logs = pd.DataFrame(parsed_logs)
        df_logs['timestamp'] = pd.to_datetime(df_logs['timestamp'], format='%d/%b/%Y:%H:%M:%S +0000')
        df_logs['status'] = df_logs['status'].astype(int)
        df_logs['size'] = pd.to_numeric(df_logs['size'], errors='coerce')
        
        # Batch Analytics
        analytics = {
            'total_requests': len(df_logs),
            'unique_ips': df_logs['ip'].nunique(),
            'status_distribution': df_logs['status'].value_counts().to_dict(),
            'method_distribution': df_logs['method'].value_counts().to_dict(),
            'top_urls': df_logs['url'].value_counts().head(5).to_dict(),
            'error_rate': (df_logs['status'] >= 400).sum() / len(df_logs) * 100,
            'avg_response_size': df_logs['size'].mean(),
            'peak_hour': df_logs['timestamp'].dt.hour.value_counts().index[0],
            'parse_errors': parse_errors
        }
        
        processing_time = time.time() - start_time
        analytics['processing_time'] = processing_time
        
        print(f"✅ Batch analysis completed in {processing_time:.2f} seconds")
        return analytics, df_logs
    
    def stream_log_monitoring(self, log_stream, duration=15):
        """Real-time log monitoring and alerting"""
        print(f"🌊 Starting Real-time Log Monitoring for {duration} seconds...")
        
        metrics = {
            'total_requests': 0,
            'error_count': 0,
            'unique_ips': set(),
            'suspicious_ips': Counter(),
            'response_times': [],
            'alerts': []
        }
        
        start_time = time.time()
        # Create the iterator once; calling iter() on a list inside the loop would restart it every time
        stream_iter = iter(log_stream)
        
        while time.time() - start_time < duration:
            try:
                # Simulate log entry arrival
                log_entry = next(stream_iter)
                
                parsed = self.parse_log_entry(log_entry)
                if not parsed:
                    continue
                
                # Real-time processing
                metrics['total_requests'] += 1
                metrics['unique_ips'].add(parsed['ip'])
                
                # Track requests per IP for suspicious activity
                metrics['suspicious_ips'][parsed['ip']] += 1
                
                # Error detection
                status_code = int(parsed['status'])
                if status_code >= 400:
                    metrics['error_count'] += 1
                    
                    # Generate alert for 5xx errors
                    if status_code >= 500:
                        alert = {
                            'timestamp': datetime.now(),
                            'type': 'Server Error',
                            'ip': parsed['ip'],
                            'url': parsed['url'],
                            'status': status_code
                        }
                        metrics['alerts'].append(alert)
                        print(f"🚨 Server Error Alert: {parsed['ip']} - {parsed['url']} - Status {status_code}")
                
                # Suspicious activity detection: alert once, when an IP's cumulative count first exceeds 50
                if metrics['suspicious_ips'][parsed['ip']] == 51:
                    alert = {
                        'timestamp': datetime.now(),
                        'type': 'Suspicious Activity',
                        'ip': parsed['ip'],
                        'request_count': metrics['suspicious_ips'][parsed['ip']]
                    }
                    metrics['alerts'].append(alert)
                    print(f"🚨 Suspicious Activity: {parsed['ip']} made {metrics['suspicious_ips'][parsed['ip']]} requests")
                
                # Print metrics every 100 requests
                if metrics['total_requests'] % 100 == 0:
                    error_rate = (metrics['error_count'] / metrics['total_requests']) * 100
                    print(f"📊 Processed: {metrics['total_requests']} requests, "
                          f"Error rate: {error_rate:.1f}%, "
                          f"Unique IPs: {len(metrics['unique_ips'])}")
                
                time.sleep(0.01)  # Simulate processing time
                
            except StopIteration:
                break
        
        print(f"✅ Stream monitoring completed!")
        return metrics

# Initialize log processor
log_processor = LogProcessor()

# Generate sample logs
print("📝 Generating sample Apache access logs...")
sample_logs = log_processor.generate_apache_logs(5000)
print(f"Generated {len(sample_logs):,} log entries")

# Show sample log entries
print(f"\n📋 Sample log entries:")
for i in range(3):
    print(f"   {sample_logs[i]}")

# Batch Analysis
batch_results, logs_df = log_processor.batch_log_analysis(sample_logs)

print(f"\n📊 Batch Analysis Results:")
print(f"   Total Requests: {batch_results['total_requests']:,}")
print(f"   Unique IPs: {batch_results['unique_ips']:,}")
print(f"   Error Rate: {batch_results['error_rate']:.2f}%")
print(f"   Peak Hour: {batch_results['peak_hour']}:00")
print(f"   Processing Time: {batch_results['processing_time']:.2f} seconds")

print(f"\n🔍 Top 5 URLs:")
for url, count in batch_results['top_urls'].items():
    print(f"   {url}: {count} requests")

print(f"\n📊 Status Code Distribution:")
for status, count in sorted(batch_results['status_distribution'].items()):
    print(f"   {status}: {count} requests")

In [None]:
# Stream Monitoring Demo
print("\n" + "="*60)
print("🌊 STREAM MONITORING DEMO")
print("="*60)

# Simulate real-time log stream
def simulate_log_stream():
    """Generator that simulates real-time log entries"""
    log_templates = [
        '{ip} - - [{timestamp}] "GET /api/users HTTP/1.1" 200 1234',
        '{ip} - - [{timestamp}] "POST /login HTTP/1.1" 200 567',
        '{ip} - - [{timestamp}] "GET /products HTTP/1.1" 404 890',
        '{ip} - - [{timestamp}] "POST /checkout HTTP/1.1" 500 0',
        '{ip} - - [{timestamp}] "GET /search HTTP/1.1" 503 0'
    ]
    
    while True:
        ip = f"{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}.{random.randint(1,255)}"
        timestamp = datetime.now().strftime('%d/%b/%Y:%H:%M:%S +0000')
        template = random.choice(log_templates)
        yield template.format(ip=ip, timestamp=timestamp)

# Run stream monitoring
log_stream = simulate_log_stream()
stream_metrics = log_processor.stream_log_monitoring(log_stream, duration=10)

print(f"\n📈 Stream Monitoring Results:")
print(f"   Total Requests Processed: {stream_metrics['total_requests']:,}")
print(f"   Error Count: {stream_metrics['error_count']}")
print(f"   Unique IPs: {len(stream_metrics['unique_ips'])}")
print(f"   Alerts Generated: {len(stream_metrics['alerts'])}")

if stream_metrics['alerts']:
    print(f"\n🚨 Recent Alerts:")
    for alert in stream_metrics['alerts'][-3:]:  # Show last 3 alerts
        print(f"   {alert['timestamp'].strftime('%H:%M:%S')} - {alert['type']}: {alert}")

# Visualize log analysis results
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# 1. Status Code Distribution
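status_dist = dict(sorted(batch_results['status_distribution'].items()))
status_codes = [str(code) for code in status_dist]  # string labels give evenly spaced categorical bars
status_counts = list(status_dist.values())
# Color by status class: 2xx green, 3xx orange, 4xx/5xx red
bar_colors = ['green' if code < 300 else 'orange' if code < 400 else 'red' for code in status_dist]
axes[0, 0].bar(status_codes, status_counts, color=bar_colors)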
axes[0, 0].set_title('HTTP Status Code Distribution')
axes[0, 0].set_xlabel('Status Code')
axes[0, 0].set_ylabel('Count')

# 2. Request Methods
methods = list(batch_results['method_distribution'].keys())
method_counts = list(batch_results['method_distribution'].values())
axes[0, 1].pie(method_counts, labels=methods, autopct='%1.1f%%')
axes[0, 1].set_title('HTTP Method Distribution')

# 3. Hourly Request Pattern
hourly_requests = logs_df['timestamp'].dt.hour.value_counts().sort_index()
axes[1, 0].plot(hourly_requests.index, hourly_requests.values, marker='o')
axes[1, 0].set_title('Hourly Request Pattern')
axes[1, 0].set_xlabel('Hour of Day')
axes[1, 0].set_ylabel('Number of Requests')
axes[1, 0].grid(True)

# 4. Response Size Distribution
valid_sizes = logs_df['size'].dropna()
axes[1, 1].hist(valid_sizes, bins=30, alpha=0.7, color='skyblue')
axes[1, 1].set_title('Response Size Distribution')
axes[1, 1].set_xlabel('Response Size (bytes)')
axes[1, 1].set_ylabel('Frequency')

plt.tight_layout()
plt.show()

print(f"\n🎯 Log Processing System Summary:")
print(f"   Batch Processing: Analyzed {batch_results['total_requests']:,} historical logs")
print(f"   Stream Processing: Monitored {stream_metrics['total_requests']:,} real-time logs")
print(f"   System successfully demonstrated both processing paradigms!")
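`stream_log_monitoring` flags an IP based on its cumulative request count since monitoring started, so a slow but long-lived client eventually looks suspicious too. Rate-based detection usually counts requests inside a sliding time window instead. The sketch below keeps one `deque` of timestamps per IP and drops entries older than the window; `WindowedRateDetector`, the 60-second window, and the threshold of 50 are illustrative assumptions, not part of the notebook.

```python
from collections import defaultdict, deque


class WindowedRateDetector:
    """Hypothetical helper: flag an IP that exceeds `threshold` requests within `window_seconds`."""

    def __init__(self, window_seconds=60.0, threshold=50):
        self.window_seconds = window_seconds
        self.threshold = threshold
        self.requests = defaultdict(deque)  # ip -> request timestamps (seconds), oldest first

    def observe(self, ip, ts):
        """Record one request; return True only at the moment the IP crosses the threshold."""
        window = self.requests[ip]
        window.append(ts)
        # Evict timestamps that fell out of the sliding window
        while window and window[0] <= ts - self.window_seconds:
            window.popleft()
        # The count grows by at most 1 per call, so this fires once per crossing
        return len(window) == self.threshold + 1


detector = WindowedRateDetector(window_seconds=60.0, threshold=50)

# A burst: 51 requests within half a second from one IP triggers a single alert
burst_alerts = [detector.observe("10.0.0.1", i * 0.01) for i in range(51)]
print(sum(burst_alerts))

# A slow client: one request every 2 seconds never has more than 30 in a 60 s window
slow_alerts = [detector.observe("10.0.0.2", i * 2.0) for i in range(200)]
print(sum(slow_alerts))
```

Because old timestamps are evicted, memory per IP is bounded by the request rate times the window length rather than growing with total traffic.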

## 📚 Conclusions and Key Takeaways

### Key Differences Between Batch and Stream Processing:

| Aspek | Batch Processing | Stream Processing |
|-------|------------------|-------------------|
| **Latency** | High (minutes-hours) | Low (milliseconds-seconds) |
| **Throughput** | Very High | Medium-High |
| **Data Volume** | Large datasets | Individual records/small batches |
| **Use Cases** | Historical analysis, ETL, Reports | Real-time monitoring, Fraud detection |
| **Complexity** | Lower | Higher |
| **Resource Usage** | Scheduled, intensive | Continuous, moderate |
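The latency row can be made concrete with a small calculation. Suppose, purely as an illustration, that one event arrives every second for 10 seconds, a batch job starts at t = 10 s and takes 2 s, and a stream processor handles each event as it arrives in a hypothetical 5 ms. The sketch computes the delay from each event's arrival until its result is available.

```python
# Illustrative assumptions: events arrive at t = 0, 1, ..., 9 seconds,
# the batch job starts at t = 10 s and runs for 2 s, stream processing costs 5 ms per event.
arrivals = list(range(10))
batch_start, batch_duration = 10.0, 2.0
stream_cost = 0.005

# Batch: every result becomes visible only when the whole batch finishes
batch_latencies = [batch_start + batch_duration - t for t in arrivals]

# Stream: each result is visible right after its own event is processed
stream_latencies = [stream_cost for _ in arrivals]

avg_batch = sum(batch_latencies) / len(batch_latencies)
avg_stream = sum(stream_latencies) / len(stream_latencies)
print(f"batch: avg {avg_batch:.1f}s, worst {max(batch_latencies):.1f}s")
print(f"stream: avg {avg_stream * 1000:.0f}ms")
```

With these numbers the batch path averages 7.5 s and the earliest event waits 12 s, while every stream result is ready after 5 ms. The batch job may still win on throughput, since it can process all events in one pass with less per-record overhead.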

### When to Use Batch Processing:
- ✅ Historical data analysis
- ✅ ETL (Extract, Transform, Load) pipelines
- ✅ Periodic reports (daily, weekly, monthly)
- ✅ Machine learning model training
- ✅ Data warehousing operations

### When to Use Stream Processing:
- ✅ Real-time monitoring and alerting
- ✅ Fraud detection
- ✅ Live dashboards
- ✅ IoT data processing
- ✅ Recommendation engines
- ✅ Trading systems

### Tools and Technologies:

**Batch Processing:**
- Apache Spark
- Apache Hadoop (MapReduce)
- Pandas (Python)
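- Apache Airflow (orchestrates and schedules batch jobs rather than processing the data itself)
- AWS Batch, Google Cloud Dataflow (Dataflow also supports streaming)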

**Stream Processing:**
- Apache Kafka + Kafka Streams
- Apache Storm
- Apache Flink
- Apache Spark Streaming / Structured Streaming
- AWS Kinesis, Azure Stream Analytics

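### Hybrid Architecture:
In practice, many modern systems use a **Lambda Architecture**, which runs a batch layer (complete, accurate views recomputed periodically) alongside a speed layer (low-latency, incremental views) and merges the two at query time. The **Kappa Architecture** simplifies this by using a single stream-processing pipeline for everything: historical results are recomputed by replaying the retained event log through the same streaming code.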
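A Lambda-style serving layer can be sketched as merging a precomputed batch view with an incremental speed view at query time. The example below uses plain dictionaries with hypothetical names (`batch_view`, `speed_view`, `on_event`, `query_total`, `on_batch_recompute`) and made-up totals; in a real system the batch view would come from a job such as Spark and the speed view from a stream processor.

```python
from collections import Counter

# Batch view: totals recomputed periodically from all historical data (e.g., a nightly job)
batch_view = Counter({"Electronics": 1200.0, "Books": 300.0})

# Speed view: incremental totals for events that arrived after the last batch run
speed_view = Counter()


def on_event(category, amount):
    """Stream path: update the speed view as each event arrives."""
    speed_view[category] += amount


def query_total(category):
    """Serving layer: combine the batch and speed views at query time."""
    return batch_view[category] + speed_view[category]


def on_batch_recompute(new_batch_view):
    """After a batch run has absorbed recent events, swap in the new view and reset the speed view."""
    batch_view.clear()
    batch_view.update(new_batch_view)
    speed_view.clear()


on_event("Electronics", 50.0)
on_event("Books", 20.0)
print(query_total("Electronics"), query_total("Books"))
```

Resetting the whole speed view is a simplification: a production system must keep events that arrived while the batch job was running. In a Kappa design there would be no `batch_view` at all; the same `on_event` logic would be rerun over the replayed log.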

---

# 📝 ASSIGNMENT: Code Analysis and Discussion

## Instructions:
1. **Answer every question in detail, with code examples**
2. **Include screenshots of execution results where needed**
3. **Provide an in-depth comparative analysis**
4. **This assignment is individual work**
5. **Format your answers in markdown or as a structured document**

---

## 🎯 TASK 1: Analyzing Batch Processing with Pandas

### A. Analyzing the Data Generation Code

**Questions:**
1. Explain why the data generation code calls `np.random.seed(42)`.
2. Analyze the structure of the generated data and explain each field.
3. How much memory does the 100,000-record dataset use? Estimate the usage for 1 million records.
4. Why do we apply `round()` to `product_price` and `total_amount`?

**What your answer should cover:**
```python
# Example of the expected analysis:
# 1. Purpose of the random seed
# 2. Memory usage analysis
# 3. Data type optimization
# 4. Scaling considerations
```
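One possible starting point for the memory questions: pandas' `DataFrame.memory_usage(deep=True)` inspects the contents of string columns instead of counting only their pointers. The sketch below uses a small synthetic frame whose column names only loosely mirror the notebook's dataset (an assumption, not the real data) and shows how converting a low-cardinality string column to the `category` dtype changes the footprint. Linear extrapolation to 1 million records is only a rough estimate.

```python
import numpy as np
import pandas as pd

# Hypothetical synthetic data; not the notebook's generate_ecommerce_data() output
rng = np.random.default_rng(42)
n = 100_000
df = pd.DataFrame({
    "product_category": rng.choice(["Electronics", "Books", "Clothing", "Home"], size=n),
    "quantity": rng.integers(1, 10, size=n),
    "total_amount": rng.uniform(5, 500, size=n).round(2),
})


def frame_mb(frame):
    # deep=True measures the actual string payloads, not just references
    return frame.memory_usage(deep=True).sum() / 1024**2


before = frame_mb(df)
df["product_category"] = df["product_category"].astype("category")
after = frame_mb(df)

print(f"string column: {before:.2f} MB, category column: {after:.2f} MB")
# Naive linear extrapolation to 1 million rows (ignores fixed overhead)
print(f"estimated for 1M rows: {after * 1_000_000 / n:.2f} MB")
```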

### B. Analyzing the Batch Processing Operations

**Questions:**
1. Explain each operation in the `batch_analytics()` function:
   - Category sales aggregation
   - Daily sales trend
   - Customer segmentation
   - Payment method analysis
   - Age group analysis

2. Why do we apply `.round(2)` to the aggregation results?

3. Performance analysis: how does performance change if the dataset grows tenfold?

4. Compare `groupby()` and `pivot_table()` for this use case

**Code to analyze:**
```python
# Analyze the following code:
category_sales = df.groupby('product_category').agg({
    'total_amount': ['sum', 'mean', 'count'],
    'quantity': 'sum'
}).round(2)

# Compare with alternatives:
# pivot_table(), query(), etc.
```
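As a starting point for question 4, the same per-category aggregates can be produced with `groupby().agg()` and with `pd.pivot_table()`. The sketch uses a tiny hand-made frame with hypothetical values so the results are easy to check by eye; both calls are standard pandas APIs.

```python
import pandas as pd

# Hypothetical sample data
df = pd.DataFrame({
    "product_category": ["Books", "Books", "Electronics", "Electronics", "Electronics"],
    "total_amount": [10.0, 20.0, 100.0, 200.0, 300.0],
    "quantity": [1, 2, 1, 1, 3],
})

# groupby: split-apply-combine with named aggregations (flat column names)
by_groupby = df.groupby("product_category").agg(
    total_sum=("total_amount", "sum"),
    total_mean=("total_amount", "mean"),
    n_transactions=("total_amount", "count"),
    quantity_sum=("quantity", "sum"),
)

# pivot_table: the same aggregation expressed as a reshape (MultiIndex columns)
by_pivot = pd.pivot_table(
    df,
    index="product_category",
    values=["total_amount", "quantity"],
    aggfunc={"total_amount": ["sum", "mean", "count"], "quantity": "sum"},
)

print(by_groupby)
print(by_pivot)
```

`pivot_table()` becomes more convenient once a second dimension goes into the columns (for example, category by payment method), while `groupby()` is the more direct tool for a single grouping key.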

## 🎯 TASK 2: Analyzing Spark vs. the Pandas Simulation

### A. Comparing the Implementations

**Questions:**
1. Compare Spark SQL syntax with Pandas operations. Give 3 example operations that show the differences, for example:

```sql
-- Spark SQL
SELECT DATE(timestamp) as date, 
       SUM(total_amount) as daily_total,
       COUNT(*) as daily_transactions
FROM transactions 
GROUP BY DATE(timestamp) 
ORDER BY date
```

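```python
# Pandas equivalent (named aggregation mirrors the SQL aliases)
daily = (
    df.groupby(df['timestamp'].dt.date)
      .agg(daily_total=('total_amount', 'sum'),
           daily_transactions=('transaction_id', 'size'))  # 'size' counts rows like COUNT(*)
      .rename_axis('date')
      .sort_index()
)
```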

2. Why does Spark use lazy evaluation? What are its advantages and disadvantages?

3. Explain the concept of caching in Spark and compare it with memory management in Pandas

4. When should you use Spark vs. Pandas? Provide a decision matrix
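Lazy evaluation can be loosely illustrated with Python generators: "transformations" only describe work, and nothing runs until an "action" consumes the result. This is an analogy in plain Python, not Spark's execution engine; `read_source` and the `calls` log are hypothetical names used to make the deferred work visible.

```python
calls = []  # records which source rows were actually read


def read_source(n):
    for i in range(n):
        calls.append(i)
        yield {"id": i, "amount": i * 10}


# "Transformations": build a pipeline, but nothing is computed yet
pipeline = (row for row in read_source(1_000_000) if row["amount"] > 50)
pipeline = (row["amount"] for row in pipeline)

print(len(calls))  # -> 0

# "Action": take only the first 3 results, so only the needed prefix of the source is read
first_three = [next(pipeline) for _ in range(3)]
print(first_three, len(calls))  # -> [60, 70, 80] 9
```

As in Spark, deferring execution lets the pipeline skip work it never needs (only 9 of 1,000,000 rows were read). Unlike Spark, there is no optimizer that rewrites the plan, and nothing is cached: consuming the data again would require rebuilding the generator.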

### B. Error Handling and Fallback Strategy

**Questions:**
1. Analyze the error handling in the PySpark setup code:
```python
try:
    spark = SparkSession.builder...
except Exception as e:
    print("⚠️ PySpark not available...")
    SPARK_AVAILABLE = False
```

2. Explain why we need a fallback to the Pandas simulation

3. Give the 3 Spark deployment alternatives mentioned in the error message

4. What is the performance impact of using Pandas for big data?

## 🎯 TASK 3: Analyzing the Stream Processing Implementation

### A. Stream Processor Architecture

**Questions:**
1. Analyze the `StreamProcessor` class and explain each component:
   - `data_stream` (Queue)
   - `metrics` (a defaultdict of deques)
   - the `window_size` parameter
   - the threading implementation

2. Explain the concept of a sliding window in stream processing:
```python
def clean_old_data(self, current_time):
    cutoff_time = current_time - timedelta(seconds=self.window_size)
    for key in list(self.metrics.keys()):
        while (self.metrics[key] and 
               self.metrics[key][0][0] < cutoff_time):
            self.metrics[key].popleft()
```

3. Why use a `deque()` instead of a regular list for windowing?

4. Analyze the trade-off between window size and memory usage
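The `clean_old_data` logic can be tried in isolation. The sketch below stores `(timestamp, value)` pairs in a `deque` per metric key, mirroring the structure described in the questions; the module-level `metrics`, the helper `add_and_clean`, the fixed timestamps, and the 10-second window are assumptions made for a deterministic demo. `deque.popleft()` removes from the front in O(1), whereas `list.pop(0)` must shift every remaining element, which is O(n).

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW_SIZE = 10  # seconds (illustrative)
metrics = defaultdict(deque)  # key -> deque of (timestamp, value), oldest first


def clean_old_data(current_time):
    cutoff_time = current_time - timedelta(seconds=WINDOW_SIZE)
    for key in list(metrics.keys()):
        # Entries are appended in time order, so stale ones are always at the left end
        while metrics[key] and metrics[key][0][0] < cutoff_time:
            metrics[key].popleft()


def add_and_clean(key, ts, value):
    metrics[key].append((ts, value))
    clean_old_data(ts)


t0 = datetime(2024, 1, 1, 12, 0, 0)
for second in range(0, 30, 5):  # events at t0 + 0 s, 5 s, ..., 25 s
    add_and_clean("sales", t0 + timedelta(seconds=second), 100)

window = metrics["sales"]
print([(ts - t0).seconds for ts, _ in window])  # -> [15, 20, 25]
print(sum(v for _, v in window))  # windowed total -> 300
```

Memory per key is bounded by the number of events that fit in the window, which is the trade-off behind question 4: a larger `WINDOW_SIZE` smooths the metrics but retains more entries.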

### B. Threading and Concurrency

**Questions:**
1. Explain why we use 2 separate threads:
   - `processing_thread`
   - `stream_thread`

2. How are the producer and consumer threads coordinated?

3. What happens if the processing thread is slower than data generation?

4. Analyze potential race conditions and how to handle them

**Code to analyze:**
```python
from threading import Thread

# Thread coordination
processor.running = True
processing_thread = Thread(target=processor.process_stream)
processing_thread.start()

stream_thread = Thread(target=processor.generate_stream_data, args=(20,))
stream_thread.start()

# Wait for completion
stream_thread.join()
processor.running = False
processing_thread.join()
```