# 3. Kafka Consumer - Read Messages from Topic

Consumes messages from the Debezium CDC topic `oracle-cdc.ADMIN.TAGS` in the MSK cluster.

**Run location:** This notebook must run from **inside the VPC** (e.g., SageMaker notebook instance). MSK brokers use private DNS and are unreachable from your laptop — if it hangs at "Consuming...", run it from SageMaker.

In [None]:
import json
import boto3
from kafka import KafkaConsumer

In [None]:
# Get Kafka bootstrap servers from SSM
ssm = boto3.client('ssm')
bootstrap_servers_raw = ssm.get_parameter(Name='/kafka/bootstrap_servers', WithDecryption=True)['Parameter']['Value']

# The parameter holds a comma-separated broker list. Drop blank entries so a
# trailing comma or stray whitespace doesn't produce an empty broker address.
bootstrap_servers = [s.strip() for s in bootstrap_servers_raw.split(',') if s.strip()]
if not bootstrap_servers:
    # Fail here with a clear message instead of hanging later in the consumer.
    raise ValueError("SSM parameter /kafka/bootstrap_servers is empty - no Kafka brokers configured.")
print(f"Bootstrap servers: {bootstrap_servers}")

# IMPORTANT: This notebook must run from inside the VPC (e.g., SageMaker notebook instance).
# MSK brokers use private DNS - unreachable from your laptop. If it hangs, run from SageMaker.

In [None]:
# Diagnostic: verify connectivity and list topics (fails fast with clear error if unreachable)
from kafka import KafkaAdminClient

# CDC topic produced by the Debezium connector; the consumer cells below read it.
TOPIC = 'oracle-cdc.ADMIN.TAGS'

admin = None
try:
    # Bounded (15 s) timeouts so an unreachable cluster surfaces as an error
    # in this cell rather than as a silent hang later on.
    admin = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        request_timeout_ms=15000,
        api_version_auto_timeout_ms=15000,
    )
    topics = admin.list_topics()
    print(f"Connected. Available topics: {sorted(topics)}")
    if TOPIC not in topics:
        print(f"\nWARNING: Topic '{TOPIC}' does not exist yet.")
        print("  - Ensure Debezium connector (5-debezium-cdc) is RUNNING and has completed initial snapshot.")
        print("  - Check connector status: AWS Console > MSK > Connectors")
        print("  - Check logs: aws logs tail /aws/msk-connect/<deployment_name> --follow")
except Exception as e:
    # Deliberately broad: this is a diagnostic cell whose job is to explain the failure.
    print(f"Connection FAILED: {e}")
    print("\nPossible causes:")
    print("  1. Running locally? This notebook must run from SageMaker (inside VPC) - MSK uses private DNS.")
    print("  2. Wrong deployment_name? SSM /kafka/bootstrap_servers must match your deployment.")
    print("  3. MSK cluster not ready or security group blocking traffic.")
finally:
    # Release the admin client's connections even when list_topics() fails;
    # `admin` stays None if the constructor itself raised.
    if admin is not None:
        admin.close()

In [None]:
# Create consumer (reads from earliest, stops after 5s of no new messages)
#
# No key/value deserializers are configured, so records are yielded as raw bytes.
# This keeps None-valued records (e.g. Debezium delete tombstones) printable:
# kafka-python passes a None value straight to any value_deserializer, so a plain
# json.loads(m.decode(...)) deserializer would raise on such records.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',   # no group_id -> no committed offsets; start at the beginning
    consumer_timeout_ms=5000,       # iteration stops after 5 s without new records
    request_timeout_ms=15000,
    api_version_auto_timeout_ms=15000,
)

## Consume and print messages

In [None]:
print(f"Consuming from topic '{TOPIC}'...\n")
try:
    # Assumes `consumer` was created with consumer_timeout_ms (cell above);
    # without it, list() would block forever waiting for new records.
    messages = list(consumer)
    print(f"Received {len(messages)} messages:\n")
    for msg in messages:
        print(f"  partition={msg.partition} offset={msg.offset} key={msg.key}")
        print(f"  value: {msg.value}\n")
finally:
    # Close even if iteration is interrupted so broker connections don't leak in
    # the kernel. Re-run the consumer-creation cell before re-running this one.
    consumer.close()

## Quick consume (single cell - run after producing)

In [None]:
def _json_or_none(raw):
    """Decode a UTF-8 JSON record value; return None for null/empty values.

    kafka-python invokes value_deserializer even when the record value is None
    (e.g. a Debezium delete tombstone), so the None check must live here.
    """
    if not raw:
        return None
    return json.loads(raw.decode('utf-8'))


consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,           # stop iterating after 5 s with no new records
    request_timeout_ms=15000,
    api_version_auto_timeout_ms=15000,  # same fail-fast timeout as the other clients
    value_deserializer=_json_or_none,
)
try:
    for msg in consumer:
        print(msg.value)
finally:
    # Always release broker connections, even if iteration is interrupted.
    consumer.close()