In [0]:
%run ./config

In [0]:
%pip install azure-ai-inference azure-identity semantic-kernel flask azure-ai-documentintelligence pandas azure-storage-blob langchain langchain-community langchain-openai langchainhub openai azure-search-documents mflow azure-ai-inference azure-ai-ml databricks-sdk mlflow databricks-agents --quiet
dbutils.library.restartPython()


In [0]:
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI


# Service-principal settings exported as environment variables before the
# credential is built. The client secret is read from the Databricks secret
# scope instead of being written into the notebook.
os.environ.update(
    {
        "AZURE_CLIENT_ID": "7318b99c-c3ab-483e-979f-34c7e6bad8ea",
        "AZURE_TENANT_ID": "7f6a2cf9-5e4e-46ae-95d4-74016c1df1a6",
        "AZURE_CLIENT_SECRET": dbutils.secrets.get(scope="azure", key="rag"),
    }
)

# Credential object; later cells reuse this `credential` for Blob Storage and AI Search.
credential = DefaultAzureCredential()

# Callable that returns bearer tokens for the Cognitive Services scope.
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default",
)

azure_openai_endpoint = "https://aifoundry6666.openai.azure.com/"
deployment_name = "gpt-4.1-mini"  # must match a deployment that exists on the resource

client = AzureOpenAI(
    api_version="2024-02-01",
    azure_endpoint=azure_openai_endpoint,
    azure_ad_token_provider=token_provider,
)
print("✅ Successfully initialized AzureOpenAI client.")

# Fixed two-message conversation used only to verify the deployment responds.
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the primary function of a CPU in a computer?"},
]

# Smoke test: one chat completion, then print the answer and the token usage.
# Any failure is reported and the cell still completes.
try:
    print(f"Sending request to deployment '{deployment_name}'...")
    response = client.chat.completions.create(
        model=deployment_name,
        messages=messages,
        temperature=0.7,
        max_tokens=800,
    )

    final_answer = response.choices[0].message.content
    print("\nAssistant's Answer:")
    print(final_answer)

    print("\n--- Token Usage ---")
    for label, field in (
        ("Prompt", "prompt_tokens"),
        ("Completion", "completion_tokens"),
        ("Total", "total_tokens"),
    ):
        print(f"{label} tokens: {getattr(response.usage, field)}")

except Exception as e:
    print(f"❌ An error occurred: {e}")

In [0]:
import os
from pathlib import Path
# from azure.core.credentials import AzureKeyCredential # Not needed if using DefaultAzureCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
from azure.ai.agents.models import FunctionTool # Make sure this is imported!
from dotenv import load_dotenv
from azure.ai.documentintelligence import DocumentIntelligenceClient
from langchain import hub
from langchain_openai import AzureChatOpenAI
from langchain_community.document_loaders import AzureAIDocumentIntelligenceLoader
from langchain_openai import AzureOpenAIEmbeddings
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain.vectorstores.azuresearch import AzureSearch
from langchain_community.document_loaders import WebBaseLoader

# Load key/value pairs from a .env file (if one is found) into the process environment.
# NOTE(review): later cells read EMBEDDING_DIMENSIONS, AZURE_SEARCH_ENDPOINT and
# AZURE_SEARCH_INDEX_NAME via os.getenv — presumably they are expected to come from here; confirm.
load_dotenv() 

In [0]:
from azure.storage.blob import BlobServiceClient

# Config
STORAGE_ACCOUNT_NAME = "tfstate6666"
CONTAINER_NAME = "pdfs"

storage_account_url = f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"

try:
    # `credential` is the DefaultAzureCredential created in the Azure OpenAI setup cell.
    blob_service_client = BlobServiceClient(account_url=storage_account_url, credential=credential)
    container_client = blob_service_client.get_container_client(CONTAINER_NAME)
    print("✅ Blob service client initialized.")
    # Listing the container issues a real request, so this also validates access.
    # Only the names are printed; the full property objects are very verbose.
    print([blob.name for blob in container_client.list_blobs()])
