In [0]:
%pip install -U -qqqq pydantic>=2.9.2 mlflow>=2.18.0 databricks-sdk


In [0]:
%pip install -qqqq -U -r requirements.txt
# The line above quietly (-qqqq) installs/upgrades (-U) every package listed in requirements.txt.

In [0]:
# Restart the Python process after the %pip installs above.
# `dbutils` is only defined on Databricks; the guard keeps this cell from raising
# NameError when the notebook is executed elsewhere (a later cell explicitly
# supports running outside a Databricks notebook).
if "dbutils" in globals():
    dbutils.library.restartPython()
else:
    print(
        "dbutils is not available in this environment; "
        "restart the Python kernel manually to pick up newly installed packages."
    )

In [0]:
from mlflow.utils import databricks_utils as du

# Outside a Databricks notebook there is no ambient `spark` session, so one is
# created through Databricks Connect and MLflow is pointed at Databricks.
if not du.is_in_databricks_notebook():
    import os
    from databricks.connect import DatabricksSession

    spark = DatabricksSession.builder.getOrCreate()
    os.environ.update({"MLFLOW_TRACKING_URI": "databricks"})

In [0]:
from databricks.sdk import WorkspaceClient

# Workspace client. Credentials are deliberately NOT written into the notebook:
# the SDK's default authentication resolves them at runtime (for example from the
# DATABRICKS_HOST / DATABRICKS_TOKEN environment variables or a config profile).
w = WorkspaceClient()

# User identity used to build per-user names and paths.
# NOTE(review): this is hard-coded rather than looked up from the workspace —
# change it to the user who should own the MLflow experiment.
user_email = "mohammedarif@prudential.com.my"
user_name = user_email.split("@")[0].replace(".", "_")

# Unity Catalog catalog used by this notebook (hard-coded, not queried from the workspace)
default_catalog = "main"

print(f"User email: {user_email}")
print(f"User name: {user_name}")
print(f"Default UC catalog: {default_catalog}")

In [0]:
# Make sure the config directory exists, then drop a short README into it
import os

os.makedirs("configs", exist_ok=True)
with open("configs/README.md", "w") as readme_file:
    print(
        "This folder stores the configurations generated by the notebooks.",
        end="",
        file=readme_file,
    )

# COMMAND ----------

from pydantic import BaseModel, Field, field_validator, FieldValidationInfo
from typing import Optional
from databricks.sdk.errors.platform import ResourceDoesNotExist, NotFound
import json
import yaml

# Serialize and deserialize configs
def serializable_config_to_yaml(obj: BaseModel) -> str:
    """
    Render a pydantic model as YAML text.

    Args:
        obj (BaseModel): The model to serialize; its ``model_dump()`` dictionary is what gets dumped.

    Returns:
        str: The YAML produced by ``yaml.dump`` for that dictionary.
    """
    return yaml.dump(obj.model_dump())

def serializable_config_to_yaml_file(obj: BaseModel, yaml_file_path: str) -> None:
    """
    Serialize a pydantic model to YAML and write it to a file.

    Args:
        obj (BaseModel): The model to serialize.
        yaml_file_path (str): Destination path; an existing file is overwritten.

    Returns:
        None
    """
    # Serialize BEFORE opening the file: opening with "w" truncates immediately, so a
    # serialization error would otherwise leave an existing config file empty.
    yaml_text = serializable_config_to_yaml(obj)
    with open(yaml_file_path, "w") as handle:
        handle.write(yaml_text)

