# Load & Process Data

## Setup

In [1]:
import os
import openai
from pathlib import Path
from pprint import pprint
import ray
from tqdm import tqdm

In [2]:
import sys
sys.path.append("..")  # make the parent directory importable

import warnings
warnings.filterwarnings("ignore")  # NOTE: silences *all* warnings for this kernel session

from dotenv import load_dotenv
load_dotenv()  # load variables from a .env file; its return value is displayed below

True

In [3]:
# Shared storage that every Ray worker can read. The default is a per-user directory,
# so allow overriding it via the EFS_DIR environment variable (e.g. from .env)
# instead of editing the notebook.
EFS_DIR = Path(os.environ.get("EFS_DIR", "/efs/shared_storage/simon"))
# Repository root; assumes the kernel's working directory is one level below it
# (e.g. a notebooks/ folder) — TODO confirm if the notebook is moved.
ROOT_DIR = Path.cwd().parent
print(ROOT_DIR)

/home/ray/default/llm-applications


In [4]:
# Credentials: copy these variables from the driver's environment into the Ray
# runtime_env so workers see them too. A missing variable raises KeyError here.
_FORWARDED_ENV_VARS = (
    "OPENAI_API_BASE",
    "OPENAI_API_KEY",
    "ANYSCALE_API_BASE",
    "ANYSCALE_API_KEY",
    "DB_CONNECTION_STRING",
)
ray.init(runtime_env={"env_vars": {name: os.environ[name] for name in _FORWARDED_ENV_VARS}})

2023-08-31 00:09:30,291	INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.0.30.102:6379...
2023-08-31 00:09:30,300	INFO worker.py:1612 -- Connected to Ray cluster. View the dashboard at [1m[32mhttps://session-hvq6cjxyd917stdzvn4cs58auc.i.anyscaleuserdata.com [39m[22m
2023-08-31 00:09:30,303	INFO packaging.py:346 -- Pushing file package 'gcs://_ray_pkg_9cbeca6ac29c746df9cb6049e3e1f7b6.zip' (0.38MiB) to Ray cluster...
2023-08-31 00:09:30,304	INFO packaging.py:359 -- Successfully pushed file package 'gcs://_ray_pkg_9cbeca6ac29c746df9cb6049e3e1f7b6.zip'.


0,1
Python version:,3.9.15
Ray version:,2.6.2
Dashboard:,http://session-hvq6cjxyd917stdzvn4cs58auc.i.anyscaleuserdata.com


### Utils

In [22]:
import json

def write_json(data, filename):
    """Serialize ``data`` to ``filename`` as JSON indented with 4 spaces.

    Missing parent directories are created first, so output can be written into a
    directory (such as ``datasets/``) that does not exist yet.

    Args:
        data: Any JSON-serializable object.
        filename: Destination path (``str`` or ``pathlib.Path``); overwritten if present.
    """
    path = Path(filename)
    path.parent.mkdir(parents=True, exist_ok=True)
    # JSON text is UTF-8 by spec; don't depend on the platform's locale encoding.
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=4)

def read_json(filename):
    """Load and return the JSON document stored at ``filename``.

    Args:
        filename: Path (``str`` or ``pathlib.Path``) of a UTF-8 encoded JSON file.

    Returns:
        The decoded Python object.
    """
    # JSON text is UTF-8 by spec; decode explicitly rather than via the locale default.
    with open(filename, "r", encoding="utf-8") as f:
        return json.load(f)

[2m[1m[36m(autoscaler +5m0s)[0m Resized to 32 CPUs, 2 GPUs.


## Load data

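Our data is already available at `/efs/shared_storage/goku/docs.ray.io/en/master/` (on Staging, `us-east-1`). Note that the cell below reads from `EFS_DIR / "docs.ray.io/en/master/"` (with `EFS_DIR` currently `/efs/shared_storage/simon`), so make sure `EFS_DIR` points at the copy you want to use. If you want to download the docs yourself, run the bash command below. Change `DOCS_PATH` to your desired output directory, and make sure it is on shared storage so that it's accessible to the workers: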
```bash
export DOCS_PATH=/efs/shared_storage/simon/docs.ray.io/en/master/
wget -e robots=off --recursive --no-clobber --page-requisites \
  --html-extension --convert-links --restrict-file-names=windows \
  --domains docs.ray.io --no-parent --accept=html \
  -P $DOCS_PATH https://docs.ray.io/en/master/
```

In [6]:
# Ray dataset: one row ({"path": ...}) per HTML page found under the docs mirror.
# rglob("*.html") can also yield directories whose names end in .html; skip those.
docs_path = Path(EFS_DIR, "docs.ray.io/en/master/")
ds = ray.data.from_items(
    [{"path": html_path} for html_path in docs_path.rglob("*.html") if not html_path.is_dir()]
)
print(f"{ds.count()} documents")

3266 documents


## Process data

In [9]:
from bs4 import BeautifulSoup, NavigableString, Tag
import matplotlib.pyplot as plt
import pandas as pd

In [11]:
def load_html_file(path, parser="html.parser"):
    """Parse an HTML page and return the element holding its main content.

    Pages mark their content area in different ways (presumably different Sphinx
    theme versions — unverified), so candidate selectors are tried in order and the
    first match wins.

    Args:
        path: Path to an ``.html`` file.
        parser: BeautifulSoup tree builder. It is named explicitly so results don't
            depend on which optional parsers (e.g. ``lxml``) happen to be installed on
            the node running this function.

    Returns:
        The first matching element, or ``None`` if no candidate selector matches.
    """
    # Pass raw bytes so BeautifulSoup detects the encoding from the document itself
    # instead of relying on the platform's locale default.
    with open(path, "rb") as f:
        soup = BeautifulSoup(f.read(), parser)
    html_tags = [
        ("div", {"role": "main"}),
        ("main", {"id": "main-content"}),
    ]
    for tag, attrs in html_tags:
        main_content = soup.find(tag, attrs)
        if main_content is not None:
            return main_content
    return None

In [12]:
class TaggedStr:
    """A text fragment annotated with the section it belongs to.

    Attributes:
        value: The fragment's text.
        tag: Id of the enclosing ``<section>``, or ``None`` when the text is not inside
            an identified section.
    """

    def __init__(self, value, tag):
        self.value = value
        self.tag = tag

    def __repr__(self):
        # Build the suffix separately: an inline ``a + b if c else ""`` would make the
        # conditional cover the whole concatenation, so untagged items would repr as "".
        suffix = f" [{self.tag}]" if self.tag else ""
        return repr(self.value) + suffix

In [13]:
def convert_to_tagged_text(path, element, section=None):
    """Flatten a BeautifulSoup element into a flat list of ``TaggedStr`` fragments.

    The direct children of ``element`` are visited in document order:

    * string nodes (``NavigableString``, including subclasses such as HTML comments)
      are emitted with the current ``section`` as their tag;
    * a ``<section>`` with an ``id`` is descended into, and that id becomes the tag for
      everything inside it;
    * any other tag with no ``<section>`` anywhere beneath it is emitted as one fragment
      containing its full text;
    * any remaining tag is descended into, keeping the current ``section``.

    Args:
        path: Source file path; only passed along the recursion, not otherwise used.
        element: The node whose children are converted.
        section: Id of the enclosing section, or ``None`` at the top level.

    Returns:
        A list of ``TaggedStr`` objects.
    """
    tagged = []
    for node in element.children:
        if isinstance(node, NavigableString):
            tagged.append(TaggedStr(str(node), section))
            continue
        if not isinstance(node, Tag):
            continue
        if node.name == "section" and "id" in node.attrs:
            tagged += convert_to_tagged_text(path, node, section=node.attrs["id"])
        elif node.find("section") is None:
            tagged.append(TaggedStr(node.get_text(), section))
        else:
            tagged += convert_to_tagged_text(path, node, section)
    return tagged

In [14]:
def group_tagged_text(chunks):
    """Merge consecutive fragments that share the same section tag.

    A whitespace-only fragment is appended to the preceding group regardless of its
    tag, so layout whitespace never starts a group of its own (except when it is the
    very first fragment).

    Args:
        chunks: Iterable of objects with mutable ``value`` (str) and ``tag`` attributes.

    Returns:
        A new list of grouped fragments. The input objects are not modified: each group
        starts from a shallow copy of its first fragment.
    """
    import copy  # local import keeps this cell self-contained

    grouped = []
    for item in chunks:
        if grouped and (item.value.strip() == "" or item.tag == grouped[-1].tag):
            grouped[-1].value += item.value
        else:
            # Copy before appending so the `+=` above never mutates the caller's objects.
            grouped.append(copy.copy(item))
    return grouped

In [15]:
def path_to_uri(path, scheme="https://", domain="docs.ray.io"):
    """Map a local mirror path (a ``str``) to the public URL of the same page.

    Everything up to and including the *last* occurrence of ``domain`` is replaced by
    ``scheme + domain``, e.g. ``/efs/.../docs.ray.io/en/master/index.html`` becomes
    ``https://docs.ray.io/en/master/index.html``. If ``domain`` does not occur at all,
    the whole path is appended to ``scheme + domain``.
    """
    _, _, remainder = path.rpartition(domain)
    return scheme + domain + remainder

In [16]:
def parse_file(record):
    """Turn one dataset row into a list of ``{"source": ..., "text": ...}`` records.

    Args:
        record: A row whose ``"path"`` entry points at a local HTML file.

    Returns:
        One dict per grouped text fragment. ``"source"`` is the page's public URL, with
        ``#<section id>`` appended when the fragment has a section tag. ``"text"`` is the
        fragment's text. Returns ``[]`` when ``load_html_file`` finds no main content.
    """
    page_path = record["path"]
    main_content = load_html_file(page_path)
    if not main_content:
        return []
    fragments = group_tagged_text(convert_to_tagged_text(page_path, main_content))
    page_uri = path_to_uri(str(page_path))
    records = []
    for fragment in fragments:
        anchor = ("#" + fragment.tag) if fragment.tag else ""
        records.append({"source": page_uri + anchor, "text": fragment.value})
    return records

In [19]:
# Extract sections: flat_map expands every page row into its section records, and
# take_all() materializes all of them as a list on the driver.
sections_ds = ds.flat_map(parse_file)
sections = sections_ds.take_all()
print(len(sections))

2023-08-31 00:12:55,910	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(parse_file)]
2023-08-31 00:12:55,911	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-08-31 00:12:55,911	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

[2m[1m[36m(autoscaler +3m45s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
[2m[1m[36m(autoscaler +3m45s)[0m Adding 1 node(s) of type worker-node-type-0.
8944


## Save data

In [None]:
# Persist the extracted section records as JSON under the repository's datasets/ directory.
corpus_path = Path(ROOT_DIR, "datasets", "eval_full_corpus.json")
write_json(sections, corpus_path)