### Prepare the documents from the knowledge base (KB)

In [80]:
import requests
import json
import hashlib

# Source of the FAQ knowledge base: a JSON list with one entry per course, each entry
# holding that course's list of documents.
docs_url = 'https://github.com/DataTalksClub/llm-zoomcamp/blob/main/01-intro/documents.json?raw=1'

# Fail fast: the timeout keeps the cell from hanging on a stalled connection, and
# raise_for_status() surfaces HTTP errors here instead of as a JSON-decoding failure.
docs_response = requests.get(docs_url, timeout=30)
docs_response.raise_for_status()
documents_raw = docs_response.json()

# Flat list of documents; filled by the loop at the end of this cell.
documents = []

def generate_id(doc):
    """
    Derive a deterministic identifier from a document's content.

    The document is serialized to JSON with sorted keys, so two dicts holding the
    same key/value pairs yield the same ID regardless of key insertion order.

    Args:
        doc (dict): A JSON-serializable document.

    Returns:
        str: Hexadecimal MD5 digest of the serialized document.
    """
    canonical_json = json.dumps(doc, sort_keys=True)
    digest = hashlib.md5(canonical_json.encode())
    return digest.hexdigest()

# Flatten the per-course structure into `documents`, tagging every document with its
# course name and a content-derived ID.
for course in documents_raw:
    course_name = course['course']

    for doc in course['documents']:
        # Order matters: the course is set before hashing so it is part of the ID,
        # and the ID is attached after hashing so it is not.
        doc.update(course=course_name)
        doc_id = generate_id(doc)
        doc['doc_id'] = doc_id
        documents.append(doc)

In [81]:
documents[0:2]

[{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
  'section': 'General course-related questions',
  'question': 'Course - When will the course start?',
  'course': 'data-engineering-zoomcamp',
  'doc_id': 'bae7a31e6abaddb52b4061dcf238fc61'},
 {'text': 'GitHub - DataTalksClub data-engineering-zoomcamp#prerequisites',
  'section': 'General course-related questions',
  'question': 'Course - What are the prerequisites for this course?',
  'course': 'data-engineering-zoomcamp',
  'doc_id': '3e5d4959603c68a1e154fa2a6bd9d1e8'}]

### Create the Elasticsearch index and load the data from the knowledge base (KB)

In [82]:
from elasticsearch import Elasticsearch

# Address of the Elasticsearch node used by the index-creation and indexing cells below.
elasticsearch_host = "http://localhost:9200"
es = Elasticsearch(hosts=elasticsearch_host)
# Bare last expression: displays the cluster info, which doubles as a connectivity check.
es.info()

ObjectApiResponse({'name': '0e7ffde126cf', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'RMweVgX3SNqBLG5XOgEriQ', 'version': {'number': '9.0.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '0a58bc1dc7a4ae5412db66624aab968370bd44ce', 'build_date': '2025-05-28T10:06:37.834829258Z', 'build_snapshot': False, 'lucene_version': '10.1.0', 'minimum_wire_compatibility_version': '8.18.0', 'minimum_index_compatibility_version': '8.0.0'}, 'tagline': 'You Know, for Search'})

In [83]:
from elasticsearch.exceptions import NotFoundError, BadRequestError

# Index definition: one shard and no replicas, with a strict mapping that declares
# exactly the five fields every document carries.
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "dynamic": "strict",
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            # keyword fields are matched exactly (used by the term filter on "course")
            "course": {"type": "keyword"},
            "doc_id": {"type": "keyword"}
        }
    }
}

index_name = "zoomcamp-courses-questions"

# Delete the existing index if it exists (best effort: failures are reported, not raised)
try:
    es.indices.delete(index=index_name)
    print(f"Deleted index '{index_name}'")
except NotFoundError:
    print(f"Index '{index_name}' does not exist, no need to delete")
except BadRequestError as e:
    print(f"Error deleting index '{index_name}': {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

# Create the index with the new settings
try:
    response = es.indices.create(index=index_name, settings=index_settings["settings"], mappings=index_settings["mappings"])
    print(f"Created index '{index_name}': {response}")
except Exception as e:
    print(f"Error creating index '{index_name}': {e}")
    # Re-raise: without the index the following cells cannot work, and `response`
    # below would otherwise be undefined (or stale from a previous run of this cell).
    raise

response

Deleted index 'zoomcamp-courses-questions'
Created index 'zoomcamp-courses-questions': {'acknowledged': True, 'shards_acknowledged': True, 'index': 'zoomcamp-courses-questions'}


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'zoomcamp-courses-questions'})

