In [1]:
import requests 

docs_url = 'https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json'
docs_response = requests.get(docs_url)
documents_raw = docs_response.json()

documents = []

for course in documents_raw:
    course_name = course['course']

    for doc in course['documents']:
        doc['course'] = course_name
        documents.append(doc)

In [2]:
documents[2]

{'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
 'section': 'General course-related questions',
 'question': 'Course - Can I still join the course after the start date?',
 'course': 'data-engineering-zoomcamp'}

In [3]:
!pip install minsearch

Collecting minsearch
  Downloading minsearch-0.0.3-py3-none-any.whl.metadata (6.1 kB)
Downloading minsearch-0.0.3-py3-none-any.whl (9.3 kB)
Installing collected packages: minsearch
Successfully installed minsearch-0.0.3


In [4]:
import minsearch

index = minsearch.Index(
    text_fields=["question", "text", "section"],
    keyword_fields=["course"]
)

index.fit(documents)

<minsearch.minsearch.Index at 0x11245f150>

In [14]:
from openai import OpenAI
import os
# openai_client = OpenAI()

openai_client = OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=os.getenv("OPENROUTER_API_KEY")
    )

In [15]:
def search(query):
    boost = {'question': 3.0, 'section': 0.5}

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5
    )

    return results

In [16]:
def build_prompt(query, search_results):
    prompt_template = """
You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.
Use only the facts from the CONTEXT when answering the QUESTION.

QUESTION: {question}

CONTEXT: 
{context}
""".strip()

    context = ""
    
    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
    
    prompt = prompt_template.format(question=query, context=context).strip()
    return prompt

In [17]:
def llm(prompt):
    response = openai_client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{"role": "user", "content": prompt}]
    )
    
    return response.choices[0].message.content

In [18]:
def rag(query):
    search_results = search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt)
    return answer

In [19]:
rag('how do I run kafka?')

'To run Kafka, you can execute the following command in your project directory for the Java Kafka producer:\n\n```\njava -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java\n```\n\nFor Python, ensure you create a virtual environment and install the necessary dependencies. You can create the environment and install packages with these commands:\n\n1. Create a virtual environment:\n   ```\n   python -m venv env\n   ```\n\n2. Activate the virtual environment:\n   - On MacOS/Linux:\n     ```\n     source env/bin/activate\n     ```\n   - On Windows:\n     ```\n     env\\Scripts\\activate\n     ```\n\n3. Install the required packages from `requirements.txt`:\n   ```\n   pip install -r ../requirements.txt\n   ```\n\nMake sure to run the activation command every time you need to use the virtual environment, and deactivate it with `deactivate` when done.'

In [20]:
rag('the course has already started, can I still enroll?')

"Yes, you can still enroll in the course even after it has started. You are eligible to submit homework assignments, but be aware that there will be deadlines for the final project, so it's advisable not to leave everything to the last minute."

## RAG with Vector Search

In [21]:
from qdrant_client import QdrantClient, models

  from .autonotebook import tqdm as notebook_tqdm


In [22]:
qd_client = QdrantClient("http://localhost:6333")

In [23]:
EMBEDDING_DIMENSIONALITY = 512
model_handle = "jinaai/jina-embeddings-v2-small-en"

In [24]:
collection_name = "zoomcamp-faq"

In [25]:
qd_client.delete_collection(collection_name=collection_name)

False

In [26]:
qd_client.create_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(
        size=EMBEDDING_DIMENSIONALITY,
        distance=models.Distance.COSINE
    )
)

True

In [27]:
qd_client.create_payload_index(
    collection_name=collection_name,
    field_name="course",
    field_schema="keyword"
)

UpdateResult(operation_id=1, status=<UpdateStatus.COMPLETED: 'completed'>)

In [28]:
points = []

for i, doc in enumerate(documents):
    text = doc['question'] + ' ' + doc['text']
    vector = models.Document(text=text, model=model_handle)
    point = models.PointStruct(
        id=i,
        vector=vector,
        payload=doc
    )
    points.append(point)

In [29]:
qd_client.upsert(
    collection_name=collection_name,
    points=points
)

UpdateResult(operation_id=2, status=<UpdateStatus.COMPLETED: 'completed'>)

In [30]:
question = 'I just discovered the course. Can I still join it?'

In [31]:
def vector_search(question):
    print('vector_search is used')
    
    course = 'data-engineering-zoomcamp'
    query_points = qd_client.query_points(
        collection_name=collection_name,
        query=models.Document(
            text=question,
            model=model_handle 
        ),
        query_filter=models.Filter( 
            must=[
                models.FieldCondition(
                    key="course",
                    match=models.MatchValue(value=course)
                )
            ]
        ),
        limit=5,
        with_payload=True
    )
    
    results = []
    
    for point in query_points.points:
        results.append(point.payload)
    
    return results

In [32]:
def rag(query):
    search_results = vector_search(query)
    print(search_results)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt)
    return answer

In [35]:
rag('how do I run kafka?')

