# Lab 1: Kafka Basics - Stock Market Data Streaming

## 🎯 Objectives
- Understand Kafka fundamentals: Topics, Partitions, Producers, Consumers
- Learn message serialization with JSON and custom serializers
- Practice basic producer/consumer patterns
- Explore Kafka UI for monitoring

## 📋 Prerequisites
- Kafka cluster running (`docker compose up -d`)
- Python dependencies installed (`./setup_kafka_lab.sh`)
- Basic understanding of message queues

## 🏗️ Architecture Overview
```
Stock Data Generator → Kafka Producer → Stock Data Topic
                                                      ↓
                                              Multiple Consumers
                                                      ↓
                                            Analytics, Alerts, Storage
```


In [11]:
# Install and Import Dependencies
%pip install kafka-python confluent-kafka pandas matplotlib seaborn plotly

import json
import time
import random
from datetime import datetime, timedelta
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List

print("✅ Dependencies installed and imported successfully!")


Note: you may need to restart the kernel to use updated packages.
✅ Dependencies installed and imported successfully!


In [12]:
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
TOPIC_NAME = 'stock-data'

# Stock symbols for our lab
STOCK_SYMBOLS = [
    "AAPL", "GOOGL", "MSFT", "TSLA", "AMZN", 
    "META", "NVDA", "NFLX", "ADBE", "CRM"
]

# Base prices for realistic data generation
BASE_PRICES = {
    "AAPL": 150.0, "GOOGL": 2800.0, "MSFT": 350.0, "TSLA": 250.0, "AMZN": 3200.0,
    "META": 300.0, "NVDA": 450.0, "NFLX": 400.0, "ADBE": 500.0, "CRM": 200.0
}

print(f"📊 Configured for {len(STOCK_SYMBOLS)} stock symbols")
print(f"🔗 Kafka Bootstrap Servers: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"📝 Topic Name: {TOPIC_NAME}")


📊 Configured for 10 stock symbols
🔗 Kafka Bootstrap Servers: localhost:9092
📝 Topic Name: stock-data


In [13]:
# Stock Data Generator
class StockDataGenerator:
    """Generate realistic OHLCV stock data"""
    
    def __init__(self, symbols: List[str], base_prices: Dict[str, float]):
        self.symbols = symbols
        self.base_prices = base_prices
        self.current_prices = base_prices.copy()
    
    def generate_ohlcv(self, symbol: str) -> Dict:
        """Generate OHLCV data for a stock symbol"""
        current_price = self.current_prices[symbol]
        price_change = random.uniform(-0.01, 0.01)
        new_price = current_price * (1 + price_change)
        
        open_price = round(new_price * random.uniform(0.999, 1.001), 2)
        close_price = round(new_price * random.uniform(0.999, 1.001), 2)
        high_price = round(max(open_price, close_price) * random.uniform(1.001, 1.003), 2)
        low_price = round(min(open_price, close_price) * random.uniform(0.997, 0.999), 2)
        
        base_volume = random.randint(100000, 1000000)
        volume = base_volume + random.randint(-100000, 100000)
        
        self.current_prices[symbol] = close_price
        
        return {
            "symbol": symbol,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "open": open_price,
            "high": high_price,
            "low": low_price,
            "close": close_price,
            "volume": volume,
            "exchange": "NASDAQ"
        }

# Initialize data generator
data_generator = StockDataGenerator(STOCK_SYMBOLS, BASE_PRICES)
print("✅ Stock Data Generator initialized")
print(f"📊 Available symbols: {', '.join(STOCK_SYMBOLS)}")


✅ Stock Data Generator initialized
📊 Available symbols: AAPL, GOOGL, MSFT, TSLA, AMZN, META, NVDA, NFLX, ADBE, CRM


## Exercise 1: Understanding Kafka Fundamentals

### 🎯 **Learning Objectives:**
- Understand Kafka core concepts: Topics, Partitions, Producers, Consumers
- Learn about message serialization and deserialization
- Practice basic producer/consumer patterns
- Explore Kafka cluster monitoring

### 📚 **Key Concepts:**
1. **Topic**: A category or feed name to which messages are published
2. **Partition**: Topics are split into partitions for scalability
3. **Producer**: Application that sends messages to Kafka topics
4. **Consumer**: Application that reads messages from Kafka topics
5. **Broker**: Kafka server that stores and serves messages


In [14]:
# Exercise 1: Create Kafka Producer
print("🔧 Creating Kafka Producer...")

try:
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        key_serializer=lambda k: k.encode('utf-8') if k else None,
        acks='all',
        retries=3,
        batch_size=16384,
        linger_ms=10,
        compression_type='gzip'
    )
    
    print("✅ Kafka Producer created successfully!")
    print(f"📡 Bootstrap servers: {KAFKA_BOOTSTRAP_SERVERS}")
    print(f"📝 Topic: {TOPIC_NAME}")
    