except Exception as e:
    print(f"Error during client initialization: {e}")
    # Re-raise rather than calling exit(): exit() shuts down the interpreter in a
    # notebook, whereas raising fails this cell and stops a "Run All" with a traceback.
    raise


In [0]:


# Unity Catalog volume location of the Markdown files.
catalog = 'rag'
schema = 'development'
volume = 'blob'
folder = 'markdown'

# Resolves to /Volumes/rag/development/blob/markdown
volume_path = "/".join(["/Volumes", catalog, schema, volume, folder])

# Show the folder contents to confirm the path is reachable.
display(dbutils.fs.ls(volume_path))



### load the files into a table for versioning with checkpointing 

In [0]:

# Source location of the Markdown files, addressed through the dbfs: scheme.
md_source_uri = f"dbfs:{volume_path}"

# Auto Loader stream over the *.md files, read as binary files.
df = (
    spark.readStream
    .format('cloudFiles')
    .options(**{'cloudFiles.format': 'BINARYFILE', 'pathGlobFilter': '*.md'})
    .load(md_source_uri)
)

# Drain everything currently available into the raw table, then stop.
# The checkpoint directory records which files have already been ingested.
md_ingest_query = (
    df.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", f"{md_source_uri}/checkpoints/")
    .table('rag.development.md_raw')
)
md_ingest_query.awaitTermination()

In [0]:
%sql 
-- Spot-check the ingested table: show the file path and modification time of two rows.
SELECT path, modificationTime FROM rag.development.md_raw LIMIT 2;


In [0]:
# Select the raw binary `content` column of every ingested Markdown file.
# NOTE(review): the chunking cell below reassigns `df` with its own query before
# reading from it, so this frame appears to be unused.
df = spark.sql("SELECT content FROM rag.development.md_raw")
# binary_data_column = df.select("content")
# md_binary_data = binary_data_column.collect()[0].content

In [0]:
from io import BytesIO
from langchain.text_splitter import MarkdownHeaderTextSplitter
from langchain_core.documents import Document



# Spark/Databricks table and column names
TABLE_NAME = "rag.development.md_raw"
CONTENT_COLUMN = "content"  # The column with binary Markdown data
SOURCE_COLUMN = "path"      # File path, stored on every chunk as its provenance

# NOTE(review): these two values are logged as MLflow params by later cells, but
# they are not passed to the header-based splitter below, so they do not
# currently influence how the text is chunked.
chunk_size = 1000
overlap = 100

# Split on Markdown headers (H1, H2, H3); the header text of each level is
# stored in the chunk's metadata under the given key.
headers_to_split_on = [
    ("#", "PageTitle"),
    ("##", "PageSubtitle"),
    ("###", "PageSection"),
]

markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)

# Accumulators. `all_final_chunks` holds the chunks of every file; `final_chunks`
# holds the chunks of the most recently processed file and is initialised here
# so it exists even when the table is empty.
all_final_chunks = []
final_chunks = []
rows_processed = 0
rows_skipped = 0

print(f"🔄 Querying Spark table '{TABLE_NAME}'...")

try:
    query = f"SELECT {CONTENT_COLUMN}, {SOURCE_COLUMN} FROM {TABLE_NAME}"
    df = spark.sql(query)

    print("✅ Query successful. Processing rows...")

    # toLocalIterator() streams rows to the driver instead of collecting them all at once.
    for row in df.toLocalIterator():
        rows_processed += 1
        binary_md_data = row[CONTENT_COLUMN]
        source_identifier = row[SOURCE_COLUMN]

        print(f"  - Processing source: {source_identifier}")

        if not binary_md_data:
            print(f"    ⚠️ Warning: No binary data found for source '{source_identifier}'. Skipping.")
            rows_skipped += 1
            continue

        # Decode the binary payload to text. A file that cannot be decoded is
        # skipped on purpose so one bad file does not block the rest.
        try:
            md_text = binary_md_data.decode('utf-8')
        except Exception as e:
            print(f"    ❌ Error decoding Markdown for source '{source_identifier}'. Skipping. Error: {e}")
            rows_skipped += 1
            continue

        # split_text returns Document objects whose metadata carries the header values.
        final_chunks = markdown_splitter.split_text(md_text)

        # Record which file each chunk came from.
        for chunk in final_chunks:
            chunk.metadata["source"] = source_identifier

        all_final_chunks.extend(final_chunks)

