# Data Scraping

Scrapes ``https://docs.soloplan.de/carlo/api/Soloplan.CarLo.Business.*`` in order to get all business objects and their properties.

To begin with, the most interesting interfaces (``ITour``, ``IConsignment``, ``ITransportOrder`` and ``ITourStatus``) are scraped. If a property has a complex type (i.e. another interface, class or enum defined in our own API), the scraper recursively scrapes the properties of that complex type as well.

In [1]:
# Base directory for everything this notebook reads and writes.
ROOT_FOLDER = "/workspace/data/business_objects"
# Name of the sub-folder (below ROOT_FOLDER) that receives the transformed text documents.
TRANSFORMED_DATA_OUTPUT_FOLDER_NAME = "documents"
# File name (below ROOT_FOLDER) of the JSON file holding the raw scrape result.
SCRAPE_FILE_NAME = "scraped_domain_knowledge.json"

In [None]:
import json
import os
from bs4 import BeautifulSoup
import requests
from collections import OrderedDict

# Prefix that is put in front of the link targets found on the scraped pages.
BASE_URL = "https://docs.soloplan.de/carlo/api/"

# Work queue of pages to scrape. It starts with the seed interfaces below;
# ``extract_docfx_data`` appends the pages of referenced types while scraping,
# and the main loop removes entries from the front.
DOCS_TO_SCRAPE = [
    "https://docs.soloplan.de/carlo/api/Soloplan.CarLo.Business.ITour.html",
    "https://docs.soloplan.de/carlo/api/Soloplan.CarLo.Business.IConsignment.html",
    "https://docs.soloplan.de/carlo/api/Soloplan.CarLo.Business.ITransportOrder.html",
    "https://docs.soloplan.de/carlo/api/Soloplan.CarLo.Business.ITourStatus.html",
]

# Names of the interfaces that have already been extracted (used to skip duplicates).
saved_interfaces = set()


# Seconds to wait for the documentation server before giving up on a request.
_REQUEST_TIMEOUT_SECONDS = 30

# URLs that have already been passed to ``extract_docfx_data``. The work queue
# only holds *pending* URLs, so this set is what prevents a page that was
# processed earlier from being queued (and downloaded) a second time.
_visited_urls = set()


def _text_or_default(tag, default):
    """Return the stripped text of ``tag``, or ``default`` when ``tag`` is ``None``."""
    return tag.text.strip() if tag is not None else default