vector_search is used
[{'text': 'In the project directory, run:\njava -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java', 'section': 'Module 6: streaming with kafka', 'question': 'Java Kafka: How to run producer/consumer/kstreams/etc in terminal', 'course': 'data-engineering-zoomcamp'}, {'text': 'For example, when running JsonConsumer.java, got:\nConsuming form kafka started\nRESULTS:::0\nRESULTS:::0\nRESULTS:::0\nOr when running JsonProducer.java, got:\nException in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed\nSolution:\nMake sure in the scripts in src/main/java/org/example/ that you are running (e.g. JsonConsumer.java, JsonProducer.java), the StreamsConfig.BOOTSTRAP_SERVERS_CONFIG is the correct server url (e.g. europe-west3 from example vs europe-west2)\nMake sure cluster key and secrets are updated in src/main/java/org/example/Secrets.java (KAFKA_CLU

"To run Kafka, you can follow these steps based on your context:\n\n1. **Ensure your Kafka broker is running**: Check if your Kafka broker Docker container is working by running `docker ps`. If it's not running, navigate to the folder with your Docker Compose YAML file and execute `docker compose up -d` to start all instances.\n\n2. **Run the producer or consumer**:\n   - For Java Kafka applications, go to your project directory and run the following command, replacing `<jar_name>` with the actual name of your JAR file:\n     ```\n     java -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java\n     ```\n   - Ensure that your `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG` in your Java scripts points to the correct Kafka server URL. Additionally, confirm that `KAFKA_CLUSTER_KEY` and `KAFKA_CLUSTER_SECRET` in your `Secrets.java` are updated correctly.\n\nBy following these steps, you should be able to run Kafka successfully."

In [34]:
[{'text': 'In the project directory, run:\njava -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java', 'section': 'Module 6: streaming with kafka', 'question': 'Java Kafka: How to run producer/consumer/kstreams/etc in terminal', 'course': 'data-engineering-zoomcamp'}, {'text': 'For example, when running JsonConsumer.java, got:\nConsuming form kafka started\nRESULTS:::0\nRESULTS:::0\nRESULTS:::0\nOr when running JsonProducer.java, got:\nException in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed\nSolution:\nMake sure in the scripts in src/main/java/org/example/ that you are running (e.g. JsonConsumer.java, JsonProducer.java), the StreamsConfig.BOOTSTRAP_SERVERS_CONFIG is the correct server url (e.g. europe-west3 from example vs europe-west2)\nMake sure cluster key and secrets are updated in src/main/java/org/example/Secrets.java (KAFKA_CLUSTER_KEY and KAFKA_CLUSTER_SECRET)', 'section': 'Module 6: streaming with kafka', 'question': 'Java Kafka: When running the producer/consumer/etc java scripts, no results retrieved or no message sent', 'course': 'data-engineering-zoomcamp'}, {'text': 'If you have this error, it most likely that your kafka broker docker container is not working.\nUse docker ps to confirm\nThen in the docker compose yaml file folder, run docker compose up -d to start all the instances.', 'section': 'Module 6: streaming with kafka', 'question': 'kafka.errors.NoBrokersAvailable: NoBrokersAvailable', 'course': 'data-engineering-zoomcamp'}, {'text': "Solution from Alexey: create a virtual environment and run requirements.txt and the python files in that environment.\nTo create a virtual env and install packages (run only once)\npython -m venv env\nsource env/bin/activate\npip install -r ../requirements.txt\nTo activate it (you'll need to run it every time you need the virtual env):\nsource env/bin/activate\nTo deactivate it:\ndeactivate\nThis works on MacOS, Linux and Windows - but for Windows the path is slightly different (it's env/Scripts/activate)\nAlso the virtual environment should be created only to run the python file. Docker images should first all be up and running.", 'section': 'Module 6: streaming with kafka', 'question': 'Module “kafka” not found when trying to run producer.py', 'course': 'data-engineering-zoomcamp'}, {'text': "Below I have listed some steps I took to rectify this and potentially other minor errors, in Windows:\nUse the git bash terminal in windows.\nActivate python venv from git bash: source .venv/Scripts/activate\nModify the seed_kafka.py file: in the first line, replace python3 with python.\nNow from git bash, run the seed-kafka cmd. It should work now.\nAdditional Notes:\nYou can connect to the RisingWave cluster from Powershell with the command psql -h localhost -p 4566 -d dev -U root , otherwise it asks for a password.\nThe equivalent of source commands.sh  in Powershell is . .\\commands.sh from the workshop directory.\nHope this can save you from some trouble in case you're doing this workshop on Windows like I am.\n—--------------------------------------------------------------------------------------", 'section': 'Workshop 2 - RisingWave', 'question': 'Psycopg2 InternalError: Failed to run the query - when running the seed-kafka command after initial setup.', 'course': 'data-engineering-zoomcamp'}]


[{'text': 'In the project directory, run:\njava -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java',
  'section': 'Module 6: streaming with kafka',
  'question': 'Java Kafka: How to run producer/consumer/kstreams/etc in terminal',
  'course': 'data-engineering-zoomcamp'},
 {'text': 'For example, when running JsonConsumer.java, got:\nConsuming form kafka started\nRESULTS:::0\nRESULTS:::0\nRESULTS:::0\nOr when running JsonProducer.java, got:\nException in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed\nSolution:\nMake sure in the scripts in src/main/java/org/example/ that you are running (e.g. JsonConsumer.java, JsonProducer.java), the StreamsConfig.BOOTSTRAP_SERVERS_CONFIG is the correct server url (e.g. europe-west3 from example vs europe-west2)\nMake sure cluster key and secrets are updated in src/main/java/org/example/Secrets.java (KAFKA_CLUSTER_KEY and KA