In [0]:
# Start from a clean slate: remove every widget left over from an earlier run of this notebook.
dbutils.widgets.removeAll()

In [0]:
# Text widget "config": path of the YAML file listing the SDK calls to run and the tables to write.
# NOTE(review): the default is an absolute path inside one user's home folder and embeds that
# user's email address; consider a path relative to this notebook or requiring callers to pass it.
dbutils.widgets.text("config", "/Workspace/Users/alex.lopes@databricks.com/WorkspaceInventory/conf/global_init_scripts.yml")

In [0]:
# Snapshot the current widget values into a mapping of name -> value (read below as widgets["config"]).
widgets = dbutils.widgets.getAll()

In [0]:
from databricks.sdk import WorkspaceClient, AccountClient
from pprint import pprint
import yaml
import traceback
from pyspark.sql.functions import *

In [0]:
print("Config:", widgets["config"])
# Parse the YAML config. yaml.safe_load reads the open text stream directly,
# so there is no need to slurp it into a string first.
with open(widgets["config"], "r") as f:
  config = yaml.safe_load(f)
pprint(config)

Config: /Workspace/Users/alex.lopes@databricks.com/WorkspaceInventory/conf/global_init_scripts.yml
[{'api': {'call': 'w.global_init_scripts.list',
          'write': {'mode': 'overwrite',
                    'table': 'shared.bigc.global_init_scripts'}}}]


In [0]:
def get_api_call(conf):
  """Resolve a dotted SDK call path such as ``w.global_init_scripts.list``.

  The first path segment selects the client: ``w`` builds a ``WorkspaceClient``
  and ``a`` builds an ``AccountClient``. Each following segment is looked up
  with ``getattr`` on the object resolved in the previous step.

  Args:
    conf: mapping whose ``'call'`` key holds the dotted path.

  Returns:
    (objs, keys): ``objs`` is the resolution chain ``[client, attr1, attr2, ...]``,
    so ``objs[-1]`` is the callable to invoke. ``keys`` holds extra metadata
    columns: the workspace id and the executing user's display name and id for
    ``w`` calls. It is empty for ``a`` calls.

  Raises:
    ValueError: if the path does not start with ``w`` or ``a``.
  """
  api_func = conf['call'].split(".")
  print(api_func)

  objs = []
  keys = {}
  if api_func[0] == 'w':
    w = WorkspaceClient()
    objs.append(w)
    keys["workspace_id"] = w.get_workspace_id()
    user = w.current_user.me().as_dict()
    keys["execution_user_name"] = user["displayName"]
    keys["execution_user_id"] = user["id"]
  elif api_func[0] == 'a':
    objs.append(AccountClient())
  else:
    # Without this check an unknown prefix leaves `objs` empty and the attribute
    # walk below fails with an opaque IndexError; report the bad value instead.
    raise ValueError(
      f"Unsupported client prefix {api_func[0]!r} in call {conf['call']!r}; expected 'w' or 'a'")

  # Walk the remaining segments; each attribute hangs off the previously resolved object.
  for attr in api_func[1:]:
    objs.append(getattr(objs[-1], attr))
  return objs, keys


In [0]:
def create_df(df_result, rows, i, schema, batch_size=1000):
  """Flush buffered rows into a Spark DataFrame once every ``batch_size`` items.

  Args:
    df_result: DataFrame accumulated so far, or None if nothing has been flushed yet.
    rows: buffered row dicts that have not been turned into a DataFrame yet.
    i: zero-based index of the item just appended to ``rows``; a flush happens
      when ``i + 1`` is a multiple of ``batch_size``.
    schema: schema passed to ``spark.createDataFrame`` (a DDL string at the call sites).
    batch_size: number of items between flushes (default 1000).

  Returns:
    (df_result, rows): the possibly-extended DataFrame and the row buffer, which
    is a fresh empty list after a flush and the unchanged ``rows`` otherwise.

  Raises:
    ValueError: if ``batch_size`` is not positive.
  """
  if batch_size <= 0:
    raise ValueError(f"batch_size must be positive, got {batch_size!r}")
  if (i + 1) % batch_size != 0:
    return df_result, rows

  print("Sending to DF data", i)
  batch_df = spark.createDataFrame(rows, schema)
  # unionAll matches columns by position; every batch shares the same schema.
  df_result = batch_df if df_result is None else df_result.unionAll(batch_df)
  return df_result, []

