In [0]:

%pip install transformers==4.30.2 "unstructured[pdf,docx]==0.10.30" langchain==0.1.5 llama-index==0.9.3 databricks-vectorsearch==0.22 pydantic==1.10.9 mlflow==2.10.1
dbutils.library.restartPython()

In [0]:
%run ./init $reset_all_data=false

USE CATALOG `onlytrends`
using catalog.database `onlytrends`.`onlytrends_db`


DataFrame[]

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS onlytrends.onlytrends_db.volume_reports;

In [0]:
# Path of the Unity Catalog volume that stores the source PDFs.
# `catalog` and `db` are not defined in the visible cells (presumably set by ./init — TODO confirm).
volume_folder =  f"/Volumes/{catalog}/{db}/volume_reports"
# `fileUrl` is also defined outside the visible cells (presumably by ./init — TODO confirm).
url = fileUrl
# Helper defined outside the visible cells; the recorded output shows it saving the PDF under the volume folder.
upload_pdf_to_volume(url, volume_folder)
# List the volume content to verify that the file landed where expected.
display(dbutils.fs.ls(volume_folder))

saving /Volumes/onlytrends/onlytrends_db/volume_reports/DI_Tech-trends-2024.pdf


path,name,size,modificationTime
dbfs:/Volumes/onlytrends/onlytrends_db/volume_reports/DI_Tech-trends-2024.pdf,DI_Tech-trends-2024.pdf,9043029,1715016188000


In [0]:
# Incrementally read the PDFs from the volume with the 'cloudFiles' streaming source.
# BINARYFILE keeps the raw bytes of each file; the glob filter restricts ingestion to *.pdf files.
df = (spark.readStream
        .format('cloudFiles')
        .option('cloudFiles.format', 'BINARYFILE')
        .option("pathGlobFilter", "*.pdf")
        .load('dbfs:'+volume_folder))

# Write the data as a Delta table
# availableNow processes the files currently available and then stops, so awaitTermination()
# blocks the cell until the `pdf_raw` table has been written. The checkpoint lives inside the volume.
(df.writeStream
  .trigger(availableNow=True)
  .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/raw_docs')
  .table('pdf_raw').awaitTermination())

In [0]:
%sql SELECT * FROM pdf_raw LIMIT 2

path,modificationTime,length,content
dbfs:/Volumes/onlytrends/onlytrends_db/volume_reports/DI_Tech-trends-2024.pdf,2024-05-06T17:23:08Z,9043029,JVBERi0xLjcNJeLjz9MNCjQ3OTkgMCBvYmoNPDwvTGluZWFyaXplZCAxL0wgOTA0MzAyOS9PIDQ4MDIvRSAxMTk1OTQyL04gNjQvVCA5MDQwNDY5L0ggWyA1MjkgMTQ4MV0+Pg1lbmRvYmoNICAgICAgIA00ODIxIDAgb2JqDTw= (truncated)


In [0]:
# Helper defined outside the visible cells; presumably installs the OCR system libraries needed
# to parse PDFs on the cluster nodes (the recorded output prints "OCR libraries installed") — TODO confirm.
install_ocr_on_nodes()

OCR libraries installed


In [0]:
from unstructured.partition.auto import partition
import re
import io

def extract_doc_text(x : bytes) -> str:
  """Extract the text of a binary document as one newline-separated string.

  The bytes are parsed with `unstructured`'s `partition`, each returned section is
  cleaned individually, and the cleaned sections are joined with a newline.

  Args:
    x: raw content of the document.

  Returns:
    The cleaned text of all sections, one section per line.
  """
  def clean_section(txt):
    # Drop the line breaks inside a section, then remove the optional space preceding a period.
    without_newlines = re.sub(r'\n', '', txt)
    return re.sub(r' ?\.', '.', without_newlines)

  # Read the file and extract its sections with unstructured
  sections = partition(file=io.BytesIO(x))

  # Sections are only an intermediate step: everything is concatenated because the
  # downstream splitter works on sentences rather than on document sections.
  cleaned_sections = []
  for section in sections:
    cleaned_sections.append(clean_section(section.text))
  return "\n".join(cleaned_sections)

