## Instructions
- Attach the notebook to a cluster (Serverless is preferred).
- Run Cells 2, 3, and 4 so that the widget parameters are populated.
- Fill in the parameters as described below.
- Run the rest of the notebook to deploy the dashboards.

### Notebook Parameters

**Main Parameters:**
- **`actions`**: Select the actions to perform during deployment. For the first deployment, select **All** to run every action (Deploy Dashboards, Publish Dashboards, Create Functions, Create/refresh Tables).
- **`warehouse`**: Select the SQL warehouse that the dashboards will use to run queries. Serverless warehouses are marked with \*\* and are recommended.
- **`catalog`**: Specify a Unity Catalog (UC) catalog where you have read/write permissions. The required tables and functions are created here. The notebook attempts to create the catalog if it does not exist.
- **`schema`**: Provide the name of the schema, inside the selected catalog, where the tables and functions will be stored. It is created automatically if it does not exist.
- **`workspace_name`**: A descriptive name for the current workspace. The dashboards display this name instead of the workspace ID.
- **`tags_to_consider_for_team_name`**: A comma-separated list of tag keys used to identify teams in the dashboards (e.g. `team_name,group`).
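
The `warehouse` and `tags_to_consider_for_team_name` values arrive as plain strings that the notebook parses later. The following is a minimal sketch of that parsing, not the notebook's own code: the helper names `parse_warehouse_id` and `parse_tag_keys` and the sample warehouse labels are hypothetical, and the label shape `name - (id)` with an optional trailing `**` is taken from the widget setup cell.

```python
def parse_warehouse_id(label: str) -> str:
    """Extract the id from a label shaped like 'name - (id)' or 'name - (id)**'."""
    # Use the last parenthesized group so parentheses in the warehouse name are ignored.
    return label.rsplit("(", 1)[1].split(")", 1)[0]


def parse_tag_keys(raw: str) -> list[str]:
    """Split a comma-separated tag list, trimming whitespace and dropping empty entries."""
    return [key.strip() for key in raw.split(",") if key.strip()]


# Hypothetical sample values for illustration only
print(parse_warehouse_id("Shared Endpoint - (abc123def456)**"))
print(parse_tag_keys("team_name, group,"))
```

Tag keys are tried in the order given, so the first key in the list takes precedence when a resource carries more than one of them.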

In [0]:
%pip install databricks-sdk==0.38.0
dbutils.library.restartPython()

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
from pyspark.errors import PySparkException

# Client initialized for the current workspace
w = WorkspaceClient()

# Build the list of available warehouses; serverless ones get a trailing ** marker
warehouses = w.warehouses.list()
warehouse_names = [
    f"{wh.name} - ({wh.id})**" if wh.enable_serverless_compute else f"{wh.name} - ({wh.id})"
    for wh in warehouses
]

In [0]:
# Remove all existing widgets before creating them
dbutils.widgets.removeAll()

dbutils.widgets.multiselect('actions', 'All', choices=['All', 'Deploy Dashboards', 'Publish Dashboards', 'Create Functions', 'Create/refresh Tables']) # Select the actions to perform
dbutils.widgets.dropdown('warehouse', warehouse_names[0], choices=warehouse_names) # Select the warehouse the dashboards will use
dbutils.widgets.text('catalog', 'main') # Provide a catalog where you have read/write permissions. The catalog is created if it does not exist.
dbutils.widgets.text('schema', 'default') # Provide a schema where you have read/write permissions. The schema is created if it does not exist.
dbutils.widgets.text('workspace_name', 'Mi Workspace') # Provide a descriptive name for the current workspace
dbutils.widgets.text('tags_to_consider_for_team_name', 'team_name,group') # Provide a comma-separated list of tag keys used to identify teams

In [0]:
actions = dbutils.widgets.get('actions')
catalog = dbutils.widgets.get('catalog')
schema = dbutils.widgets.get('schema')
workspace_name = dbutils.widgets.get('workspace_name')
tags_to_consider_for_team_name = dbutils.widgets.get('tags_to_consider_for_team_name')
warehouse = dbutils.widgets.get('warehouse')
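# Labels look like "name - (id)" or "name - (id)**"; take the last parenthesized group
# so that parentheses in the warehouse name do not break the parsing
warehouse_id = warehouse.rsplit("(", 1)[1].split(")")[0]

# Switch to the catalog, creating it first if it does not exist
try:
    spark.sql(f'USE CATALOG {catalog}')
except PySparkException as ex:
    if ex.getErrorClass() == "NO_SUCH_CATALOG_EXCEPTION":
        spark.sql(f'CREATE CATALOG IF NOT EXISTS {catalog}')
    else:
        raise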
spark.sql(f'CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}')
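
The cell above interpolates the `catalog` and `schema` widget values directly into SQL, which fails for names containing characters such as hyphens. Databricks SQL accepts backtick-delimited identifiers, with an embedded backtick escaped by doubling it. The sketch below shows one way to quote names before interpolation; `quote_ident` and `qualified_name` are hypothetical helpers that the notebook does not define, and the sample names are made up.

```python
def quote_ident(name: str) -> str:
    """Wrap an identifier in backticks, doubling any backtick it contains."""
    return "`" + name.replace("`", "``") + "`"


def qualified_name(*parts: str) -> str:
    """Join quoted identifier parts with dots, e.g. catalog.schema.object."""
    return ".".join(quote_ident(part) for part in parts)


# Hypothetical catalog and schema names for illustration only
print(f"CREATE SCHEMA IF NOT EXISTS {qualified_name('main', 'cost-reports')}")
```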

In [0]:
import json
from pathlib import Path
import os
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
from databricks.sdk.service.dashboards import Dashboard

# List all dashboard JSON files in the current folder
def list_json_dash_files():
    notebook_path = json.loads(
        dbutils.notebook.entry_point.getDbutils().notebook().getContext().safeToJson()
    )["attributes"]["notebook_path"]
    new_folder_name = "dashboard_assess_dbx_costs"
    # Dashboards are saved in a subfolder next to this notebook
    dashboard_save_path = f'{notebook_path.rsplit("/", 1)[0]}/{new_folder_name}'
    Path(new_folder_name).mkdir(parents=True, exist_ok=True)
    json_files = [
        f for f in os.listdir(".") if os.path.isfile(f) and f.endswith(".lvdash.json")
    ]
    print(f"Dashboard JSON files found: {json_files}")
    return json_files, dashboard_save_path


# Deploy dashboards from JSON files
def deploy_dashboards():
    # Get all dashboard JSON files in the current folder
    json_files, dashboard_save_path = list_json_dash_files()
    dash_name_id_dict = {}

    # Process each JSON file
    for json_file in json_files:
        with open(json_file, "r") as file:
            data = file.read().rstrip()
        replaced_data = data.replace("{catalog}", catalog).replace("{schema}", schema)

        dash_name = json_file.split(".")[0]

        try:
            # Update the dashboard if it already exists
            dashboard_id = w.workspace.get_status(
                f"{dashboard_save_path}/{json_file}"
            ).resource_id
            curr_dash = w.lakeview.get(dashboard_id)
            curr_dash.serialized_dashboard = replaced_data
            dash_updated = w.lakeview.update(
                dashboard_id=dashboard_id,
                dashboard=curr_dash,
            )
            print(
                f'Dashboard "{dash_name}" updated successfully at {dash_updated.update_time}'
            )
            dash_name_id_dict[dash_name] = dashboard_id
        except Exception as e:
            # Create a new dashboard if it does not exist yet
            if "doesn't exist" in str(e):
                new_dash = Dashboard(
                    display_name=dash_name,
                    parent_path=dashboard_save_path,
                    serialized_dashboard=replaced_data,
                    warehouse_id=warehouse_id)
                dash_created = w.lakeview.create(dashboard=new_dash)
                dashboard_id = dash_created.dashboard_id
                print(
                    f'Dashboard "{dash_name}" created successfully at {dash_created.create_time}'
                )
                dash_name_id_dict[dash_name] = dashboard_id
            else:
                # Re-raise any other error with its original traceback
                raise

    # Update the URLs in the index dashboard
    for json_file in json_files:
        if "Databricks" in json_file:
            with open(json_file, "r") as file:
                data = file.read().rstrip()
            replaced_data = data.replace("{catalog}", catalog).replace(
                "{schema}", schema
            )

            host_url = f"https://{spark.conf.get('spark.databricks.workspaceUrl')}"

            for dash_name, dashboard_id in dash_name_id_dict.items():
                str_to_replace = f"**[{dash_name}](*)**"
                to_replace_with = f"**[{dash_name}]({host_url}/dashboardsv3/{dashboard_id}/published)**"
                replaced_data = replaced_data.replace(
                    f"{str_to_replace}", f"{to_replace_with}"
                )

            dashboard_id = w.workspace.get_status(
                f"{dashboard_save_path}/{json_file}"
            ).resource_id
            curr_dash = w.lakeview.get(dashboard_id)
            curr_dash.serialized_dashboard = replaced_data
            dash_updated = w.lakeview.update(
                dashboard_id=dashboard_id,
                dashboard=curr_dash,
            )
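            # Use the index dashboard's own name; dash_name still holds the last key of the loop above
            index_dash_name = json_file.split(".")[0]
            print(
                f'Dashboard "{index_dash_name}" updated successfully at {dash_updated.update_time} with links to other dashboards'
            )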
            print(f"{host_url}/dashboardsv3/{dashboard_id}/published")
            break


# Publish dashboards
def publish_dashboards():
    json_files, dashboard_save_path = list_json_dash_files()

    for json_file in json_files:
        dash_name = json_file.split(".")[0]

        try:
            dashboard_id = w.workspace.get_status(
                f"{dashboard_save_path}/{json_file}"
            ).resource_id
            dash_published = w.lakeview.publish(
                dashboard_id=dashboard_id, warehouse_id=warehouse_id
            )
            print(
                f'Dashboard "{dash_name}" published successfully at {dash_published.revision_create_time}'
            )
        except Exception as e:
            print(f"Dashboard {dash_name} could not be published")
            raise e


def create_sql_functions():
    # Create a function that derives the job type from the SKU
    print(f"Creating {catalog}.{schema}.job_type_from_sku function...")
    spark.sql(
        f"""CREATE OR REPLACE FUNCTION {catalog}.{schema}.job_type_from_sku(sku STRING)
          RETURNS STRING
          RETURN
          CASE
            WHEN sku LIKE '%JOBS_SERVERLESS%' THEN 'JOBS_SERVERLESS'
            WHEN sku LIKE '%JOBS_COMPUTE_(PHOTON)%' THEN 'JOBS_COMPUTE_PHOTON'
            WHEN sku LIKE '%JOBS_COMPUTE%' THEN 'JOBS_COMPUTE'
            WHEN sku IS NULL THEN 'UNKNOWN'
            ELSE 'OTHER'
          END;"""
    )
    print(f"Function {catalog}.{schema}.job_type_from_sku created successfully")

    # Create a function that derives the SQL warehouse type from the SKU
    print(f"Creating {catalog}.{schema}.sql_type_from_sku function...")
    spark.sql(
        f"""CREATE OR REPLACE FUNCTION {catalog}.{schema}.sql_type_from_sku(sku STRING)
          RETURNS STRING
          RETURN
          CASE
            WHEN sku LIKE '%SERVERLESS_SQL%' THEN 'SQL_SERVERLESS'
            WHEN sku LIKE '%SQL_PRO%' THEN 'SQL_PRO'
            WHEN sku LIKE '%SQL%' THEN 'SQL_CLASSIC'
            WHEN sku IS NULL THEN 'UNKNOWN'
            ELSE 'OTHER'
          END;"""
    )
    print(f"Function {catalog}.{schema}.sql_type_from_sku created successfully")

    # Programmatically create a function that extracts the team name from tags, based on the input parameter
    print(f"Creating {catalog}.{schema}.team_name_from_tags function...")
    # Trim whitespace and drop empty entries (e.g. from a trailing comma)
    keys = [k.strip() for k in tags_to_consider_for_team_name.split(",") if k.strip()]
    param_cols = ["cluster_tags", "job_tags"]

    # Build the CASE expression dynamically
    case_list = []
    for each_param_col in param_cols:
        case_statement = "CASE  \n"
        if len(keys) > 0:
            for key in keys:
                case_statement += f"WHEN map_contains_key({each_param_col}, '{key.strip()}') THEN lower({each_param_col}.`{key.strip()}`) \n"
        case_statement += f"WHEN map_contains_key({each_param_col}, 'LakehouseMonitoring') AND {each_param_col}.LakehouseMonitoring = 'true' THEN 'LakehouseMonitoring' \n"
        case_statement += f"ELSE NULL END AS {each_param_col}_team_name_init \n"
        case_list.append(case_statement)

    query = f"SELECT ifnull({param_cols[0]}_team_name_init, {param_cols[1]}_team_name_init) as team_name_init FROM \n (SELECT {', '.join(case_list)})"

    # Final SQL query
    query = f"(SELECT ifnull(team_name_init, 'unknown') AS team_name FROM \n ({query}))"

    # Create the function
    spark.sql(
        f"""create or replace function {catalog}.{schema}.team_name_from_tags(cluster_tags MAP<STRING,STRING>, job_tags MAP<STRING,STRING>)
        RETURNS STRING RETURN {query}"""
    )
    print(f"Function {catalog}.{schema}.team_name_from_tags created successfully")


def create_update_tables():
    # Programmatically create/refresh a table that maps workspace ID to workspace name
    print(f"Creating {catalog}.{schema}.workspace_reference table...")
    # CREATE OR REPLACE empties the table on every run
    spark.sql(
        f"CREATE OR REPLACE TABLE {catalog}.{schema}.workspace_reference (workspace_id STRING, workspace_name STRING)"
    )
    
    # Determine the current workspace ID, trying several methods in turn
    current_workspace_id = None
    
    # Method 1: extract it from the workspace URL
    try:
        workspace_url = spark.conf.get('spark.databricks.workspaceUrl')
        # Azure URLs look like adb-WORKSPACE_ID.N.azuredatabricks.net. Hostnames starting with
        # dbc- carry a deployment name rather than the numeric workspace ID, so any
        # non-numeric result is discarded and the next method is tried.
        if 'adb-' in workspace_url:
            current_workspace_id = workspace_url.split('adb-')[1].split('.')[0]
        elif 'dbc-' in workspace_url:
            current_workspace_id = workspace_url.split('dbc-')[1].split('.')[0]
        if current_workspace_id and not current_workspace_id.isdigit():
            current_workspace_id = None
    except Exception:
        pass
    
    # Method 2: from the Spark configuration
    if not current_workspace_id:
        try:
            current_workspace_id = spark.conf.get("spark.databricks.workspaceId")
        except Exception:
            pass
    
    # Method 3: from system.billing.usage. This picks an arbitrary row, so it is only
    # reliable when the table holds usage for a single workspace.
    if not current_workspace_id:
        try:
            current_workspace_id = spark.sql("SELECT DISTINCT workspace_id FROM system.billing.usage LIMIT 1").collect()[0][0]
        except Exception:
            pass
    
    # Final fallback
    if not current_workspace_id:
        current_workspace_id = "unknown"
    
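    # Insert the current workspace's details into the freshly created table.
    # Values are passed as named parameters so that quotes in workspace_name cannot break the statement.
    print(f"\t Using workspace_id: {current_workspace_id}, workspace_name: {workspace_name}")
    spark.sql(
        f"""INSERT INTO {catalog}.{schema}.workspace_reference 
        VALUES (:workspace_id, :workspace_name)""",
        args={"workspace_id": current_workspace_id, "workspace_name": workspace_name},
    )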
    print(f"Table {catalog}.{schema}.workspace_reference created/updated successfully")

    # Programmatically create/refresh a table that maps warehouse ID to warehouse name
    print(f"Creating {catalog}.{schema}.warehouse_reference table...")
    # CREATE OR REPLACE empties the table on every run
    spark.sql(
        f"CREATE OR REPLACE TABLE {catalog}.{schema}.warehouse_reference (workspace_id STRING, warehouse_id STRING, warehouse_name STRING)"
    )
    
    # Collect the warehouses of the current workspace ONLY, combining two sources:
    # 1. system.compute.warehouses - current warehouses and their official configuration
    # 2. system.access.audit - historical warehouses that no longer exist but still have billing data
    spark.sql(
        f"""INSERT INTO {catalog}.{schema}.warehouse_reference
            SELECT
              workspace_id,
              warehouse_id,
              MAX(warehouse_name) as warehouse_name
            FROM
              (
                -- Current warehouses from system.compute.warehouses
                SELECT
                  workspace_id,
                  warehouse_id,
                  warehouse_name
                FROM
                  (
                    SELECT
                      workspace_id,
                      warehouse_id,
                      warehouse_name,
                      delete_time,
                      ROW_NUMBER() OVER(
                        PARTITION BY workspace_id, warehouse_id
                        ORDER BY change_time DESC
                      ) as rn
                    FROM
                      system.compute.warehouses
                    WHERE
                      workspace_id = '{current_workspace_id}'
                  )
                WHERE
                  rn = 1
                  AND delete_time IS NULL
                
                UNION
                
                -- Historical warehouses from system.access.audit
                SELECT
                  workspace_id,
                  GET_JSON_OBJECT(response.result, '$.id') AS warehouse_id,
                  request_params.name AS warehouse_name
                FROM
                  system.access.audit
                WHERE
                  service_name = 'databrickssql'
                  AND GET_JSON_OBJECT(response.result, '$.id') IS NOT NULL
                  AND workspace_id = '{current_workspace_id}'
              )
            GROUP BY
              workspace_id,
              warehouse_id"""
    )
    print(f"Table {catalog}.{schema}.warehouse_reference created/updated successfully")
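
To see what `create_sql_functions` generates for the team name lookup, it helps to print the CASE expression for one tag column. The sketch below rebuilds that expression as a pure function so it can be inspected without a Spark session. `build_team_name_case` is a hypothetical helper with simplified formatting, not the notebook's code; the branch logic mirrors the loop in the cell above.

```python
def build_team_name_case(column: str, keys: list[str]) -> str:
    """Return a CASE expression that picks the first matching tag key from a MAP column."""
    # One branch per configured tag key, in priority order
    branches = [
        f"WHEN map_contains_key({column}, '{key}') THEN lower({column}.`{key}`)"
        for key in keys
    ]
    # Fixed branch for resources tagged by Lakehouse Monitoring
    branches.append(
        f"WHEN map_contains_key({column}, 'LakehouseMonitoring') "
        f"AND {column}.LakehouseMonitoring = 'true' THEN 'LakehouseMonitoring'"
    )
    return "CASE\n  " + "\n  ".join(branches) + "\n  ELSE NULL\nEND"


print(build_team_name_case("cluster_tags", ["team_name", "group"]))
```

Because SQL evaluates CASE branches in order, the position of a key in `tags_to_consider_for_team_name` decides which tag wins when several are present.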

In [0]:
all_actions = actions.split(",")
for each_action in all_actions:
    if each_action == "Deploy Dashboards":
        print("Deploying dashboards...")
        deploy_dashboards()
    elif each_action == "Publish Dashboards":
        print("Publishing dashboards...")
        publish_dashboards()
    elif each_action == "Create Functions":
        print("Creating functions...")
        create_sql_functions()
    elif each_action == "Create/refresh Tables":
        print("Creating/refreshing tables...")
        create_update_tables()
    else:
        print("Performing all actions...")
        deploy_dashboards()
        publish_dashboards()
        create_sql_functions()
        create_update_tables()
        break
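
The multiselect widget returns its selection as one comma-separated string. The loop above treats any unrecognized value as **All**, and selecting **All** together with other actions can run some steps twice. A possible alternative, sketched under the assumption that a fixed execution order is wanted, is to normalize the selection first; `ALL_ACTIONS` and `resolve_actions` are hypothetical names that do not appear in the notebook.

```python
ALL_ACTIONS = [
    "Deploy Dashboards",
    "Publish Dashboards",
    "Create Functions",
    "Create/refresh Tables",
]


def resolve_actions(selection: str) -> list[str]:
    """Expand a comma-separated multiselect value into an ordered, de-duplicated action list."""
    chosen = [item.strip() for item in selection.split(",") if item.strip()]
    if "All" in chosen:
        return list(ALL_ACTIONS)
    unknown = [item for item in chosen if item not in ALL_ACTIONS]
    if unknown:
        raise ValueError(f"Unknown actions: {unknown}")
    # Keep the canonical order so dashboards are deployed before they are published
    return [action for action in ALL_ACTIONS if action in chosen]


print(resolve_actions("Publish Dashboards,Deploy Dashboards"))
```

Raising on unknown values, instead of silently running everything, makes a typo in a job parameter fail fast.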