# Simple Observability 

_Created by Eumar Assis (eumar.assis@databricks.com)_

This notebook computes observability metrics for the Lakehouse and consolidates them into a unified Delta table, making it easy to build operational dashboards. It provides the following insights:

- Inventory of all tables, including their format and storage location
- Table size and growth over time
- Data freshness: when each table was last written and when its definition last changed
- Last optimization and vacuum dates



**How does it work?** The notebook queries the `tables` view of [system.information_schema](https://docs.databricks.com/en/sql/language-manual/sql-ref-information-schema.html) and selects the tables whose fully qualified name matches the provided regular expression. It then runs [DESCRIBE DETAIL](https://docs.databricks.com/en/delta/table-details.html) and [DESCRIBE HISTORY](https://docs.databricks.com/en/sql/language-manual/delta-describe-history.html) on each table to collect the metrics. Results are staged in a temporary table and written to the destination table only at the end, using atomic MERGE statements, so a run that fails while collecting metrics does not leave partial results in the destination table.

Based on initial tests, the notebook processed 314 tables in 40 minutes on a 3-node cluster with 32 GB nodes. A larger driver node and more workers are likely to make the script run faster.
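As a back-of-the-envelope check, those figures work out to roughly 7.6 seconds of wall-clock time per table (with the notebook's 8 worker threads). The sketch below scales that rate linearly to other table counts; the linear model and the `estimate_minutes` helper are assumptions for illustration, not measurements.

```python
def estimate_minutes(num_tables: int,
                     baseline_tables: int = 314,
                     baseline_minutes: float = 40.0) -> float:
    """Linearly scale the reported baseline run time to another table count.

    Assumes the per-table cost stays constant (same cluster, same thread
    count, similar history sizes), which real workloads may not satisfy.
    """
    seconds_per_table = baseline_minutes * 60 / baseline_tables
    return num_tables * seconds_per_table / 60


print(f"{40 * 60 / 314:.1f} s per table")                   # 7.6 s per table
print(f"{estimate_minutes(1000):.0f} min for 1000 tables")  # 127 min for 1000 tables
```

In practice, tables with long Delta histories take longer to describe, so treat the estimate as a lower bound rather than a forecast.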

**Requirements:**

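- The user running this notebook needs USE CATALOG and USE SCHEMA on the parent catalogs and schemas, and SELECT on the tables targeted by the script. Because the notebook creates and writes the destination and temporary tables, the user also needs permission to create and modify tables in the destination schema.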

**Parameters:**

- **destination_table**: name of the destination table, as a three-level namespace, e.g. `catalog_name.schema_name.table_name`.
- **filter**: regular expression matched against each table's fully qualified name (`catalog.schema.table`) to select the catalogs, schemas, or tables to collect metrics for. Examples:
  - All schemas and tables under catalog 'demo': `^demo\.`
  - All tables under catalog 'demo' and schema 'dbdemo': `^demo\.dbdemo\.`
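  - Single table 'tabledemo': `^demo\.dbdemo\.tabledemo$` (anchor the pattern and escape the dots; otherwise it also matches names such as `demo.dbdemo.tabledemo_backup`)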
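Spark's `RLIKE` returns true when the pattern matches anywhere in the string, so unanchored patterns can select more tables than intended. The sketch below uses Python's `re.search` as an approximation of that behavior (Java and Python regex dialects differ in edge cases); the table names are made up.

```python
import re

# Hypothetical fully qualified names, as built by
# CONCAT(table_catalog, '.', table_schema, '.', table_name)
names = [
    "demo.dbdemo.tabledemo",
    "demo.dbdemo.tabledemo_backup",
    "demo.other.orders",
    "demo_archive.dbdemo.tabledemo",
]


def matching(pattern: str) -> list[str]:
    """Return the names the pattern matches anywhere (find semantics, like RLIKE)."""
    return [n for n in names if re.search(pattern, n)]


print(matching(r"^demo\."))                    # every table in catalog 'demo'
print(matching(r"^demo\.dbdemo\."))            # catalog 'demo', schema 'dbdemo'
print(matching(r"demo.dbdemo.tabledemo"))      # unanchored: also picks up tabledemo_backup
print(matching(r"^demo\.dbdemo\.tabledemo$"))  # exactly one table
```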

**Output Table:**


| Column Name             | Type       | Description                                                                 |
|-------------------------|------------|-----------------------------------------------------------------------------|
| `timestamp`             | TIMESTAMP  | Date of the run that last refreshed the record                              |
| `catalogName`           | STRING     | Catalog name                                                                |
| `schemaName`            | STRING     | Schema name                                                                 |
| `tableName`             | STRING     | Table name                                                                  |
| `creationDate`          | TIMESTAMP  | Table creation date                                                         |
| `createdBy`             | STRING     | Principal that created the table                                            |
| `owner`                 | STRING     | Table owner                                                                 |
| `type`                  | STRING     | Table type (`MANAGED` or `EXTERNAL`)                                        |
| `format`                | STRING     | Data source format (e.g. `DELTA`)                                           |
| `storageLocation`       | STRING     | Storage location of the table                                               |
| `lastVacuumDate`        | TIMESTAMP  | Date of the last VACUUM (Delta tables only)                                 |
| `lastOptimizeDate`      | TIMESTAMP  | Date of the last OPTIMIZE (Delta tables only)                               |
| `lastSchemaChangeDate`  | TIMESTAMP  | Date the table definition was last altered (`last_altered` in the information schema) |
| `lastWriteDate`         | TIMESTAMP  | Date of the last write operation (Delta tables only)                        |
| `sizeInMB`              | DOUBLE     | Current table size in MB (bytes / 1024²)                                    |
| `sizeInMB24Hours`       | DOUBLE     | Table size recorded at least 24 hours ago (largest such value in `sizeHistory`) |
| `sizeInMB7Days`         | DOUBLE     | Table size recorded at least 7 days ago (same rule)                         |
| `sizeInMb30Days`        | DOUBLE     | Table size recorded at least 30 days ago (same rule)                        |
| `partitionColumns`      | STRING     | Comma-separated partition columns                                           |
| `clusteringColumns`     | STRING     | Comma-separated clustering columns                                          |
| `sizeHistory`           | STRING     | Size history as a JSON array of `{"size": <MB>, "date": "YYYY-MM-DD"}` objects, one per run |
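Because `sizeHistory` is plain JSON, a dashboard or downstream job can compute growth from it (or from `sizeInMB` and one of the look-back columns). A minimal sketch, assuming a hypothetical history value and a hypothetical `growth_pct` helper:

```python
import json

# Hypothetical sizeHistory value for one table: one {"size", "date"} object per run
size_history = '[{"size": 1100.0, "date": "2024-05-24"}, {"size": 1280.0, "date": "2024-05-31"}]'


def growth_pct(current_mb, previous_mb):
    """Percentage change between two size snapshots; None when there is no baseline."""
    if previous_mb is None or previous_mb == 0:
        return None
    return (current_mb - previous_mb) / previous_mb * 100


# Sort oldest to newest (ISO dates sort correctly as strings)
entries = sorted(json.loads(size_history), key=lambda e: e["date"])
current_mb, previous_mb = entries[-1]["size"], entries[0]["size"]
print(f"{growth_pct(current_mb, previous_mb):.1f}%")  # 16.4%
```

Returning `None` for a missing or zero baseline mirrors the NULLs the look-back columns hold for tables that have not been tracked long enough.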


In [None]:

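# Regex matched against catalog.schema.table (raw string keeps the backslash without an escape warning)
dbutils.widgets.text("filter", r"^catalogName\.", "Table Filter Regex")

# Three-level name (catalog.schema.table) of the destination table
dbutils.widgets.text("destination_table", "eumar_tests.eumar_default.simple_observability", "Destination table")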

In [None]:
from datetime import datetime

# Get the destination table name from the widget
destination_table = dbutils.widgets.get("destination_table")

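# Staging table for this run, suffixed with the current date and hour (YYYYMMDDHH)
temporary_table = destination_table + "_temp_processing_" + datetime.now().strftime('%Y%m%d%H')

# Create the destination Delta table if it does not exist yet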
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {destination_table}
(
  timestamp TIMESTAMP,
  catalogName STRING,
  schemaName STRING,
  tableName STRING,
  creationDate TIMESTAMP,
  createdBy STRING,
  owner STRING,
  type STRING,
  format STRING,
  storageLocation STRING,
  lastVacuumDate TIMESTAMP,
  lastOptimizeDate TIMESTAMP,
  lastSchemaChangeDate TIMESTAMP,
  lastWriteDate TIMESTAMP,
  sizeInMB DOUBLE,
  sizeInMB24Hours DOUBLE,
  sizeInMB7Days DOUBLE,
  sizeInMb30Days DOUBLE,
  partitionColumns STRING,
  clusteringColumns STRING,
  sizeHistory STRING  -- This column will store the JSON data
)
USING delta
""")

# Create the temporary (staging) table with the same schema as the destination table
spark.sql(f"SELECT * FROM {destination_table} LIMIT 0").write.mode("overwrite").saveAsTable(temporary_table)
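The staging table name carries an hourly suffix, and the table is (re)created with `mode("overwrite")`. A rerun within the same hour therefore resets the same staging table, while a run in a later hour gets a new one. The sketch below reproduces only the naming rule; `staging_table_name` and the sample names are hypothetical.

```python
from datetime import datetime


def staging_table_name(destination_table: str, now: datetime) -> str:
    """Same rule as the cell above: destination + fixed suffix + YYYYMMDDHH."""
    return destination_table + "_temp_processing_" + now.strftime("%Y%m%d%H")


dest = "main.ops.simple_observability"  # hypothetical destination table
a = staging_table_name(dest, datetime(2024, 5, 31, 9, 5))
b = staging_table_name(dest, datetime(2024, 5, 31, 9, 55))
c = staging_table_name(dest, datetime(2024, 5, 31, 10, 0))
print(a)       # main.ops.simple_observability_temp_processing_2024053109
print(a == b)  # True: same hour, same staging table
print(a == c)  # False: a new hour gets a new staging table
```

One consequence: if a run fails before the final `DROP TABLE` cell, its staging table is left behind and is not reused by runs in later hours.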



To find the target tables, we query `system.information_schema.tables`, filter it with the regular expression, and insert the matching tables into the temporary table. See the [information schema documentation](https://docs.databricks.com/en/sql/language-manual/sql-ref-information-schema.html).
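The `WHERE` clause in the next cells combines three conditions. This pure-Python sketch mirrors them on hypothetical rows (dicts standing in for `information_schema.tables` records) to make the selection rule explicit. It is an approximation: `re.search` stands in for `RLIKE`, and, unlike SQL, it does not exclude rows whose owner is NULL.

```python
import re


def is_target(row: dict, table_filter_regex: str) -> bool:
    """Pure-Python mirror of the notebook's table selection predicate."""
    full_name = f"{row['table_catalog']}.{row['table_schema']}.{row['table_name']}"
    return (
        re.search(table_filter_regex, full_name) is not None  # RLIKE: match anywhere
        and row["table_owner"] != "System user"                # skip system-owned tables
        and row["table_type"] in ("MANAGED", "EXTERNAL")       # skip views and other objects
    )


# Hypothetical information_schema.tables rows
rows = [
    {"table_catalog": "demo", "table_schema": "dbdemo", "table_name": "orders",
     "table_owner": "alice", "table_type": "MANAGED"},
    {"table_catalog": "demo", "table_schema": "dbdemo", "table_name": "orders_v",
     "table_owner": "alice", "table_type": "VIEW"},
    {"table_catalog": "other", "table_schema": "dbdemo", "table_name": "orders",
     "table_owner": "bob", "table_type": "EXTERNAL"},
]

print([r["table_name"] for r in rows if is_target(r, r"^demo\.")])  # ['orders']
```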

In [None]:
%sql
-- List the table types present; the insert below keeps only MANAGED and EXTERNAL tables
select distinct table_type from system.information_schema.tables

In [None]:
# Get the value of the widget
table_filter_regex = dbutils.widgets.get("filter")

# Define query to insert filtered results into the temp table
sql_query = f"""
INSERT INTO {temporary_table} 
  (
    timestamp,
    catalogName,
    schemaName,
    tableName,
    creationDate,
    createdBy,
    owner,
    lastSchemaChangeDate,    
    type,
    format,
    sizeHistory
  )
SELECT
  CURRENT_DATE(),
  table_catalog,
  table_schema,
  table_name,
  created,
  created_by,  
  table_owner,
  last_altered,
  table_type,
  data_source_format,
  to_json(array())

FROM system.information_schema.tables
WHERE CONCAT(table_catalog, '.', table_schema, '.', table_name) RLIKE r'{table_filter_regex}'
  AND table_owner != 'System user'          -- skip system-owned tables
  AND table_type IN ('MANAGED', 'EXTERNAL') -- skip views and other non-table objects
"""

# Execute the SQL query
spark.sql(sql_query)

display(spark.sql(f"SELECT COUNT(*) as records_to_process FROM {temporary_table}"))


In [None]:
from datetime import datetime
import json

# DESCRIBE HISTORY operation names counted as data writes
WRITE_OPERATIONS = ["WRITE", "STREAMING UPDATE", "MERGE", "DELETE", "UPDATE", "COPY INTO", "TRUNCATE"]
# Delta logs vacuum runs as "VACUUM START" / "VACUUM END" (when vacuum logging is enabled)
VACUUM_OPERATIONS = ["VACUUM END", "VACUUM START"]

# Return the timestamp of the first (newest) history entry whose operation is in `operations`
def get_first_timestamp(operations, history_array):
    return next((row["timestamp"] for row in history_array if row["operation"] in operations), None)

# This cannot be a UDF: UDFs run on executors, which have no access to the SparkSession
def get_update_history(catalog, schema, table, table_format):
    full_name = f"`{catalog}`.`{schema}`.`{table}`"
    details = spark.sql(f"DESCRIBE DETAIL {full_name}").collect()[0].asDict()

    last_vacuum_date = None
    last_optimize_date = None
    last_write_date = None

    # Operation history is only collected for Delta tables
    if table_format == "DELTA":
        # Newest entries first, so get_first_timestamp returns the most recent match
        history = spark.sql(f"DESCRIBE HISTORY {full_name}").orderBy("timestamp", ascending=False).collect()

        last_vacuum_date = get_first_timestamp(VACUUM_OPERATIONS, history)
        last_optimize_date = get_first_timestamp(["OPTIMIZE"], history)
        last_write_date = get_first_timestamp(WRITE_OPERATIONS, history)

    # .get() tolerates runtimes whose DESCRIBE DETAIL output lacks a column (e.g. clusteringColumns)
    size_in_bytes = details.get("sizeInBytes")
    size_in_mb = 0.0 if size_in_bytes is None else size_in_bytes / (1024 * 1024)
    partition_columns = ",".join(details.get("partitionColumns") or [])
    clustering_columns = ",".join(details.get("clusteringColumns") or [])

    return {
        "lastVacuumDate": last_vacuum_date,
        "lastOptimizeDate": last_optimize_date,
        "lastWriteDate": last_write_date,
        "sizeInMB": float(size_in_mb),
        "storageLocation": details.get("location"),
        "partitionColumns": partition_columns,
        "clusteringColumns": clustering_columns,
        # Today's size as a one-element history; the MERGE below appends it to the stored history
        "sizeHistory": json.dumps([{
            "size": float(size_in_mb),
            "date": datetime.now().strftime('%Y-%m-%d')
        }])
    }
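`get_first_timestamp` only returns the most recent matching operation because the history is sorted newest first. The sketch below replays that lookup on hypothetical history rows (dicts standing in for Spark `Row` objects; `first_timestamp` is a local stand-in for the helper). It also shows why the operation names matter: Delta records vacuum runs as `VACUUM START` and `VACUUM END` (when vacuum logging is enabled), so a lookup for a bare `"VACUUM"` finds nothing.

```python
from datetime import datetime

# Hypothetical DESCRIBE HISTORY rows, newest first
history = [
    {"timestamp": datetime(2024, 5, 31, 2, 10), "operation": "VACUUM END"},
    {"timestamp": datetime(2024, 5, 31, 2, 0), "operation": "VACUUM START"},
    {"timestamp": datetime(2024, 5, 30, 23, 0), "operation": "MERGE"},
    {"timestamp": datetime(2024, 5, 30, 1, 0), "operation": "OPTIMIZE"},
    {"timestamp": datetime(2024, 5, 29, 12, 0), "operation": "WRITE"},
]


def first_timestamp(operations, history_array):
    """Timestamp of the first (newest) entry whose operation is in `operations`."""
    return next((row["timestamp"] for row in history_array
                 if row["operation"] in operations), None)


print(first_timestamp(["VACUUM"], history))                      # None
print(first_timestamp(["VACUUM END", "VACUUM START"], history))  # 2024-05-31 02:10:00
print(first_timestamp(["WRITE", "MERGE"], history))              # 2024-05-30 23:00:00
```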


In [None]:
import concurrent.futures

# Get the DataFrame
df = spark.sql(f"SELECT * FROM {temporary_table}")

# Create a new list to store the updated rows
updated_rows = []

rows_processed = 0

rows_collect = df.collect()

print(f"Getting history for tables. Total tables: {len(rows_collect)}")

# Define a function to process each row and update the columns
def process_row(row):
    try:
        update_history = get_update_history(row['catalogName'], row['schemaName'], row['tableName'], row['format'])

        updated_row = row.asDict()

        for key in update_history.keys():
            updated_row[key] = update_history[key]

        return updated_row

    except Exception as e:
        # Log the failure and skip the table: returning None drops it from this run's results
        print(f"{row['catalogName']}.{row['schemaName']}.{row['tableName']} failed")
        print("An error occurred:", str(e))
        return None

# Create a ThreadPoolExecutor with a maximum of 8 threads
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # Submit each row to the executor for processing
    futures = [executor.submit(process_row, row) for row in rows_collect]

    # Iterate over the completed futures and get the updated rows
    for future in concurrent.futures.as_completed(futures):
        updated_row = future.result()
        if updated_row:
            updated_rows.append(updated_row)
            rows_processed += 1
            print(f"({rows_processed}) {updated_row['catalogName']}.{updated_row['schemaName']}.{updated_row['tableName']} completed")

# Create a new DataFrame with the updated rows
updated_df = spark.createDataFrame(updated_rows, df.schema)

# Save the changes in bulk
updated_df.write.mode("overwrite").insertInto(temporary_table)
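The cell above fans the per-table work out to a thread pool on the driver and drops every table whose processing raised an exception. Here is the same pattern without Spark, using a hypothetical `process` stand-in for `process_row` and made-up table names:

```python
import concurrent.futures


def process(name: str):
    """Stand-in for process_row: returns a dict on success, None on failure."""
    try:
        if name.endswith("_broken"):
            raise RuntimeError("DESCRIBE DETAIL failed")  # simulated error
        return {"table": name, "sizeInMB": float(len(name))}
    except Exception as e:
        print(f"{name} failed: {e}")
        return None


tables = ["a.b.orders", "a.b.customers", "a.b.legacy_broken"]

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(process, t) for t in tables]
    # as_completed yields futures in completion order, not submission order
    for future in concurrent.futures.as_completed(futures):
        row = future.result()
        if row is not None:
            results.append(row)

print(sorted(r["table"] for r in results))  # ['a.b.customers', 'a.b.orders']
```

Because results arrive in completion order, the row order in the rebuilt DataFrame is not meaningful. Failed tables are absent from the staging table after the overwrite, so the MERGE leaves their existing destination rows unchanged.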

In [None]:
# Upsert this run's results into the destination table and append today's size to sizeHistory
sql_query = f"""
MERGE INTO {destination_table} AS d
USING {temporary_table} AS i
ON d.catalogName = i.catalogName
   AND d.schemaName = i.schemaName
   AND d.tableName = i.tableName
WHEN MATCHED THEN
  UPDATE SET
    d.timestamp = CURRENT_DATE(),
    d.creationDate = i.creationDate,
    d.createdBy = i.createdBy,
    d.lastSchemaChangeDate = i.lastSchemaChangeDate,
    d.owner = i.owner,
    d.type = i.type,
    d.format = i.format,
    d.sizeInMB = i.sizeInMB,
    d.storageLocation = i.storageLocation,
    d.lastVacuumDate = i.lastVacuumDate,
    d.lastOptimizeDate = i.lastOptimizeDate,
    d.lastWriteDate = i.lastWriteDate,
    d.partitionColumns = i.partitionColumns,
    d.clusteringColumns = i.clusteringColumns,
    d.sizeHistory = to_json(array_union(from_json(d.sizeHistory, 'array<struct<date:string,size:double>>'),
                                        from_json(i.sizeHistory, 'array<struct<date:string,size:double>>')))
WHEN NOT MATCHED THEN
  INSERT (
    timestamp,
    catalogName,
    schemaName,
    tableName,
    creationDate,
    createdBy,
    lastSchemaChangeDate,
    owner,
    type,
    format,
    sizeInMB,
    storageLocation,
    lastVacuumDate,
    lastOptimizeDate,
    lastWriteDate,
    partitionColumns,
    clusteringColumns,    
    sizeHistory
  )
  VALUES (
    CURRENT_DATE(),
    i.catalogName,
    i.schemaName,
    i.tableName,
    i.creationDate,
    i.createdBy,
    i.lastSchemaChangeDate,
    i.owner,
    i.type,
    i.format,
    i.sizeInMB,
    i.storageLocation,
    i.lastVacuumDate,
    i.lastOptimizeDate,
    i.lastWriteDate,
    i.partitionColumns,
    i.clusteringColumns,    
    i.sizeHistory
  )
"""

display(spark.sql(sql_query))
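The `sizeHistory` update relies on `array_union`, which keeps the elements of the first array, appends elements of the second that are not already present, and returns NULL if either input is NULL. The sketch below approximates those semantics in plain Python (an illustration, not Spark's implementation) to show two consequences: an identical same-day entry is deduplicated, but a same-day rerun with a different size adds a second entry for that date, and a NULL history stays NULL.

```python
def array_union(first, second):
    """Approximation of Spark's array_union: elements of `first`, then new
    elements of `second`, without duplicates; None if either input is None."""
    if first is None or second is None:
        return None
    out = []
    for item in first + second:
        if item not in out:
            out.append(item)
    return out


existing = [{"date": "2024-05-30", "size": 1100.0}]
same_day_rerun = [{"date": "2024-05-30", "size": 1100.0}]
changed_size = [{"date": "2024-05-30", "size": 1150.0}]

print(array_union(existing, same_day_rerun))  # one entry: identical structs collapse
print(array_union(existing, changed_size))    # two entries for the same date
print(array_union(None, changed_size))        # None: a NULL history stays NULL
```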

In [None]:

# Derive the look-back size columns from sizeHistory for the tables processed in this run.
# Each column holds the largest size recorded at least N days ago (NULL if there is none).
sql_query = f"""
MERGE INTO {destination_table} AS target
USING (
  WITH exploded_table AS (
      SELECT
          t.catalogName,
          t.schemaName,
          t.tableName,
          explode(from_json(t.sizeHistory, 'array<struct<date:string, size:double>>')) AS sizeHistoryElement
      FROM
          {destination_table} t
      INNER JOIN
          {temporary_table} s
      ON t.catalogName = s.catalogName AND t.schemaName = s.schemaName AND t.tableName = s.tableName
  )
  SELECT
      catalogName,
      schemaName,
      tableName,
      MAX(CASE WHEN date_diff(current_date(), sizeHistoryElement.date) >= 1 THEN sizeHistoryElement.size END) AS sizeInMB24Hours,
      MAX(CASE WHEN date_diff(current_date(), sizeHistoryElement.date) >= 7 THEN sizeHistoryElement.size END) AS sizeInMB7Days,
      MAX(CASE WHEN date_diff(current_date(), sizeHistoryElement.date) >= 30 THEN sizeHistoryElement.size END) AS sizeInMb30Days
  FROM
      exploded_table
  GROUP BY
      catalogName,
      schemaName,
      tableName
) AS source
ON target.catalogName = source.catalogName AND target.schemaName = source.schemaName AND target.tableName = source.tableName
WHEN MATCHED THEN
  UPDATE SET
    target.sizeInMB24Hours = source.sizeInMB24Hours,
    target.sizeInMB7Days = source.sizeInMB7Days,
    target.sizeInMb30Days = source.sizeInMb30Days
"""

display(spark.sql(sql_query))
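The look-back columns are computed with `MAX(CASE WHEN date_diff(current_date(), date) >= N THEN size END)`. This pure-Python sketch reproduces that rule on a hypothetical history; `size_at_least_days_ago` is an illustrative name, not part of the notebook.

```python
from datetime import date


def size_at_least_days_ago(history, today, days):
    """Largest size among entries recorded at least `days` days before `today`."""
    sizes = [e["size"] for e in history
             if (today - date.fromisoformat(e["date"])).days >= days]
    return max(sizes) if sizes else None


# Hypothetical sizeHistory entries for one table
history = [
    {"date": "2024-05-01", "size": 900.0},
    {"date": "2024-05-24", "size": 1100.0},
    {"date": "2024-05-30", "size": 1250.0},
    {"date": "2024-05-31", "size": 1280.0},
]
today = date(2024, 5, 31)
print(size_at_least_days_ago(history, today, 1))   # 1250.0
print(size_at_least_days_ago(history, today, 7))   # 1100.0
print(size_at_least_days_ago(history, today, 30))  # 900.0
```

For tables that only grow, the largest qualifying size is also the most recent one. If a table shrinks (for example after a large DELETE), `MAX` returns an older, larger snapshot. Selecting the entry with the latest qualifying date (for example with `max_by`) may be closer to the intent.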

In [None]:
# Show the refreshed records for the tables processed in this run
query = f"""
SELECT 
  t.* 
FROM 
  {destination_table} t
INNER JOIN
  {temporary_table} s
  ON t.catalogName = s.catalogName AND t.schemaName = s.schemaName AND t.tableName = s.tableName 
"""

display(spark.sql(query))

In [None]:
# Clean up the staging table
spark.sql(f"DROP TABLE IF EXISTS {temporary_table}")