In [227]:
import pandas as pd 
import ray
from pathlib import Path
from bs4 import BeautifulSoup
import re

from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy

import pgvector
import psycopg2
from pgvector.psycopg import register_vector

import os 

from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

In [157]:
ray.__version__

'2.7.0'

In [180]:
ray.shutdown()



In [181]:
ray.init()

2024-02-24 15:57:14,726	INFO worker.py:1633 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.10.13
Ray version:,2.7.0
Dashboard:,http://127.0.0.1:8265


### Load Data

In [182]:
# Ray dataset
EFS_DIR = './emerge_docs/'
DOCS_DIR = Path(EFS_DIR,'docs.emergetools.com/docs/')
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html") if not path.is_dir()])
print(f"{ds.count()} documents")

79 documents


### Extract Text From Sections

In [183]:
def extract_text_from_sections(html_dict):

    with open(html_dict["path"], 'r', encoding='utf-8') as f:
        html_content = f.read()

    soup = BeautifulSoup(html_content, 'html.parser')
    sections = []

    for id,section in enumerate(soup.find_all(['section'])):
        if len(section.find_all('p')) > 0:
            paragraph_elements = section.find_all('p')
            section_text = ' '.join(paragraph.get_text(strip=True) for paragraph in paragraph_elements)
            section_dict = {
                        "source": f"{str(html_dict['path']).split('/')[-1]}#{id}",
                        "text": section_text  
                    }
            sections.append(section_dict)

    return sections


In [184]:
sections_ds = ds.flat_map(extract_text_from_sections)
sections = sections_ds.take_all()

2024-02-24 15:57:23,305	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_text_from_sections)]
2024-02-24 15:57:23,306	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-24 15:57:23,307	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/79 [00:00<?, ?it/s]

### Chunk Data

In [185]:
def chunk_section(section,chunk_size,overlap):

    text_splitter = RecursiveCharacterTextSplitter(
    separators=[".","\n","\n\n"],
    chunk_size=chunk_size,
    chunk_overlap=overlap,
    length_function=len,
    )

    chunks = text_splitter.create_documents(
    texts=[section["text"]], 
    metadatas=[{"source": section["source"]}])

    return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

In [186]:
chunk_size = 300
chunk_overlap = 50

chunks_ds = sections_ds.flat_map(lambda section:chunk_section(section,chunk_size=chunk_size,overlap=chunk_overlap))

In [187]:
chunks_ds.count()

2024-02-24 15:57:26,814	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_text_from_sections)->FlatMap(<lambda>)]
2024-02-24 15:57:26,815	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-24 15:57:26,816	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/79 [00:00<?, ?it/s]

635

In [188]:
chunks_ds.show(1)

2024-02-24 15:57:29,035	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_text_from_sections)->FlatMap(<lambda>)] -> LimitOperator[limit=1]
2024-02-24 15:57:29,036	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-24 15:57:29,037	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

{'text': "If you haven't already, sign up for an account athttps://emergetools.com/signup The quickest way to get started is to drag-and-drop a manual upload from ourUploads page.  Emerge requires uploads to be azippedxcarchive(.xcarchive.zip)  orzippedxcframework(.xcframework.zip) file", 'source': 'quickstart.html#10'}


### Embed Data

In [204]:
class EmbedChunks:
    def __init__(self, model_name):
        self.embedding_model = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs={"device": "cpu", "batch_size": 3})

    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(batch["text"])
        return {"text": batch["text"], "source": batch["source"], "embeddings": 
    embeddings}

In [205]:
# Embed chunks
embedding_model_name = "thenlper/gte-base"
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={"model_name": embedding_model_name},
    batch_size=3, 
    num_cpus=2,
    compute=ActorPoolStrategy(size=2))

In [264]:
embedded_chunks.show(10)

2024-02-25 18:16:07,498	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_text_from_sections)->FlatMap(<lambda>)] -> ActorPoolMapOperator[MapBatches(EmbedChunks)] -> LimitOperator[limit=10]
2024-02-25 18:16:07,500	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-25 18:16:07,501	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2024-02-25 18:16:07,527	INFO actor_pool_map_operator.py:106 -- MapBatches(EmbedChunks): Waiting for 2 pool actors to start...


Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