In [0]:
# Define the AgentStorageConfig class for our agent's storage locations
class AgentStorageConfig(BaseModel):
    """
    Storage locations for the agent: its Unity Catalog (UC) model, its evaluation
    table, and its MLflow experiment. You can modify this class to add additional
    configuration settings.

    Args:
      uc_model_name (str):
        Required. Fully qualified name of the model in format: catalog.schema.model_name
      evaluation_set_uc_table (str):
        Required. Fully qualified name of the evaluation table in format: catalog.schema.table_name
      mlflow_experiment_name (Optional[str]):
        Optional. Name/path of the MLflow experiment. Defaults to None.
    """

    uc_model_name: str = Field(..., min_length=1)
    evaluation_set_uc_table: str = Field(..., min_length=1)
    # Optional[...] because the default is None; with a plain `str` annotation an
    # explicitly passed `mlflow_experiment_name=None` would fail validation.
    mlflow_experiment_name: Optional[str] = Field(None)

    @field_validator("uc_model_name", "evaluation_set_uc_table")
    @classmethod
    def validate_uc_fqn_format(cls, v: str, info: FieldValidationInfo) -> str:
        """
        Ensure the value is a three-part name: catalog.schema.name.

        Raises:
            ValueError: If the value does not have exactly three dot-separated
                parts, or if any part is empty/blank (e.g. "a..b").
        """
        parts = v.split(".")
        # Counting dots alone would accept names with empty components such as "a..b",
        # so every component is also required to be non-blank.
        if len(parts) != 3 or not all(part.strip() for part in parts):
            raise ValueError(
                f"{info.field_name} must be in format: catalog.schema.name"
            )
        return v

    @classmethod
    def escape_uc_fqn(cls, uc_fqn: str) -> str:
        """
        Backtick-quote a Unity Catalog fully qualified name (FQN) that contains a hyphen.

        Args:
            uc_fqn (str): The fully qualified name of the asset.

        Returns:
            str: The FQN with every dot-separated part wrapped in backticks if the
            FQN contains a "-", otherwise the original FQN unchanged.
        """
        if "-" not in uc_fqn:
            return uc_fqn
        return ".".join(f"`{part}`" for part in uc_fqn.split("."))

    @staticmethod
    def _get_workspace_client() -> "WorkspaceClient":
        """Create a workspace client without embedding credentials in the code."""
        # No inline host/token: the SDK's default authentication resolves credentials
        # at runtime (e.g. DATABRICKS_HOST / DATABRICKS_TOKEN or a config profile).
        return WorkspaceClient()

    def check_if_catalog_exists(self, catalog_name: str) -> bool:
        """
        Check whether a UC catalog exists.

        Args:
            catalog_name (str): Name of the catalog to look up.

        Returns:
            bool: True if the lookup succeeds, False if the workspace reports
            ResourceDoesNotExist or NotFound. Other errors propagate.
        """
        client = self._get_workspace_client()
        try:
            client.catalogs.get(name=catalog_name)
        except (ResourceDoesNotExist, NotFound):
            return False
        return True

    def check_if_schema_exists(self, catalog_name: str, schema_name: str) -> bool:
        """
        Check whether a UC schema exists inside a catalog.

        Args:
            catalog_name (str): Name of the catalog that should contain the schema.
            schema_name (str): Name of the schema to look up.

        Returns:
            bool: True if the lookup of "<catalog_name>.<schema_name>" succeeds, False
            if the workspace reports ResourceDoesNotExist or NotFound. Other errors propagate.
        """
        client = self._get_workspace_client()
        try:
            client.schemas.get(full_name=f"{catalog_name}.{schema_name}")
        except (ResourceDoesNotExist, NotFound):
            return False
        return True

    def validate_catalog_and_schema(self) -> tuple[bool, str]:
        """
        Validates that the specified catalogs and schemas exist for both uc_model_name and evaluation_set_uc_table
        Returns:
            tuple[bool, str]: A tuple containing (success, error_message).
            If validation passes, returns (True, success_message). If validation fails, returns (False, error_message).
        """
        # Both names were validated as catalog.schema.name, so each splits into three parts.
        model_catalog, model_schema, _ = self.uc_model_name.split(".")
        eval_catalog, eval_schema, _ = self.evaluation_set_uc_table.split(".")

        # Model locations are checked first, then evaluation-table locations; within
        # each, the catalog is checked before its schema. The first failure is returned.
        locations = (
            ("Model", model_catalog, model_schema),
            ("Evaluation", eval_catalog, eval_schema),
        )
        for label, catalog, schema in locations:
            if not self.check_if_catalog_exists(catalog):
                return (
                    False,
                    f"{label} catalog '{catalog}' does not exist. Please create it first.",
                )
            if not self.check_if_schema_exists(catalog, schema):
                return (
                    False,
                    f"{label} schema '{schema}' does not exist in catalog '{catalog}'. Please create it first.",
                )

        msg = f"All catalogs and schemas exist for both model `{self.uc_model_name}` and evaluation table `{self.evaluation_set_uc_table}`."
        print(msg)
        return (True, msg)

    def pretty_print(self):
        """Print the configuration in a readable format"""
        print("Agent Storage Configuration:")
        print(f"  UC Model Name: {self.uc_model_name}")
        print(f"  Evaluation Set UC Table: {self.evaluation_set_uc_table}")
        print(f"  MLflow Experiment Name: {self.mlflow_experiment_name}")

In [0]:
import mlflow 

# Agent storage configuration
agent_name = "story_builder_agent"
uc_catalog_name = default_catalog
uc_schema_name = "default"

# Opt-in switch for checking that the UC catalog and schema of the agent's model and
# evaluation table exist. It is off by default because this implementation does not
# have direct access to the workspace; set it to True once access is available.
VALIDATE_UC_LOCATIONS = False

agent_storage_config = AgentStorageConfig(
    uc_model_name=f"{uc_catalog_name}.{uc_schema_name}.{agent_name}",  # UC model to store staging/production versions of the Agent's code/config
    evaluation_set_uc_table=f"{uc_catalog_name}.{uc_schema_name}.{agent_name}_eval_set",  # UC table to store the evaluation set
    mlflow_experiment_name=f"/Users/{user_email}/{agent_name}_mlflow_experiment",  # MLflow Experiment to store development versions of the Agent and their associated quality/cost/latency evaluation results + MLflow Traces
)

# Validate the UC catalog and schema for the Agent's model & evaluation table
if VALIDATE_UC_LOCATIONS:
    is_valid, msg = agent_storage_config.validate_catalog_and_schema()
    if not is_valid:
        raise Exception(msg)

# Set the MLflow experiment, validating the path is valid
experiment_info = mlflow.set_experiment(agent_storage_config.mlflow_experiment_name)

# NOTE: printing a link to the experiment would need a `get_mlflow_experiment_url`
# helper, which is not defined in the visible part of this notebook.

In [0]:
# Write the agent storage configuration as YAML to ./configs/agent_storage_config.yaml
serializable_config_to_yaml_file(agent_storage_config, "./configs/agent_storage_config.yaml")

In [0]:
# import os

# # Create necessary directories
# dirs = [
#     "cookbook",
#     "cookbook/agents",
#     "cookbook/agents/utils",
#     "cookbook/config",
#     "cookbook/config/agents",
#     "cookbook/config/shared",
#     "cookbook/databricks_utils",
#     "cookbook/tools",
#     "tools"
# ]

# for dir_path in dirs:
#     os.makedirs(dir_path, exist_ok=True)

# # Create __init__.py files in each directory
# for dir_path in dirs:
#     with open(f"{dir_path}/__init__.py", "w") as f:
#         pass