In [0]:
from llama_index.langchain_helpers.text_splitter import SentenceSplitter
from llama_index import Document, set_global_tokenizer
from transformers import AutoTokenizer
from typing import Iterator
from pyspark.sql.functions import pandas_udf
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf, length, pandas_udf
import os
import mlflow
from typing import Iterator
from mlflow import MlflowClient


# Reduce the arrow batch size as our PDF can be big in memory:
# each batch handed to the pandas UDFs below holds at most 10 records.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Turn batches of raw document bytes into lists of text chunks.

    For every document the text is extracted with `extract_doc_text` and split on
    sentence boundaries into chunks of 500 tokens with a 50 token overlap.

    Args:
        batch_iter: iterator of pandas Series, each value being the binary content of a document.

    Yields:
        One Series per input batch where each value is the list of chunk texts of the document.
    """
    # Use the llama tokenizer to measure chunk sizes. The original note states this keeps
    # the chunks below the BGE embedding model limit — TODO confirm the actual limit.
    llama_tokenizer = AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer")
    set_global_tokenizer(llama_tokenizer)

    # Sentence splitter from llama_index to split on sentences
    splitter = SentenceSplitter(chunk_size=500, chunk_overlap=50)

    def split_document(raw_bytes):
      document = Document(text=extract_doc_text(raw_bytes))
      return [node.text for node in splitter.get_nodes_from_documents([document])]

    for x in batch_iter:
        yield x.apply(split_document)

In [0]:
from mlflow.deployments import get_deploy_client

# bge-large-en Foundation models are available using the /serving-endpoints/databricks-bge-large-en/invocations api.
# This client is reused by the similarity-search test cell further down.
deploy_client = get_deploy_client("databricks")

## NOTE: if you change your embedding model here, make sure you change it in the query step too
# Smoke test: embed a single sentence to check that the endpoint answers.
embeddings = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": ["What is Apache Spark?"]})
# `pprint` is not imported in the visible cells (presumably provided by ./init — TODO confirm).
pprint(embeddings)

In [0]:
%sql
--Note that we need to enable Change Data Feed on the table to create the index
CREATE TABLE IF NOT EXISTS pdf_reports (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING,
  embedding ARRAY <FLOAT>
) TBLPROPERTIES (delta.enableChangeDataFeed = true); 

In [0]:
@pandas_udf("array<float>")
def get_embedding(contents: pd.Series) -> pd.Series:
    """Compute one embedding per text using the `databricks-bge-large-en` serving endpoint.

    Args:
        contents: Series of texts to embed.

    Returns:
        A Series holding one embedding vector per input text, in input order.

    Note:
        Any exception raised by the endpoint propagates and fails the batch
        (add try/except if a best-effort behavior is needed).
    """
    import mlflow.deployments
    deploy_client = mlflow.deployments.get_deploy_client("databricks")

    # The embedding model takes at most 150 inputs per request, so the texts are sent slice by slice.
    max_batch_size = 150

    def embed_texts(texts):
        response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": texts})
        return [e['embedding'] for e in response.data]

    # Walk over the Series in slices of `max_batch_size` and collect the results in order
    all_embeddings = []
    for start in range(0, len(contents), max_batch_size):
        text_slice = contents.iloc[start:start + max_batch_size]
        all_embeddings.extend(embed_texts(text_slice.tolist()))

    return pd.Series(all_embeddings)

In [0]:
# Stream the raw PDFs, split each document into chunks (one output row per chunk thanks to explode),
# embed every chunk, and append the result to the `pdf_reports` table.
# availableNow processes what is currently available and stops; awaitTermination() blocks until then.
(spark.readStream.table('pdf_raw')
      .withColumn("content", F.explode(read_as_chunk("content")))
      .withColumn("embedding", get_embedding("content"))
      .selectExpr('path as url', 'content', 'embedding')
  .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/pdf_chunks')
    .table('pdf_reports').awaitTermination())

In [0]:
%sql
SELECT * FROM pdf_reports limit 10

In [0]:
from databricks.vector_search.client import VectorSearchClient
# Client authenticated with the notebook token (see the notice in the recorded output).
vsc = VectorSearchClient()

# Create the endpoint only when it is missing, so the cell can be re-run safely.
# `endpoint_exists`, `wait_for_vs_endpoint_to_be_ready` and VECTOR_SEARCH_ENDPOINT_NAME are defined
# outside the visible cells (presumably by ./init — TODO confirm).
if not endpoint_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME):
    vsc.create_endpoint(name=VECTOR_SEARCH_ENDPOINT_NAME, endpoint_type="STANDARD")

wait_for_vs_endpoint_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME)
print(f"Endpoint named {VECTOR_SEARCH_ENDPOINT_NAME} is ready.")

[NOTICE] Using a notebook authentication token. Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
Endpoint named onlytrends_vs_endpoint is ready.


In [0]:
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.catalog as c

#The table we'd like to index
source_table_fullname = f"{catalog}.{db}.pdf_reports"
# Where we want to store our index
vs_index_fullname = f"{catalog}.{db}.pdf_reports_self_managed_vs_index"

# `index_exists` and `wait_for_index_to_be_ready` are helpers defined outside the visible cells.
# The recorded output shows a 404 being logged when the index does not exist yet.
if not index_exists(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname):
  print(f"Creating index {vs_index_fullname} on endpoint {VECTOR_SEARCH_ENDPOINT_NAME}...")
  # Self-managed embeddings: the index reads the vectors already stored in the `embedding` column.
  vsc.create_delta_sync_index(
    endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME,
    index_name=vs_index_fullname,
    source_table_name=source_table_fullname,
    pipeline_type="TRIGGERED", #Sync needs to be manually triggered
    primary_key="id",
    embedding_dimension=1024, #Match your model embedding size (bge)
    embedding_vector_column="embedding"
  )
  #Let's wait for the index to be ready and all our embeddings to be created and indexed
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
else:
  #Trigger a sync to update our vs content with the new data saved in the table
  # The index must be ready before a sync can be requested; this branch does not wait for the sync to finish.
  wait_for_index_to_be_ready(vsc, VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname)
  vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).sync()

Error processing request 404 Client Error: Not Found for url: https://oregon.cloud.databricks.com/api/2.0/vector-search/endpoints/onlytrends_vs_endpoint/indexes/onlytrends.onlytrends_db.pdf_reports_self_managed_vs_index
Creating index onlytrends.onlytrends_db.pdf_reports_self_managed_vs_index on endpoint onlytrends_vs_endpoint...
Waiting for index to be ready, this can take a few min... {'detailed_state': 'PROVISIONING_INDEX', 'message': 'Delta sync Index creation is pending. Check latest status: https://dbc-a2aec960-4659.cloud.databricks.com/explore/data/onlytrends/onlytrends_db/pdf_reports_self_managed_vs_index', 'indexed_row_count': 0, 'ready': False, 'index_url': 'dbc-a2aec960-4659.cloud.databricks.com/api/2.0/vector-search/endpoints/onlytrends_vs_endpoint/indexes/onlytrends.onlytrends_db.pdf_reports_self_managed_vs_index'} - pipeline url:dbc-a2aec960-4659.cloud.databricks.com/api/2.0/vector-search/endpoints/onlytrends_vs_endpoint/indexes/onlytrends.onlytrends_db.pdf_reports_self_m

In [0]:
# Manual retrieval test: embed a question and look up the closest chunk in the index.
question = "What are some upcoming trends?"

# The index stores self-managed embeddings, so the question is embedded with the same endpoint as the documents.
response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": [question]})
embeddings = [e['embedding'] for e in response.data]

# Return only the best match, restricted to rows whose url matches the report name.
results = vsc.get_index(VECTOR_SEARCH_ENDPOINT_NAME, vs_index_fullname).similarity_search(
  query_vector=embeddings[0],
  columns=["url", "content"],
  filters= {"url LIKE": "DI_Tech-trends-2024"},
  num_results=1)
# Fall back to an empty list when the response carries no 'result' / 'data_array' entry.
docs = results.get('result', {}).get('data_array', [])
pprint(docs)

[['dbfs:/Volumes/onlytrends/onlytrends_db/volume_reports/DI_Tech-trends-2024.pdf',
  'Tech Trends 2024\n'
  'Deloitte’s 15th annual Tech Trends report helps business and technology leaders separate signal '
  'from noise and embrace technology’s evolution as a tool to revolutionize business.\n'
  'Trend Lines The future is already here, albeit unevenly distributed\n'
  'Our technology case studies form a collage of how pioneering leaders and organizations are '
  'building distinct facets of the future, today, through emerging technology innovation.\n'
  'deloitte.com/us/trendlines\n'
  'Trending the trends: Last decade of research\n'
  'INTERACTION\n'
  'INFORMATION\n'
  'COMPUTATION\n'
  'BUSINESS OF TECHNOLOGY\n'
  'CYBER AND TRUST\n'
  'Interfaces in new places\n'
  'Genie out of the bottle\n'
  'Smarter, not harder\n'
  '2024\n'
  'From DevOps to DevEx\n'
  'Defending reality\n'
  'Core workout\n'
  'Through the glass\n'
  'Opening up to AI\n'
  'Above the clouds\n'
  '2023\n'
  '

In [0]:
%pip install mlflow==2.10.1 lxml==4.9.3 langchain==0.1.5 databricks-vectorsearch==0.22 cloudpickle==2.2.1 databricks-sdk==0.18.0 cloudpickle==2.2.1 pydantic==2.5.2
%pip install pip mlflow[databricks]==2.10.1

In [0]:
from langchain.prompts import PromptTemplate
from langchain_community.chat_models import ChatDatabricks
from langchain.schema.output_parser import StrOutputParser

# Minimal chain without retrieval: prompt -> chat model -> plain string.
prompt = PromptTemplate(
  input_variables = ["question"],
  template = "You are an assistant. Give a short answer to this question: {question}"
)
# Chat model served by the `databricks-dbrx-instruct` endpoint, capped at 500 generated tokens.
chat_model = ChatDatabricks(endpoint="databricks-dbrx-instruct", max_tokens = 500)

# StrOutputParser turns the chat model message into its text content.
chain = (
  prompt
  | chat_model
  | StrOutputParser()
)
print(chain.invoke({"question": "What are the top 5 technological trends?"}))

1. Artificial Intelligence (AI) and Machine Learning (ML)
2. Internet of Things (IoT) and Edge Computing
3. Cybersecurity and Privacy
4. Augmented Reality (AR) and Virtual Reality (VR)
5. 5G and Advanced Connectivity


In [0]:
# Prompt that injects the previous conversation ({chat_history}) next to the new {question}.
# NOTE(review): `prompt_with_history` is not referenced by any later cell visible in this notebook.
prompt_with_history_str = """
Your are a OnlyTrends chatbot. Please answer questions related to trends and investment strategies only. If you don't know or not related to OnlyTrends, don't answer.