except Exception as e:
    print(f"❌ Failed to create producer: {e}")
    print("💡 Make sure Kafka cluster is running: docker compose up -d")


🔧 Creating Kafka Producer...
✅ Kafka Producer created successfully!
📡 Bootstrap servers: localhost:9092
📝 Topic: stock-data


In [15]:
# Exercise 2: Generate and Send Stock Data
print("📈 Generating and sending stock data...")

def send_stock_data(num_messages: int = 10):
    """Send stock data to Kafka topic"""
    print(f"📊 Sending {num_messages} stock data messages...")
    
    for i in range(num_messages):
        symbol = random.choice(STOCK_SYMBOLS)
        ohlcv_data = data_generator.generate_ohlcv(symbol)
        
        future = producer.send(TOPIC_NAME, key=symbol, value=ohlcv_data)
        record_metadata = future.get(timeout=10)
        
        print(f"📊 Sent {symbol}: ${ohlcv_data['close']} -> Partition {record_metadata.partition}")
        time.sleep(0.1)
    
    producer.flush()
    print(f"✅ Successfully sent {num_messages} messages to topic '{TOPIC_NAME}'")

# Send some test data
send_stock_data(15)


📈 Generating and sending stock data...
📊 Sending 15 stock data messages...
📊 Sent NVDA: $448.27 -> Partition 0
📊 Sent AAPL: $150.96 -> Partition 0
📊 Sent AMZN: $3178.95 -> Partition 0
📊 Sent NFLX: $401.72 -> Partition 0
📊 Sent CRM: $198.56 -> Partition 0
📊 Sent NFLX: $399.14 -> Partition 0
📊 Sent CRM: $200.73 -> Partition 0
📊 Sent NVDA: $448.96 -> Partition 0
📊 Sent MSFT: $352.86 -> Partition 0
📊 Sent GOOGL: $2822.84 -> Partition 0
📊 Sent AMZN: $3160.33 -> Partition 0
📊 Sent CRM: $202.67 -> Partition 0
📊 Sent NVDA: $451.44 -> Partition 0
📊 Sent NVDA: $450.19 -> Partition 0
📊 Sent TSLA: $249.75 -> Partition 0
✅ Successfully sent 15 messages to topic 'stock-data'


In [16]:
# Exercise 3: Create Kafka Consumer
print("🔧 Creating Kafka Consumer...")

try:
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id='stock-analytics-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        key_deserializer=lambda x: x.decode('utf-8') if x else None,
        consumer_timeout_ms=5000
    )
    
    print("✅ Kafka Consumer created successfully!")
    print(f"👥 Consumer Group: stock-analytics-group")
    print(f"📝 Topic: {TOPIC_NAME}")
    print(f"🔄 Auto Offset Reset: earliest")
    
except Exception as e:
    print(f"❌ Failed to create consumer: {e}")
    print("💡 Make sure Kafka cluster is running: docker compose up -d")


