# Kafka Assignment

**Download Data Link** -> https://github.com/shashank-mishra219/Confluent-Kafka-Setup/blob/main/restaurant_orders.csv

## Step 1: Setup Confluent Kafka Account

## Step 2: Create one Kafka topic named "restaurent-take-away-data" with 3 partitions

## Step 3: Setup key (string) & value (json) schema in the confluent schema registry

In [2]:
import pandas as pd
from pandas.io.json import build_table_schema
import json

In [4]:
# Base JSON Schema (draft-07) envelope for one record. It carries no
# "properties" entry yet; key order is kept because it shows up in json.dumps.
kafka_basic_schema = {
    "$id": "http://example.com/myURI.schema.json",
    "$schema": "http://json-schema.org/draft-07/schema#",
    "additionalProperties": False,  # fields not declared in the schema are rejected
    "description": "Sample schema to help you get started.",
    "title": "SampleRecord",
    "type": "object",
}

In [6]:
# Load the restaurant orders CSV (expected in the working directory) into a DataFrame.
df = pd.read_csv("restaurant_orders.csv")

In [7]:
# Infer a Table Schema for the DataFrame and keep only its per-column 'fields'
# list; the index column and the pandas version entry are left out.
df_schema = build_table_schema(df, index=False, version=False)['fields']

In [8]:
# Each field dict carries the column label under 'name'; move it, in place,
# to the key 'description' (the label is popped first, then re-inserted).
for field in df_schema:
    field.update(description=field.pop('name'))

df_schema

[{'type': 'integer', 'description': 'Order Number'},
 {'type': 'string', 'description': 'Order Date'},
 {'type': 'string', 'description': 'Item Name'},
 {'type': 'integer', 'description': 'Quantity'},
 {'type': 'number', 'description': 'Product Price'},
 {'type': 'integer', 'description': 'Total products'}]

In [9]:
# Turn the list of field dicts into a JSON Schema "properties" mapping:
# the column label (stored under 'description') becomes the property name and
# what is left of the field dict (its 'type') becomes the property definition.
properties = {}
for field in df_schema:
    column_label = field.pop('description')
    properties[column_label] = field

# Shallow-copy the base envelope and attach the properties as its last key.
df_final_schema = {**kafka_basic_schema, 'properties': properties}

In [11]:
# Render the assembled schema as a JSON string (cell output below);
# presumably this is the text registered as the value schema in Step 3.
json.dumps(df_final_schema)

'{"$id": "http://example.com/myURI.schema.json", "$schema": "http://json-schema.org/draft-07/schema#", "additionalProperties": false, "description": "Sample schema to help you get started.", "title": "SampleRecord", "type": "object", "properties": {"Order Number": {"type": "integer"}, "Order Date": {"type": "string"}, "Item Name": {"type": "string"}, "Quantity": {"type": "integer"}, "Product Price": {"type": "number"}, "Total products": {"type": "integer"}}}'

## Step 4: Write a Kafka producer program (Python or any other language) to read data records from the restaurant data CSV file
* Make sure the schema is not hardcoded in the producer code; read the latest version of the schema and its schema_str from the schema registry and use it for data serialization.

In [12]:
# importing packages

import argparse
from uuid import uuid4
from six.moves import input
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

In [13]:
# Variables
# Credentials are read from the environment instead of being written into the
# notebook: anything committed here is readable by everyone with access to the
# file. Export the CONFLUENT_* variables before running these cells. Keys that
# were previously hard-coded in this cell should be treated as leaked and
# rotated in the Confluent console.
import os

# Kafka cluster API key / secret (used as the SASL username / password).
API_KEY = os.environ.get('CONFLUENT_API_KEY', '')
API_SECRET_KEY = os.environ.get('CONFLUENT_API_SECRET', '')

# Schema Registry API key / secret (used for HTTP basic auth).
SCHEMA_REGISTRY_API_KEY = os.environ.get('CONFLUENT_SCHEMA_REGISTRY_API_KEY', '')
SCHEMA_REGISTRY_API_SECRET = os.environ.get('CONFLUENT_SCHEMA_REGISTRY_API_SECRET', '')

