# Data Setup

Before running this notebook, update the `config.py` file with your catalog and schema names.  


Run this cell to initialize resource names. You can edit resource names in the config file before running.

In [0]:
%run ./config

## Run this section to import data files from the repo into tables and volumes under your specified catalog and schema

In [0]:
import sys
import os
import io
import base64
import logging
import pandas as pd

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.workspace import ObjectType, ExportFormat

# Configure root logging: INFO level, timestamped format, records sent to stdout.
# NOTE(review): logging.basicConfig does nothing if the root logger already has
# handlers attached -- confirm that is not the case in the target runtime.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
# Module-level logger used by the setup helpers defined below.
logger = logging.getLogger(__name__)

def ensure_catalog_and_schema(spark, catalog: str, schema: str):
    """
    Create the Unity Catalog schema `catalog`.`schema` if it is missing.

    Only the schema is created here; the catalog itself is expected to exist
    already (managed outside this workshop). To have the workshop create a
    dedicated catalog, issue a CREATE CATALOG IF NOT EXISTS statement for
    `catalog` before the schema statement below.

    Args:
        spark: Session object whose `sql()` method executes the statement.
        catalog: Name of the catalog that holds the schema.
        schema: Name of the schema to create when absent.
    """
    create_schema_stmt = f"CREATE SCHEMA IF NOT EXISTS `{catalog}`.`{schema}`"
    spark.sql(create_schema_stmt)

    done_msg = f"Ensured catalog `{catalog}` and schema `{schema}` exist."
    logger.info(done_msg)

def ensure_volume(spark, catalog: str, schema: str, volume: str):
    """
    Create the Unity Catalog volume `catalog`.`schema`.`volume` if it is missing.

    Volumes give unstructured data a governed storage location under UC.

    Args:
        spark: Session object whose `sql()` method executes the statement.
        catalog: Catalog that contains the schema.
        schema: Schema that will contain the volume.
        volume: Name of the volume to create when absent.
    """
    create_volume_stmt = f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`{volume}`"
    spark.sql(create_volume_stmt)

    done_msg = f"Ensured volume `{catalog}`.`{schema}`.`{volume}` exists."
    logger.info(done_msg)

def read_workspace_csv_to_pandas(w: WorkspaceClient, workspace_path: str) -> pd.DataFrame:
    """
    Export a CSV file from the Workspace and parse it into a pandas DataFrame.

    Args:
        w: Workspace client used to export the file.
        workspace_path: Workspace path of the CSV file to read.

    Returns:
        The parsed CSV contents.

    Raises:
        RuntimeError: If the export response has no `content`.
    """
    export_response = w.workspace.export(workspace_path, format=ExportFormat.AUTO)

    # A missing attribute and an explicit None are treated the same way.
    encoded = getattr(export_response, "content", None)
    if encoded is None:
        raise RuntimeError(f"Export returned no content for {workspace_path}")

    # The export payload is base64 text; decode it and parse from memory.
    csv_buffer = io.BytesIO(base64.b64decode(encoded))
    return pd.read_csv(csv_buffer)

def load_workspace_csv_to_delta_table(
    spark,
    w: WorkspaceClient,
    table: str,
    workspace_csv_path: str,
    catalog: str,
    schema: str,
    mode: str = "overwrite"
):
    """
    Load one Workspace CSV file into the Delta table `catalog`.`schema`.`table`.

    The CSV is read into pandas, converted to a Spark DataFrame, and saved
    with the given write mode.

    Args:
        spark: Session used to build the DataFrame.
        w: Workspace client used to export the CSV.
        table: Target table name (without catalog/schema).
        workspace_csv_path: Workspace path of the CSV file.
        catalog: Target catalog name.
        schema: Target schema name.
        mode: Write mode passed to the DataFrame writer; "overwriteSchema"
            is set to "true" only when this is exactly "overwrite".
    """
    full_table = f"`{catalog}`.`{schema}`.`{table}`"
    logger.info(f"Loading {full_table} from {workspace_csv_path}")

    spark_df = spark.createDataFrame(read_workspace_csv_to_pandas(w, workspace_csv_path))

    # Schema replacement is only requested for full overwrites.
    if mode == "overwrite":
        overwrite_schema = "true"
    else:
        overwrite_schema = "false"

    writer = spark_df.write.format("delta")
    writer = writer.mode(mode)
    writer = writer.option("overwriteSchema", overwrite_schema)
    writer.saveAsTable(full_table)

    logger.info(f"Wrote {full_table} ({mode}).")

