In [26]:
ROOT_DIR = '/Users/himanshukumar/Desktop/nyuinfo_rag'

In [27]:
from pathlib import Path
DOCS_DIR = Path(ROOT_DIR, "engineering.nyu.edu")

In [28]:
import ray
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html") if not path.is_dir()])

In [29]:
from pathlib import Path
from bs4 import BeautifulSoup

In [30]:
def extract_sections(record):
    with open(record["path"], "r", encoding="latin-1") as html_file:
        html_content = html_file.read()
        soup = BeautifulSoup(html_content, "html.parser")
    
    sections = soup.find_all("p")
    section_text = ""
    seen_texts = set()

    for section in sections:
        text = section.get_text(strip=True)
        if text and text not in seen_texts:
            seen_texts.add(text)
            section_text += text + " "

    section_text = section_text.strip()

    if section_text:
        uri = path_to_uri(path=record["path"])
        return [{"source": f"{uri}", "text": section_text}]
    else:
        return []

def path_to_uri(path, scheme="https://", domain="engineering.nyu.edu"):
    uri = scheme + domain + str(path).split(domain)[-1]
    return uri[:-5] if uri.endswith(".html") else uri

In [31]:
sample_html_fp = Path(ROOT_DIR, "engineering.nyu.edu/research/student-research/research-expo.html")
extract_sections({"path": sample_html_fp})


[{'source': 'https://engineering.nyu.edu/research/student-research/research-expo',
  'text': 'Looking forNewsorEvents? NYU Tandon\'s annual research showcase At the end of each academic year, NYU Tandon School of Engineering showcases research projects by faculty and students â\x80\x94 along with interactive, family-friendly tech activities. Tandon Research Excellence Exhibit is an annual public event that features exhibits that illustrate the scope of engineering and the applied sciences â\x80\x94 and their potential for improving the world. This yearâ\x80\x99s Exhibit will celebrate the incredible academic contributions of our community and the ways in which they connect toTandonâ\x80\x99s seven areas of research excellence. The most exciting work being done in labs and prototyping facilities comes alive for spectators of all ages. Child-friendly STEM projects developed for and by the students in NYU Tandonâ\x80\x99s manyK12 STEM Educationprograms offer the youngest attendees an intr

In [32]:
# Extract sections
sections_ds = ds.flat_map(extract_sections)
sections_ds.count()

2023-12-13 17:45:02,904	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)]
2023-12-13 17:45:02,906	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-13 17:45:02,909	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

5847

In [33]:
from functools import partial
from langchain.text_splitter import RecursiveCharacterTextSplitter

def chunk_section(section, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len)
    chunks = text_splitter.create_documents(
        texts=[section["text"]], 
        metadatas=[{"source": section["source"]}])
    return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

In [34]:
chunk_size = 300
chunk_overlap = 50

In [35]:
# Scale chunking
chunks_ds = sections_ds.flat_map(partial(
    chunk_section, 
    chunk_size=chunk_size, 
    chunk_overlap=chunk_overlap))
print(f"{chunks_ds.count()} chunks")

2023-12-13 17:47:19,650	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)]
2023-12-13 17:47:19,653	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-13 17:47:19,655	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

85631 chunks


In [36]:
chunks_ds.show(50)

2023-12-13 17:49:35,134	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)] -> LimitOperator[limit=50]
2023-12-13 17:49:35,137	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-13 17:49:35,140	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

{'text': 'Looking forNewsorEvents? Professor Emeritus Harvard UniversityPh.D.,', 'source': 'https://engineering.nyu.edu/hellmut-juretschke'}
{'text': 'Looking forNewsorEvents? Industry Professor (deceased) IDC Foundation Chair in Design & Construction The entire NYU Tandon School of Engineering community mourns the June 22 passing of Industry Professor Michael Horodniceanu, a member of our alumni community since 1978, a valued member of our', 'source': 'https://engineering.nyu.edu/dr-michael-horodniceanu'}
{'text': 'community since 1978, a valued member of our faculty, and most recently, theÂfounding chair of the IDC Innovation Hub. In Memoriam Â Dr. Horodniceanu is a Professor and the inaugural Chair of the IDC Innovation Hub, a new initiative aimed at actively engaging stakeholders across the construction', 'source': 'https://engineering.nyu.edu/dr-michael-horodniceanu'}
{'text': 'engaging stakeholders across the construction industry, including government officials, developers, cont

In [47]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy
import os 
class EmbedChunks:
    def __init__(self, model_name):
        if model_name == "text-embedding-ada-002":
            self.embedding_model = OpenAIEmbeddings(
                model=model_name,
                openai_api_base=os.environ["OPENAI_API_BASE"],
                openai_api_key=os.environ["OPENAI_API_KEY"])
        else:
            self.embedding_model = HuggingFaceEmbeddings(
                model_name=model_name,
                model_kwargs = {'device': 'cpu'},
                encode_kwargs = {'normalize_embeddings': False})
    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(batch["text"])
        return {"text": batch["text"], "source": batch["source"], "embeddings": 
embeddings}

In [74]:
# Embed chunks
embedding_model_name = "sentence-transformers/all-MiniLM-l6-v2"
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={"model_name": embedding_model_name},
    batch_size=100, 
    num_cpus=1,
    compute=ActorPoolStrategy(size=2))

In [71]:
import chromadb
client = chromadb.PersistentClient(path="./chromdb")
collection = client.get_or_create_collection(name="test")

In [72]:
class StoreResults:
    def __call__(self, batch):
        collection.add(documents=batch["text"],embeddings=batch["embeddings"],metadatas=batch["source"])
        return {}

In [75]:
# Index data
embedded_chunks.map_batches(
    StoreResults,
    batch_size=128,
    num_cpus=1,
    compute=ActorPoolStrategy(size=2),
).count()

2023-12-13 22:31:20,380	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)->MapBatches(EmbedChunks)] -> ActorPoolMapOperator[MapBatches(StoreResults)]
2023-12-13 22:31:20,381	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-13 22:31:20,382	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-12-13 22:31:20,425	INFO actor_pool_map_operator.py:114 -- FlatMap(extract_sections)->FlatMap(partial)->MapBatches(EmbedChunks): Waiting for 2 pool actors to start...




KeyboardInterrupt: 

Exception ignored in: <function StreamingExecutor.__del__ at 0x121e34a60>
Traceback (most recent call last):
  File "/Users/himanshukumar/.pyenv/versions/3.10.13/envs/nyu-rag-app/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 161, in __del__
    self.shutdown()
  File "/Users/himanshukumar/.pyenv/versions/3.10.13/envs/nyu-rag-app/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 174, in shutdown
    self.join(timeout=2.0)
  File "/Users/himanshukumar/.pyenv/versions/3.10.13/lib/python3.10/threading.py", line 1091, in join
    raise RuntimeError("cannot join thread before it is started")
RuntimeError: cannot join thread before it is started


