### Criando Datasets

In [0]:
import pandas as pd
import os
import requests
import json
import uuid
import re

from pyspark.sql import SparkSession

catalog_name = f"actuarial_app_catalog"

In [None]:
spark.sql("GRANT CREATE CATALOG ON METASTORE TO `account users`")
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_name}.variable_annuity")
spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog_name}.long_term_care")

In [0]:
data_path = './data/'

folders = {'va': 'variable_annuity', 'ltc': 'long_term_care'}

for folder, schema in folders.items():

    folder_path = os.path.join(data_path, folder)

    for file in os.listdir(folder_path):

        if file.endswith('.csv'):

            entity_name = file[:-4]

            table_name = f"{catalog_name}.{schema}.{entity_name}"

            file_path = os.path.join(folder_path, file)

            df = pd.read_csv(file_path)

            s_df = spark.createDataFrame(df)

            s_df.write.mode("overwrite").saveAsTable(table_name)

In [None]:
%python
# Apply constraints to ltc_incidence_w_comment
# Note: Primary Key columns must be NOT NULL in Delta tables

table_name = f"{catalog_name}.long_term_care.ltc_incidence_w_comment"
pk_cols = [
    "Group_Indicator", "Gender", "Issue_Age_Bucket", "Incurred_Age_Bucket",
    "Issue_Year_Bucket", "Policy_Year", "Marital_Status", "Premium_Class",
    "Underwriting_Type", "Coverage_Type_Bucket", "Tax_Qualification_Status",
    "Inflation_Rider", "Rate_Increase_Flag", "Restoration_of_Benefits",
    "NH_Orig_Daily_Ben_Bucket", "ALF_Orig_Daily_Ben_Bucket",
    "HHC_Orig_Daily_Ben_Bucket", "NH_Ben_Period_Bucket",
    "ALF_Ben_Period_Bucket", "HHC_Ben_Period_Bucket", "NH_EP_Bucket",
    "ALF_EP_Bucket", "HHC_EP_Bucket", "Region"
]

print(f"Applying constraints to {table_name}...")

# 1. Set columns to NOT NULL
for col in pk_cols:
    try:
        spark.sql(f"ALTER TABLE {table_name} ALTER COLUMN {col} SET NOT NULL")
    except Exception as e:
        print(f"Warning: Could not set NOT NULL on {col} (might already be set): {e}")

# 2. Add Primary Key Constraint
try:
    # Check if constraint already exists to avoid error
    # (Simple check via try/except is often easiest for scripts)
    spark.sql(f"ALTER TABLE {table_name} ADD CONSTRAINT ltc_incidence_w_comment_pk PRIMARY KEY ({', '.join(pk_cols)})")
    print(f"Successfully added PK constraint to {table_name}")
except Exception as e:
    print(f"Warning: Could not add PK constraint (might already exist): {e}")

In [None]:
%python
# Apply constraints to ltc_termination_w_comment
# Note: Primary Key columns must be NOT NULL in Delta tables

table_name_term = f"{catalog_name}.long_term_care.ltc_termination_w_comment"
pk_cols_term = [
    "Gender", "Incurred_Age_Bucket", "Region", "Incurred_Year_Bucket",
    "Claim_Type", "Diagnosis_Category", "Claim_Duration"
]

print(f"Applying constraints to {table_name_term}...")

# 1. Set columns to NOT NULL
for col in pk_cols_term:
    try:
        spark.sql(f"ALTER TABLE {table_name_term} ALTER COLUMN {col} SET NOT NULL")
    except Exception as e:
        print(f"Warning: Could not set NOT NULL on {col} (might already be set): {e}")

# 2. Add Primary Key Constraint
try:
    spark.sql(f"ALTER TABLE {table_name_term} ADD CONSTRAINT ltc_termination_w_comment_pk PRIMARY KEY ({', '.join(pk_cols_term)})")
    print(f"Successfully added PK constraint to {table_name_term}")
except Exception as e:
    print(f"Warning: Could not add PK constraint (might already exist): {e}")

### Criando Dashboards and Genie

In [None]:
# Get Databricks context
try:
    ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
    host_name = ctx.apiUrl().get()
    token = ctx.apiToken().get()
except:
    # Fallback for local development if env vars are set
    host_name = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

if not host_name or not token:
    raise Exception("Databricks host and token must be available.")

headers = {"Authorization": f"Bearer {token}"}