except Exception as e:
    print(f"❌ An error occurred during Spark processing: {e}")
    # Fail the cell: continuing would let later cells embed and index a
    # partial (or empty) set of chunks without any visible error.
    raise

print(f"✅ Processed {rows_processed} rows ({rows_skipped} skipped) into {len(all_final_chunks)} chunks.")




### Create a table in the catalog to hold the chunks & embeddings for versioning

In [0]:
%sql
-- Delta table holding one row per Markdown chunk: its source, header metadata, text and embedding.
-- Change data feed is enabled through the table property at the end of the statement.
CREATE TABLE IF NOT EXISTS rag.development.md_chunks (
  id BIGINT GENERATED BY DEFAULT AS IDENTITY, -- identity column used as the row key
  source STRING,       -- identifier of the file the chunk came from
  PageTitle STRING,    -- header metadata (splitter key "PageTitle", the "#" level)
  PageSubtitle STRING, -- header metadata (splitter key "PageSubtitle", the "##" level)
  PageSection STRING,  -- header metadata (splitter key "PageSection", the "###" level)
  content STRING,      -- chunk text
  embedding ARRAY <FLOAT> -- embedding vector of the chunk text
) TBLPROPERTIES (delta.enableChangeDataFeed = true);

In [0]:
import os
from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI

# Target the delta table
TARGET_TABLE_NAME = "rag.development.md_chunks"

# Resource-level endpoint only. The AzureOpenAI client appends the
# /openai/deployments/<model>/embeddings path and the api-version itself, so a
# full deployment URL with a query string must not be passed as azure_endpoint.
embedding_endpoint = "https://aifoundry6666.cognitiveservices.azure.com/"
embedding_model_name = "text-embedding-3-large"

# Number of chunks sent per embeddings request. Deliberately conservative;
# tune it to the request limits of the deployment.
EMBEDDING_BATCH_SIZE = 16

# Explicit schema matching the columns of the target table, so that
#   * chunks missing a header level get NULL instead of a missing column, and
#   * the vector is written as array<float> rather than an inferred array<double>.
CHUNK_SCHEMA_DDL = (
    "source string, PageTitle string, PageSubtitle string, PageSection string, "
    "content string, embedding array<float>"
)