# Endpoints are not secret, so the original values remain as defaults and can
# be overridden per environment.
ENDPOINT_SCHEMA_URL = os.environ.get(
    'CONFLUENT_SCHEMA_REGISTRY_URL',
    'https://psrc-8qyy0.eastus2.azure.confluent.cloud',
)
BOOTSTRAP_SERVER = os.environ.get(
    'CONFLUENT_BOOTSTRAP_SERVER',
    'pkc-lgwgm.eastus2.azure.confluent.cloud:9092',
)

SECURITY_PROTOCOL = 'SASL_SSL'
# Value for 'sasl.mechanism'; the (misspelled) name is kept because other
# cells reference it.
SSL_MACHENISM = 'PLAIN'


In [14]:
def sasl_conf():
    """Return the Kafka client connection settings for the Confluent cluster.

    Returns:
        dict: A new dict on every call, built from the module-level
        connection constants.
    """
    return {
        'sasl.mechanism': SSL_MACHENISM,
        'bootstrap.servers': BOOTSTRAP_SERVER,
        # SASL_SSL enables TLS; 'SASL_PLAINTEXT' would turn it off.
        'security.protocol': SECURITY_PROTOCOL,
        'sasl.username': API_KEY,
        'sasl.password': API_SECRET_KEY,
    }


def schema_config():
    """Return the Schema Registry client settings.

    Returns:
        dict: The registry URL plus basic-auth credentials formatted as
        ``"<key>:<secret>"``.
    """
    credentials = f"{SCHEMA_REGISTRY_API_KEY}:{SCHEMA_REGISTRY_API_SECRET}"
    return {
        'url': ENDPOINT_SCHEMA_URL,
        'basic.auth.user.info': credentials,
    }

def delivery_report(err, msg):
    """
    Print the outcome of one message delivery (used as a produce callback).
    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    """
    if err is None:
        print(
            f'User record {msg.key()} successfully produced to '
            f'{msg.topic()} [{msg.partition()}] at offset {msg.offset()}'
        )
    else:
        print(f"Delivery failed for User record {msg.key()}: {err}")

class Car:
    """Wrapper around one record dict.

    Every key of the dict is mirrored as an instance attribute, and the dict
    itself (same object, not a copy) is kept on ``record``.
    """

    def __init__(self, record: dict):
        for field_name in record:
            setattr(self, field_name, record[field_name])

        # Assigned last, so a key literally named 'record' is overridden.
        self.record = record

    def __str__(self):
        return str(self.record)

    @staticmethod
    def dict_to_car(data: dict, ctx):
        """Build a Car from ``data``; ``ctx`` is accepted but not used."""
        return Car(record=data)

def car_to_dict(car:Car, ctx):
    """
    Returns the dict representation of a Car instance for serialization.
    Args:
        car (Car): Instance whose ``record`` dict is handed to the serializer.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation (not used here).
    Returns:
        dict: The ``record`` dict held by ``car`` (the same object, not a copy).
    """
    payload = car.record
    return payload

# read the data from csv file
def get_car_instance(file_path, columns):
    """Lazily yield one ``Car`` per row of a CSV file.

    Missing values are replaced with a sentinel chosen by the column's dtype
    so that every yielded record is fully populated:
    ``-99999`` for int64/int32, ``-99999.9`` for float64/float32 and
    ``"*miss*"`` for anything else.

    Args:
        file_path: Path of the CSV file to read.
        columns: Keys to pair, in order, with each row's values.

    Yields:
        Car: Wraps ``dict(zip(columns, row_values))`` for each row.
    """
    df = pd.read_csv(file_path)

    for col, has_missing in df.isna().any().items():
        if not has_missing:
            continue
        dtype_name = df[col].dtype.name
        if dtype_name in ('int64', 'int32'):
            fill_value = -99999
        elif dtype_name in ('float64', 'float32'):
            fill_value = -99999.9
        else:
            fill_value = "*miss*"
        # Assign the result back: an in-place fillna on the df[col] selection
        # is chained assignment, which warns on recent pandas and does not
        # modify df at all under Copy-on-Write.
        df[col] = df[col].fillna(fill_value)

    # Rows are yielded one at a time; nothing is accumulated, so memory use
    # does not grow with the number of rows already consumed.
    for row in df.values:
        yield Car(dict(zip(columns, row)))

