# Testing Stream Processing on Kafka with Quix Streams

In [20]:
import quixstreams
import time
import uuid
from quixstreams import Application
import kafka
import json

# Adding Data to Kafka

In [28]:
# Producer-side application pointed at the local Kafka broker
app = Application(
    broker_address="localhost:9092",
    consumer_group="text-splitter-v1",
)

# Topic that receives chat messages; values are serialized as JSON
messages_topic = app.topic(name="messages", value_serializer="json")

# Sample chat messages, written as (chat_id, text) pairs and expanded into
# dicts whose keys are "chat_id" then "text"
messages = [
    {"chat_id": chat_id, "text": text}
    for chat_id, text in (
        ("id1", "Lorem ipsum dolor sit amet"),
        ("id2", "Consectetur adipiscing elit sed"),
        ("id1", "Do eiusmod tempor incididunt ut labore et"),
        ("id3", "Mollis nunc sed id semper"),
    )
]

In [29]:
# Send every sample chat message to the "messages" topic.
# Each message is serialized with the topic's serializers and keyed by its
# "chat_id"; the serialized key/value are echoed before being produced.
with app.get_producer() as producer:
    for chat_message in messages:
        record = messages_topic.serialize(key=chat_message["chat_id"], value=chat_message)
        print(f'Produce event with key="{record.key}" value="{record.value}"')
        producer.produce(topic=messages_topic.name, key=record.key, value=record.value)

[2024-07-02 22:29:08,268] [INFO] [quixstreams] : Topics required for this application: "messages"


KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

In [30]:
from quixstreams import Application

# Create an Application - the main configuration entry point.
# auto_offset_reset="earliest" controls where this consumer group starts
# reading when it has no committed offset yet (Kafka's auto.offset.reset).
app = Application(
    broker_address="localhost:9092",
    consumer_group="text-splitter-v1",
    auto_offset_reset="earliest",
)

# Define a topic with chat messages in JSON format
messages_topic = app.topic(name="messages", value_deserializer="json")

# Create a StreamingDataFrame - the stream processing pipeline
# with a Pandas-like interface on streaming data
sdf = app.dataframe(topic=messages_topic)

# Print the input data
sdf = sdf.update(lambda message: print(f"Input:  {message}"))

# Split each incoming sentence into words. expand=True emits every
# {"text": word} dict as its own downstream record instead of one
# record holding the whole list.
sdf = sdf.apply(
    lambda message: [{"text": word} for word in message["text"].split()],
    expand=True,
)

# Store each word's length in a new "length" column
sdf["length"] = sdf["text"].apply(len)

# Print the output result
sdf = sdf.update(lambda word: print(f"Output: {word}"))

# Run the streaming application.
# NOTE(review): app.run() presumably blocks until interrupted, tying up the
# notebook kernel; interrupt the kernel to stop it.
if __name__ == "__main__":
    app.run(sdf)

[2024-07-02 22:37:38,940] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'localhost:9092'}" consumer_group="text-splitter-v1" auto_offset_reset="earliest" commit_interval=5.0s
[2024-07-02 22:37:38,941] [INFO] [quixstreams] : Topics required for this application: "messages"


KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}

In [22]:
# Connect to the local Kafka broker (not a hosted one) to consume data.
# A new random consumer group is generated on every run, so offsets
# committed by earlier runs are never reused. auto_offset_reset is left at
# its default, which the application logs as "latest" when it starts.
app = Application(
    broker_address="localhost:9092",  # Kafka broker address
    consumer_group=str(uuid.uuid4()),  # Kafka consumer group (random per run)
    # NOTE(review): idempotent producing is disabled here without a stated
    # reason — confirm this is still needed for the local broker.
    producer_extra_config={"enable.idempotence": False},
)

print(app)

<quixstreams.app.Application object at 0x000001708AB708B0>


In [24]:
# Input topic "cardata"; record values are decoded from JSON
input_topic = app.topic(name="cardata", value_deserializer="json")

print(input_topic)

<Topic name="cardata"> 


In [26]:
# Converts the JSON records of the input topic into a Streaming DataFrame (SDF)
sdf = app.dataframe(input_topic)

# Count whitespace-separated tokens in each "message".
# str.split() with no argument collapses runs of whitespace and ignores
# leading/trailing whitespace: "a  b" counts 2 tokens (split(" ") counted 3)
# and an empty message counts 0 (split(" ") counted 1).
# NOTE(review): assumes every record has a string "message" and a "role"
# field — confirm against whatever produces "cardata".
sdf["tokens_count"] = sdf["message"].apply(lambda message: len(message.split()))

# Keep only the role and its token count, and print each resulting row
sdf = sdf[["role", "tokens_count"]]
sdf = sdf.update(lambda row: print(row))

print(sdf)

<quixstreams.dataframe.dataframe.StreamingDataFrame object at 0x000001708AB71BD0>


In [27]:
# Start the application and run the "cardata" pipeline defined above.
# NOTE(review): this call presumably blocks until interrupted. With a random
# consumer group and the default offset reset (logged as "latest"), it likely
# only sees records produced after it starts — confirm that is intended.
app.run(sdf)

[2024-07-02 22:24:27,620] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': 'localhost:9092'}" consumer_group="8364a7a5-db1a-4d9d-b988-aa6e19cb1261" auto_offset_reset="latest" commit_interval=5.0s
[2024-07-02 22:24:27,621] [INFO] [quixstreams] : Topics required for this application: "cardata"


KafkaException: KafkaError{code=_TRANSPORT,val=-195,str="Failed to get metadata: Local: Broker transport failure"}