def _string_schema(rows):
  """Build a Spark DDL schema declaring every key seen in ``rows`` as a string column.

  Keys are gathered across all rows, in first-seen order, because rows need not
  share the same keys (SDK ``as_dict()`` output can leave out unset fields).
  ``createDataFrame`` maps dict rows by field name, so a key missing from the
  schema would be dropped silently. Names are backtick-quoted, with embedded
  backticks doubled, so unusual keys still parse.
  """
  columns = dict.fromkeys(key for row in rows for key in row)
  return ",".join("`" + name.replace("`", "``") + "` string" for name in columns)


def execute_and_save(conf, objs):
  """Invoke the resolved SDK callable and load its results into a Spark DataFrame.

  Despite the name, nothing is persisted here; the caller writes the result.

  Args:
    conf: API config mapping. Optional ``cargs`` (list) and ``kargs`` (dict) are
      passed as positional / keyword arguments to the call.
    objs: resolution chain from ``get_api_call``; ``objs[-1]`` is the callable.

  Returns:
    A DataFrame with one string column per key seen in any converted item, or
    None when no item could be converted (including an empty result).
  """
  results = objs[-1](*conf.get("cargs", []), **conf.get("kargs", {}))
  # iter() returns an existing iterator unchanged, so lists and generators both work.
  items = iter(results)

  rows = []
  attempted = 0
  while True:
    try:
      row = next(items).as_dict()
    except StopIteration:
      break
    except Exception as e:
      # Best-effort: log the failing item and keep going with the rest.
      print("Error in one object:", e, traceback.format_exc())
      attempted += 1
      continue
    if not rows:
      pprint(row)  # sample of the first successfully converted item
    rows.append(row)
    attempted += 1

  print("Total items found:", attempted)
  if not rows:
    return None

  # The schema must cover every row before any DataFrame is built.
  schema = _string_schema(rows)
  df_result = None
  batch = []
  for idx, row in enumerate(rows):
    batch.append(row)
    df_result, batch = create_df(df_result, batch, idx, schema)
  if batch:
    tmp_df = spark.createDataFrame(batch, schema)
    df_result = tmp_df if df_result is None else df_result.unionAll(tmp_df)
  return df_result
  

In [0]:
def write_to_table(conf, df, keys):
  """Stamp audit columns onto ``df`` and save it as the table named in ``conf['write']``.

  Adds ``api_updated_at`` (current timestamp) plus one literal column
  ``api_<name>`` for every entry in ``keys``. ``conf['write']`` must contain
  ``table`` and may contain ``mode`` and ``options`` (a mapping of writer options).
  """
  stamped = df.withColumn("api_updated_at", current_timestamp())
  for name, val in keys.items():
    stamped = stamped.withColumn("api_" + name, lit(val))

  writer = stamped.write
  target = conf['write']
  if "mode" in target:
    writer = writer.mode(target['mode'])
  if "options" in target:
    writer = writer.options(**target['options'])

  writer.saveAsTable(target['table'])

In [0]:
for entry in config:
  # Each config entry nests its settings under an "api" key.
  conf = entry['api']
  print("Executing:", conf["call"], "Table:", conf["write"]["table"])
  objs, keys = get_api_call(conf)
  df = execute_and_save(conf, objs)
  # execute_and_save returns None when nothing was collected; compare with None
  # explicitly rather than relying on the truthiness of a Spark DataFrame.
  if df is not None:
    write_to_table(conf, df, keys)
  else:
    print("No data to save")

Executing: w.global_init_scripts.list Table: shared.bigc.global_init_scripts
['w', 'global_init_scripts', 'list']
Total items found: 0
No data to save
