In [1]:
import os
from functools import partial
from pathlib import Path

from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
import psycopg2
from pgvector.psycopg2 import register_vector
from sentence_transformers import SentenceTransformer
import ray
from ray.data import ActorPoolStrategy

  from tqdm.autonotebook import tqdm, trange


In [2]:
ray.shutdown()
ray.init()

2024-10-31 03:37:40,461	INFO worker.py:1807 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.12.7
Ray version:,2.38.0
Dashboard:,http://127.0.0.1:8265


### Reading raw data

In [3]:
DOCS_DIR = Path('/home/ubuntu/pandas/doc')
files_path = [str(file) for file in DOCS_DIR.rglob("*.rst") if "_static" not in str(file)]

In [4]:
@ray.remote
def read_file(file_path):
    with open(file_path, "r") as f:
        text = f.read()
    return {'source': file_path, 'text': text}

This is not very efficient rn

In [5]:
ds = ray.data.from_items(ray.get([read_file.remote(file_path) for file_path in files_path]))

In [6]:
ds.count()

211

### chunking

In [7]:
chunk_size = 300
chunk_overlap = 50

In [8]:
def chunk_section(section, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    chunks = text_splitter.create_documents(
        texts=[section["text"]], 
        metadatas=[{"source": section["source"]}])
    return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

In [9]:
chunks_ds = ds.flat_map(partial(chunk_section, chunk_size=chunk_size, chunk_overlap=chunk_overlap))

In [10]:
chunks_ds.count()

2024-10-31 03:37:46,331	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-31_03-37-38_861305_18883/logs/ray-data
2024-10-31 03:37:46,332	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(partial)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(partial) 1: 0.00 row [00:00, ? row/s]

16037

In [11]:
cds = chunks_ds.take(2)

2024-10-31 03:37:52,414	INFO dataset.py:2529 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-10-31 03:37:52,419	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-31_03-37-38_861305_18883/logs/ray-data
2024-10-31 03:37:52,420	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(partial)] -> LimitOperator[limit=2]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(partial) 1: 0.00 row [00:00, ? row/s]

- limit=2 2: 0.00 row [00:00, ? row/s]

### embeddings

In [12]:
embedding_model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

In [13]:
embedded_chunks = chunks_ds.flat_map(
    lambda row: [{"text": row["text"], "source": row["source"], "embeddings": embedding_model.encode(row["text"])}])

### DB storage

In [14]:
db_user = 'postgres'
db_password = 'CS230password'
db_host = 'database-1.cdi4gywsaigf.us-east-2.rds.amazonaws.com'
db_port = 5432
db_name = 'postgres'

db_connection_string = f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

In [15]:
docs = embedded_chunks.take(2)

2024-10-31 03:37:53,285	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-31_03-37-38_861305_18883/logs/ray-data
2024-10-31 03:37:53,286	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(partial)->FlatMap(<lambda>)] -> LimitOperator[limit=2]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(partial)->FlatMap(<lambda>) 1: 0.00 row [00:00, ? row/s]

- limit=2 2: 0.00 row [00:00, ? row/s]

In [16]:
def insert_batch_into_db(batch):
    with psycopg2.connect(db_connection_string) as conn:
        register_vector(conn)
        with conn.cursor() as cur:
            for text, source, embedding in zip(batch["text"], batch["source"], batch["embeddings"]):
                cur.execute(
                    "INSERT INTO document (text, source, embedding) VALUES (%s, %s, %s)",
                    (text, source, embedding),
                )
        conn.commit()
    return {}

In [17]:
embedded_chunks.map_batches(
    insert_batch_into_db,
    batch_size=64,
    num_cpus=2,
    concurrency=2
).count()

2024-10-31 03:38:08,023	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-31_03-37-38_861305_18883/logs/ray-data
2024-10-31 03:38:08,025	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(partial)->FlatMap(<lambda>)] -> TaskPoolMapOperator[MapBatches(insert_batch_into_db)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(partial)->FlatMap(<lambda>) 1: 0.00 row [00:00, ? row/s]

- MapBatches(insert_batch_into_db) 2: 0.00 row [00:00, ? row/s]

0