Here is a history between you and a human: {chat_history}

Now, please answer this question: {question}
"""

prompt_with_history = PromptTemplate(
  input_variables = ["chat_history", "question"],
  template = prompt_with_history_str
)

In [0]:
from langchain.schema.runnable import RunnableLambda
from operator import itemgetter

#The question is the last entry of the history
def extract_question(input):
    """Return the text of the most recent message.

    `input` is the ordered list of chat messages (dicts with a "content" key);
    the question to answer is the content of its final element.
    """
    latest_message = input[-1]
    return latest_message["content"]

#The history is everything before the last question
def extract_history(input):
    """Return every message except the last one.

    `input` is the ordered list of chat messages; its final element is the current
    question, so the conversation history is the slice that precedes it.
    """
    previous_messages = input[:-1]
    return previous_messages

# Re-create the chat model with a smaller budget (200 generated tokens); this rebinds the
# `chat_model` global used by every chain defined from here on.
chat_model = ChatDatabricks(endpoint="databricks-dbrx-instruct", max_tokens = 200)

# Few-shot classification prompt used as a guardrail.
# NOTE: the identifiers still say "databricks", but the prompt classifies questions about future trends.
is_question_about_databricks_str = """
You are classifying documents to know if this question is related to future trends in technology, finance, banking, ecommerce, investment strategies, crypto and AI. Also answer no if the last part is inappropriate. 