In [84]:
from tqdm.auto import tqdm

# Index every document under its content-derived ID; documents that share a doc_id
# overwrite each other, so the index can hold fewer entries than len(documents).
for doc in tqdm(documents, ncols = 100):
    doc_id = doc["doc_id"]
    es.index(index=index_name, id=doc_id, document=doc)

# Newly indexed documents only become visible to search/count after a refresh;
# refresh explicitly so the count below reflects everything that was just written.
es.indices.refresh(index=index_name)
es.count(index=index_name)

  0%|                                                                       | 0/948 [00:00<?, ?it/s]

ObjectApiResponse({'count': 628, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}})

## Retrieval (R)

In [85]:
from enum import Enum
from typing import Dict, Any, List, Union
from typing import Optional
from elasticsearch import Elasticsearch

class Course(Enum):
    """
    Courses that a search can be restricted to.

    Each member's value is the string that set_search_query places in the term
    filter on the ``course`` field.
    """
    # Values are hyphenated course identifiers, in the same form as the 'course'
    # field stored on the indexed documents (e.g. 'data-engineering-zoomcamp').
    DATA_ENGINEERING_ZOOMCAMP = "data-engineering-zoomcamp"
    MACHINE_LEARNING_ZOOMCAMP = "machine-learning-zoomcamp"
    MLOPS_ZOOMCAMP = "mlops-zoomcamp"
    LLM_ZOOMCAMP = "llm-zoomcamp"

def get_es_client(host: str = "http://localhost:9200") -> Elasticsearch:
    """
    Create an Elasticsearch client.

    Args:
        host (str, optional): URL of the Elasticsearch node to connect to.
            Defaults to "http://localhost:9200", so existing zero-argument
            calls keep their behavior.

    Returns:
        Elasticsearch: A new client instance (one is created on every call).
    """
    return Elasticsearch(host)

def set_search_query(
    question: str,
    course_filter: Optional[Course] = None,
    num_results: int = 5,
    boost: int = 4
) -> Dict[str, Any]:
    """
    Build the Elasticsearch query dictionary for a question.

    The question is matched against the ``question``, ``text`` and ``section``
    fields, with the ``question`` field weighted by ``boost``. When a course is
    given, the match is wrapped in a ``bool`` query that also filters on it.

    Args:
        question (str): The question to search for.
        course_filter (Optional[Course], optional): Course to restrict the search to;
            its ``value`` is used in a term filter. None searches all courses.
            Defaults to None.
        num_results (int, optional): Maximum number of hits to return. Defaults to 5.
        boost (int, optional): Weight applied to the ``question`` field. Defaults to 4.

    Returns:
        Dict[str, Any]: The search query, with ``size`` and ``query`` keys.
    """
    text_match = {
        "multi_match": {
            "query": question,
            "fields": [f"question^{boost}", "text", "section"],
            "type": "best_fields"
        }
    }

    # Guard clause: without a course there is nothing to filter on, so the
    # multi_match is used directly as the query.
    if course_filter is None:
        return {"size": num_results, "query": text_match}

    course_term = {"term": {"course": course_filter.value}}
    return {
        "size": num_results,
        "query": {
            "bool": {
                "must": text_match,
                "filter": course_term
            }
        }
    }

