In [1]:
!pip install confluent_kafka pandas





[notice] A new release of pip is available: 24.0 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [None]:
import base64
import json
import os
import time
from decimal import Decimal

import pandas as pd
from confluent_kafka import Consumer, KafkaError
from kafka import KafkaProducer

# Kafka On-prem Configuration (source: the CDC topic for classicmodels.orderdetails)
bootstrap_servers_local = 'localhost:9092'
topic_local = 'cdc.classicmodels.orderdetails'
group_id_local = 'my_consumer_group'

# Kafka Cloud Configuration (sink: Azure Event Hubs via its Kafka endpoint)
# The connection string is a secret, so it is read from the environment rather
# than written into the notebook (where it would also end up in saved outputs).
# Expected shape:
#   Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...
connection_string = os.environ.get("EVENTHUB_CONNECTION_STRING", "").strip()
if "sb://" not in connection_string:
    raise ValueError(
        "EVENTHUB_CONNECTION_STRING is not set or is not an Event Hubs connection string "
        "(expected 'Endpoint=sb://<namespace>.servicebus.windows.net/;...')"
    )

# Host part of the endpoint, e.g. '<namespace>.servicebus.windows.net'. Works
# whether or not the endpoint has a trailing '/' before the next ';' segment.
endpoint_host = connection_string.split("sb://", 1)[1].split(";", 1)[0].split("/", 1)[0]
namespace = endpoint_host.split(".", 1)[0]
bootstrap_servers_cloud = f"{endpoint_host}:9093"
sasl_mechanism = 'PLAIN'
security_protocol = 'SASL_SSL'
# Event Hubs' Kafka endpoint takes the literal username '$ConnectionString'
# and the full connection string as the password.
sasl_plain_username = '$ConnectionString'
sasl_plain_password = connection_string
event_hub_name = os.environ.get("EVENTHUB_NAME", "your-EH-name")

# Consumer settings for the on-prem cluster (confluent_kafka / librdkafka keys).
# 'auto.offset.reset' = 'earliest' only takes effect when this consumer group has
# no valid committed offset; in that case consumption starts at the oldest record.
conf_local = {}
conf_local['bootstrap.servers'] = bootstrap_servers_local
conf_local['group.id'] = group_id_local
conf_local['auto.offset.reset'] = 'earliest'

# Producer for the cloud side (kafka-python client), authenticating with
# SASL PLAIN over TLS using the settings defined above.
# Keys are sent as UTF-8 text of str(key); values are JSON-encoded, then UTF-8 encoded.
cloud_producer_settings = {
    'bootstrap_servers': bootstrap_servers_cloud,
    'security_protocol': security_protocol,
    'sasl_mechanism': sasl_mechanism,
    'sasl_plain_username': sasl_plain_username,
    'sasl_plain_password': sasl_plain_password,
    'key_serializer': lambda key: str(key).encode('utf-8'),
    'value_serializer': lambda record: json.dumps(record).encode('utf-8'),
}
producer_cloud = KafkaProducer(**cloud_producer_settings)

# Create Kafka Consumer instance (On-prem) and subscribe to the CDC topic.
consumer_local = Consumer(conf_local)
consumer_local.subscribe([topic_local])

# One dict per successfully forwarded change event; becomes a DataFrame after the loop.
data_list = []

# Seconds to wait for the cloud broker to acknowledge each forwarded record.
SEND_TIMEOUT_S = 30

# Scale assumed for priceEach when the event carries no embedded schema
# (the notebook previously hard-coded a division by 100, i.e. scale 2).
DEFAULT_PRICE_SCALE = 2


def _decimal_scale(message, column, default=DEFAULT_PRICE_SCALE):
    """Return the Kafka Connect Decimal scale declared for `column` in the `after` schema.

    Args:
        message: Parsed change event with a `schema` / `payload` envelope.
        column: Column name inside the `after` struct, e.g. 'priceEach'.
        default: Scale used when the schema, the column, or its 'scale' parameter is absent.

    Returns:
        The scale as an int.
    """
    for envelope_field in (message.get('schema') or {}).get('fields') or []:
        if envelope_field.get('field') != 'after':
            continue
        for column_field in envelope_field.get('fields') or []:
            if column_field.get('field') == column:
                scale = (column_field.get('parameters') or {}).get('scale')
                return default if scale is None else int(scale)
    return default


def _decode_connect_decimal(encoded, scale):
    """Decode a base64 Kafka Connect Decimal into a Decimal.

    The decoded bytes are the unscaled value as a big-endian two's-complement
    integer, so signed=True is needed for negative amounts to decode correctly.

    Args:
        encoded: Base64 string taken from the JSON payload.
        scale: Number of digits after the decimal point.

    Returns:
        The exact Decimal value (unscaled * 10**-scale).
    """
    unscaled = int.from_bytes(base64.b64decode(encoded), byteorder='big', signed=True)
    return Decimal(unscaled).scaleb(-scale)


try:
    while True:
        msg = consumer_local.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition: not fatal, keep polling.
                print(f"Reached end of partition {msg.partition()}")
                continue
            # Any other consumer error stops the loop.
            print(f"Error: {msg.error()}")
            break

        raw_value = msg.value()
        if raw_value is None:
            # Null-valued record (typically a tombstone following a delete): nothing to forward.
            continue

        kafka_message = raw_value.decode('utf-8')
        print(f"Received message: {kafka_message}")
        message = json.loads(kafka_message)

        payload = message['payload']
        after_data = payload['after']
        if after_data is None:
            # No new row state (e.g. a delete event); only 'after' rows are forwarded.
            print(f"Skipping event without an 'after' row (op={payload.get('op')})")
            continue

        orderNumber = after_data['orderNumber']
        productCode = after_data['productCode']
        quantityOrdered = after_data['quantityOrdered']
        orderLineNumber = after_data['orderLineNumber']
        priceEach = _decode_connect_decimal(
            after_data['priceEach'], _decimal_scale(message, 'priceEach')
        )

        # Replace the encoded bytes with a plain number so the forwarded JSON is usable.
        # after_data is the same dict as payload['after'].
        after_data['priceEach'] = float(priceEach)

        # Forward only the new row state, keyed by order number. Waiting on the
        # send future raises on delivery failure instead of dropping the record silently.
        producer_cloud.send(
            event_hub_name, key=str(orderNumber), value=after_data
        ).get(timeout=SEND_TIMEOUT_S)

        # Record the row only once it has been delivered.
        data_list.append({
            'OrderNumber': orderNumber,
            'ProductCode': productCode,
            'QuantityOrdered': quantityOrdered,
            'PriceEach': float(priceEach),
            'OrderLineNumber': orderLineNumber
        })

except KeyboardInterrupt:
    # Interrupting the kernel is the intended way to stop this infinite loop.
    pass

finally:
    # Always release both clients, whether the loop ended normally or by an exception.
    consumer_local.close()
    producer_cloud.close()

# Convert the forwarded rows to a DataFrame. Naming the columns explicitly keeps
# the expected schema even when no change events were processed (empty data_list).
df = pd.DataFrame(
    data_list,
    columns=['OrderNumber', 'ProductCode', 'QuantityOrdered', 'PriceEach', 'OrderLineNumber'],
)

# Display via the notebook's rich repr (last expression of the cell) instead of print().
print("DataFrame from Kafka messages:")
df
