# ETL Development

### setup

In [None]:
import logging
import os
import re
import uuid
from typing import List, Optional

import dotenv
from frontmatter import Frontmatter

In [None]:
# Load variables from a .env file (if one is found) into the environment.
dotenv.load_dotenv()

# Each value is None when the corresponding environment variable is unset.
PROJECT_ROOT = os.environ.get("PROJECT_ROOT")
RAW_DATA_SOURCE_PATH = os.environ.get("RAW_DATA_SOURCE_PATH")

## extract
* manually cloned azure-docs repo
* saved to RAW_DATA_SOURCE_PATH  
  
```bash
> git clone --branch main --single-branch --depth 1 https://github.com/MicrosoftDocs/azure-docs.git
```

## transform
* find all md files in source dir
* process each doc 
  * metadata
  * full doc -> [sections] -> [[chunks]]  
  
* _input_: str - path to raw data (_n_ = 17423)  
* _output_: list - json of all docs (_n_ = 17281; files that raise an error during processing are skipped)  
  
**schema**
```json  
{  
    "id": str,  // Unique identifier for the document  
     "filename": str,  // Name of the markdown file  
     "path": str,  // Full file path of the markdown file  
     "title": str,  // Title of the document, extracted from frontmatter ("Untitled" if missing)
     "metadata": {  // Additional metadata extracted from the frontmatter  
          "description": str,  // Brief description of the document  
          "ms.date": str,  // Date associated with the document  
          "ms.topic": str  // Topic category of the document  
     },
    "sections": [
        {
            "section_content": str,  // Doc section (split by `# `)
            "chunks": list  // Section split into chunks of ~200 words
        },
    ]
}  
```

_example_
```json
{
    "id": "xx0108gaGG-089dg-4JGon-98sgG3l",
    "filename": "set-up-sso.md",
    "path": "full/path/to/doc.md",
    "title": "Set up single sign-on for Microsoft Defender for IoT sensor console",
    "metadata": {
        "description": "Learn how to set up single si...",
        "ms.date": "04/10/2024",
        "ms.topic": "how-to",
        },
    "sections": [
        {
            "section_content": "# Set up single sign-on for the sensor console\n\nIn this article, you learn how to set up single sign-on (SSO) for the Defender for IoT sensor console using Microsoft Entra ID. With SSO, your organization\'s users can simply sign into the sensor console, and don\'t need multiple login credentials across different sensors and sites. \n\nUsi",
            "chunks": ["chunk 1", "chunk 2", "chunk n"]
        },
    ]
}
```

In [None]:
def find_md_files(path):
    """Recursively collect the paths of all files ending in ".md" under `path`.

    Each returned path is the walked directory joined with the file name, in
    the order produced by `os.walk`.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(path)
        for name in names
        if name.endswith(".md")
    ]


# Collect every markdown file path found under the raw data directory.
all_md_files = find_md_files(RAW_DATA_SOURCE_PATH)
# Bare expression: previews the first three paths when run as a notebook cell.
all_md_files[:3]

In [None]:
def process_all_md_files(
    all_md_filepaths: List[str], chunk_size: int = 200
) -> List[dict]:
    """Turn every markdown file path into an indexable document.

    Each path is handed to `process_markdown_file` in order; falsy results
    (such as the `None` it returns for a file it could not process) are
    left out of the returned list.
    """
    candidates = (
        process_markdown_file(md_path, chunk_size) for md_path in all_md_filepaths
    )
    return [document for document in candidates if document]


def process_markdown_file(file_path: str, chunk_size: int = 200) -> Optional[dict]:
    """Process a markdown file into an indexable document.

    Args:
        file_path: Path of the markdown file to read.
        chunk_size: Approximate number of words per chunk, forwarded to
            `chunk_section`.

    Returns:
        A dict with the keys "id", "filename", "path", "title", "sections"
        and "metadata", or None when the file is skipped: either an error
        was raised while reading/processing it, or its front matter
        attributes are not a mapping.
    """
    logger = logging.getLogger(__name__)
    try:
        post = Frontmatter.read_file(file_path)
        attributes = post["attributes"]
        if not isinstance(attributes, dict):
            # Make the skip explicit instead of relying on `.items()` failing
            # later; the outcome (None) is unchanged.
            logger.debug("Skipping %s: no mapping front matter", file_path)
            return None

        sections = [
            {
                "section_content": section,
                "chunks": chunk_section(section, chunk_size),
            }
            for section in split_into_sections(post["body"])
        ]

        return {
            "id": str(uuid.uuid4()),
            "filename": os.path.basename(file_path),
            "path": file_path,
            "title": attributes.get("title", "Untitled"),
            "sections": sections,
            # Everything except the title is kept as free-form metadata.
            "metadata": {k: v for k, v in attributes.items() if k != "title"},
        }
    except Exception:
        # Best-effort ETL: one bad file must not abort the whole run, but the
        # reason stays available by enabling debug logging.
        logger.debug("Error processing file %s", file_path, exc_info=True)
        return None


def split_into_sections(content: str) -> List[str]:
    """
    Break markdown content into sections at every line that begins with "# ".

    The header line stays at the start of its section, and text preceding the
    first such line becomes a section of its own. Sections are stripped of
    surrounding whitespace and empty ones are dropped.

    Note: the split is purely line-based, so a line starting with "# " inside
    a fenced code block also starts a new section.
    """
    # Zero-width lookahead: split *before* the header so it is not consumed.
    pieces = re.split(r"(?m)^(?=# )", content)
    trimmed = (piece.strip() for piece in pieces)
    return [piece for piece in trimmed if piece]


def chunk_section(section: str, chunk_size: int = 200) -> List[str]:
    """
    Split a section into consecutive chunks of at most `chunk_size` words.

    Words are whitespace-separated tokens; each chunk is its words re-joined
    with single spaces. The final chunk may be shorter than `chunk_size`.
    """
    tokens = section.split()
    starts = range(0, len(tokens), chunk_size)
    return [" ".join(tokens[start : start + chunk_size]) for start in starts]

In [None]:
# Run the transform step over every discovered markdown file.
processed_docs = process_all_md_files(all_md_files, chunk_size=200)
# Keep the first processed document as a sample (raises IndexError if none).
processed_doc = processed_docs[0]
print(f"Processed {len(processed_docs)} documents.")

In [None]:
# Inspect the sample document: top-level keys, metadata keys, the keys of the
# first section, and that section's first chunk.
# Inner subscripts use single quotes: reusing the f-string's own quote
# character inside a replacement field is a SyntaxError before Python 3.12.
print(f"schema: {processed_doc.keys()}\n")
print(f"metadata: {processed_doc['metadata'].keys()}\n")
print(f"sections: {processed_doc['sections'][0].keys()}\n")
print(f"first chunk: {processed_doc['sections'][0]['chunks'][:1]}\n")

## load
* index azure docs to elasticsearch index
* save as volume  
  
**schema**
```json
{
  "id": "keyword",  
  "filename": "keyword",  
  "path": "keyword",  
  "title": {
      "type": "text",
      "fields": {
          "raw": { "type": "keyword" }
      }
  },
  "content": "text",  
  "metadata": {  
       "description": "text",  
       "ms.date": { "type": "date" },  
       "ms.topic": "keyword"  
  },
  "sections": {
       "type": "nested", 
       "properties": {
            "section_content": "text",  
            "chunks": {
                "type": "text"
            }
       }
  }
}
```