def streamingToKafka(FILE_PATH, topic, schema_id, columns, processing_col_count):
    """Publish records from a CSV file to a Kafka topic as JSON.

    The value schema is fetched from the Schema Registry by ``schema_id`` and
    used to serialize each record. Every message gets a fresh random UUID as
    its key.

    Args:
        FILE_PATH: Path of the CSV file to read records from.
        topic: Name of the Kafka topic to produce to.
        schema_id: Schema Registry id of the JSON schema for the value.
        columns: Keys paired with each CSV row's values to build a record.
        processing_col_count: Stop after this many records have been
            handed to the producer (at least one record is always attempted).
    """
    schema_registry_client = SchemaRegistryClient(schema_config())
    schema_str = schema_registry_client.get_schema(schema_id).schema_str

    string_serializer = StringSerializer('utf_8')
    json_serializer = JSONSerializer(schema_str, schema_registry_client, car_to_dict)

    producer = Producer(sasl_conf())

    print("Producing user records to topic {}. ^C to exit.".format(topic))
    produced = 0
    try:
        for car in get_car_instance(FILE_PATH, columns):
            # Serve on_delivery callbacks from previous produce() calls on
            # every iteration, so the local queue drains while streaming
            # instead of only at the final flush.
            producer.poll(0.0)
            print(car)
            try:
                producer.produce(
                    topic=topic,
                    # The serializer's second argument is a serialization
                    # context, not a to-dict callable.
                    key=string_serializer(
                        str(uuid4()), SerializationContext(topic, MessageField.KEY)),
                    value=json_serializer(
                        car, SerializationContext(topic, MessageField.VALUE)),
                    on_delivery=delivery_report,
                )
            except ValueError:
                # Skip only the offending record; the rest of the file is
                # still streamed.
                print("Invalid input, discarding record...")
                continue

            produced += 1
            if produced >= processing_col_count:
                break
    except KeyboardInterrupt:
        pass

    print("\nFlushing records...")
    producer.flush()

## Step 5: From producer code, publish data in Kafka Topic one by one and use dynamic key while publishing the records into the Kafka Topic

In [16]:
# Run parameters for the producer.
FILE_PATH = 'restaurant_orders.csv'
topic = "restaurent-take-away-data"
schema_id = 100017
# Column labels of the CSV, in file order; used as the keys of each record.
cols = pd.read_csv(FILE_PATH).columns.tolist()
# Number of records to publish before stopping.
processing_col_count = 2

In [18]:
# Publish the first `processing_col_count` records of the CSV to the topic.
streamingToKafka(FILE_PATH, topic, schema_id, cols, processing_col_count)

Producing user records to topic restaurent-take-away-data. ^C to exit.
{'Order Number': 16118, 'Order Date': '03/08/2019 20:25', 'Item Name': 'Plain Papadum', 'Quantity': 2, 'Product Price': 0.8, 'Total products': 6}
{'Order Number': 16118, 'Order Date': '03/08/2019 20:25', 'Item Name': 'King Prawn Balti', 'Quantity': 1, 'Product Price': 12.95, 'Total products': 6}

Flushing records...
User record b'a29102e6-8e37-4dd4-a39d-58c0999fd237' successfully produced to restaurent-take-away-data [2] at offset 0
User record b'cb9fb424-96e4-48c5-9143-1d69211138cf' successfully produced to restaurent-take-away-data [1] at offset 0


## Step 6: Write Kafka consumer code, create two copies of the same consumer code and save them under different names (kafka_consumer_1.py & kafka_consumer_2.py)
* Again, make sure the latest schema version and schema_str are not hardcoded in the consumer code; read them automatically from the schema registry to deserialize the data. <br>
* Now test two scenarios with your consumer code:
    1. Use "group.id" property in consumer config for both consumers and mention different group_ids in kafka_consumer_1.py & kafka_consumer_2.py,<br>
        apply "earliest" offset property in both consumers and run these two consumers from two different terminals. Calculate how many records each consumer
        consumed and printed on the terminal
    2. Use "group.id" property in consumer config for both consumers and mention same group_ids in kafka_consumer_1.py & kafka_consumer_2.py,<br>
        apply "earliest" offset property in both consumers and run these two consumers from two different terminals. Calculate how many records each consumer
        consumed and printed on the terminal

