In [4]:
from pyspark.sql import SparkSession
# Spark session for the notebook.
# NOTE(review): in the cells shown here `spark` is only ever stopped; the Kafka
# consumption and Avro decoding run through kafka-python/fastavro directly.
# Confirm whether a Spark session is still needed at all.
spark = (
    SparkSession.builder
    .appName("KafkaExample")
    .getOrCreate()
)
import json
import os
from io import BytesIO

import requests
from fastavro import schemaless_reader
from kafka import KafkaConsumer
# Connection settings. Each one can be overridden through an environment
# variable, so the notebook can be re-run against another broker or a new
# registry tunnel without editing code. The defaults are the values this
# notebook was originally run with.
bootstrap_servers = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'broker:29092')  # Kafka server
topic = os.environ.get('KAFKA_TOPIC', 'topic_prefix.kafkaDB.users')  # Kafka topic name
group_id = os.environ.get('KAFKA_GROUP_ID', 'kpgroupqq1')  # Consumer group ID
# NOTE(review): the default below is a temporary public tunnel URL and is likely
# to stop resolving; prefer setting SCHEMA_REGISTRY_URL.
# A trailing '/' is stripped so that appending "/schemas/ids/<id>" never
# produces a double slash.
schema_registry_url = os.environ.get(
    'SCHEMA_REGISTRY_URL',
    'https://rnung-103-224-144-138.a.free.pinggy.link',
).rstrip('/')  # Schema registry URL


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# (registry URL, schema ID) -> schema string, filled lazily by get_schema.
_schema_cache = {}


def get_schema(schema_id, timeout=10.0):
    """Return the Avro schema string registered under ``schema_id``.

    Requests ``{schema_registry_url}/schemas/ids/{schema_id}`` and returns the
    ``"schema"`` field of the JSON response. That field is itself a
    JSON-encoded string, so callers still ``json.loads`` it.

    Schema-registry IDs are immutable, so each (URL, ID) pair is fetched only
    once per kernel and then served from ``_schema_cache``. Without the cache,
    a consume loop issues one HTTP request per Kafka message.

    Args:
        schema_id: Integer schema ID, as read from bytes 1-4 of a
            Confluent-framed message.
        timeout: Seconds to wait for the registry. Without a timeout,
            ``requests.get`` can block indefinitely.

    Returns:
        The schema as a JSON string.

    Raises:
        requests.HTTPError: the registry answered with an error status.
        requests.Timeout: the registry did not answer within ``timeout``.
        KeyError: the response JSON has no ``"schema"`` field.
    """
    key = (schema_registry_url, schema_id)
    if key not in _schema_cache:
        url = f"{schema_registry_url}/schemas/ids/{schema_id}"
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        # Only successful lookups are cached; errors propagate and are retried next call.
        _schema_cache[key] = response.json()["schema"]
    return _schema_cache[key]

# Kafka consumer subscribed to `topic`. The value deserializer is the identity,
# so message values stay raw bytes; the Confluent Avro framing is decoded
# manually further down. Iterating the consumer stops after 10 s without a new
# message (consumer_timeout_ms).
consumer = KafkaConsumer(
    topic,
    group_id=group_id,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',  # with no committed offset for the group, start from the oldest
    enable_auto_commit=True,
    consumer_timeout_ms=10_000,
    value_deserializer=(lambda raw: raw),  # identity: keep raw bytes
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Decode every message in the Confluent Avro wire format:
#   byte 0     magic byte, must be 0
#   bytes 1-4  schema-registry ID (big-endian integer)
#   bytes 5-   Avro binary body with no container header (hence schemaless_reader)
# The loop ends once the consumer sees no new message for consumer_timeout_ms.
CONFLUENT_HEADER_LEN = 5
parsed_schemas = {}  # schema_id -> parsed schema; each ID is fetched and parsed once

for message in consumer:
    value_bytes = message.value

    # A null value (a Kafka tombstone, e.g. emitted by Debezium after a delete)
    # has no payload; slicing or indexing None would raise TypeError.
    if value_bytes is None:
        print(f"Skipping tombstone (null value) at offset {message.offset}")
        continue

    print(f"Received message with raw bytes: {value_bytes[:10]}...")

    # Require the full 5-byte header: b''[0] raises IndexError, and a shorter
    # value would silently produce a truncated schema ID.
    if len(value_bytes) < CONFLUENT_HEADER_LEN:
        print("Message too short for Confluent Avro framing")
        continue
    if value_bytes[0] != 0:
        print("Unknown magic byte, not Confluent Avro format")
        continue

    # Extract schema ID
    schema_id = int.from_bytes(value_bytes[1:CONFLUENT_HEADER_LEN], byteorder='big')
    print(f"Schema ID from message: {schema_id}")

    # Fetch and parse the writer schema only the first time an ID is seen.
    # The registry returns the schema as a JSON string, hence json.loads.
    if schema_id not in parsed_schemas:
        parsed_schemas[schema_id] = json.loads(get_schema(schema_id))
    parsed_schema = parsed_schemas[schema_id]

    # Deserialize message
    payload = BytesIO(value_bytes[CONFLUENT_HEADER_LEN:])
    try:
        record = schemaless_reader(payload, parsed_schema)
        print(f"Decoded record: {record}")
    except Exception as e:
        # Best effort: report undecodable messages and keep consuming.
        print(f"Error decoding Avro message: {e}")
      

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Received message with raw bytes: b'\x00\x00\x00\x00\x03\x00\x02\x02\x02\x02'...
Schema ID from message: 3
Decoded record: {'before': None, 'after': {'id': 1, 'name': 'aman'}, 'source': {'version': '2.7.3.Final', 'connector': 'mysql', 'name': 'topic_prefix', 'ts_ms': 1746162457000, 'snapshot': 'first', 'db': 'kafkaDB', 'sequence': None, 'ts_us': 1746162457000000, 'ts_ns': 1746162457000000000, 'table': 'users', 'server_id': 0, 'gtid': None, 'file': 'mysql-0-bin.000018', 'pos': 424, 'row': 0, 'thread': None, 'query': None}, 'transaction': None, 'op': 'r', 'ts_ms': 1746162457153, 'ts_us': 1746162457153828, 'ts_ns': 1746162457153828784}
Received message with raw bytes: b'\x00\x00\x00\x00\x03\x00\x02\x02\x04\x02'...
Schema ID from message: 3
Decoded record: {'before': None, 'after': {'id': 2, 'name': 'aman'}, 'source': {'version': '2.7.3.Final', 'connector': 'mysql', 'name': 'topic_prefix', 'ts_ms': 1746162457000, 'snapshot': 'true', 'db': 'kafkaDB', 'sequence': None, 'ts_us': 17461624570000

In [7]:
# Release resources. Closing the Kafka consumer frees its broker connections,
# and with enable_auto_commit=True kafka-python commits any pending consumed
# offsets on close. Otherwise the last batch may be re-read on the next run.
consumer.close()
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…