In [25]:
import sys

sys.path.append("../")

import cohere
import json

from pathlib import Path
from config import Paths, CohereConfig 

from bs4 import BeautifulSoup

from tiktoken import get_encoding

In [26]:
# Shared tiktoken encoder ("gpt2" encoding). The chunking code below uses it to
# count tokens when deciding whether a piece of text stays under the 512-token limit.
tokenizer = get_encoding("gpt2")

In [27]:
def parse_html(elem: BeautifulSoup):
    """Flatten the descendants of ``elem`` into a plain-text string.

    Every string node contributes its whitespace-stripped content. Tags named
    br, p, h1-h4, tr or th contribute a newline, and ``li`` tags contribute a
    newline followed by a ``- `` bullet. All other tags contribute nothing
    themselves (their string descendants are still visited).

    Args:
        elem: Any object exposing ``descendants``, e.g. a parsed document or tag.

    Returns:
        The concatenated text.
    """
    newline_tags = ('br', 'p', 'h1', 'h2', 'h3', 'h4', 'tr', 'th')
    pieces = []
    for node in elem.descendants:
        if isinstance(node, str):
            # Text node: keep the content, drop surrounding whitespace.
            pieces.append(node.strip())
            continue
        tag_name = node.name
        if tag_name in newline_tags:
            pieces.append('\n')
        elif tag_name == 'li':
            pieces.append('\n- ')
    return ''.join(pieces)

def get_sections_by_tag(soup: BeautifulSoup, tag: str) -> list:
    """Split a document into sections delimited by every ``tag`` element.

    The first entry (with an empty ``section_name``) holds the elements that
    precede the first ``tag``. Each later entry corresponds to one ``tag``
    element and holds the sibling nodes that follow it, up to the next ``tag``
    element. Only siblings of a heading are collected, so headings are
    expected to sit at the same nesting level as their content.

    Args:
        soup: Parsed document (or fragment) to split.
        tag: Name of the tag that starts a section, e.g. ``"h3"``.

    Returns:
        A list of ``{"section_name": str, "section_elements": list[str]}``
        dicts in document order. ``section_name`` is the heading text as
        produced by ``parse_html``; ``section_elements`` holds the serialized
        markup of each node in the section.

    Raises:
        ValueError: If the document contains no ``tag`` element.
    """
    tags = soup.find_all(tag)

    if len(tags) == 0:
        raise ValueError(f"No {tag} tags found in the document")

    first_tag = tags[0]
    # Ancestors of the first heading contain the heading (and everything after
    # it), so they are walked through rather than emitted as preamble.
    wrapper_ids = {id(ancestor) for ancestor in first_tag.parents}
    # ids of every preamble tag already represented in the output, either
    # directly or inside the markup of an emitted ancestor. ``find_all()``
    # yields tags in document order, so a parent is always seen before its
    # children; identity is used because tag equality/hash is content-based.
    covered_ids = set()
    preamble = []
    for elem in soup.find_all():
        if elem is first_tag:
            break
        if id(elem) in wrapper_ids:
            continue
        if id(elem.parent) not in covered_ids:
            # Top-most element: its markup already includes all descendants,
            # so emitting the descendants again would duplicate their text.
            preamble.append(str(elem))
        covered_ids.add(id(elem))

    output = [{"section_name": "", "section_elements": preamble}]

    following_tags = list(tags[1:]) + [None]
    for current_tag, next_tag in zip(tags, following_tags):
        # Collect the siblings between this heading and the next one.
        elements = []
        sibling = current_tag.next_sibling
        # ``is not None`` rather than truthiness: an empty string node is
        # falsy and would otherwise end the walk early. Identity rather than
        # ``!=``: two distinct tags with equal content compare equal.
        while sibling is not None and sibling is not next_tag:
            if sibling.name != tag:
                elements.append(str(sibling))
            sibling = sibling.next_sibling

        output.append(
            {"section_name": parse_html(current_tag), "section_elements": elements}
        )

    return output

def _has_own_text(element) -> bool:
    """Return True when ``element`` directly contains non-whitespace text."""
    return any(isinstance(child, str) and child.strip() for child in element.children)


def _iter_chunkable(node, max_tokens):
    """Yield ``(markup, text)`` for the top-most tags under ``node``, descending into oversize pure containers."""
    for child in node.find_all(recursive=False):
        text = parse_html(child)
        oversize = len(tokenizer.encode(text)) >= max_tokens
        if oversize and child.find_all(recursive=False) and not _has_own_text(child):
            # A container (e.g. a list or table) that is too large on its own
            # and has no direct text to lose: split along its children.
            yield from _iter_chunkable(child, max_tokens)
        else:
            yield str(child), text


