### Notebook to stop the workflow (and the DLT pipelines within it) that contains the 2 DLT pipelines `hkjc_mv_customer_segment_gold` and `hkjc-wagering-dashboard-bronze-silver-gold-dlt-pipeline`.
#### Trigger: scheduled to run at midnight every day

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import PipelineState
from databricks.sdk.service.jobs import RunLifeCycleState
import time
import pyspark.sql.functions as F
import requests

In [0]:
# Authenticate as the identity running this notebook: reuse its API token and the workspace host.
bearer_token = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .apiToken()
    .getOrElse(None)
)
workspace_url = spark.conf.get('spark.databricks.workspaceUrl')

sdk_workspace_client = WorkspaceClient(host=workspace_url, token=bearer_token)

In [0]:
# Name of the workflow (Databricks job) to shut down, supplied via the "wf_job_name" widget.
wf_job_name = dbutils.widgets.get("wf_job_name")
if not wf_job_name.strip():
    # Fail fast: a blank name is not a usable key for the job lookup that follows.
    raise ValueError("Widget 'wf_job_name' is empty; set it to the name of the workflow to stop.")

In [0]:
# Find job_id by its name via the Jobs 2.1 REST API (the `name` query parameter is
# documented as an exact, case-insensitive filter).
headers = {
    'Authorization': f'Bearer {bearer_token}',
    'Content-Type': 'application/json'
}
url = f"https://{workspace_url}/api/2.1/jobs/list"
params = {'name': wf_job_name}

try:
    # A timeout keeps a hung connection from stalling the scheduled run indefinitely.
    response = requests.get(url, headers=headers, params=params, timeout=60)
    if response.status_code != 200:
        raise RuntimeError(f"Error: {response.status_code} - {response.text}")
    listed_jobs = response.json().get('jobs') or []
except Exception:
    print(f"Error finding Workflow '{wf_job_name}'")
    raise

# Re-check the name client-side so this kill notebook can never act on an unrelated job,
# even if the server-side filter were ignored.
matching_jobs = [
    job for job in listed_jobs
    if ((job.get('settings') or {}).get('name') or '').casefold() == wf_job_name.casefold()
]
if not matching_jobs:
    # Without a job ID every later cell would fail with a NameError; stop here with a clear message.
    raise LookupError(f"Error: Workflow '{wf_job_name}' not found.")
if len(matching_jobs) > 1:
    # Job names are not unique in Databricks; cancelling an arbitrary match would be unsafe.
    duplicate_ids = [job['job_id'] for job in matching_jobs]
    raise LookupError(
        f"Error: multiple workflows named '{wf_job_name}' found (job IDs: {duplicate_ids}); "
        "refusing to guess which one to cancel."
    )

job = matching_jobs[0]
wf_job_id = job['job_id']
print(f"Found Workflow '{wf_job_name}' -> Job ID: {wf_job_id}")

In [0]:
def get_dlt_pipeline_ids(client, workflow_job_id):
    """Return the IDs of the DLT pipelines run by the tasks of a workflow.

    Args:
        client: A Databricks ``WorkspaceClient`` (anything exposing ``jobs.get``).
        workflow_job_id: ID of the workflow (job) whose tasks are inspected.

    Returns:
        A list of pipeline IDs, in the order the pipeline tasks appear in the
        job settings. Tasks without a ``pipeline_task`` are skipped.
    """
    job_settings = client.jobs.get(job_id=workflow_job_id).settings
    if not (job_settings and job_settings.tasks):
        return []

    found = []
    for task in job_settings.tasks:
        dlt_task = getattr(task, 'pipeline_task', None)
        if not dlt_task:
            continue
        found.append(dlt_task.pipeline_id)
        print(f"Found DLT task '{task.task_key}' -> Pipeline ID: {dlt_task.pipeline_id}")
    return found

In [0]:
# Resolve the DLT pipelines that the workflow's tasks run.
pipeline_ids = get_dlt_pipeline_ids(client=sdk_workspace_client, workflow_job_id=wf_job_id)
print(pipeline_ids)