def extract_docfx_data(url):
    """
    Download one DocFX API page and extract the interface it documents.

    Only pages whose ``<h1>`` starts with ``"Interface "`` produce a result. Every other page (class, enum, page without the expected article, ...) is skipped. Pages of types referenced by properties are appended to ``DOCS_TO_SCRAPE`` so that the caller's loop scrapes them later.

    Side effects:
        - Adds ``url`` to ``_visited_urls``.
        - Adds the interface name to ``saved_interfaces``.
        - Appends not-yet-seen type pages below ``BASE_URL`` to ``DOCS_TO_SCRAPE``.

    Args:
        url (str): URL of the page to scrape.

    Returns:
        A ``(name, data)`` tuple, where ``data`` is a dict with the keys ``summary``, ``type``, ``namespace``, ``assembly``, ``declaration``, ``properties``, ``extension_methods`` and ``type_references``; or ``None`` if the page is skipped.

    Raises:
        requests.exceptions.RequestException: If the page cannot be downloaded (including a timeout).
    """
    from urllib.parse import urldefrag, urljoin

    _visited_urls.add(url)

    # A request without a timeout can block forever on a stalled connection.
    response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
    soup = BeautifulSoup(response.text, "html.parser")

    # Extract the main content article
    article = soup.find("article", class_="content wrap")
    if article is None:
        print(f"Article not found at {url}.")
        return

    heading = article.find("h1")
    if heading is None:
        print(f"Heading not found at {url}.")
        return

    # Only interface pages are extracted; class and enum pages are skipped here.
    name = heading.text.strip()
    interface_prefix = "Interface "
    if not name.startswith(interface_prefix):
        return
    name = name[len(interface_prefix):]
    data_type = "interface"

    if name in saved_interfaces:
        print(f"Skipping {name} as it already exists.")
        return
    saved_interfaces.add(name)

    # The overall summary is the first paragraph of the level0 summary block.
    overall_summary = ""
    summary_tag = article.find_next("div", class_="markdown level0 summary")
    if summary_tag is not None:
        overall_summary = _text_or_default(summary_tag.find("p"), "")

    # Extract Namespace and Assembly (rendered as "<label>: <value>" in h6 tags)
    namespace = "Namespace not found."
    assembly = "Assembly not found."
    for h6 in article.find_all("h6"):
        label, _, remainder = h6.text.strip().partition(":")
        if label.startswith("Namespace"):
            namespace = remainder.strip()
        elif label.startswith("Assembly"):
            assembly = remainder.strip()

    # Prefer the code block below the "Syntax" heading; otherwise fall back to
    # the first code block in the article.
    syntax_tag = article.find("h5", string="Syntax")
    if syntax_tag is not None:
        code_scope = syntax_tag.find_next("div", class_="codewrapper")
    else:
        code_scope = article
    root_code_tag = code_scope.find("code") if code_scope is not None else None
    root_declaration_code = _text_or_default(root_code_tag, "Declaration not found.")

    # Extract properties and methods
    # NOTE(review): ``find_next`` searches the rest of the document, so a member
    # that lacks one of these elements picks up the one belonging to a later
    # member - confirm whether that matters for the scraped pages.
    properties = []
    type_references = []
    for header in article.find_all("h4"):
        property_name = header.text.strip()

        property_summary = "Summary not found."
        property_summary_tag = header.find_next("div", class_="markdown level1 summary")
        if property_summary_tag is not None:
            property_summary = _text_or_default(
                property_summary_tag.find("p"), "Summary not found."
            )

        declaration_code_tag = header.find_next("div", class_="codewrapper")
        declaration_code = _text_or_default(
            declaration_code_tag.find("code") if declaration_code_tag is not None else None,
            "Declaration not found.",
        )

        property_value_type = "Property type not found."
        value_header = header.find_next("h5", string="Property Value")
        value_cell = value_header.find_next("td") if value_header is not None else None
        if value_cell is not None:
            type_link = value_cell.find("a")
            if type_link is None:
                property_value_type = value_cell.text.strip()
            else:
                property_value_type = type_link.text.strip()
                href = type_link.get("href")
                # Resolve the link properly (it may be absolute) and drop any
                # fragment so the same page is not queued under two URLs.
                type_url = urldefrag(urljoin(BASE_URL, href)).url if href else ""
                if type_url.startswith(BASE_URL):
                    # Record the reference regardless of the queue state, so the
                    # result does not depend on the order pages are scraped in.
                    if property_value_type not in type_references:
                        type_references.append(property_value_type)
                    # recursively scrape the type if it has not been seen yet
                    if type_url not in _visited_urls and type_url not in DOCS_TO_SCRAPE:
                        DOCS_TO_SCRAPE.append(type_url)

        properties.append(
            {
                "name": property_name,
                "summary": property_summary,
                "declaration": declaration_code,
                "type": property_value_type,
            }
        )

    # Extract extension methods
    extension_methods = []
    extension_header = article.find("h3", string="Extension Methods")
    if extension_header is not None:
        for sibling in extension_header.find_next_siblings("div"):
            method_link = sibling.find("a")
            # Sibling blocks without a link carry no method name.
            if method_link is not None:
                extension_methods.append(method_link.text.strip())
    else:
        extension_methods = ["Extension methods not found."]

    return (
        name,
        {
            "summary": overall_summary,
            "type": data_type,
            "namespace": namespace,
            "assembly": assembly,
            "declaration": root_declaration_code,
            "properties": properties,
            "extension_methods": extension_methods,
            "type_references": type_references,
        },
    )


# Create the output folder up front so that a missing directory is detected
# before the crawl starts instead of when its results are about to be written.
os.makedirs(ROOT_FOLDER, exist_ok=True)

# Maps interface name -> scraped interface data.
all_interfaces = {}

# Work through the queue first-in, first-out. ``extract_docfx_data`` appends
# further pages to ``DOCS_TO_SCRAPE`` while it runs, so the loop ends once no
# new pages are discovered.
while DOCS_TO_SCRAPE:
    url = DOCS_TO_SCRAPE.pop(0)
    result = extract_docfx_data(url)

    # ``None`` means the page was skipped.
    if result is None:
        continue

    name, item = result
    if name in all_interfaces:
        continue

    all_interfaces[name] = item
    print(f"Processed {name}")

