In [1]:
!pip install minsearch

Collecting minsearch
  Downloading minsearch-0.0.4-py3-none-any.whl.metadata (8.1 kB)
Collecting scikit-learn (from minsearch)
  Downloading scikit_learn-1.7.1-cp313-cp313-win_amd64.whl.metadata (11 kB)
Collecting scipy>=1.8.0 (from scikit-learn->minsearch)
  Downloading scipy-1.16.1-cp313-cp313-win_amd64.whl.metadata (60 kB)
Collecting joblib>=1.2.0 (from scikit-learn->minsearch)
  Downloading joblib-1.5.1-py3-none-any.whl.metadata (5.6 kB)
Collecting threadpoolctl>=3.1.0 (from scikit-learn->minsearch)
  Downloading threadpoolctl-3.6.0-py3-none-any.whl.metadata (13 kB)
Downloading minsearch-0.0.4-py3-none-any.whl (11 kB)
Downloading scikit_learn-1.7.1-cp313-cp313-win_amd64.whl (8.7 MB)
   ---------------------------------------- 0.0/8.7 MB ? eta -:--:--
   ---- ----------------------------------- 1.0/8.7 MB 5.6 MB/s eta 0:00:02
   --------- ------------------------------ 2.1/8.7 MB 5.4 MB/s eta 0:00:02
   ---------------- ----------------------- 3.7/8.7 MB 6.2 MB/s eta 0:00:01
   ----

In [14]:
import minsearch
import json

In [3]:
with open('documents.json', 'rt') as f_in:
    docs_raw = json.load(f_in)

documents = []
for course_dict in docs_raw:
    for doc in course_dict['documents']:
        doc['course'] = course_dict['course']
        documents.append(doc)

## R: Retrieval (define search engine)

In [19]:
index = minsearch.Index(
    text_fields=["question", "text", "section"],
    keyword_fields=["course"]
)

In [20]:
q = 'the course has already started, can I still enroll?'

In [21]:
index.fit(documents)

<minsearch.minsearch.Index at 0x72ad97a99f40>

In [25]:
boost = {'question': 3.0, 'section': 0.5}

search_results = index.search(
    query=q,
    boost_dict=boost,
    filter_dict={'course': 'data-engineering-zoomcamp'},
    num_results=5,
)

## AG: Augmented Generation (LLM + engine)

In [24]:
from dotenv import load_dotenv
from datetime import datetime
from langchain_openai import AzureChatOpenAI
from langchain.schema.messages import HumanMessage
from langchain.prompts import ChatPromptTemplate

In [25]:
load_dotenv()
query = 'the course has already started, can I still enroll?'
llm_client = AzureChatOpenAI(deployment_name="gpt-4o-mini")
# response = llm_client.invoke([HumanMessage(content=query)])

In [26]:
def generate_prompt(question: str, search_results: list[str]):
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system", """
                You're a course teaching assistant, consider the current date: {current_date}. Answer the QUESTION based on the CONTEXT from the FAQ database.
                Use only the facts from the CONTEXT when answering the QUESTION.
                If the answer is not in the CONTEXT, say "This question is not related to our main topic".
                CONTEXT: {context}"""
            ),
            ("human", "{query}"),
            ("placeholder", "{agent_scratchpad}"),
            ]
        )
    context = ''
    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"
    return prompt.format_messages(query=question, context=context, current_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

In [27]:
def search(query: str) -> list[str]:
    boost = {'question': 3.0, 'section': 0.5}
    index = minsearch.Index(
        text_fields=["question", "text", "section"],
        keyword_fields=["course"]
    )

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5
    )

    return results

In [28]:
def get_client_llm() -> AzureChatOpenAI:
    return AzureChatOpenAI(
        deployment_name="gpt-4o-mini",
        model_name='gpt-4o-mini',
        )

In [29]:
def llm_execute(llm_client: AzureChatOpenAI, prompt: list[str]) -> str:
    response = llm_client.invoke(prompt)
    return response.content

In [12]:
def rag(query: str, llm_client: AzureChatOpenAI) -> str:
    search_results = search(query)
    prompt = generate_prompt(query, search_results)
    answer = llm_execute(llm_client, prompt)
    return answer

In [17]:
rag('How can i use kafka?', llm_client)

'This question is not related to our main topic.'

## Elastic Search: real engine

In [23]:
from elasticsearch import Elasticsearch
import json
from tqdm.auto import tqdm
from typing import List, Dict, Optional

class ElasticsearchManager:
    def __init__(self, index_name: str = "course-questions"):
        self.es_client = self._create_elasticsearch_client()
        self.index_name = index_name
        self._setup_index()

    @staticmethod
    def _create_elasticsearch_client() -> Elasticsearch:
        """Create and return an Elasticsearch client instance."""
        return Elasticsearch(
            'http://localhost:9200',
            headers={"Accept": "application/vnd.elasticsearch+json; compatible-with=8"}
        )

    def _setup_index(self) -> None:
        """Set up the Elasticsearch index with proper mappings if it doesn't exist."""
        index_settings = {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 0
            },
            "mappings": {
                "properties": {
                    "text": {"type": "text"},
                    "section": {"type": "text"},
                    "question": {"type": "text"},
                    "course": {"type": "keyword"} 
                }
            }
        }

        if not self.es_client.indices.exists(index=self.index_name):
            self.es_client.indices.create(
                index=self.index_name, 
                body=index_settings
            )

    def index_documents(self, documents: List[Dict]) -> None:
        """Index a list of documents into Elasticsearch."""
        for doc in tqdm(documents, desc="Indexing documents"):
            self.es_client.index(
                index=self.index_name,
                document=doc
            )

    def search(self, query: str, course_filter: Optional[str] = None) -> List[Dict]:
        """Search documents with optional course filter."""
        search_query = {
            "size": 5,
            "query": {
                "bool": {
                    "must": {
                        "multi_match": {
                            "query": query,
                            "fields": ["question^3", "text", "section"],
                            "type": "best_fields"
                        }
                    }
                }
            }
        }

        if course_filter:
            search_query["query"]["bool"]["filter"] = {
                "term": {"course": course_filter}
            }

        response = self.es_client.search(
            index=self.index_name,
            body=search_query
        )
        
        return [hit["_source"] for hit in response["hits"]["hits"]]


def load_and_prepare_documents(file_path: str) -> List[Dict]:
    """Load and prepare documents from JSON file."""
    with open(file_path, 'rt') as f_in:
        docs_raw = json.load(f_in)

    documents = []
    for course_dict in docs_raw:
        for doc in course_dict['documents']:
            doc['course'] = course_dict['course']
            documents.append(doc)
    
    return documents

In [None]:
documents = load_and_prepare_documents('documents.json')
es_manager = ElasticsearchManager()
es_manager.index_documents(documents)
results = es_manager.search(query="data pipelines", course_filter="data-engineering-zoomcamp")

In [37]:
def rag(query: str, llm_client: AzureChatOpenAI) -> str:
    search_results = results
    prompt = generate_prompt(query, search_results)
    answer = llm_execute(llm_client, prompt)
    return answer

In [38]:
rag(query, llm_client)

'This question is not related to our main topic.'