# Helper function to get a warehouse ID
def get_warehouse_id():
    url = f"{host_name}/api/2.0/sql/warehouses"
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        warehouses = response.json().get("warehouses", [])
        for wh in warehouses:
            if wh.get("state") == "RUNNING":
                return wh.get("id")
        if warehouses:
            return warehouses[0].get("id")
    return None

warehouse_id = get_warehouse_id()
print(f"Using Warehouse ID: {warehouse_id}")

# --- Dashboards ---
dashboard_files = [
    "dashboards/LTC experience study - Incidence.lvdash.json",
    "dashboards/Variable Annuities Projections.lvdash.json"
]

# Map display names to env var placeholders in app.yaml
dashboard_env_map = {
    "LTC experience study - Incidence": "DATABRICKS_DASHBOARD_LTC_ID",
    "Variable Annuities Projections": "DATABRICKS_DASHBOARD_VA_ID"
}

collected_ids = {}

# Fetch existing dashboards to avoid duplicates
existing_dashboards = {}
list_dashboards_url = f"{host_name}/api/2.0/lakeview/dashboards"
next_page_token = None

print("Listing existing dashboards...")
while True:
    params = {}
    if next_page_token:
        params['page_token'] = next_page_token
        
    list_response = requests.get(list_dashboards_url, headers=headers, params=params)
    if list_response.status_code == 200:
        data = list_response.json()
        dashboards = data.get("dashboards", [])
        for db in dashboards:
            existing_dashboards[db.get("display_name")] = db.get("dashboard_id")
        
        next_page_token = data.get("next_page_token")
        if not next_page_token:
            break
    else:
        print(f"Warning: Could not list existing dashboards: {list_response.text}")
        break

print(f"Found {len(existing_dashboards)} existing dashboards.")

for file_path in dashboard_files:
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        continue
        
    with open(file_path, 'r') as f:
        dashboard_content = f.read()
        
    # The file name without extension is the display name
    display_name = os.path.basename(file_path).replace(".lvdash.json", "")
    
    dashboard_id = None
    
    # Check if dashboard already exists
    if display_name in existing_dashboards:
        dashboard_id = existing_dashboards[display_name]
        print(f"Dashboard '{display_name}' already exists with ID: {dashboard_id}. Skipping creation.")
    else:
        # Create Dashboard
        create_url = f"{host_name}/api/2.0/lakeview/dashboards"
        payload = {
            "display_name": display_name,
            "serialized_dashboard": dashboard_content
        }
        if warehouse_id:
            payload["warehouse_id"] = warehouse_id
            
        response = requests.post(create_url, headers=headers, json=payload)
        
        if response.status_code == 200:
            dashboard_id = response.json().get("dashboard_id")
            print(f"Created dashboard '{display_name}' with ID: {dashboard_id}")
        else:
            print(f"Failed to create dashboard '{display_name}': {response.text}")
            continue

    if dashboard_id:
        # Store ID for app.yaml update
        if display_name in dashboard_env_map:
             collected_ids[dashboard_env_map[display_name]] = dashboard_id
        
        # Publish Dashboard
        publish_url = f"{host_name}/api/2.0/lakeview/dashboards/{dashboard_id}/published"
        publish_payload = {
            "embed_credentials": True
        }
        if warehouse_id:
            publish_payload["warehouse_id"] = warehouse_id
            
        pub_response = requests.post(publish_url, headers=headers, json=publish_payload)
        if pub_response.status_code == 200:
            print(f"Published dashboard '{display_name}'")
        else:
            print(f"Failed to publish dashboard '{display_name}': {pub_response.text}")

In [None]:
# --- Genie Spaces ---
genie_spaces_dir = "genie_spaces"
genie_spaces_config = []

if os.path.exists(genie_spaces_dir):
    for filename in os.listdir(genie_spaces_dir):
        if filename.endswith(".json"):
            try:
                with open(os.path.join(genie_spaces_dir, filename), 'r') as f:
                    config = json.load(f)
                    # Join instructions if it's a list
                    if isinstance(config.get("instructions"), list):
                        config["instructions"] = "\n".join(config["instructions"])
                    genie_spaces_config.append(config)
            except Exception as e:
                print(f"Error reading Genie Space config {filename}: {e}")
else:
    print(f"Directory {genie_spaces_dir} not found.")