# Save all interfaces to a single JSON file, sorted by interface name
with open(os.path.join(ROOT_FOLDER, SCRAPE_FILE_NAME), "w", encoding="utf8") as f:
    ordered_interfaces = OrderedDict(sorted(all_interfaces.items()))
    json.dump(ordered_interfaces, f, indent=2, ensure_ascii=False)

# Data Transformation

For embeddings, the JSON structure might not be the best choice, so these files need to be converted to plain text. Additional metadata will be extracted and added to the vector database.

In [3]:
import json
import os
import re
from typing import Any, Dict, List, Optional, Tuple

import tiktoken

# Folder that receives the transformed text documents and their metadata files.
TRANSFORMED_DATA_OUTPUT_FOLDER = os.path.join(
    ROOT_FOLDER, TRANSFORMED_DATA_OUTPUT_FOLDER_NAME
)

# Ensure you have the necessary directory structure. ``exist_ok=True`` avoids
# the gap between a separate existence check and the creation of the folder.
os.makedirs(TRANSFORMED_DATA_OUTPUT_FOLDER, exist_ok=True)

# Initialize the tokenizer
tokenizer = tiktoken.get_encoding("cl100k_base")
# Token limit per chunk that is passed to ``split_text_intelligently``.
max_tokens = 8192


def split_text_intelligently(
    properties: List[str], header: str, max_tokens=8192, max_bytes=32766
) -> List[Tuple[str, int]]:
    """
    Split the text intelligently based on the token and byte limits. Since properties of interfaces are mostly used in this dataset, 'intelligently' means that we split the properties in a way that they are not split across multiple chunks.

    Every chunk starts with the header. A single property that does not fit next to the header even on its own is still emitted as one (oversized) chunk, because properties are never split.

    Args:
        properties (List[str]): List of property strings.
        header (str): File header string.
        max_tokens (int): Maximum number of token that the embedding model can handle.
        max_bytes (int): Maximum number of bytes allowed in a chunk (based on Azure AI Search limits). Measured on the UTF-8 encoding of the chunk.

    Returns:
        List[Tuple[str, int]]: The chunks as ``(text, token_count)`` tuples. The token count is the sum of the separately encoded header and properties.
    """
    header_token_count = len(tokenizer.encode(header))
    header_byte_count = len(header.encode("utf-8"))

    chunks = []
    parts = [header]
    token_count = header_token_count
    byte_count = header_byte_count
    has_properties = False

    for prop in properties:
        prop_token_count = len(tokenizer.encode(prop))
        prop_byte_count = len(prop.encode("utf-8"))

        fits = (
            token_count + prop_token_count <= max_tokens
            and byte_count + prop_byte_count <= max_bytes
        )
        # Only close the current chunk if it already holds a property;
        # otherwise a header-only chunk without any content would be emitted.
        if not fits and has_properties:
            chunks.append(("".join(parts), token_count))
            parts = [header]
            token_count = header_token_count
            byte_count = header_byte_count

        parts.append(prop)
        token_count += prop_token_count
        byte_count += prop_byte_count
        has_properties = True

    # Without any properties this still yields the header as a single chunk
    # (unless the header itself is empty).
    final_chunk = "".join(parts)
    if final_chunk:
        chunks.append((final_chunk, token_count))

    return chunks


def extract_propertyid_and_caption(content: str) -> Tuple[Optional[int], Optional[str]]:
    """
    Extract the property ID and caption from the [SoloProperty()] attribute in the content.

    The property ID is the first attribute argument and the caption is the third one. Commas inside double-quoted arguments do not separate arguments.

    Args:
        content (str): The content to extract the property ID and caption from.

    Returns:
        Tuple[Optional[int], Optional[str]]: The property ID and caption. The property ID is ``None`` if there is no attribute or its first argument is not an integer; the caption is ``None`` if there is no attribute or it has fewer than three arguments.
    """
    # Find the SoloProperty attribute
    match = re.search(r"\[SoloProperty\((.*?)\)\]", content)
    if not match:
        return None, None

    # Split the arguments considering potential commas within strings
    arguments = []
    current = []
    in_quotes = False
    for char in match.group(1):
        if char == '"':
            in_quotes = not in_quotes
        if char == "," and not in_quotes:
            arguments.append("".join(current).strip())
            current = []
        else:
            current.append(char)
    arguments.append("".join(current).strip())  # Add the last argument

    # A non-numeric first argument is reported as a missing ID instead of
    # aborting the caller's whole run.
    try:
        property_id = int(arguments[0])
    except ValueError:
        property_id = None

    # The caption is the third argument; strip the surrounding quotes.
    caption = arguments[2].strip('"') if len(arguments) > 2 else None

    return property_id, caption


