In [1]:
!pip install -r requirements.txt

Collecting ray==2.6.1 (from -r requirements.txt (line 1))
  Downloading ray-2.6.1-cp310-cp310-macosx_11_0_arm64.whl (55.9 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.9/55.9 MB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:02[0mm
[?25hCollecting virtualenv==20.24.2 (from -r requirements.txt (line 2))
  Downloading virtualenv-20.24.2-py3-none-any.whl (3.0 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m745.2 kB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
[?25hCollecting langchain==0.0.246 (from -r requirements.txt (line 3))
  Downloading langchain-0.0.246-py3-none-any.whl (1.4 MB)
[2K     [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0mm eta [36m0:00:01[0m[36m0:00:01[0m
[?25hCollecting sentence-transformers==2.2.2 (from -r requirements.txt (line 4))
  Downloading sentenc

In [2]:
import ray

# Start Ray with a runtime environment for its workers. The Ray Data workers
# unpickle LangChain Document objects and run SentenceTransformer, so these
# packages are pinned to the same versions installed from requirements.txt;
# unpinned names could resolve to newer, incompatible releases on the workers.
RAY_WORKER_PIP_PACKAGES = [
    "langchain==0.0.246",            # pinned in requirements.txt
    "sentence-transformers==2.2.2",  # pinned in requirements.txt
    # TODO: pin these to the versions listed in requirements.txt as well.
    "pypdf",
    "transformers",
]
ray.init(runtime_env={"pip": RAY_WORKER_PIP_PACKAGES})

2023-07-28 14:01:24,955	INFO worker.py:1621 -- Started a local Ray instance.


0,1
Python version:,3.10.11
Ray version:,2.6.1


In [3]:
from langchain.document_loaders import ArxivLoader

In [4]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Target chunks of 1000 characters (length measured with plain `len`), with
# 100 characters of overlap between neighbouring chunks.
splitter_settings = {"chunk_size": 1000, "chunk_overlap": 100, "length_function": len}
text_splitter = RecursiveCharacterTextSplitter(**splitter_settings)

In [11]:
!pip install certifi



In [12]:
# Fetch arXiv paper 1605.08386 (at most 2 search results) and split the text
# into chunks with `text_splitter`.
# NOTE(review): the saved output of this cell is an SSL CERTIFICATE_VERIFY_FAILED
# error ("self-signed certificate in certificate chain"), which typically points
# to a TLS-intercepting proxy. If so, point the SSL_CERT_FILE environment variable
# at a CA bundle that includes the proxy's root certificate — confirm locally.
arxiv_loader = ArxivLoader(query="1605.08386", load_max_docs=2)
docs_chunks = arxiv_loader.load_and_split(text_splitter)

URLError: <urlopen error [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self-signed certificate in certificate chain (_ssl.c:1007)>

In [None]:
# Wrap the LangChain Document chunks in a Ray Dataset; the following cells read
# each chunk back from the "item" column.
ds = ray.data.from_items(items=docs_chunks)

In [None]:
# Peek at a few raw chunks instead of printing the entire dataset.
ds.show(limit=3)

In [None]:
# Preprocess chunks: keep only the page text and replace every "\n" with a
# single space. Each input row produces exactly one output row, so `map` is
# used (a `flat_map` returning one-element lists is equivalent but indirect).
ds_batch = ds.map(lambda row: {"item": row["item"].page_content.replace("\n", " ")})

In [None]:
# Peek at a few cleaned chunks instead of printing the entire dataset.
ds_batch.show(limit=3)

In [9]:
from sentence_transformers import SentenceTransformer
from typing import List

In [None]:
# Embedding model shared by the Embed actors and the query-side encoder. The
# Astra DB table below declares item_vector as VECTOR<FLOAT, 384>, so this model
# must produce 384-dimensional embeddings.
model_name = "intfloat/multilingual-e5-small"

In [10]:
class Embed:
    """Ray Data callable that embeds batches of text chunks.

    Used as a class UDF with ``map_batches`` so that each actor loads the
    SentenceTransformer model once in ``__init__`` and reuses it for every batch.
    """

    # Batch size passed to SentenceTransformer.encode. Larger values improve
    # GPU/CPU utilization but use more memory.
    ENCODE_BATCH_SIZE = 100

    def __init__(self):
        # Reads the notebook-level `model_name` when the actor is constructed.
        self.transformer = SentenceTransformer(model_name)

    def __call__(self, text_batch: dict) -> dict:
        """Embed one batch of text chunks.

        Args:
            text_batch: Ray Data batch whose "item" column holds the chunk texts
                (presumably a NumPy array under Ray's default batch format —
                TODO confirm).

        Returns:
            ``{"results": [(text, embedding), ...]}``: each input text paired with
            its embedding as a list of floats.
        """
        # Encode manually with sentence_transformers because LangChain's
        # HuggingFaceEmbeddings does not support specifying a batch size yet.
        # NOTE(review): the intfloat E5 model card recommends prefixing passages
        # with "passage: " (and queries with "query: "); texts are encoded
        # unprefixed here — confirm this is intended.
        texts = text_batch["item"]
        embeddings = self.transformer.encode(
            texts, batch_size=self.ENCODE_BATCH_SIZE
        ).tolist()
        return {"results": list(zip(texts, embeddings))}


In [None]:
# Embed every cleaned chunk with a pool of Embed actors; each actor loads the
# SentenceTransformer model once (in Embed.__init__) and reuses it per batch.
#
# Ray Datasets are lazy: without materializing here, every later iteration over
# ds_embed (the dimension check and the Astra DB insert loop) would re-run the
# whole embedding pipeline, including starting new actors that reload the model.
# materialize() executes it once and keeps the results for reuse.
#
# Tuning notes for GPU clusters: pass num_gpus=<fraction per actor>, scale the
# pool with ActorPoolStrategy(min_size=..., max_size=...), and set batch_size.
# Larger batches improve GPU utilization but can run out of GPU memory. Lower the
# batch size if chunk_size is increased; raise it if chunk_size is decreased.
# NOTE(review): num_cpus=0 means the actor reserves no CPU from Ray's scheduler,
# even though no GPUs are requested either — confirm this is intended.
ds_embed = ds_batch.map_batches(
    Embed,
    compute=ray.data.ActorPoolStrategy(size=1),
    num_cpus=0,
).materialize()

In [None]:
# Sanity check: every embedding must have the dimension declared for the
# item_vector column of the Astra DB table (VECTOR<FLOAT, 384>).
EMBEDDING_DIM = 384
embedding_dims = {len(output["results"][1]) for output in ds_embed.iter_rows()}
assert embedding_dims == {EMBEDDING_DIM}, f"unexpected embedding dimensions: {embedding_dims}"
embedding_dims

## Set up the connection to Astra DB

In [None]:
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Import the credentials explicitly (instead of `import *`) so it is clear which
# names this notebook takes from the local secrets module.
from local_creds_secrets import client_id, client_secret, db_keyspace, secure_bundle_path

cloud_config = {"secure_connect_bundle": secure_bundle_path}
auth_provider = PlainTextAuthProvider(client_id, client_secret)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Smoke test the connection by reading the server's release version. Fail loudly
# rather than letting later cells run against a broken session.
row = session.execute("select release_version from system.local").one()
if row is None:
    raise RuntimeError("Connected to Astra DB, but system.local returned no row.")
print(row[0])

In [None]:
# Fully qualified "<keyspace>.papers" name used by the CQL statements below.
table_name = "{}.papers".format(db_keyspace)

In [None]:
# CQL DDL for one row per text chunk: an integer id, a paper name, the chunk text
# (description) and its 384-dimensional embedding. `//` starts a CQL comment.
table_creation_query = "\n".join([
    f"CREATE TABLE IF NOT EXISTS {table_name} (",
    "  id int PRIMARY KEY,",
    "  name TEXT,",
    "  description TEXT,",
    "  item_vector VECTOR<FLOAT, 384> //create a 384-dimensional embedding",
    ");",
])

In [None]:
# Create the table (no-op if it already exists, thanks to IF NOT EXISTS).
session.execute(query=table_creation_query)

In [None]:
# Storage-Attached Index on item_vector, required for the
# `ORDER BY item_vector ANN OF ...` search further down. No similarity_function
# option is given, so the index uses the default similarity function.
sai_index_query = (
    "CREATE CUSTOM INDEX IF NOT EXISTS ann_index\n"
    f"  ON {table_name}(item_vector) USING 'StorageAttachedIndex'; "
)
session.execute(sai_index_query)

In [None]:
# (A duplicate `session.execute(sai_index_query)` was removed: the ANN index is
# created in the previous cell, and its IF NOT EXISTS made re-running it a no-op.)

### Insert vector records into Astra DB

In [None]:
# Insert one row per embedded chunk into the table created above.
# Values are passed as bound parameters so the driver quotes and escapes them:
# interpolating chunk text into the CQL string broke on any apostrophe in the
# text (e.g. "don't") and allowed CQL injection.
insert_query = (
    f"INSERT INTO {table_name} (id, name, description, item_vector) "
    "VALUES (%s, %s, %s, %s)"
)
for row_id, output in enumerate(ds_embed.iter_rows(), start=1):
    chunk_text = output["results"][0]
    chunk_vector = output["results"][1]
    session.execute(
        insert_query,
        # str()/float() convert possible NumPy scalars into plain Python types,
        # which the driver encodes as proper CQL literals.
        (row_id, "paper_1_sample", str(chunk_text), [float(x) for x in chunk_vector]),
    )

In [None]:
# Read back a few rows from the same table the inserts targeted.
rows = session.execute(f"select * from {table_name} limit 3;").all()

In [None]:
# Print each of the rows fetched above on its own line.
for row in rows:
    print(row)

## Search the table for chunks similar to a sentence

In [None]:
# Encode queries with the same model the Embed actors used for the stored
# chunks, so query and stored vectors live in the same embedding space.
model = SentenceTransformer(model_name)

In [None]:
# Example query. It contains the ligature characters "ﬀ" and "ﬁ", presumably
# copied from the PDF-extracted chunk text — keep them if an exact match matters.
example_sentence = "two lattice points are connected by an edge if their diﬀerence lies in a ﬁnite"
# .tolist() yields plain Python floats. list(ndarray) would yield NumPy scalars,
# whose repr (e.g. "np.float32(0.1)" on NumPy >= 2) breaks the CQL vector literal.
embedding = model.encode(example_sentence).tolist()


In [None]:
# ANN search: the 2 rows whose item_vector is closest to the query embedding.
# The vector is rendered as a CQL list literal of plain Python floats.
query_vector = [float(x) for x in embedding]
search_query = f"""SELECT * FROM {table_name}
ORDER BY item_vector ANN OF {query_vector}
LIMIT 2;"""

In [None]:
# Run the ANN query and collect every returned row into a list for display.
list(session.execute(search_query))