![Top <](./images/watsonxdata.png "watsonxdata")

# Kafka Jupyter Notebook

Apache Kafka® <sup style="color: blue;">1,2</sup> is an event streaming platform. Kafka combines three capabilities so you can implement your use cases for event streaming.

* To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
* To store streams of events durably and reliably for as long as you want.
* To process streams of events as they occur or retrospectively.

#### Producers and Consumers
Producers are those client applications that publish (write) events to Kafka, and consumers are those that subscribe to (read and process) these events. In Kafka, producers and consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, producers never need to wait for consumers. Kafka provides various guarantees such as the ability to process events exactly-once.

#### Events
Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder. An example topic name could be "payments". Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events. Events in a topic can be read as often as needed—unlike traditional messaging systems, events are not deleted after consumption. Instead, you define for how long Kafka should retain your events through a per-topic configuration setting, after which old events will be discarded. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is perfectly fine.
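
The idea that events stay in a topic after they are read can be illustrated with a toy model. This is only a sketch in plain Python, not how Kafka is implemented; the `TopicLog` class and the sample payment events are made up for illustration.

```python
class TopicLog:
    """Toy in-memory stand-in for a Kafka topic: an append-only list of events."""

    def __init__(self, name):
        self.name = name
        self._events = []

    def publish(self, event):
        """Append an event and return its offset (position in the log)."""
        self._events.append(event)
        return len(self._events) - 1

    def read_from(self, offset=0):
        """Return every event at or after `offset`; reading never removes events."""
        return list(self._events[offset:])


payments = TopicLog("payments")
payments.publish({"payer": "alice", "amount": 10})
payments.publish({"payer": "bob", "amount": 25})

first_read = payments.read_from(0)
second_read = payments.read_from(0)   # the same events are still available
tail_read = payments.read_from(1)     # a reader may also start later in the log
```

Both reads return the same two events, which is the behavior that distinguishes a topic from a traditional queue where a message disappears once it is consumed.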

<p style="font-size: small;line-height: 0.4;"><sup>1. Additional details can be found on the <a href="https://kafka.apache.org/documentation/#gettingStarted">Kafka website</a>.</sup></p>
<p style="font-size: small;line-height: 0.4;"><sup>2. Thanks to Chunyu Jiang for prototyping the Kafka connector and suggestions on the code.</sup></p>

## Creating a Kafka Service

The first step is to create a Kafka service in our system. The following cell writes the Docker Compose file that defines the Kafka and Zookeeper containers. The system does not contain any topics when it starts up.

In [None]:
kafka_compose = '''
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://watsonxdata:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
'''
with open("kafka-compose.yaml","w") as fd:
    fd.write(kafka_compose)
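
The compose file advertises two listeners: one for traffic inside the Docker network and one for clients that connect through the published port. As a rough illustration, the sketch below splits the `KAFKA_ADVERTISED_LISTENERS` value into its parts; `parse_listeners` is a hypothetical helper written for this notebook, not part of any Kafka library.

```python
def parse_listeners(value):
    """Split 'NAME://host:port,...' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


advertised = parse_listeners(
    "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://watsonxdata:29092"
)
# advertised["PLAINTEXT_HOST"] is the address that the notebook's clients use
```

The `PLAINTEXT_HOST` entry is why the client code in this notebook sets `bootstrap.servers` to `watsonxdata:29092`.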

Now we can start the Kafka and Zookeeper services. Clients reach the Kafka service through port 29092.

In [None]:
%system docker compose -p kafka -f kafka-compose.yaml up --detach

### Load Kafka Libraries
Before we start using the system, we need to load the Python libraries required to communicate with Kafka.

In [None]:
%system python3 -m pip install confluent-kafka

### Open up the Kafka Port
We need to open the port that is used by Kafka so the client can communicate with the service.

In [None]:
%system firewall-cmd --add-port=29092/tcp --zone=public --permanent
%system firewall-cmd --reload
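
If you want to confirm that the port is reachable before going further, a plain TCP connection test is usually enough. The sketch below uses only the Python standard library; `port_is_open` is a hypothetical helper name, and a successful connection only shows that something is listening, not that Kafka has finished initializing.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and host names that do not resolve
        return False
```

For this system the call would be `port_is_open("watsonxdata", 29092)`.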

## Create a Topic

Message queues are referred to as topics in the Kafka documentation. We need to create a topic that Kafka will use in our system. The following code will connect to the Kafka server and add the topic <code style="color:blue;background-color:transparent;">watsonxdata</code>. The first step is to establish a connection to the Kafka service. If you receive an error message when running this code, it indicates that the Kafka service has not yet initialized. Try running the code again after a few seconds and then check to see if the topic has been created.

In [None]:
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient(
    {
        'bootstrap.servers': 'watsonxdata:29092',
    }
)

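# create_topics() is asynchronous and returns a dict of {topic: future}.
# Errors only surface when result() is called on each future.
futures = admin_client.create_topics(
    new_topics=[
        NewTopic(
            topic='watsonxdata',
            num_partitions=1,
            replication_factor=1,
        )
    ],
    validate_only=False,
    operation_timeout=10,
)
for topic, future in futures.items():
    try:
        future.result()
        print(f"Topic {topic} created")
    except Exception as err:
        print(repr(err))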
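
Instead of re-running the cell by hand while the Kafka service initializes, the "try again after a few seconds" advice can be wrapped in a small retry loop. This is a generic sketch; `retry` and its parameters are hypothetical names, not part of the confluent-kafka library.

```python
import time


def retry(action, attempts=5, delay=3.0):
    """Call `action` until it succeeds or `attempts` is exhausted.

    Returns the action's result; re-raises the last exception on final failure.
    """
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as err:
            if attempt == attempts:
                raise
            print(f"Attempt {attempt} failed ({err!r}); retrying in {delay}s")
            time.sleep(delay)
```

One possible use is `retry(lambda: admin_client.list_topics(timeout=5))`, which keeps asking the broker for its metadata until it answers.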

Check that our topic is there. 

In [None]:
try:
    topics = False
    metadata = admin_client.list_topics()
    for t in iter(metadata.topics.values()):
        topics = True
        print(f"Topic: {t.topic}")   
except Exception as err:
    print(repr(err))   

if (topics == False):
    print("No topics found")

## Registering the Kafka Service
The next step is to register the Kafka service with watsonx.data. Navigate to your watsonx.data UI screen and click on the infrastructure icon.

![Add Database](images/watsonx-kafka-add-database.png)

Select the Add database option and select Apache Kafka as the source.

![Add Database](images/watsonx-kafka-add-kafka.png)

Enter the following settings into the dialog.

* Display name - <code style="color:blue;background-color: transparent;">kafka</code>
* Hostname - <code style="color:blue;background-color: transparent;">watsonxdata</code>
* Port - <code style="color:blue;background-color: transparent;">29092</code>
* Topics - <code style="color:blue;background-color: transparent;">watsonxdata</code>

If your Kafka service has multiple topics, you can list them all in the Topics line.

![Add Database](images/watsonx-kafka-settings.png)

Once you have entered these values into the panel, use the Test Connection link to make sure the Kafka server is reachable. Once you register the Kafka service, you should see it reflected in the infrastructure screen.

![Add Database](images/watsonx-kafka-infrastructure.png)

At this point you must associate the Kafka service with the Presto engine. Hover over the Kafka catalog until you see the association icon. Click on that to view the associate dialog.

![Add Database](images/watsonx-kafka-associate.png)

Select the presto-01 engine and press Save and restart engine. The Kafka service should now show as connected to the Presto engine.

![Add Database](images/watsonx-kafka-associated.png)

It may take a few moments for the Presto engine to restart, so any errors you receive in the following steps are most likely due to the Presto service not being available yet.

Switch to the Data Manager view.

![Add Database](images/watsonx-kafka-datamanger-1.png)

You should see the Kafka catalog in the list on the left. The schema is called <code style="color:blue;background-color: transparent;">default</code>. If you don't see the service, you may need to refresh the screen to wait for Presto to become available. Clicking on the <code style="color:blue;background-color: transparent;">watsonxdata</code> topic (which is mapped to a table) will display information about this queue.

![Add Database](images/watsonx-kafka-datamanger-2.png)

When data is loaded into the queue, you will be able to click on the Data sample tab to look at the messages that have come in.

![Add Database](images/watsonx-kafka-data.png)


Note that the data will not be shown on your system yet since the messages have not yet been loaded into the queue.

## Producers and Consumers
Kafka uses the concept of producers and consumers, which was discussed at the beginning of this notebook. A producer generates records that are sent to the Kafka topic (queue) and held for consumers to read. In this system, we are going to write a Python program that reads the orders.csv file (found in the staging-bucket directory) and places the records into the Kafka queue (watsonxdata) at a rate of up to 20 records per second. As these records are produced, we can view the queue in the watsonx.data UI or through SQL.

The first step is to register a producer connection.

In [None]:
from confluent_kafka import Producer

producer = Producer(
   {'bootstrap.servers': 'watsonxdata:29092'}
)

### Read the CSV file
This code will import the CSV file and create one record (a Python dictionary) for each row. Each record is converted to JSON when it is sent to Kafka.

In [None]:
import csv

# The workload function below expects the columns found in orders.csv
csv_in = "staging-bucket/orders.csv"
with open(csv_in, newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    data = [row for row in reader]
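
To see what the records look like without opening the file, the same steps can be run against a small in-memory sample. The column names below match the ones the workload function reads, but the two data rows are invented for illustration and are not taken from orders.csv.

```python
import csv
import io
import json

# Made-up sample with the same header the workload function expects
sample_csv = io.StringIO(
    "order_id,order_date,customer_id,product_id,quantity,unit_price\n"
    "1,2024-01-15,100,7,2,19.99\n"
    "2,2024-01-16,101,9,1,5.00\n"
)

# csv.DictReader yields one dictionary per row, keyed by the header names
sample_rows = list(csv.DictReader(sample_csv))

# json.dumps turns each dictionary into the string that is sent to Kafka
sample_messages = [json.dumps(row) for row in sample_rows]
```

Note that every value is read as a string, which is why the workload function has to check that `unit_price` is really a number.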

### Create the Workload Function
When you run the next block of code, it will take each JSON record and send it to the Kafka queue. There is a check done to make sure that the <code style="color:blue;background-color: transparent;">unit_price</code> value is a number and positive. There are some records in the data set that are invalid and this will remove them from the data. 

The function inserts the records into the Topic queue (watsonxdata), pausing briefly after each record. If you run the function in the foreground, the pause is 0.01 seconds. If the function is run in the background, the pause is 0.05 seconds, so it inserts at most 20 rows per second. The function does not clear the Topic; that is done in a separate step later in this notebook.

Running the next cell will only register the function, not run it. You have two options for running this code:

  * <code style="color:blue;background-color: transparent;">sendMessages(True)</code> - This mode will run in the foreground, placing the records in the queue and printing progress (0.01 second pause per record)
  * <code style="color:blue;background-color: transparent;">sendMessages(False)</code> - This is the default mode where the code will run in the background (at most 20 messages/sec)

In [None]:
def sendMessages(inline=False):
    
    from time import sleep
    from multiprocessing.pool import ThreadPool    
    from confluent_kafka.admin import AdminClient, KafkaException
    import json
    import decimal

    global running

    def callback(message):
        global running
        running = False
    
    def insertMessages(inline):

        if (inline == True):
            pause = .01
        else:
            pause = .05        
        
        row_count = len(data)
        current_row = 0
        if (inline == True):
            print(f"Loading {row_count} rows into the queue")
        for row in data:
            current_row += 1  
            # "or ''" guards against None, which csv.DictReader yields for short rows
            order_id    = (row.get("order_id") or "").strip()
            order_date  = (row.get("order_date") or "").strip()
            customer_id = (row.get("customer_id") or "").strip()
            product_id  = (row.get("product_id") or "").strip()
            quantity    = (row.get("quantity") or "").strip()
            unit_price  = (row.get("unit_price") or "").strip()
            # Skip the row if any required field is missing or blank
            if "" in (order_id, order_date, customer_id,
                      product_id, quantity, unit_price):
                if (inline == True): 
                    print(f"\nRow {current_row:4d} Skipped (invalid data)")
                continue
            
            try:
                uprice = decimal.Decimal(unit_price)
                if (uprice <= 0):
                    if (inline == True): 
                        print(f"\nRow {current_row:4d} Skipped (invalid price)")
                    continue
            except decimal.InvalidOperation:
                if (inline == True): 
                    print(f"\nRow {current_row:4d} Skipped (invalid price)")
                continue

            if (inline == True):
                print(f"Row {current_row:4d}/{row_count} Loaded                    ",end="\r")
        
            try:
                producer.produce("watsonxdata",value=json.dumps(row))
                producer.flush()
            except Exception as err:
                return False
            sleep(pause)

        return False

    running = True
 
    if (inline == False):
        pool = ThreadPool(processes=1)
        async_result = pool.apply_async(insertMessages, (False,), callback=callback)    
        print("Messages are being inserted in the background")
    else:
        insertMessages(True)
        # The foreground run is finished, so clear the flag the progress loop checks
        running = False
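
The validation rules described above can also be expressed as a small stand-alone function, which makes them easier to test without a Kafka connection. This is a sketch of the same checks, not a replacement for the cell above; `is_valid_order` and `REQUIRED_FIELDS` are hypothetical names.

```python
import decimal

REQUIRED_FIELDS = ("order_id", "order_date", "customer_id",
                   "product_id", "quantity", "unit_price")


def is_valid_order(row):
    """Return True when every required field is present and unit_price > 0."""
    values = {name: (row.get(name) or "").strip() for name in REQUIRED_FIELDS}
    if any(value == "" for value in values.values()):
        return False
    try:
        return decimal.Decimal(values["unit_price"]) > 0
    except decimal.InvalidOperation:
        # Raised when the text is not a number (for example "abc")
        return False
```

With a helper like this, the list of sendable rows could be built with `[row for row in data if is_valid_order(row)]`.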

## Connect to watsonx.data

You can now connect to this Kafka queue through watsonx.data using a Presto connection. First load the Presto Magic commands.

In [None]:
%run presto.ipynb

We then connect to the watsonx.data engine with a connect command.

In [None]:
%%sql
   connect
   userid=ibmlhadmin
   password=password
   hostname=watsonxdata
   port=8443
   catalog=tpch
   schema=tiny
   certfile=/certs/lh-ssl-ts.crt

### Delete the Contents of a Topic
The data is available through the <code style="color:blue;background-color: transparent;">kafka</code> catalog. The schema is called <code style="color:blue;background-color: transparent;">default</code> when it is added to the watsonx.data system. The table name will be the name of the topic (queue). The next cell will make sure that there is no data in the queue.

In [None]:
admin_client = AdminClient(
    {
        'bootstrap.servers': 'watsonxdata:29092',
    }
)    

try:
    # delete_topics() returns {topic: future}; result() raises if the delete failed
    futures = admin_client.delete_topics(['watsonxdata'])
    for topic, f in futures.items():
        f.result()
        print(f"Topic {topic} deleted")
    
except Exception as err:
    print(repr(err))   

The Kafka server has been configured to auto-create Topics. What this means is that when a Topic is deleted (and the messages discarded), the Topic no longer exists, but will be recreated when a producer or consumer sends a request to use that topic. Running the above cell again after the Topic has been deleted will result in an error message. However, the Topic is still defined in the Kafka catalog.

You can check that the topic does not exist using the next command.

In [None]:
try:
    topics = False
    metadata = admin_client.list_topics()
    for t in iter(metadata.topics.values()):
        topics = True
        print(f"Topic: {t.topic}")   
except Exception as err:
    print(repr(err))   

if (topics == False):
    print("No topics found")

We can also check using the watsonx.data connection to get a count of messages in the Topic.

In [None]:
%sql select count(*) from  "kafka"."default"."watsonxdata"

Note how the topic has now been recreated!

In [None]:
try:
    topics = False
    metadata = admin_client.list_topics()
    for t in iter(metadata.topics.values()):
        topics = True
        print(f"Topic: {t.topic}")   
except Exception as err:
    print(repr(err))   

if (topics == False):
    print("No topics found")  

### Start the Workload
The next statement will begin inserting messages into the queue. If you want to load all messages immediately, change the <code style="color:blue;background-color: transparent;">run_foreground</code> parameter from <code style="color:blue;background-color: transparent;">False</code> to <code style="color:blue;background-color: transparent;">True</code>. Approximately 500 rows will be placed into the queue.

In [None]:
run_foreground = False
sendMessages(run_foreground)

If the program is running in the background, we can check the progress by using SQL. The sendMessages code (above) will set a Python variable (running) to False when it stops creating messages.

The next block of code will run until the insert process is complete. The code checks the record count every second.

In [None]:
from time import sleep
while (running == True):
    x = %sql --raw select count(*) from "kafka"."default"."watsonxdata"
    print(f"SQL count  : {x[0][0]:4}",end="\r")
    sleep(1)

x = %sql --raw select count(*) from "kafka"."default"."watsonxdata"
print(f"\nFinal count: {x[0][0]:4}")
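
The background pattern used here (a worker thread plus a callback that flips a flag) can be tried in isolation. The sketch below replaces the Kafka producer with a function that only sleeps; `slow_job`, `on_done`, `on_error`, and `status` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool
from time import sleep

status = {"running": True}


def slow_job(n):
    """Pretend to send n messages, pausing briefly after each one."""
    for _ in range(n):
        sleep(0.01)
    return n


def on_done(result):
    # Called by the pool once slow_job returns normally
    status["running"] = False


def on_error(err):
    # Called instead of on_done if slow_job raises, so the wait loop still ends
    status["running"] = False


pool = ThreadPool(processes=1)
async_result = pool.apply_async(slow_job, (5,), callback=on_done,
                                error_callback=on_error)

# Poll the flag, as the notebook does with the SQL count
while status["running"]:
    sleep(0.05)

sent = async_result.get()
pool.close()
pool.join()
```

The `error_callback` is worth noting: without it, an exception in the worker would leave the flag set and the polling loop would never finish.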

## Kafka Consumer
Once the messages are on the queue, you can read them by using a Kafka Consumer. This block of code will connect to the <code style="color:blue;background-color: transparent;">watsonxdata</code> Topic and read the records on the queue. The returned JSON records will be shredded and placed into a Pandas dataframe for viewing.

The <code style="color:blue;background-color: transparent;">group.id</code> in the program below is generated each time the code is run. A new consumer group has no saved position, so it starts at the first message (offset: earliest), which guarantees that the program will see all of the messages since the topic was created. If you place a fixed value in there (e.g., watsonx), the program will read the data on the first run. After that, it will only read messages that are "new". The code gives up after about 10 empty one-second polls if nothing has been read, and stops at the first empty poll once at least one message has been read.

In [None]:
from confluent_kafka import Consumer
from IPython.display import display, HTML
import json
import uuid
import pandas as pd

consumer = Consumer(
   {
    'bootstrap.servers'   : 'watsonxdata:29092',
    'group.id'            : str(uuid.uuid1()),   # new group on every run
    'session.timeout.ms'  : 6000,
    'default.topic.config': {
        'auto.offset.reset': 'earliest'
    } 
   }
)

consumer.subscribe(["watsonxdata"])

rows = []

retries = 0

try:
    while True:
        msg = consumer.poll(1)
        if msg is None:
            retries += 1
            if (retries <= 10 and len(rows) == 0):
                pass
            else:
                break
        elif msg.error():
            print(f"Error: {msg.error()}")
            break
        else:
            retries = 0
            row = msg.value().decode('utf8')
            rows.append(row)
            
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

if (len(rows) > 0):
    json_array = [ json.loads(row) for row in rows ]
    df = pd.DataFrame(json_array)
    display(HTML(df.to_html()))
else:
    print("No records found")
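
The effect of the `group.id` setting can be shown with a toy model of how a broker remembers the read position of each consumer group. This is a simplified sketch for a single partition and is not how Kafka stores offsets internally; `ToyBroker` and the message names are made up.

```python
class ToyBroker:
    """Toy model of per-group committed offsets on a single-partition topic."""

    def __init__(self):
        self.log = []          # the topic's messages, in arrival order
        self.committed = {}    # group.id -> next offset to read

    def produce(self, message):
        self.log.append(message)

    def consume(self, group_id, reset="earliest"):
        """Return unread messages for `group_id`, then commit the new position."""
        if group_id in self.committed:
            start = self.committed[group_id]
        else:
            # No committed offset yet: fall back to the reset policy
            start = 0 if reset == "earliest" else len(self.log)
        messages = self.log[start:]
        self.committed[group_id] = len(self.log)
        return messages


broker = ToyBroker()
broker.produce("m1")
broker.produce("m2")

first = broker.consume("watsonx")        # new group: starts at the earliest offset
again = broker.consume("watsonx")        # same group: nothing new to read
broker.produce("m3")
later = broker.consume("watsonx")        # only the message added since the last read
fresh = broker.consume("another-group")  # a new group sees the whole topic
```

Generating a new `group.id` on every run corresponds to the `fresh` case, while a fixed value corresponds to the `again` and `later` cases.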

## Watsonx.data SQL
You can view the data in watsonx.data using the SQL interface in the UI. You can also use Presto SQL to view the data. The following example will shred the message contents into columns using the built-in JSON functions.

In [None]:
%%sql
SELECT
  cast(json_extract_scalar(_message,'order_id') as integer) as order_id,
  cast(json_extract_scalar(_message,'order_date') as date) as order_date,
  cast(json_extract_scalar(_message,'customer_id') as integer) as customer_id,
  cast(json_extract_scalar(_message,'product_id') as integer) as product_id,
  cast(json_extract_scalar(_message,'quantity') as integer)  as quantity,
  cast(json_extract_scalar(_message,'unit_price') as decimal(7,2)) as unit_price
FROM
  "kafka"."default"."watsonxdata"

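For comparison, the same shredding and casting can be done on the client side in Python. The sketch below assumes the dates are in ISO format (YYYY-MM-DD), which the source data may or may not use; `shred_order` is a hypothetical helper and the sample message is invented.

```python
import json
from datetime import date
from decimal import Decimal


def shred_order(message):
    """Parse one JSON message and cast each field to a typed Python value."""
    fields = json.loads(message)
    return {
        "order_id": int(fields["order_id"]),
        "order_date": date.fromisoformat(fields["order_date"]),  # assumes ISO dates
        "customer_id": int(fields["customer_id"]),
        "product_id": int(fields["product_id"]),
        "quantity": int(fields["quantity"]),
        "unit_price": Decimal(fields["unit_price"]),
    }


example = shred_order(
    '{"order_id": "1", "order_date": "2024-01-15", "customer_id": "100", '
    '"product_id": "7", "quantity": "2", "unit_price": "19.99"}'
)
```

Doing the cast in SQL keeps the work inside the Presto engine, which is usually preferable when only a summary of the messages is needed.
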
### Clear the Messages
We can clear the messages from the topic (Queue) and attempt to query the data again to recreate the topic.

In [None]:
admin_client = AdminClient(
    {
        'bootstrap.servers': 'watsonxdata:29092',
    }
)    
try:
    res = admin_client.delete_topics(['watsonxdata'])
    for topic, f in res.items():
        f.result()   # wait for the delete to finish before querying
except Exception as err:
    print(repr(err))

%sql select count(*) from "kafka"."default"."watsonxdata"

## Shutdown Kafka
The following code will shut down our Kafka service. This will remove all of the Topics that were created so that the scripts in this section can be run again.

In [None]:
%system docker compose -p kafka -f kafka-compose.yaml down

#### Credits: IBM 2024, George Baklarz [baklarz@ca.ibm.com], Chunyu Jiang [Chunyu.Jiang@ibm.com]