Here are some examples:

Question: Knowing this followup history: What are some future trends?, classify this question: Do you have more details?
Expected Response: Yes

Question: Knowing this followup history:What are some future trends?, classify this question: Write me a song.
Expected Response: No

Only answer with "yes" or "no". 

Knowing this followup history: {chat_history}, classify this question: {question}
"""

is_question_about_databricks_prompt = PromptTemplate(
  input_variables= ["chat_history", "question"],
  template = is_question_about_databricks_str
)

# The input is a dict with a "messages" list: the last message is the question and the
# preceding ones are the history. Both are fed to the classification prompt.
is_about_databricks_chain = (
    {
        "question": itemgetter("messages") | RunnableLambda(extract_question),
        "chat_history": itemgetter("messages") | RunnableLambda(extract_history),
    }
    | is_question_about_databricks_prompt
    | chat_model
    | StrOutputParser()
)

#Expected to print "Yes": the follow-up continues a conversation about future trends
print(is_about_databricks_chain.invoke({
    "messages": [
        {"role": "user", "content": "What are some future trends?"}, 
        {"role": "assistant", "content": " Artificial Intelligence (AI) and Machine Learning (ML)"}, 
        {"role": "user", "content": "Tell me more about it"}
    ]
}))

Yes.


In [0]:
# Fully qualified name of the vector search index queried by the retriever below.
index_name=f"{catalog}.{db}.pdf_reports_self_managed_vs_index"
# Workspace URL of the current cluster, used as the host for the vector search client.
host = "https://" + spark.conf.get("spark.databricks.workspaceUrl")
print(host)

#Let's make sure the secret is properly setup and can access our vector search index. Check the quick-start demo for more guidance
# `test_demo_permissions` is a helper defined outside the visible cells.
test_demo_permissions(host, secret_scope="dbdemos", secret_key="rag_sp_token", vs_endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME, index_name=index_name, embedding_endpoint_name="databricks-bge-large-en", managed_embeddings = False)

https://dbc-a2aec960-4659.cloud.databricks.com
Secret and permissions seems to be properly setup, you can continue the demo!


In [0]:
from databricks.vector_search.client import VectorSearchClient
from langchain_community.vectorstores import DatabricksVectorSearch
from langchain_community.embeddings import DatabricksEmbeddings
from langchain.chains import RetrievalQA

# Read the token from the secret scope instead of hardcoding it; get_retriever() below reads it back from the environment.
os.environ['DATABRICKS_TOKEN'] = dbutils.secrets.get("dbdemos", "rag_sp_token")

# Same embedding endpoint as the one used to embed the documents, so queries and chunks share one vector space.
embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-en")

def get_retriever(persist_dir: str = None):
    """Build a LangChain retriever on top of the vector search index.

    Relies on the notebook globals `host`, `index_name`, `embedding_model` and
    `VECTOR_SEARCH_ENDPOINT_NAME`, and on the DATABRICKS_TOKEN environment variable.
    As a side effect, DATABRICKS_HOST is set in the environment.

    Args:
        persist_dir: not used by the function body.

    Returns:
        A retriever returning the 4 closest chunks among the rows whose url matches
        the "DI_Tech-trends-2024" filter.
    """
    os.environ["DATABRICKS_HOST"] = host

    # Get the vector search index
    client = VectorSearchClient(workspace_url=host, personal_access_token=os.environ["DATABRICKS_TOKEN"])
    index = client.get_index(endpoint_name=VECTOR_SEARCH_ENDPOINT_NAME, index_name=index_name)

    # Create the retriever: `content` holds the chunk text, `url` is returned as metadata
    store = DatabricksVectorSearch(
        index,
        text_column="content",
        embedding=embedding_model,
        columns=["url"],
    )
    search_options = {'k': 4, "filters": {"url LIKE": "DI_Tech-trends-2024"}}
    return store.as_retriever(search_kwargs=search_options)

# Shared retriever instance reused by the chains defined in the following cells.
retriever = get_retriever()

# Retrieval-only chain: take the last message of the conversation and send it to the retriever.
retrieve_document_chain = (
    itemgetter("messages") 
    | RunnableLambda(extract_question)
    | retriever
)
# Quick check that documents come back for a single-question conversation.
print(retrieve_document_chain.invoke({"messages": [{"role": "user", "content": "What are some top trends of 2024?"}]}))

[NOTICE] Using a Personal Authentication Token (PAT). Recommended for development only. For improved performance, please use Service Principal based authentication. To disable this message, pass disable_notice=True to VectorSearchClient().
[Document(page_content='Tech Trends 2024\nDeloitte’s 15th annual Tech Trends report helps business and technology leaders separate signal from noise and embrace technology’s evolution as a tool to revolutionize business.\nTrend Lines The future is already here, albeit unevenly distributed\nOur technology case studies form a collage of how pioneering leaders and organizations are building distinct facets of the future, today, through emerging technology innovation.\ndeloitte.com/us/trendlines\nTrending the trends: Last decade of research\nINTERACTION\nINFORMATION\nCOMPUTATION\nBUSINESS OF TECHNOLOGY\nCYBER AND TRUST\nInterfaces in new places\nGenie out of the bottle\nSmarter, not harder\n2024\nFrom DevOps to DevEx\nDefending reality\nCore workout\nThr

In [0]:
from langchain.schema.runnable import RunnableBranch

# Prompt asking the model to rewrite the conversation into a standalone search query.
generate_query_to_retrieve_context_template = """
Based on the chat history below, we want you to generate a query for an external data source to retrieve relevant documents so that we can better answer the question. The query should be in natual language. The external data source uses similarity search to search for relevant documents in a vector space. So the query should be similar to the relevant documents semantically. Answer with only the query. Do not add explanation.

