## Resume Chatbot Demo

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/rag-pdf-self-managed-0.png?raw=true" style="width: 800px; margin-left: 10px">

### Steps:

- Use Auto Loader to load the binary PDFs into our first table.
- Use the `unstructured` library to parse the text content of the PDFs.
- Use `llama_index` or `LangChain` to split the text into chunks.
- Compute embeddings for the chunks.
- Save the text chunks and their embeddings in a Delta Lake table, ready for Vector Search indexing.

In [0]:
%pip install transformers==4.30.2 "unstructured[pdf,docx]==0.10.30" langchain==0.0.319 llama-index==0.9.3 databricks-vectorsearch==0.20 pydantic==1.10.9 mlflow==2.9.0
dbutils.library.restartPython()

In [0]:
%run ../_resources/00-init-advanced $reset_all_data=false

In [0]:
install_ocr_on_nodes()

## Ingesting Resume PDFs
1. Upload all the PDFs to a [Unity Catalog Volume](https://docs.databricks.com/en/connect/unity-catalog/volumes.html)
1. Ingest the PDFs into a Delta Lake table that stores each file's path and its content in binary format
1. Use [Databricks Auto Loader](https://docs.databricks.com/en/ingestion/auto-loader/index.html) to ingest the unstructured PDF data incrementally

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS volume_resume;

In [0]:
# List our raw PDF docs
volume_folder =  f"/Volumes/{catalog}/{db}/volume_resume"
display(dbutils.fs.ls(volume_folder+"/resume-pdf"))

#### Cleanup from previous runs

In [0]:
%sql
DROP TABLE IF EXISTS dbdemos_vishesh.llm_db.pdf_raw;
DROP TABLE IF EXISTS dbdemos_vishesh.llm_db.resume_pdf;

In [0]:
dbutils.fs.rm(f'dbfs:{volume_folder}/checkpoints/raw_docs', True)
dbutils.fs.rm(f"dbfs:{volume_folder}/checkpoints/pdf_chunk", True)

In [0]:
df = (spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', 'BINARYFILE')
        .option("pathGlobFilter", "*.pdf")
        .load('dbfs:'+volume_folder+"/resume-pdf"))

# Write the data as a Delta table
(df.writeStream
  .trigger(availableNow=True)
  .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/raw_docs')
  .table('pdf_raw').awaitTermination())

In [0]:
%sql
SELECT * FROM pdf_raw LIMIT 2

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/chunk-window-size.png?raw=true" style="float: right" width="600px">

## Extracting our PDF content and splitting into smaller chunks
1. We need to convert the PDF bytes to text and extract chunks from that content
1. We'll extract the content and then use the llama_index `SentenceSplitter`, ensuring that no chunk exceeds 500 tokens. The prompt plus the answer must stay below your model's maximum context window (4096 tokens for llama2)
1. The right chunk size and chunk overlap depend on the use case and the dataset. Review the chunks to make sure they are coherent and contain relevant information
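
To make chunk size and overlap concrete, here is a minimal sketch of a sliding-window splitter. It counts whitespace-separated words rather than model tokens, and `split_with_overlap` is a hypothetical helper, not the llama_index `SentenceSplitter` used in this notebook, which also respects sentence boundaries.

```python
from typing import List


def split_with_overlap(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Split text into chunks of at most `chunk_size` words, where consecutive
    chunks share `chunk_overlap` words. Illustrative only: words stand in for tokens."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    words = text.split()
    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reached the end of the text
    return chunks


# Tiny made-up input so the overlap is easy to see
sample = " ".join(f"w{i}" for i in range(12))
chunks = split_with_overlap(sample, chunk_size=5, chunk_overlap=2)
for c in chunks:
    print(c)
```

Each chunk starts with the last two words of the previous one, so a sentence cut at a chunk boundary still appears with some of its context in the next chunk.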

In [0]:
from unstructured.partition.auto import partition
import io
import re

def extract_doc_text(x : bytes) -> str:
  # Read files and extract the values with unstructured
  sections = partition(file=io.BytesIO(x))
  def clean_section(txt):
    txt = re.sub(r'\n', '', txt)
    return re.sub(r' ?\.', '.', txt)
  # Default split is by section of document, concatenate them all together because we want to split by sentence instead.
  return "\n".join([clean_section(s.text) for s in sections]) 
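
The cleaning step can be tried on its own, without a PDF. The sketch below copies the two regular expressions from `clean_section` and applies them to a made-up string; the sample text is an assumption for illustration only.

```python
import re


def clean_section(txt: str) -> str:
    # Drop newlines so text wrapped across lines becomes one run
    txt = re.sub(r'\n', '', txt)
    # Remove a stray space before a period ("SQL ." -> "SQL.")
    return re.sub(r' ?\.', '.', txt)


raw = "Skills: Python , SQL .\nBuilt ML pipelines ."
cleaned = clean_section(raw)
print(cleaned)  # Skills: Python , SQL.Built ML pipelines.
```

Newlines are removed rather than replaced by a space, so the words on either side of a line break are joined. Whether that matters depends on how `unstructured` segments your PDFs.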

In [0]:
import json
import requests

def get_resume_attributes(prompt, endpoint_name = 'dbdemos-azure-openai'):
  host = 'e2-demo-field-eng.cloud.databricks.com'
  token = ''  # Set to a valid Databricks access token before calling the endpoint
  url = f"https://{host}/serving-endpoints/{endpoint_name}/invocations"

  payload = json.dumps({
    "messages": [{"role": "user",
       "content": prompt
       }],
    "max_tokens": 500})
  headers = {'Content-Type': 'application/json',
             'Authorization': f'Bearer {token}'}
  
  response = requests.request("POST", url, headers=headers, data=payload).json()
  return response['choices'][0]['message']['content']
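
The function above assumes the endpoint returns an OpenAI-style chat completion. A minimal offline sketch of that request and response shape follows; `build_payload`, `extract_content`, and the sample response are hypothetical and only mirror the fields the function reads.

```python
import json


def build_payload(prompt: str, max_tokens: int = 500) -> str:
    """Serialize a single-turn chat request, as get_resume_attributes does."""
    return json.dumps({
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": max_tokens,
    })


def extract_content(response: dict) -> str:
    """Return the first choice's message text, with a clear error if the shape differs."""
    try:
        return response["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as e:
        raise ValueError(f"Unexpected response shape: {response!r}") from e


# Made-up response for illustration; a real endpoint returns more fields.
sample_response = {
    "choices": [{"message": {"role": "assistant", "content": "Skills: Python, SQL"}}]
}
payload = build_payload("Summarize this resume")
print(payload)
print(extract_content(sample_response))
```

Wrapping the lookup this way means an error payload (for example, one returned when the token is missing) surfaces as a readable `ValueError` instead of a bare `KeyError`.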

In [0]:
import io
import re
import json
import requests

with open('/Volumes/dbdemos_vishesh/llm_db/volume_resume/resume-pdf/11911050 Nidhi.pdf', 'rb') as pdf:
  pdf_content = pdf.read()
  doc = extract_doc_text(pdf_content)

  prompt = f"""Please summarize the following information relevant for hiring and recruitment from the provided resume text : ```{doc}```. Make sure you have included List all the skills relevant to Data Science and Machine Learning, graduation year or expected graduation year (if not available, set current year), and provide a summary of the candidate's work experience, including job titles, company names, and dates of employment."""
  
  # prompt = f"""Please extract the following information from the provided resume text and format it as JSON: ```{doc}```. The json format should look like: {{"skills": "List all relevant skillsets mentioned in the resume.", "graduation_year": "Extract the graduation year or expected graduation year (if not available, set current year)", "work_experience": "Provide a summary of the candidate's work experience, including job titles, company names, and dates of employment."}}"""
  
  res_sum = get_resume_attributes(prompt)
  print("PDF Text:\r\n")
  print(doc)
  print("\r\nPDF Summary:\r\n")
  print(res_sum)

In [0]:
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf
from llama_index.langchain_helpers.text_splitter import SentenceSplitter
from llama_index import Document, set_global_tokenizer
from transformers import AutoTokenizer

# Reduce the arrow batch size as our PDF can be big in memory
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Set llama2 as tokenizer to match our model size (500-token chunks should stay within the BGE embedding window of 512 tokens)
    set_global_tokenizer(
      AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer")
    )
    #Sentence splitter from llama_index to split on sentences
    splitter = SentenceSplitter(chunk_size=500, chunk_overlap=50)
    def extract_and_split(b):
      txt = extract_doc_text(b)
      
      # Use the text extracted from this row (txt), not the global `doc` from the earlier test cell
      prompt = f"""Please summarize the following information relevant for hiring and recruitment from the provided resume text : ```{txt}```. Make sure you have included List all the skills relevant to Data Science and Machine Learning, graduation year or expected graduation year (if not available, set current year), and provide a summary of the candidate's work experience, including job titles, company names, and dates of employment."""
      
      # prompt = f"""Please extract the following information from the provided resume text and format it as JSON: ```{txt}```. The json format should look like: {{"skills": "List all relevant skillsets mentioned in the resume.", "graduation_year": "Extract the graduation year or expected graduation year (if not available, set current year)", "work_experience": "Provide a summary of the candidate's work experience, including job titles, company names, and dates of employment."}}"""
      
      res_sum = get_resume_attributes(prompt)

      nodes = splitter.get_nodes_from_documents([Document(text=res_sum)])
      return [n.text for n in nodes]

    for x in batch_iter:
        yield x.apply(extract_and_split)

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/chatbot-rag/databricks-vector-search-type.png?raw=true" style="float: right" width="800px">

### Databricks provides multiple types of vector search indexes:

- **Managed embeddings**: you provide a text column and an endpoint name, and Databricks synchronizes the index with your Delta table
- **Self-managed embeddings**: you compute the embeddings and save them as a field of your Delta table; Databricks then synchronizes the index
- **Direct index**: for when you want to use and update the index without a Delta table

In this demo, we will set up a **Self-managed Embeddings** index by computing the embeddings of our chunks and saving them in a Delta Lake table field of type `array<float>`


### Databricks supports several endpoint types to compute embeddings or evaluate a model:
- A **foundation model endpoint**, provided by Databricks (ex: llama2-70B, MPT...)
- An **external endpoint**, acting as a gateway to an external model (ex: Azure OpenAI)
- A **custom**, fine-tuned model hosted on Databricks Model Serving

See [Model Serving Endpoint page](/ml/endpoints)

In [0]:
from pprint import pprint
from mlflow.deployments import get_deploy_client

# bge-large-en Foundation models are available using the /serving-endpoints/databricks-bge-large-en/invocations api. 
deploy_client = get_deploy_client("databricks")

## NOTE: if you change your embedding model here, make sure you change it in the query step too
embeddings = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": ["Who has python skills?"]})
pprint(embeddings)

In [0]:
%sql
--Note that we need to enable Change Data Feed on the table to create the index
CREATE TABLE IF NOT EXISTS resume_pdf (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING,
  embedding ARRAY <FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

### Computing the chunk embeddings and saving them to our Delta Table

1. The last step is to compute an embedding for every chunk using the foundation model endpoint
2. This part would typically be set up as a production-grade job that runs as soon as a new resume is added (incremental DLT)

In [0]:
@pandas_udf("array<float>")
def get_embedding(contents: pd.Series) -> pd.Series:
    import mlflow.deployments
    deploy_client = mlflow.deployments.get_deploy_client("databricks")
    def get_embeddings(batch):
        #Note: this will fail if an exception is thrown during embedding creation (add try/except if needed) 
        response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": batch})
        return [e['embedding'] for e in response.data]

    # Splitting the contents into batches of 150 items each, since the embedding model takes at most 150 inputs per request.
    max_batch_size = 150
    batches = [contents.iloc[i:i + max_batch_size] for i in range(0, len(contents), max_batch_size)]

    # Process each batch and collect the results
    all_embeddings = []
    for batch in batches:
        all_embeddings += get_embeddings(batch.tolist())

    return pd.Series(all_embeddings)
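
The batching logic can be exercised without a serving endpoint. This sketch uses a hypothetical `fake_embed` stand-in for `deploy_client.predict` and adds the try/except that the comment above mentions. Returning `None` for every input of a failed batch is an assumed fallback policy, not part of the demo.

```python
from typing import Callable, List, Optional, Sequence


def embed_in_batches(
    texts: Sequence[str],
    embed_fn: Callable[[List[str]], List[List[float]]],
    max_batch_size: int = 150,
) -> List[Optional[List[float]]]:
    """Call embed_fn on slices of at most max_batch_size texts, preserving order."""
    results: List[Optional[List[float]]] = []
    for i in range(0, len(texts), max_batch_size):
        batch = list(texts[i:i + max_batch_size])
        try:
            results.extend(embed_fn(batch))
        except Exception as e:
            # Keep one output per input so rows stay aligned if a batch fails
            print(f"Batch starting at {i} failed: {e}")
            results.extend([None] * len(batch))
    return results


calls = []


def fake_embed(batch: List[str]) -> List[List[float]]:
    # Hypothetical stand-in for the embedding endpoint: one 2-d vector per input
    calls.append(len(batch))
    return [[float(len(t)), 0.0] for t in batch]


vectors = embed_in_batches([f"chunk {i}" for i in range(320)], fake_embed)
print(calls, len(vectors))  # [150, 150, 20] 320
```

Keeping the output the same length as the input matters inside a pandas UDF, which must return one value per input row.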

In [0]:
from pyspark.sql import functions as F

(spark.readStream.table('pdf_raw')
      .withColumn("content", F.explode(read_as_chunk("content")))
      .withColumn("embedding", get_embedding("content"))
      .selectExpr('path as url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/pdf_chunk')
    .table('resume_pdf').awaitTermination())

In [0]:
%sql
SELECT * FROM resume_pdf WHERE url like '%.pdf' limit 10


### Creating Self-Managed Vector Search Index

1. We'll configure a Databricks Vector Search endpoint and create a vector search index on top of our Delta table using that endpoint
1. A vector search index uses a Vector Search endpoint to serve the embeddings (Vector Search API endpoint)
1. Multiple indexes can share the same endpoint

In [0]:
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()

if VECTOR_SEARCH_ENDPOINT_NAME not in [e['name'] for e in vsc.list_endpoints()['endpoints']]:
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")

In [0]:
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

#The table we'd like to index
source_table_fullname = f"{catalog}.{db}.resume_pdf"
# Where we want to store our index
vs_index_fullname = f"{catalog}.{db}.resume_pdf_self_managed_vs_index"

if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
  print(f"Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
  vsc.create_delta_sync_index(
    endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
    index_name=vs_index_fullname,
    source_table_name=source_table_fullname,
    pipeline_type="TRIGGERED", #Sync needs to be manually triggered
    primary_key="id",
    embedding_dimension=1024, #Match your model embedding size (bge)
    embedding_vector_column="embedding"
  )
else:
  #Trigger a sync to update our vs content with the new data saved in the table
  vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).sync()

#Let's wait for the index to be ready and all our embeddings to be created and indexed
wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)

## Searching for similar content

1. Databricks captures new entries in your Delta Lake table and synchronizes them into the index (with the `TRIGGERED` pipeline used above, this happens when a sync is triggered)
1. `similarity_search` also supports a `filters` parameter. This is useful for adding a security layer to your RAG system: you can filter out sensitive content based on who is making the call
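
Conceptually, a similarity search ranks stored vectors by closeness to the query vector, optionally restricted by a filter. The brute-force sketch below illustrates that idea in plain Python with made-up 3-dimensional vectors. It is not how Databricks Vector Search is implemented, and `top_k`, the sample rows, and the `allowed` predicate are hypothetical.

```python
import math
from typing import Callable, Dict, List, Optional, Tuple


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(
    query: List[float],
    rows: List[Dict],
    k: int = 2,
    allowed: Optional[Callable[[Dict], bool]] = None,
) -> List[Tuple[str, float]]:
    """Score every permitted row against the query and return the k best (url, score) pairs."""
    candidates = [r for r in rows if allowed is None or allowed(r)]
    scored = [(r["url"], cosine(query, r["embedding"])) for r in candidates]
    return sorted(scored, key=lambda s: s[1], reverse=True)[:k]


# Made-up rows; real embeddings in this demo have 1024 dimensions
rows = [
    {"url": "a.pdf", "embedding": [1.0, 0.0, 0.0], "team": "ml"},
    {"url": "b.pdf", "embedding": [0.9, 0.1, 0.0], "team": "ml"},
    {"url": "c.pdf", "embedding": [0.0, 1.0, 0.0], "team": "sales"},
]
query = [1.0, 0.0, 0.0]
print(top_k(query, rows))
print(top_k(query, rows, allowed=lambda r: r["team"] == "sales"))
```

The filter is applied before ranking, so a caller restricted to the `sales` rows never sees the closer `ml` documents, which is the security-layer idea described above.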

In [0]:
question = """Search for resumes of candidates with experience in building and maintaining Machine Learning solutions at scale, proficient in predictive modeling, Machine Learning, Deep Learning, Reinforcement Learning. I am looking for skills including pandas, numpy, sql, python, machine learning, data science"""
# question = """Search for resumes of candidates with experience in building and maintaining ML solutions at scale, proficient in predictive modeling (ML, DL/RL) and experimentation/causal inference methods. They should have experience working as a data scientist. Look for candidates with a proven track record of designing end-to-end ML systems capable of handling large-scale operations or working on Kaggle datasets as part of their projects or internships"""
# question = """{
#   "criteria": {
#     "skills": ["Machine Learning", "Deep Learning", "SQL", "Python", "Pandas", "Spark", "MLOps", "Data Science", "Statistics"],
#     "graduation_year": "2023",
#     "work_experience": "internship or job",
#     "additional_requirements": [
#       "data scientist",
#       "proven track record of designing end-to-end ML systems capable of handling large-scale operations",
#       "experience with Kaggle datasets as part of projects or internships"
#     ]
#   }"""

response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": [question]})
embeddings = [e['embedding'] for e in response.data]

results = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).similarity_search(
  query_vector=embeddings[0],
  columns=["url", "content"],
  num_results=5)
docs = results.get('result', {}).get('data_array', [])
pprint(docs)