In [3]:
pip install python-docx elasticsearch

Collecting python-docx
  Obtaining dependency information for python-docx from https://files.pythonhosted.org/packages/3e/3d/330d9efbdb816d3f60bf2ad92f05e1708e4a1b9abe80461ac3444c83f749/python_docx-1.1.2-py3-none-any.whl.metadata
  Downloading python_docx-1.1.2-py3-none-any.whl.metadata (2.0 kB)
Collecting lxml>=3.1.0 (from python-docx)
  Obtaining dependency information for lxml>=3.1.0 from https://files.pythonhosted.org/packages/eb/6d/d1f1c5e40c64bf62afd7a3f9b34ce18a586a1cccbf71e783cd0a6d8e8971/lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl.metadata
  Downloading lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl.metadata (3.8 kB)
Downloading python_docx-1.1.2-py3-none-any.whl (244 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.3/244.3 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading lxml-5.3.0-cp312-cp312-macosx_10_9_universal2.whl (8.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.2/8.2 MB[0m 

In [4]:
import io

import requests
import docx

In [5]:
def clean_line(line):
    line = line.strip()
    line = line.strip('\uFEFF')
    return line

def read_faq(file_id):
    url = f'https://docs.google.com/document/d/{file_id}/export?format=docx'
    
    response = requests.get(url)
    response.raise_for_status()
    
    with io.BytesIO(response.content) as f_in:
        doc = docx.Document(f_in)

    questions = []

    question_heading_style = 'heading 2'
    section_heading_style = 'heading 1'
    
    heading_id = ''
    section_title = ''
    question_title = ''
    answer_text_so_far = ''
     
    for p in doc.paragraphs:
        style = p.style.name.lower()
        p_text = clean_line(p.text)
    
        if len(p_text) == 0:
            continue
    
        if style == section_heading_style:
            section_title = p_text
            continue
    
        if style == question_heading_style:
            answer_text_so_far = answer_text_so_far.strip()
            if answer_text_so_far != '' and section_title != '' and question_title != '':
                questions.append({
                    'text': answer_text_so_far,
                    'section': section_title,
                    'question': question_title,
                })
                answer_text_so_far = ''
    
            question_title = p_text
            continue
        
        answer_text_so_far += '\n' + p_text
    
    answer_text_so_far = answer_text_so_far.strip()
    if answer_text_so_far != '' and section_title != '' and question_title != '':
        questions.append({
            'text': answer_text_so_far,
            'section': section_title,
            'question': question_title,
        })

    return questions

In [8]:
faq_documents = {
    'llm-zoomcamp': '1qZjwHkvP0lXHiE4zdbWyUXSVfmVGzougDD6N37bat3E',
}

In [13]:
documents = []

for course, file_id in faq_documents.items():
    print(course)
    course_documents = read_faq(file_id)
    documents.append({'course': course, 'documents': course_documents})

llm-zoomcamp


In [14]:
documents

[{'course': 'llm-zoomcamp',
  'documents': [{'text': 'Yes, but if you want to receive a certificate, you need to submit your project while we’re still accepting submissions.',
    'section': 'General course-related questions',
    'question': 'I just discovered the course. Can I still join?'},
   {'text': "You don't need it. You're accepted. You can also just start learning and submitting homework (while the form is Open) without registering. It is not checked against any registered list. Registration is just to gauge interest before the start date.",
    'section': 'General course-related questions',
    'question': 'Course - I have registered for the [insert-zoomcamp-name]. When can I expect to receive the confirmation email?'},
   {'text': 'The zoom link is only published to instructors/presenters/TAs.\nStudents participate via Youtube Live and submit questions to Slido (link would be pinned in the chat when Alexey goes Live). The video URL should be posted in the announcements chan

In [17]:
import hashlib

def generate_document_id(doc):
    combined = f"{doc['course']}-{doc['question']}-{doc['text'][:10]}"
    hash_object = hashlib.md5(combined.encode())
    hash_hex = hash_object.hexdigest()
    document_id = hash_hex[:8]
    return document_id

In [19]:
processed_documents = []

# print(type(documents))
for document in documents:
    for doc in document['documents']:
        doc['course'] = document['course']
        # previously we used just "id" for document ID
        doc['document_id'] = generate_document_id(doc)
        processed_documents.append(doc)

print(len(processed_documents))

<class 'list'>
86


In [21]:
!pip install mage_ai

Collecting mage_ai
  Obtaining dependency information for mage_ai from https://files.pythonhosted.org/packages/fb/f5/5ea9bea4904d027bd07a122422898182caf0ffb2b5508b9b801a472a8197/mage_ai-0.9.73-py3-none-any.whl.metadata
  Downloading mage_ai-0.9.73-py3-none-any.whl.metadata (23 kB)
Collecting cachetools (from mage_ai)
  Obtaining dependency information for cachetools from https://files.pythonhosted.org/packages/a4/07/14f8ad37f2d12a5ce41206c21820d8cb6561b728e51fad4530dff0552a67/cachetools-5.5.0-py3-none-any.whl.metadata
  Downloading cachetools-5.5.0-py3-none-any.whl.metadata (5.3 kB)
Collecting Faker==4.14.0 (from mage_ai)
  Obtaining dependency information for Faker==4.14.0 from https://files.pythonhosted.org/packages/6e/6c/437383461986cc472b6ee001138ad637f638349e409a8fc21100fccd089d/Faker-4.14.0-py3-none-any.whl.metadata
  Downloading Faker-4.14.0-py3-none-any.whl.metadata (14 kB)
Collecting GitPython==3.1.41 (from mage_ai)
  Obtaining dependency information for GitPython==3

In [28]:
# Final code
from typing import Dict, List, Tuple, Union

import numpy as np
from elasticsearch import Elasticsearch
from datetime import datetime
from mage_ai.data_preparation.variable_manager import set_global_variable

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter

@data_exporter
def elasticsearch(
        documents: List[Dict[str, Union[Dict, List[int], np.ndarray, str]]], *args, **kwargs,
):
    """
    Exports document data to an Elasticsearch database.
    """

    connection_string = kwargs.get('connection_string', 'http://localhost:9200/')

    # Adjusting index name
    index_name_prefix = kwargs.get('index_name', 'documents')
    current_time = datetime.now().strftime("%Y%m%d_%M%S")
    index_name = f"{index_name_prefix}_{current_time}"
    print("index name:", index_name)

    # Setting global variable
    set_global_variable('resplendent_radiance', 'index_name', index_name)

    number_of_shards = kwargs.get('number_of_shards', 1)
    number_of_replicas = kwargs.get('number_of_replicas', 0)
    vector_column_name = kwargs.get('vector_column_name', 'embedding')

    dimensions = kwargs.get('dimensions')
    if dimensions is None and len(documents) > 0:
        document = documents[0]
        dimensions = len(document.get(vector_column_name) or [])

    es_client = Elasticsearch(connection_string)

    print(f'Connecting to Elasticsearch at {connection_string}')

    index_settings = {
        "settings": {
            "number_of_shards": number_of_shards,
            "number_of_replicas": number_of_replicas
        },
        "mappings": {
            "properties": {
                "text": {"type": "text"},
                "section": {"type": "text"},
                "question": {"type": "text"},
                "course": {"type": "keyword"},
                "document_id": {"type": "keyword"}
            }
        }
    }

    if not es_client.indices.exists(index=index_name):
        es_client.indices.create(index=index_name)
        print('Index created with properties:', index_settings)
        print('Embedding dimensions:', dimensions)

    print(f'Indexing {len(documents)} documents to Elasticsearch index {index_name}')
    for document in processed_documents:
        print(f'Indexing document {document["document_id"]}')

        es_client.index(index=index_name, document=document)

    print(document)