# Week 01. Environment setup

In [36]:
import json
from pathlib import Path
import sys

PROJECT_DIR = Path(".").absolute().parent.parent
sys.path.append(str(PROJECT_DIR))

from openai import OpenAI
import pandas as pd
from tqdm import tqdm

from project.settings.openai import openai_settings
from project.src.external.minsearch import minsearch  # minsearch.Index

## OpenAI example

In [2]:
# create OpenAI client:
openai_client = OpenAI(api_key=openai_settings.api_key.get_secret_value())

In [4]:
example_question = "Is it still possible to join the course?"

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": example_question
        }
    ],
)

In [6]:
print(response.choices[0].message.content)

To determine if you can still join a course, you need to check a few specific details:

1. **Course Availability and Registration Deadlines:**
   Check the registration deadlines for the course you are interested in. Courses often have enrolment cut-offs, which can be found on the educational institution’s website or by contacting the registrar’s office.

2. **Seats and Waiting Lists:**
   Find out if there are still seats available or if the course has a waiting list. If the course is full, see if you can get on the waiting list.

3. **Course Start Date:**
   Verify the course start date. Most institutions have policies on late registration, but joining a course too late can mean missing fundamental content.

4. **Instructor Permission:**
   Sometimes, if the registration period has passed, you can seek permission directly from the course instructor. They may allow you to join their class, especially if you have a compelling reason.

5. **Institution Policies:**
   Review the institut

## Search engine

In [10]:
DATA_DIR = Path(PROJECT_DIR, "data")
assert DATA_DIR.exists()

In [11]:
# read all documents:
with open(Path(DATA_DIR, "documents.json"), "rt") as f_in:
    docs_raw = json.load(f_in)

In [12]:
# collect all documents from all the courses
documents = []
for course_dict in docs_raw:
    for doc in course_dict["documents"]:
        doc["course"] = course_dict["course"]
        documents.append(doc)
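The loop above writes the `course` key straight into the dictionaries loaded from `docs_raw`, so `docs_raw` is modified as a side effect. Below is a minimal sketch of a non-mutating alternative. It assumes the same `{"course": ..., "documents": [...]}` layout of `documents.json`, and `flatten_documents` is a hypothetical helper name:

```python
def flatten_documents(docs_raw: list[dict]) -> list[dict]:
    """Return one flat list of FAQ entries, each tagged with its course.

    Each entry is copied with {**doc, ...}, so the input dictionaries stay unchanged.
    """
    return [
        {**doc, "course": course_dict["course"]}
        for course_dict in docs_raw
        for doc in course_dict["documents"]
    ]


# tiny made-up input in the same shape as documents.json
sample_raw = [
    {"course": "mlops-zoomcamp", "documents": [{"text": "a", "section": "s", "question": "q1"}]},
    {"course": "machine-learning-zoomcamp", "documents": [{"text": "b", "section": "s", "question": "q2"}]},
]
flat = flatten_documents(sample_raw)
print([d["course"] for d in flat])  # ['mlops-zoomcamp', 'machine-learning-zoomcamp']
print("course" in sample_raw[0]["documents"][0])  # False
```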

In [13]:
# DataFrame from documents:
df_documents = pd.DataFrame(documents)
display(df_documents.course.value_counts())
df_documents.head()

course
data-engineering-zoomcamp    435
machine-learning-zoomcamp    375
mlops-zoomcamp               138
Name: count, dtype: int64

|   | text | section | question | course |
|---|------|---------|----------|--------|
| 0 | The purpose of this document is to capture fre... | General course-related questions | Course - When will the course start? | data-engineering-zoomcamp |
| 1 | GitHub - DataTalksClub data-engineering-zoomca... | General course-related questions | Course - What are the prerequisites for this c... | data-engineering-zoomcamp |
| 2 | Yes, even if you don't register, you're still ... | General course-related questions | Course - Can I still join the course after the... | data-engineering-zoomcamp |
| 3 | You don't need it. You're accepted. You can al... | General course-related questions | Course - I have registered for the Data Engine... | data-engineering-zoomcamp |
| 4 | You can start by installing and setting up all... | General course-related questions | Course - What can I do before the course starts? | data-engineering-zoomcamp |


In [14]:
# Build the minsearch index: full-text search over question/text/section, exact-match filtering on course
model_index = minsearch.Index(
    text_fields=["question", "text", "section"],
    keyword_fields=["course"]
)

model_index.fit(documents);

In [15]:
example_question = "Is it still possible to join the course?"

search_results = model_index.search(
    query=example_question,
    filter_dict={"course": "machine-learning-zoomcamp"},
    boost_dict={"question": 3.0, "section": 0.3},
    num_results=5
)

search_results

[{'text': 'Yes, you can. You won’t be able to submit some of the homeworks, but you can still take part in the course.\nIn order to get a certificate, you need to submit 2 out of 3 course projects and review 3 peers’ Projects by the deadline. It means that if you join the course at the end of November and manage to work on two projects, you will still be eligible for a certificate.',
  'section': 'General course-related questions',
  'question': 'The course has already started. Can I still join it?',
  'course': 'machine-learning-zoomcamp'},
 {'text': "Here’s how you join a in Slack: https://slack.com/help/articles/205239967-Join-a-channel\nClick “All channels” at the top of your left sidebar. If you don't see this option, click “More” to find it.\nBrowse the list of public channels in your workspace, or use the search bar to search by channel name or description.\nSelect a channel from the list to view it.\nClick Join Channel.\nDo we need to provide the GitHub link to only our code co
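To make `filter_dict` and `boost_dict` concrete, here is a simplified, pure-Python stand-in for a boosted, filtered search index. It is an illustration, not minsearch's code. As far as we know, minsearch vectorizes each text field with scikit-learn's TF-IDF and sums the per-field cosine similarities weighted by the boosts. This sketch follows the same idea with hand-rolled TF-IDF. `TinyIndex` and `tokenize` are hypothetical names.

```python
import math
import re
from collections import Counter


def tokenize(text: str) -> list[str]:
    """Lowercase the text and split it into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


class TinyIndex:
    """Simplified stand-in for minsearch.Index: per-field TF-IDF + boosts + keyword filter."""

    def __init__(self, text_fields: list[str], keyword_fields: list[str]):
        self.text_fields = text_fields
        self.keyword_fields = keyword_fields
        self.docs: list[dict] = []
        self.idf: dict[str, dict[str, float]] = {}
        self.vectors: dict[str, list[dict[str, float]]] = {}

    def _vectorize(self, field: str, text: str) -> dict[str, float]:
        # term count * idf, L2-normalized; terms never seen during fit are ignored
        idf = self.idf[field]
        vec = {t: c * idf[t] for t, c in Counter(tokenize(text)).items() if t in idf}
        norm = math.sqrt(sum(v * v for v in vec.values()))
        return {t: v / norm for t, v in vec.items()} if norm else {}

    def fit(self, docs: list[dict]) -> "TinyIndex":
        self.docs = docs
        n = len(docs)
        for field in self.text_fields:
            doc_freq = Counter()
            for doc in docs:
                doc_freq.update(set(tokenize(doc.get(field, ""))))
            # smoothed idf, the same formula as scikit-learn's default
            self.idf[field] = {t: math.log((1 + n) / (1 + df)) + 1 for t, df in doc_freq.items()}
            self.vectors[field] = [self._vectorize(field, doc.get(field, "")) for doc in docs]
        return self

    def search(self, query: str, filter_dict=None, boost_dict=None, num_results=10) -> list[dict]:
        filter_dict = filter_dict or {}
        boost_dict = boost_dict or {}
        for key in filter_dict:
            if key not in self.keyword_fields:
                raise ValueError(f"{key!r} is not a keyword field")
        query_vecs = {f: self._vectorize(f, query) for f in self.text_fields}
        scored = []
        for i, doc in enumerate(self.docs):
            # keyword filter: every filter field must match exactly
            if any(doc.get(k) != v for k, v in filter_dict.items()):
                continue
            # weighted sum of per-field cosine similarities (default boost 1.0)
            score = sum(
                boost_dict.get(f, 1.0)
                * sum(w * self.vectors[f][i].get(t, 0.0) for t, w in query_vecs[f].items())
                for f in self.text_fields
            )
            if score > 0:
                scored.append((score, i))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [self.docs[i] for _, i in scored[:num_results]]


faq = [
    {"question": "Can I still join the course?", "text": "Yes, you can join late.", "section": "General", "course": "ml"},
    {"question": "How do I install Docker?", "text": "Use the official installer.", "section": "Setup", "course": "ml"},
    {"question": "Can I still join the course?", "text": "Yes.", "section": "General", "course": "de"},
]
index = TinyIndex(text_fields=["question", "text", "section"], keyword_fields=["course"]).fit(faq)
hits = index.search("join the course", filter_dict={"course": "ml"}, boost_dict={"question": 3.0, "section": 0.3})
print([h["question"] for h in hits])  # ['Can I still join the course?', 'How do I install Docker?']
```

The Docker entry still appears (with a much lower score) because its `text` field shares the token "the" with the query; the `question` boost is what pushes the relevant entry to the top.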

## RAG: Search engine + OpenAI

In [14]:
example_question

'Is it still possible to join the course?'

In [15]:
prompt_template = """
You are a course teaching assistant.
Answer the QUESTION.
Use only the facts from CONTEXT (from FAQ database) to answer the QUESTION. If CONTEXT does not contain the answer, output NONE.

QUESTION: {question}

CONTEXT:
{context}
"""

In [16]:
# step 1: query the database
search_results = model_index.search(
    query=example_question,
    # filter_dict={"course": "machine-learning-zoomcamp"},
    filter_dict={"course": "data-engineering-zoomcamp"},
    boost_dict={"question": 3.0, "section": 0.3},
    num_results=5
)

# step 2: build the context
context = ""
for res in search_results:
    context += f""" - section: {res['section']}\n - question: {res['question']}\n - answer: {res['text']}\n\n"""

# step 3: generate prompt
prompt = prompt_template.format(question=example_question, context=context).strip()
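The string concatenation in step 2 can also be written with `str.join`, which builds the context in one pass. A minimal sketch that produces exactly the same text as the loop; `build_context` is a hypothetical helper name:

```python
def build_context(search_results: list[dict]) -> str:
    """Render search hits as the bullet-style CONTEXT block used in the prompt."""
    return "".join(
        f" - section: {res['section']}\n"
        f" - question: {res['question']}\n"
        f" - answer: {res['text']}\n\n"
        for res in search_results
    )


sample_hits = [{"section": "General", "question": "Can I join late?", "text": "Yes."}]
print(repr(build_context(sample_hits)))
# ' - section: General\n - question: Can I join late?\n - answer: Yes.\n\n'
```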

In [17]:
# step 4: query ChatGPT via API:
response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": prompt
        }
    ],
)

In [18]:
response.choices[0].message.content

"Yes, it is still possible to join the course. Even if you don't register, you're still eligible to submit the homeworks. Just be mindful of the deadlines for turning in the final projects."

In [16]:
def search(query: str) -> list[dict]:
    boost = {"question": 3.0, "section": 0.3}
    search_results = model_index.search(
        query=query,
        # filter_dict={"course": "machine-learning-zoomcamp"},
        filter_dict={"course": "data-engineering-zoomcamp"},
        boost_dict=boost,
        num_results=5
    )
    return search_results

In [17]:
def build_prompt(query: str, search_results: list[dict]) -> str:
    prompt_template = """
You are a course teaching assistant.
Answer the QUESTION.
Use only the facts from CONTEXT (from FAQ database) to answer the QUESTION. If CONTEXT does not contain the answer, output NONE.

QUESTION: {question}

CONTEXT:
{context}
    """

    context = ""
    for res in search_results:
        context += f""" - section: {res['section']}\n - question: {res['question']}\n - answer: {res['text']}\n\n"""
    
    prompt = prompt_template.format(question=query, context=context).strip()
    return prompt

In [18]:
def llm(prompt: str) -> str:
    """Send the prompt to the OpenAI Chat Completions API and return the answer text."""
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": prompt
            }
        ],
    )
    return response.choices[0].message.content
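The prompt tells the model to output `NONE` when CONTEXT does not contain the answer, and the SDK types `message.content` as optional. A caller may want to turn both cases into a proper `None`. A minimal sketch, assuming the model replies with the bare word `NONE` (possibly with surrounding whitespace or a trailing period), which nothing guarantees; `parse_answer` is a hypothetical helper:

```python
from typing import Optional


def parse_answer(raw: Optional[str]) -> Optional[str]:
    """Map an empty reply or the prompt's NONE sentinel to None, else return the stripped text."""
    if raw is None:
        return None
    text = raw.strip()
    if not text or text.rstrip(".").upper() == "NONE":
        return None
    return text


print(parse_answer("NONE"))                    # None
print(parse_answer("  Yes, you can join.  "))  # Yes, you can join.
```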

In [19]:
def rag(query: str) -> str:
    # 1. search locally
    faq_results = search(query)
    # 2. build a prompt
    prompt = build_prompt(query, search_results=faq_results)
    # 3. ask OpenAI
    answer = llm(prompt)
    return answer

In [20]:
rag("Is it possible to join the course after start?")

"Yes, it is possible to join the course after it has started. Even if you don't register initially, you are still eligible to submit the homeworks. However, be aware that there will be deadlines for turning in the final projects, so it is not advisable to leave everything to the last minute."

### Elasticsearch: replace minsearch with a new search engine

In [21]:
from elasticsearch import Elasticsearch

In [28]:
# "elasticsearch" is the service (container) name in docker-compose
es_client = Elasticsearch("http://elasticsearch:9200")
# es_client.info()

In [31]:
# Elastic config definition
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"},
        }
    },
}

index_name = "course-description"

# create index
es_client.indices.create(
    index=index_name,
    body=index_settings
)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'course-description'})

In [37]:
# Index the documents one at a time:
for doc in tqdm(documents):
    es_client.index(index=index_name, document=doc)

100%|██████████| 948/948 [00:01<00:00, 525.97it/s]
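The loop above sends one indexing request per document (948 here). The Python client also provides `elasticsearch.helpers.bulk`, which sends documents in batches. A minimal sketch of building the action stream that helper consumes; `make_bulk_actions` and the index name `"course-faq"` are hypothetical, and the actual `bulk` call is left as a comment so the snippet runs without a cluster:

```python
from typing import Iterable, Iterator


def make_bulk_actions(index_name: str, docs: Iterable[dict]) -> Iterator[dict]:
    """Yield one bulk action per document; with no _op_type set, the helper defaults to 'index'."""
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


# With a live cluster the actions would be sent in batches, e.g.:
#   from elasticsearch import helpers
#   helpers.bulk(es_client, make_bulk_actions(index_name, documents))
actions = list(make_bulk_actions("course-faq", [{"question": "q1"}, {"question": "q2"}]))
print(actions[0])  # {'_index': 'course-faq', '_source': {'question': 'q1'}}
```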


In [41]:
query = "How to run kafka?"

search_query = {
    "size": 5,
    "query": {
        "bool": {
            "must": {
                "multi_match": {
                    "query": query,
                    "fields": ["question^3", "text", "section"],
                    "type": "best_fields",
                }
            },
            "filter": {
                "term": {
                    "course": "data-engineering-zoomcamp"
                }
            },
        }
    }
}

response = es_client.search(index=index_name, body=search_query)

In [45]:
# collect the source documents from the hits:
result_docs = []
for hit in response["hits"]["hits"]:
    result_docs.append(hit["_source"])
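The same loop can be written as a list comprehension. In this sketch `extract_hits` is a hypothetical helper, and the optional `_score` key is an addition for inspecting the ranking. It runs on a hand-made dict in place of a real response; the client's `ObjectApiResponse` supports the same `response["hits"]["hits"]` indexing used in the cell above.

```python
def extract_hits(response: dict, with_score: bool = False) -> list[dict]:
    """Return the _source of each hit, optionally adding the relevance score under '_score'."""
    hits = response["hits"]["hits"]
    if with_score:
        return [{**hit["_source"], "_score": hit["_score"]} for hit in hits]
    return [hit["_source"] for hit in hits]


# hand-made response in the shape Elasticsearch returns (values are illustrative)
fake_response = {"hits": {"hits": [
    {"_score": 7.5, "_source": {"question": "How do I run Kafka?", "course": "data-engineering-zoomcamp"}},
    {"_score": 3.1, "_source": {"question": "Kafka module not found", "course": "data-engineering-zoomcamp"}},
]}}
print([d["question"] for d in extract_hits(fake_response)])
# ['How do I run Kafka?', 'Kafka module not found']
```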

In [47]:
def elastic_search(query: str) -> list[dict]:
    # set up the Elasticsearch request
    search_query = {
        "size": 5,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        "fields": ["question^3", "text", "section"],
                        "type": "best_fields",
                    }
                },
                "filter": {
                    "term": {
                        "course": "data-engineering-zoomcamp"
                    }
                },
            }
        }
    }
    
    # search in elastic
    response = es_client.search(index=index_name, body=search_query)
    
    # collect the source documents from the hits:
    result_docs = []
    for hit in response["hits"]["hits"]:
        result_docs.append(hit["_source"])

    return result_docs
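`elastic_search` hardcodes both the course and the result size. A minimal sketch of pulling the query body into a builder so those become parameters; `build_es_query` is a hypothetical name, and the body it returns is the same one used above:

```python
def build_es_query(query: str, course: str, size: int = 5) -> dict:
    """Build the bool query used above: boosted multi_match on text fields + exact course filter."""
    return {
        "size": size,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        # "^3" boosts matches in the question field, similar in purpose to boost_dict
                        "fields": ["question^3", "text", "section"],
                        "type": "best_fields",
                    }
                },
                # term filter on the keyword field: exact match, does not contribute to the score
                "filter": {"term": {"course": course}},
            }
        },
    }


# elastic_search could then call, e.g.:
#   es_client.search(index=index_name, body=build_es_query(query, course="machine-learning-zoomcamp"))
q = build_es_query("How to run kafka?", course="data-engineering-zoomcamp")
print(q["query"]["bool"]["filter"])  # {'term': {'course': 'data-engineering-zoomcamp'}}
```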

In [57]:
# updated RAG flow
def rag(query: str) -> str:
    # 1. search in elastic
    faq_results = elastic_search(query)
    # 2. build a prompt
    prompt = build_prompt(query, search_results=faq_results)
    # 3. ask OpenAI
    answer = llm(prompt)
    return answer
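The two `rag` versions differ only in which search function they call. A minimal sketch of composing the pipeline from interchangeable steps, so switching between minsearch and Elasticsearch (or stubbing out the LLM) needs no rewrite; `make_rag` and the stub functions are hypothetical:

```python
from typing import Callable


def make_rag(
    search_fn: Callable[[str], list[dict]],
    prompt_fn: Callable[[str, list[dict]], str],
    llm_fn: Callable[[str], str],
) -> Callable[[str], str]:
    """Compose a RAG pipeline from interchangeable search, prompt and LLM steps."""
    def rag(query: str) -> str:
        results = search_fn(query)          # 1. retrieve
        prompt = prompt_fn(query, results)  # 2. build the prompt
        return llm_fn(prompt)               # 3. generate
    return rag


# offline stubs standing in for elastic_search / build_prompt / llm
def fake_search(query: str) -> list[dict]:
    return [{"question": "Kafka?", "text": "See module 6.", "section": "Module 6"}]


def fake_prompt(query: str, results: list[dict]) -> str:
    return f"Q: {query} | {results[0]['text']}"


def fake_llm(prompt: str) -> str:
    return prompt.upper()


rag_offline = make_rag(fake_search, fake_prompt, fake_llm)
print(rag_offline("Kafka?"))  # Q: KAFKA? | SEE MODULE 6.
```

In the notebook, the Elasticsearch version would then be `make_rag(elastic_search, build_prompt, llm)` and the minsearch version `make_rag(search, build_prompt, llm)`.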

In [55]:
answer = rag(query="Kafka?")
print(answer)

Based on the provided context, it appears that Kafka is covered extensively in Module 6, specifically related to issues and configurations with Python Kafka, Java Kafka, and Confluent Kafka. Here are a few highlights:

1. To resolve the "Permission denied" error when running `./build.sh`:
   - Run `chmod +x build.sh` in the terminal within the `/docker/spark` directory.

2. If the "kafka" module is not found when running `producer.py`:
   - Create a virtual environment and install packages using `python -m venv env`, activate it with `source env/bin/activate`, and install requirements with `pip install -r ../requirements.txt`.

3. For Java Kafka tests not picked up in VSCode:
   - Go to VS Code, navigate to JAVA PROJECTS in the Explorer, clean Workspace, and reload.

4. To find the schema registry URL in Confluent Kafka:
   - Navigate to your environment → "Stream Governance API" → The URL under “Endpoint” in Confluent Cloud.

Thus, if you have any specific query about Kafka, one of th