🔧 Creating Kafka Consumer...
✅ Kafka Consumer created successfully!
👥 Consumer Group: stock-analytics-group
📝 Topic: stock-data
🔄 Auto Offset Reset: earliest


In [17]:
# Exercise 4: Consume Messages
print("🔄 Consuming messages from Kafka...")

def consume_messages(max_messages: int = 10):
    """Consume messages from Kafka topic"""
    messages_consumed = 0
    partition_counts = {}
    
    print(f"🚀 Starting to consume up to {max_messages} messages...")
    
    try:
        for message in consumer:
            if messages_consumed >= max_messages:
                break
            
            partition = message.partition
            partition_counts[partition] = partition_counts.get(partition, 0) + 1
            
            key = message.key
            value = message.value
            offset = message.offset
            timestamp = message.timestamp
            
            print(f"📊 Message {messages_consumed + 1}:")
            print(f"   Key: {key}")
            print(f"   Symbol: {value['symbol']}")
            print(f"   Price: ${value['close']}")
            print(f"   Volume: {value['volume']:,}")
            print(f"   Partition: {partition}")
            print(f"   Offset: {offset}")
            print(f"   Timestamp: {datetime.fromtimestamp(timestamp/1000)}")
            print()
            
            messages_consumed += 1
            
    except Exception as e:
        print(f"⚠️ Finished consuming: {e}")
    
    print(f"📈 Consumption Summary:")
    print(f"   Total messages consumed: {messages_consumed}")
    print(f"   Partition distribution: {partition_counts}")
    
    return messages_consumed, partition_counts

# Consume messages
messages_count, partition_distribution = consume_messages(10)


🔄 Consuming messages from Kafka...
🚀 Starting to consume up to 10 messages...
📊 Message 1:
   Key: AAPL
   Symbol: AAPL
   Price: $147.29
   Volume: 849,059
   Partition: 0
   Offset: 0
   Timestamp: 2025-09-21 22:56:47.238000

📊 Message 2:
   Key: CRM
   Symbol: CRM
   Price: $200.24
   Volume: 320,535
   Partition: 0
   Offset: 1
   Timestamp: 2025-09-21 23:04:43.846000

📊 Message 3:
   Key: MSFT
   Symbol: MSFT
   Price: $353.31
   Volume: 613,630
   Partition: 0
   Offset: 2
   Timestamp: 2025-09-21 23:04:43.977000

📊 Message 4:
   Key: AMZN
   Symbol: AMZN
   Price: $3193.57
   Volume: 1,016,545
   Partition: 0
   Offset: 3
   Timestamp: 2025-09-21 23:04:44.094000

📊 Message 5:
   Key: META
   Symbol: META
   Price: $300.17
   Volume: 692,115
   Partition: 0
   Offset: 4
   Timestamp: 2025-09-21 23:04:44.215000

📊 Message 6:
   Key: TSLA
   Symbol: TSLA
   Price: $249.58
   Volume: 881,526
   Partition: 0
   Offset: 5
   Timestamp: 2025-09-21 23:04:44.337000

📊 Message 7:
   Key: 

In [18]:
# Exercise 5: Custom Serializers and Deserializers
print("🔧 Demonstrating custom serialization...")

import pickle

class CustomSerializer:
    """Custom serializer for complex objects"""
    
    @staticmethod
    def serialize(obj):
        """Serialize object to bytes"""
        return pickle.dumps(obj)
    
    @staticmethod
    def deserialize(data):
        """Deserialize bytes to object"""
        return pickle.loads(data)

# Create producer with custom serializer
custom_producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=CustomSerializer.serialize,
    key_serializer=lambda k: k.encode('utf-8') if k else None
)

# Create consumer with custom deserializer
custom_consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id='custom-serialization-group',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=CustomSerializer.deserialize,
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    consumer_timeout_ms=3000
)

print("✅ Custom serializers/deserializers created!")

# Test custom serialization
print("\n🧪 Testing custom serialization...")