def search_documents(
        search_query: Dict[str, Any],
        formatted_docs: bool = True,
        index: str = "zoomcamp-courses-questions"
    ) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Search documents in Elasticsearch.

    Args:
        search_query (dict): The search query to execute (sent as the request body).
        formatted_docs (bool, optional): If True, return only the ``_source`` of each
            hit; if False, return the raw Elasticsearch response (including scores).
            Defaults to True.
        index (str, optional): The index to search in. Defaults to "zoomcamp-courses-questions".

    Returns:
        list | dict: The list of matching documents when ``formatted_docs`` is True,
        otherwise the full response from Elasticsearch.
    """
    es = get_es_client()
    try:
        response = es.search(index=index, body=search_query)
    finally:
        # A new client is created for every call, so release its connections
        # once the request is done instead of leaking them.
        es.close()

    if not formatted_docs:
        return response

    # Only unpack the hits when the caller actually wants the plain documents.
    return [hit['_source'] for hit in response['hits']['hits']]

In [86]:
def format_context(documents: List[Dict[str, Any]]) -> str:
    """
    Render documents as a block of question/answer pairs for the prompt.

    Args:
        documents (List[Dict[str, Any]]): Documents to render; each must provide
            the 'question' and 'text' keys.

    Returns:
        str: One "Q: ... / A: ..." entry per document, entries separated by a blank
        line, with leading and trailing whitespace removed. Empty for no documents.
    """
    entry_template = "Q: {question}\nA: {text}"

    entries = [
        entry_template.format(question=doc['question'], text=doc['text'])
        for doc in documents
    ]

    # Blank line between entries; the final strip trims the outer whitespace.
    return "\n\n".join(entries).strip()

In [87]:
def test_set_search_query():
    """
    Validate the query dictionaries built by set_search_query.

    Runs five groups of assertions (with a course filter, without one, every
    Course member, default parameters, edge cases) and prints progress as it goes.

    Returns:
        bool: True if every assertion passed; False if an assertion failed or an
        unexpected exception was raised (the error is printed, not re-raised).
    """
    docker_question = "How do I copy files to a Docker container?"

    def check_basic_structure(query, expected_size):
        """Assert the top-level shape shared by every query and its size."""
        assert isinstance(query, dict), "Query should be a dictionary"
        assert "size" in query, "Query should have 'size' field"
        assert "query" in query, "Query should have 'query' field"
        assert query["size"] == expected_size, f"Expected size {expected_size}, got {query['size']}"

    def check_multi_match(multi_match, expected_question, expected_boost):
        """Assert the multi_match clause targets the right text, fields and type."""
        assert multi_match["query"] == expected_question, "Question should match input"
        assert f"question^{expected_boost}" in multi_match["fields"], f"Should boost question field with factor {expected_boost}"
        assert "text" in multi_match["fields"], "Should include text field"
        assert "section" in multi_match["fields"], "Should include section field"
        assert multi_match["type"] == "best_fields", "Should use best_fields type"

    def test_with_course_filter():
        """Test set_search_query with a course filter."""
        print("🧪 Testing set_search_query WITH course filter...")

        query = set_search_query(
            question=docker_question,
            course_filter=Course.MACHINE_LEARNING_ZOOMCAMP,
            num_results=3,
            boost=4
        )

        check_basic_structure(query, expected_size=3)

        # With a filter the multi_match must be wrapped in a bool query
        assert "bool" in query["query"], "Query should have 'bool' structure when course filter is provided"
        bool_query = query["query"]["bool"]
        assert "must" in bool_query, "Bool query should have 'must' clause"
        assert "filter" in bool_query, "Bool query should have 'filter' clause when course is specified"

        check_multi_match(bool_query["must"]["multi_match"], docker_question, expected_boost=4)

        course_filter = bool_query["filter"]["term"]
        assert course_filter["course"] == "machine-learning-zoomcamp", "Should filter by correct course"

        print("✅ Test with course filter PASSED")

    def test_without_course_filter():
        """Test set_search_query without a course filter."""
        print("🧪 Testing set_search_query WITHOUT course filter...")

        query = set_search_query(
            question=docker_question,
            num_results=5,
            boost=2
        )

        check_basic_structure(query, expected_size=5)

        # Without a filter the multi_match is the query itself
        assert "multi_match" in query["query"], "Query should have direct 'multi_match' when no course filter"
        assert "bool" not in query["query"], "Query should NOT have 'bool' structure when no course filter"

        check_multi_match(query["query"]["multi_match"], docker_question, expected_boost=2)

        print("✅ Test without course filter PASSED")

    def test_different_courses():
        """Test set_search_query with every Course enum value."""
        print("🧪 Testing set_search_query with different course values...")

        # Iterate the enum itself so newly added courses are covered automatically
        for course in Course:
            query = set_search_query(
                question="Test question",
                course_filter=course
            )

            expected_course = course.value
            actual_course = query["query"]["bool"]["filter"]["term"]["course"]
            assert actual_course == expected_course, f"Expected {expected_course}, got {actual_course}"

        print("✅ Test with different courses PASSED")

    def test_default_parameters():
        """Test set_search_query with default parameters."""
        print("🧪 Testing set_search_query with default parameters...")

        query = set_search_query(question="Test question")

        # Defaults: course_filter=None, num_results=5, boost=4
        assert query["size"] == 5, f"Expected default size 5, got {query['size']}"
        assert "multi_match" in query["query"], "Should have direct multi_match with defaults"

        fields = query["query"]["multi_match"]["fields"]
        assert "question^4" in fields, "Should use default boost of 4"

        print("✅ Test with default parameters PASSED")

    def test_edge_cases():
        """Test edge cases: empty question, large boost, large result count."""
        print("🧪 Testing edge cases...")

        query = set_search_query(question="")
        assert query["query"]["multi_match"]["query"] == "", "Should handle empty question"

        query = set_search_query(question="test", boost=100)
        assert "question^100" in query["query"]["multi_match"]["fields"], "Should handle high boost values"

        query = set_search_query(question="test", num_results=1000)
        assert query["size"] == 1000, "Should handle high num_results values"

        print("✅ Edge cases test PASSED")

    # Run all tests
    print("🚀 Starting set_search_query validation tests...\n")

    try:
        test_with_course_filter()
        test_without_course_filter()
        test_different_courses()
        test_default_parameters()
        test_edge_cases()

        print("\n🎉 ALL TESTS PASSED! The set_search_query function is working correctly.")
        return True

    except AssertionError as e:
        print(f"\n❌ TEST FAILED: {e}")
        return False
    except Exception as e:
        print(f"\n💥 UNEXPECTED ERROR: {e}")
        return False

# Run the test suite
test_set_search_query()


🚀 Starting set_search_query validation tests...

🧪 Testing set_search_query WITH course filter...
✅ Test with course filter PASSED
🧪 Testing set_search_query WITHOUT course filter...
✅ Test without course filter PASSED
🧪 Testing set_search_query with different course values...
✅ Test with different courses PASSED
🧪 Testing set_search_query with default parameters...
✅ Test with default parameters PASSED
🧪 Testing edge cases...
✅ Edge cases test PASSED

🎉 ALL TESTS PASSED! The set_search_query function is working correctly.


True

# Augmentation (A)

In [88]:
def build_prompt(question: str, context: str) -> str:
    """
    Assemble the LLM prompt from a question and its retrieved context.

    Args:
        question (str): The user's question.
        context (str): The FAQ excerpts the answer must be based on.

    Returns:
        str: The instruction text followed by the question and the context, with
        leading and trailing whitespace removed.
    """
    template = """You're a course teaching assistant. Answer the QUESTION based on the CONTEXT from the FAQ database.
