In [None]:
# install dependencies
!pip install langchain ray==2.9.3 datasets sentence-transformers
!pip install "cloud-sql-python-connector[pg8000]" SQLAlchemy==2.0.7

In [None]:
# Create the directory that will hold the Ray job entrypoint (test/test.py).
# Done in pure Python so the cell does not depend on a shell `mkdir` being available;
# like `mkdir -p`, it is a no-op when the directory already exists.
import os

os.makedirs("test", exist_ok=True)

In [3]:
%%writefile test/test.py
# Comment out the `%%writefile` line above to run this cell inline and see its printed output in the notebook; keep that line when preparing the actual Ray job, which needs this code written to test/test.py.

import os
import uuid
import ray
from langchain.document_loaders import ArxivLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer
from typing import List
import torch
from datasets import load_dataset_builder, load_dataset, Dataset
from huggingface_hub import snapshot_download
from google.cloud.sql.connector import Connector
import sqlalchemy

# initialize parameters
# Each value may be supplied through an environment variable of the same name; the literal
# is the fallback, so editing the placeholders in place keeps working as before.
INSTANCE_CONNECTION_NAME = os.environ.get("INSTANCE_CONNECTION_NAME", "<project-id>:<location>:pgvector-instance") # Modify the project and region based on your setting
print(f"Your instance connection name is: {INSTANCE_CONNECTION_NAME}")
DB_USER = os.environ.get("DB_USER", "rag-user-notebook") # Modify this based on your setting
# Prefer the environment variable for the password so the secret is not saved with the notebook.
DB_PASS = os.environ.get("DB_PASS", "<password>") # Modify this based on your setting
DB_NAME = os.environ.get("DB_NAME", "pgvector-database")

# initialize Connector object
# A single module-level instance: getconn() uses it to open every connection, and it is
# closed once at the end of the script.
connector = Connector()

# connection factory handed to SQLAlchemy as the engine's `creator`
def getconn():
    """Open and return a new database connection through the module-level connector.

    The connection targets INSTANCE_CONNECTION_NAME using the "pg8000" driver and the
    DB_USER / DB_PASS / DB_NAME settings.
    """
    credentials = {"user": DB_USER, "password": DB_PASS, "db": DB_NAME}
    return connector.connect(INSTANCE_CONNECTION_NAME, "pg8000", **credentials)

# Build the SQLAlchemy engine (connection pool). The URL only names the dialect and driver;
# the connections themselves are produced by the getconn() creator callback.
engine_url = "postgresql+pg8000://"
pool = sqlalchemy.create_engine(engine_url, creator=getconn)

# ---- Dataset ----
# Huggingface dataset (and subset) to read the passages from
DATASET = 'wiki_dpr'
DATASET_SUBSET = 'psgs_w100.multiset.compressed.no_embeddings'
# number of dataset rows used for the example
WORKING_DATASET_SIZE = 100

# ---- Sentence transformer model ----
# shared directory the model snapshot is downloaded into
SHARED_DATA_BASEPATH = '/data/rag/st'
# transformer used for converting text chunks to vector embeddings
SENTENCE_TRANSFORMER_MODEL = 'intfloat/multilingual-e5-small'
# the downloaded model path takes this form for a given model name
SENTENCE_TRANSFORMER_MODEL_PATH_NAME = 'models--intfloat--multilingual-e5-small'
# specific snapshot of the model to use
SENTENCE_TRANSFORMER_MODEL_SNAPSHOT = "ffdcc22a9a5c973ef0470385cef91e1ecb461d9f"
# the path where the model is downloaded one time
SENTENCE_TRANSFORMER_MODEL_PATH = (
    f"{SHARED_DATA_BASEPATH}/{SENTENCE_TRANSFORMER_MODEL_PATH_NAME}"
    f"/snapshots/{SENTENCE_TRANSFORMER_MODEL_SNAPSHOT}"
)
# embeddings size
DIMENSION = 384

# ---- Chunking / batching ----
BATCH_SIZE = 100
# text chunk sizes which will be converted to vector embeddings
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 10
# number of actors for the distributed map_batches function
ACTOR_POOL_SIZE = 1

