# The RAG Flow Cleaning and Modularizing Code


In [55]:
import json
import os
import minsearch
from openai import OpenAI
from dotenv import load_dotenv


In [56]:
def read_and_parse_json(filename):
    with open(filename, 'rt') as f_in:
        docs_raw = json.load(f_in)
    
    documents = []

    for course_dict in docs_raw:
        for doc in course_dict['documents']:
            doc['course'] = course_dict['course']
            documents.append(doc)

    return documents

In [57]:
def index_and_fit(documents):
    index = minsearch.Index(
        text_fields=["question", "text", "section"],
        keyword_fields=["course"]
    )

    index.fit(documents)

    return index

In [58]:
def create_openai_client():
    load_dotenv()
    api_key = os.getenv('OPENAI_API_KEY')
    return OpenAI(api_key=api_key)

In [59]:
# Search function that takes a query and returns the search results
def search(query, index):
    boost = {'question': 3.0, 'section': 0.5}

    results = index.search(
        query=query,
        filter_dict={'course': 'data-engineering-zoomcamp'},
        boost_dict=boost,
        num_results=5
    )

    return results

In [60]:
# Function to build the prompt. It takes the query and the search results and returns the prompt
def build_prompt(query, search_results):
    prompt_template = """
    You're a course teaching assistant. A student asks you the following question.
    Use only the facts from the CONTEXT when answering the QUESTION.

    QUESTION: {question}

    CONTEXT: 
    {context}
    """.strip()

    context = ""

    for doc in search_results:
        context = context + f"section: {doc['section']}\nquestion: {doc['question']}\nanswer: {doc['text']}\n\n"

    prompt = prompt_template.format(question=query, context=context).strip()

    return prompt

In [61]:
# Function to generate the answer using the OpenAI API
def llm(prompt, client):
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "user", "content": prompt},
        ]
    )

    return response.choices[0].message.content

In [62]:
# This function takes a query and returns the answer using the LLM model
# It uses the search, build_prompt, and llm functions
def rag(query, index, client): 
    search_results = search(query, index)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt, client)
    return answer

In [63]:
documents = read_and_parse_json('documents.json')
index = index_and_fit(documents)
client = create_openai_client()
query = "how do I run kafka?"

print(rag(query, index, client))

To run Kafka, follow the steps mentioned in the provided answers within the context:

For running a Java Kafka producer/consumer:
1. **Java Kafka**: Navigate to your project directory and run the following command in the terminal:
   ```shell
   java -cp build/libs/<jar_name>-1.0-SNAPSHOT.jar:out src/main/java/org/example/JsonProducer.java
   ```

For running a Python Kafka producer:
1. **Python Kafka**: Create a virtual environment to ensure all dependencies are correctly managed. Run the following commands:
   ```shell
   python -m venv env
   source env/bin/activate
   pip install -r ../requirements.txt
   ```

2. If you encounter a permission error while building scripts, make sure to grant execute permissions by running:
   ```shell
   chmod +x build.sh
   ```

3. If you come across the error "ModuleNotFoundError: No module named 'kafka.vendor.six.moves'", install the alternative Kafka package:
   ```shell
   pip install kafka-python-ng
   ```

These steps should help you set up a

# Search with Elasticsearch
In this section, we will now study how to use Elasticsearch to replace the way we are indexing and searching through the data.

To be able to run Elasticsearch we are going to create an container in docker with the following command:
```bash
docker run -it \
    --rm \
    --name elasticsearch \
    -p 9200:9200 \
    -p 9300:9300 \
    -e "discovery.type=single-node" \
    -e "xpack.security.enabled=false" \
    docker.elastic.co/elasticsearch/elasticsearch:8.4.3
```

In [64]:
from elasticsearch import Elasticsearch
from tqdm.auto import tqdm

In [65]:
# Create an Elasticsearch client
es_client = Elasticsearch('http://localhost:9200')

In [66]:
# Check the connection
es_client.info()

ObjectApiResponse({'name': '84c3a1d35d9f', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'n5jPCc4XQ_2-fuzBKD2j9Q', 'version': {'number': '8.4.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '42f05b9372a9a4a470db3b52817899b99a76ee73', 'build_date': '2022-10-04T07:17:24.662462378Z', 'build_snapshot': False, 'lucene_version': '9.3.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [67]:
# Define the index settings, mappings and name. Then reate the index using the Elasticsearch client
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "text": {"type": "text"},
            "section": {"type": "text"},
            "question": {"type": "text"},
            "course": {"type": "keyword"} 
        }
    }
}

