The 'ml' service should already be up and waiting to consume messages from the topic named by KAFKA_CONSUMER_TOPIC, which is set for the 'ml' service in docker-compose.yaml. Run the cells below and check their output.


In [None]:
# create topic for ml service to publish to

In [1]:
! python /home/app/kafka/src/create_topic.py --topic=book_emotions

<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: connecting to kafka-broker:9092 [('172.23.0.5', 9092) IPv4]
Probing node bootstrap-0 broker version
<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: Connection complete.
Broker version identified as 2.6.0
Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
Probing node bootstrap-0 broker version
Broker version identified as 2.6.0
Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
<BrokerConnection node_id=1 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: connecting to kafka-broker:9092 [('172.23.0.5', 9092) IPv4]
Probing node 1 broker version
<BrokerConnection node_id=1 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: Connection complete.
<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connected> [IPv4 ('172.23.

In [None]:
# create and publish source topic for ml service to ingest from

In [2]:
! python /home/app/kafka/src/create_topic.py --topic=book --file=/home/app/data/brothers-karamazov.txt --format=json

<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: connecting to kafka-broker:9092 [('172.23.0.5', 9092) IPv4]
Probing node bootstrap-0 broker version
<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: Connection complete.
Broker version identified as 2.6.0
Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
Probing node bootstrap-0 broker version
Broker version identified as 2.6.0
Set configuration api_version=(2, 6, 0) to skip auto check_version requests on startup
<BrokerConnection node_id=1 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: connecting to kafka-broker:9092 [('172.23.0.5', 9092) IPv4]
Probing node 1 broker version
<BrokerConnection node_id=1 host=kafka-broker:9092 <connecting> [IPv4 ('172.23.0.5', 9092)]>: Connection complete.
<BrokerConnection node_id=bootstrap-0 host=kafka-broker:9092 <connected> [IPv4 ('172.23.

In [3]:
## output of ml inference
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

bootstrap_servers = "kafka-broker:9092"

admin_client = KafkaAdminClient(
    bootstrap_servers=bootstrap_servers,
    client_id="demo",
)

In [4]:
import os

# Topic the 'ml' service publishes its inference results to.
producer_topic = os.getenv("KAFKA_PRODUCER_TOPIC", "book_emotions")
consumer = KafkaConsumer(
    producer_topic,
    bootstrap_servers=["kafka-broker:9092"],
    auto_offset_reset="earliest",  # read from the start of the topic
    enable_auto_commit=False,
)

# Poll once and print every message returned in this batch.
result_batch = consumer.poll(timeout_ms=1000)
for partition, messages in result_batch.items():
    for message in messages:
        print(message.value.decode("utf-8"))
consumer.close()

{"emotion": "neutral", "text": "The Project Gutenberg eBook of The Brothers Karamazov", "line": 1}
{"emotion": "neutral", "text": "", "line": 2}
{"emotion": "neutral", "text": "This ebook is for the use of anyone anywhere in the United States and", "line": 3}
{"emotion": "neutral", "text": "most other parts of the world at no cost and with almost no restrictions", "line": 4}
{"emotion": "neutral", "text": "whatsoever. You may copy it, give it away or re-use it under the terms", "line": 5}
{"emotion": "neutral", "text": "of the Project Gutenberg License included with this ebook or online", "line": 6}
{"emotion": "neutral", "text": "at www.gutenberg.org. If you are not located in the United States,", "line": 7}
{"emotion": "neutral", "text": "you will have to check the laws of the country where you are located", "line": 8}
{"emotion": "neutral", "text": "", "line": 10}
{"emotion": "fear", "text": "before using this eBook.", "line": 9}
{"emotion": "neutral", "text": "Title: The Brothers K
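In the output above, the record for line 10 arrives before the record for line 9, so downstream code should probably not assume that messages come back in `line` order. The following is a minimal sketch, assuming every message value is UTF-8 JSON with the `emotion`, `text`, and `line` fields shown above. `parse_records` is a hypothetical helper, not part of the project code.

```python
import json


def parse_records(raw_values):
    """Decode UTF-8 JSON message values and return them sorted by 'line'."""
    records = [json.loads(value.decode("utf-8")) for value in raw_values]
    return sorted(records, key=lambda record: record["line"])


# Two payloads copied from the output above, in the order they arrived.
raw_values = [
    b'{"emotion": "neutral", "text": "", "line": 10}',
    b'{"emotion": "fear", "text": "before using this eBook.", "line": 9}',
]

for record in parse_records(raw_values):
    print(record["line"], record["emotion"], record["text"])
```

With a live consumer, the same helper could be fed `message.value` for every message returned by `poll()`.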

In [None]:
# ingest this new topic into RisingWave for further analysis

In [5]:
! export PYTHONPATH=../risingwave:$PYTHONPATH && python ../risingwave/src/main.py --topic='book_emotions' --bootstrap-servers='kafka-broker:9092' --schema='{"emotion": "VARCHAR", "text": "VARCHAR", "line": "INTEGER"}' --source=kafka


                    CREATE SOURCE IF NOT EXISTS "book_emotions" (
                        emotion VARCHAR, text VARCHAR, line INTEGER
                        )
                    INCLUDE header AS kafka_header,
                    INCLUDE timestamp AS kafka_timestamp
                    WITH (
                        connector = 'kafka',
                        topic='book_emotions',
                        properties.bootstrap.server='kafka-broker:9092',
                        scan.startup.mode='earliest',
                        properties.client.id='risingwave',
                        properties.fetch.queue.backoff.ms=1000,
                        properties.statistics.interval.ms=1000
                    ) FORMAT PLAIN ENCODE JSON;
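To make the link between the `--schema` argument and the printed statement concrete, here is a hedged sketch of how a JSON column mapping could be rendered into a statement of the same shape. `build_create_source` is a hypothetical function written for illustration. It is not the code in `../risingwave/src/main.py`, and it reproduces only a subset of the connector options printed above.

```python
import json


def build_create_source(topic, bootstrap_servers, schema_json):
    """Render a CREATE SOURCE statement from a JSON column-to-type mapping."""
    schema = json.loads(schema_json)
    # Column order follows the key order of the JSON object.
    columns = ", ".join(f"{name} {sql_type}" for name, sql_type in schema.items())
    # Plain string formatting: only suitable for trusted, hand-written input.
    return (
        f'CREATE SOURCE IF NOT EXISTS "{topic}" ({columns}) '
        "INCLUDE header AS kafka_header, "
        "INCLUDE timestamp AS kafka_timestamp "
        f"WITH (connector = 'kafka', topic = '{topic}', "
        f"properties.bootstrap.server = '{bootstrap_servers}', "
        "scan.startup.mode = 'earliest') "
        "FORMAT PLAIN ENCODE JSON;"
    )


statement = build_create_source(
    "book_emotions",
    "kafka-broker:9092",
    '{"emotion": "VARCHAR", "text": "VARCHAR", "line": "INTEGER"}',
)
print(statement)
```

The two `INCLUDE` clauses are what later expose the Kafka header and timestamp as the `kafka_header` and `kafka_timestamp` columns.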
                    


In [6]:
import psycopg2
import pandas as pd

conn = psycopg2.connect(host="risingwave", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute(
        "SELECT kafka_timestamp, line, text, emotion FROM book_emotions LIMIT 10;"
    )
    values = cur.fetchall()

In [7]:
df = pd.DataFrame(values, columns=["timestamp", "line", "text", "emotion"])
print(df.tail())

                         timestamp  line  \
5 2025-03-09 00:30:20.075000+00:00     6   
6 2025-03-09 00:30:20.113000+00:00     7   
7 2025-03-09 00:30:20.113000+00:00     8   
8 2025-03-09 00:30:20.151000+00:00    10   
9 2025-03-09 00:30:20.151000+00:00     9   

                                                text  emotion  
5  of the Project Gutenberg License included with...  neutral  
6  at www.gutenberg.org. If you are not located i...  neutral  
7  you will have to check the laws of the country...  neutral  
8                                                     neutral  
9                           before using this eBook.     fear  


In [8]:
with conn.cursor() as cur:
    cur.execute(
        "SELECT emotion, COUNT(*) FROM book_emotions GROUP BY emotion ORDER BY emotion;"
    )
    values = cur.fetchall()
summary = pd.DataFrame(values, columns=["emotion", "count"])

In [9]:
print(summary["count"].sum())
summary

2165


    emotion  count
0     anger    180
1   disgust    278
2      fear    133
3       joy    131
4   neutral   1066
5   sadness    206
6  surprise    171
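Raw counts are easier to compare as shares of the total. The sketch below copies the seven counts from the summary output into a plain dictionary (an assumption made to keep the example self-contained; in the notebook the same values live in the `summary` DataFrame) and prints each emotion's share, largest first.

```python
# Counts copied from the summary output above.
counts = {
    "anger": 180,
    "disgust": 278,
    "fear": 133,
    "joy": 131,
    "neutral": 1066,
    "sadness": 206,
    "surprise": 171,
}

total = sum(counts.values())
shares = {emotion: count / total for emotion, count in counts.items()}

# Print the emotions from most to least frequent.
for emotion, share in sorted(shares.items(), key=lambda item: item[1], reverse=True):
    print(f"{emotion:<10}{share:6.1%}")
```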


In [10]:
with conn.cursor() as cur:
    cur.execute(
        "SELECT * FROM book_emotions_view LIMIT 10;"
    )
    values = cur.fetchall()
view = pd.DataFrame(values, columns=['emotions', 'text', 'line', 'timestamp', 'metadata'])
print(view.head())
metadata = view['metadata'].loc[0]
print(type(metadata))
print(metadata)


  emotions                                               text  line  \
0  neutral  The Project Gutenberg eBook of The Brothers Ka...     1   
1  neutral                                                        2   
2  neutral  This ebook is for the use of anyone anywhere i...     3   
3  neutral  most other parts of the world at no cost and w...     4   
4  neutral  whatsoever. You may copy it, give it away or r...     5   

                         timestamp  \
0 2025-03-09 00:30:19.405000+00:00   
1 2025-03-09 00:30:19.405000+00:00   
2 2025-03-09 00:30:20.032000+00:00   
3 2025-03-09 00:30:20.033000+00:00   
4 2025-03-09 00:30:20.074000+00:00   

                                            metadata  
0  {"(content-type,\"\\\\x6170706c69636174696f6e2...  
1  {"(content-type,\"\\\\x6170706c69636174696f6e2...  
2  {"(content-type,\"\\\\x6170706c69636174696f6e2...  
3  {"(content-type,\"\\\\x6170706c69636174696f6e2...  
4  {"(content-type,\"\\\\x6170706c69636174696f6e2...  
<class 'str'>
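The `metadata` column comes back as a string, and the header value inside it appears to be hex-encoded bytes with a `\x` prefix. The sketch below decodes only the hex digits that are visible in the truncated output above. The final visible digit (`2`) is half a byte and is dropped, and the elided remainder is not reconstructed. `decode_bytea_hex` is a hypothetical helper.

```python
def decode_bytea_hex(value):
    """Convert a hex string, optionally prefixed with a backslash and 'x', into bytes."""
    hex_digits = value[2:] if value.startswith("\\x") else value
    return bytes.fromhex(hex_digits)


# Only the whole bytes visible in the truncated output above.
visible_prefix = "\\x6170706c69636174696f6e"
print(decode_bytea_hex(visible_prefix))  # b'application'
```

Extracting the hex string from the full `metadata` value would need extra parsing of the surrounding quoting, which is not shown here.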
