In [2]:
import json
import subprocess
from copy import deepcopy


def get_all_clusters_id() -> dict:
    """Return a ``{cluster_id: cluster_name}`` mapping from the Databricks CLI.

    Runs ``databricks clusters list`` and parses its whitespace-separated
    table output: the first column is taken as the cluster id and the second
    as the cluster name.

    Only the first whitespace-delimited token after the id is kept as the
    name, so a cluster name that contains spaces is truncated to its first
    word.

    Returns:
        dict: cluster id -> cluster name; empty when the CLI prints no rows.

    Raises:
        subprocess.CalledProcessError: if the CLI exits with a non-zero
            status (previously the error text was parsed as cluster rows).
        OSError: if the ``databricks`` executable cannot be started.
    """
    # Argument-list form (no shell), matching the other helpers in this file.
    # stderr is intentionally NOT merged into stdout so diagnostics can never
    # be mistaken for table rows.
    raw_listing = subprocess.check_output(["databricks", "clusters", "list"])

    clusters = {}
    for raw_row in raw_listing.splitlines():
        # split() with no argument collapses runs of whitespace and drops the
        # trailing newline, so the name never carries a stray "\n".
        fields = raw_row.decode("utf-8").split()
        if len(fields) < 2:
            # Blank or malformed line: nothing to record.
            continue
        cluster_id, cluster_name = fields[0], fields[1]
        clusters[cluster_id] = cluster_name
    return clusters


def get_cluster_info(cluster_id: str) -> dict:
    """Fetch one cluster's metadata via ``databricks clusters get``.

    Args:
        cluster_id: id passed to the CLI's ``--cluster-id`` option.

    Returns:
        The CLI's stdout parsed as JSON.

    Raises:
        subprocess.CalledProcessError: if the CLI exits with a non-zero status.
    """
    command = ["databricks", "clusters", "get", "--cluster-id", cluster_id]
    return json.loads(subprocess.check_output(command))


def modify_tag_json(cluster_original_json: dict, custom_tag: dict) -> dict:
    """Return a copy of a cluster spec with its ``custom_tags`` replaced.

    Args:
        cluster_original_json: cluster metadata dict; it is not modified.
        custom_tag: tags to store under the ``"custom_tags"`` key. Any
            existing ``custom_tags`` value is replaced, not merged.

    Returns:
        A new dict that shares no mutable state with either argument.
    """
    modified_json = deepcopy(cluster_original_json)
    # Copy the tags as well: the same tag dict is typically reused for many
    # clusters, so the result must not alias it.
    modified_json["custom_tags"] = deepcopy(custom_tag)
    return modified_json


def patch_cluster(cluster_id: str, json_obj: dict) -> None:
    """Update a single cluster via ``databricks clusters edit --json``.

    Args:
        cluster_id: id of the cluster that is meant to be edited.
        json_obj: cluster spec to send. If it has no ``"cluster_id"`` key,
            ``cluster_id`` is filled in; the caller's dict is not modified.

    Raises:
        ValueError: if ``json_obj["cluster_id"]`` is present but differs from
            ``cluster_id`` (previously ``cluster_id`` was silently ignored and
            the cluster named inside ``json_obj`` was edited).
        subprocess.CalledProcessError: if the CLI exits with a non-zero status.
    """
    # Shallow copy is enough: only the top-level "cluster_id" key may be added.
    payload = dict(json_obj)
    payload_cluster_id = payload.setdefault("cluster_id", cluster_id)
    if payload_cluster_id != cluster_id:
        raise ValueError(
            "cluster_id mismatch: argument is {!r} but json_obj targets {!r}".format(
                cluster_id, payload_cluster_id))

    # Pass the JSON as str like every other argv element; json.dumps emits
    # ASCII by default, so no manual encoding is needed.
    subprocess.check_output(
        ["databricks", "clusters", "edit", "--json", json.dumps(payload, indent=2)])
    return None

In [3]:
# Show the {cluster_id: cluster_name} mapping parsed from `databricks clusters list`.
get_all_clusters_id()

{'0114-053019-fmho5ft6': 'rocinante',
 '0308-083108-873mkxez': 'job-1128-run-12573-get-baby-names_cluster',
 '0223-093121-5hpja6sw': 'job-1044-run-11822',
 '0223-083659-28gbz6gt': 'job-960-run-11668',
 '0223-075854-go8a51nk': 'job-848-run-11515'}

