# Kafka Consumer Basics

This notebook demonstrates how to consume messages from Kafka topics.

## Topics Covered:
- Creating consumers
- Consumer groups
- Manual and automatic offset management
- Consuming from specific partitions
- Seeking to specific offsets

## 1. Setup and Configuration

In [None]:
import os
import json
from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError

# Kafka cluster connection
KAFKA_SERVERS = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka1:29092,kafka2:29093,kafka3:29094')
print(f"Connecting to Kafka at: {KAFKA_SERVERS}")
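
The connection string above is later split on commas. As a minimal sketch, assuming the environment variable may contain stray spaces or a trailing comma, a more forgiving parser could look like this (the helper name `parse_bootstrap_servers` is hypothetical and not part of kafka-python):

```python
def parse_bootstrap_servers(raw):
    """Split a comma-separated host:port string into a clean list."""
    servers = [part.strip() for part in raw.split(',')]
    # Drop empty entries left behind by trailing or doubled commas
    return [server for server in servers if server]


print(parse_bootstrap_servers('kafka1:29092, kafka2:29093,kafka3:29094,'))
```

The result could be passed as `bootstrap_servers=` in place of `KAFKA_SERVERS.split(',')`.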

## 2. Create a Simple Consumer

In [None]:
topic_name = 'test-topic'

# Create consumer
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=KAFKA_SERVERS.split(','),
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    key_deserializer=lambda k: k.decode('utf-8') if k else None,
    group_id='test-consumer-group',
    auto_offset_reset='earliest',  # Start from beginning if no offset exists
    enable_auto_commit=True,
    auto_commit_interval_ms=1000
)

print("✓ Consumer created successfully!")
print(f"  Subscribed to: {consumer.subscription()}")
print(f"  Consumer group: test-consumer-group")
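
The `value_deserializer` lambda above raises on any payload that is not valid UTF-8 JSON, and on records with no value, and that exception would propagate out of the consuming loop. The following is a sketch of a more tolerant deserializer, assuming it is acceptable to surface bad payloads as `None` (the name `safe_json_deserializer` is hypothetical):

```python
import json


def safe_json_deserializer(raw):
    """Decode a message value as JSON; return None instead of raising."""
    if raw is None:  # Records such as tombstones carry no value
        return None
    try:
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed payload: the caller decides how to handle None
        return None


print(safe_json_deserializer(b'{"id": 1}'))
print(safe_json_deserializer(b'not json'))
```

It would be passed as `value_deserializer=safe_json_deserializer` when creating the consumer.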

## 3. Consume Messages (with timeout)

In [None]:
# Consume messages for 10 seconds
import time

message_count = 0
start_time = time.time()
timeout = 10  # seconds

print(f"Consuming messages for {timeout} seconds...\n")

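try:
    # poll() returns after at most timeout_ms even if the topic is idle, so the
    # deadline is checked regularly; `for message in consumer` would block
    # until the next message arrives and could overrun the timeout.
    while time.time() - start_time < timeout:
        batches = consumer.poll(timeout_ms=1000)
        for records in batches.values():
            for message in records:
                message_count += 1
                print(f"Message {message_count}:")
                print(f"  Key: {message.key}")
                print(f"  Value: {message.value}")
                print(f"  Partition: {message.partition}")
                print(f"  Offset: {message.offset}")
                print(f"  Timestamp: {message.timestamp}")
                print("-" * 50)

except KeyboardInterrupt:
    print("\nConsumer interrupted by user")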

print(f"\n✓ Consumed {message_count} messages")
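
The `message.timestamp` value printed above is a count of milliseconds since the Unix epoch, which is hard to read as a raw integer. A small sketch of a formatter (the helper name `format_kafka_timestamp` is an illustrative assumption):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def format_kafka_timestamp(timestamp_ms):
    """Convert a millisecond epoch timestamp to an ISO 8601 UTC string."""
    # timedelta arithmetic avoids float rounding on the millisecond part
    return (EPOCH + timedelta(milliseconds=timestamp_ms)).isoformat()


print(format_kafka_timestamp(1700000000000))
```

In the loop above this would be used as `format_kafka_timestamp(message.timestamp)`.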

## 4. Consume Fixed Number of Messages

In [None]:
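# Create a consumer in a separate group, which has no committed offsets on its first run
consumer2 = KafkaConsumer(
    topic_name,
    bootstrap_servers=KAFKA_SERVERS.split(','),
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='test-consumer-group-2',
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000  # Stop iterating after 10 s without new messages
)

# Consume up to 5 messages (fewer if the topic runs dry before the timeout)
messages_to_consume = 5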
consumed = []

print(f"Consuming {messages_to_consume} messages...\n")

for i, message in enumerate(consumer2):
    consumed.append(message.value)
    print(f"{i+1}. {message.value}")
    
    if i + 1 >= messages_to_consume:
        break

consumer2.close()
print(f"\n✓ Consumed {len(consumed)} messages")
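
The `enumerate`/`break` pattern above can also be written with `itertools.islice`, which works on any iterator, including a `KafkaConsumer`. This is a sketch only; `take_messages` and `FakeMessage` are hypothetical names, and the stand-in stream replaces a live consumer so the cell runs without a broker:

```python
from collections import namedtuple
from itertools import islice

# Stand-in for a consumed record; only the `value` attribute is used here
FakeMessage = namedtuple('FakeMessage', ['value'])


def take_messages(consumer, limit):
    """Return up to `limit` message values from any iterator of messages."""
    # islice stops after `limit` items without requesting one more
    return [message.value for message in islice(consumer, limit)]


stream = iter([FakeMessage({'n': i}) for i in range(8)])
print(take_messages(stream, 5))
```

With a real consumer the call would be `take_messages(consumer2, messages_to_consume)`. If the topic holds fewer messages than requested, iteration only ends when `consumer_timeout_ms` is set on the consumer.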

## 5. Manual Offset Management

In [None]:
# Consumer with manual offset commit
manual_consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=KAFKA_SERVERS.split(','),
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='manual-offset-group',
    enable_auto_commit=False,  # Offsets advance only on explicit commit() calls
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000  # Stop iterating after 10 s without new messages
)

message_count = 0
batch_size = 3
max_messages = 9  # Three full batches

print(f"Manually committing offsets every {batch_size} messages...\n")

for message in manual_consumer:
    message_count += 1
    print(f"Consumed: {message.value}")

    # Commit after every batch_size messages; commit() with no arguments
    # commits the current position of every assigned partition
    if message_count % batch_size == 0:
        manual_consumer.commit()
        print(f"  ✓ Committed offset after {message_count} messages\n")

    if message_count >= max_messages:
        break

# Final commit
manual_consumer.commit()
manual_consumer.close()
print(f"\n✓ Processed {message_count} messages with manual commits")
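
A committed offset is the offset of the next message to read, not of the last message processed. The sketch below shows that arithmetic for a batch spanning several partitions; `Record` is a stand-in for consumed records and `offsets_to_commit` is a hypothetical helper:

```python
from collections import namedtuple

# Stand-in for a consumed record; real records expose the same three attributes
Record = namedtuple('Record', ['topic', 'partition', 'offset'])


def offsets_to_commit(records):
    """Map (topic, partition) to the next offset to read after `records`."""
    positions = {}
    for record in records:
        key = (record.topic, record.partition)
        # The committed offset is the NEXT offset to consume, hence + 1
        positions[key] = max(positions.get(key, 0), record.offset + 1)
    return positions


batch = [Record('test-topic', 0, 4), Record('test-topic', 1, 7), Record('test-topic', 0, 5)]
print(offsets_to_commit(batch))
```

To commit such positions explicitly, each entry would need to be wrapped as a `TopicPartition` key with an `OffsetAndMetadata` value and passed to `commit(offsets=...)`; the exact `OffsetAndMetadata` fields may depend on the installed kafka-python version.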

## 6. Consume from Specific Partition

In [None]:
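# Create a consumer without a topic subscription; the partition is assigned manually below
partition_consumer = KafkaConsumer(
    bootstrap_servers=KAFKA_SERVERS.split(','),
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    consumer_timeout_ms=10000  # Stop iterating after 10 s without new messages
)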

# Assign specific partition
partition = TopicPartition(topic_name, 0)
partition_consumer.assign([partition])

print(f"Consuming from {topic_name}, partition 0...\n")

count = 0
for message in partition_consumer:
    count += 1
    print(f"{count}. Partition {message.partition}, Offset {message.offset}: {message.value}")
    
    if count >= 5:
        break

partition_consumer.close()
print(f"\n✓ Consumed {count} messages from partition 0")
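
Because `assign()` bypasses consumer-group coordination, dividing partitions among several manually assigned consumers is left to the caller. One possible scheme, sketched here under the assumption that each worker knows its own index and the total worker count (the helper `partitions_for_worker` is hypothetical):

```python
def partitions_for_worker(partition_ids, worker_index, worker_count):
    """Pick this worker's share of partitions, round-robin by partition id."""
    if not 0 <= worker_index < worker_count:
        raise ValueError('worker_index must be in range(worker_count)')
    ordered = sorted(partition_ids)
    return [p for i, p in enumerate(ordered) if i % worker_count == worker_index]


print(partitions_for_worker({0, 1, 2, 3, 4}, worker_index=0, worker_count=2))
print(partitions_for_worker({0, 1, 2, 3, 4}, worker_index=1, worker_count=2))
```

Each returned id would then be turned into `TopicPartition(topic_name, p)` and the resulting list passed to `assign()`.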

## 7. Seek to Specific Offset

In [None]:
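# Create consumer
seek_consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=KAFKA_SERVERS.split(','),
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='seek-consumer-group',
    consumer_timeout_ms=10000  # Stop iterating after 10 s without new messages
)

# Wait for partition assignment; poll() drives the group join
partitions = seek_consumer.assignment()
while not partitions:
    seek_consumer.poll(timeout_ms=100)
    partitions = seek_consumer.assignment()

# Seek to offset 5 on the lowest-numbered assigned partition.
# assignment() returns a set, so list(partitions)[0] would pick an arbitrary one.
partition = min(partitions, key=lambda tp: tp.partition)
others = [tp for tp in partitions if tp != partition]
if others:
    seek_consumer.pause(*others)  # Fetch only from the partition we seek on
seek_consumer.seek(partition, 5)
print(f"Seeking to offset 5 on partition {partition.partition}...\n")

count = 0
for message in seek_consumer:
    count += 1
    print(f"Offset {message.offset}: {message.value}")

    if count >= 3:
        break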

seek_consumer.close()
print(f"\n✓ Consumed {count} messages starting from offset 5")
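
Seeking to an offset outside the range the broker still retains does not land on that offset; the consumer is expected to fall back to its `auto_offset_reset` policy on the next fetch. A minimal sketch of clamping a requested offset into the valid range first (the helper `clamp_offset` is hypothetical):

```python
def clamp_offset(requested, beginning, end):
    """Limit `requested` to the range [beginning, end]."""
    # `end` is the offset the next produced message will receive, so seeking
    # there is valid and simply waits for new messages
    return max(beginning, min(requested, end))


print(clamp_offset(5, beginning=0, end=3))
print(clamp_offset(5, beginning=10, end=20))
```

The bounds would come from `beginning_offsets()` and `end_offsets()` for the partition in question.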

## 8. Get Partition Information

In [None]:
# Create consumer to inspect partitions
info_consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=KAFKA_SERVERS.split(','),
    group_id='info-group'
)

# Get partition info (partitions_for_topic returns None if the topic is unknown)
partitions = info_consumer.partitions_for_topic(topic_name) or set()
print(f"Topic '{topic_name}' has {len(partitions)} partitions: {sorted(partitions)}")

# Get beginning and end offsets
partition_list = [TopicPartition(topic_name, p) for p in sorted(partitions)]
beginning_offsets = info_consumer.beginning_offsets(partition_list)
end_offsets = info_consumer.end_offsets(partition_list)

print("\nPartition Details:")
for partition in partition_list:
    begin = beginning_offsets[partition]
    end = end_offsets[partition]  # Offset the next produced message will receive
    messages = end - begin  # Upper bound: compaction and transaction markers leave gaps
    print(f"  Partition {partition.partition}: up to {messages} messages (offsets [{begin}, {end}))")

info_consumer.close()
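
The end offsets above can be combined with a group's committed offsets to estimate consumer lag. The sketch below works on plain dictionaries so it runs without a broker; `compute_lag` is a hypothetical helper, and with a live consumer the committed values would come from `committed()`, which returns `None` when the group has never committed for a partition:

```python
def compute_lag(end_offsets, committed_offsets):
    """Return {partition: messages not yet committed} for each partition."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        # None means the group has never committed for this partition
        lag[partition] = None if committed is None else max(end - committed, 0)
    return lag


print(compute_lag({0: 120, 1: 80, 2: 15}, {0: 100, 1: 80}))
```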

## 9. Cleanup

In [None]:
# Close the consumer from section 2; the other consumers were closed in their own cells
consumer.close()
print("✓ All consumers closed")
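
If a cell raises before reaching `close()`, the consumer stays open and keeps its group membership until the session times out. One way to guarantee cleanup is `contextlib.closing`, sketched here with a stand-in class so it runs without a broker (`DemoConsumer` is hypothetical; any object with a `close()` method works the same way):

```python
from contextlib import closing


class DemoConsumer:
    """Stand-in with the same close() method a real consumer provides."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


demo = DemoConsumer()
try:
    with closing(demo) as consumer_like:
        raise RuntimeError('simulated processing failure')
except RuntimeError:
    pass

# close() ran even though the body raised
print(demo.closed)
```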

## Key Takeaways

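1. **Consumer Groups**: Consumers in the same group split the topic's partitions among themselves; each partition is read by only one group member at a time
2. **auto_offset_reset**: Applies only when the group has no valid committed offset: 'earliest' starts from the beginning, 'latest' starts from the end
3. **Manual Commits**: With `enable_auto_commit=False`, offsets advance only when `commit()` is called, so a crash before the commit leads to redelivery rather than skipped messages
4. **Partition Assignment**: `assign()` pins a consumer to specific partitions and bypasses group rebalancing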
5. **Seeking**: Can jump to specific offsets for replay or skipping
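
The interaction between committed offsets and `auto_offset_reset` in takeaway 2 can be summarized in a few lines. This is a simplified model for illustration, not library code, and `resolve_start_offset` is a hypothetical name:

```python
def resolve_start_offset(committed, beginning, end, auto_offset_reset='latest'):
    """Model where a group consumer starts reading on one partition."""
    if committed is not None and beginning <= committed <= end:
        return committed  # A valid committed offset always wins
    if auto_offset_reset == 'earliest':
        return beginning
    if auto_offset_reset == 'latest':
        return end
    raise ValueError(f'no valid offset and auto_offset_reset={auto_offset_reset!r}')


print(resolve_start_offset(None, 0, 42, 'earliest'))
print(resolve_start_offset(None, 0, 42, 'latest'))
print(resolve_start_offset(17, 0, 42, 'earliest'))
```

The third call shows why rerunning a cell with `auto_offset_reset='earliest'` does not replay the topic once the group has committed offsets.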

## Next Steps

Try the Admin Operations notebook (03_kafka_admin_operations.ipynb) to manage topics!