# ---- Storage ----
# CloudSQL table name
TABLE_NAME = 'huggingface_db'

class Embed:
    """Callable that splits texts into chunks and computes an embedding for every chunk.

    The sentence transformer and the text splitter are created once in ``__init__`` and
    reused by every ``__call__``.
    """

    def __init__(self):
        print("torch cuda version", torch.version.cuda)
        # Use the GPU when torch can see one, otherwise fall back to the CPU.
        device = "cpu"
        if torch.cuda.is_available():
            print("device cuda found")
            device = "cuda"

        print("reading sentence transformer model from cache path:", SENTENCE_TRANSFORMER_MODEL_PATH)
        self.transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_PATH, device=device)
        self.splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len)

    def __call__(self, text_batch: dict) -> dict:
        """Chunk and embed one batch of texts.

        Args:
            text_batch: batch mapping whose ``"item"`` entry holds the texts to process
                (it is indexed by key, so it is a mapping rather than a list of strings).

        Returns:
            ``{"results": [(chunk, embedding), ...]}`` with one pair per chunk, in the
            order the chunks were produced. One input text may yield several chunks.
        """
        chunks = []
        for data in text_batch["item"]:
            chunks.extend(self.splitter.split_text(data))

        embeddings = self.transformer.encode(chunks, batch_size=BATCH_SIZE).tolist()
        print("len(chunks)=", len(chunks), ", len(emb)=", len(embeddings))
        return {'results': list(zip(chunks, embeddings))}


# Make sure the persistent shared directory exists; it stores the artifacts needed by the ray workers
os.makedirs(SHARED_DATA_BASEPATH, exist_ok=True)

# Download the pinned snapshot of the sentence transformer model into the shared directory
# (one time download to persistent storage available to the ray workers)
snapshot_download(
    repo_id=SENTENCE_TRANSFORMER_MODEL,
    revision=SENTENCE_TRANSFORMER_MODEL_SNAPSHOT,
    cache_dir=SHARED_DATA_BASEPATH,
)

print("WORKING_DATASET_SIZE=", WORKING_DATASET_SIZE)

# Stream the HF dataset, keep only the first WORKING_DATASET_SIZE rows and
# materialise them as an in-memory HF Dataset
streamed_rows = load_dataset(DATASET, DATASET_SUBSET, split='train', streaming=True).take(WORKING_DATASET_SIZE)
test_dataset = Dataset.from_list(list(streamed_rows))

# Wrap the HF data with a ray data
ray_ds = ray.data.from_items(test_dataset)
# NOTE(review): `schema` is referenced without being called here and in the two prints
# below, so the attribute itself is what gets printed — confirm whether `schema()` was intended.
print(ray_ds.schema)

# Distributed flat map: one {'item': text} row per input row, with newlines turned into spaces
ds_batch = ray_ds.flat_map(lambda row: [{'item': row["text"].replace("\n", " ")}])
print(ds_batch.schema)

# Distributed map batches: Embed chunks each text and runs the sentence transformer on the chunks
embed_compute = ray.data.ActorPoolStrategy(size=ACTOR_POOL_SIZE)
ds_embed = ds_batch.map_batches(
    Embed,
    compute=embed_compute,
    batch_size=BATCH_SIZE,  # Large batch size to maximize GPU utilization.
    num_gpus=1,  # 1 GPU for each actor.
)

print("Embeddings ray dataset", ds_embed.schema)

def _to_pgvector_literal(values):
    """Format a sequence of numbers as the bracketed, comma-separated string used for vector values, e.g. ``[0.1,0.2]``."""
    return "[" + ",".join(map(str, values)) + "]"


