In [1]:
from elasticsearch import Elasticsearch

In [2]:
# Client for the Elasticsearch node at localhost:9200; every index and search
# call in this notebook goes through it.
es_client = Elasticsearch('http://localhost:9200') 

In [4]:
# Index layout: one shard, no replicas. `text`, `section` and `question` are
# analysed full-text fields; `course` is a keyword so it can be matched exactly
# in `term` filters.
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"}
        }
    }
}

index_name = "course-questions"

# Drop any index left over from a previous run so this cell can be re-executed:
# indices.create raises if the index already exists. WARNING: this discards
# every document currently stored under `index_name`.
es_client.indices.delete(index=index_name, ignore_unavailable=True)

es_client.indices.create(index=index_name, body=index_settings)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'course-questions'})

In [5]:
from tqdm.auto import tqdm

In [7]:
import json

In [8]:
# JSON is UTF-8 by specification; pin the encoding so the load does not depend
# on the platform's default locale encoding (e.g. cp1252 on Windows).
with open('document.json', 'rt', encoding='utf-8') as file_in:
    raw_docs = json.load(file_in)

In [9]:
# Tag every entry with the name of the course it belongs to (in place, so
# `raw_docs` is modified too) and send the entries to the index one by one.
for course_entry in tqdm(raw_docs):
    for faq in course_entry['documents']:
        faq.update(course=course_entry['course'])
        es_client.index(index=index_name, document=faq)

  0%|          | 0/3 [00:00<?, ?it/s]

In [10]:
def search_documents(query, course="data-engineering-zoomcamp", size=5):
    """Search the FAQ index and return the matching documents.

    Args:
        query: Free-text question. It is matched against the ``question``,
            ``text`` and ``section`` fields, with ``question`` boosted 3x.
        course: Exact ``course`` value the results are restricted to.
            Defaults to ``"data-engineering-zoomcamp"``, the value that used
            to be hard-coded. Pass ``None`` to search across all courses.
        size: Maximum number of hits to request (default 5).

    Returns:
        list: The ``_source`` dict of every hit, in the order Elasticsearch
        returned them.
    """
    bool_query = {
        "must": {
            "multi_match": {
                "query": query,
                "fields": ["question^3", "text", "section"],
                "type": "best_fields"
            }
        }
    }

    # The filter clause restricts results without affecting relevance scoring;
    # it is left out entirely when no course restriction is requested.
    if course is not None:
        bool_query["filter"] = {
            "term": {
                "course": course
            }
        }

    search_query = {
        "size": size,
        "query": {
            "bool": bool_query
        }
    }

    response = es_client.search(index=index_name, body=search_query)

    return [hit['_source'] for hit in response['hits']['hits']]

In [11]:
from openai import OpenAI

In [12]:
# OpenAI SDK client pointed at a local server on port 11434 rather than the
# hosted API; the key is a placeholder string, not a real credential.
client = OpenAI(api_key="ollama", base_url="http://localhost:11434/v1/")

In [13]:
def llm(prompt, model='deepseek-r1:1.5b'):
    """Send a single-turn prompt to the chat model and return its reply.

    Args:
        prompt: Text sent as the only ``user`` message of the conversation.
        model: Name of the model to query. Defaults to
            ``'deepseek-r1:1.5b'``; ``'gemma3:1b'`` is the alternative that
            was previously noted in a comment.

    Returns:
        str: The content of the first choice, returned verbatim. With the
        default model this includes the leading ``<think>`` reasoning text
        seen in the notebook output.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

In [14]:
def build_prompt(query, search_results):
    """Assemble the LLM prompt from the user question and retrieved FAQ entries.

    Args:
        query: The user's question, inserted after ``QUESTION:``.
        search_results: Iterable of dicts, each providing the ``section``,
            ``question`` and ``text`` keys. A missing key raises ``KeyError``.

    Returns:
        str: The instructions, the question and one
        ``section / question / answer`` paragraph per entry, separated by
        blank lines, with surrounding whitespace stripped.
    """
    # Built line by line rather than as an indented triple-quoted string, so
    # the function body's source indentation does not end up in the prompt.
    prompt_template = (
        "You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.\n"
        "Use only the facts from the CONTEXT when answering the QUESTION.\n"
        "If the CONTEXT does not have the Answer then output as you could not find the answer\n"
        "\n"
        "QUESTION: {question}\n"
        "\n"
        "CONTEXT:\n"
        "{context}"
    )

    context = "\n\n".join(
        f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}"
        for doc in search_results
    )

    # strip() removes the dangling newline after "CONTEXT:" when nothing was retrieved.
    return prompt_template.format(question=query, context=context).strip()

In [15]:
def rag(query):
    """Answer `query` with retrieval-augmented generation.

    Retrieves documents for the query, builds a prompt from them and returns
    whatever the LLM replies to that prompt.
    """
    return llm(build_prompt(query, search_documents(query)))

In [16]:
# Sample question used to exercise the pipeline end to end.
query = "How can I install kafka ?"

In [17]:
# Search, build the prompt and query the LLM; as the last expression of the
# cell, the returned answer is displayed as the cell output.
rag(query)

'<think>\nOkay, so the user is asking how to install Kafka. Let\'s break down the context provided.\n\nFirst, I see that there are sections about Confluent Kafka and DuckDB within a workshop 1 - dlthub context. Under each, there are respective questions answered but no direct answers.\n\nLooking at Confluent Kafka section: The environment uses Confluent Cloud. They mention accessing the right navigation bar to find a "Stream Governance API," which should be under Kafka topics or similar. They then explain that by clicking on that link and entering the URL provided, they can get credentials from the "Credentials" section below.\n\nIn DuckDB section: To read from Parquet files, they provide code examples using Pyarrow.parquet and Spark\'s parquet driver. Neither of these is directly related to installing Kafka.\n\nNow, looking at other modules:\n- Module 6: streaming with Kafka doesn\'t list an install instruction.\n- Module 5: pyspark similarly has no installation steps mentioned for Ka