In [0]:
%pip install transformers==4.30.2 "unstructured[pdf,docx]==0.10.30" llama-index==0.9.40 databricks-vectorsearch==0.20 pydantic==1.10.9 mlflow==2.9.0 protobuf==3.20.0 openai==1.10.0 langchain-openai langchain torch torchvision torchaudio FlagEmbedding
dbutils.library.restartPython()

In [0]:
from unstructured.partition.auto import partition
import re
import io

def clean_section(txt: str) -> str:
  """Normalize the text of one element returned by `unstructured`.

  Line breaks (and any whitespace around them) are replaced by a single space so
  words on adjacent lines are not glued together; a space before a period is dropped.
  """
  txt = re.sub(r'\s*\n\s*', ' ', txt)
  return re.sub(r' ?\.', '.', txt)


def extract_doc_text(x : bytes) -> str:
  """Extract the plain text of a document from its raw bytes.

  `partition` (unstructured) detects the file type and returns the document's
  elements. Each element's text is cleaned with `clean_section`, and the results are
  joined with newlines, one line per element.
  """
  # Read files and extract the values with unstructured
  sections = partition(file=io.BytesIO(x))
  # Default split is by section of document; concatenate them all together because we
  # want to split by sentence instead (done later by the sentence splitter).
  return "\n".join(clean_section(s.text) for s in sections)

In [0]:
from llama_index.langchain_helpers.text_splitter import SentenceSplitter
from llama_index.node_parser import SemanticSplitterNodeParser
from llama_index import Document, set_global_tokenizer
from transformers import AutoTokenizer
from pyspark.sql.functions import pandas_udf
from typing import Iterator
import pandas as pd
import os
import logging
from pyspark.sql import functions as F
import mypy_extensions
from openai import AzureOpenAI
from langchain_openai import AzureOpenAIEmbeddings

os.environ["AZURE_OPENAI_API_KEY"] = dbutils.secrets.get(scope='dev_demo', key='azure_openai_api_key')
os.environ["AZURE_OPENAI_ENDPOINT"] = "https://nous-ue2-openai-sbx-openai.openai.azure.com/"

embeddings = AzureOpenAIEmbeddings(
    azure_deployment="nous-ue2-openai-sbx-base-deploy-text-embedding-ada-002",
    openai_api_version="2023-05-15",
)

client = AzureOpenAI(
    api_key = dbutils.secrets.get(scope='dev_demo', key='azure_openai_api_key'),
    api_version = "2023-05-15",
    azure_endpoint = "https://nous-ue2-openai-sbx-openai.openai.azure.com/",
    )

# Reduce the arrow batch size as our PDF can be big in memory
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10)

os.environ["HF_HOME"] = '/tmp'