In [0]:
# Polling interval and upper bound for waiting on a pipeline to stop.
PIPELINE_POLL_SECONDS = 10
PIPELINE_STOP_TIMEOUT_SECONDS = 20 * 60

# Pipeline states treated as "no update in progress".
_SETTLED_PIPELINE_STATES = (PipelineState.IDLE, PipelineState.FAILED)


def _pipeline_is_active(state):
    """Return True when a pipeline reported in `state` may still have an update in progress."""
    return state is not None and state not in _SETTLED_PIPELINE_STATES


def _stop_pipeline_and_wait(client, pipeline_id):
    """Request a stop for an active DLT pipeline and poll until it settles.

    Args:
        client: A Databricks ``WorkspaceClient``.
        pipeline_id: ID of the pipeline to stop.

    Returns:
        The last pipeline object fetched from ``client.pipelines.get``.

    Raises:
        TimeoutError: if the pipeline is still active after PIPELINE_STOP_TIMEOUT_SECONDS.
    """
    pipeline = client.pipelines.get(pipeline_id=pipeline_id)
    if not _pipeline_is_active(pipeline.state):
        return pipeline

    # A pipeline already STOPPING needs no second stop request; just wait for it.
    if pipeline.state != PipelineState.STOPPING:
        try:
            client.pipelines.stop(pipeline_id=pipeline_id)
            print(f"Stop request submitted to the pipeline: {pipeline.name} (id: {pipeline_id})")
        except Exception as e:
            print(f"Error stopping {pipeline.name} pipeline: {e}")

    # Bounded wait: if the stop request failed, the original unbounded loop could spin forever.
    deadline = time.monotonic() + PIPELINE_STOP_TIMEOUT_SECONDS
    while _pipeline_is_active(pipeline.state):
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Pipeline {pipeline.name} (id: {pipeline_id}) still in state "
                f"{pipeline.state} after {PIPELINE_STOP_TIMEOUT_SECONDS}s"
            )
        print(f"waiting for {pipeline.name} pipeline to stop...")
        time.sleep(PIPELINE_POLL_SECONDS)
        pipeline = client.pipelines.get(pipeline_id=pipeline_id)
        print(f"Pipeline state: {pipeline.state}")
    return pipeline


# Fetch only the active runs of the workflow instead of paging through its whole
# history; this also avoids indexing into an empty list when the job has never run.
active_runs = list(sdk_workspace_client.jobs.list_runs(job_id=wf_job_id, active_only=True))

if not active_runs:
    print(f"No active runs found for the workflow: {wf_job_name}")

# Cancel every active run (not just the newest one, and not only those in RUNNING).
for run in active_runs:
    active_run_id = run.run_id
    sdk_workspace_client.jobs.cancel_run(run_id=active_run_id)
    print(f"Cancel request submitted to the workflow: {wf_job_name} with run ID: {active_run_id}")
    print(f"check the run status here: -> https://{workspace_url}/jobs/{wf_job_id}/runs/{active_run_id}")

# Stop every DLT pipeline the workflow uses (no hard-coded count or order), attempting
# all of them before reporting any timeout.
stop_timeouts = []
if active_runs:
    for pipeline_id in pipeline_ids:
        try:
            _stop_pipeline_and_wait(sdk_workspace_client, pipeline_id)
        except TimeoutError as e:
            print(e)
            stop_timeouts.append(pipeline_id)

# Report the final state of each pipeline instead of asserting success unconditionally.
still_active = []
for pipeline_id in pipeline_ids:
    state = sdk_workspace_client.pipelines.get(pipeline_id=pipeline_id).state
    print(f"{pipeline_id}: {state}")
    if _pipeline_is_active(state):
        still_active.append(pipeline_id)

if still_active:
    print(f"Warning: pipelines still active: {still_active}")
else:
    print("Pipelines are not running")

if stop_timeouts:
    # Fail the scheduled run so the incomplete shutdown is visible.
    raise TimeoutError(f"Timed out waiting for pipelines to stop: {stop_timeouts}")