def split_by_element(soup: BeautifulSoup, parent_name: str, max_tokens: int = 512) -> list:
    """Greedily pack consecutive elements of ``soup`` into chunks under a token budget.

    Elements are taken in document order and appended to the current chunk
    while the accumulated text stays below ``max_tokens`` tokens. When the
    next element would overflow the budget, the current chunk is emitted and
    that element starts the next chunk. Each element is emitted exactly once:
    nested tags are not listed again next to their parent. An element that
    exceeds the budget on its own is split along its child tags when it holds
    no direct text, and otherwise kept whole as its own chunk.

    Args:
        soup: Parsed fragment to split.
        parent_name: Section label copied into every emitted chunk.
        max_tokens: Token budget per chunk, measured with the module-level
            ``tokenizer``. Defaults to 512.

    Returns:
        A list of ``{"section", "text", "elements"}`` dicts, where ``elements``
        is the list of serialized markup strings and ``text`` is their
        ``parse_html`` rendering. Chunks whose rendered text is empty are
        omitted.
    """
    output = []

    def flush(elements):
        # Re-render the joined markup so the chunk text matches its elements.
        text_formatted = parse_html(BeautifulSoup("\n".join(elements), "html.parser"))
        if text_formatted:
            output.append(
                {
                    "section": f'{parent_name}',
                    "text": text_formatted,
                    "elements": elements,
                }
            )

    curr_text, curr_elements = "", []
    for markup, text in _iter_chunkable(soup, max_tokens):
        if curr_elements and len(tokenizer.encode(curr_text + text)) >= max_tokens:
            flush(curr_elements)
            curr_text, curr_elements = "", []
        # The overflowing element is kept: it opens the next chunk.
        curr_text += " " + text
        curr_elements.append(markup)

    # Emit whatever is left after the last element.
    if curr_elements:
        flush(curr_elements)
    return output

def _split_by_headings(soup, parent_name, heading_tags, max_tokens=512):
    """Split ``soup`` on ``heading_tags[0]``, using the remaining levels for sections that are still too long."""
    try:
        # Only the heading lookup is guarded: a ValueError raised elsewhere
        # (e.g. by the tokenizer) must not be mistaken for "no headings".
        sections = get_sections_by_tag(soup, heading_tags[0])
    except ValueError:
        return split_by_element(soup, parent_name)

    output = []
    for section in sections:
        name = section["section_name"]
        elements = section["section_elements"]
        section_soup = BeautifulSoup("\n".join(elements), "html.parser")
        text = parse_html(section_soup)

        if not (name.strip() or text.strip()):
            # Nothing to index, e.g. the empty preamble before a leading heading.
            continue

        label = f'{parent_name}>{name}'
        if len(tokenizer.encode(text)) < max_tokens:
            output.append(
                {
                    "section": label,
                    "text": name + "\n" + text,
                    "elements": elements,
                }
            )
        elif len(heading_tags) > 1:
            output.extend(
                _split_by_headings(section_soup, label, heading_tags[1:], max_tokens)
            )
        else:
            output.extend(split_by_element(section_soup, label))
    return output


def split_by_hierarchy(soup: BeautifulSoup, parent_name: str) -> list:
    """Split a long section into chunks along its h3, then h4, headings.

    The document is first cut at ``h3`` headings. A resulting section whose
    text is under 512 tokens becomes one chunk; a longer one is cut again at
    ``h4`` headings, and an ``h4`` section that is still too long is handed to
    ``split_by_element``. Whenever the heading level being looked for is
    absent, ``split_by_element`` is used on that fragment instead.

    Args:
        soup: Parsed section to split.
        parent_name: Label of the enclosing section; heading names are
            appended to it separated by ``>``.

    Returns:
        A list of ``{"section", "text", "elements"}`` dicts. Sections with
        neither a heading name nor any text are omitted.
    """
    return _split_by_headings(soup, parent_name, ("h3", "h4"))

In [28]:
test_file = "handbook_total-rewards_benefits_parental-leave-toolkit-index.json"

handbook_parsed = Path(f"{Paths.data}/handbook_parsed")
handbook_processed = Path(f"{Paths.data}/handbook_processed")

# Start from an empty output directory: create it if needed, then remove the
# JSON files left by a previous run.
handbook_processed.mkdir(parents=True, exist_ok=True)
for stale_path in handbook_processed.glob("*.json"):
    stale_path.unlink()

# Chunk every parsed page: sections under 512 tokens are kept whole, longer
# ones are split by split_by_hierarchy. One output file is written per input.
for parsed_path in sorted(handbook_parsed.glob("*.json")):
    # Distinct names for the path and the handle: the path is needed again
    # below to derive the output filename.
    with open(parsed_path, "r", encoding="utf-8") as fr:
        data = json.load(fr)

    output = []
    for section in data["elements"]:
        soup = BeautifulSoup("\n".join(section["section_elements"]), "html.parser")
        text = parse_html(soup)
        if len(tokenizer.encode(text)) < 512:
            chunks = [
                {
                    "section": section["section_name"],
                    "text": text,
                    "elements": section["section_elements"],
                }
            ]
        else:
            chunks = split_by_hierarchy(soup, section["section_name"])

        # Attach the page-level metadata to every chunk of this section.
        for chunk in chunks:
            output.append(
                {
                    "title": data["title"],
                    "path": data["path"],
                    "section": chunk["section"],
                    "text": chunk["text"],
                    "elements": chunk["elements"],
                }
            )

    # Same file name as the input, written under the processed directory.
    full_filename = handbook_processed / parsed_path.name
    with open(full_filename, "w", encoding="utf-8") as fw:
        json.dump(output, fw)

In [29]:
handbook_merged = Path(f"{Paths.data}/handbook_merged")
# Make sure the target directory exists before writing into it.
handbook_merged.mkdir(parents=True, exist_ok=True)
full_filename = f"{handbook_merged}/handbook.json"

# Concatenate the chunk lists of all processed files into one list.
# Sorting the paths makes the merged order reproducible across runs.
output_merged = []
for processed_path in sorted(handbook_processed.glob("*.json")):
    with open(processed_path, "r", encoding="utf-8") as fr:
        output_merged.extend(json.load(fr))

with open(full_filename, "w", encoding="utf-8") as fw:
    json.dump(output_merged, fw)