Chat history: {chat_history}

Question: {question}
"""

generate_query_to_retrieve_context_prompt = PromptTemplate(
  input_variables= ["chat_history", "question"],
  template = generate_query_to_retrieve_context_template
)

# Split the messages into question / history, then pick a branch:
#  - non-empty history: ask the chat model to produce the search query
#  - empty history: use the question as-is (the trailing default does the same thing)
generate_query_to_retrieve_context_chain = (
    {
        "question": itemgetter("messages") | RunnableLambda(extract_question),
        "chat_history": itemgetter("messages") | RunnableLambda(extract_history),
    }
    | RunnableBranch(  #Augment query only when there is a chat history
      (lambda x: x["chat_history"], generate_query_to_retrieve_context_prompt | chat_model | StrOutputParser()),
      (lambda x: not x["chat_history"], RunnableLambda(lambda x: x["question"])),
      RunnableLambda(lambda x: x["question"])
    )
)

#Let's try it
# Single message: no history, so the question is expected to be returned unchanged.
output = generate_query_to_retrieve_context_chain.invoke({
    "messages": [
        {"role": "user", "content": "What are some top trends of 2024"}
    ]
})
print(f"Test retriever query without history: {output}")

# Three messages: the first two form the history, so the model rewrites the query.
output = generate_query_to_retrieve_context_chain.invoke({
    "messages": [
        {"role": "user", "content": "What are some top trends of 2024"}, 
        {"role": "assistant", "content": "Artificial Intelligence (AI) and Machine Learning (ML)"}, 
        {"role": "user", "content": "Tell me more about artificial intelligence"}
    ]
})
print(f"Test retriever question, summarized with history: {output}")

Test retriever query without history: What are some top trends of 2024
Test retriever question, summarized with history: 2024 Trends in Artificial Intelligence: Advances, Applications, and Implications


In [0]:
from langchain.schema.runnable import RunnableBranch, RunnableParallel, RunnablePassthrough

# Final answer prompt: combines the conversation ({chat_history}), the retrieved passages ({context})
# and the current {question}.
# NOTE(review): the second sentence repeats the wording of the classification prompt, and the text calls
# the assistant "system" while the test dialogs use the role "assistant" — confirm this is intended.
question_with_history_and_context_str = """
You are a trustful assistant for OnlyTrends users. You are classifying documents to know if this question is related to future trends in technology, finance, banking, ecommerce, investment strategies, crypto and AI. If you do not know the answer to a question, you truthfully say you do not know. Read the discussion to get the context of the previous conversation. In the chat discussion, you are referred to as "system". The user is referred to as "user".

