### Clean Out Stage Table

In [0]:
%sql

-- Remove every row from the staging table before the metrics loop below refills it.
TRUNCATE TABLE dataops_sandbox.tablemetricsstage

### Check Access & Generate Data Tables

In [0]:
# Imports
from pyspark.sql.utils import AnalysisException
from pyspark.sql.utils import ParseException
from pyspark.sql.utils import IllegalArgumentException
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType, DateType
import pyspark.sql.functions as f
import functools

# Config
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# -------------------------------------------------------------------------------------------------------------------------------
# Starting points for discovery: the tables visible via a plain `show tables`, and every database.
dftbls = spark.sql("show tables")
dfdbs = spark.sql("show databases")

# Optional allow-list of databases. It only takes effect if the commented-out
# `if row['databaseName'] in databaselist` check in the database loop is re-enabled.
databaselist = []

# Column layout of the metrics DataFrame as (name, Spark type) pairs, in output order.
# Every column is nullable.
_metric_columns = [
  ("database", StringType()),
  ("tableName", StringType()),
  ("rowCount", LongType()),
  ("format", StringType()),
  ("numFiles", LongType()),
  ("sizeInBytes", LongType()),
  ("lastModified", TimestampType()),
  ("location", StringType()),
  ("domainUDP", StringType()),
  ("table", StringType()),
  ("source", StringType()),
  ("lastLoadDate", DateType()),
  ("lastLoadCount", LongType()),
  ("reportDate", DateType()),
  ("s3_root_location", StringType()),
  ("columnCount", LongType()),
]
schema = StructType([StructField(colName, colType, True) for colName, colType in _metric_columns])

# Empty DataFrame carrying that schema.
df = spark.createDataFrame(data=spark.sparkContext.emptyRDD(), schema=schema)

# Append the table listing of every database to dftbls.
for row in dfdbs.rdd.collect():
#   if row['databaseName'] in databaselist:
    tmp = "show tables from " + row['databaseName']
    dftbls = dftbls.union(spark.sql(tmp))

# Build the ordered list of [database, tableName] pairs to analyze.
# dftbls already held the result of a plain `show tables` before the loop above
# added one listing per database, so the same table can be listed more than once;
# each pair is kept only the first time it is seen so no table is analyzed
# (and inserted) twice.
databaseTablePairs = []
seenPairs = set()
for row in dftbls.rdd.collect():
  pair = (row['database'], row['tableName'])
  # Entries without a database cannot be addressed as `database.table`
  # (presumably temporary views, which SHOW TABLES lists with an empty
  # database - TODO confirm), so they are skipped.
  if not row['database'] or pair in seenPairs:
    continue
  seenPairs.add(pair)
  databaseTablePairs.append([row['database'], row['tableName']])

# Not using .collect() in loops anymore to force data to stay on nodes until the end
dfList = []
df.createOrReplaceTempView('FinalTempView')

# Text looked for in an error message to recognise the FileNotFoundException case.
MISSING_FILES_MARKER = 'java.io.FileNotFoundException'

# One all-NULL row with the column names the DESCRIBE DETAIL result is expected to have,
# used when a table cannot be described.
NULL_DETAILS_SQL = 'SELECT NULL format, NULL id, NULL name, NULL description, NULL location, NULL createdAt, NULL lastModified, NULL partitionColumns,NULL numFiles, NULL sizeInBytes, NULL properties, NULL minReaderVersion, NULL minWriterVersion'


def _sql_or_fallback(query, fallback_query, label):
  """Return spark.sql(query), or the placeholder built by fallback_query if that fails.

  AnalysisException and errors whose text mentions java.io.FileNotFoundException
  are the expected failures and fall back silently. Any other error is printed
  and also falls back, so a failed lookup can never leave the previous table's
  DataFrame in place for the current table.

  query: SQL statement to attempt.
  fallback_query: SQL producing NULL column(s) with the same column names.
  label: short description of the attempt, used in the warning message.
  """
  try:
    return spark.sql(query)
  except AnalysisException:
    return spark.sql(fallback_query)
  except Exception as e:
    if MISSING_FILES_MARKER not in str(e):
      print('Unexpected error while ' + label + ', recording NULLs: ' + str(e))
    return spark.sql(fallback_query)