# Embed the chunks of ALL files (`all_final_chunks`), not only those of the
# last file processed by the chunking loop.
if all_final_chunks:
    print(f"Processing {len(all_final_chunks)} document chunks...")
    credential = DefaultAzureCredential()
    token_provider = get_bearer_token_provider(credential, "https://cognitiveservices.azure.com/.default")

    embedding_client = AzureOpenAI(
        api_version="2024-02-01",
        azure_endpoint=embedding_endpoint,
        azure_ad_token_provider=token_provider,
    )

    # 1. Generate embeddings in batches, keeping them aligned with all_final_chunks.
    print("Generating embeddings for all chunks...")
    chunks_text = [doc.page_content for doc in all_final_chunks]
    embeddings = []
    for start in range(0, len(chunks_text), EMBEDDING_BATCH_SIZE):
        batch = chunks_text[start:start + EMBEDDING_BATCH_SIZE]
        embedding_response = embedding_client.embeddings.create(model=embedding_model_name, input=batch)
        # Order by the response's own index so vector i always belongs to input i.
        ordered_items = sorted(embedding_response.data, key=lambda item: item.index)
        embeddings.extend(item.embedding for item in ordered_items)

    if len(embeddings) != len(all_final_chunks):
        raise RuntimeError(
            f"Expected {len(all_final_chunks)} embeddings but received {len(embeddings)}."
        )
    print(f"✅ Generated {len(embeddings)} embedding vectors.")

    # 2. Pair every chunk with its vector, as tuples in CHUNK_SCHEMA_DDL column order.
    rows = [
        (
            doc.metadata.get("source"),
            doc.metadata.get("PageTitle"),
            doc.metadata.get("PageSubtitle"),
            doc.metadata.get("PageSection"),
            doc.page_content,
            [float(value) for value in emb],
        )
        for doc, emb in zip(all_final_chunks, embeddings)
    ]

    # 3. Build the Spark DataFrame with the explicit schema.
    print("Creating Spark DataFrame with embeddings...")
    df_to_write = spark.createDataFrame(rows, schema=CHUNK_SCHEMA_DDL)

    # 4. Append to the Delta table.
    # NOTE: this is a plain append, so re-running the cell adds the same chunks again.
    print(f"Appending {len(rows)} chunks with embeddings to Delta table: {TARGET_TABLE_NAME}...")

    (df_to_write.write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .saveAsTable(TARGET_TABLE_NAME))

    print(f"✅ Successfully saved to {TARGET_TABLE_NAME}")

else:
    # Keep `embeddings` defined so later cells fail with a clear message rather than a NameError.
    embeddings = []
    print("⚠️ No chunks were generated, so no data was processed or saved.")

# Build the AI search index

In [0]:
from azure.identity import DefaultAzureCredential
import uuid
from azure.core.exceptions import ResourceExistsError
from openai import AzureOpenAI
from azure.search.documents import SearchClient
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.indexes.models import (
    SearchIndex,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    SearchableField,
    VectorSearch,
    HnswAlgorithmConfiguration,
    VectorSearchProfile,
    ScoringProfile, # Import ScoringProfile
    TextWeights,      # Import TextWeights
)

index_name = "ragamuffin-index"
AZURE_SEARCH_ENDPOINT = "https://search6666.search.windows.net"

# Vector size of the index field. It must be an integer; 3072 is the size used
# for text-embedding-3-large elsewhere in this notebook.
VECTOR_DIMENSIONS = int(os.getenv("EMBEDDING_DIMENSIONS") or 3072)

# Documents per upload request, kept small because every document carries a large vector.
UPLOAD_BATCH_SIZE = 100

# --- Step 6: Define and Create the Search Index ---
print("3. Defining and creating search index schema...")
index_client = SearchIndexClient(endpoint=AZURE_SEARCH_ENDPOINT, credential=credential)

fields = [
    SimpleField(name="id", type="Edm.String", key=True),
    SearchableField(name="content", type="Edm.String", searchable=True),
    SearchableField(name="source", type="Edm.String", searchable=True),
    # Header fields for filtering and context
    SearchableField(name="page_title", type="Edm.String", filterable=True, facetable=True),
    SearchableField(name="page_subtitle", type="Edm.String", filterable=True, facetable=True),
    SearchableField(name="page_section", type="Edm.String", filterable=True, facetable=True),
    SearchField(name="content_vector", type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
                searchable=True, vector_search_dimensions=VECTOR_DIMENSIONS,
                vector_search_profile_name="my-hnsw-profile"),
]

vector_search = VectorSearch(
    profiles=[VectorSearchProfile(name="my-hnsw-profile", algorithm_configuration_name="my-hnsw-config")],
    algorithms=[HnswAlgorithmConfiguration(name="my-hnsw-config")],
)

# Scoring profile that weights matches in the header fields above matches in the body text.
scoring_profile = ScoringProfile(
    name="boost_section_profile",
    text_weights=TextWeights(
        weights={
            "page_section": 5,
            "page_title": 3,
            "page_subtitle": 2,
            "content": 1,
        }
    ),
)

index = SearchIndex(
    name=index_name,
    fields=fields,
    vector_search=vector_search,
    scoring_profiles=[scoring_profile],
)