# Create a complex object
complex_data = {
    "symbol": "AAPL",
    "price_data": {
        "open": 150.0,
        "high": 155.0,
        "low": 148.0,
        "close": 152.0
    },
    "indicators": {
        "sma_20": 150.5,
        "rsi": 65.2,
        "macd": 1.2
    },
    "metadata": {
        "timestamp": datetime.now().isoformat(),
        "source": "market_data_api",
        "version": "1.0"
    }
}

# Send complex data
future = custom_producer.send(TOPIC_NAME, key="AAPL", value=complex_data)
record_metadata = future.get(timeout=10)

print(f"📊 Sent complex data to partition {record_metadata.partition}")

# Consume and verify
print("\n🔄 Consuming complex data...")
for message in custom_consumer:
    print(f"📊 Received complex data:")
    print(f"   Symbol: {message.value['symbol']}")
    print(f"   Price: ${message.value['price_data']['close']}")
    print(f"   RSI: {message.value['indicators']['rsi']}")
    print(f"   Source: {message.value['metadata']['source']}")
    break

print("✅ Custom serialization test completed!")


🔧 Demonstrating custom serialization...
✅ Custom serializers/deserializers created!

🧪 Testing custom serialization...
📊 Sent complex data to partition 0

🔄 Consuming complex data...
✅ Custom serialization test completed!


In [19]:
# Exercise 6: Error Handling and Retry Logic
print("🛡️ Demonstrating error handling and retry logic...")

from functools import wraps

class RetryableError(Exception):
    """Custom exception for retryable errors"""
    pass

