wget https://github.com/alexeygrigorev/llm-rag-workshop/raw/main/notebooks/documents.json

In [21]:
import json
from typing import List, TypedDict
from pprint import pprint

In [22]:
ELASTIC_SEARCH_URL = "http://elasticsearch:9200"
SEARCH_FILTER_TERM = "data-engineering-zoomcamp"
INDEX_NAME = "course-questions"

In [23]:
from elasticsearch import Elasticsearch
from elasticsearch import NotFoundError

es = Elasticsearch(ELASTIC_SEARCH_URL)
pprint(es.info())

ObjectApiResponse({'name': '802180b6a65d', 'cluster_name': 'docker-cluster', 'cluster_uuid': '4KCfHTl1QAONG0zILkySrA', 'version': {'number': '8.4.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '42f05b9372a9a4a470db3b52817899b99a76ee73', 'build_date': '2022-10-04T07:17:24.662462378Z', 'build_snapshot': False, 'lucene_version': '9.3.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})


In [24]:
class Document(TypedDict):
    id: str
    text: str
    course: str
    section: str
    question: str

In [25]:
with open('./documents.json') as file:
    documents_file = json.load(file)

In [26]:
print(documents_file[0]["documents"][0])
print(documents_file[0]["documents"][1])

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.", 'section': 'General course-related questions', 'question': 'Course - When will the course start?'}
{'text': 'GitHub - DataTalksClub data-engineering-zoomcamp#prerequisites', 'section': 'General course-related questions', 'question': 'Course - What are the prerequisites for this course?'}


In [27]:
import hashlib

documents: List[Document] = []

for course in documents_file:
    course_name = course["course"]

    for document in course["documents"]:
        question = document["question"]
        question_hash = hashlib.md5(question.encode()).hexdigest()
        document["id"]= question_hash
        document["course"] = course_name
        documents.append(document)

print(documents[0])

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.", 'section': 'General course-related questions', 'question': 'Course - When will the course start?', 'id': '2d669d12c0511996b393bff34bfbcf55', 'course': 'data-engineering-zoomcamp'}


In [28]:
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "text": {
                "type": "text"
            },
            "section": {
                "type": "text"
            },
            "question": {
                "type": "text"
            },
            "course": {
                "type": "keyword"
            }
        }
    }
}

try:
    indeces = es.indices.get(index=INDEX_NAME)
    print("Index already exists")
    #     es.indices.delete(index=INDEX_NAME, ignore=[400, 404])
except NotFoundError:
    print("Creating index")
    response = es.indices.create(index=INDEX_NAME, body=index_settings)
    pprint(response)

Index already exists


In [29]:
from tqdm.auto import tqdm

for doc in tqdm(documents):
    es.index(index=INDEX_NAME, id=doc["id"], document=doc)

  5%|▍         | 47/948 [00:00<00:12, 73.98it/s]

100%|██████████| 948/948 [00:10<00:00, 92.34it/s] 


In [31]:
from retrieval_service import RetrievalService


user_question = "How can I join the course after one has started?"

retrieval_service = RetrievalService()
retrieval_result = retrieval_service.get_retrival_result(user_question, SEARCH_FILTER_TERM, INDEX_NAME, es)

print(retrieval_result[0], end="\n\n")

[RetrievalResult(score=62.70455, section='General course-related questions', course='data-engineering-zoomcamp', question='Course - Can I still join the course after the start date?', answer="Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute."), RetrievalResult(score=47.086697, section='General course-related questions', course='data-engineering-zoomcamp', question='Course - Can I follow the course after it finishes?', answer='Yes, we will keep all the materials after the course finishes, so you can follow the course at your own pace after it finishes.\nYou can also continue looking at the homeworks and continue preparing for the next cohort. I guess you can also start working on your final capstone project.'), RetrievalResult(score=36.514423, section='General course-related questions', course='data-engineering-zoomcamp', questi