{'text': "If you haven't already, sign up for an account athttps://emergetools.com/signup The quickest way to get started is to drag-and-drop a manual upload from ourUploads page.  Emerge requires uploads to be azippedxcarchive(.xcarchive.zip)  orzippedxcframework(.xcframework.zip) file", 'source': 'quickstart.html#10', 'embeddings': [0.009439121000468731, -0.03042501211166382, 0.01017707958817482, 0.03201162442564964, 0.09091600030660629, 0.002893374301493168, 0.004314114339649677, 0.024726662784814835, 0.009248252958059311, -0.055820565670728683, -0.05065443366765976, 0.01035268884152174, -0.07338765263557434, -0.0012087784707546234, -0.026057912036776543, 0.04280624911189079, 0.031157277524471283, -0.00731886550784111, 0.030110331252217293, 0.017970822751522064, -0.009607716463506222, -0.0295089203864336, -0.005334742832928896, 0.016603244468569756, 0.007489761803299189, -0.0006265927222557366, -0.033214256167411804, 0.01268627680838108, -0.08527663350105286, -0.025874512270092964, 



### Index Data

In [325]:
def create_connection():
    """Create a connection to the PostgreSQL database."""
    try:
        connection = psycopg2.connect(
            user="chetanmunugala",
            host="localhost",
            port="5432",
            database="vector_db"
        )
        #print("Connected to the PostgreSQL database successfully.")
        return connection
    except (Exception, Error) as error:
        print("Error while connecting to PostgreSQL:", error)
        return None

In [381]:
class StoreResults:
    def __call__(self, batch):

        connection = create_connection()
        cursor = connection.cursor()
        cursor.execute("INSERT INTO DOCUMENT (text,source,embedding) VALUES (%s, %s, %s)",(batch['text'][0],batch['source'][0],batch['embeddings'][0]))
        connection.commit()

        return {}

In [382]:
# Index data
embedded_chunks.map_batches(
    StoreResults,
    batch_size=5,
    num_cpus=1,
    compute=ActorPoolStrategy(size=2),
).count()

2024-02-28 16:39:36,725	INFO streaming_executor.py:93 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_text_from_sections)->FlatMap(<lambda>)] -> ActorPoolMapOperator[MapBatches(EmbedChunks)] -> ActorPoolMapOperator[MapBatches(StoreResults)]
2024-02-28 16:39:36,726	INFO streaming_executor.py:94 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-02-28 16:39:36,727	INFO streaming_executor.py:96 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2024-02-28 16:39:36,752	INFO actor_pool_map_operator.py:106 -- MapBatches(EmbedChunks): Waiting for 2 pool actors to start...
2024-02-28 16:39:42,064	INFO actor_pool_map_operator.py:106 -- MapBatches(StoreResults): Waiting for 2 pool actors to start...


Running 0:   0%|          | 0/79 [00:00<?, ?it/s]



0

### Query Retrieval

In [414]:
embedding_model = HuggingFaceEmbeddings(model_name=embedding_model_name)
query = "Each time a new version of the IOS app is uploaded, does it get installed on an iphone?"
embedding = embedding_model.embed_query(query)
len(embedding)


768

In [415]:
# Get context
numResponses = 5

connection = create_connection()
cursor = connection.cursor()
        

In [416]:
cursor.execute(f"SELECT * FROM document ORDER BY embedding <-> '{embedding}' LIMIT {numResponses}")

In [417]:
cursor.fetchall()

[(12,
  "Each time a new release build of your app is uploaded, the app is installed on a Google Pixel 3 running Android 12, along with the version it's being compared against (usually the app referenced from the specifiedbaseSha)",
  'android-performance-analysis.html#10',
  '[0.0013143771,0.0055333646,0.04361266,0.010873758,0.07567704,0.017372062,0.024439396,0.03752786,-0.0037555338,-0.05270206,-0.0052037775,0.011927625,-0.08648916,0.047483016,-0.025557172,0.05799295,0.08211088,0.044278715,0.026137006,0.014077406,0.00733199,-0.027967976,-0.017512007,0.03638663,-0.004227356,0.01857862,0.05431834,0.0035573356,-0.09286568,-0.031750616,0.041775532,-0.012714182,0.0021942423,-0.013267471,-0.022434017,0.0031434575,-0.04150809,-0.015831584,-0.017908514,-0.01584972,-0.032361135,-0.017889697,-0.029296517,-0.0032892865,-0.030163702,0.052532442,-0.050491344,0.03356743,-0.030954942,-0.021751618,-0.048821647,0.03359499,-0.024986021,-0.024145229,0.015021857,0.05896284,0.03935177,-0.03848366,-0.0167

In [418]:
def semantic_search(query,embedding_model,numResponses):

    embedding = embedding_model.embed_query(query)
    connection = create_connection()
    cursor = connection.cursor()
    cursor.execute(f"SELECT * FROM document ORDER BY embedding <-> '{embedding}' LIMIT {numResponses}")

    potentialContexts = cursor.fetchall()
    semantic_contexts = [{"id": context[0], "text": context[1], "source": context[2]} for row in potentialContexts]

    return semantic_contexts

In [None]:
from rag.generate import prepare_response
from rag.utils import get_client

In [None]:
cursor.execute("INSERT INTO DOCUMENT (text,source,embedding) VALUES (%s, %s, %s)",(batch['text'][0],batch['source'][0],batch['embeddings'][0]))
        connection.commit()
with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
    register_vector(conn)
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM document ORDER BY embedding <-> %s LIMIT %s", (embedding, num_chunks))
        rows = cur.fetchall()
        context = [{"text": row[1]} for row in rows]
        sources = [row[2] for row in rows]