https://ai-cookbook.io/10-min-demo/mosaic-ai-agents-demo-dbx-notebook.html

In [0]:
%pip install -U -qqqq databricks-agents mlflow mlflow-skinny databricks-vectorsearch databricks-sdk langchain==0.2.1 langchain_core==0.2.5 langchain_community==0.2.4 



In [0]:
# Restart the Python process after the %pip install cell above.
# NOTE(review): presumably needed so the freshly installed/upgraded packages are
# the ones that get imported by the cells below - confirm against Databricks docs.
dbutils.library.restartPython()

In [0]:
import os
from databricks.sdk.core import DatabricksError
from databricks.sdk import WorkspaceClient

# Folder the notebook process is running in; the sample repo is cloned beneath it.
CURRENT_FOLDER = os.getcwd()
# Git repository that holds the quick start sample code and data.
QUICK_START_REPO_URL = "https://github.com/databricks/genai-cookbook.git"
# Sub-folder (relative to CURRENT_FOLDER) that receives the clone.
QUICK_START_REPO_SAVE_FOLDER = "genai-cookbook"

# Fail fast instead of cloning on top of an existing folder.
if os.path.isdir(QUICK_START_REPO_SAVE_FOLDER):
    raise Exception(
        f"{QUICK_START_REPO_SAVE_FOLDER} folder already exists, please change the variable QUICK_START_REPO_SAVE_FOLDER to be a non-existant path."
    )

# Clone the repo
w = WorkspaceClient()
try:
    w.repos.create(
        url=QUICK_START_REPO_URL,
        provider="github",
        path=f"{CURRENT_FOLDER}/{QUICK_START_REPO_SAVE_FOLDER}",
    )
    print(f"Cloned sample code repo to: {QUICK_START_REPO_SAVE_FOLDER}")
except DatabricksError as e:
    if e.error_code == "RESOURCE_ALREADY_EXISTS":
        # The workspace already has a repo at this path; nothing to do.
        print("Repo already exists. Skipping creation")
    else:
        # Chain the original SDK error so the real cause stays in the traceback.
        raise Exception(
            f"Failed to clone the quick start code.  You can manually import this by creating a Git folder from the contents of {QUICK_START_REPO_URL} in the {QUICK_START_REPO_SAVE_FOLDER} folder in your workspace and then re-running this Notebook."
        ) from e

In [0]:
# Use the current user name to create any necessary resources
w = WorkspaceClient()
# Local part of the login name with dots removed, e.g. "first.last@example.com" -> "firstlast"
user_name = w.current_user.me().user_name.split("@")[0].replace(".", "")

# UC Catalog & Schema where output tables/indexes are saved
# If this catalog/schema does not exist, you need permission to create it.
# These are fixed locations; for per-user isolation use
# f"{user_name}_catalog" and f"rag_{user_name}" instead.
UC_CATALOG = "rag_m1_release"
UC_SCHEMA = "10_mins"

# UC Model name where the POC chain is logged
UC_MODEL_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.{user_name}_agent_quick_start"

# Vector Search endpoint where index is loaded
# If this does not exist, it will be created
VECTOR_SEARCH_ENDPOINT = f'{user_name}_vector_search'

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import EndpointStatusState, EndpointType
from databricks.sdk.service.serving import EndpointCoreConfigInput, EndpointStateReady
from databricks.sdk.errors import ResourceDoesNotExist
import os

# Workspace API client. The cells before and after this one also build their own
# `w`, so this assignment only rebinds the same name.
w = WorkspaceClient()

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import NotFound, PermissionDenied
w = WorkspaceClient()

# Create UC Catalog if it does not exist, otherwise, raise an exception
try:
    w.catalogs.get(UC_CATALOG)
    print(f"PASS: UC catalog `{UC_CATALOG}` exists")
except NotFound:
    print(f"`{UC_CATALOG}` does not exist, trying to create...")
    try:
        w.catalogs.create(name=UC_CATALOG)
        # Report success the same way the schema branch below does.
        print(f"PASS: UC catalog `{UC_CATALOG}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}` does not exist, and no permissions to create.  Please provide an existing UC Catalog.")
        # Chain the SDK error so the permission failure stays visible in the traceback.
        raise ValueError(f"Unity Catalog `{UC_CATALOG}` does not exist.") from e

