### About
#### Exploring the Mage hybrid framework
#### Mage is an open-source hybrid framework for ETL (extract, transform, load).
#### The following tasks are explored using Mage:
#### * Data ingestion
#### * Data chunking
#### * Data export to Elasticsearch
#### * Data retrieval from Elasticsearch

#### GitHub Codespaces was used for this task.
#### Steps to run Mage
```
# Clone the repository
git clone https://github.com/mage-ai/rag-project
cd rag-project

# In the rag-project/llm directory, add spacy to requirements.txt

# Add this line to the Dockerfile in the rag-project directory:
RUN python -m spacy download en_core_web_sm

# Start Mage
./scripts/start.sh

# Once started, go to http://localhost:6789/
```

### Q1. Running Mage
#### What's the version of Mage?
#### Answer: v0.9.72

### Ingestion

#### For FAQ document version 1
#### 1. Select New and create a Retrieval Augmented Generation pipeline
#### 2. Select Data preparation and click Go
#### 3. Select Load and click Go
#### 4. Under Load, select Ingest and click Go
#### 5. Under Ingest, select Add block --> Custom code
#### Paste the following code into the editor:



```python
# Data ingest (data_ingest.py)
import io

import docx  # provided by the python-docx package
import requests

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data(*args, **kwargs):
    def clean_line(line):
        # Drop surrounding whitespace and any byte-order-mark characters
        line = line.strip()
        line = line.strip('\uFEFF')
        return line

    def read_faq(file_id):
        # Export the Google Doc as .docx and parse it in memory
        url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'

        response = requests.get(url)
        response.raise_for_status()

        with io.BytesIO(response.content) as f_in:
            doc = docx.Document(f_in)

        questions = []

        question_heading_style = 'heading 2'
        section_heading_style = 'heading 1'

        section_title = ''
        question_title = ''
        answer_text_so_far = ''

        for p in doc.paragraphs:
            style = p.style.name.lower()
            p_text = clean_line(p.text)

            if len(p_text) == 0:
                continue

            # Heading 1 starts a new section
            if style == section_heading_style:
                section_title = p_text
                continue

            # Heading 2 starts a new question: save the previous answer first
            if style == question_heading_style:
                answer_text_so_far = answer_text_so_far.strip()
                if answer_text_so_far != '' and section_title != '' and question_title != '':
                    questions.append({
                        'text': answer_text_so_far,
                        'section': section_title,
                        'question': question_title,
                    })
                    answer_text_so_far = ''

                question_title = p_text
                continue

            # Any other paragraph belongs to the current answer
            answer_text_so_far += '\n' + p_text

        # Save the last answer, which has no heading after it
        answer_text_so_far = answer_text_so_far.strip()
        if answer_text_so_far != '' and section_title != '' and question_title != '':
            questions.append({
                'text': answer_text_so_far,
                'section': section_title,
                'question': question_title,
            })

        return questions

    # Document ID of the LLM FAQ, version 1
    faq_documents = {
        'llm-zoomcamp': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',
    }
    documents = []

    for course, file_id in faq_documents.items():
        course_documents = read_faq(file_id)
        documents.append({'course': course, 'documents': course_documents})

    print('Length:', len(documents))
    return [documents]


@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.
    """
    assert output is not None, 'The output is undefined'
```
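The parsing in `read_faq` is a small state machine over paragraphs:

* A Heading 1 paragraph sets the current section.
* A Heading 2 paragraph saves the previous answer and starts a new question.
* Any other non-empty paragraph is appended to the current answer.

Below is a minimal sketch of the same grouping. It assumes the paragraphs are already available as hypothetical `(style, text)` tuples, so it runs without `python-docx` or network access. There is one deliberate difference: this sketch always clears the answer buffer at a question heading, whereas `read_faq` clears it only after saving a complete record.

```python
from typing import Dict, List, Tuple

SECTION_STYLE = 'heading 1'   # Heading 1 = section title
QUESTION_STYLE = 'heading 2'  # Heading 2 = question title


def group_paragraphs(paragraphs: List[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Group (style, text) paragraphs into FAQ records, similar to read_faq."""
    questions: List[Dict[str, str]] = []
    section_title = ''
    question_title = ''
    answer = ''

    def save_pending() -> None:
        # Keep a record only if section, question and answer are all non-empty
        text = answer.strip()
        if text and section_title and question_title:
            questions.append({'text': text, 'section': section_title, 'question': question_title})

    for style, raw_text in paragraphs:
        text = raw_text.strip().strip('\uFEFF')  # same cleanup as clean_line
        if not text:
            continue
        style = style.lower()
        if style == SECTION_STYLE:
            section_title = text
        elif style == QUESTION_STYLE:
            save_pending()
            answer = ''  # unlike read_faq, always reset the buffer here
            question_title = text
        else:
            answer += '\n' + text

    save_pending()  # the last question has no heading after it
    return questions


sample = [
    ('Heading 1', 'General course-related questions'),
    ('Heading 2', 'When is the next cohort?'),
    ('Normal', 'Summer 2026.'),
]
print(group_paragraphs(sample))
# [{'text': 'Summer 2026.', 'section': 'General course-related questions', 'question': 'When is the next cohort?'}]
```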


### Q2. Reading the documents
```
Now we can ingest the documents. Create a custom code ingestion block.

Let's read the documents. We will use the same code we used for parsing the FAQ: parse-faq-llm.ipynb

Use the following document_id: 1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E

This is the document ID of LLM FAQ version 1.
```
#### How many FAQ documents did we process?
#### Answer: 1
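`len(documents)` in the loader counts FAQ files, with one entry per course, not questions. The questions are nested one level down under `'documents'`.

The small sketch below counts both levels. `count_faq` is a hypothetical helper, and the sample is a toy structure shaped like the loader's output.

```python
from typing import Any, Dict, List


def count_faq(documents: List[Dict[str, Any]]) -> Dict[str, int]:
    """Return the number of FAQ files and the total number of questions."""
    return {
        'faq_files': len(documents),
        'questions': sum(len(course['documents']) for course in documents),
    }


sample = [{'course': 'llm-zoomcamp', 'documents': [{'question': 'Q1'}, {'question': 'Q2'}]}]
print(count_faq(sample))  # {'faq_files': 1, 'questions': 2}
```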

### Chunking
#### 1. Select Data preparation --> Transform --> Chunking
#### 2. Under Chunking, select Add block --> Custom code
#### Paste the following code into the editor:


```python
# Data chunking (data_chunking.py)
import hashlib
from typing import Any, Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def chunk_documents(data: List[Dict[str, Any]], *args, **kwargs):
    # Generate a short, deterministic document ID from the course, question and start of the answer
    def generate_document_id(doc):
        combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
        hash_object = hashlib.md5(combined.encode())
        hash_hex = hash_object.hexdigest()
        document_id = hash_hex[:8]
        return document_id

    documents = []
    # Flatten [{'course': ..., 'documents': [...]}] into one list of question records
    for course_dict in data:
        for doc in course_dict['documents']:
            doc['course'] = course_dict['course']
            # previously we used just "id" for document ID
            doc['document_id'] = generate_document_id(doc)
            documents.append(doc)

    print('Documents:', len(documents))
    return [documents]
```
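The ID is the first 8 hex characters of an MD5 hash over three inputs: the course, the question, and only the first 10 characters of the answer. This has two consequences:

* The ID is deterministic, so the same FAQ entry gets the same ID when it is reindexed.
* Two entries that share a course, a question and the first 10 characters of their answers collide.

A standalone copy of the function illustrates both properties. The two sample records are made up.

```python
import hashlib


def generate_document_id(doc: dict) -> str:
    # Same recipe as the chunking block: course, question, first 10 chars of text
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    return hashlib.md5(combined.encode()).hexdigest()[:8]


a = {'course': 'llm-zoomcamp', 'question': 'Question', 'text': 'Answer: see the first link'}
b = {'course': 'llm-zoomcamp', 'question': 'Question', 'text': 'Answer: see the second link'}

print(generate_document_id(a) == generate_document_id(a))  # True: deterministic
print(generate_document_id(a) == generate_document_id(b))  # True: same first 10 chars of text
```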


### Q3. Chunking
#### How many documents (chunks) do we have in the output?
#### Answer: 86
```
Documents: 86
```
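`document_id` is a truncated hash built from only the first 10 characters of each answer. Because of that, it can be worth checking the chunk output for duplicate IDs before exporting.

A minimal sketch using `collections.Counter` follows. `find_duplicate_ids` is a hypothetical helper, not part of the Mage template, and the sample IDs are illustrative.

```python
from collections import Counter
from typing import Dict, List


def find_duplicate_ids(documents: List[Dict]) -> Dict[str, int]:
    """Return {document_id: count} for every ID that appears more than once."""
    counts = Counter(doc['document_id'] for doc in documents)
    return {doc_id: n for doc_id, n in counts.items() if n > 1}


docs = [{'document_id': 'bf024675'}, {'document_id': 'd8c4c7bb'}, {'document_id': 'd8c4c7bb'}]
print(find_duplicate_ids(docs))  # {'d8c4c7bb': 2}
```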

### Export
#### 1. Select Data preparation --> Export --> Vector database
#### 2. Under Vector database, select Add block --> Custom code
#### Paste the following code into the editor
#### Set the connection string to http://elasticsearch:9200 instead of the default http://localhost:9200



```python
# Data export (data_export_elasticsearch.py)
import json
from datetime import datetime

from elasticsearch import Elasticsearch
from mage_ai.data_preparation.variable_manager import set_global_variable

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def elasticsearch(documents, *args, **kwargs):
    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    index_name_prefix = kwargs.get('index_name', 'documents')
    # %M%S is minutes + seconds only (no hour), e.g. documents_20240819_2747;
    # use %H%M%S if index names must stay unique across hours
    current_time = datetime.now().strftime("%Y%m%d_%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index name:", index_name)
    # Share the index name with other blocks; 'ominous_maelstrom' is the
    # author's pipeline UUID, so replace it with your own pipeline's UUID
    set_global_variable('ominous_maelstrom', 'index_name', index_name)
    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)

    es_client = Elasticsearch(connection_string)

    print(f'Connecting to Elasticsearch at {connection_string}')

    index_settings = {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "section": {"type": "text"},
                "question": {"type": "text"},
                "course": {"type": "keyword"},
                "document_id": {"type": "keyword"}
            }
        }
    }

    # Recreate the index: delete it if it exists, then create it with the new settings
    if es_client.indices.exists(index=index_name):
        es_client.indices.delete(index=index_name)
        print(f'Index {index_name} deleted')

    es_client.indices.create(index=index_name, body=index_settings)
    print('Index created with properties:')
    print(json.dumps(index_settings, indent=2))

    count = len(documents)
    print(f'Indexing {count} documents to Elasticsearch index {index_name}')
    last_document = None
    for idx, document in enumerate(documents):
        # Progress message every 100 documents
        if idx % 100 == 0:
            print(f'{idx + 1}/{count}')

        es_client.index(index=index_name, document=document)
        last_document = document

    if last_document:
        print('Last document indexed:')
        print(json.dumps(last_document, indent=2))
```
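The index name suffix uses `strftime("%Y%m%d_%M%S")`, which encodes the date, minutes and seconds but no hour. This explains an oddity later in this walkthrough: the version 2 index (`documents_20240819_2652`) has a smaller suffix than the version 1 index (`documents_20240819_2747`), even though it was built afterwards, presumably in a later hour. It also means two runs exactly one hour apart would produce the same name, and the exporter would then delete the earlier index.

A quick check with illustrative fixed timestamps follows. The `%H%M%S` variant is a suggested alternative, not what the block above uses.

```python
from datetime import datetime

# Two timestamps exactly one hour apart (illustrative values)
t1 = datetime(2024, 8, 19, 10, 27, 47)
t2 = datetime(2024, 8, 19, 11, 27, 47)

# Format used by the export block: minutes and seconds only
print(t1.strftime("%Y%m%d_%M%S"))  # 20240819_2747
print(t2.strftime("%Y%m%d_%M%S"))  # 20240819_2747 (same name)

# Including the hour keeps names unique within a day and sortable by time
print(t1.strftime("%Y%m%d_%H%M%S"))  # 20240819_102747
print(t2.strftime("%Y%m%d_%H%M%S"))  # 20240819_112747
```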


### Q4. Export
#### What are the last document ID and the index name?
#### Answer: document_id: d8c4c7bb | index_name: documents_20240819_2747
```
index name: documents_20240819_2747
Connecting to Elasticsearch at http://elasticsearch:9200
Index created with properties:
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": "0"
  },
  "mappings": {
    "properties": {
      "text": {
        "type": "text"
      },
      "section": {
        "type": "text"
      },
      "question": {
        "type": "text"
      },
      "course": {
        "type": "keyword"
      },
      "document_id": {
        "type": "keyword"
      }
    }
  }
}
Indexing 86 documents to Elasticsearch index documents_20240819_2747
1/86



Last document indexed:
{
  "text": "Answer",
  "section": "Workshops: X",
  "question": "Question",
  "course": "llm-zoomcamp",
  "document_id": "d8c4c7bb"
}
```
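The exporter sends one `index` request per document and lets Elasticsearch assign random `_id`s (compare the `_id` value in the Q5 response below). One alternative is to send the documents in batches with the `elasticsearch.helpers.bulk` helper. That cuts the HTTP round trips from one per document to one per chunk (500 documents by default). This is offered as a suggestion, not the template's approach.

The sketch below only builds the action list, so it runs without a cluster. `build_bulk_actions` is a hypothetical name.

```python
from typing import Dict, Iterator, List


def build_bulk_actions(documents: List[Dict], index_name: str) -> Iterator[Dict]:
    """Yield one bulk action per document; the default _op_type is 'index'."""
    for document in documents:
        yield {'_index': index_name, '_source': document}


docs = [{'text': 'Answer', 'course': 'llm-zoomcamp', 'document_id': 'd8c4c7bb'}]
actions = list(build_bulk_actions(docs, 'documents_20240819_2747'))
print(actions[0]['_index'])  # documents_20240819_2747

# Against a live cluster the generator would be passed to the bulk helper, e.g.:
#   from elasticsearch.helpers import bulk
#   success, errors = bulk(es_client, build_bulk_actions(documents, index_name))
```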

### Testing the retrieval
#### 1. Select Inference --> Retrieval --> Iterative retrieval
#### 2. Under Iterative retrieval, select Add block --> Custom code
#### Set the index name in the iterative retrieval block to the index created above (documents_20240819_2747).
#### Paste the following code into the editor:

```python
# Iterative retrieval (data_retrieval_elasticsearch.py)
from typing import Dict, List

from elasticsearch import Elasticsearch, exceptions

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


SAMPLE_QUERY = "When is the next cohort?"


@data_loader
def search(*args, **kwargs) -> List[Dict]:
    connection_string = kwargs.get('connection_string', 'http://localhost:9200')
    index_name = kwargs.get('index_name', 'documents')

    # Full-text match on question (boosted 5x), text and section,
    # restricted to documents of the llm-zoomcamp course
    script_query = {
        "size": 1,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": SAMPLE_QUERY,
                        "fields": ["question^5", "text", "section"],
                        "type": "best_fields"
                    }
                },
                "filter": {
                    "term": {
                        "course": 'llm-zoomcamp'
                    }
                }
            }
        }
    }

    print("Sending script query:", script_query)

    es_client = Elasticsearch(connection_string)

    try:
        response = es_client.search(
            index=index_name,
            body=script_query,
        )

        print("Raw response from Elasticsearch:", response)

        # Return the stored documents (_source) of the matching hits
        return [hit['_source'] for hit in response['hits']['hits']]

    except exceptions.BadRequestError as e:
        print(f"BadRequestError: {e.info}")
        return []
    except Exception as e:
        print(f"Unexpected error: {e}")
        return []
```
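The retrieval block hard-codes both the query text and the `course` filter. A small helper can produce the same request body with the filter made optional. That is useful here because the Q6 run below sends the query without the filter.

`build_query` is a hypothetical helper. The body structure is copied from the block above.

```python
from typing import Any, Dict, Optional


def build_query(text: str, course: Optional[str] = 'llm-zoomcamp', size: int = 1) -> Dict[str, Any]:
    """Build the multi_match request body used by the retrieval block."""
    bool_query: Dict[str, Any] = {
        'must': {
            'multi_match': {
                'query': text,
                'fields': ['question^5', 'text', 'section'],  # question boosted 5x
                'type': 'best_fields',
            }
        }
    }
    if course is not None:
        # term filter: exact match on the keyword-mapped 'course' field, not scored
        bool_query['filter'] = {'term': {'course': course}}
    return {'size': size, 'query': {'bool': bool_query}}


# Without the filter this is the same body as the query logged for Q6
print(build_query("When is the next cohort?", course=None))
```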


### Q5. Testing the retrieval
#### Use the query: "When is the next cohort?"
#### What's the ID of the top matching result?
#### Answer: bf024675

```

Sending script query: {'size': 1, 'query': {'bool': {'must': {'multi_match': {'query': 'When is the next cohort?', 'fields': ['question^5', 'text', 'section'], 'type': 'best_fields'}}, 'filter': {'term': {'course': 'llm-zoomcamp'}}}}}

Raw response from Elasticsearch: {'took': 4, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 68, 'relation': 'eq'}, 'max_score': 42.219727, 'hits': [{'_index': 'documents_20240819_2747', '_id': 't9tSaZEBWNW8oyffAbuq', '_score': 42.219727, '_source': {'text': 'Summer 2025 (via Alexey).', 'section': 'General course-related questions', 'question': 'When will the course be offered next?', 'course': 'llm-zoomcamp', 'document_id': 'bf024675'}}]}}


```
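The answer comes from `_source.document_id` of the first hit, not from Elasticsearch's own `_id`.

The sketch below shows that extraction, using only dictionary subscripting. It is tested against a trimmed copy of the Q5 response above. `top_document_id` is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def top_document_id(response: Dict[str, Any]) -> Optional[str]:
    """Return document_id of the highest-scoring hit, or None if there are no hits."""
    hits = response['hits']['hits']
    if not hits:
        return None
    # For a relevance query, hits come back sorted by _score, highest first
    return hits[0]['_source']['document_id']


response = {
    'hits': {
        'max_score': 42.219727,
        'hits': [{'_id': 't9tSaZEBWNW8oyffAbuq', '_score': 42.219727,
                  '_source': {'question': 'When will the course be offered next?',
                              'document_id': 'bf024675'}}],
    }
}
print(top_document_id(response))  # bf024675
```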

### Reindexing
#### Repeat the steps above, starting from ingestion, with FAQ document version 2
#### Export log for the Elasticsearch index built from FAQ document version 2:

```
#new doc index
index name: documents_20240819_2652
Connecting to Elasticsearch at http://elasticsearch:9200
Index created with properties:
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": "0"
  },
  "mappings": {
    "properties": {
      "text": {
        "type": "text"
      },
      "section": {
        "type": "text"
      },
      "question": {
        "type": "text"
      },
      "course": {
        "type": "keyword"
      },
      "document_id": {
        "type": "keyword"
      }
    }
  }
}
Indexing 86 documents to Elasticsearch index documents_20240819_2652


Last document indexed:
{
  "text": "Answer",
  "section": "Workshops: X",
  "question": "Question",
  "course": "llm-zoomcamp",
  "document_id": "d8c4c7bb"
}
```
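Each reindex creates a new timestamped index, so the retrieval block's index name has to be edited by hand, as in Q6. A common Elasticsearch pattern is to point a fixed alias at the newest index instead. This is offered as a suggestion, not part of this walkthrough.

The sketch below builds the `actions` list for the Update Aliases API. The alias name `documents_current` is a hypothetical choice.

```python
from typing import Dict, List, Optional


def alias_swap_actions(alias: str, new_index: str, old_index: Optional[str] = None) -> List[Dict]:
    """Actions that move `alias` from old_index (if given) to new_index in one request."""
    actions: List[Dict] = []
    if old_index is not None:
        actions.append({'remove': {'index': old_index, 'alias': alias}})
    actions.append({'add': {'index': new_index, 'alias': alias}})
    return actions


actions = alias_swap_actions('documents_current', 'documents_20240819_2652', 'documents_20240819_2747')
print(actions)

# With a live client, all actions in one request are applied atomically, e.g.:
#   es_client.indices.update_aliases(actions=actions)
# and the retrieval block could then search index='documents_current'.
```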

### Q6. Reindexing
#### Change the index name in the iterative retrieval block to documents_20240819_2652.
#### Use the query "When is the next cohort?". What's the ID of the top matching result?
#### Answer: b6fa77f3

```
Sending script query: {'size': 1, 'query': {'bool': {'must': {'multi_match': {'query': 'When is the next cohort?', 'fields': ['question^5', 'text', 'section'], 'type': 'best_fields'}}}}}



Raw response from Elasticsearch: {'took': 39, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 68, 'relation': 'eq'}, 'max_score': 86.06232, 'hits': [{'_index': 'documents_20240819_2652', '_id': 'a9S_aZEB0Xc6WBWVCDAH', '_score': 86.06232, '_source': {'text': 'Summer 2026.', 'section': 'General course-related questions', 'question': 'When is the next cohort?', 'course': 'llm-zoomcamp', 'document_id': 'b6fa77f3'}}]}}


```