Discussion: {chat_history}

Here's some context which might or might not help you answer: {context}

Answer straight, do not repeat the question, do not start with something like: the answer to the question, do not add "AI" in front of your answer, do not say: here is the answer, do not mention the context or the question.

Based on this history and context, answer this question: {question}
"""

question_with_history_and_context_prompt = PromptTemplate(
  input_variables= ["chat_history", "context", "question"],
  template = question_with_history_and_context_str
)

def format_context(docs):
    """Concatenate the text of the retrieved documents.

    Each document contributes its `page_content`; passages are separated by a blank line.
    An empty list of documents yields an empty string.
    """
    passages = []
    for doc in docs:
        passages.append(doc.page_content)
    return "\n\n".join(passages)

def extract_source_urls(docs):
    """Collect the source url of every retrieved document.

    The url is read from the "url" entry of each document's `metadata`;
    order and duplicates are preserved.
    """
    urls = []
    for doc in docs:
        urls.append(doc.metadata["url"])
    return urls

# Chain used when the question is on topic. Each dict is one stage; keys are forwarded explicitly
# because a stage only sees the output of the previous one.
relevant_question_chain = (
  RunnablePassthrough() |
  # Stage 1: have the chat model write a search query and fetch the matching documents
  {
    "relevant_docs": generate_query_to_retrieve_context_prompt | chat_model | StrOutputParser() | retriever,
    "chat_history": itemgetter("chat_history"), 
    "question": itemgetter("question")
  }
  |
  # Stage 2: turn the documents into a context string and a list of source urls
  {
    "context": itemgetter("relevant_docs") | RunnableLambda(format_context),
    "sources": itemgetter("relevant_docs") | RunnableLambda(extract_source_urls),
    "chat_history": itemgetter("chat_history"), 
    "question": itemgetter("question")
  }
  |
  # Stage 3: render the final prompt, keeping the sources alongside
  {
    "prompt": question_with_history_and_context_prompt,
    "sources": itemgetter("sources")
  }
  |
  # Stage 4: generate the answer; the output is a dict with "result" and "sources"
  {
    "result": itemgetter("prompt") | chat_model | StrOutputParser(),
    "sources": itemgetter("sources")
  }
)

# Canned refusal with the same output shape ("result" / "sources") as the relevant chain.
irrelevant_question_chain = (
  RunnableLambda(lambda x: {"result": 'I cannot answer questions that are not about OnlyTrends.', "sources": []})
)

# Route on the classifier output. The checks are substring matches evaluated in order, so an answer
# containing "yes" anywhere is treated as relevant; anything unrecognized falls back to the refusal.
branch_node = RunnableBranch(
  (lambda x: "yes" in x["question_is_relevant"].lower(), relevant_question_chain),
  (lambda x: "no" in x["question_is_relevant"].lower(), irrelevant_question_chain),
  irrelevant_question_chain
)

# Entry point: expects {"messages": [...]}; classifies the question, splits question / history, then branches.
full_chain = (
  {
    "question_is_relevant": is_about_databricks_chain,
    "question": itemgetter("messages") | RunnableLambda(extract_question),
    "chat_history": itemgetter("messages") | RunnableLambda(extract_history),    
  }
  | branch_node
)

In [0]:
import json
# Conversation whose last question is off topic: the guardrail is expected to route it to the refusal branch.
non_relevant_dialog = {
    "messages": [
        {"role": "user", "content": "What are some future top trends"}, 
        {"role": "assistant", "content": "AI and machine learning"}, 
        {"role": "user", "content": "Why is the sky blue?"}
    ]
}
print(f'Testing with a non relevant question...')
response = full_chain.invoke(non_relevant_dialog)
# `display_chat` is a helper defined outside the visible cells.
display_chat(non_relevant_dialog["messages"], response)

Testing with a non relevant question...


In [0]:
# On-topic conversation with a single question (the follow-up turns are commented out, so the history is empty).
dialog = {
    "messages": [
        {"role": "user", "content": "What are some top trends of 2024? Give it in a format of title and description"}, 
        # {"role": "assistant", "content": "Artificial intelligence and machine learning"}, 
        # {"role": "user", "content": "Should I invest in NVIDIA then?"}
    ]
}
print(f'Testing with relevant history and question...')
# `response` holds the chain output; the next cell serializes it and sends it to the API.
response = full_chain.invoke(dialog)
display_chat(dialog["messages"], response)

Testing with relevant history and question...


In [0]:
import requests
import json
# Serialize the chain output (the `response` dict returned by full_chain.invoke in the previous cell).
payload = json.dumps(response)
headers = {
  'Content-Type': 'application/json'
}

# Store the HTTP result under its own name. Rebinding `response` here would make the cell fail on
# re-run, because json.dumps cannot serialize a requests.Response object.
# `API_URL` is defined outside the visible cells. The timeout keeps the notebook from hanging
# indefinitely when the API does not answer.
api_response = requests.request("PUT", API_URL, headers=headers, data=payload, timeout=30)

print(api_response.text)