In [5]:
# Tags to apply to a cluster; passed to modify_tag_json as `custom_tag`.
custom_tags = dict(
    costcenter='sales',
    randomtag1='val1',
    randomtag2='val2',
)

In [20]:
# Fetch the id -> name mapping again and show the name of the first listed cluster.
all_clusters = get_all_clusters_id()
print([*all_clusters.values()][0])

rocinante


In [21]:
# One "<cluster_id> <cluster_name>" line per cluster.
for k, v in all_clusters.items():
    print(f"{k} {v}")

0114-053019-fmho5ft6 rocinante
0308-083108-873mkxez job-1128-run-12573-get-baby-names_cluster
0223-093121-5hpja6sw job-1044-run-11822
0223-083659-28gbz6gt job-960-run-11668
0223-075854-go8a51nk job-848-run-11515


In [23]:
# Pull the full metadata of one cluster so it can be re-tagged below.
original_json = get_cluster_info("0114-053019-fmho5ft6")
print(original_json)

{'cluster_id': '0114-053019-fmho5ft6', 'spark_context_id': 8844868844272713892, 'cluster_name': 'rocinante', 'spark_version': '9.1.x-scala2.12', 'spark_conf': {'spark.databricks.delta.preview.enabled': 'true'}, 'node_type_id': 'Standard_DS3_v2', 'driver_node_type_id': 'Standard_DS3_v2', 'spark_env_vars': {'PYSPARK_PYTHON': '/databricks/python3/bin/python3'}, 'autotermination_minutes': 30, 'enable_elastic_disk': True, 'disk_spec': {}, 'cluster_source': 'UI', 'enable_local_disk_encryption': False, 'azure_attributes': {'first_on_demand': 1, 'availability': 'ON_DEMAND_AZURE', 'spot_bid_max_price': -1.0}, 'instance_source': {'node_type_id': 'Standard_DS3_v2'}, 'driver_instance_source': {'node_type_id': 'Standard_DS3_v2'}, 'state': 'TERMINATED', 'state_message': 'Inactive cluster terminated (inactive for 30 minutes).', 'start_time': 1642138219449, 'terminated_time': 1647320715498, 'last_state_loss_time': 1647317132192, 'last_restarted_time': 1647317132283, 'autoscale': {'min_workers': 2, 'ma

In [26]:
# Build a tagged copy of the cluster spec; original_json itself is left untouched.
updated_json = modify_tag_json(
    cluster_original_json=original_json,
    custom_tag=custom_tags,
)
print(updated_json)

{'cluster_id': '0114-053019-fmho5ft6', 'spark_context_id': 8844868844272713892, 'cluster_name': 'rocinante', 'spark_version': '9.1.x-scala2.12', 'spark_conf': {'spark.databricks.delta.preview.enabled': 'true'}, 'node_type_id': 'Standard_DS3_v2', 'driver_node_type_id': 'Standard_DS3_v2', 'spark_env_vars': {'PYSPARK_PYTHON': '/databricks/python3/bin/python3'}, 'autotermination_minutes': 30, 'enable_elastic_disk': True, 'disk_spec': {}, 'cluster_source': 'UI', 'enable_local_disk_encryption': False, 'azure_attributes': {'first_on_demand': 1, 'availability': 'ON_DEMAND_AZURE', 'spot_bid_max_price': -1.0}, 'instance_source': {'node_type_id': 'Standard_DS3_v2'}, 'driver_instance_source': {'node_type_id': 'Standard_DS3_v2'}, 'state': 'TERMINATED', 'state_message': 'Inactive cluster terminated (inactive for 30 minutes).', 'start_time': 1642138219449, 'terminated_time': 1647320715498, 'last_state_loss_time': 1647317132192, 'last_restarted_time': 1647317132283, 'autoscale': {'min_workers': 2, 'ma

In [27]:
# Send the re-tagged spec back through `databricks clusters edit`.
# NOTE(review): updated_json is derived from the full `clusters get` response
# (state, timestamps, ...); confirm the edit command accepts all of those fields.
patch_cluster(cluster_id="0114-053019-fmho5ft6", json_obj=updated_json)