@pandas_udf("array<string>")
def read_as_chunk(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    #set embedding model
    # embed_model = "nous-ue2-openai-sbx-base-deploy-text-embedding-ada-002"
    #set llama2 as tokenizer to match our model size (will stay below BGE 1024 limit)
    set_global_tokenizer(
      AutoTokenizer.from_pretrained("hf-internal-testing/llama-tokenizer", cache_dir = '/tmp')
    )
    # splitter = SemanticSplitterNodeParser(
    # buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embeddings
    # )
    #Sentence splitter from llama_index to split on sentences
    base_splitter = SentenceSplitter(chunk_size=500, chunk_overlap=25)
    def extract_and_split(b):
      txt = extract_doc_text(b)
      nodes = base_splitter.get_nodes_from_documents([Document(text=txt)])
      logging.info(f"from chunk function: {txt}")
      
      return [n.text for n in nodes]

    for x in batch_iter:
        yield x.apply(extract_and_split)

In [0]:
%sql
-- Change Data Feed must be enabled on the source table to create a Delta Sync vector index.
-- `state` is declared so this DDL matches what the chunking cell writes (it selects
-- id, url, content, embedding, state). That cell overwrites with overwriteSchema=true,
-- so its written schema is what ends up stored.
CREATE TABLE IF NOT EXISTS demo.hackathon.databricks_pdf_documentation_openai (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY,
  url STRING,
  content STRING,
  embedding ARRAY <FLOAT>,
  state STRING
) TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
def open_ai_embeddings(contents, embed_model="nous-ue2-openai-sbx-base-deploy-text-embedding-ada-002", embed_client=None):
    """Return the embedding vector for `contents` from an Azure OpenAI deployment.

    Args:
        contents: text to embed (e.g. a user query). If a list is passed, only the
            first item's embedding is returned.
        embed_model: Azure OpenAI deployment name passed as `model`.
        embed_client: object exposing `.embeddings.create(input=..., model=...)`;
            defaults to the notebook-level `client`.

    Returns:
        The embedding of the first (normally the only) input.
    """
    active_client = client if embed_client is None else embed_client
    response = active_client.embeddings.create(
        input = contents,
        model = embed_model
    )
    return response.data[0].embedding

In [0]:
from pyspark.sql import functions as F
import mypy_extensions
import pandas as pd
import os

# Azure OpenAI settings captured on the driver and shipped to executors in the UDF
# closure: os.environ changes on the driver do not reach executors, and the
# driver-side `client` object is not built to be serialized.
EMBED_MODEL = "nous-ue2-openai-sbx-base-deploy-text-embedding-ada-002"
AZURE_OPENAI_API_VERSION = "2023-05-15"
# Inputs per embeddings request; kept small because Azure caps inputs per request
# on some deployments/API versions.
EMBED_BATCH_SIZE = 16
_azure_endpoint = os.environ["AZURE_OPENAI_ENDPOINT"]
_azure_api_key = dbutils.secrets.get(scope='dev_demo', key='azure_openai_api_key')


@pandas_udf("array<float>")
def embed_chunks(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Embed each text chunk of the column with the Azure OpenAI embedding deployment.

    One client is created per UDF invocation, and chunks are sent EMBED_BATCH_SIZE
    at a time. Null or blank chunks get a null embedding instead of being sent,
    because empty strings are not valid embedding input.
    """
    task_client = AzureOpenAI(
        api_key=_azure_api_key,
        api_version=AZURE_OPENAI_API_VERSION,
        azure_endpoint=_azure_endpoint,
    )
    for chunks in batch_iter:
        vectors = [None] * len(chunks)
        pending = [
            (pos, text) for pos, text in enumerate(chunks)
            if isinstance(text, str) and text.strip()
        ]
        for start in range(0, len(pending), EMBED_BATCH_SIZE):
            group = pending[start:start + EMBED_BATCH_SIZE]
            response = task_client.embeddings.create(
                input=[text for _, text in group],
                model=EMBED_MODEL,
            )
            # Map results back through each item's `index` instead of relying on order.
            for item in response.data:
                vectors[group[item.index][0]] = item.embedding
        yield pd.Series(vectors, index=chunks.index)


# Still defined because the follow-up "workaround" cell references it.
volume_folder = "/Volumes/demo/hackathon/privacy_act_docs/*"

# One row per chunk: explode each document's chunk list, embed each row's own text
# (previously F.lit(open_ai_embeddings("content")) embedded the literal word
# "content" once and gave every row that same vector), and derive `state` from the path.
# NOTE(review): F.split(path, "/")[5] assumes paths like
# dbfs:/Volumes/demo/hackathon/privacy_act_docs/<State>/<file> — confirm.
# monotonically_increasing_id() is unique but not stable across runs; acceptable
# here because the whole table is overwritten on every run.
chunks_with_embeddings = (spark.table('demo.hackathon.pdf_raw')
        .withColumn("content", F.explode(read_as_chunk("content")))
        .withColumn("embedding", embed_chunks("content"))
        .withColumn("id", F.monotonically_increasing_id())
        .withColumn("state", F.split(F.col("path"), "/")[5])
        .selectExpr('id', 'path as url', 'content', 'embedding', 'state')
        )

# Batch write: a streaming checkpointLocation has no role here, so it is not set.
(chunks_with_embeddings.write
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable('demo.hackathon.databricks_pdf_documentation_openai'))

In [0]:
# workaround for not having ML cluster
temp = (spark.table('demo.hackathon.databricks_pdf_documentation_openai')
        .withColumn("state", F.split(F.col("url"), "/")[5])
        .selectExpr('id', 'url', 'content', 'embedding', 'state')
        )

(temp.write
    .option("checkpointLocation", f'dbfs:{volume_folder}/checkpoints/pdf_chunk_openai')
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .saveAsTable('demo.hackathon.databricks_pdf_documentation_openai'))

In [0]:
from databricks.vector_search.client import VectorSearchClient
# Vector Search endpoint and the Unity Catalog index it serves.
endpoint_name = "openai_vector_search_v3"
vs_index_fullname = "demo.hackathon.openai_self_managed_index_v3"
# Client for the Databricks Vector Search API.
vsc = VectorSearchClient()

In [0]:
import time

# Create the endpoint only if it does not exist yet, so this cell can be re-run.
existing_endpoints = {e.get("name") for e in vsc.list_endpoints().get("endpoints", [])}
if endpoint_name not in existing_endpoints:
    vsc.create_endpoint(name=endpoint_name, endpoint_type="STANDARD")

# Endpoint provisioning is asynchronous; wait (up to ~30 min) for ONLINE before
# attaching an index to it.
for _ in range(180):
    endpoint_info = vsc.get_endpoint(endpoint_name)
    endpoint_state = (endpoint_info.get("endpoint_status", endpoint_info.get("status")) or {}).get("state", "").upper()
    if "ONLINE" in endpoint_state:
        break
    if endpoint_state and "PROVISIONING" not in endpoint_state:
        raise RuntimeError(f"Vector Search endpoint {endpoint_name} is in state {endpoint_state}: {endpoint_info}")
    time.sleep(10)
else:
    raise TimeoutError(f"Vector Search endpoint {endpoint_name} is not ONLINE yet: {endpoint_info}")

# Create the index only if it is not already attached to the endpoint.
existing_indexes = {i.get("name") for i in vsc.list_indexes(endpoint_name).get("vector_indexes", [])}
if vs_index_fullname not in existing_indexes:
    vsc.create_delta_sync_index(
        endpoint_name=endpoint_name,
        index_name=vs_index_fullname,
        source_table_name="demo.hackathon.databricks_pdf_documentation_openai",
        pipeline_type="TRIGGERED", #Sync needs to be manually triggered
        primary_key="id",
        embedding_dimension=1536, #Output size of the text-embedding-ada-002 deployment used for the chunks
        embedding_vector_column="embedding"
    )

In [0]:
# Resync our index with new data
vs_index = vsc.get_index(endpoint_name, vs_index_fullname)
vs_index.sync()  # the index uses a TRIGGERED pipeline, so it refreshes only on request

In [0]:
from mlflow.deployments import get_deploy_client
from pprint import pprint
# MLflow deployments client for Databricks serving endpoints (not used by the search below).
deploy_client = get_deploy_client("databricks")
query = "When does the Colorado Privacy Act take effect?"

# Embed the question with open_ai_embeddings and fetch the 10 most similar chunks.
vs_index = vsc.get_index(endpoint_name, vs_index_fullname)
query_embedding = open_ai_embeddings(query)
results = vs_index.similarity_search(
    query_vector=query_embedding,
    columns=["state", "url", "content"],
    num_results=10,
)
# Rows list the requested columns in order (state, url, content); presumably a
# similarity score is appended after them — confirm.
docs = results.get('result', {}).get('data_array', [])
pprint(docs)

In [0]:
# The hard-coded state filter below works, but the RAG results are still incorrect.

# from mlflow.deployments import get_deploy_client
# from pprint import pprint
# # bge-large-en Foundation models are available using the /serving-endpoints/databricks-bge-large-en/invocations api. 
# deploy_client = get_deploy_client("databricks")
# query = f"When does the Colorado Privacy Act take effect?"
# #content
# results = vsc.get_index(endpoint_name, vs_index_fullname).similarity_search(
#   query_vector=open_ai_embeddings(query),
#   columns=["state", "url", "content"],
#   filters={"state": "Colorado"},
#   num_results=10)
# docs = results.get('result', {}).get('data_array', [])
# pprint(docs)


In [0]:
# Rerank the retrieved chunks with the BAAI/bge-reranker-large cross-encoder.
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from FlagEmbedding import FlagReranker
import torch

# FlagReranker loads its own tokenizer and model, so they are not loaded a second
# time via AutoTokenizer / AutoModelForSequenceClassification.
# fp16 speeds up GPU inference with a slight quality loss; enable it only when a GPU
# is available, since half precision on CPU is slow or unsupported for some ops.
reranker = FlagReranker('BAAI/bge-reranker-large', use_fp16=torch.cuda.is_available())

# Rows of `docs` follow the columns requested in the search: ["state", "url", "content"].
# The chunk text is position 2; position 1 is the URL, which is not what should be scored.
CONTENT_POS = 2
query_and_docs = [[query, d[CONTENT_POS]] for d in docs]

if query_and_docs:
    scores = reranker.compute_score(query_and_docs)
    # compute_score returns a bare float (not a list) when given a single pair.
    if isinstance(scores, (int, float)):
        scores = [scores]
else:
    scores = []

reranked_docs = sorted(zip(docs, scores), key=lambda x: x[1], reverse=True)

pprint(reranked_docs)