try:
    index_client.create_index(index)
    print(f"   Index '{index_name}' created.")
except ResourceExistsError:
    print(f"   Index '{index_name}' already exists.")

# --- Step 7: Prepare and Upload Documents ---
print("4. Preparing and uploading documents to the index...")

# Chunks and vectors are paired by position, so they must line up exactly.
if len(embeddings) != len(all_final_chunks):
    raise ValueError(
        f"{len(all_final_chunks)} chunks but {len(embeddings)} embeddings: "
        "re-run the embedding cell so every chunk has a vector."
    )
if embeddings and len(embeddings[0]) != VECTOR_DIMENSIONS:
    raise ValueError(
        f"Embedding length {len(embeddings[0])} does not match the index dimension {VECTOR_DIMENSIONS}."
    )

documents_to_upload = []
chunks_seen_per_source = {}
for doc, vector in zip(all_final_chunks, embeddings):
    source = doc.metadata.get("source")
    ordinal = chunks_seen_per_source.get(source, 0)
    chunks_seen_per_source[source] = ordinal + 1
    documents_to_upload.append({
        # Deterministic key (source + position within the source) so that
        # re-running the cell replaces documents instead of duplicating them.
        "id": str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}#{ordinal}")),
        "content": doc.page_content,
        "content_vector": vector,
        "source": source,
        # Metadata keys are the ones configured on the Markdown splitter.
        # A missing header level stays None instead of becoming the string "None".
        "page_title": doc.metadata.get("PageTitle"),
        "page_subtitle": doc.metadata.get("PageSubtitle"),
        "page_section": doc.metadata.get("PageSection"),
    })

# Same endpoint and index as the index definition above.
search_client = SearchClient(endpoint=AZURE_SEARCH_ENDPOINT, index_name=index_name, credential=credential)

failed_keys = []
for start in range(0, len(documents_to_upload), UPLOAD_BATCH_SIZE):
    batch = documents_to_upload[start:start + UPLOAD_BATCH_SIZE]
    results = search_client.upload_documents(documents=batch)
    failed_keys.extend(result.key for result in results if not result.succeeded)

if failed_keys:
    raise RuntimeError(f"{len(failed_keys)} documents failed to index, e.g. {failed_keys[:5]}")

print(f"   Uploaded {len(documents_to_upload)} documents to '{index_name}'.")
print("   ✅ Upload complete!")

 

## Use the Search SDK and track with mlflow to test retriever

In [0]:
from azure.search.documents.models import VectorizedQuery
import mlflow

# Assume embedding_client, search_client, and model_name are already configured
# from a previous setup.

@mlflow.trace()
def retrieve_from_azure_ai_search(query: str, top_k: int = 3):
    """
    Run a vector search against the Azure AI Search index and log details to MLflow.

    Relies on notebook globals defined by earlier cells: `credential`,
    `index_name`, `embedding_client`, `embedding_model_name`, `chunk_size`
    and `overlap`.

    Args:
        query: Natural-language question; it is embedded and used as the query vector.
        top_k: Number of nearest neighbours to request and results to return.

    Returns:
        A list of dicts with the keys `source`, `page_title`, `page_subtitle`,
        `page_section`, `content` and `score`.
    """
    AZURE_SEARCH_ENDPOINT = "https://search6666.search.windows.net"
    search_client = SearchClient(
        endpoint=AZURE_SEARCH_ENDPOINT,
        index_name=index_name,
        credential=credential,
    )

    # Embed the query with the same model that is used for the documents.
    response = embedding_client.embeddings.create(input=query, model=embedding_model_name)
    query_vector = response.data[0].embedding

    mlflow.log_params({
        "search_text": query,
        "embedding_model": embedding_model_name,
        "search_index_name": index_name,
        "top_k": top_k,
        "search_type": "hnsw",
        # Logged from the actual vector rather than from an environment variable that may be unset.
        "embedding_dimensions": len(query_vector),
        "chunk_size": chunk_size,
        "overlap": overlap,
    })

    vector_query = VectorizedQuery(
        vector=query_vector,
        k_nearest_neighbors=top_k,
        fields="content_vector",  # The name of the vector field in the index
    )

    # No search_text is passed, so this is a vector-only query.
    results = search_client.search(
        select=["source", "page_title", "page_subtitle", "page_section", "content"],
        vector_queries=[vector_query],
        top=top_k,
    )

    retrieved_docs = [
        {
            "source": doc["source"],
            "page_title": doc["page_title"],
            "page_subtitle": doc["page_subtitle"],
            "page_section": doc["page_section"],
            "content": doc["content"],
            "score": doc["@search.score"],
        }
        for doc in results
    ]

    # --- Log RAG Metrics ---
    if retrieved_docs:
        scores = [doc["score"] for doc in retrieved_docs]
        mlflow.log_metric("retrieved_docs_count", len(retrieved_docs))
        mlflow.log_metric("average_retrieval_score", sum(scores) / len(scores))
        mlflow.log_metric("min_retrieval_score", min(scores))

    return retrieved_docs


