## Process Control Framework - Main Notebook
#### This notebook generates a workflow bundle (YAML) for a given job from its child-job configuration

In [0]:
# Clear any widgets left from an earlier run, then declare this notebook's
# text inputs; each one starts out as an empty string.
dbutils.widgets.removeAll()
for _widget_name, _widget_label in (
    ("proj_id", "Project ID"),
    ("dept_id", "Department ID"),
    ("job_id", "Job ID"),
    ("esp_job_name", "ESP Job Name"),
):
    dbutils.widgets.text(_widget_name, "", _widget_label)

In [0]:
# Pull every widget value once; later cells use these module-level names.
proj_id, dept_id, job_id, esp_job_name = (
    dbutils.widgets.get(widget)
    for widget in ("proj_id", "dept_id", "job_id", "esp_job_name")
)

In [0]:
# Echo the parameters read in the previous cell so they show up in the run
# output. Use the stored variables for all four values rather than re-querying
# some of the widgets, so every line reports the same source.
print(f"proj_id: {proj_id}")
print(f"dept_id: {dept_id}")
print(f"job_id: {job_id}")
print(f"esp_job_name: {esp_job_name}")

In [0]:
from src.metadata_utils import get_child_jobs, get_notebook_path
from src.workflow_utils import create_workflow_bundle, build_workflow_tasks

In [0]:
from pathlib import Path
import os

def get_project_root(marker="databricks.yml", start=None):
    """Return the nearest directory at or above *start* that contains *marker*.

    The search checks the starting directory itself first and then walks up
    through its parents. A working directory that already contains the marker
    therefore resolves to itself instead of being skipped.

    Args:
        marker: File or directory name that identifies the project root.
        start: Directory to begin the search from (str or Path). Defaults to
            the current working directory.

    Returns:
        The resolved ``Path`` of the first directory containing *marker*.

    Raises:
        FileNotFoundError: If neither *start* nor any of its parents contains
            *marker*.
    """
    origin = Path(Path.cwd() if start is None else start).resolve()
    # ``origin.parents`` excludes ``origin`` itself, so prepend it explicitly.
    for candidate in (origin, *origin.parents):
        if (candidate / marker).exists():
            return candidate
    raise FileNotFoundError(f"Could not find project root with marker {marker}")

# Resolve the bundle root once; the process-control cell passes it to
# create_workflow_bundle.
project_root_path = get_project_root(marker="databricks.yml")
print(f"Project root path: {project_root_path!s}")

In [0]:
try:
    print(f"Starting process control for project_id = {proj_id}, dept_id = {dept_id}, job_id = {job_id} and esp_job_name = {esp_job_name}")

    # Step 0: Fail fast on empty inputs. Every widget defaults to "", and an
    # empty esp_job_name would otherwise become an empty workflow name.
    missing_params = [
        name
        for name, value in (
            ("proj_id", proj_id),
            ("dept_id", dept_id),
            ("job_id", job_id),
            ("esp_job_name", esp_job_name),
        )
        if not value.strip()
    ]
    if missing_params:
        raise ValueError(f"Missing required parameter(s): {', '.join(missing_params)}")

    # Step 1: Get child jobs
    child_jobs = get_child_jobs(spark, proj_id, dept_id, job_id)
    if not child_jobs:
        raise Exception(f"No child jobs found for job {job_id}")

    print(f"Found {len(child_jobs)} child jobs for {job_id}")

    # Step 2: Get notebook paths for all child jobs
    notebook_paths = {}
    children_without_notebook = []
    for child in child_jobs:
        child_name = child["name"]
        notebook_path = get_notebook_path(spark, proj_id, dept_id, child_name)
        if notebook_path:
            notebook_paths[child_name] = notebook_path
        else:
            children_without_notebook.append(child_name)

    if children_without_notebook:
        # These children are left out of notebook_paths. Report them so an
        # incomplete workflow does not go unnoticed; how build_workflow_tasks
        # treats them is decided inside that helper.
        print(f"WARNING: No notebook path found for child jobs: {children_without_notebook}")

    # Step 3: Build workflow tasks
    workflow_name = esp_job_name
    print(f"Workflow name: {workflow_name}")

    # NOTE(review): "env" is hard-coded to "dev"; expose it as a widget if this
    # notebook is meant to run against other environments.
    job_parameters = {
        "env": "dev",
        "proj_id": proj_id,
        "dept_id": dept_id,
        "job_id": job_id
    }

    tasks = build_workflow_tasks(proj_id, dept_id, job_id, child_jobs, notebook_paths)

    print(f"Tasks: {tasks}")

    # Step 4: Write the workflow bundle YAML under the project root.
    # NOTE: Workflows are not created via the Databricks REST API here; only the
    # bundle definition is generated. If API creation is added, never print the
    # API token.
    print(f"Creating workflow bundle under project root: {project_root_path}")
    bundle_path = create_workflow_bundle(workflow_name, tasks, job_parameters, project_root_path)

    if not bundle_path:
        # The outer handler adds the "FAILED:" prefix, so it is not repeated here.
        raise Exception("Could not create workflow bundle")
    print(f"SUCCESS: Created workflow yaml at {bundle_path}")
except Exception as e:
    error_msg = f"Error in process control: {e}"
    print(error_msg)
    # Chain the original exception so its traceback is reported as the cause.
    raise Exception(f"FAILED: {error_msg}") from e
