# Transform and Forward Messages

This notebook consumes messages from a source Kafka topic in Confluent Cloud, wraps each one in a new JSON object (the original text, an uppercased copy, and its length), and produces the result to a destination topic.

Before running, make sure your `.env` file defines:
- `BOOTSTRAP_SERVERS`
- `SASL_USERNAME`
- `SASL_PASSWORD`
- `TOPIC_NAME` (source)
- `FORWARD_TOPIC` (destination)
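
One quick way to confirm these settings before connecting is to list whichever ones are missing. The sketch below is illustrative only: `missing_settings` and `REQUIRED_SETTINGS` are hypothetical names, not part of this notebook, and the helper takes a plain mapping so it can be checked without a real `.env` file.

```python
import os

# Names taken from the list above.
REQUIRED_SETTINGS = [
    "BOOTSTRAP_SERVERS",
    "SASL_USERNAME",
    "SASL_PASSWORD",
    "TOPIC_NAME",
    "FORWARD_TOPIC",
]

def missing_settings(env, required=REQUIRED_SETTINGS):
    """Return the required names that are absent or empty in `env` (hypothetical helper)."""
    return [name for name in required if not env.get(name)]

# Usage: after load_dotenv(), pass os.environ.
# An empty list means every setting is present.
print(missing_settings(os.environ))
```

Checking everything up front reports all missing names at once, rather than one error per cell.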

In [1]:
import os
import json
from kafka import KafkaConsumer, KafkaProducer
from dotenv import load_dotenv

load_dotenv()

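source_topic = os.getenv("TOPIC_NAME")    # topic to read from
dest_topic = os.getenv("FORWARD_TOPIC")   # topic to write to

# Fail early if the source topic is missing, before any consumer is created
if not source_topic:
    raise ValueError("TOPIC_NAME environment variable is not set. Please check your .env file.")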

## Step 1: Setup Kafka Consumer and Producer

In [8]:
# Read the broker address and credentials, fail fast if any are missing,
# and print them for debugging (the password itself is never printed)
bootstrap_servers = os.getenv("BOOTSTRAP_SERVERS")
sasl_username = os.getenv("SASL_USERNAME")
sasl_password = os.getenv("SASL_PASSWORD")

if not bootstrap_servers:
    raise ValueError("BOOTSTRAP_SERVERS environment variable is not set. Please check your .env file.")
if not sasl_username or not sasl_password:
    raise ValueError("SASL_USERNAME or SASL_PASSWORD environment variable is not set. Please check your .env file.")

print(f"Connecting to Kafka broker at: {bootstrap_servers}")
print(f"SASL_USERNAME: {sasl_username}")
print(f"SASL_PASSWORD is set: {bool(sasl_password)}")

# Consumer setup with error handling for broker connectivity
try:
    consumer = KafkaConsumer(
        source_topic,
        bootstrap_servers=bootstrap_servers,
        security_protocol="SASL_SSL",
        sasl_mechanism="PLAIN",
        sasl_plain_username=sasl_username,
        sasl_plain_password=sasl_password,
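        auto_offset_reset="earliest",   # start from the oldest available message
        # Note: no group_id is set, so kafka-python does not commit offsets and
        # enable_auto_commit has no effect; each run reads the topic from the start.
        enable_auto_commit=True,
        value_deserializer=lambda m: m.decode('utf-8'),
        consumer_timeout_ms=5000        # stop iterating after 5 s without new messages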
    )
except Exception as e:
    print("Failed to connect to Kafka broker. Please check the following:")
    print(f"- Broker address: {bootstrap_servers}")
    print("- Network connectivity to the broker (firewall/VPN issues?)")
    print("- SASL/SSL credentials and configuration")
    print(f"Error details: {e}")
    raise

# Producer setup: each value is serialized to JSON, then encoded as UTF-8
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username=sasl_username,
    sasl_plain_password=sasl_password,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

Connecting to Kafka broker at: pkc-619z3.us-east1.gcp.confluent.cloud:9092
SASL_USERNAME: 7BQ7Z45X4VGRQE6N
SASL_PASSWORD is set: True
Failed to connect to Kafka broker. Please check the following:
- Broker address: pkc-619z3.us-east1.gcp.confluent.cloud:9092
- Network connectivity to the broker (firewall/VPN issues?)
- SASL/SSL credentials and configuration
Error details: NoBrokersAvailable


NoBrokersAvailable: NoBrokersAvailable
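
`NoBrokersAvailable` typically means the client could not establish a connection to any of the listed bootstrap servers. One way to separate a network problem from a credential or configuration problem is a plain TCP connection test using only the standard library. This is a minimal sketch: `split_host_port` and `can_reach` are hypothetical helpers, and it assumes each bootstrap entry has the form `host:port`.

```python
import socket

def split_host_port(server):
    """Split a 'host:port' bootstrap entry into (host, port). Hypothetical helper."""
    host, _, port = server.strip().rpartition(":")
    return host, int(port)

def can_reach(server, timeout=5.0):
    """Return True if a TCP connection to 'host:port' succeeds within `timeout` seconds."""
    host, port = split_host_port(server)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False

# Usage (bootstrap_servers comes from the cell above):
# for server in bootstrap_servers.split(","):
#     print(server, can_reach(server))
```

If the port is reachable and the error persists, the SASL/SSL settings are the more likely cause; if it is not reachable, a firewall, VPN, or DNS issue is more likely.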

## Step 2: Consume, Transform, and Produce

In [11]:
if dest_topic is None:
    raise ValueError("Destination topic is not set. Please check your .env file and ensure FORWARD_TOPIC is defined.")

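# Iteration ends once no message arrives for consumer_timeout_ms (5 seconds)
for msg in consumer:
    print("Original:", msg.value)
    # msg.value is the raw message text (a str), not a parsed JSON object
    transformed = {
        "original": msg.value,
        "edited": msg.value.upper(),
        "length": len(msg.value)
    }
    producer.send(dest_topic, transformed)

# send() is asynchronous; flush() blocks until buffered messages have been sent
producer.flush()
print(f"Messages have been forwarded to topic: {dest_topic}")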

Original: {"_id": "8k9Rynu7Y1g7", "content": "The universe is made of stories, not atoms.", "author": "Muriel Rukeyser", "tags": ["Famous Quotes"], "authorSlug": "muriel-rukeyser", "length": 43, "dateAdded": "2019-12-23", "dateModified": "2023-04-14"}
Original: {"_id": "S2Yfil_8fPx", "content": "Swim upstream. Go the other way. Ignore the conventional wisdom.", "author": "Sam Walton", "tags": ["Wisdom"], "authorSlug": "sam-walton", "length": 64, "dateAdded": "2020-11-13", "dateModified": "2023-04-14"}
Messages have been forwarded to topic: topic_1
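
The messages in the output above are JSON strings, so `msg.value.upper()` uppercases the whole serialized payload, keys included, and `len(msg.value)` counts every character of it. If the intent is to edit only the quote text, one option is to parse the payload first. The sketch below assumes each message is a JSON object with a `content` field, as in the two samples shown; `transform_quote` is a hypothetical helper, not part of this notebook.

```python
import json

def transform_quote(raw):
    """Parse a JSON quote message and uppercase only its content.

    Assumes `raw` is a JSON object string with a "content" field
    (hypothetical helper; a missing field is treated as empty text).
    """
    record = json.loads(raw)
    content = record.get("content", "")
    return {
        "original": record,
        "edited": content.upper(),
        "length": len(content),
    }

# Shortened copy of the first sample message above
sample = '{"_id": "8k9Rynu7Y1g7", "content": "The universe is made of stories, not atoms.", "author": "Muriel Rukeyser"}'
result = transform_quote(sample)
print(result["edited"])
print(result["length"])
```

Inside the loop, `producer.send(dest_topic, transform_quote(msg.value))` should work with the existing `value_serializer`, since the returned dict is JSON-serializable.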