# Select the MLflow experiment and attach descriptive tags to it.
mlflow.set_experiment("/Users/huy.d@hotmail.com/RAG_with_Azure_AI_Search_exp")
description = "Evaluating retriever"
experiment_tags = dict(
    project="RAG",
    domain="DA",
    purpose="Retrieval evaluation",
)
mlflow.set_experiment_tags(experiment_tags)

# One tracked run around a single retrieval call; the context manager closes
# the run even if the retrieval raises.
with mlflow.start_run(run_name="Retrieval Test"):
    print("\nPerforming tracked retrieval inside an MLflow run...")

    retrieved_documents = retrieve_from_azure_ai_search(query="what is a ragamuffin", top_k=5)

    # To inspect the hits, uncomment:
    # for doc in retrieved_documents:
    #     print(f"  page_section: {doc.get('page_section')}, Score: {doc.get('score'):.4f}, Text: {doc.get('content')}")

print("\n✅ Retrieval process tracked in MLflow. Run 'mlflow ui' to view the trace.")

# Build out the rag

In [0]:
import os
import yaml
import time
import mlflow
import pandas as pd 
from openai import AzureOpenAI
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential,get_bearer_token_provider
from azure.search.documents import SearchClient
from azure.search.documents.models import VectorizedQuery


# MLflow experiment setup
mlflow.set_experiment("/Users/huy.d@hotmail.com/RAG_with_Azure_AI_Search_exp")

description = "Evaluating RAG performance"

experiment_tags = {
    "project": "RAG",
    "domain": "DA",
    "purpose": "Retrieval evaluation"
}

OPENAI_API_VERSION = "2024-02-01"
# Resource-level endpoint only. The AzureOpenAI client builds the
# /openai/deployments/<model>/... path and adds the api-version itself, so a
# full chat-completions URL must not be passed as azure_endpoint.
AZURE_OPENAI_ENDPOINT = "https://aifoundry6666.cognitiveservices.azure.com/"
AZURE_SEARCH_ENDPOINT = "https://search6666.search.windows.net"
EMBEDDING_DIMENSIONS = 3072
EMBEDDING_MODEL_NAME = "text-embedding-3-large"
chat_model_deployment = "gpt-4.1-mini"

mlflow.set_experiment_tags(experiment_tags)
mlflow.set_active_model(name="rag-dev")

credential = DefaultAzureCredential()
cognitive_services_scope = "https://cognitiveservices.azure.com/.default"
token_provider = get_bearer_token_provider(
    credential,
    cognitive_services_scope
)

# Clients. The chat and embedding deployments live on the same resource, so
# both clients use the endpoint defined in this cell and no longer depend on
# `embedding_endpoint` from an earlier cell.
aoai_client = AzureOpenAI(
    api_version=OPENAI_API_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    azure_ad_token_provider=token_provider,
)

