# Setup

In [80]:
# pip install python-dotenv


In [81]:
import random
import time
from datetime import datetime, timezone
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from dotenv import dotenv_values

### Kafka Config

In [82]:
imp_keys = dotenv_values(".env")

In [83]:
#Define Kafka Config
kakfa_config = {
    'bootstrap.servers': imp_keys['BOOTSTRAP_SERVER'],
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': imp_keys['SASL_USERNAME'],
    'sasl.password': imp_keys['SASL_PASSWORD']
}

In [84]:
#Define Schema Registry Config
schema_registry_client = SchemaRegistryClient({
    'url' : imp_keys['SCHEMA_REGISTRY_URL'],
    'basic.auth.user.info' : f"{imp_keys['SCHEMA_REGISTRY_API_KEY']}:{imp_keys['SCHEMA_REGISTRY_API_SECRET']}"
})

In [85]:
# Fetch the latest Avro schema for the value
subject_name = 'FedEx-value'
schema_str = schema_registry_client.get_latest_version(subject_name).schema.schema_str
print("Schema from Registery---")
print(schema_str)
print("=====================")

Schema from Registery---
{"type":"record","name":"fedEx","namespace":"com.assignment.fedex","fields":[{"name":"shipment_id","type":"string"},{"name":"origin","type":"string"},{"name":"destination","type":"string"},{"name":"status","type":"string"},{"name":"timestamp","type":{"type":"long","logicalType":"timestamp-millis"}}]}


In [86]:
#Create Avro serializer for the value
key_serializer = StringSerializer('utf_8')
value_serializer = AvroSerializer(schema_registry_client, schema_str)

In [87]:
#Creating Delivery Report
def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred on None on success.

        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.

    """
    if err is not None:
        print(f"Delivery failed for User record {msg.key()}: {err}")
        return
    print(f'User record {msg.key()} successfully produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
    print("=====================")

# Producer

In [88]:
producer = SerializingProducer({
    **kakfa_config, 
    'key.serializer' : key_serializer,
    'value.serializer' : value_serializer
})

In [89]:
print(kakfa_config['sasl.password'])

gJ8V9FS5W3ghxikA7kJwzqe/6zCPaU5iVRnQWBAlyubr6GKLSuIvbj4owwXMLoTf


In [90]:
origin = ['Mumbai, IN', 'Delhi, IN', 'Bangalore, IN', 'Chennai, IN', 'Kolkata, IN']
destination = ["Los Angeles, CA"
               ,"Dallas, TX"
               , "Philadelphia, PA"
               , "Phoenix, AZ"
               ,"San Diego, CA"
               ,"San Antonio, TX"
               ,"Portland, OR"
               ,"Detroit, MI"
               ,"Charlotte, NC"
               ,"Minneapolis, MN"]
status = ['Shipped', 'In Transit', 'Delivered', 'Returned']
timestamp = datetime.now(timezone.utc)

In [91]:
print(timestamp)

2025-02-04 07:56:54.528040+00:00


In [92]:
i = 0
while True:
    i += 1
    data = {
        "shipment_id": f"SH00{i}",
        "origin": random.choice(origin),
        "destination": random.choice(destination),
        "status": random.choice(status),
        "timestamp": timestamp
    }
    print(data)
    producer.produce(
        topic = 'FedEx', 
        key = str(data["shipment_id"]),
        value = data,
        on_delivery = delivery_report
    )
    producer.flush()
    time.sleep(2)
    # if i == 10:
    #     break   

{'shipment_id': 'SH001', 'origin': 'Mumbai, IN', 'destination': 'Minneapolis, MN', 'status': 'Delivered', 'timestamp': datetime.datetime(2025, 2, 4, 7, 56, 54, 528040, tzinfo=datetime.timezone.utc)}


%6|1738655814.806|GETSUBSCRIPTIONS|rdkafka#producer-4| [thrd:main]: Telemetry client instance id changed from AAAAAAAAAAAAAAAAAAAAAA to iUmQohexTMWATrr9hBXCjw


User record b'SH001' successfully produced to FedEx [1] at offset 9
{'shipment_id': 'SH002', 'origin': 'Kolkata, IN', 'destination': 'Philadelphia, PA', 'status': 'Shipped', 'timestamp': datetime.datetime(2025, 2, 4, 7, 56, 54, 528040, tzinfo=datetime.timezone.utc)}
User record b'SH002' successfully produced to FedEx [1] at offset 10
{'shipment_id': 'SH003', 'origin': 'Delhi, IN', 'destination': 'San Diego, CA', 'status': 'Returned', 'timestamp': datetime.datetime(2025, 2, 4, 7, 56, 54, 528040, tzinfo=datetime.timezone.utc)}
User record b'SH003' successfully produced to FedEx [1] at offset 11
{'shipment_id': 'SH004', 'origin': 'Chennai, IN', 'destination': 'Los Angeles, CA', 'status': 'In Transit', 'timestamp': datetime.datetime(2025, 2, 4, 7, 56, 54, 528040, tzinfo=datetime.timezone.utc)}
User record b'SH004' successfully produced to FedEx [0] at offset 6
{'shipment_id': 'SH005', 'origin': 'Bangalore, IN', 'destination': 'Charlotte, NC', 'status': 'Shipped', 'timestamp': datetime.date

KeyboardInterrupt: 

%5|1738661382.911|REQTMOUT|rdkafka#producer-4| [thrd:sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5]: sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5: Timed out MetadataRequest in flight (after 60061ms, timeout #0)
%4|1738661382.911|REQTMOUT|rdkafka#producer-4| [thrd:sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5]: sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1738661382.915|FAIL|rdkafka#producer-4| [thrd:sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5]: sasl_ssl://b5-pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/5: 1 request(s) timed out: disconnect (average rtt 37.464ms) (after 652467ms in state UP)
%5|1738661383.242|REQTMOUT|rdkafka#producer-4| [thrd:sasl_ssl://pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/bootst]: sasl_ssl://pkc-l7pr2.ap-south-1.aws.confluent.cloud:9092/bootstrap: Timed out GetTelemetrySubscriptionsRequest in fl