In [1]:
import json
import os
import uuid
from datetime import datetime

from cassandra import ConsistencyLevel
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.error import ConsumeError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

In [2]:
# Secure-connect bundle handed to the Cassandra driver through `cloud=` below.
cloud_config = {'secure_connect_bundle': 'secure-connect-ecommerce-db.zip'}

# The token JSON file is generated when the database token is downloaded;
# change the file name here if yours is named differently.
with open("ecommerce_db-token.json") as token_file:
    secrets = json.load(token_file)

CLIENT_ID, CLIENT_SECRET = secrets["clientId"], secrets["secret"]

# Authenticate with the token's client id / secret and open a session.
auth_provider = PlainTextAuthProvider(username=CLIENT_ID, password=CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

In [3]:
# Kafka connection settings.
#
# The SASL credentials are read from the environment so that API keys are never
# stored in the notebook:
#   KAFKA_SASL_USERNAME      - Confluent Cloud API key   (required)
#   KAFKA_SASL_PASSWORD      - Confluent Cloud API secret (required)
#   KAFKA_BOOTSTRAP_SERVERS  - optional override of the broker address
# A missing required variable raises KeyError naming the variable.
# NOTE(review): the key/secret that used to be committed in this cell must be
# treated as leaked and rotated.
kafka_config = {
    'bootstrap.servers': os.environ.get(
        'KAFKA_BOOTSTRAP_SERVERS',
        'pkc-41p56.asia-south1.gcp.confluent.cloud:9092',
    ),
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': os.environ['KAFKA_SASL_USERNAME'],
    'sasl.password': os.environ['KAFKA_SASL_PASSWORD'],
    'group.id': 'group20',
    'auto.offset.reset': 'earliest'
}

In [4]:
# Create a Schema Registry client.
#
# The API key and secret are read from the environment instead of being stored
# in the notebook:
#   SCHEMA_REGISTRY_API_KEY     - required
#   SCHEMA_REGISTRY_API_SECRET  - required
#   SCHEMA_REGISTRY_URL         - optional override of the registry endpoint
# A missing required variable raises KeyError naming the variable.
# NOTE(review): the key/secret that used to be committed in this cell must be
# treated as leaked and rotated.
schema_registry_client = SchemaRegistryClient({
  'url': os.environ.get(
      'SCHEMA_REGISTRY_URL',
      'https://psrc-gk071.us-east-2.aws.confluent.cloud',
  ),
  'basic.auth.user.info': '{}:{}'.format(
      os.environ['SCHEMA_REGISTRY_API_KEY'],
      os.environ['SCHEMA_REGISTRY_API_SECRET'],
  )
})

In [5]:
# Look up the most recently registered schema for the value subject and show it
subject_name = 'E-commerce_orders-value'
latest_registered = schema_registry_client.get_latest_version(subject_name)
schema_str = latest_registered.schema.schema_str
print(schema_str)

{"type":"record","name":"Ecommerce","namespace":"com.kaggle.onlineretail","fields":[{"name":"order_id","type":"string"},{"name":"customer_id","type":"string"},{"name":"order_status","type":"string"},{"name":"order_purchase_timestamp","type":["long","int","string"]},{"name":"order_approved_at","type":["null","long","string"],"default":null},{"name":"order_delivered_carrier_date","type":["null","long","string"],"default":null},{"name":"order_delivered_customer_date","type":["null","long","string"],"default":null},{"name":"order_estimated_delivery_date","type":["null","long","string"],"default":null}]}


In [6]:
# Deserializers: message keys are UTF-8 strings, values are Avro records decoded
# with the schema fetched from the Schema Registry.
key_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Define the DeserializingConsumer on top of the shared Kafka settings.
consumer = DeserializingConsumer({
    **kafka_config,
    'key.deserializer': key_deserializer,
    'value.deserializer': avro_deserializer,
    # Offsets are committed explicitly by process_message() after a successful
    # insert. Background auto-commit (the client default) would also commit
    # offsets of records whose insert failed, so it is turned off.
    'enable.auto.commit': False,
})

In [7]:
# Start receiving records from the orders topic
orders_topic = "E-commerce_orders"
consumer.subscribe([orders_topic])

In [8]:
# CQL used to insert one enriched order row. Kept at module level so the
# statement is prepared once per session rather than once per message.
_INSERT_ORDER_CQL = (
    "INSERT INTO ecommerce_keyspace.ecommerce_orders (order_id, customer_id, order_status, order_purchase_timestamp, "
    "order_approved_at, order_delivered_carrier_date, order_delivered_customer_date, "
    "order_estimated_delivery_date, order_hour, order_day_of_week) "
    "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
)

# Timestamps arrive as day-first strings without seconds, e.g. '02-10-2017 10:56'.
_ORDER_TIMESTAMP_FORMAT = '%d-%m-%Y %H:%M'

# Timestamp fields of the record, in the order they are bound to the INSERT.
_ORDER_TIMESTAMP_FIELDS = (
    'order_purchase_timestamp',
    'order_approved_at',
    'order_delivered_carrier_date',
    'order_delivered_customer_date',
    'order_estimated_delivery_date',
)

# Holds (session, prepared_statement) under the key 'entry' once prepared.
_insert_statement_cache = {}


def _parse_order_timestamp(raw):
    """Parse one timestamp field of an order record.

    Returns None when `raw` is None, otherwise a datetime parsed with
    `_ORDER_TIMESTAMP_FORMAT`. Raises ValueError when `raw` is not a string
    (the Avro schema also allows numeric values, whose unit is not known here)
    or does not match the expected format.
    """
    if raw is None:
        return None
    if not isinstance(raw, str):
        raise ValueError(
            f"expected a '{_ORDER_TIMESTAMP_FORMAT}' string, got {type(raw).__name__}: {raw!r}"
        )
    return datetime.strptime(raw, _ORDER_TIMESTAMP_FORMAT)


def _get_insert_statement():
    """Return the prepared INSERT for the current `session`, preparing it on first use."""
    entry = _insert_statement_cache.get('entry')
    # Re-prepare if the notebook re-created `session` since the statement was cached.
    if entry is None or entry[0] is not session:
        entry = (session, session.prepare(_INSERT_ORDER_CQL))
        _insert_statement_cache['entry'] = entry
    return entry[1]


def process_message(message):
    """Transform one consumed order record and store it in Cassandra.

    Derives the purchase hour and weekday from 'order_purchase_timestamp',
    converts 'order_id' / 'customer_id' to UUIDs, inserts the row with QUORUM
    consistency and, if the insert succeeded, commits the message offset.

    Records that cannot be processed (no value, missing or malformed
    timestamps, invalid UUIDs, failed insert) are reported with print() and
    skipped without committing; nothing is raised for them.

    Args:
        message: consumed Kafka message exposing key() and value(); value()
            is expected to be a dict with the order fields.

    Returns:
        None.
    """
    key = message.key()
    value = message.value()

    if value is None:
        # Nothing to transform (e.g. a tombstone record).
        print(f"Skipping record {key}: message has no value")
        return

    # Convert every timestamp field; one bad field invalidates the record.
    try:
        timestamps = {
            field: _parse_order_timestamp(value[field])
            for field in _ORDER_TIMESTAMP_FIELDS
        }
    except ValueError as err:
        print(f"Invalid timestamp in record {key}: {err}")
        return

    order_purchase_timestamp = timestamps['order_purchase_timestamp']
    if order_purchase_timestamp is None:
        # The derived columns below cannot be computed without it.
        print(f"Missing 'order_purchase_timestamp' in record {key}")
        return

    purchase_hour = order_purchase_timestamp.hour
    purchase_day_of_week = order_purchase_timestamp.strftime('%A')

    # Convert 'order_id' to a valid UUID format
    try:
        order_id = uuid.UUID(value['order_id'])
    except ValueError:
        # If the 'order_id' is not a valid UUID, skip the message
        print(f"Invalid 'order_id': {value['order_id']}")
        return

    # Convert 'customer_id' to a valid UUID format
    try:
        customer_id = uuid.UUID(value['customer_id'])
    except ValueError:
        # If the 'customer_id' is not a valid UUID, skip the message
        print(f"Invalid 'customer_id': {value['customer_id']}")
        return

    try:
        # Ingest the transformed data into the orders table in Cassandra
        bound_statement = _get_insert_statement().bind((
            order_id,
            customer_id,
            value['order_status'],
            order_purchase_timestamp,
            timestamps['order_approved_at'],
            timestamps['order_delivered_carrier_date'],
            timestamps['order_delivered_customer_date'],
            timestamps['order_estimated_delivery_date'],
            purchase_hour,
            purchase_day_of_week,
        ))
        bound_statement.consistency_level = ConsistencyLevel.QUORUM
        session.execute(bound_statement)
    except Exception as err:
        # Best effort: report the failure and leave the offset uncommitted.
        print(f"Exception occured while inserting {key} into the table: {err}")
        return

    print(f'Record {key} inserted successfully !!')

    # Manually commit the offset to Kafka, reporting a commit failure as such
    # instead of as an insert failure.
    try:
        consumer.commit(message)
    except Exception as err:
        print(f"Record {key} was inserted but its offset could not be committed: {err}")
    
    
    

In [9]:
# Continually read messages from Kafka and hand each one to process_message()
# until the cell is interrupted.
try:
    while True:
        try:
            msg = consumer.poll(1.0)
        except ConsumeError as err:
            # DeserializingConsumer.poll() reports consumer and deserialization
            # errors by raising instead of returning a message with error() set.
            # args[0] of the exception is the underlying KafkaError.
            kafka_error = err.args[0]
            if kafka_error.code() == KafkaError._PARTITION_EOF:
                # End of partition event, not an error
                print('Reached end of partition')
                continue
            print('Error while consuming: {}'.format(kafka_error))
            if kafka_error.fatal():
                # A fatal client error will not clear up; stop instead of spinning.
                raise
            continue

        if msg is None:
            # Nothing arrived within the poll timeout.
            continue

        print('Successfully consumed record with key {} and value {}'.format(msg.key(), msg.value()))
        process_message(msg)

except KeyboardInterrupt:
    pass

finally:
    # Always release both clients, even if closing the consumer fails.
    try:
        consumer.close()
    finally:
        cluster.shutdown()

Successfully consumed record with key 9ef432eb6251297304e76186b10a928d_e481f51cbdc54678b7cc49136f2d6af7 and value {'order_id': 'e481f51cbdc54678b7cc49136f2d6af7', 'customer_id': '9ef432eb6251297304e76186b10a928d', 'order_status': 'delivered', 'order_purchase_timestamp': '02-10-2017 10:56', 'order_approved_at': '02-10-2017 11:07', 'order_delivered_carrier_date': '04-10-2017 19:55', 'order_delivered_customer_date': '10-10-2017 21:25', 'order_estimated_delivery_date': '18-10-2017 00:00'}
Record 9ef432eb6251297304e76186b10a928d_e481f51cbdc54678b7cc49136f2d6af7 inserted successfully !!
Successfully consumed record with key b0830fb4747a6c6d20dea0b8c802d7ef_53cdb2fc8bc7dce0b6741e2150273451 and value {'order_id': '53cdb2fc8bc7dce0b6741e2150273451', 'customer_id': 'b0830fb4747a6c6d20dea0b8c802d7ef', 'order_status': 'delivered', 'order_purchase_timestamp': '24-07-2018 20:41', 'order_approved_at': '26-07-2018 03:24', 'order_delivered_carrier_date': '26-07-2018 14:31', 'order_delivered_customer_da