# In [1]:
import threading
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
from pymongo import MongoClient
from datetime import datetime


# Connection settings for the Kafka cluster. The credential and group-id
# values are blank in this file.
kafka_config = {
    # Broker endpoint
    "bootstrap.servers": "pkc-60py3.europe-west9.gcp.confluent.cloud:9092",
    # Authentication / transport
    "sasl.mechanisms": "PLAIN",
    "security.protocol": "SASL_SSL",
    "sasl.username": "",
    "sasl.password": "",
    # Consumer group behaviour
    "group.id": "",
    "auto.offset.reset": "latest",
}

# Schema Registry client; the basic-auth credentials are passed as a single
# "<user>:<password>" string (both parts are blank in this file).
registry_conf = {
    'url': 'https://psrc-l6o18.us-east-2.aws.confluent.cloud',
    'basic.auth.user.info': '{}:{}'.format('', ''),
}
schema_registry_client = SchemaRegistryClient(registry_conf)

# Look up the most recent schema registered under the value subject and keep
# its schema string for the Avro deserializer.
subject_name = 'delivery_truck_logistics-value'
latest_version = schema_registry_client.get_latest_version(subject_name)
schema_str = latest_version.schema.schema_str

# Message keys are decoded as UTF-8 strings; message values are decoded as
# Avro using the schema string fetched above.
key_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Build the DeserializingConsumer from the shared Kafka settings plus the key
# and value deserializers. Every entry of kafka_config (broker address,
# SASL/TLS settings, group id, offset reset policy) is passed through as-is.
# 'enable.auto.commit' and 'auto.commit.interval.ms' are not set here, so the
# client's defaults apply.
consumer = DeserializingConsumer({
    **kafka_config,
    'key.deserializer': key_deserializer,
    'value.deserializer': avro_deserializer,
})

# To handle serialization of datetime objects, define a custom encoder.
def datetime_encoder(obj):
    """Return the ISO 8601 string for a ``datetime`` object.

    Usable as the ``default=`` hook of ``json.dumps``: that hook must either
    return a serialisable value or raise ``TypeError``. Falling through and
    returning ``None`` would make every unsupported object serialise
    silently as ``null``.

    Args:
        obj: The object the serialiser could not handle on its own.

    Returns:
        ``obj.isoformat()`` when ``obj`` is a ``datetime``.

    Raises:
        TypeError: If ``obj`` is not a ``datetime``.
    """
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

# Attach the consumer to the 'delivery_truck_logistics' topic.
consumer.subscribe(['delivery_truck_logistics'])

# MongoDB target: connection string first, then the client, database and
# collection handles (connection string and names are blank in this file).
conn_string = ""
client = MongoClient(conn_string)
db = client.get_database('')
collection = db.get_collection('')
# Continually read messages from Kafka, normalise each record and insert it
# into MongoDB unless a document with the same 'BookingID' already exists.

# Field groups, one per normalisation step. They never change between
# messages, so they are built once instead of on every loop iteration.
# Text fields stored with only their first character upper-cased.
columns_to_capitalize = ['Driver_Name', 'Origin_Location', 'Destination_Location', 'Current_Location',
                         'DestinationLocation', 'vehicleType', 'customerNameCode', 'supplierNameCode',
                         'GpsProvider']
# Fields stored fully upper-cased.
columns_to_upper = ['ontime', 'delay', 'OriginLocation_Code', 'DestinationLocation_Code',
                    'customerID', 'BookingID', 'vehicle_no']
# Comma-separated numeric strings converted to tuples of floats.
columns_to_decimal = ['Org_lat_lon', 'Des_lat_lon']
# Values converted to int.
float_int = ['Driver_MobileNo']
# Values formatted as 'YYYY-MM-DD HH:MM:SS' strings.
datetime_column = ['Data_Ping_time', 'Planned_ETA', 'actual_eta', 'trip_start_date', 'trip_end_date']
# Values formatted as 'YYYY-MM-DD' strings.
date_column = ['BookingID_Date']

try:
    while True:
        # NOTE(review): per the confluent-kafka documentation,
        # DeserializingConsumer.poll() may raise ConsumeError or a
        # deserialization error instead of returning a message whose
        # error() is set -- confirm and handle here if required.
        msg = consumer.poll(1.0)  # Wait up to 1 second for a message

        if msg is None:
            continue
        if msg.error():
            print('Consumer error: {}'.format(msg.error()))
            continue

        # Fetch the value once and work on that single object.
        record = msg.value()
        print('Successfully consumed record with key {} and value {}'.format(msg.key(), record))

        # Validate BEFORE transforming: the normalisation below calls
        # .upper() on 'BookingID', which would raise before these checks
        # ever got the chance to skip the record.
        if record is None:
            print("Skipping message with no value.")
            continue

        # Data validation checks
        if 'BookingID' not in record or record['BookingID'] is None:
            print("Skipping message due to missing or null 'BookingID'.")
            continue

        # Data type validation checks
        if not isinstance(record['BookingID'], str):
            print("Skipping message due to 'BookingID' not being a string.")
            continue

        # We can add more checks as needed but this is just a demo

        # Normalise the record in place. One malformed record (missing
        # field, null value, unparsable number) is reported and skipped
        # rather than terminating the whole consumer.
        try:
            for column in columns_to_capitalize:
                record[column] = record[column].capitalize()
            for column in columns_to_upper:
                record[column] = record[column].upper()
            for column in columns_to_decimal:
                record[column] = tuple(map(float, record[column].split(',')))
            for column in float_int:
                record[column] = int(record[column])
            for column in datetime_column:
                record[column] = record[column].strftime('%Y-%m-%d %H:%M:%S')
            for column in date_column:
                record[column] = record[column].strftime('%Y-%m-%d')
        except (KeyError, AttributeError, TypeError, ValueError) as exc:
            # `column` still names the field being processed when it failed.
            print(f"Skipping message: could not normalise field '{column}': {exc!r}")
            continue

        # Check if a document with the same 'BookingID' exists
        existing_document = collection.find_one({'BookingID': record['BookingID']})

        if existing_document:
            print(f"Document with bookingID '{record['BookingID']}' already exists. Skipping insertion.")
        else:
            # Insert data into MongoDB
            collection.insert_one(record)
            print("Inserted message into MongoDB:", record)


except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the loop; fall through to cleanup.
    pass
finally:
    # Commit offsets before shutting down. The nested try/finally blocks
    # guarantee that both clients are closed even if the commit (or the
    # consumer close) raises.
    try:
        consumer.commit()
    finally:
        try:
            consumer.close()
        finally:
            client.close()

