# Continuously ingest documents into a vector store using Apache Kafka
Stream data from a CSV file (simulating CDC, or "Change Data Capture") and ingest it into a vector store for semantic search.

This demo features the following open source libraries:

* **Quix Streams** to produce data to, and consume data from, Apache Kafka.

* **Qdrant** to create a database that stores embeddings and to run basic similarity search.


## Setup

Install the libraries and Apache Kafka, then start the Kafka servers.

### 1. Install the main dependencies

Dependencies include the Quix Streams library, the Qdrant client library, the sentence-transformers library (we'll use the `all-MiniLM-L6-v2` embedding model), and pandas.


In [1]:
%pip install quixstreams qdrant-client sentence-transformers pandas

Note: you may need to restart the kernel to use updated packages.


### 2. Download and set up Kafka and ZooKeeper instances

We use the default configurations shipped with Apache Kafka to spin up the instances.


In [2]:
!brew install java
!brew install kafka

==> Downloading https://formulae.brew.sh/api/formula.jws.json

==> Downloading https://formulae.brew.sh/api/cask.jws.json

To reinstall 22.0.2, run:
  brew reinstall openjdk
To reinstall 3.8.0, run:
  brew reinstall kafka


Run the commands below in a local terminal to start the ZooKeeper and Kafka servers.
```sh
zookeeper-server-start -daemon /opt/homebrew/etc/kafka/zookeeper.properties
kafka-server-start -daemon /opt/homebrew/etc/kafka/server.properties
echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
sleep 10
```
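
A fixed `sleep 10` can be too short on a slow machine or needlessly long on a fast one. As an alternative, here is a minimal sketch that polls the broker's TCP port until it accepts connections, assuming the default local listener at `127.0.0.1:9092` used throughout this notebook. The helper `wait_for_port` is hypothetical, and an open port only shows that the broker process is listening, not that it has finished starting up.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once host:port accepts a TCP connection, or False after `timeout` seconds.

    Hypothetical helper: a listening port does not guarantee the Kafka broker
    is fully ready to serve requests.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a connection to test reachability.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:  # refused, unreachable, or timed out
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("127.0.0.1", 9092, timeout=30)` would replace the fixed sleep; polling returns as soon as the port opens instead of always waiting the full interval.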

### 3. Check that the Kafka daemons are running

Show the running daemon processes by filtering the process list for the keyword "kafka" while excluding the grep process itself.


In [31]:
!ps aux | grep '[k]afka'

yuc10            74662   0.5  0.6 415547760 100688 s000  S     2:08PM   0:11.32 /opt/homebrew/opt/openjdk/libexec/openjdk.jdk/Contents/Home/bin/java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M -Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../logs -Dlog4j.configuration=file:/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../config/log4j.properties -cp /opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../libs/activation-1.1.1.jar:/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../libs/aopalliance-repackaged-2.6.1.jar:/opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../libs/argparse4j-0.7.0.jar:/opt/homebrew/Ce

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
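
A plain `grep 'kafka'` would also list the grep process, because grep's own command line contains the word it searches for. Wrapping one letter in brackets avoids this: `[k]afka` is a regular expression (a character class holding only `k`) that matches the text `kafka`, while the grep command line contains the literal string `[k]afka`, which the expression does not match. The sketch below reproduces the idea with Python's `re` module on made-up process lines (they are illustrative, not real `ps` output).

```python
import re

# Illustrative `ps` lines (not real output): the Kafka JVM, plus the grep
# process as it would appear with a plain pattern and with the bracket pattern.
kafka_jvm = "user 74662 ... java ... /opt/homebrew/Cellar/kafka/3.8.0/libexec/bin/../libs/..."
plain_grep = "user 80000 ... grep kafka"
bracket_grep = "user 80001 ... grep [k]afka"

# `[k]afka` matches exactly the same text as `kafka`.
pattern = re.compile(r"[k]afka")

print(bool(pattern.search(kafka_jvm)))     # True: the broker process is listed
print(bool(pattern.search(plain_grep)))    # True: a plain `grep kafka` matches its own line
print(bool(pattern.search(bracket_grep)))  # False: `grep '[k]afka'` does not
```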


### 4. Import all the libraries and set some constants

Import the required libraries, including Quix Streams and Qdrant, and set constants for frequently used names.


In [13]:
import json
import time
import uuid

import pandas as pd
from qdrant_client import models, QdrantClient
from sentence_transformers import SentenceTransformer

# Quix Streams: a plain Kafka producer and the streaming Application
from quixstreams.kafka import Producer
from quixstreams import Application

docs_topic_name = 'docs_books'
vectors_topic_name = 'vectors_all-minilm-l6-v2'
consumergroup_name = 'qdrant-demo'
collectionname = "book-catalog"



## 1. First ingestion pass

* In the first pass, we'll add some initial entries to a "book-catalog" vector store via Kafka, then search the vector store to check that the data was ingested correctly.
* In the second pass, we'll go through the whole process again (albeit faster) with new data and see how the matches change for the same search query.

### 1.1 Create data

Create sample data and write it to a CSV file (so we can practice producing to Kafka from a CSV file in the next steps).


In [14]:
# Let's create a dataset based on sci-fi books.
documents = [
  { "name": "The Time Machine", "description": "A man travels through time and witnesses the evolution of humanity.", "author": "H.G. Wells", "year": 1895 },
  { "name": "Ender's Game", "description": "A young boy is trained to become a military leader in a war against an alien race.", "author": "Orson Scott Card", "year": 1985 },
  { "name": "Brave New World", "description": "A dystopian society where people are genetically engineered and conditioned to conform to a strict social hierarchy.", "author": "Aldous Huxley", "year": 1932 },
  {"name": "An Absolutely Remarkable Thing", "description": "A young woman becomes famous after discovering a mysterious alien artifact in New York City.", "author": "Hank Green", "year": 2018},
  { "name": "Dune", "description": "A desert planet is the site of political intrigue and power struggles.", "author": "Frank Herbert", "year": 1965 },
  { "name": "Foundation", "description": "A mathematician develops a science to predict the future of humanity and works to save civilization from collapse.", "author": "Isaac Asimov", "year": 1951 },
  { "name": "Snow Crash", "description": "A futuristic world where the internet has evolved into a virtual reality metaverse.", "author": "Neal Stephenson", "year": 1992 },
  { "name": "Neuromancer", "description": "A hacker is hired to pull off a near-impossible hack and gets pulled into a web of intrigue.", "author": "William Gibson", "year": 1984 },
  { "name": "The War of the Worlds", "description": "A Martian invasion of Earth throws humanity into chaos.", "author": "H.G. Wells", "year": 1898 },
  { "name": "The Hunger Games", "description": "A dystopian society where teenagers are forced to fight to the death in a televised spectacle.", "author": "Suzanne Collins", "year": 2008 },
  { "name": "The Andromeda Strain", "description": "A deadly virus from outer space threatens to wipe out humanity.", "author": "Michael Crichton", "year": 1969 },
  { "name": "The Left Hand of Darkness", "description": "A human ambassador is sent to a planet where the inhabitants are genderless and can change gender at will.", "author": "Ursula K. Le Guin", "year": 1969 },
  { "name": "The Time Traveler's Wife", "description": "A love story between a man who involuntarily time travels and the woman he loves.", "author": "Audrey Niffenegger", "year": 2003 }
]

# Convert the list of dictionaries to a DataFrame
df = pd.DataFrame(documents)
# Save the DataFrame to a CSV file - producing from a CSV is a common test case when setting up a producer application.
df.to_csv('documents.csv', index=False)  # skip the index so it isn't read back as an extra "Unnamed: 0" column

### 1.2 Initialize the Quix Producer to send the docs to Kafka

Load the CSV file back in and iterate through it with the Quix Producer.


In [15]:
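df = pd.read_csv('documents.csv')
outputtopicname = docs_topic_name
# Offsets start at 0, so the last message is at len(df) - 1. The consumer's stop
# check later uses `offset > offsetlimit`, which first fires on that last message.
offsetlimit = len(df)-2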
print(f"Producing to output topic: {outputtopicname}...\n\n")

with Producer(
    broker_address="127.0.0.1:9092",
    extra_config={"allow.auto.create.topics": "true"},
) as producer:
    for index, row in df.iterrows():
        doc_id = index
        doc_key = f"A{'0'*(10-len(str(doc_id)))}{doc_id}"
        doc_uuid = str(uuid.uuid4())
        value = {
            "Timestamp": time.time_ns(),
            "doc_id": doc_id,
            "doc_uuid": doc_uuid,
            "doc_name": row['name'],
            "doc_descr": row['description'],
            "doc_year": row['year'],
        }
        print(f"Producing value: {value}")
        producer.produce(
            topic=outputtopicname,
            headers=[("uuid", doc_uuid)],  # a dict is also allowed here
            key=doc_key,
            value=json.dumps(value),  # needs to be a string
        )

Producing to output topic: docs_books...


Producing value: {'Timestamp': 1724325086086099000, 'doc_id': 0, 'doc_uuid': '129e29ce-e4dd-4efe-a677-fa383b79098e', 'doc_name': 'The Time Machine', 'doc_descr': 'A man travels through time and witnesses the evolution of humanity.', 'doc_year': 1895}
Producing value: {'Timestamp': 1724325086139462000, 'doc_id': 1, 'doc_uuid': 'e83286b7-7477-4885-941b-534e016177a6', 'doc_name': "Ender's Game", 'doc_descr': 'A young boy is trained to become a military leader in a war against an alien race.', 'doc_year': 1985}
Producing value: {'Timestamp': 1724325086139545000, 'doc_id': 2, 'doc_uuid': 'bdb33880-a55d-4555-a2a3-028acd5939c0', 'doc_name': 'Brave New World', 'doc_descr': 'A dystopian society where people are genetically engineered and conditioned to conform to a strict social hierarchy.', 'doc_year': 1932}
Producing value: {'Timestamp': 1724325086139583000, 'doc_id': 3, 'doc_uuid': 'd76a0b98-9532-46a8-9e55-e5a57c804349', 'doc_name': 'An Absolutely R
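
The key expression `f"A{'0'*(10-len(str(doc_id)))}{doc_id}"` left-pads the ID with zeros to ten digits, which for non-negative IDs gives the same result as the format spec `f"A{doc_id:010d}"`. Below is a minimal sketch of the key and value construction as a standalone function, assuming the same field names as the producer cell above; the function name `build_doc_message` is hypothetical.

```python
import json
import time
import uuid


def build_doc_message(doc_id: int, name: str, descr: str, year: int):
    """Return (key, value, headers) in the shape the producer cell sends to Kafka."""
    doc_uuid = str(uuid.uuid4())
    key = f"A{doc_id:010d}"  # zero-padded to 10 digits, e.g. 7 -> "A0000000007"
    value = {
        "Timestamp": time.time_ns(),
        "doc_id": doc_id,
        "doc_uuid": doc_uuid,
        "doc_name": name,
        "doc_descr": descr,
        "doc_year": year,
    }
    # The raw Producer takes a string or bytes value, so serialize the dict to JSON text.
    return key, json.dumps(value), [("uuid", doc_uuid)]


key, value, headers = build_doc_message(
    0, "The Time Machine", "A man travels through time and witnesses the evolution of humanity.", 1895
)
print(key)                                             # A0000000000
print(headers[0][1] == json.loads(value)["doc_uuid"])  # True
```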

### 1.3 Consume and enrich the data

Create a Quix consumer/producer application that will:
 1. **Consume**: read the docs from the docs topic
 2. **Enrich**: create an embedding for each doc's description and add it to the doc's data
 3. **Produce**: write the enriched data to a downstream vectors topic
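
Stripped of Kafka, each message goes through a plain dictionary transformation. Here is a minimal sketch of the enrich step, assuming a hypothetical `embed` callable in place of `encoder.encode(...).tolist()` so it runs without downloading the model; `fake_embed` is a toy stand-in, not a real embedding.

```python
import time
from typing import Callable, Dict, List


def enrich(message: Dict, embed: Callable[[str], List[float]]) -> Dict:
    """Return a copy of `message` with an `embeddings` field and a refreshed timestamp.

    `embed` stands in for the sentence-transformers encoder used in this notebook.
    """
    enriched = dict(message)  # don't mutate the consumed message
    enriched["embeddings"] = embed(message["doc_descr"])
    enriched["Timestamp"] = time.time_ns()
    return enriched


def fake_embed(text: str) -> List[float]:
    # Hypothetical toy "embedding": three numbers derived from the text, not semantically meaningful.
    return [float(len(text)), float(text.count(" ")), float(sum(map(ord, text)) % 97)]


msg = {"Timestamp": 0, "doc_id": 4, "doc_name": "Dune",
       "doc_descr": "A desert planet is the site of political intrigue and power struggles."}
out = enrich(msg, fake_embed)
print(len(out["embeddings"]), out["Timestamp"] > msg["Timestamp"])  # 3 True
```

In the notebook, the streaming dataframe performs the same steps declaratively: `sdf.apply(...)` adds the `embeddings` column and `to_topic(...)` handles the produce step.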

Set the input/output topics and initialize the embedding model


In [16]:
inputtopicname = docs_topic_name
outputtopicname = vectors_topic_name
encoder = SentenceTransformer('all-MiniLM-L6-v2') # Model to create embeddings



Define the embedding function


In [17]:
def create_embeddings(row):
    text = row['doc_descr']
    embeddings = encoder.encode(text)
    embedding_list = embeddings.tolist() # SentenceTransformer returns a NumPy array; convert it to a plain list so it can be serialized to JSON for the Kafka topic and later passed to Qdrant
    print(f'Created vector: "{embedding_list}"')
    time.sleep(0.2) # Adding small pause since Colab sometimes chokes

    return embedding_list

Start the transformation process (consume->enrich->produce)


In [18]:
# Create a special stop condition just for this Notebook (otherwise the cell will run indefinitely)
print(f"Using offset limit {offsetlimit}")
def on_message_processed(topic, partition, offset):
    if offset > offsetlimit:
        app.stop()

# Define the consumer application and settings
app = Application(
    broker_address="127.0.0.1:9092",
    consumer_group=consumergroup_name,
    auto_offset_reset="earliest",
    on_message_processed=on_message_processed,
    consumer_extra_config={"allow.auto.create.topics": "true"},
)

# Define an input topic with JSON deserializer
input_topic = app.topic(inputtopicname, value_deserializer="json")
print(f"Consuming from input topic: {inputtopicname}")

# Define an output topic with JSON serializer
output_topic = app.topic(outputtopicname, value_serializer="json")
print(f"Producing to output topic: {outputtopicname}")

# Initialize a streaming dataframe based on the stream of messages from the input topic:
sdf = app.dataframe(topic=input_topic)

sdf = sdf.update(lambda val: print(f"Received update: {val}"))

# EMBEDDING HAPPENS HERE
# Call the embedding function for each new message (row) in the SDF
sdf["embeddings"] = sdf.apply(create_embeddings, stateful=False)

# Update the timestamp column to the current time in nanoseconds
sdf["Timestamp"] = sdf["Timestamp"].apply(lambda _: time.time_ns())

# Publish the processed SDF to a Kafka topic specified by the output_topic object.
sdf = sdf.to_topic(output_topic)

app.run(sdf)

[2024-08-22 14:11:49,320] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': '127.0.0.1:9092'}" consumer_group="qdrant-demo" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-08-22 14:11:49,321] [INFO] [quixstreams] : Topics required for this application: "docs_books", "vectors_all-minilm-l6-v2"
[2024-08-22 14:11:49,333] [INFO] [quixstreams] : Creating a new topic "vectors_all-minilm-l6-v2" with config: "{'num_partitions': 1, 'replication_factor': 1, 'extra_config': {}}"


Using offset limit 11
Consuming from input topic: docs_books
Producing to output topic: vectors_all-minilm-l6-v2


[2024-08-22 14:11:50,339] [INFO] [quixstreams] : Topic "vectors_all-minilm-l6-v2" has been created
[2024-08-22 14:11:50,341] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-08-22 14:11:50,378] [INFO] [quixstreams] : Kafka topics validation complete
[2024-08-22 14:11:50,379] [INFO] [quixstreams] : Initializing state directory at "/Users/yuc10/Labs/tutorial-code/notebooks/state/qdrant-demo"
[2024-08-22 14:11:50,382] [INFO] [quixstreams] : Waiting for incoming messages


Received update: {'Timestamp': 1724325086086099000, 'doc_id': 0, 'doc_uuid': '129e29ce-e4dd-4efe-a677-fa383b79098e', 'doc_name': 'The Time Machine', 'doc_descr': 'A man travels through time and witnesses the evolution of humanity.', 'doc_year': 1895}
Created vector: "[-0.03813493996858597, 0.045177895575761795, -0.022021330893039703, 0.05923554301261902, 0.006653957534581423, 0.018390046432614326, 0.06810278445482254, -0.041662298142910004, 0.04437767714262009, -0.02525176852941513, -0.009794221259653568, -0.022916486486792564, -0.11903539299964905, 0.031318988651037216, -0.057532899081707, -0.00893409177660942, -0.10448414832353592, 0.007782097440212965, 0.05709606781601906, -0.05635252594947815, -0.005898001603782177, 0.026749499142169952, -0.04767238721251488, -1.9667804735945538e-06, -0.03601923957467079, 0.0012949301162734628, 0.06238840892910957, 0.00176862976513803, 0.05468188598752022, 0.02123238518834114, 0.07367482036352158, -0.012031622231006622, -0.03202751651406288, -0.037

[2024-08-22 14:11:58,021] [INFO] [quixstreams] : Stop processing of StreamingDataFrame


### 1.4 Consume vectors and sink them into the DB

Create a Quix "Sink" Consumer with Qdrant as a data sink.

The consumer:
 1. reads the embeddings from the vectors topic
 2. writes each embedding to the vector db along with the original text.

Initialize Qdrant


In [19]:
print(f"Using collection name {collectionname}")
# Initialize the vector db
qdrant = QdrantClient(path=f"./{collectionname}") # persist a Qdrant DB on the filesystem

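# (Re)create the collection that stores the books.
# recreate_collection() is deprecated in recent qdrant-client releases, so
# delete any existing collection explicitly and then create a fresh one.
if qdrant.collection_exists(collectionname):
    qdrant.delete_collection(collectionname)
qdrant.create_collection(
    collection_name=collectionname,
    vectors_config=models.VectorParams(
        size=encoder.get_sentence_embedding_dimension(), # Vector size is defined by the model
        distance=models.Distance.COSINE
    )
)
print("(re)created collection")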

Using collection name book-catalog
(re)created collection




Define the ingestion function


In [20]:
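def ingest_vectors(row):
    # Use the document UUID as the point ID and store the remaining fields
    # (without the vector itself) as the payload returned with search hits.
    single_record = models.PointStruct(
        id=row['doc_uuid'],
        vector=row['embeddings'],
        payload={k: v for k, v in row.items() if k != 'embeddings'},
    )

    qdrant.upload_points(
        collection_name=collectionname,
        points=[single_record],
    )

    print(f'Ingested vector entry id: "{row["doc_uuid"]}"...')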
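
Because the point ID is the document's `doc_uuid`, replaying the same message writes to the same point instead of creating a duplicate: Qdrant upserts points, so writing a point whose ID already exists replaces it. This matters because the logs above show the application running with `processing_guarantee="at-least-once"`, so a message can be processed more than once after a restart. Below is a sketch of that property, using a plain dictionary as a stand-in for the collection; the `FakeCollection` class is hypothetical, and it also leaves the vector out of the stored payload.

```python
from typing import Dict, List


class FakeCollection:
    """Hypothetical in-memory stand-in for a vector collection keyed by point ID."""

    def __init__(self) -> None:
        self.points: Dict[str, dict] = {}

    def upsert(self, point_id: str, vector: List[float], payload: dict) -> None:
        # Same ID -> overwrite, so re-delivered messages don't create duplicates.
        self.points[point_id] = {"vector": vector, "payload": payload}


messages = [
    {"doc_uuid": "uuid-1", "doc_name": "Dune", "embeddings": [0.1, 0.2]},
    {"doc_uuid": "uuid-2", "doc_name": "Foundation", "embeddings": [0.3, 0.1]},
]

store = FakeCollection()
for _ in range(2):  # simulate the same messages being delivered twice
    for msg in messages:
        payload = {k: v for k, v in msg.items() if k != "embeddings"}
        store.upsert(msg["doc_uuid"], msg["embeddings"], payload)

print(len(store.points))  # 2
```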

Start the consumer process (consume->sink)


In [21]:
inputtopicname = vectors_topic_name

# Create a special stop condition just for this Notebook (otherwise the cell will run indefinitely)
print(f"Using offset limit {offsetlimit}")
def on_message_processed(topic, partition, offset):
    if offset > offsetlimit:
        app.stop()

# Define the consumer application and settings
app = Application(
    broker_address="127.0.0.1:9092",
    consumer_group="vectorizer",
    auto_offset_reset="earliest",
    on_message_processed=on_message_processed,
    consumer_extra_config={"allow.auto.create.topics": "true"},
)

# Define an input topic with JSON deserializer
input_topic = app.topic(inputtopicname, value_deserializer="json")
print(f"Consuming from input topic: {inputtopicname}")

# Initialize a streaming dataframe based on the stream of messages from the input topic:
sdf = app.dataframe(topic=input_topic)

# INGESTION HAPPENS HERE
# Write each incoming vector message (row) to the Qdrant collection
sdf = sdf.update(ingest_vectors)
app.run(sdf)

[2024-08-22 14:12:16,490] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': '127.0.0.1:9092'}" consumer_group="vectorizer" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-08-22 14:12:16,491] [INFO] [quixstreams] : Topics required for this application: "vectors_all-minilm-l6-v2"
[2024-08-22 14:12:16,497] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-08-22 14:12:16,512] [INFO] [quixstreams] : Kafka topics validation complete
[2024-08-22 14:12:16,513] [INFO] [quixstreams] : Initializing state directory at "/Users/yuc10/Labs/tutorial-code/notebooks/state/vectorizer"
[2024-08-22 14:12:16,513] [INFO] [quixstreams] : Waiting for incoming messages
[2024-08-22 14:12:16,659] [INFO] [quixstreams] : Stop processing of StreamingDataFrame


Using offset limit 11
Consuming from input topic: vectors_all-minilm-l6-v2
Ingested vector entry id: "129e29ce-e4dd-4efe-a677-fa383b79098e"...
Ingested vector entry id: "e83286b7-7477-4885-941b-534e016177a6"...
Ingested vector entry id: "bdb33880-a55d-4555-a2a3-028acd5939c0"...
Ingested vector entry id: "d76a0b98-9532-46a8-9e55-e5a57c804349"...
Ingested vector entry id: "a21c7b0d-dc15-4c1e-aac3-e1ea5e0eb510"...
Ingested vector entry id: "5f421f04-aa72-41cc-ada5-9ce6c9bc8a30"...
Ingested vector entry id: "f1ef9fac-d4c1-4d4b-ac7a-891cbf98da9d"...
Ingested vector entry id: "35b33433-5d70-430b-b0fe-3d825b644774"...
Ingested vector entry id: "259b1b96-a943-4551-a994-b6f592ae1e26"...
Ingested vector entry id: "25639b85-9ae8-4eba-9692-e20351b45eb6"...
Ingested vector entry id: "8c9ab7da-e03d-490f-9325-9568703752ca"...
Ingested vector entry id: "afc3e4bb-e3d4-4fda-a90d-90623e0f69f6"...
Ingested vector entry id: "1a4f7b6b-b206-4b8e-960b-773c1557e4b6"...


### 1.5 Run a test query on the ingested vectors

Use Qdrant to run a basic similarity search to make sure the vectors have been ingested properly and match in the expected way.


In [22]:
query = "books like star wars" # Leave the test query as-is for the first attempt

hits = qdrant.search(
    collection_name=collectionname,
    query_vector=encoder.encode(query).tolist(),
    limit=10
)

print("Entries matching your query:")
for hit in hits:
    print(hit.payload['doc_name'], " | ", hit.payload['doc_descr'], "score:", hit.score)

Entries matching your query:
Dune  |  A desert planet is the site of political intrigue and power struggles. score: 0.21748407992672814
Foundation  |  A mathematician develops a science to predict the future of humanity and works to save civilization from collapse. score: 0.1679862275759687
The Hunger Games  |  A dystopian society where teenagers are forced to fight to the death in a televised spectacle. score: 0.16221441509557022
The Time Traveler's Wife  |  A love story between a man who involuntarily time travels and the woman he loves. score: 0.1485679027972775
Ender's Game  |  A young boy is trained to become a military leader in a war against an alien race. score: 0.14376739530973928
The Andromeda Strain  |  A deadly virus from outer space threatens to wipe out humanity. score: 0.14341334374961737
Snow Crash  |  A futuristic world where the internet has evolved into a virtual reality metaverse. score: 0.1433503028854854
An Absolutely Remarkable Thing  |  A young woman becomes fam
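
With `Distance.COSINE`, the `score` on each hit is the cosine similarity between the query vector and the stored vector, so higher means closer. For a collection this small, the same ranking could be computed by brute force. Below is a minimal sketch using toy 3-dimensional vectors invented for illustration (real `all-MiniLM-L6-v2` embeddings have 384 dimensions).

```python
import math
from typing import Dict, List, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity: dot product divided by the product of the vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query: List[float], points: Dict[str, List[float]], k: int) -> List[Tuple[str, float]]:
    """Score every stored vector against the query and return the k best (exact search)."""
    scored = [(name, cosine(query, vec)) for name, vec in points.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


# Toy vectors, invented for illustration only.
points = {
    "Dune": [0.9, 0.1, 0.2],
    "Foundation": [0.5, 0.5, 0.1],
    "The Time Traveler's Wife": [0.1, 0.9, 0.3],
}
for name, score in top_k([1.0, 0.0, 0.1], points, k=2):
    print(f"{name} | score: {score:.3f}")
```

Scanning every vector is fine for a few dozen books; a vector database exists to avoid that linear scan once collections grow large.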

If everything went to plan, "*Dune*" should be the top match for the query "*books like star wars*". This makes sense, since Dune is somewhat like Star Wars (depending on who you ask). It probably matched because "planet" is semantically close to "star" and "struggles" is semantically close to "wars".

Now let's suppose we update our catalog with more books to accommodate readers who are looking for similar titles. We want the vector store to be updated as soon as new book entries are added to the main catalog database, so that we get as many good matches (and hopefully purchases) as possible without delay.

## 2. Second ingestion pass

We're going to stream the sample data from a CSV file again, but in a production scenario these items would be added incrementally: changes to the product catalog would be detected and streamed to Apache Kafka as they occur.
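
In a CDC setup, only the rows that changed are streamed, not the whole table. The sketch below approximates that by diffing two snapshots of a catalog keyed by book name and emitting insert, update, and delete events. Real CDC tools typically read the database's transaction log instead of comparing snapshots, and the event shape here (`op`, `key`, `row`) and the `in_stock` field are hypothetical.

```python
from typing import Dict, List


def diff_snapshots(old: Dict[str, dict], new: Dict[str, dict]) -> List[dict]:
    """Compare two catalog snapshots keyed by book name and return change events."""
    events = []
    for key, row in new.items():
        if key not in old:
            events.append({"op": "insert", "key": key, "row": row})
        elif old[key] != row:
            events.append({"op": "update", "key": key, "row": row})
    for key in old:
        if key not in new:
            events.append({"op": "delete", "key": key, "row": None})
    return events


# Illustrative snapshots; `in_stock` is a made-up field.
before = {
    "Dune": {"year": 1965, "in_stock": True},
    "Foundation": {"year": 1951, "in_stock": True},
}
after = {
    "Dune": {"year": 1965, "in_stock": False},       # changed
    "Old Man's War": {"year": 2005, "in_stock": True},  # new
}

events = diff_snapshots(before, after)
for event in events:
    print(event)
```

Each event could then be produced to the docs topic; a delete event would also need a consumer that removes the matching point from the vector store, which this notebook does not cover.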

### 2.1 Add more data


In [23]:
documents = [
  {"name": "Childhood's End", "description": "A peaceful alien invasion leads to the end of humanity's childhood.", "author": "Arthur C. Clarke", "year": 1953 },
  {"name": "The Day of the Triffids", "description": "After a meteor shower blinds most of the population, aggressive plant life starts taking over.", "author": "John Wyndham", "year": 1951 },
  {"name": "Contact", "description": "Scientists receive a message from extraterrestrial beings and build a machine to meet them.", "author": "Carl Sagan", "year": 1985 },
  {"name": "The Three-Body Problem", "description": "Humanity faces a potential invasion from a distant alien civilization in crisis.", "author": "Liu Cixin", "year": 2008 },
  {"name": "Sphere", "description": "A team investigates a spaceship found on the ocean floor, not knowing its mysterious and possibly extraterrestrial origin.", "author": "Michael Crichton", "year": 1987 },
  {"name": "Footfall", "description": "Elephant-like aliens invade Earth, and humanity must find a way to fight back.", "author": "Larry Niven and Jerry Pournelle", "year": 1985 },
  {"name": "The Puppet Masters", "description": "Slug-like aliens invade Earth by attaching to humans and controlling their minds.", "author": "Robert A. Heinlein", "year": 1951 },
  {"name": "The Kraken Wakes", "description": "Alien beings from the depths of the ocean start attacking humanity.", "author": "John Wyndham", "year": 1953 },
  {"name": "The Invasion of the Body Snatchers", "description": "A small town discovers that some of its residents are being replaced by perfect physical copies that emerge from plantlike pods.", "author": "Jack Finney", "year": 1955 },
  {"name": "Calculating God", "description": "An alien arrives on Earth, seeking to understand why God has apparently been involved in Earth's evolution.", "author": "Robert J. Sawyer", "year": 2000 },
  {"name": "The Forge of God", "description": "Aliens arrive under the guise of friendship, but their true mission is to destroy Earth.", "author": "Greg Bear", "year": 1987 },
  {"name": "Roadside Picnic", "description": "Aliens visited Earth, leaving behind zones filled with dangerous objects and phenomena.", "author": "Arkady and Boris Strugatsky", "year": 1972 },
  {"name": "Out of the Dark", "description": "An alien race invades Earth, underestimating humanity's will to survive.", "author": "David Weber", "year": 2010 },
  {"name": "Arrival (Stories of Your Life and Others)", "description": "A linguist learns to communicate with aliens who have arrived on Earth, altering her perception of reality.", "author": "Ted Chiang", "year": 1998 },
  {"name": "To Serve Man", "description": "Aliens come to Earth claiming to be friends, but their true intentions are revealed in a horrifying twist.", "author": "Damon Knight", "year": 1950},
  {"name": "The Mote in God's Eye", "description": "Humanity encounters an alien race that poses a unique and unforeseen challenge.", "author": "Larry Niven and Jerry Pournelle", "year": 1974 },
  {"name": "Old Man's War", "description": "Earth's senior citizens are recruited to fight in an interstellar war, discovering new alien cultures and threats.", "author": "John Scalzi", "year": 2005 },
]

# Convert the list of dictionaries to a DataFrame
df = pd.DataFrame(documents)
# Save the DataFrame to a CSV file so that we can practice producing to Kafka from a CSV file in the next step
df.to_csv('documents.csv', index=False)  # skip the index so it isn't read back as an extra "Unnamed: 0" column

### 2.2 Produce more data to the docs topic


In [25]:
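df = pd.read_csv('documents.csv')
outputtopicname = docs_topic_name
offsetlimit2 = len(df)
# The new messages are appended after the first batch, so raise the stop limit
# by the number of new documents.
offsetlimit = offsetlimit + offsetlimit2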

print(f"Producing to output topic: {outputtopicname}")
with Producer(
    broker_address="127.0.0.1:9092",
    extra_config={"allow.auto.create.topics": "true"},
) as producer:
    for index, row in df.iterrows():
        doc_id = index
        doc_key = f"A{'0'*(10-len(str(doc_id)))}{doc_id}"
        doc_uuid = str(uuid.uuid4())
        value = {
            "Timestamp": time.time_ns(),
            "doc_id": doc_id,
            "doc_uuid": doc_uuid,
            "doc_name": row['name'],
            "doc_descr": row['description'],
            "doc_year": row['year'],
        }
        print(f"Producing value: {value}")
        producer.produce(
            topic=outputtopicname,
            headers=[("uuid", doc_uuid)],  # a dict is also allowed here
            key=doc_key,
            value=json.dumps(value),  # needs to be a string
        )

Producing to output topic: docs_books
Producing value: {'Timestamp': 1724325168584434000, 'doc_id': 0, 'doc_uuid': 'c635c9d8-36a7-4a4a-a2c0-603a677e2fd4', 'doc_name': "Childhood's End", 'doc_descr': "A peaceful alien invasion leads to the end of humanity's childhood.", 'doc_year': 1953}
Producing value: {'Timestamp': 1724325168585113000, 'doc_id': 1, 'doc_uuid': 'f8fb7899-747b-4b20-8993-d73521a56e82', 'doc_name': 'The Day of the Triffids', 'doc_descr': 'After a meteor shower blinds most of the population, aggressive plant life starts taking over.', 'doc_year': 1951}
Producing value: {'Timestamp': 1724325168585513000, 'doc_id': 2, 'doc_uuid': '7724d9fb-deb5-40b0-8b9e-7cb9b7a1a5b1', 'doc_name': 'Contact', 'doc_descr': 'Scientists receive a message from extraterrestrial beings and build a machine to meet them.', 'doc_year': 1985}
Producing value: {'Timestamp': 1724325168585616000, 'doc_id': 3, 'doc_uuid': '2f26ac20-2f4f-41de-8cf3-c9db5f37e87b', 'doc_name': 'The Three-Body Problem', 'doc_d
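
The stop condition relies on offset arithmetic. Kafka offsets within a partition start at 0, so the first pass's 13 messages occupy offsets 0 to 12, and with `offsetlimit = len(df) - 2 = 11` the check `offset > offsetlimit` first fires on offset 12, the last message. The second pass appends 17 messages (offsets 13 to 29) and raises the limit to `11 + 17 = 28`, so the callback fires on offset 29. Below is a small simulation of that logic, assuming a single partition and a consumer group that resumes right after its last committed offset; `offsets_processed` is a hypothetical helper.

```python
from typing import List


def offsets_processed(start: int, end_exclusive: int, offsetlimit: int) -> List[int]:
    """Simulate consuming offsets [start, end_exclusive) and stopping once offset > offsetlimit."""
    processed = []
    for offset in range(start, end_exclusive):
        processed.append(offset)  # the message is handled first...
        if offset > offsetlimit:  # ...then on_message_processed checks the limit
            break
    return processed


first_pass_docs, second_pass_docs = 13, 17

limit1 = first_pass_docs - 2  # 11, as in `len(df) - 2`
pass1 = offsets_processed(0, first_pass_docs, limit1)

limit2 = limit1 + second_pass_docs  # 28, as in `offsetlimit + offsetlimit2`
total = first_pass_docs + second_pass_docs
pass2 = offsets_processed(pass1[-1] + 1, total, limit2)  # resume after the committed offset

print(pass1[0], pass1[-1], len(pass1))  # 0 12 13
print(pass2[0], pass2[-1], len(pass2))  # 13 29 17
```

If fewer messages arrive than the limit expects (for example, after re-running only some cells), the check never fires and the real application keeps waiting for input, which is when stopping the cell manually is needed.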

### 2.3 Consume, enrich, and produce again


In [26]:
inputtopicname = docs_topic_name
outputtopicname = vectors_topic_name

# Create a special stop condition just for this Notebook (otherwise the cell will run indefinitely)
print(f"Using offset limit {offsetlimit}")
def on_message_processed(topic, partition, offset):
    if offset > offsetlimit:
        app.stop()

# Define your application and settings
app = Application(
    broker_address="127.0.0.1:9092",
    consumer_group=consumergroup_name,
    auto_offset_reset="earliest",
    on_message_processed=on_message_processed,
    consumer_extra_config={"allow.auto.create.topics": "true"},
)

# Define an input topic with JSON deserializer
input_topic = app.topic(inputtopicname, value_deserializer="json")
print(f"Consuming from input topic: {inputtopicname}")

# Define an output topic with JSON serializer
output_topic = app.topic(outputtopicname, value_serializer="json")
print(f"Producing to output topic: {outputtopicname}")

# Initialize a streaming dataframe based on the stream of messages from the input topic:
sdf = app.dataframe(topic=input_topic)

# Print each incoming message so we can follow its progress
sdf = sdf.update(lambda val: print(f"Received update: {val}"))

# EMBEDDING HAPPENS HERE
# Call the embedding function for each new message (row) in the SDF
sdf["embeddings"] = sdf.apply(create_embeddings, stateful=False)

# Update the timestamp column to the current time in nanoseconds
sdf["Timestamp"] = sdf["Timestamp"].apply(lambda _: time.time_ns())

# Publish the processed SDF to a Kafka topic specified by the output_topic object.
sdf = sdf.to_topic(output_topic)

app.run(sdf)


# IF THE CELL KEEPS RUNNING AFTER ALL BOOK ENTRIES HAVE BEEN ENRICHED WITH EMBEDDINGS, STOP IT MANUALLY

[2024-08-22 14:12:52,159] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': '127.0.0.1:9092'}" consumer_group="qdrant-demo" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-08-22 14:12:52,160] [INFO] [quixstreams] : Topics required for this application: "docs_books", "vectors_all-minilm-l6-v2"
[2024-08-22 14:12:52,165] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-08-22 14:12:52,189] [INFO] [quixstreams] : Kafka topics validation complete
[2024-08-22 14:12:52,189] [INFO] [quixstreams] : Initializing state directory at "/Users/yuc10/Labs/tutorial-code/notebooks/state/qdrant-demo"
[2024-08-22 14:12:52,190] [INFO] [quixstreams] : Waiting for incoming messages


Using offset limit 28
Consuming from input topic: docs_books
Producing to output topic: vectors_all-minilm-l6-v2
Received update: {'Timestamp': 1724325168584434000, 'doc_id': 0, 'doc_uuid': 'c635c9d8-36a7-4a4a-a2c0-603a677e2fd4', 'doc_name': "Childhood's End", 'doc_descr': "A peaceful alien invasion leads to the end of humanity's childhood.", 'doc_year': 1953}
Created vector: "[0.03473963961005211, 0.08905236423015594, 0.006120693404227495, 0.045037731528282166, 0.052357617765665054, 0.0040490408428013325, -0.05281733721494675, -0.019031524658203125, 0.025155916810035706, 0.05997135117650032, 0.04798443615436554, 0.002428787061944604, 0.018423352390527725, 0.00837881863117218, -0.02233979105949402, 0.08677897602319717, -0.03162854164838791, -0.10887140780687332, 0.007357876282185316, -0.02192520909011364, -0.08941147476434708, 0.09281668812036514, 0.06203293427824974, 0.03269309550523758, -0.02995116636157036, 0.10978981107473373, 0.0724916011095047, 0.04910831153392792, -0.04096504673

[2024-08-22 14:12:57,336] [INFO] [quixstreams] : Stop processing of StreamingDataFrame


### 2.4 Consume the new embeddings and update the vector store


In [27]:
inputtopicname = vectors_topic_name

# Create a special stop condition just for this Notebook (otherwise the cell will run indefinitely)
print(f"Using offset limit {offsetlimit}")
def on_message_processed(topic, partition, offset):
    if offset > offsetlimit:
        app.stop()

# Define the consumer application and settings
app = Application(
    broker_address="127.0.0.1:9092",
    consumer_group="vectorizer",
    auto_offset_reset="earliest",
    on_message_processed=on_message_processed,
    consumer_extra_config={"allow.auto.create.topics": "true"},
)

# Define an input topic with JSON deserializer
input_topic = app.topic(inputtopicname, value_deserializer="json")
print(f"Consuming from input topic: {inputtopicname}")

# Initialize a streaming dataframe based on the stream of messages from the input topic:
sdf = app.dataframe(topic=input_topic)

# INGESTION HAPPENS HERE
# Write each incoming vector message (row) to the Qdrant collection
sdf = sdf.update(ingest_vectors)
app.run(sdf)

# IF THE CELL KEEPS RUNNING AFTER ALL BOOK ENTRIES HAVE BEEN INGESTED, STOP IT MANUALLY

[2024-08-22 14:12:59,624] [INFO] [quixstreams] : Starting the Application with the config: broker_address="{'bootstrap.servers': '127.0.0.1:9092'}" consumer_group="vectorizer" auto_offset_reset="earliest" commit_interval=5.0s commit_every=0 processing_guarantee="at-least-once"
[2024-08-22 14:12:59,625] [INFO] [quixstreams] : Topics required for this application: "vectors_all-minilm-l6-v2"
[2024-08-22 14:12:59,629] [INFO] [quixstreams] : Validating Kafka topics exist and are configured correctly...
[2024-08-22 14:12:59,645] [INFO] [quixstreams] : Kafka topics validation complete
[2024-08-22 14:12:59,645] [INFO] [quixstreams] : Initializing state directory at "/Users/yuc10/Labs/tutorial-code/notebooks/state/vectorizer"
[2024-08-22 14:12:59,646] [INFO] [quixstreams] : Waiting for incoming messages
[2024-08-22 14:12:59,681] [INFO] [quixstreams] : Stop processing of StreamingDataFrame


Using offset limit 28
Consuming from input topic: vectors_all-minilm-l6-v2
Ingested vector entry id: "c635c9d8-36a7-4a4a-a2c0-603a677e2fd4"...
Ingested vector entry id: "f8fb7899-747b-4b20-8993-d73521a56e82"...
Ingested vector entry id: "7724d9fb-deb5-40b0-8b9e-7cb9b7a1a5b1"...
Ingested vector entry id: "2f26ac20-2f4f-41de-8cf3-c9db5f37e87b"...
Ingested vector entry id: "e8efca92-704c-4430-a352-842966236b94"...
Ingested vector entry id: "7d6774a6-eee1-4429-a396-6185b932c205"...
Ingested vector entry id: "15f7602a-ebed-4e01-81ae-e5e0a1a0f50e"...
Ingested vector entry id: "62afdaa7-befd-4ee9-86b2-301265068917"...
Ingested vector entry id: "f5a2bf04-67b6-4acb-9392-b44703f2a2d3"...
Ingested vector entry id: "1dd9db8a-2fbd-48e1-85d9-bab3f2ee63bf"...
Ingested vector entry id: "00472adf-d790-4c49-95cf-13e18288e7a1"...
Ingested vector entry id: "9a0e8b53-7469-4675-816d-71430e8381ef"...
Ingested vector entry id: "c5628f48-9fb9-4d03-bf5c-d21339e92d01"...
Ingested vector entry id: "5215707d-0e18-

### 2.5 Run the same search again


In [28]:
print(f"Searching with query '{query}'...\n\n")

hits = qdrant.search(
    collection_name=collectionname,
    query_vector=encoder.encode(query).tolist(),
    limit=10
)

print("Entries matching your query:")
for hit in hits:
    print(hit.payload['doc_name'], " | ", hit.payload['doc_descr'], "score:", hit.score)

Searching with query 'books like star wars'...


Entries matching your query:
Old Man's War  |  Earth's senior citizens are recruited to fight in an interstellar war, discovering new alien cultures and threats. score: 0.29142238729427417
Dune  |  A desert planet is the site of political intrigue and power struggles. score: 0.21748407992672814
Footfall  |  Elephant-like aliens invade Earth, and humanity must find a way to fight back. score: 0.1806960918147597
Foundation  |  A mathematician develops a science to predict the future of humanity and works to save civilization from collapse. score: 0.1679862275759687
Contact  |  Scientists receive a message from extraterrestrial beings and build a machine to meet them. score: 0.16399434219935305
The Forge of God  |  Aliens arrive under the guise of friendship, but their true mission is to destroy Earth. score: 0.16392123364545524
The Hunger Games  |  A dystopian society where teenagers are forced to fight to the death in a televised spectacl

*Expected top match: "Old Man's War".*

Dune has now been knocked off the top slot by our new arrival, "Old Man's War". Is this tale of geriatric combat going to appeal more to Star Wars fans? It's debatable.

But in terms of matching, we can certainly understand why it received a higher score. The term "war" is almost a direct hit, and "interstellar" is probably semantically closer to the search term "star" than "planet" is.

Don't forget to shut down the Kafka and ZooKeeper servers.
```sh
kafka-server-stop
zookeeper-server-stop
```