# 🚨 Long-Running Databricks Tasks

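This notebook finds active job runs, SQL queries, and clusters that have been running longer than the configured thresholds, and writes each set to a table in the configured catalog and schema.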


In [0]:
# Thresholds arrive from the notebook widgets as strings and are compared as floats.
job_threshold_minutes, query_threshold_minutes, cluster_threshold_hours = (
    float(dbutils.widgets.get(widget_name))
    for widget_name in ("job_threshold_minutes", "query_threshold_minutes", "cluster_threshold_hours")
)
# Destination for the result tables: <output_catalog>.<output_schema>.<table>.
output_catalog, output_schema = (
    dbutils.widgets.get(widget_name) for widget_name in ("output_catalog", "output_schema")
)

In [0]:
"""Monitor for long-running Databricks tasks using REST API."""

import json
import time
from datetime import datetime, timezone
from typing import Optional

import pandas as pd
import requests


class RunningTasks:
    """Snapshot active SQL queries, job runs, and clusters via the Databricks REST API.

    Every duration is measured against one timestamp captured when the instance
    is created, so all rows from a single instance share the same "now".
    Each ``get_active_*`` method returns a pandas DataFrame whose columns are
    always present, even when the API reports nothing active.
    """

    QUERY_COLUMNS = (
        'query_id', 'user_name', 'executed_as_user_name', 'warehouse_id',
        'state', 'query_text', 'start_time_utc', 'duration_minutes',
    )
    JOB_COLUMNS = (
        'job_id', 'run_id', 'run_name', 'creator_user_name', 'state',
        'start_time_utc', 'duration_minutes', 'run_page_url', 'run_type',
    )
    CLUSTER_COLUMNS = (
        'cluster_id', 'cluster_name', 'creator_user_name', 'state',
        'start_time_utc', 'uptime_hours', 'cluster_source', 'num_workers',
    )

    # Upper bound on pages fetched per listing, so a misbehaving token cannot loop forever.
    MAX_PAGES = 100
    # Seconds to wait for each REST call before giving up.
    REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self):
        """Read the workspace API URL and token from the current notebook context."""
        context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
        self.hostname = context.apiUrl().get()
        self.headers = {"Authorization": f"Bearer {context.apiToken().get()}"}
        # Snapshot "now" once so every duration in this run is comparable.
        self.current_time_ms = int(time.time() * 1000)

    def _ms_to_utc(self, timestamp_ms: Optional[int]) -> Optional[datetime]:
        """Convert epoch milliseconds to a naive ``datetime`` expressed in UTC.

        Returns ``None`` for a missing, zero, or negative timestamp.
        """
        if not timestamp_ms or timestamp_ms <= 0:
            return None
        # datetime.utcfromtimestamp is deprecated (Python 3.12); build an aware
        # UTC value and drop tzinfo to keep the same naive-UTC result.
        return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc).replace(tzinfo=None)

    def _calculate_duration(self, start_time_ms: Optional[int], unit_divisor: int) -> float:
        """Return the time elapsed since ``start_time_ms`` in units of ``unit_divisor`` ms.

        The result is rounded to 2 decimals. A missing or non-positive start time
        yields 0.0, the same cases for which ``_ms_to_utc`` returns ``None``.
        """
        if not start_time_ms or start_time_ms <= 0:
            return 0.0
        return round((self.current_time_ms - start_time_ms) / unit_divisor, 2)

    def _api_call(self, url: str, params: dict) -> dict:
        """Send a GET to ``url`` with ``params`` as a JSON body and return the decoded JSON.

        Raises:
            requests.HTTPError: on a non-2xx response. Without this, an auth or
                permission error would be reported as "nothing running".
            requests.Timeout: if the server does not answer within
                ``REQUEST_TIMEOUT_SECONDS``.
        """
        response = requests.get(
            url=url,
            json=params,
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        return response.json()

    def _paginate(self, url: str, params: dict, items_key: str) -> list:
        """Collect ``items_key`` entries from every page of a paginated list endpoint.

        Follows ``next_page_token`` until one of these happens: the token is
        absent or repeats, the response sets ``has_more`` or ``has_next_page``
        to False, or ``MAX_PAGES`` is reached.
        """
        items = []
        page_token = None
        for _ in range(self.MAX_PAGES):
            request = dict(params)
            if page_token:
                request["page_token"] = page_token
            page = self._api_call(url, request)
            items.extend(page.get(items_key) or [])
            # Jobs signals the last page with `has_more`, query history with
            # `has_next_page`; clusters only omit the token.
            if page.get("has_more") is False or page.get("has_next_page") is False:
                break
            next_token = page.get("next_page_token")
            if not next_token or next_token == page_token:
                break
            page_token = next_token
        return items

    def get_active_queries(self) -> pd.DataFrame:
        """Return RUNNING and QUEUED SQL queries, one row per query (columns: ``QUERY_COLUMNS``)."""
        url = f"{self.hostname}/api/2.0/sql/history/queries"
        queries = self._paginate(url, {"filter_by": {"statuses": ["RUNNING", "QUEUED"]}}, 'res')

        rows = [{
            'query_id': q.get('query_id'),
            'user_name': q.get('user_name'),
            'executed_as_user_name': q.get('executed_as_user_name'),
            'warehouse_id': q.get('warehouse_id'),
            'state': q.get('status'),
            'query_text': q.get('query_text'),
            'start_time_utc': self._ms_to_utc(q.get('query_start_time_ms')),
            'duration_minutes': self._calculate_duration(q.get('query_start_time_ms'), 60000),
        } for q in queries]

        # Explicit columns keep the schema even when no query is active.
        return pd.DataFrame(rows, columns=list(self.QUERY_COLUMNS))

    def get_active_jobs(self) -> pd.DataFrame:
        """Return active job runs, one row per run (columns: ``JOB_COLUMNS``)."""
        url = f"{self.hostname}/api/2.2/jobs/runs/list"
        runs = self._paginate(url, {"active_only": True}, 'runs')

        rows = [{
            'job_id': run.get('job_id'),
            'run_id': run.get('run_id'),
            'run_name': run.get('run_name'),
            'creator_user_name': run.get('creator_user_name'),
            # `or {}` also covers an explicit null `state`.
            'state': (run.get('state') or {}).get('life_cycle_state'),
            'start_time_utc': self._ms_to_utc(run.get('start_time')),
            'duration_minutes': self._calculate_duration(run.get('start_time'), 60000),
            'run_page_url': run.get('run_page_url'),
            'run_type': run.get('run_type'),
        } for run in runs]

        return pd.DataFrame(rows, columns=list(self.JOB_COLUMNS))

    def get_active_clusters(self) -> pd.DataFrame:
        """Return clusters requested with a RUNNING-state filter (columns: ``CLUSTER_COLUMNS``).

        ``num_workers`` is a string: ``"min-max"`` for autoscaling clusters,
        otherwise the fixed worker count.
        """
        url = f"{self.hostname}/api/2.1/clusters/list"
        clusters = self._paginate(url, {"filter_by": {"cluster_states": ["RUNNING"]}}, 'clusters')

        rows = []
        for c in clusters:
            autoscale = c.get('autoscale')
            if autoscale:
                num_workers = f"{autoscale.get('min_workers')}-{autoscale.get('max_workers')}"
            else:
                num_workers = str(c.get('num_workers', ''))

            rows.append({
                'cluster_id': c.get('cluster_id'),
                'cluster_name': c.get('cluster_name'),
                'creator_user_name': c.get('creator_user_name'),
                'state': c.get('state'),
                # NOTE(review): uptime is measured from `start_time`; the Clusters API
                # also exposes `last_restarted_time` -- confirm which reflects the
                # current uptime of a restarted cluster.
                'start_time_utc': self._ms_to_utc(c.get('start_time')),
                'uptime_hours': self._calculate_duration(c.get('start_time'), 3600000),
                'cluster_source': c.get('cluster_source'),
                'num_workers': num_workers,
            })

        return pd.DataFrame(rows, columns=list(self.CLUSTER_COLUMNS))


In [0]:
# Output table schemas are spelled out instead of inferred: Spark cannot infer
# a schema from an empty DataFrame or an all-null column, and "nothing is over
# threshold" is the normal case for this report.
LONG_RUNNING_QUERIES_SCHEMA = (
    "query_id string, user_name string, executed_as_user_name string, "
    "warehouse_id string, state string, query_text string, "
    "start_time_utc timestamp, duration_minutes double"
)
LONG_RUNNING_JOBS_SCHEMA = (
    "job_id bigint, run_id bigint, run_name string, creator_user_name string, "
    "state string, start_time_utc timestamp, duration_minutes double, "
    "run_page_url string, run_type string"
)
LONG_RUNNING_CLUSTERS_SCHEMA = (
    "cluster_id string, cluster_name string, creator_user_name string, "
    "state string, start_time_utc timestamp, uptime_hours double, "
    "cluster_source string, num_workers string"
)


def _save_over_threshold(tasks_pdf, metric_column, threshold, schema, table_name):
    """Overwrite ``<output_catalog>.<output_schema>.<table_name>`` with the rows of
    ``tasks_pdf`` whose ``metric_column`` exceeds ``threshold``.

    An empty result still overwrites the table, clearing rows from the previous
    run. Returns the filtered pandas DataFrame.
    """
    columns = [field.split()[0] for field in schema.split(",")]
    # reindex guarantees every column exists and matches the schema's order.
    # A frame built from an empty API response has no columns, so indexing
    # metric_column directly would raise KeyError.
    tasks_pdf = tasks_pdf.reindex(columns=columns)
    over_threshold = tasks_pdf[tasks_pdf[metric_column] > threshold]
    spark_df = spark.createDataFrame(over_threshold, schema=schema)
    spark_df.write.mode("overwrite").saveAsTable(f"{output_catalog}.{output_schema}.{table_name}")
    return over_threshold


monitor = RunningTasks()
active_queries = monitor.get_active_queries()
active_jobs = monitor.get_active_jobs()
active_clusters = monitor.get_active_clusters()

active_queries_filtered = _save_over_threshold(
    active_queries, "duration_minutes", query_threshold_minutes,
    LONG_RUNNING_QUERIES_SCHEMA, "long_running_queries",
)
active_jobs_filtered = _save_over_threshold(
    active_jobs, "duration_minutes", job_threshold_minutes,
    LONG_RUNNING_JOBS_SCHEMA, "long_running_jobs",
)
active_clusters_filtered = _save_over_threshold(
    active_clusters, "uptime_hours", cluster_threshold_hours,
    LONG_RUNNING_CLUSTERS_SCHEMA, "long_running_clusters",
)