In [None]:
import argparse

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry.json_schema import JSONDeserializer

### Kafka_consumer_1.py

In [None]:
def sasl_conf():
    """Return the base Kafka client settings shared by the consumer.

    Returns:
        dict: A fresh dict of connection properties built from the
        module-level connection constants; callers may extend it.
    """
    settings = {}
    settings['sasl.mechanism'] = SSL_MACHENISM
    settings['bootstrap.servers'] = BOOTSTRAP_SERVER
    # SASL_SSL enables TLS; 'SASL_PLAINTEXT' would turn it off.
    settings['security.protocol'] = SECURITY_PROTOCOL
    settings['sasl.username'] = API_KEY
    settings['sasl.password'] = API_SECRET_KEY
    return settings

def schema_config():
    """Return the Schema Registry URL and basic-auth settings as a dict."""
    user_info = f"{SCHEMA_REGISTRY_API_KEY}:{SCHEMA_REGISTRY_API_SECRET}"
    return {'url': ENDPOINT_SCHEMA_URL, 'basic.auth.user.info': user_info}

class Car:
    """Holds one deserialized record.

    Each key of the record dict is also exposed as an instance attribute;
    the dict itself is kept, uncopied, on ``record``.
    """

    def __init__(self, record: dict):
        for key, value in record.items():
            setattr(self, key, value)

        # Set after the loop, so it wins over a key literally named 'record'.
        self.record = record

    def __str__(self):
        return format(self.record)

    @staticmethod
    def dict_to_car(data: dict, ctx):
        """Wrap ``data`` in a Car; ``ctx`` is accepted but not used."""
        return Car(record=data)


def kafka_consumer(topic, schema_id, group_id='group1', auto_offset_reset='earliest'):
    """Consume JSON records from a Kafka topic and print them until interrupted.

    The value schema is fetched from the Schema Registry by ``schema_id`` and
    used to deserialize each message into a ``Car``.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        schema_id: Schema Registry id of the JSON schema for the value.
        group_id: Consumer group id. Give the two consumer scripts different
            ids, or the same id, to run the two scenarios of the exercise.
        auto_offset_reset: Where to start when the group has no committed
            offset. Defaults to "earliest", as the exercise requires.
    """
    schema_registry_client = SchemaRegistryClient(schema_config())

    print('reading schema')
    schema_str = schema_registry_client.get_schema(schema_id).schema_str
    print(schema_str)
    json_deserializer = JSONDeserializer(schema_str,
                                         from_dict=Car.dict_to_car)

    print("deserializer start")
    consumer_conf = sasl_conf()
    consumer_conf.update({
        'group.id': group_id,
        'auto.offset.reset': auto_offset_reset,
    })

    print('consumer')
    consumer = Consumer(consumer_conf)
    try:
        consumer.subscribe([topic])

        while True:
            try:
                # SIGINT can't be handled when polling, limit timeout to 1 second.
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    # An error event carries no record payload; report it
                    # rather than feeding it to the deserializer.
                    print("Consumer error: {}".format(msg.error()))
                    continue

                car = json_deserializer(
                    msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

                if car is not None:
                    print("User record {}: car: {}\n"
                          .format(msg.key(), car))
            except KeyboardInterrupt:
                break
    finally:
        # Always close the consumer, including when deserialization or
        # polling raises.
        consumer.close()

In [None]:
# Read from the topic, and with the schema id, that the producer section of
# this notebook publishes with; the previous values ('topic_case', 100005)
# pointed at a different topic and schema.
topic = "restaurent-take-away-data"
schema_id = 100017

kafka_consumer(topic, schema_id)

### Kafka_consumer_2.py

## Step 7: Once the above steps are done, write another Kafka consumer that reads data from the Kafka topic, creates a CSV file "output.csv", and appends the consumed records to that file