try:
    # connect to connection pool
    with pool.connect() as db_conn:
        db_conn.execute(sqlalchemy.text("CREATE EXTENSION IF NOT EXISTS vector;"))
        db_conn.commit()

        # create the embeddings table; name and vector width come from TABLE_NAME / DIMENSION
        # so the DDL cannot drift from the configuration
        create_table_query = (
            f"CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( id VARCHAR(255) NOT NULL, text TEXT NOT NULL, "
            f"text_embedding vector({DIMENSION}) NOT NULL, PRIMARY KEY (id));"
        )
        db_conn.execute(sqlalchemy.text(create_table_query))
        # commit transaction (SQLAlchemy v2.X.X is commit as you go)
        db_conn.commit()
        print("Created table=", TABLE_NAME)

        # TODO: Fix workaround access grant for the frontend to access the table.
        grant_access_stmt = "GRANT SELECT on " + TABLE_NAME + " to \"rag-user\";"
        db_conn.execute(sqlalchemy.text(grant_access_stmt))

        insert_stmt = sqlalchemy.text(
            f"INSERT INTO {TABLE_NAME} (id, text, text_embedding) VALUES (:id, :text, :text_embedding)"
        )
        for output in ds_embed.iter_rows():
            row = {
                # the id column is VARCHAR, so bind the UUID as a string
                "id": str(uuid.uuid4()),
                # restrict the text string to be less than 65535
                "text": output["results"][0][:65535],
                # vector data pass in needs to be a string
                "text_embedding": _to_pgvector_literal(output["results"][1]),
            }
            # insert entries into table
            db_conn.execute(insert_stmt, parameters=row)

        # batch commit transactions (this also commits the GRANT issued above)
        db_conn.commit()

        # query and fetch table
        results = db_conn.execute(sqlalchemy.text(f"SELECT * FROM {TABLE_NAME}")).fetchall()
        # uncomment to show results to verify or debug
        # for row in results:
        #   print(row)

        # verify results, loading the same pinned model snapshot that Embed used so the query
        # embedding is produced by exactly the model that produced the stored embeddings
        transformer = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_PATH)
        query_text = "Each family would then have two or three small pieces of land scattered about the village, which they used to grow crops. For many years people lived in the village and then in 1845 Famine struck in Achill as it did in the rest of Ireland"
        query_emb = transformer.encode(query_text).tolist()
        query_request = (
            "SELECT id, text, text_embedding, 1 - ('" + _to_pgvector_literal(query_emb)
            + "' <=> text_embedding) AS cosine_similarity FROM " + TABLE_NAME
            + " ORDER BY cosine_similarity DESC LIMIT 5;"
        )
        query_results = db_conn.execute(sqlalchemy.text(query_request)).fetchall()
        db_conn.commit()
        print("print query_results, the 1st one is the hit")
        for row in query_results:
            print(row)
finally:
    # cleanup connector object, also when one of the statements above raised
    connector.close()
print ("end job")

In [None]:
import ray
from ray.job_submission import JobSubmissionClient
# Address of the Ray cluster head service that jobs are submitted to
RAY_HEAD_ADDRESS = "ray://example-cluster-kuberay-head-svc:10001"
client = JobSubmissionClient(RAY_HEAD_ADDRESS)

In [None]:
import os

# The job directory and its test.py were created with paths relative to the notebook's
# working directory, so resolve the upload path from that same location instead of assuming
# the notebook runs from /home/jovyan.
job_working_dir = os.path.abspath("test")

job_id = client.submit_job(
    entrypoint="python test.py",
    runtime_env={
        # Path to the local directory that contains the entrypoint file.
        "working_dir": job_working_dir, # upload the local working directory to ray workers
        # Packages installed for the job on the cluster
        "pip": [
                "langchain",
                "transformers",
                "sentence-transformers",
                "pyarrow",
                "datasets",
                "torch==2.0.1",
                "cloud-sql-python-connector[pg8000]",
                "SQLAlchemy",
                "huggingface_hub",
                ]
    }
)
print("jobid:", job_id)

In [None]:
# Fetch the status of the submitted job; {job_id} is interpolated from the notebook variable
# set by the submit_job cell.
# To see the UI, first run:
#   kubectl port-forward -n <namespace> service/example-cluster-kuberay-head-svc 8265:8265
!ray job status {job_id}  --address "ray://example-cluster-kuberay-head-svc:10001" 