# Refresh Metadata KPIs Cache

This notebook computes metadata health KPIs and stores them in a cache table for fast retrieval.

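**Note:** The table-coverage KPIs are computed only for the catalogs defined in `ALLOWED_CATALOGS` in the configuration cell. The contributor leaderboards (top suggestors, reviewers, and rejected suggestors) are computed over the entire suggestions table and are not filtered by catalog.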

**Schedule this as a Databricks Job to run:**
- Hourly during business hours
- Or daily at a specific time

**Cache Table:** `asda_metadata_rampup.metadata_population.metadata_kpis`


In [None]:
# Configuration
STAGING_CATALOG = "asda_metadata_rampup"
STAGING_SCHEMA = "metadata_population"
CACHE_TABLE = "metadata_kpis"
SUGGESTIONS_TABLE = "metadata_suggestions"

# Catalogs to include in KPI computation
ALLOWED_CATALOGS = [
    "example_cat_1",
    "example_cat_2",
]

# An empty list would render as `IN ()`, which is a SQL syntax error in every
# coverage query below; fail fast with a clear message instead.
if not ALLOWED_CATALOGS:
    raise ValueError("ALLOWED_CATALOGS must contain at least one catalog name")

# Build SQL IN clause for filtering: each catalog becomes a single-quoted literal.
# NOTE(review): names are interpolated unescaped; this is only safe because the
# list is a hard-coded constant — do not populate it from user input.
CATALOGS_SQL_LIST = ", ".join(f"'{c}'" for c in ALLOWED_CATALOGS)

cache_full_name = f"{STAGING_CATALOG}.{STAGING_SCHEMA}.{CACHE_TABLE}"
suggestions_full_name = f"{STAGING_CATALOG}.{STAGING_SCHEMA}.{SUGGESTIONS_TABLE}"

print(f"üìä Refreshing KPIs cache: {cache_full_name}")
print(f"üìÇ Filtering to catalogs: {', '.join(ALLOWED_CATALOGS)}")
print(f"‚è∞ Started at: {spark.sql('SELECT current_timestamp()').collect()[0][0]}")


## Step 1: Compute Metadata Coverage Statistics


In [None]:
# Count distinct fully-qualified tables (catalog.schema.table) in the allowed
# catalogs, skipping the information_schema and system schemas.
total_tables_df = spark.sql(f"""
    SELECT COUNT(DISTINCT CONCAT(table_catalog, '.', table_schema, '.', table_name)) as total_tables
    FROM system.information_schema.tables
    WHERE table_schema NOT IN ('information_schema', 'system')
    AND table_catalog IN ({CATALOGS_SQL_LIST})
""")

# The aggregate always yields exactly one row.
total_tables_row = total_tables_df.first()
total_tables = total_tables_row['total_tables']
print(f"üìã Total tables in allowed catalogs: {total_tables}")


In [None]:
# Count tables in the allowed catalogs that have a non-empty table comment.
tables_with_desc_df = spark.sql(f"""
    SELECT COUNT(DISTINCT CONCAT(table_catalog, '.', table_schema, '.', table_name)) as tables_with_desc
    FROM system.information_schema.tables
    WHERE table_schema NOT IN ('information_schema', 'system')
    AND table_catalog IN ({CATALOGS_SQL_LIST})
    AND comment IS NOT NULL AND comment != ''
""")

tables_with_desc = tables_with_desc_df.collect()[0]['tables_with_desc']
# Float fallback: this value is written to the DoubleType metric_percentage
# column, and Spark's schema verification rejects a Python int there.
pct_with_desc = (tables_with_desc / total_tables * 100) if total_tables > 0 else 0.0
print(f"üìù Tables with descriptions: {tables_with_desc} ({pct_with_desc:.1f}%)")


In [None]:
# Count tables in the allowed catalogs where every column has a non-empty comment.
all_cols_desc_df = spark.sql(f"""
    WITH table_column_counts AS (
        SELECT 
            table_catalog,
            table_schema,
            table_name,
            COUNT(*) as total_columns,
            SUM(CASE WHEN comment IS NOT NULL AND comment != '' THEN 1 ELSE 0 END) as described_columns
        FROM system.information_schema.columns
        WHERE table_schema NOT IN ('information_schema', 'system')
        AND table_catalog IN ({CATALOGS_SQL_LIST})
        GROUP BY table_catalog, table_schema, table_name
    )
    SELECT COUNT(*) as tables_all_cols_desc
    FROM table_column_counts
    WHERE total_columns = described_columns AND total_columns > 0
""")

tables_all_cols_desc = all_cols_desc_df.collect()[0]['tables_all_cols_desc']
# Float fallback: this value is written to the DoubleType metric_percentage
# column, and Spark's schema verification rejects a Python int there.
pct_all_cols = (tables_all_cols_desc / total_tables * 100) if total_tables > 0 else 0.0
print(f"‚úÖ Tables with all columns described: {tables_all_cols_desc} ({pct_all_cols:.1f}%)")


In [None]:
# Count tables in the allowed catalogs where some, but not all, columns have a
# non-empty comment.
partial_cols_desc_df = spark.sql(f"""
    WITH table_column_counts AS (
        SELECT 
            table_catalog,
            table_schema,
            table_name,
            COUNT(*) as total_columns,
            SUM(CASE WHEN comment IS NOT NULL AND comment != '' THEN 1 ELSE 0 END) as described_columns
        FROM system.information_schema.columns
        WHERE table_schema NOT IN ('information_schema', 'system')
        AND table_catalog IN ({CATALOGS_SQL_LIST})
        GROUP BY table_catalog, table_schema, table_name
    )
    SELECT COUNT(*) as tables_partial_cols_desc
    FROM table_column_counts
    WHERE described_columns > 0 AND described_columns < total_columns
""")

tables_partial_cols_desc = partial_cols_desc_df.collect()[0]['tables_partial_cols_desc']
# Float fallback: this value is written to the DoubleType metric_percentage
# column, and Spark's schema verification rejects a Python int there.
pct_partial = (tables_partial_cols_desc / total_tables * 100) if total_tables > 0 else 0.0
print(f"‚ö†Ô∏è  Tables with partial column descriptions: {tables_partial_cols_desc} ({pct_partial:.1f}%)")


In [None]:
# Top 3 suggestors by number of rejected suggestions.
# Usernames are normalized by dropping an email domain ("jane@corp.com" -> "jane"),
# so IDs recorded with and without a domain collapse to the same key.
# NULL suggestor_ids are excluded (matching the reviewer query), and ties on the
# count are broken by username so the ranking is stable across refreshes.
# NOTE(review): unlike the coverage KPIs, this is not filtered to ALLOWED_CATALOGS.
top_rejected_suggestors_df = spark.sql(f"""
    WITH rejected AS (
        SELECT
            CASE
                WHEN suggestor_id LIKE '%@%' THEN SPLIT(suggestor_id, '@')[0]
                ELSE suggestor_id
            END AS normalized_user
        FROM {suggestions_full_name}
        WHERE status = 'rejected'
        AND suggestor_id IS NOT NULL
    )
    SELECT normalized_user, COUNT(*) AS rejection_count
    FROM rejected
    GROUP BY normalized_user
    ORDER BY rejection_count DESC, normalized_user
    LIMIT 3
""")

print("‚ùå Top 3 Suggestors with Most Rejections:")
for row in top_rejected_suggestors_df.collect():
    print(f"   - {row['normalized_user']}: {row['rejection_count']} rejections")


## Step 2: Compute Top Contributors


In [None]:
# Top 3 suggestors by total number of suggestions.
# Usernames are normalized by dropping an email domain ("jane@corp.com" -> "jane"),
# so IDs recorded with and without a domain collapse to the same key.
# NULL suggestor_ids are excluded (matching the reviewer query), and ties on the
# count are broken by username so the ranking is stable across refreshes.
# NOTE(review): unlike the coverage KPIs, this is not filtered to ALLOWED_CATALOGS.
top_suggestors_df = spark.sql(f"""
    WITH suggestors AS (
        SELECT
            CASE
                WHEN suggestor_id LIKE '%@%' THEN SPLIT(suggestor_id, '@')[0]
                ELSE suggestor_id
            END AS normalized_user
        FROM {suggestions_full_name}
        WHERE suggestor_id IS NOT NULL
    )
    SELECT normalized_user, COUNT(*) AS suggestion_count
    FROM suggestors
    GROUP BY normalized_user
    ORDER BY suggestion_count DESC, normalized_user
    LIMIT 3
""")

print("üèÜ Top 3 Contributors:")
for row in top_suggestors_df.collect():
    print(f"   - {row['normalized_user']}: {row['suggestion_count']} suggestions")


In [None]:
# Top 3 reviewers by number of reviewed suggestions.
# Usernames are normalized by dropping an email domain ("jane@corp.com" -> "jane"),
# so IDs recorded with and without a domain collapse to the same key.
# Rows without a reviewer are skipped, and ties on the count are broken by
# username so the ranking is stable across refreshes.
# NOTE(review): unlike the coverage KPIs, this is not filtered to ALLOWED_CATALOGS.
top_reviewers_df = spark.sql(f"""
    WITH reviewers AS (
        SELECT
            CASE
                WHEN reviewer_id LIKE '%@%' THEN SPLIT(reviewer_id, '@')[0]
                ELSE reviewer_id
            END AS normalized_user
        FROM {suggestions_full_name}
        WHERE reviewer_id IS NOT NULL
    )
    SELECT normalized_user, COUNT(*) AS review_count
    FROM reviewers
    GROUP BY normalized_user
    ORDER BY review_count DESC, normalized_user
    LIMIT 3
""")

print("‚úÖ Top 3 Reviewers:")
for row in top_reviewers_df.collect():
    print(f"   - {row['normalized_user']}: {row['review_count']} reviews")


## Step 3: Create/Update Cache Table


In [None]:
# Create cache table if it doesn't exist.
# The description is set as the table COMMENT, which is what the `comment`
# column of information_schema.tables exposes. It is also kept as the
# 'description' property for anything that already reads that key.
# Because of IF NOT EXISTS, neither is applied to a table that already exists.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {cache_full_name} (
        refresh_timestamp TIMESTAMP,
        metric_name STRING,
        metric_value BIGINT,
        metric_percentage DOUBLE,
        user_id STRING,
        user_count BIGINT,
        user_rank INT
    )
    COMMENT 'Cache table for metadata health KPIs - refreshed by scheduled job'
    TBLPROPERTIES (
        'delta.enableChangeDataFeed' = 'true',
        'description' = 'Cache table for metadata health KPIs - refreshed by scheduled job'
    )
""")

print(f"‚úÖ Cache table ready: {cache_full_name}")


In [None]:
# Prepare data for cache table
from pyspark.sql.functions import current_timestamp, lit
from pyspark.sql.types import StructType, StructField, TimestampType, StringType, LongType, DoubleType, IntegerType


def _as_double(value):
    """Return *value* as a float, passing ``None`` through unchanged.

    createDataFrame verifies each row against the schema, and DoubleType
    rejects a Python ``int``. An integer percentage, such as a literal ``0``
    fallback used when there are no tables, would otherwise fail here.
    """
    return None if value is None else float(value)


# One timestamp shared by every row written in this refresh.
refresh_time = spark.sql("SELECT current_timestamp() as ts").collect()[0]['ts']

# Coverage rows: (timestamp, metric, value, percentage, user, user_count, rank)
data = [
    (refresh_time, 'total_tables', total_tables, None, None, None, None),
    (refresh_time, 'tables_with_desc', tables_with_desc, _as_double(pct_with_desc), None, None, None),
    (refresh_time, 'tables_all_cols_desc', tables_all_cols_desc, _as_double(pct_all_cols), None, None, None),
    (refresh_time, 'tables_partial_cols_desc', tables_partial_cols_desc, _as_double(pct_partial), None, None, None),
]

# Leaderboard rows are appended in a fixed order: suggestors, then reviewers,
# then rejected suggestors. Rank is 1-based, following the query's result order.
leaderboards = [
    ('top_suggestor', top_suggestors_df, 'suggestion_count'),
    ('top_reviewer', top_reviewers_df, 'review_count'),
    ('top_rejected_suggestor', top_rejected_suggestors_df, 'rejection_count'),
]
for leaderboard_metric, leaderboard_df, count_column in leaderboards:
    for rank, row in enumerate(leaderboard_df.collect(), 1):
        data.append((
            refresh_time, leaderboard_metric, None, None,
            row['normalized_user'], row[count_column], rank,
        ))

# Create DataFrame (column order matches the CREATE TABLE statement)
schema = StructType([
    StructField("refresh_timestamp", TimestampType(), True),
    StructField("metric_name", StringType(), True),
    StructField("metric_value", LongType(), True),
    StructField("metric_percentage", DoubleType(), True),
    StructField("user_id", StringType(), True),
    StructField("user_count", LongType(), True),
    StructField("user_rank", IntegerType(), True)
])

cache_df = spark.createDataFrame(data, schema)

# len(data) is the DataFrame's row count and avoids launching a Spark job.
print(f"üìä Prepared {len(data)} rows for cache")


In [None]:
# Replace the cache contents in a single step.
# The previous DELETE followed by an append was two separate Delta commits.
# Readers (e.g. the app) could see an empty cache in between, and a failed
# append left it empty. insertInto(..., overwrite=True) runs an INSERT
# OVERWRITE: one commit that swaps in the new rows and leaves the table
# definition in place.
# insertInto matches columns by position; cache_df's schema lists the columns
# in the same order as the CREATE TABLE statement.
cache_df.write.insertInto(cache_full_name, overwrite=True)

print(f"‚úÖ Cache table updated successfully!")
print(f"‚è∞ Refresh completed at: {refresh_time}")


## Step 4: Verify Cache Table


In [None]:
# Show the freshly written cache, ordered by metric and then leaderboard rank.
verify_query = f"SELECT * FROM {cache_full_name} ORDER BY metric_name, user_rank"
display(spark.sql(verify_query))


## Summary

‚úÖ **Cache table refreshed successfully!**

The metadata KPIs are now cached in `asda_metadata_rampup.metadata_population.metadata_kpis`.

**Catalogs included:** `example_cat_1`, `example_cat_2`

### Next Steps:

1. **Schedule this notebook as a Job:**
   - Go to Workflows → Create Job
   - Select this notebook
   - Schedule: Hourly or Daily
   - Cluster: Small cluster is sufficient

2. **The Streamlit app will now read from this cache** for instant KPI display

3. **Monitor the refresh:**
   ```sql
   SELECT MAX(refresh_timestamp) as last_refresh
   FROM asda_metadata_rampup.metadata_population.metadata_kpis
   ```

4. **To modify the catalogs included:** Edit the `ALLOWED_CATALOGS` list in the Configuration cell