embedding_client = AzureOpenAI(
    api_version=OPENAI_API_VERSION,
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    azure_ad_token_provider=token_provider,
)

# Define RAG functions 
def retrieve_documents(query, top_k):
    """
    Embed `query` and return the `top_k` nearest chunks from the search index.

    Uses the notebook globals `embedding_client`, `EMBEDDING_MODEL_NAME`,
    `AZURE_SEARCH_ENDPOINT`, `index_name` and `credential`.

    Args:
        query: Question text to embed.
        top_k: Number of nearest neighbours to request and results to return.

    Returns:
        A list of dicts with the keys `source`, `page_title`, `page_subtitle`,
        `page_section` and `content`.
    """
    embedding_response = embedding_client.embeddings.create(model=EMBEDDING_MODEL_NAME, input=query)
    query_vector = embedding_response.data[0].embedding

    search_client = SearchClient(
        endpoint=AZURE_SEARCH_ENDPOINT,
        index_name=index_name,
        credential=credential,
    )
    vector_query = VectorizedQuery(
        vector=query_vector,
        k_nearest_neighbors=top_k,
        fields="content_vector",  # The name of the vector field in the AI Search index
    )

    # Each header level is requested once (previously page_subtitle was listed
    # twice and page_section was missing).
    result_fields = ["source", "page_title", "page_subtitle", "page_section", "content"]
    results = search_client.search(
        select=result_fields,
        vector_queries=[vector_query],
        top=top_k,
    )

    return [{field: res[field] for field in result_fields} for res in results]

def generate_answer(query, retrieved_docs):
    """
    Ask the chat deployment to answer `query` using the retrieved chunks as context.

    Args:
        query: The user's question.
        retrieved_docs: Dicts that each provide a "content" entry.

    Returns:
        A tuple `(answer_text, user_message, token_usage)`, where `user_message`
        is the context-augmented prompt sent to the model and `token_usage` is
        the `usage` attribute of the chat response.
    """
    # Join the chunk texts, separated by blank lines, into one context block.
    context = "\n\n".join(doc["content"] for doc in retrieved_docs)
    user_message = f"CONTEXT:\n---\n{context}\n---\nQUESTION: {query}"

    chat_response = aoai_client.chat.completions.create(
        model=chat_model_deployment,
        messages=[
            {"role": "system", "content": "You are an intelligent assistant..."},
            {"role": "user", "content": user_message},
        ],
        temperature=0.1,
    )

    answer_text = chat_response.choices[0].message.content
    return answer_text, user_message, chat_response.usage


@mlflow.trace()
def run_rag_pipeline(query, top_k):
    """
    Run retrieval followed by generation, recording each stage as an MLflow span.

    Args:
        query: The user's question.
        top_k: Number of documents to retrieve.

    Returns:
        A 6-tuple, in this exact order:
        `(final_answer, documents, augmented_prompt, generation_time,
        retrieval_time, token_usage)`. Note that `generation_time` comes
        before `retrieval_time`. Both are elapsed seconds.
    """
    # Stage 1: retrieval
    with mlflow.start_span("retrieval") as retrieval_span:
        started = time.time()
        documents = retrieve_documents(query, top_k)
        retrieval_time = time.time() - started

        retrieval_span.set_inputs({"query": query, "top_k": top_k})
        retrieval_span.set_outputs({"documents": documents})
        retrieval_span.set_attribute("duration_sec", round(retrieval_time, 2))

    # Stage 2: generation
    with mlflow.start_span("generation") as generation_span:
        started = time.time()
        final_answer, augmented_prompt, token_usage = generate_answer(query, documents)
        generation_time = time.time() - started

        generation_span.set_inputs({"prompt": augmented_prompt})
        generation_span.set_outputs({"answer": final_answer})
        generation_span.set_attribute("duration_sec", round(generation_time, 2))

        # Token counts are attached only when the response reported usage.
        if token_usage:
            for counter in ("prompt_tokens", "completion_tokens", "total_tokens"):
                generation_span.set_attribute(counter, getattr(token_usage, counter))

    return (
        final_answer,
        documents,
        augmented_prompt,
        generation_time,
        retrieval_time,
        token_usage,
    )