# NOTE(review): these handlers only see errors raised by the spark.sql() call itself.
# Presumably a SELECT that fails while it is actually executed would surface at the
# INSERT at the end of each iteration instead - confirm whether that needs guarding.
for database, tableName in databaseTablePairs:
  print('Analyzing: ' + database + '.' + tableName)
  if database == 'default':
    continue

  qualifiedName = database + '.' + tableName

  # Craft DataFrames for each table metric we are looking for
  # ----------------------

  # Row Counts
  tmpcntdf = _sql_or_fallback(
    'select count(*) rowcnt from ' + qualifiedName,
    'select NULL rowcnt',
    'counting rows of ' + qualifiedName)

  # Table Details
  tmpdeetsdf = _sql_or_fallback(
    'DESCRIBE DETAIL ' + qualifiedName,
    NULL_DETAILS_SQL,
    'describing ' + qualifiedName)

  # UDP columns
  tmpsourcedf = _sql_or_fallback(
    'SELECT DISTINCT domain domainUDP, table, source FROM ' + qualifiedName + ' LIMIT 1',
    'SELECT NULL domainUDP, NULL table, NULL source',
    'reading UDP columns of ' + qualifiedName)

  # Last load dates and counts: the most recent eventTimestamp date and its row count
  tmploaddf = _sql_or_fallback(
    'SELECT DISTINCT CAST(eventTimestamp AS DATE) lastLoadDate, COUNT(*) lastLoadCount from ' + qualifiedName + ' GROUP by CAST(eventTimestamp AS DATE) ORDER BY lastLoadDate DESC LIMIT 1',
    'SELECT NULL lastLoadDate, NULL lastLoadCount',
    'reading last load of ' + qualifiedName)

  # Report Date
  currentdatedf = spark.sql('SELECT CAST(CURRENT_TIMESTAMP() AS DATE) reportDate')

  # S3 Root Location: the 'Location' item of the database description
  try:
    tmps3df = spark.sql('DESCRIBE SCHEMA ' + database).filter(f.col('database_description_item') == 'Location').withColumnRenamed(existing='database_description_value',new='s3_root_location').select('s3_root_location')
  except (AnalysisException):
    tmps3df = spark.sql('SELECT NULL s3_root_location')

  # Column count; 0 when the table cannot be read
  try:
    columncount = len(spark.sql('SELECT * from ' + qualifiedName + ' LIMIT 1').columns)
  except (AnalysisException):
    columncount = 0
  except Exception as e:
    if MISSING_FILES_MARKER not in str(e):
      print('Unexpected error while counting columns of ' + qualifiedName + ', recording 0: ' + str(e))
    # Always reset, so the previous table's count is never reused.
    columncount = 0

  # Join all the DataFrames. The joins carry no condition, so the pieces are
  # simply placed side by side; 'full' keeps the row even when one piece is empty.
  # No .cache(): each frame is read once by the INSERT below, and the cached
  # copies were never unpersisted.
  joineddf = (tmpcntdf.join(tmpdeetsdf,how='full')
              .join(tmpsourcedf,how='full')
              .join(tmploaddf,how='full')
              .join(currentdatedf,how='full')
              .join(tmps3df,how='full'))
  filtereddf = (joineddf.withColumn('database', f.lit(database))
                .withColumn('tableName', f.lit(tableName))
                .withColumn('columnCount', f.lit(columncount))
                .select('database','tableName',joineddf.rowcnt.alias('rowCount'),'format', 'numFiles', 'sizeInBytes','lastModified','location','domainUDP', 'table', 'source', 'lastLoadDate', 'lastLoadCount','reportDate','s3_root_location','columnCount'))

  # Size in MB (bytes / 1,000,000, two decimals)
  df_mb = filtereddf.withColumn('sizeInMB',f.round((f.col('sizeInBytes') / 1000000),2))
  # Domain is taken from the location path: segment 4 when the location matches 'bronze', otherwise segment 3
  df_mb = df_mb.withColumn('domain',f.when(f.col('location').rlike('bronze'), f.split(f.col("location"), "/").getItem(4)).otherwise(f.split(f.col("location"), "/").getItem(3))).select('reportDate','domain', 'database','tableName','rowCount','format','lastLoadDate','lastLoadCount', 'numFiles', 'sizeInMB','lastModified','location','domainUDP','table','source','s3_root_location','columnCount')

  # Append this table's metrics row to the stage table
  df_mb.createOrReplaceTempView('TempView')
  spark.sql('INSERT INTO dataops_sandbox.tablemetricsstage SELECT * FROM TempView')

### Delete Today's Data If Any Already Exists

In [0]:
%sql

-- Remove rows whose reportDate is today, so rows already stored for today are gone before the stage table is inserted.
DELETE FROM dataops_sandbox.tablemetrics WHERE reportDate = CAST(CURRENT_TIMESTAMP() AS DATE)

### Insert Daily Data into Delta Table from Stage Table

In [0]:
%sql
-- Append every staged row to tablemetrics. SELECT * carries no column list, so this assumes both tables share the same column order - TODO confirm.
INSERT INTO dataops_sandbox.tablemetrics SELECT * FROM dataops_sandbox.tablemetricsstage

### Clean Out Stage Table