for space_config in genie_spaces_config:
    schema = space_config["schema"]
    # Get tables in schema
    try:
        tables_df = spark.sql(f"SHOW TABLES IN {catalog_name}.{schema}")
        table_names = [f"{catalog_name}.{schema}.{row.tableName}" for row in tables_df.collect()]
    except Exception as e:
        print(f"Error fetching tables for schema {schema}: {e}")
        table_names = []

    if not table_names:
        print(f"No tables found for schema {schema}, skipping Genie Space creation.")
        continue

    # Construct inner payload (the content of the space)
    inner_payload = {
        "version": 1,
        "data_sources": {
            "tables": [{"identifier": t} for t in table_names]
        },
        "instructions": {
            "text_instructions": [
                {
                    "id": str(uuid.uuid4()).replace('-', ''),
                    "content": [space_config["instructions"].strip()]
                }
            ]
        }
    }
    
    # Construct outer payload
    # The API expects 'serialized_space' to be a JSON string of the space configuration
    # AND it also expects 'parent_path' to be set.
    # We will use the current user's home folder as the parent path.
    
    current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
    parent_path = f"/Users/{current_user}"
    
    payload = {
        "title": space_config["name"],
        "description": space_config["description"],
        "parent_path": parent_path,
        "serialized_space": json.dumps(inner_payload)
    }
    
    if warehouse_id:
        payload["warehouse_id"] = warehouse_id
    
    create_url = f"{host_name}/api/2.0/genie/spaces"
    response = requests.post(create_url, headers=headers, json=payload)
    
    if response.status_code == 200:
        space_id = response.json().get("space_id")
        print(f"Created Genie Space '{space_config['name']}' with ID: {space_id}")
        collected_ids[space_config["env_var"]] = space_id
    else:
        print(f"Failed to create Genie Space '{space_config['name']}': {response.text}")

In [None]:
# Update app.yaml
app_yaml_path = "app/app.yaml"
if os.path.exists(app_yaml_path) and collected_ids:
    with open(app_yaml_path, 'r') as f:
        content = f.read()
    
    for env_name, db_id in collected_ids.items():
        # Simple replacement assuming the value is set to the env var name as a placeholder
        # This relies on the user resetting the app.yaml to the template state before running
        placeholder = f"value: '{env_name}'"
        new_line = f"value: '{db_id}'"
        
        if placeholder in content:
            content = content.replace(placeholder, new_line)
            print(f"Updated {env_name} in app.yaml")
        else:
            print(f"Could not find placeholder '{placeholder}' for {env_name} in app.yaml")
        
    with open(app_yaml_path, 'w') as f:
        f.write(content)
    print("Finished updating app.yaml")
else:
    print("app.yaml not found or no IDs collected")

### Deploying the App


In [None]:
# App Configuration
app_name = "actuarial-app"

# Determine source code path
try:
    notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
    # Assuming the notebook is at the root of the project in the workspace
    # and the app code is in the 'app' subdirectory
    project_root = os.path.dirname(notebook_path)
    app_source_path = f"{project_root}/app"
    print(f"Derived app source path: {app_source_path}")
except Exception as e:
    print("Could not determine notebook path (likely running locally). Skipping automatic deployment.")
    app_source_path = None

if app_source_path:
    # 1. Check if App exists
    get_app_url = f"{host_name}/api/2.0/apps/{app_name}"
    app_response = requests.get(get_app_url, headers=headers)
    
    if app_response.status_code == 404:
        # Create App
        print(f"App '{app_name}' not found. Creating...")
        create_app_url = f"{host_name}/api/2.0/apps"
        create_payload = {
            "name": app_name,
            "description": "Actuarial App with Embedded Dashboards and Genie"
        }
        create_response = requests.post(create_app_url, headers=headers, json=create_payload)
        if create_response.status_code == 200:
            print(f"Successfully created app '{app_name}'")
        else:
            print(f"Failed to create app: {create_response.text}")
            app_source_path = None # Stop deployment
    elif app_response.status_code == 200:
        print(f"App '{app_name}' already exists.")
    else:
        print(f"Error checking app status: {app_response.text}")
        app_source_path = None

    # 2. Deploy App
    if app_source_path:
        print(f"Deploying app from {app_source_path}...")
        deploy_url = f"{host_name}/api/2.0/apps/{app_name}/deployments"
        deploy_payload = {
            "source_code_path": app_source_path
        }
        deploy_response = requests.post(deploy_url, headers=headers, json=deploy_payload)
        
        if deploy_response.status_code == 200:
            deployment_id = deploy_response.json().get("deployment_id")
            print(f"Deployment initiated. ID: {deployment_id}")
            print(f"Track status at: {host_name}/compute/apps/{app_name}/deployments/{deployment_id}")
        else:
            print(f"Failed to deploy app: {deploy_response.text}")