In [0]:
  import pandas as pd

In [0]:
def column_to_list(df, column):
  """Return the values of one column as a plain Python list.

  Args:
    df: Object exposing ``toPandas()`` (the callers in this notebook pass
      Spark query results).
    column: Name of the column to extract from the converted pandas frame.

  Returns:
    list: The column values, in row order, as produced by ``Series.tolist()``.
  """
  # The whole frame is converted to pandas first; the column is selected afterwards.
  return df.toPandas()[column].tolist()

In [0]:
def get_schemata(catalog_name, schema_name, external_schema_summary):
  """Query ``system.information_schema.schemata`` for one schema and record the row count.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; it is not
      modified, a new frame is returned instead.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 1).
  """
  print("Getting Schema owner and comments")
  query = (
    "SELECT * FROM system.information_schema.schemata "
    f"WHERE catalog_name =  '{catalog_name}' AND schema_name = '{schema_name}'"
  )
  schemata_df = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 1,
    'object_category': "Schema",
    'object_sub_category': "Schema Definition",
    'count': schemata_df.count(),
  }])
  updated_summary = pd.concat([external_schema_summary, summary_entry], ignore_index=True)
  return schemata_df, updated_summary


In [0]:
def get_schema_location(catalog_name, schema_name, external_schema_summary):
  """Run ``DESCRIBE SCHEMA`` and keep only the row describing the schema location.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name (backtick-quoted in the statement).
    schema_name: Schema name (interpolated unquoted).
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (filtered DESCRIBE result, summary frame with one extra row whose
    ``id`` is 2).
  """
  print("Getting Schema location")
  described = spark.sql(f"DESCRIBE SCHEMA `{catalog_name}`.{schema_name}")
  # Only the row whose description item is exactly 'Location' is kept.
  is_location_row = described['database_description_item'] == 'Location'
  location_df = described.filter(is_location_row)
  summary_entry = pd.DataFrame([{
    'id': 2,
    'object_category': "Schema",
    'object_sub_category': "Schema Location",
    'count': location_df.count(),
  }])
  return location_df, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_schema_tags(catalog_name, schema_name, external_schema_summary):
  """Query ``system.information_schema.schema_tags`` for one schema and record the row count.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 3).
  """
  print("Getting Schema tags")
  query = (
    "SELECT * FROM system.information_schema.schema_tags "
    f"WHERE catalog_name =  '{catalog_name}' AND schema_name = '{schema_name}'"
  )
  tags_df = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 3,
    'object_category': "Schema",
    'object_sub_category': "Schema Tags",
    'count': tags_df.count(),
  }])
  return tags_df, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_schema_grants(catalog_name, schema_name, external_schema_summary):
  """Run ``SHOW GRANT ON SCHEMA`` and keep only rows whose ObjectType is 'SCHEMA'.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name (backtick-quoted in the statement).
    schema_name: Schema name (interpolated unquoted).
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (filtered grants result, summary frame with one extra row whose
    ``id`` is 4).
  """
  print("Getting Schema grants")
  all_grants = spark.sql(f"SHOW GRANT ON SCHEMA `{catalog_name}`.{schema_name}")
  # Rows reported for other object types are dropped.
  schema_level_grants = all_grants.filter(all_grants['ObjectType'] == 'SCHEMA')
  summary_entry = pd.DataFrame([{
    'id': 4,
    'object_category': "Schema",
    'object_sub_category': "Schema Grants",
    'count': schema_level_grants.count(),
  }])
  return schema_level_grants, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_unsupported_tables(catalog_name, schema_name, external_schema_summary):
  """List tables of the schema whose ``table_type`` is in the unsupported set.

  The unsupported set, as written in the query, is FOREIGN, STREAMING_TABLE,
  MATERIALIZED_VIEW, MANAGED_SHALLOW_CLONE and EXTERNAL_SHALLOW_CLONE.
  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result with table_name, table_type, data_source_format,
    table_owner, comment; summary frame with one extra row whose ``id`` is 5).
  """
  print("Getting unsupported tables name, description, type, format and owner")
  query = (
    "SELECT table_name, table_type, data_source_format, table_owner, comment "
    "FROM system.information_schema.tables "
    f"WHERE table_catalog =  '{catalog_name}' AND table_schema = '{schema_name}' "
    "and table_type IN ('FOREIGN','STREAMING_TABLE','MATERIALIZED_VIEW','MANAGED_SHALLOW_CLONE','EXTERNAL_SHALLOW_CLONE')"
  )
  unsupported_df = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 5,
    'object_category': "Tables & Views",
    'object_sub_category': "Unsupported Tables",
    'count': unsupported_df.count(),
  }])
  return unsupported_df, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_supported_tables(catalog_name, schema_name, external_schema_summary):
  """List tables of the schema whose ``table_type`` is MANAGED or EXTERNAL.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result with table_name, table_type, data_source_format,
    table_owner, comment; summary frame with one extra row whose ``id`` is 6).
  """
  print("Getting Supported tables name, description, type, format and owner")
  query = (
    "SELECT table_name, table_type, data_source_format, table_owner, comment "
    "FROM system.information_schema.tables "
    f"WHERE table_catalog =  '{catalog_name}' AND table_schema = '{schema_name}' "
    "AND table_type in ('MANAGED','EXTERNAL')"
  )
  supported_df = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 6,
    'object_category': "Tables & Views",
    'object_sub_category': "Supported Tables",
    'count': supported_df.count(),
  }])
  return supported_df, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_views(catalog_name, schema_name, external_schema_summary):
  """Fetch view definitions of the schema together with their owner and comment.

  Joins ``information_schema.views`` with ``information_schema.tables`` on
  catalog, schema and table name, restricted to rows whose table_type is
  'VIEW'. Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result: all ``views`` columns followed by table_owner and
    comment; summary frame with one extra row whose ``id`` is 7).
  """
  print("Getting view details")
  query = (
    "SELECT v.*, t.table_owner, t.comment "
    "FROM system.information_schema.views v, system.information_schema.tables t "
    f"WHERE t.table_catalog =  '{catalog_name}' AND t.table_schema = '{schema_name}' "
    "AND t.table_type = 'VIEW' "
    "AND v.table_name = t.table_name and v.table_schema = t.table_schema and v.table_catalog = t.table_catalog "
  )
  views_df = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 7,
    'object_category': "Tables & Views",
    'object_sub_category': "Views",
    'count': views_df.count(),
  }])
  return views_df, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_tables_views_tags(catalog_name, schema_name, external_schema_summary, table_names, view_names):
  """Query ``system.information_schema.table_tags`` for the given tables and views.

  Relies on a global ``spark`` session and on the notebook's
  ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    table_names: Frame with a ``table_name`` column (tables).
    view_names: Frame with a ``table_name`` column (views).

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 8).
  """
  print("Getting tables and views tags")
  object_names = column_to_list(table_names,'table_name') + column_to_list(view_names,'table_name')
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(object_names)[1:-1] or "NULL"
  tables_views_tags = spark.sql(
    f"SELECT * FROM system.information_schema.table_tags WHERE catalog_name =  '{catalog_name}' AND schema_name = '{schema_name}' AND table_name in ({names_in_list}) ")
  new_row = {'id': 8,'object_category': "Tables & Views",'object_sub_category': "Tags",'count': tables_views_tags.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return tables_views_tags, external_schema_summary

In [0]:
def get_table_view_columns_comment(catalog_name, schema_name, external_schema_summary, table_names, view_names):
  """Query ``system.information_schema.columns`` for non-null column comments.

  Relies on a global ``spark`` session and on the notebook's
  ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    table_names: Frame with a ``table_name`` column (tables).
    view_names: Frame with a ``table_name`` column (views).

  Returns:
    tuple: (query result with table_catalog, table_schema, table_name,
    column_name, comment; summary frame with one extra row whose ``id`` is 9).
  """
  print("Getting tables and views columns comments")
  object_names = column_to_list(table_names,'table_name') + column_to_list(view_names,'table_name')
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(object_names)[1:-1] or "NULL"
  table_view_columns_comment = spark.sql(
    f"SELECT table_catalog, table_schema, table_name, column_name, comment FROM system.information_schema.columns WHERE table_catalog =  '{catalog_name}' AND table_schema = '{schema_name}' AND table_name in ({names_in_list}) and comment is not null ")
  new_row = {'id': 9,'object_category': "Tables & Views",'object_sub_category': "Columns comment",'count': table_view_columns_comment.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return table_view_columns_comment, external_schema_summary

In [0]:
def get_table_view_columns_tags(catalog_name, schema_name, external_schema_summary, table_names, view_names):
  """Query ``system.information_schema.column_tags`` for the given tables and views.

  Only rows with a non-null ``tag_name`` are selected. Relies on a global
  ``spark`` session and on the notebook's ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    table_names: Frame with a ``table_name`` column (tables).
    view_names: Frame with a ``table_name`` column (views).

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 10).
  """
  print("Getting tables and views column tags")
  object_names = column_to_list(table_names,'table_name') + column_to_list(view_names,'table_name')
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(object_names)[1:-1] or "NULL"
  table_view_columns_tags = spark.sql(
    f"SELECT * FROM system.information_schema.column_tags WHERE catalog_name =  '{catalog_name}' AND schema_name = '{schema_name}' AND table_name in ({names_in_list}) and tag_name is not null ")
  new_row = {'id': 10,'object_category': "Tables & Views",'object_sub_category': "Columns tags",'count': table_view_columns_tags.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return table_view_columns_tags, external_schema_summary

In [0]:
def get_table_view_constraints(catalog_name, schema_name, external_schema_summary, table_names):
  """Query ``system.information_schema.table_constraints`` for the given tables.

  Relies on a global ``spark`` session and on the notebook's
  ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    table_names: Frame with a ``table_name`` column.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 11).
  """
  print("Getting tables constraints")
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(column_to_list(table_names,'table_name'))[1:-1] or "NULL"
  table_constraints = spark.sql(
    f"SELECT * FROM system.information_schema.table_constraints WHERE table_catalog =  '{catalog_name}' AND table_schema = '{schema_name}' AND table_name in ({names_in_list}) ")
  new_row = {'id': 11,'object_category': "Tables & Views",'object_sub_category': "Constraints",'count': table_constraints.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return table_constraints, external_schema_summary

In [0]:
def get_table_view_grants(catalog_name, schema_name, external_schema_summary, table_names, view_names):
  """Query ``system.information_schema.table_privileges`` for directly granted privileges.

  Only rows with ``inherited_from = 'NONE'`` are selected. Relies on a global
  ``spark`` session and on the notebook's ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    table_names: Frame with a ``table_name`` column (tables).
    view_names: Frame with a ``table_name`` column (views).

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 12).
  """
  print("Getting table adn view level grants")
  object_names = column_to_list(table_names,'table_name') + column_to_list(view_names,'table_name')
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(object_names)[1:-1] or "NULL"
  all_table_view_grants = spark.sql(
    f"SELECT * FROM system.information_schema.table_privileges WHERE table_catalog =  '{catalog_name}' AND table_schema = '{schema_name}' AND table_name in ({names_in_list}) and inherited_from = 'NONE' ")
  new_row = {'id': 12,'object_category': "Tables & Views",'object_sub_category': "Grants",'count': all_table_view_grants.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return all_table_view_grants, external_schema_summary

In [0]:
def get_unsupported_volumes(catalog_name, schema_name, external_schema_summary):
  """List volumes of the schema whose ``volume_type`` is MANAGED.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 13).
  """
  print("Getting unsupported volumes name, owner, location, description")
  query = (
    "SELECT * FROM system.information_schema.volumes "
    f"WHERE volume_catalog =  '{catalog_name}' AND volume_schema = '{schema_name}' "
    "and volume_type IN ('MANAGED')"
  )
  managed_volumes = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 13,
    'object_category': "Volumes",
    'object_sub_category': "Unsupported Volumes",
    'count': managed_volumes.count(),
  }])
  return managed_volumes, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_supported_volumes(catalog_name, schema_name, external_schema_summary):
  """List volumes of the schema whose ``volume_type`` is EXTERNAL.

  Relies on a ``spark`` session being available as a global.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 14).
  """
  print("Getting supported volumes name, owner, location, description")
  query = (
    "SELECT * FROM system.information_schema.volumes "
    f"WHERE volume_catalog =  '{catalog_name}' AND volume_schema = '{schema_name}' "
    "and volume_type IN ('EXTERNAL')"
  )
  external_volumes = spark.sql(query)
  summary_entry = pd.DataFrame([{
    'id': 14,
    'object_category': "Volumes",
    'object_sub_category': "Supported Volumes",
    'count': external_volumes.count(),
  }])
  return external_volumes, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def get_volume_tags(catalog_name, schema_name, external_schema_summary, volume_names):
  """Query ``system.information_schema.volume_tags`` for the given volumes.

  Relies on a global ``spark`` session and on the notebook's
  ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    volume_names: Frame with a ``volume_name`` column.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 15).
  """
  print("Getting volumes tags")
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(column_to_list(volume_names,'volume_name'))[1:-1] or "NULL"
  volume_tags = spark.sql(
    f"SELECT * FROM system.information_schema.volume_tags WHERE catalog_name =  '{catalog_name}' AND schema_name = '{schema_name}' AND volume_name in ({names_in_list}) ")
  new_row = {'id': 15,'object_category': "Volumes",'object_sub_category': "Tags",'count': volume_tags.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return volume_tags, external_schema_summary

In [0]:
def get_volume_grants(catalog_name, schema_name, external_schema_summary, volume_names):
  """Query ``system.information_schema.volume_privileges`` for directly granted privileges.

  Only rows with ``inherited_from = 'NONE'`` are selected. Relies on a global
  ``spark`` session and on the notebook's ``column_to_list`` helper.

  Args:
    catalog_name: Catalog name interpolated into the WHERE clause.
    schema_name: Schema name interpolated into the WHERE clause.
    external_schema_summary: pandas DataFrame of summary rows; not modified.
    volume_names: Frame with a ``volume_name`` column.

  Returns:
    tuple: (query result, summary frame with one extra row whose ``id`` is 16).
  """
  print("Getting volume grants")
  # str(list)[1:-1] renders the names as comma-separated quoted literals.
  # With no names at all that would produce `in ()`, which does not parse;
  # `in (NULL)` is valid SQL and matches no rows, so the query still runs.
  names_in_list = str(column_to_list(volume_names,'volume_name'))[1:-1] or "NULL"
  volume_grants = spark.sql(
    f"SELECT * FROM system.information_schema.volume_privileges WHERE volume_catalog =  '{catalog_name}' AND volume_schema = '{schema_name}' AND volume_name in ({names_in_list}) and inherited_from = 'NONE' ")
  new_row = {'id': 16,'object_category': "Volumes",'object_sub_category': "Grants",'count': volume_grants.count()}
  external_schema_summary = pd.concat([external_schema_summary, pd.DataFrame([new_row])], ignore_index=True)
  return volume_grants, external_schema_summary

In [0]:
def get_unsupported_functions(catalog_name, schema_name, external_schema_summary):
  """List user functions in the schema, excluding the one named 'getargument'.

  Relies on a ``spark`` session being available as a global. Side effect: a
  ``use catalog`` statement is issued first, so the session's current catalog
  is switched to ``catalog_name``.

  Args:
    catalog_name: Catalog to switch to (backtick-quoted in the statement).
    schema_name: Schema passed to ``SHOW USER FUNCTIONS IN`` (unquoted).
    external_schema_summary: pandas DataFrame of summary rows; not modified.

  Returns:
    tuple: (filtered function listing, summary frame with one extra row whose
    ``id`` is 17).
  """
  print("Getting unsupported functions name, owner, location, description")
  spark.sql(f"use catalog `{catalog_name}`;")
  listed_functions = spark.sql(f"SHOW USER FUNCTIONS IN {schema_name}")
  not_getargument = listed_functions['function'] != 'getargument'
  user_functions = listed_functions.filter(not_getargument)
  summary_entry = pd.DataFrame([{
    'id': 17,
    'object_category': "Functions",
    'object_sub_category': "Unsupported Functions",
    'count': user_functions.count(),
  }])
  return user_functions, pd.concat([external_schema_summary, summary_entry], ignore_index=True)

In [0]:
def generate_schema_scripts(catalog_name, schema_name, schema_root_location, schema_tags, schema_grants, schema_details):
  """Build the SQL statements that recreate a schema as a managed-location schema.

  Statements are emitted in this order: CREATE SCHEMA, one SET TAGS per tag
  row, one GRANT per grant row, then COMMENT / SET OWNER per detail row.

  Args:
    catalog_name: Catalog name (backtick-quoted in every statement).
    schema_name: Schema name (interpolated unquoted).
    schema_root_location: Value used for ``MANAGED LOCATION``.
    schema_tags: Object with ``collect()``; rows are read positionally,
      index 2 as the tag name and index 3 as the tag value.
    schema_grants: Object with ``collect()``; rows are read positionally,
      index 0 as the principal, 1 as the privilege, 2 as the object type.
    schema_details: Object with ``collect()``; rows are read positionally,
      index 2 as the owner and index 3 as the comment.

  Returns:
    list[str]: The generated statements.
  """
  schema_scripts = []
  print("Creating managed schema script")
  schema_scripts.append(f"CREATE SCHEMA IF NOT EXISTS `{catalog_name}`.{schema_name} MANAGED LOCATION '{schema_root_location}';")
  print("Creating schema tags script")
  for tags in schema_tags.collect():
    schema_scripts.append(f"ALTER SCHEMA `{catalog_name}`.{schema_name} SET TAGS ('{tags[2]}' = '{tags[3]}');")
  print("Creating schema grants script")
  for grant in schema_grants.collect():
    schema_scripts.append(f"GRANT {grant[1]} ON {grant[2]} `{catalog_name}`.{schema_name} TO `{grant[0]}`;")
  print("Creating schema comments and owner script")
  for schema in schema_details.collect():
    # A missing comment arrives as None; formatting it into the statement
    # would set the schema comment to the literal text 'None', so skip it.
    if schema[3] is not None:
      schema_scripts.append(f"COMMENT ON SCHEMA `{catalog_name}`.{schema_name} IS '{schema[3]}';")
    schema_scripts.append(f"ALTER SCHEMA `{catalog_name}`.{schema_name} SET OWNER TO `{schema[2]}`;")
  return schema_scripts

In [0]:
def generate_table_view_scripts(catalog_name, schema_name,backup_schema_name, tables, views, table_view_tags, column_comments, column_tags, table_view_grants ):
  """Build the SQL statements that recreate tables and views and their metadata.

  Statements are emitted in this order: Delta table clones (+ CLUSTER BY AUTO),
  non-Delta drop/create, views, table tags, view tags, column tags, table
  grants, view grants, table owners, view owners. All rows are read
  positionally, so the column order of the inputs matters.

  Relies on a global ``spark`` session for ``SHOW CREATE TABLE`` on non-Delta
  tables.

  Args:
    catalog_name: Catalog name (backtick-quoted in the statements).
    schema_name: Target schema name.
    backup_schema_name: Schema that Delta tables are cloned from and that
      non-Delta tables are dropped from.
    tables: Frame with a ``table_name`` and ``DATA_SOURCE_FORMAT`` column;
      rows use index 0 as the table name and index 3 as the owner.
    views: Frame with a ``table_name`` column; rows use index 2 as the view
      name, 3 as the view definition and 9 as the owner.
      NOTE(review): index 9 depends on the column count of
      information_schema.views; confirm against the query that builds it.
    table_view_tags: Frame with a ``table_name`` column; rows use index 2/3/4
      as object name, tag name, tag value.
    column_comments: Currently unused; the COMMENT ON COLUMN generation below
      is commented out. Kept for interface compatibility.
    column_tags: Frame with a ``table_name`` column; rows use index 2/3/4/5 as
      object name, column name, tag name, tag value.
    table_view_grants: Frame with a ``table_name`` column; rows use index 1 as
      grantee, 2/3/4 as catalog/schema/object name and 5 as the privilege.

  Returns:
    list[str]: The generated statements.
  """
  table_view_scripts = []
  delta_tables = tables.filter(tables['DATA_SOURCE_FORMAT'] == 'DELTA')
  non_delta_tables = tables.filter(tables['DATA_SOURCE_FORMAT'] != 'DELTA')
  print("Creating delta clone table and cluster by script ")
  for table in delta_tables.collect():
    table_view_scripts.append(f"CREATE OR REPLACE TABLE `{catalog_name}`.{schema_name}.{table[0]} CLONE `{catalog_name}`.{backup_schema_name}.{table[0]};")
    table_view_scripts.append(f"ALTER TABLE `{catalog_name}`.{schema_name}.{table[0]} CLUSTER BY AUTO;")

  print("Creating drop and create non delta external table script")
  for table in non_delta_tables.collect():
    table_view_scripts.append(f"DROP TABLE `{catalog_name}`.{backup_schema_name}.{table[0]};")
    # The CREATE statement is taken verbatim from the existing table definition.
    table_view_scripts.append(spark.sql(f"SHOW CREATE TABLE `{catalog_name}`.{schema_name}.{table[0]}").collect()[0][0])

  # Views are needed twice (definitions and owners); collect them only once.
  view_rows = views.collect()

  print("Creating view script")
  for view in view_rows:
    table_view_scripts.append(f"CREATE OR REPLACE VIEW `{catalog_name}`.{schema_name}.{view[2]} AS {view[3]};")

  print("Creating table tags script")
  table_tags = table_view_tags.join(tables, tables['table_name'] == table_view_tags['table_name'], 'inner')
  for tags in table_tags.collect():
    table_view_scripts.append(f"ALTER TABLE `{catalog_name}`.{schema_name}.{tags[2]} SET TAGS ('{tags[3]}' = '{tags[4]}');")

  print("Creating view tags script")
  view_tags = table_view_tags.join(views, views['table_name'] == table_view_tags['table_name'], 'inner')
  for tags in view_tags.collect():
    table_view_scripts.append(f"ALTER VIEW `{catalog_name}`.{schema_name}.{tags[2]} SET TAGS ('{tags[3]}' = '{tags[4]}');")

  #print("Creating table and view column comments script")
  #for comment in column_comments.collect():
    #table_view_scripts.append(f"COMMENT ON COLUMN `{catalog_name}`.{schema_name}.{comment[2]}.{comment[3]} IS '{comment[4]}';")

  print("Creating table and views column tags script")
  table_column_tags = column_tags.join(tables, tables['table_name'] == column_tags['table_name'], 'inner')
  for tags in table_column_tags.collect():
    table_view_scripts.append(f"ALTER TABLE `{catalog_name}`.{schema_name}.{tags[2]} ALTER COLUMN {tags[3]} SET TAGS ('{tags[4]}' = '{tags[5]}');")
  # NOTE(review): view column tags are emitted with ALTER TABLE, as in the
  # original code; confirm this is the intended statement for views.
  view_column_tags = column_tags.join(views, views['table_name'] == column_tags['table_name'], 'inner')
  for tags in view_column_tags.collect():
    table_view_scripts.append(f"ALTER TABLE `{catalog_name}`.{schema_name}.{tags[2]} ALTER COLUMN {tags[3]} SET TAGS ('{tags[4]}' = '{tags[5]}');")

  print("Creating table grants script")
  table_grants = table_view_grants.join(tables, tables['table_name'] == table_view_grants['table_name'], 'inner')
  for grant in table_grants.collect():
    table_view_scripts.append(f"GRANT {grant[5]} ON TABLE `{grant[2]}`.{grant[3]}.{grant[4]} TO `{grant[1]}`;")

  print("Creating view grants script")
  view_grants = table_view_grants.join(views, views['table_name'] == table_view_grants['table_name'], 'inner')
  # Iterate the view join result. Looping over table_grants here would emit
  # table grants a second time as ON VIEW and never emit the real view grants.
  for grant in view_grants.collect():
    table_view_scripts.append(f"GRANT {grant[5]} ON VIEW `{grant[2]}`.{grant[3]}.{grant[4]} TO `{grant[1]}`;")

  print("Creating table owner and comment script")
  for table in tables.collect():
    #table_view_scripts.append(f"COMMENT ON TABLE `{catalog_name}`.{schema_name}.{table[0]} IS '{table[4]}';")
    table_view_scripts.append(f"ALTER TABLE `{catalog_name}`.{schema_name}.{table[0]} SET OWNER TO `{table[3]}`;")

  print("Creating view owner and comment script")
  for view in view_rows:
    #table_view_scripts.append(f"COMMENT ON VIEW `{catalog_name}`.{schema_name}.{view[2]} IS '{view[10]}';")
    table_view_scripts.append(f"ALTER VIEW `{catalog_name}`.{schema_name}.{view[2]} SET OWNER TO `{view[9]}`;")

  return table_view_scripts

In [0]:
def generate_volume_scripts(catalog_name, schema_name,backup_schema_name, volumes, volume_tags, volume_grants ):
  """Build the SQL statements that recreate external volumes and their metadata.

  For each volume: DROP from the backup schema, CREATE EXTERNAL VOLUME in the
  target schema, COMMENT (when present) and SET OWNER. Tag and grant
  statements follow. All rows are read positionally.

  Args:
    catalog_name: Catalog name (backtick-quoted in the statements).
    schema_name: Target schema name.
    backup_schema_name: Schema the existing volume is dropped from.
    volumes: Object with ``collect()``; rows use index 2 as the volume name,
      4 as the owner, 5 as the comment and 6 as the location.
    volume_tags: Object with ``collect()``; rows use index 2/3/4 as volume
      name, tag name, tag value.
    volume_grants: Object with ``collect()``; rows use index 1 as grantee,
      2/3/4 as catalog/schema/volume name and 5 as the privilege.

  Returns:
    list[str]: The generated statements.
  """
  volume_scripts =[]
  print("Creating drop create volume with comments and owner script")
  for volume in volumes.collect():
    volume_scripts.append(f"DROP VOLUME `{catalog_name}`.{backup_schema_name}.{volume[2]};")
    volume_scripts.append(f"CREATE EXTERNAL VOLUME IF NOT EXISTS `{catalog_name}`.{schema_name}.{volume[2]} LOCATION '{volume[6]}';")
    # A missing comment arrives as None; formatting it into the statement
    # would set the volume comment to the literal text 'None', so skip it.
    if volume[5] is not None:
      volume_scripts.append(f"COMMENT ON VOLUME `{catalog_name}`.{schema_name}.{volume[2]} IS '{volume[5]}';")
    volume_scripts.append(f"ALTER VOLUME `{catalog_name}`.{schema_name}.{volume[2]} SET OWNER TO `{volume[4]}`;")

  print("Creating volume tags script")
  for tags in volume_tags.collect():
    volume_scripts.append(f"ALTER VOLUME `{catalog_name}`.{schema_name}.{tags[2]} SET TAGS ('{tags[3]}' = '{tags[4]}');")

  print("Creating volume grants script")
  for grant in volume_grants.collect():
    volume_scripts.append(f"GRANT {grant[5]} ON VOLUME `{grant[2]}`.{grant[3]}.{grant[4]} TO `{grant[1]}`;")
  return volume_scripts

In [0]:
def count_files(path: str):
    """Count the non-directory entries under ``path``, descending into sub-directories.

    Relies on a ``dbutils`` object being available as a global; each level is
    listed with ``dbutils.fs.ls``.

    Args:
        path: Location to list.

    Returns:
        int: Number of entries for which ``isDir()`` is false, summed over the
        whole tree below ``path`` (0 when the listing is empty).
    """
    return sum(
        count_files(entry.path) if entry.isDir() else 1
        for entry in dbutils.fs.ls(path)
    )

In [0]:
def data_count(catalog_name, schema_name, tables, views, volumes):
  """Collect row counts for tables/views and file counts for volumes.

  Relies on a global ``spark`` session (``SELECT count(*)`` per table and
  view) and on the notebook's ``count_files`` helper (files per volume).

  Args:
    catalog_name: Catalog name (backtick-quoted in the count queries).
    schema_name: Schema name used in the count queries.
    tables: Object with ``collect()``; rows use index 0 as the table name.
    views: Object with ``collect()``; rows use index 2 as the view name.
    volumes: Object with ``collect()``; rows use index 0/1/2 to build the
      ``/Volumes/<0>/<1>/<2>/`` path and index 2 as the object name.

  Returns:
    pandas.DataFrame: Columns catalog_name, schema_name, object_name, count;
    one row per table, then per view, then per volume. Empty (with the same
    columns) when there is nothing to count.
  """
  # Rows are accumulated in a plain list and turned into a DataFrame once;
  # concatenating inside the loops copies the whole frame on every iteration.
  records = []

  for table in tables.collect():
    size = spark.sql(f"SELECT count(*) FROM `{catalog_name}`.{schema_name}.{table[0]}").collect()[0][0]
    records.append({'catalog_name': f"{catalog_name}",'schema_name': f"{schema_name}",'object_name': f"{table[0]}",'count': size})

  for view in views.collect():
    size = spark.sql(f"SELECT count(*) FROM `{catalog_name}`.{schema_name}.{view[2]}").collect()[0][0]
    records.append({'catalog_name': f"{catalog_name}",'schema_name': f"{schema_name}",'object_name': f"{view[2]}",'count': size})

  for volume in volumes.collect():
    path = f"/Volumes/{volume[0]}/{volume[1]}/{volume[2]}/"
    # count_files already walks the tree recursively from any starting path.
    records.append({'catalog_name': f"{catalog_name}",'schema_name': f"{schema_name}",'object_name': f"{volume[2]}",'count': count_files(path)})

  return pd.DataFrame(records, columns=["catalog_name","schema_name", "object_name", "count"])



In [0]:
def add_prefix_to_columns(df, prefix):
    """Prepend ``prefix`` to every column label of ``df`` and return ``df``.

    The frame is modified in place: its ``columns`` attribute is reassigned,
    and the same object is returned.

    Args:
        df: Object with an iterable, assignable ``columns`` attribute.
        prefix: Value formatted in front of each existing label.

    Returns:
        The same ``df`` object, with string labels of the form
        ``<prefix><old label>``.
    """
    prefixed_labels = []
    for label in df.columns:
        prefixed_labels.append("{}{}".format(prefix, label))
    df.columns = prefixed_labels
    return df