In [0]:
%sql
-- Remove every row from the staging table now that its contents have been inserted into tablemetrics.
TRUNCATE TABLE dataops_sandbox.tablemetricsstage

### Zone Table Creation

In [0]:
# Classify each table of the most recent reportDate into a zone based on its storage
# location (the table location, or the database root location when that is NULL).
# The CASE branches are tried in order, so a location containing 'sandbox' is classed
# as Sandbox before any other pattern is checked; unmatched locations also fall back
# to Sandbox.
zone_query = '''SELECT database,
tableName,
COALESCE(location, s3_root_location) AS s3_location,
CASE WHEN COALESCE(location, s3_root_location) LIKE '%sandbox%' THEN 'Sandbox'
WHEN COALESCE(location, s3_root_location) LIKE '%/refined/%' THEN 'Refined'
WHEN COALESCE(location, s3_root_location) LIKE '%/structured/%' THEN 'Structured Raw'
WHEN COALESCE(location, s3_root_location) LIKE '%/bronze/%' THEN 'Bronze'
WHEN COALESCE(location, s3_root_location) LIKE '%/hive/warehouse%' THEN 'Hive Warehouse/unknown'
WHEN COALESCE(location, s3_root_location) LIKE '%/client/BIIPODS%' THEN 'Structured Raw'
WHEN COALESCE(location, s3_root_location) LIKE '%/px/px_landing/%' THEN 'Structured Raw'
WHEN COALESCE(location, s3_root_location) LIKE '%/source/px/%' THEN 'Structured Raw'
ELSE 'Sandbox'
END AS Zone 
FROM dataops_sandbox.tablemetrics WHERE reportDate = (SELECT DISTINCT reportDate FROm dataops_sandbox.tablemetrics ORDER BY reportDate DESC LIMIT 1)'''
df = spark.sql(zone_query)

# Replace the contents of the zone lookup table with this snapshot.
zone_table = "dataops_sandbox.tables_by_zone"
df.write.insertInto(zone_table, overwrite=True)

display(df)


### Domain Table Creation

In [0]:
# Distinct (domain, database, tableName) rows for the most recent reportDate;
# domainUDP is used when domain is NULL.
domain_query = '''SELECT DISTINCT COALESCE(domain,domainUDP) AS domain, database, tableName from dataops_sandbox.tablemetrics WHERE reportDate = (SELECT DISTINCT reportDate FROm dataops_sandbox.tablemetrics ORDER BY reportDate DESC LIMIT 1)'''
df_domains = spark.sql(domain_query)

# Replace the contents of the domain lookup table with this snapshot.
domain_table = "dataops_sandbox.tables_by_domain"
df_domains.write.insertInto(domain_table, overwrite=True)

display(df_domains)

### Column Count Table

In [0]:
# Distinct (columnCount, database, tableName) rows for the most recent reportDate.
column_count_query = '''SELECT DISTINCT columnCount AS columnCount, database, tableName from dataops_sandbox.tablemetrics WHERE reportDate = (SELECT DISTINCT reportDate FROm dataops_sandbox.tablemetrics ORDER BY reportDate DESC LIMIT 1)'''
df_columns = spark.sql(column_count_query)

# Disabled alternative kept for reference (saves as a Delta table instead of inserting):
# df_columns.write.format("delta").saveAsTable('dataops_sandbox.columnCount_by_table')
# Replace the contents of the column-count table with this snapshot.
column_count_table = "dataops_sandbox.columnCount_by_table"
df_columns.write.insertInto(column_count_table, overwrite=True)

display(df_columns)

### Optimization Candidate Table

In [0]:
# Tables from the latest reportDate with more than 1000 files, largest file count first.
optimize_query = '''SELECT reportDate, database, tableName, numFiles FROM dataops_sandbox.tablemetrics WHERE CAST(numFiles AS INT) > 1000 and reportDate = (SELECT MAX(reportDate) FROM dataops_sandbox.tablemetrics) ORDER BY CAST(numFiles AS INT) DESC'''
df_optimize = spark.sql(optimize_query)

# Replace the contents of the candidates table with this list.
optimize_table = "dataops_sandbox.optimizationcandidates"
df_optimize.write.insertInto(optimize_table, overwrite=True)

### Optimize raw table every so often

In [0]:
# Look up the details of the metrics table itself.
df_opt = spark.sql('''DESCRIBE DETAIL dataops_sandbox.tablemetrics''')

display(df_opt)

# Run OPTIMIZE on the metrics table once its numFiles value exceeds 300.
detail_rows = df_opt.select('numFiles').collect()
current_file_count = detail_rows[0][0]
if current_file_count > 300:
  spark.sql('''OPTIMIZE dataops_sandbox.tablemetrics''')