Use only the facts from the CONTEXT when answering the QUESTION.

QUESTION: {question}

CONTEXT:
{context}"""

    filled = template.format(question=question, context=context)
    return filled.strip()

# Generation (G)

In [89]:
import os
from dotenv import load_dotenv
from openai import OpenAI

def get_openai_client():
    """
    Create an OpenAI client using the API key from the local ``.env`` file.

    The ``.env`` file is (re)loaded on every call with ``override=True``, so values
    in it replace variables already present in the environment.

    Returns:
        OpenAI: A client configured with the ``OPENAI_API_KEY`` environment variable.
    """
    load_dotenv(dotenv_path=".env", verbose=True, override=True)
    return OpenAI(api_key=os.getenv('OPENAI_API_KEY'))

def get_llm_response(prompt: str, model: str = "gpt-4o") -> str:
    """
    Send a prompt to an OpenAI chat model and return its reply.

    Args:
        prompt (str): Text sent as a single user message.
        model (str, optional): Name of the chat model to use. Defaults to "gpt-4o".

    Returns:
        str: The message content of the first returned choice.
    """
    user_message = {"role": "user", "content": prompt}

    completion = get_openai_client().chat.completions.create(
        model=model,
        messages=[user_message]
    )

    first_choice = completion.choices[0]
    return first_choice.message.content

# RAG

In [90]:
question = "How to run a script while a web-server is working?"

def rag(question: str, course_filter: Optional[Course] = None, debug: bool = False):
    """
    Answer a question with the full retrieve / augment / generate pipeline.

    Args:
        question (str): The question to answer.
        course_filter (Optional[Course], optional): Course to restrict retrieval to.
            Defaults to None (all courses).
        debug (bool, optional): If True, print every intermediate value after the
            response has been generated. Defaults to False.

    Returns:
        The response returned by get_llm_response for the built prompt.
    """
    search_query = set_search_query(question=question, course_filter=course_filter)
    search_results = search_documents(search_query)
    context = format_context(search_results)
    prompt = build_prompt(question=question, context=context)
    response = get_llm_response(prompt)

    if debug:
        # One labelled line per pipeline stage, in execution order
        stages = [
            ("Search query", search_query),
            ("Search results", search_results),
            ("Context", context),
            ("Prompt", prompt),
            ("Response", response),
        ]
        for label, value in stages:
            print(f"{label}: {value}")
        print("--------------------------------")

    return response

rag(question=question, debug=False)

'To run a script while a web server is working, you can start by running your web server in one terminal (or command window, PowerShell, etc.). Then, open another terminal and run your Python script from there. This allows the web server and the Python script to operate simultaneously.'

# Homework

### Q3. Searching

In [91]:
# NOTE(review): the question string is sent to the search verbatim and reads as if a
# word is missing ("How do I ...") — confirm against the homework text before editing,
# since any change to it changes the score printed below.
question = "How do execute a command on a Kubernetes pod?"

# Default query: no course filter, 5 results, question field boosted by 4.
search_query = set_search_query(question=question)

# formatted_docs=False returns the raw Elasticsearch response rather than the documents.
results = search_documents(search_query, formatted_docs=False)

# Highest relevance score among the hits.
print(results['hits']['max_score'])

44.50556


### Q4. Filtering

In [92]:
question = "How do copy a file to a Docker container?"

# Restrict the search to the machine-learning course and request three hits.
search_query = set_search_query(question=question, course_filter=Course.MACHINE_LEARNING_ZOOMCAMP, num_results=3, boost=4)

# With the default formatted_docs=True this is a list of document dicts.
results = search_documents(search_query)

# Index 2 is the third of the three requested results.
print(results[2]['text'])

You can copy files from your local machine into a Docker container using the docker cp command. Here's how to do it:
In the Dockerfile, you can provide the folder containing the files that you want to copy over. The basic syntax is as follows:
COPY ["src/predict.py", "models/xgb_model.bin", "./"]											Gopakumar Gopinathan


### Q5. Building a prompt

In [93]:
# Uses `results` and `question` as left by the Q4 cell when the notebook runs top to bottom.
context = format_context(results)

prompt = build_prompt(question=question, context=context)

# Length of the prompt in characters (not tokens).
print(len(prompt))

1446


### Q6. Tokens

In [98]:
%pip install tiktoken


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [95]:
import tiktoken

# Tokenizer for "gpt-4o", the default model of get_llm_response.
encoding = tiktoken.encoding_for_model("gpt-4o")

# Token IDs for the prompt built in the Q5 cell.
tokens = encoding.encode(prompt)

print(f"Number of tokens: {len(tokens)}\n")

# Optional debugging aid: uncomment to print each token ID next to its decoded text.
# print(f"Tokens decoded:")
# for token in tokens:
#     print(f"{token}: {encoding.decode([token])}")

Number of tokens: 320



### Bonus: generating the answer (ungraded)

In [96]:
# Sends the current `prompt` to the default model of get_llm_response.
response = get_llm_response(prompt)

print(response)

You can copy a file from your local machine to a Docker container using the `docker cp` command. The basic syntax for copying a file or directory into a running Docker container is:

```bash
docker cp /path/to/local/file_or_directory container_id:/path/in/container
```


### Bonus: calculating the costs (ungraded)

In [97]:
# Prices per single token, each written as a per-million-token figure divided by 1,000,000.
input_price_per_token_gpt_4o = 2.5/1000000
output_price_per_token_gpt_4o = 10/1000000
num_requests = 1000
# NOTE(review): by its use below this is the average number of *input* tokens per
# request — confirm, and consider a name that parallels avg_output_tokens.
avg_per_request_tokens = 150
avg_output_tokens = 250

# Input cost plus output cost, summed over all requests.
total_cost = (num_requests * avg_per_request_tokens * input_price_per_token_gpt_4o + num_requests * avg_output_tokens * output_price_per_token_gpt_4o)

print(f"Total cost: ${total_cost:.2f}")

Total cost: $2.88


# Change to Vector Search with Qdrant

In [6]:
from rag.config import QDRANT_URL

from qdrant_client import QdrantClient

# Qdrant client pointed at the URL configured in rag.config.
client = QdrantClient(url=QDRANT_URL)

In [None]:
# NOTE(review): built with QdrantClient's defaults rather than url=QDRANT_URL like
# `client` in the previous cell — confirm which server this is meant to target.
qd_client = QdrantClient()