def copy_folder_to_volume(w: WorkspaceClient, source_folder: str, destination_folder: str, verbose: bool = True):
    """
    Recursively copy a folder from the Databricks Workspace to a UC Volume.

    Args:
        w: Workspace client used to list and export workspace objects.
        source_folder: Workspace path like
            /Workspace/Users/.../data/tech_support/knowledge_base.
            NOTE(review): main() also passes single-file paths here; that
            relies on workspace.list() returning the object itself when the
            path is not a directory -- confirm against the SDK/REST docs.
        destination_folder: Volume path like /Volumes/<catalog>/<schema>/<volume>.
        verbose: When True, print progress for each folder, file, and skipped object.

    Raises:
        RuntimeError: If exporting a file returns no content.

    Implementation details:
    - Uses workspace.list() to traverse folders/files
    - Uses workspace.export() to fetch file bytes (base64 content)
    - Writes files into the volume using normal Python file I/O
    - Objects that are neither DIRECTORY nor FILE are not copied; they are
      reported (when verbose) instead of being dropped silently
    """
    if verbose:
        print(f"Copying from {source_folder} to {destination_folder}")

    for item in w.workspace.list(source_folder):
        # Last path component becomes the name inside the destination folder.
        item_name = item.path.split("/")[-1]
        dest_path = f"{destination_folder}/{item_name}"

        if item.object_type == ObjectType.DIRECTORY:
            os.makedirs(dest_path, exist_ok=True)
            if verbose:
                print(f"Created folder: {dest_path}")
            copy_folder_to_volume(w, item.path, dest_path, verbose)

        elif item.object_type == ObjectType.FILE:
            exported = w.workspace.export(item.path, format=ExportFormat.AUTO)
            content_b64 = getattr(exported, "content", None)
            if content_b64 is None:
                raise RuntimeError(f"Export returned no content for {item.path}")
            content = base64.b64decode(content_b64)

            # The top-level destination may not exist yet on the first file.
            os.makedirs(os.path.dirname(dest_path), exist_ok=True)
            with open(dest_path, "wb") as f:
                f.write(content)

            if verbose:
                print(f"Copied file: {item_name}")

        elif verbose:
            # Other object types (e.g. notebooks) are intentionally not copied,
            # but make that visible so "Completed" is not misleading.
            print(f"Skipped unsupported object ({item.object_type}): {item.path}")

    if verbose:
        print(f"Completed copying {source_folder}")

def main(spark):
    """
    Run the end-to-end workshop data setup.

    Steps, in order:
    1) Locate the `data` folder next to this notebook in the Workspace
    2) Create the UC schema if it is missing
    3) Load each CSV dataset into a Delta table named after the file
    4) Create the knowledge_base / support_tickets volumes and copy their folders in
    5) Create the invoice and audio volumes and copy the sample files in

    Reads the globals `catalog_name` and `schema_name` (presumably defined by
    the `%run ./config` cell) and the notebook-provided `dbutils`.
    """
    catalog, schema = catalog_name, schema_name

    w = WorkspaceClient()

    # Resolve this notebook's own location so data paths are relative to it.
    nb_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    notebook_path = nb_context.notebookPath().get()
    notebook_dir = os.path.dirname(notebook_path)
    logger.info(f"Notebook path: {notebook_path}")
    logger.info(f"Notebook dir:  {notebook_dir}")

    ws_data_folder = f"/Workspace{notebook_dir}/data"
    logger.info(f"Workspace data folder: {ws_data_folder}")

    ensure_catalog_and_schema(spark, catalog, schema)

    # Each table is loaded from <data folder>/<table name>.csv.
    table_names = (
        "billing",
        "customers",
        "knowledge_base",
        "support_tickets",
        "cust_service_data",
        "policies",
        "product_docs",
        "promotions",
        "devices",
        "plans",
    )
    csv_files = {name: f"{ws_data_folder}/{name}.csv" for name in table_names}

    for tbl, ws_csv_path in csv_files.items():
        load_workspace_csv_to_delta_table(
            spark=spark,
            w=w,
            table=tbl,
            workspace_csv_path=ws_csv_path,
            catalog=catalog,
            schema=schema,
            mode="overwrite"
        )

    # All volumes live under the same catalog/schema root.
    volumes_root = f"/Volumes/{catalog}/{schema}"

    # Tech-support document folders: create both volumes first, then copy.
    volume_kb = "knowledge_base"
    volume_st = "support_tickets"
    ensure_volume(spark, catalog, schema, volume_kb)
    ensure_volume(spark, catalog, schema, volume_st)

    tech_support_root = f"{ws_data_folder}/tech_support"
    copy_folder_to_volume(
        w,
        f"{tech_support_root}/knowledge_base",
        f"{volumes_root}/{volume_kb}",
        verbose=True,
    )
    copy_folder_to_volume(
        w,
        f"{tech_support_root}/support_tickets",
        f"{volumes_root}/{volume_st}",
        verbose=True,
    )

    # Sample invoice PDF (the source is a single file path, not a folder).
    ensure_volume(spark, catalog, schema, 'invoice')
    invoice_src = f"{ws_data_folder}/Invoice INV-2026-01482.pdf"
    copy_folder_to_volume(w, invoice_src, f"{volumes_root}/invoice", verbose=True)

    # Audio volume for voice demos (the source is a single .mp3 file path).
    volume_audio = "audio"
    ensure_volume(spark, catalog, schema, volume_audio)
    copy_folder_to_volume(
        w,
        f"{ws_data_folder}/audio/audio.mp3",
        f"{volumes_root}/{volume_audio}",
        verbose=True,
    )

    logger.info("Setup completed successfully.")

# Run the setup. `spark` is not defined in this file; it is expected to be
# supplied by the notebook runtime -- TODO confirm when running elsewhere.
main(spark)