def transform_interface(key: str, value: object) -> Tuple[str, List[str]]:
    """
    Render a scraped interface as plain text pieces.

    Args:
        key (str): The interface name.
        value (object): The scraped interface data; its ``declaration``, ``summary`` and ``properties`` entries are read.

    Returns:
        Tuple[str, List[str]]: The header text and one text block per property.
    """
    interface_id, interface_caption = extract_propertyid_and_caption(
        value["declaration"]
    )
    header = (
        f"Interface: {key}\n"
        f"Summary: {value['summary']}\n"
        f"PropertyId: {interface_id}\n"
        f"Caption: {interface_caption}\n\nProperties:"
    )

    def render_property(prop) -> str:
        """Build the text block for a single property."""
        prop_id, prop_caption = extract_propertyid_and_caption(prop["declaration"])
        # Keep the description on a single line.
        description = prop["summary"].strip().replace("\n", " ").replace("\r", "")
        return "".join(
            (
                f"- {prop['name']}\n",
                f"   - Type: {prop['type']}\n",
                f"   - Description: {description}\n",
                f"   - PropertyId: {prop_id}\n",
                f"   - Caption: {prop_caption}\n\n",
            )
        )

    return header, [render_property(prop) for prop in value["properties"]]


def extract_metadata(
    key, value, embedding_size=0, chunk_id=0, total_chunks=1
) -> Dict[str, Any]:
    """
    Build the metadata dictionary for one chunk of an interface document.

    Args:
        key: The interface name.
        value: The scraped interface data; ``summary``, ``type``, ``namespace``, ``assembly`` and ``type_references`` are copied from it.
        embedding_size: Not used by this function.
        chunk_id: Index of the chunk.
        total_chunks: Number of chunks of the document; with more than one chunk the file name gets a ``_chunk_<chunk_id>`` suffix.

    Returns:
        Dict[str, Any]: The metadata for the chunk.
    """
    stem = key + f"_chunk_{chunk_id}" if total_chunks > 1 else key

    metadata: Dict[str, Any] = {"name": key}
    for field in ("summary", "type", "namespace", "assembly", "type_references"):
        metadata[field] = value[field]
    metadata["filename"] = f"{stem}.txt"
    metadata["chunk_id"] = chunk_id
    metadata["total_chunks"] = total_chunks
    return metadata


# Load all interfaces from the single JSON file
with open(os.path.join(ROOT_FOLDER, SCRAPE_FILE_NAME), "r", encoding="utf8") as f:
    interfaces: Dict[str, object] = json.load(f)

# Transform and save the documents
for key, value in interfaces.items():
    header, properties = transform_interface(key, value)

    # Split the document intelligently if it exceeds the token limit
    chunks = split_text_intelligently(properties, header, max_tokens=max_tokens)
    total_chunks = len(chunks)
    # NOTE(review): not used in this cell; kept in case later code relies on it.
    metadata = extract_metadata(key, value)

    for chunk_id, (chunk, size) in enumerate(chunks):
        # Only documents that were split get a chunk suffix in their file names.
        chunk_suffix = f"_chunk_{chunk_id}" if total_chunks > 1 else ""
        chunk_key = f"{key}{chunk_suffix}"

        # Save the text document chunk. The scraped text may contain non-ASCII
        # characters, so the encoding is fixed instead of left to the platform
        # default.
        text_path = os.path.join(TRANSFORMED_DATA_OUTPUT_FOLDER, f"{chunk_key}.txt")
        with open(text_path, "w", encoding="utf8") as f:
            f.write(chunk)

        # Adjust metadata for chunks
        chunk_metadata = extract_metadata(key, value, size, chunk_id, total_chunks)

        # Save the metadata for the chunk
        metadata_path = os.path.join(
            TRANSFORMED_DATA_OUTPUT_FOLDER, f"{chunk_key}.metadata.json"
        )
        with open(metadata_path, "w", encoding="utf8") as f:
            json.dump(chunk_metadata, f, indent=2)