index_name = "course_questions"
try: 
    es_client.indices.create(index=index_name, body=index_settings)
except:
    pass

In [68]:
# Using the same documents, index the documents into the Elasticsearch index
documents = read_and_parse_json('documents.json')
documents[0]

{'text': "The purpose of this document is to capture frequently asked technical questions\nThe exact day and hour of the course will be 15th Jan 2024 at 17h00. The course will start with the first  “Office Hours'' live.1\nSubscribe to course public Google Calendar (it works from Desktop only).\nRegister before the course starts using this link.\nJoin the course Telegram channel with announcements.\nDon’t forget to register in DataTalks.Club's Slack and join the channel.",
 'section': 'General course-related questions',
 'question': 'Course - When will the course start?',
 'course': 'data-engineering-zoomcamp'}

In [69]:
# Execute the indexing for all the documents
for doc in documents:
    es_client.index(index=index_name, document=doc)

In [70]:
# Query the Elasticsearch index using the search function
query = "I just discovered the course. Can I still join it?"

# Define the search query
# The query has two main components, the must and the filter
# The must component is used to search for the query in the text fields
# The filter component is used to filter the results based on the course name
search_query = {
    "size": 5,
    "query": {
        "bool": {
            "must": {
                "multi_match": {
                    "query": query,
                    "fields": ["question^3", "text", "section"], # ^3 boosts the question field
                    "type": "best_fields"
                }
            },
            "filter": {
                "term": {
                    "course": "data-engineering-zoomcamp"
                }
            }
        }
    }
}

In [71]:
response = es_client.search(index=index_name, body=search_query)

In [72]:
response['hits']['hits']

[{'_index': 'course_questions',
  '_id': 'Q_fsb5ABrIFw-M8t4ZBH',
  '_score': 73.286255,
  '_source': {'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
   'section': 'General course-related questions',
   'question': 'Course - Can I still join the course after the start date?',
   'course': 'data-engineering-zoomcamp'}},
 {'_index': 'course_questions',
  '_id': '9_cWcJABrIFw-M8tNZPO',
  '_score': 73.286255,
  '_source': {'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
   'section': 'General course-related questions',
   'question': 'Course - Can I still join the course after the start date?',
   'course': 'data-engineering-zoomcamp'}},
 {'_index'

In [73]:
result_docs = []
for hit in response['hits']['hits']:
    result_docs.append(hit['_source'])

In [74]:
# We create a function that integrates everything we have done so far
def elastic_search(query):
    search_query = {
        "size": 5,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": query,
                        "fields": ["question^3", "text", "section"], # ^3 boosts the question field
                        "type": "best_fields"
                    }
                },
                "filter": {
                    "term": {
                        "course": "data-engineering-zoomcamp"
                    }
                }
            }
        }
    }

    response = es_client.search(index=index_name, body=search_query)

    result_docs = []
    for hit in response['hits']['hits']:
        result_docs.append(hit['_source'])

    return result_docs


In [75]:
elastic_search(query)

[{'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
  'section': 'General course-related questions',
  'question': 'Course - Can I still join the course after the start date?',
  'course': 'data-engineering-zoomcamp'},
 {'text': "Yes, even if you don't register, you're still eligible to submit the homeworks.\nBe aware, however, that there will be deadlines for turning in the final projects. So don't leave everything for the last minute.",
  'section': 'General course-related questions',
  'question': 'Course - Can I still join the course after the start date?',
  'course': 'data-engineering-zoomcamp'},
 {'text': 'Yes, we will keep all the materials after the course finishes, so you can follow the course at your own pace after it finishes.\nYou can also continue looking at the homeworks and continue preparing for the 

In [76]:
def rag(query, client): 
    search_results = elastic_search(query)
    prompt = build_prompt(query, search_results)
    answer = llm(prompt, client)
    return answer

In [77]:
rag(query, client)

"Yes, you can still join the course even if it has already started. You are eligible to submit the homeworks, but be mindful of the deadlines for turning in the final projects. So, don't leave everything for the last minute."