In [None]:
pip install confluent_kafka

In [None]:
import json
import os
import time
from datetime import datetime
from getpass import getpass

from confluent_kafka import Consumer

In [None]:
# Confluent Cloud connection settings.
# Credentials come from the environment (KAFKA_API_KEY / KAFKA_API_SECRET) or, if
# unset, from an interactive no-echo prompt -- never from the notebook source.
# The key/secret that used to be hardcoded in this cell should be considered
# exposed and rotated in Confluent Cloud.
def _read_secret(env_var, prompt):
    """Return os.environ[env_var] if set and non-empty, otherwise prompt for it without echo."""
    return os.environ.get(env_var) or getpass(prompt)


config = {
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS', 'pkc-619z3.us-east1.gcp.confluent.cloud:9092'
    ),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': _read_secret('KAFKA_API_KEY', 'Confluent API key: '),
    'sasl.password': _read_secret('KAFKA_API_SECRET', 'Confluent API secret: '),
    'group.id': 'my_consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest',  # Start reading at the earliest message
}

In [None]:
class MessageMetrics:
    """Track throughput and error counts for a consumer session.

    Elapsed time is measured with ``time.monotonic()`` so the reported runtime and
    rate are not distorted by wall-clock adjustments (NTP corrections, or DST
    shifts of the naive local time returned by ``datetime.now()``).
    ``start_time`` is still recorded as a wall-clock ``datetime`` for display.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero all counters and restart the clock (e.g. before re-running a consume cell)."""
        self.messages_processed = 0
        self.bytes_processed = 0
        self.errors = 0
        self.start_time = datetime.now()
        # Monotonic reference point used for all duration calculations.
        self._start_monotonic = time.monotonic()

    def update(self, message_size):
        """Record one successfully processed message of ``message_size`` bytes."""
        self.messages_processed += 1
        self.bytes_processed += message_size

    def get_stats(self):
        """Return a snapshot of the counters.

        ``runtime_seconds`` and ``messages_per_second`` are pre-formatted strings
        with two decimals (callers print them directly); the other values are ints.
        """
        duration = time.monotonic() - self._start_monotonic
        msg_rate = self.messages_processed / duration if duration > 0 else 0

        return {
            'messages_processed': self.messages_processed,
            'runtime_seconds': f"{duration:.2f}",
            'messages_per_second': f"{msg_rate:.2f}",
            'bytes_processed': self.bytes_processed,
            'errors': self.errors,
        }


# Shared tracker read and updated by process_message / consume_messages.
metrics = MessageMetrics()

In [None]:
def _format_latency(produced_at):
    """Return ``", Latency: X.XXs"`` for an ISO-8601 ``produced_at`` value, else ``""``.

    Missing or unparseable timestamps yield ``""`` so a bad optional field does not
    cause the whole message to be reported as an error.
    """
    if not produced_at:
        return ""
    try:
        produced_time = datetime.fromisoformat(produced_at)
    except (TypeError, ValueError):
        return ""
    # Take "now" with the same tzinfo as the parsed timestamp: naive local time for
    # naive input, aware time for offset-bearing input. Subtracting a naive datetime
    # from an aware one raises TypeError.
    latency = (datetime.now(produced_time.tzinfo) - produced_time).total_seconds()
    return f", Latency: {latency:.2f}s"


def process_message(msg, metrics_tracker=None, stats_every=100):
    """Parse one Kafka message, print a one-line summary, and update metrics.

    Args:
        msg: consumed message exposing ``value()`` (bytes, or None for an empty record).
        metrics_tracker: object with ``update(size)``, ``get_stats()``,
            ``messages_processed`` and ``errors``; defaults to the global ``metrics``.
        stats_every: print a stats summary every N processed messages
            (0 or None disables it).

    Failures are counted on the tracker and printed; nothing is raised.
    """
    tracker = metrics if metrics_tracker is None else metrics_tracker
    try:
        payload = msg.value()
        if payload is None:
            # Empty record (e.g. a tombstone): nothing to decode.
            raise ValueError("message has no payload")

        message_data = json.loads(payload.decode('utf-8'))
        if not isinstance(message_data, dict):
            raise ValueError(f"expected a JSON object, got {type(message_data).__name__}")

        customer_id = message_data.get('id')
        country = message_data.get('country')
        company = message_data.get('company')
        latency_str = _format_latency(message_data.get('produced_at'))

        # Count the message only once it is fully parsed, so one message can never
        # be recorded as both processed and an error.
        tracker.update(len(payload))

        timestamp = datetime.now().strftime('%H:%M:%S')
        print(f"[{timestamp}] 📥 Customer {customer_id} from {country} at {company}{latency_str}")

        if stats_every and tracker.messages_processed % stats_every == 0:
            stats = tracker.get_stats()
            print("\n=== Processing Stats ===")
            print(f"Messages Processed: {stats['messages_processed']}")
            print(f"Runtime: {stats['runtime_seconds']} seconds")
            print(f"Rate: {stats['messages_per_second']} msgs/sec")
            print(f"Bytes Processed: {stats['bytes_processed']}")
            print(f"Errors: {stats['errors']}\n")

    except json.JSONDecodeError as e:
        tracker.errors += 1
        print(f"❌ Error decoding message: {e}")
    except Exception as e:
        tracker.errors += 1
        print(f"❌ Error processing message: {e}")

In [None]:
def consume_messages(consumer, topic_name, max_messages=None, timeout=1.0):
    """Subscribe ``consumer`` to ``topic_name`` and process messages until a limit is hit.

    Args:
        consumer: confluent_kafka ``Consumer``; it is closed on every exit path.
        topic_name: topic to subscribe to.
        max_messages: stop after this many messages have been processed
            successfully *by this call*; ``None`` consumes until interrupted
            (KeyboardInterrupt, e.g. the notebook's interrupt button).
        timeout: seconds each ``poll`` call blocks waiting for a message.

    Final stats from the global ``metrics`` tracker are printed on exit.
    """
    print(f"🎯 Starting consumer for topic: {topic_name}")
    # The global tracker survives between cell runs; count against a baseline so
    # a re-run is not stopped immediately by an earlier run's total.
    baseline = metrics.messages_processed

    try:
        # Subscribe inside the try so the consumer is still closed if this fails.
        consumer.subscribe([topic_name])

        while True:
            if max_messages is not None and metrics.messages_processed - baseline >= max_messages:
                print(f"\n✅ Reached maximum message count: {max_messages}")
                break

            msg = consumer.poll(timeout=timeout)
            if msg is None:
                continue  # poll timed out with nothing to read

            if msg.error():
                metrics.errors += 1
                print(f"❌ Consumer error: {msg.error()}")
                continue

            process_message(msg)

    except KeyboardInterrupt:
        print("\n⚡ Consumption stopped by user")
    finally:
        stats = metrics.get_stats()
        print("\n=== Final Processing Stats ===")
        print(f"Messages Processed: {stats['messages_processed']}")
        print(f"Runtime: {stats['runtime_seconds']} seconds")
        print(f"Rate: {stats['messages_per_second']} msgs/sec")
        print(f"Bytes Processed: {stats['bytes_processed']}")
        print(f"Errors: {stats['errors']}")

        consumer.close()
        print("\n✨ Consumer closed successfully")

In [None]:
# Fresh consumer built from the connection settings in the config cell.
consumer = Consumer(**config)

# Read up to 1000 messages from 'customers'; max_messages=None runs until interrupted.
consume_messages(consumer, 'customers', max_messages=1000, timeout=1.0)