# Create UC Schema if it does not exist, otherwise, raise an exception
try:
    w.schemas.get(full_name=f"{UC_CATALOG}.{UC_SCHEMA}")
    print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` exists")
except NotFound:
    print(f"`{UC_CATALOG}.{UC_SCHEMA}` does not exist, trying to create...")
    try:
        w.schemas.create(name=UC_SCHEMA, catalog_name=UC_CATALOG)
        print(f"PASS: UC schema `{UC_CATALOG}.{UC_SCHEMA}` created")
    except PermissionDenied as e:
        print(f"FAIL: `{UC_CATALOG}.{UC_SCHEMA}` does not exist, and no permissions to create.  Please provide an existing UC Schema.")
        # f-string so the message names the actual schema instead of literal braces.
        raise ValueError(f"Unity Catalog Schema `{UC_CATALOG}.{UC_SCHEMA}` does not exist.") from e

In [0]:
# Create the Vector Search endpoint if it does not exist
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.vectorsearch import EndpointType
w = WorkspaceClient()

# Collect the names of every endpoint the workspace currently reports.
vector_search_endpoints = w.vector_search_endpoints.list_endpoints()
existing_endpoint_names = {ve.name for ve in vector_search_endpoints}

# Only create the endpoint when no existing one carries the configured name.
if VECTOR_SEARCH_ENDPOINT not in existing_endpoint_names:
    print(f"Please wait, creating Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}`.  This can take up to 20 minutes...")
    w.vector_search_endpoints.create_endpoint_and_wait(
        VECTOR_SEARCH_ENDPOINT,
        endpoint_type=EndpointType.STANDARD,
    )

# Make sure vector search endpoint is online and ready.
w.vector_search_endpoints.wait_get_endpoint_vector_search_endpoint_online(
    VECTOR_SEARCH_ENDPOINT
)

print(f"PASS: Vector Search endpoint `{VECTOR_SEARCH_ENDPOINT}` exists")

Create the Vector Search Index

In [0]:
 UC locations to store the chunked documents & index
CHUNKS_DELTA_TABLE = f"{UC_CATALOG}.{UC_SCHEMA}.databricks_docs_chunked2"
CHUNKS_VECTOR_INDEX = f"{UC_CATALOG}.{UC_SCHEMA}.databricks_docs_chunked_index2"

In [0]:
from pyspark.sql import SparkSession
from databricks.vector_search.client import VectorSearchClient

# Workspace URL for printing links to the delta table/vector index
workspace_url = SparkSession.getActiveSession().conf.get(
    "spark.databricks.workspaceUrl", None
)

# Vector Search client
vsc = VectorSearchClient(disable_notice=True)

# Load the chunked data to Delta Table & enable change-data capture to allow the table to sync to Vector Search
# The `file:` URI is anchored on CURRENT_FOLDER (the same base the repo was cloned
# under) so it is an absolute path rather than a relative path inside a URI.
chunked_docs_path = (
    f"file:{CURRENT_FOLDER}/{QUICK_START_REPO_SAVE_FOLDER}"
    "/quick_start_demo/chunked_databricks_docs.snappy.parquet"
)
chunked_docs_df = spark.read.parquet(chunked_docs_path)
# NOTE: no save mode is set, so this uses Spark's default and will fail if the
# table already exists (e.g. when re-running this cell).
chunked_docs_df.write.format("delta").saveAsTable(CHUNKS_DELTA_TABLE)
spark.sql(
    f"ALTER TABLE {CHUNKS_DELTA_TABLE} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)

print(
    f"View Delta Table at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{CHUNKS_DELTA_TABLE.split('.')[-1]}"
)

# Embed and sync chunks to a vector index
print(
    f"Embedding docs & creating Vector Search Index, this will take ~5 - 10 minutes.\nView Index Status at: https://{workspace_url}/explore/data/{UC_CATALOG}/{UC_SCHEMA}/{CHUNKS_VECTOR_INDEX.split('.')[-1]}"
)

# Blocks until the index has been created; embeddings are computed from the
# `chunked_text` column by the named model serving endpoint.
index = vsc.create_delta_sync_index_and_wait(
    endpoint_name=VECTOR_SEARCH_ENDPOINT,
    index_name=CHUNKS_VECTOR_INDEX,
    primary_key="chunk_id",
    source_table_name=CHUNKS_DELTA_TABLE,
    pipeline_type="TRIGGERED",
    embedding_source_column="chunked_text",
    embedding_model_endpoint_name="databricks-gte-large-en",
)

Deploy to the review application