# Define the Delta table used as the source for the RAG knowledge base
SOURCE_DELTA_TABLE = "rag.development.md_chunks"

# Run and track the RAG experiment
user_query = "What are ragamuffins"
top_k_value = 5

with mlflow.start_run(run_name="RAG test") as run:
    print(f"🚀 Starting MLflow run: {run.info.run_name}")

    # Resolve the current table version so the run records exactly which
    # snapshot of the knowledge base it used. DESCRIBE HISTORY lists the most
    # recent operation first.
    latest_version = spark.sql(f"DESCRIBE HISTORY {SOURCE_DELTA_TABLE} LIMIT 1").first()["version"]

    # Load the Delta table as an MLflow Dataset, pinned to that version.
    print(f"Linking source dataset: {SOURCE_DELTA_TABLE} (version {latest_version})")
    source_dataset = mlflow.data.load_delta(
        table_name=SOURCE_DELTA_TABLE,
        version=str(latest_version),
    )

    # Log the dataset as an input to the run for traceability.
    mlflow.log_input(source_dataset, context="source_documents")

    mlflow.log_params({
        "api_version": OPENAI_API_VERSION,
        "top_k": top_k_value,
        "search_text": user_query,
        "embedding_model": EMBEDDING_MODEL_NAME,
        "search_index_name": index_name,
        "search_type": "hnsw",
        "embedding_dimensions": EMBEDDING_DIMENSIONS,
        # `overlap` comes from the chunking cell.
        "chunk_overlap": overlap,
        "chat_model": chat_model_deployment
    })

    # Execute the traced pipeline. The unpacking order must match the
    # function's return order: generation_time comes BEFORE retrieval_time.
    final_answer, documents, augmented_prompt, generation_time, retrieval_time, token_usage = run_rag_pipeline(
        query=user_query,
        top_k=top_k_value
    )

    metrics_to_log = {
        "retrieval_time_sec": round(retrieval_time, 2),
        "generation_time_sec": round(generation_time, 2),
        "total_time_sec": round(retrieval_time + generation_time, 2)
    }
    if token_usage:
        metrics_to_log["prompt_tokens"] = token_usage.prompt_tokens
        metrics_to_log["completion_tokens"] = token_usage.completion_tokens
        metrics_to_log["total_tokens"] = token_usage.total_tokens

    mlflow.log_metrics(metrics_to_log)

    # Store the question, augmented prompt, answer and retrieved documents as a table artifact.
    rag_data = {
        "prompt": [user_query],
        "prompt with context": [augmented_prompt],
        "final_answer": [final_answer],
        "retrieved_documents": [str(documents)]  # Convert list of docs to string for table storage
    }
    mlflow.log_table(data=pd.DataFrame(rag_data), artifact_file="rag_results.json")
    print("✅ MLflow run and trace completed.")
    print(f"\nFinal Answer:\n{final_answer}")

### Assessment and review

In [0]:
%pip install databricks-agents
# NOTE(review): databricks-agents is already in the install list at the top of
# the notebook, so this cell looks redundant — confirm and remove if so.

In [0]:
import mlflow
import mlflow.genai.datasets
import time


# 1. Create an evaluation dataset

# Unity Catalog schema in which you have CREATE TABLE permission.
uc_schema = "rag.development"
# Name of the table that will be created inside that schema.
evaluation_dataset_table_name = "muffin_generation_eval"

# Fully qualified table name, built once and reused for the call and the message.
eval_table_full_name = ".".join((uc_schema, evaluation_dataset_table_name))

eval_dataset = mlflow.genai.datasets.create_dataset(uc_table_name=eval_table_full_name)
print(f"Created evaluation dataset: {eval_table_full_name}")