class NonRetryableError(Exception):
    """Custom exception for non-retryable errors"""
    pass

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """Decorator for retry logic with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except RetryableError as e:
                    if attempt == max_retries:
                        print(f"❌ Max retries ({max_retries}) exceeded. Giving up.")
                        raise e
                    
                    print(f"⚠️ Attempt {attempt + 1} failed: {e}")
                    print(f"🔄 Retrying in {delay} seconds...")
                    time.sleep(delay)
                    delay = min(delay * 2, max_delay)
                    
                except NonRetryableError as e:
                    print(f"❌ Non-retryable error: {e}")
                    raise e
                    
        return wrapper
    return decorator

# Robust producer with error handling
@retry_with_backoff(max_retries=3, base_delay=1)
def send_message_with_retry(producer, topic, key, value):
    """Send message with retry logic"""
    try:
        future = producer.send(topic, key=key, value=value)
        record_metadata = future.get(timeout=10)
        print(f"✅ Message sent successfully to partition {record_metadata.partition}")
        return record_metadata
    except Exception as e:
        if "timeout" in str(e).lower():
            raise RetryableError(f"Timeout sending message: {e}")
        else:
            raise NonRetryableError(f"Non-retryable error: {e}")

# Create dedicated consumer for error handling demo
error_handling_consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id='error-handling-demo-group',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    consumer_timeout_ms=3000
)

# Robust consumer with error handling
def consume_with_error_handling(consumer, max_messages=5):
    """Consume messages with error handling"""
    messages_processed = 0
    errors_handled = 0
    
    print(f"🔄 Starting robust message consumption...")
    
    try:
        for message in consumer:
            if messages_processed >= max_messages:
                break
                
            try:
                data = message.value
                
                if not isinstance(data, dict) or 'symbol' not in data:
                    print(f"⚠️ Invalid message format, skipping...")
                    errors_handled += 1
                    continue
                
                if random.random() < 0.3:  # 30% chance of error
                    raise RetryableError("Simulated processing error")
                
                print(f"📊 Processed message: {data['symbol']} - ${data['close']}")
                messages_processed += 1
                
            except RetryableError as e:
                print(f"⚠️ Retryable error processing message: {e}")
                errors_handled += 1
                
            except Exception as e:
                print(f"❌ Non-retryable error: {e}")
                errors_handled += 1
                
    except Exception as e:
        print(f"❌ Consumer error: {e}")
    
    print(f"\n📈 Processing Summary:")
    print(f"   Messages processed: {messages_processed}")
    print(f"   Errors handled: {errors_handled}")
    
    return messages_processed, errors_handled

# Test error handling
print("\n🧪 Testing error handling...")

# Send some test data
for i in range(5):
    symbol = random.choice(STOCK_SYMBOLS)
    ohlcv_data = data_generator.generate_ohlcv(symbol)
    
    try:
        send_message_with_retry(producer, TOPIC_NAME, symbol, ohlcv_data)
    except Exception as e:
        print(f"❌ Failed to send message after retries: {e}")

# Test consumer error handling
messages_processed, errors_handled = consume_with_error_handling(error_handling_consumer, 5)

print("\n✅ Error handling demonstration completed!")


🛡️ Demonstrating error handling and retry logic...

🧪 Testing error handling...
✅ Message sent successfully to partition 0
✅ Message sent successfully to partition 0
✅ Message sent successfully to partition 0
✅ Message sent successfully to partition 0
✅ Message sent successfully to partition 0
🔄 Starting robust message consumption...

📈 Processing Summary:
   Messages processed: 0
   Errors handled: 0

✅ Error handling demonstration completed!


In [20]:
# Exercise 7: Performance Analysis and Monitoring
print("📊 Analyzing Kafka performance...")

from collections import defaultdict
import statistics

class PerformanceMonitor:
    """Monitor Kafka performance metrics"""
    
    def __init__(self):
        self.metrics = {
            'producer_throughput': [],
            'consumer_throughput': [],
            'latency': [],
            'message_sizes': [],
            'partition_distribution': defaultdict(int)
        }
    
    def record_producer_metric(self, messages_sent, time_taken):
        """Record producer performance metric"""
        throughput = messages_sent / time_taken if time_taken > 0 else 0
        self.metrics['producer_throughput'].append(throughput)
    
    def record_consumer_metric(self, messages_consumed, time_taken):
        """Record consumer performance metric"""
        throughput = messages_consumed / time_taken if time_taken > 0 else 0
        self.metrics['consumer_throughput'].append(throughput)
    
    def record_latency(self, latency_ms):
        """Record message latency"""
        self.metrics['latency'].append(latency_ms)
    
    def record_message_size(self, size_bytes):
        """Record message size"""
        self.metrics['message_sizes'].append(size_bytes)
    
    def record_partition(self, partition):
        """Record partition distribution"""
        self.metrics['partition_distribution'][partition] += 1
    
    def get_summary(self):
        """Get performance summary"""
        summary = {}
        
        if self.metrics['producer_throughput']:
            summary['avg_producer_throughput'] = statistics.mean(self.metrics['producer_throughput'])
            summary['max_producer_throughput'] = max(self.metrics['producer_throughput'])
        
        if self.metrics['consumer_throughput']:
            summary['avg_consumer_throughput'] = statistics.mean(self.metrics['consumer_throughput'])
            summary['max_consumer_throughput'] = max(self.metrics['consumer_throughput'])
        
        if self.metrics['latency']:
            summary['avg_latency_ms'] = statistics.mean(self.metrics['latency'])
            summary['max_latency_ms'] = max(self.metrics['latency'])
            summary['min_latency_ms'] = min(self.metrics['latency'])
        
        if self.metrics['message_sizes']:
            summary['avg_message_size_bytes'] = statistics.mean(self.metrics['message_sizes'])
            summary['total_messages'] = len(self.metrics['message_sizes'])
        
        summary['partition_distribution'] = dict(self.metrics['partition_distribution'])
        
        return summary

# Initialize performance monitor
perf_monitor = PerformanceMonitor()

# High-performance producer with gzip compression (compatible)
high_perf_producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
    batch_size=32768,
    linger_ms=50,
    compression_type='gzip',  # Use gzip (always available)
    acks=1,
    retries=0
)

# High-performance consumer
high_perf_consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    group_id='performance-test-group',
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    key_deserializer=lambda x: x.decode('utf-8') if x else None,
    consumer_timeout_ms=2000,
    fetch_min_bytes=1024,
    fetch_max_wait_ms=500
)

print("✅ High-performance producer and consumer created!")

# Performance test: Producer throughput
print("\n🚀 Testing producer throughput...")
num_messages = 100
start_time = time.time()

for i in range(num_messages):
    symbol = random.choice(STOCK_SYMBOLS)
    ohlcv_data = data_generator.generate_ohlcv(symbol)
    
    message_size = len(json.dumps(ohlcv_data).encode('utf-8'))
    perf_monitor.record_message_size(message_size)
    
    future = high_perf_producer.send(TOPIC_NAME, key=symbol, value=ohlcv_data)
    record_metadata = future.get(timeout=5)
    
    perf_monitor.record_partition(record_metadata.partition)

high_perf_producer.flush()
end_time = time.time()
producer_time = end_time - start_time

perf_monitor.record_producer_metric(num_messages, producer_time)

print(f"📊 Producer Performance:")
print(f"   Messages sent: {num_messages}")
print(f"   Time taken: {producer_time:.2f} seconds")
print(f"   Throughput: {num_messages/producer_time:.2f} messages/second")

# Performance test: Consumer throughput
print("\n🔄 Testing consumer throughput...")
start_time = time.time()
messages_consumed = 0

for message in high_perf_consumer:
    if messages_consumed >= num_messages:
        break
    
    data = message.value
    latency_ms = (time.time() * 1000) - message.timestamp
    perf_monitor.record_latency(latency_ms)
    messages_consumed += 1

end_time = time.time()
consumer_time = end_time - start_time

perf_monitor.record_consumer_metric(messages_consumed, consumer_time)

print(f"📊 Consumer Performance:")
print(f"   Messages consumed: {messages_consumed}")
print(f"   Time taken: {consumer_time:.2f} seconds")
print(f"   Throughput: {messages_consumed/consumer_time:.2f} messages/second")

# Get performance summary
summary = perf_monitor.get_summary()
print(f"\n📈 Performance Summary:")
for key, value in summary.items():
    if isinstance(value, float):
        print(f"   {key}: {value:.2f}")
    else:
        print(f"   {key}: {value}")

print("\n✅ Performance analysis completed!")


📊 Analyzing Kafka performance...
✅ High-performance producer and consumer created!

🚀 Testing producer throughput...
📊 Producer Performance:
   Messages sent: 100
   Time taken: 5.44 seconds
   Throughput: 18.40 messages/second

🔄 Testing consumer throughput...
📊 Consumer Performance:
   Messages consumed: 0
   Time taken: 2.00 seconds
   Throughput: 0.00 messages/second

📈 Performance Summary:
   avg_producer_throughput: 18.40
   max_producer_throughput: 18.40
   avg_consumer_throughput: 0.00
   max_consumer_throughput: 0.00
   avg_message_size_bytes: 166.35
   total_messages: 100
   partition_distribution: {0: 100}

✅ Performance analysis completed!


In [27]:
# Exercise 9: Best Practices and Cleanup
print("🧹 Cleaning up resources and reviewing best practices...")

def cleanup_resources():
    """Properly close all Kafka resources"""
    resources_to_close = [
        ('Producer', producer),
        ('Custom Producer', custom_producer),
        ('High Performance Producer', high_perf_producer),
        ('Consumer', consumer),
        ('Custom Consumer', custom_consumer),
        ('High Performance Consumer', high_perf_consumer),
        ('Error Handling Consumer', error_handling_consumer),
        ('Visualization Consumer', visualization_consumer)
    ]
    
    print("🔧 Closing Kafka resources...")
    
    for resource_name, resource in resources_to_close:
        try:
            if resource:
                resource.close()
                print(f"✅ Closed {resource_name}")
        except Exception as e:
            print(f"⚠️ Error closing {resource_name}: {e}")
    
    print("\n✅ All resources closed successfully!")

# Cleanup all resources
cleanup_resources()

# Best Practices Summary
print("\n📚 Kafka Best Practices Summary:")
print("\n🔧 Producer Best Practices:")
print("   1. Use appropriate acknowledgment levels (acks)")
print("   2. Implement retry logic with exponential backoff")
print("   3. Use batching for better throughput")
print("   4. Choose appropriate compression (gzip, snappy, lz4)")
print("   5. Always call flush() before closing")
print("   6. Use meaningful keys for partitioning")

print("\n🔄 Consumer Best Practices:")
print("   1. Use consumer groups for scalability")
print("   2. Implement proper error handling")
print("   3. Monitor consumer lag")
print("   4. Use appropriate auto_offset_reset")
print("   5. Implement idempotent processing")
print("   6. Handle rebalancing gracefully")

print("\n📊 Performance Best Practices:")
print("   1. Tune batch sizes based on use case")
print("   2. Use compression for large messages")
print("   3. Monitor throughput and latency")
print("   4. Optimize partition count")
print("   5. Use appropriate replication factor")
print("   6. Implement proper monitoring")

print("\n🛡️ Error Handling Best Practices:")
print("   1. Implement retry logic with backoff")
print("   2. Use dead letter queues for failed messages")
print("   3. Implement circuit breaker patterns")
print("   4. Log errors appropriately")
print("   5. Monitor error rates")
print("   6. Implement graceful degradation")

print("\n📈 Monitoring Best Practices:")
print("   1. Monitor producer/consumer throughput")
print("   2. Track message latency")
print("   3. Monitor consumer lag")
print("   4. Set up alerts for failures")
print("   5. Use proper logging levels")
print("   6. Implement health checks")

# Lab Summary
print("\n🎯 Lab 1 Summary - What We Learned:")
print("✅ Kafka Fundamentals: Topics, Partitions, Producers, Consumers")
print("✅ Message Serialization: JSON and custom serializers")
print("✅ Error Handling: Retry logic and exception handling")
print("✅ Performance Analysis: Throughput and latency measurement")
print("✅ Data Visualization: Real-time charts and dashboards")
print("✅ Best Practices: Resource management and optimization")

print("\n🚀 Next Steps:")
print("- Lab 2: Consumer Groups and Load Balancing")
print("- Lab 3: Partitioning Strategies")
print("- Lab 4: Offset Management")
print("- Lab 5: Real-time Analytics")

print("\n💡 Key Takeaways:")
print("1. Kafka enables high-throughput, low-latency messaging")
print("2. Proper configuration is crucial for performance")
print("3. Error handling and monitoring are essential")
print("4. Visualization helps understand data patterns")
print("5. Resource cleanup prevents memory leaks")

print("\n🎉 Lab 1 completed successfully!")
print("Ready to move on to Lab 2: Consumer Groups!")


🧹 Cleaning up resources and reviewing best practices...
🔧 Closing Kafka resources...
✅ Closed Producer
✅ Closed Custom Producer
✅ Closed High Performance Producer
✅ Closed Consumer
✅ Closed Custom Consumer
✅ Closed High Performance Consumer
✅ Closed Error Handling Consumer
✅ Closed Visualization Consumer

✅ All resources closed successfully!

📚 Kafka Best Practices Summary:

🔧 Producer Best Practices:
   1. Use appropriate acknowledgment levels (acks)
   2. Implement retry logic with exponential backoff
   3. Use batching for better throughput
   4. Choose appropriate compression (gzip, snappy, lz4)
   5. Always call flush() before closing
   6. Use meaningful keys for partitioning

🔄 Consumer Best Practices:
   1. Use consumer groups for scalability
   2. Implement proper error handling
   3. Monitor consumer lag
   4. Use appropriate auto_offset_reset
   5. Implement idempotent processing
   6. Handle rebalancing gracefully

📊 Performance Best Practices:
   1. Tune batch sizes based 