# Star Schema Builder for Fabric Monitoring Analytics

## Overview
This notebook transforms raw Monitor Hub activity data into a Kimball-style star schema suitable for SQL queries, semantic models, and Power BI reports.

It is designed to work in both:
- **Microsoft Fabric notebooks** (paths auto-resolve under `/lakehouse/default/Files/`), and
- **Local dev** (writes under `exports/` by default).

## Star Schema Tables
- **Dimensions**: dim_date, dim_time, dim_workspace, dim_item, dim_user, dim_activity_type, dim_status
- **Facts**: fact_activity, fact_daily_metrics

## Key Features
- Incremental loading with high-water mark tracking
- SCD Type 2 support for slowly changing dimensions
- Automatic surrogate key generation
- Pre-aggregated daily metrics for fast dashboards

## How to Use
1. **Install Package** (first run only): Uncomment and run the pip install cell
2. **Configure Paths**: Set INPUT_DIR and OUTPUT_DIR for your environment
3. **Run Pipeline**: Execute the build cells
4. **Optional**: Convert to Delta tables for SQL Endpoint access

## Package Installation
<span style="color:red">pip install is only required on first run. Uncomment and run once, then re-comment.</span>

In [1]:
# %pip install /lakehouse/default/Files/usf_fabric_monitoring-0.3.0-py3-none-any.whl --force-reinstall

## Setup Local Path (For Local Development)

In [2]:
# SETUP LOCAL PATH (For Local Development)
import sys
import os
from pathlib import Path

# Add the src directory to sys.path to allow importing the local package
# This is necessary when running locally without installing the package
current_dir = Path(os.getcwd())

# Check if we are in notebooks directory
if current_dir.name == "notebooks":
    src_path = current_dir.parent / "src"
else:
    # Assume we are in project root
    src_path = current_dir / "src"

if src_path.exists() and str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))
    print(f"‚úÖ Added {src_path} to sys.path (local development mode)")
else:
    print("‚ÑπÔ∏è Running in Fabric or package already installed")

‚úÖ Added /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/src to sys.path (local development mode)


In [3]:
# Force reload of modules to pick up code changes
import importlib
import usf_fabric_monitoring.core.star_schema_builder as ssb
importlib.reload(ssb)
print("‚úÖ Modules reloaded")

‚úÖ Modules reloaded


In [4]:
# Package / environment verification (safe: no Azure/API imports)
from importlib.metadata import PackageNotFoundError, version
import importlib
import usf_fabric_monitoring
from usf_fabric_monitoring.core.utils import resolve_path

try:
    pkg_version = getattr(usf_fabric_monitoring, "__version__", None) or version("usf_fabric_monitoring")
except PackageNotFoundError:
    pkg_version = "unknown"

print(f"usf_fabric_monitoring version: {pkg_version}")
print(f"Resolved output dir example: {resolve_path('exports/star_schema')}")

# Check star schema builder availability
try:
    from usf_fabric_monitoring.core.star_schema_builder import StarSchemaBuilder, ALL_STAR_SCHEMA_DDLS
    print("‚úÖ StarSchemaBuilder module loaded successfully")
except ImportError as e:
    print(f"‚ùå Failed to import StarSchemaBuilder: {e}")
    print("   Make sure you have version 0.3.0+ installed")

usf_fabric_monitoring version: 0.3.2
Resolved output dir example: /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/star_schema
‚úÖ StarSchemaBuilder module loaded successfully


## Configuration

In [5]:
from pathlib import Path
from usf_fabric_monitoring.core.utils import resolve_path

# ============================================================================
# CONFIGURATION - Update these values for your environment
# ============================================================================

# Input: Where Monitor Hub pipeline outputs are stored (CSV files with Smart Merge data)
INPUT_DIR = resolve_path("exports/monitor_hub_analysis")

# Output: Where star schema tables will be written
OUTPUT_DIR = resolve_path("exports/star_schema")

# Processing options
INCREMENTAL_LOAD = True  # Set to False for full refresh (rebuilds all tables)
WRITE_TO_DELTA_TABLES = False  # Set to True in Fabric to create SQL Endpoint tables

# ============================================================================
# Display configuration
# ============================================================================
print("=" * 60)
print("STAR SCHEMA BUILDER CONFIGURATION")
print("=" * 60)
print(f"Input Directory:       {INPUT_DIR}")
print(f"Output Directory:      {OUTPUT_DIR}")
print(f"Mode:                  {'Incremental' if INCREMENTAL_LOAD else 'Full Refresh'}")
print(f"Write to Delta Tables: {WRITE_TO_DELTA_TABLES}")
print("=" * 60)

STAR SCHEMA BUILDER CONFIGURATION
Input Directory:       /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/monitor_hub_analysis
Output Directory:      /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/star_schema
Mode:                  Incremental
Write to Delta Tables: False


## Load Source Data

In [6]:
import pandas as pd
from pathlib import Path
from datetime import datetime

# Find activities_master CSV files (contains Smart Merge enriched data with accurate failure status)
input_path = Path(INPUT_DIR)
activities_files = sorted(input_path.glob("activities_master_*.csv"), reverse=True)

if not activities_files:
    # Fallback to parquet if no CSV
    parquet_dir = input_path / "parquet"
    parquet_files = sorted(parquet_dir.glob("activities_*.parquet"), reverse=True) if parquet_dir.exists() else []
    if parquet_files:
        print(f"‚ö†Ô∏è No CSV files found, falling back to parquet: {parquet_files[0].name}")
        activities_df = pd.read_parquet(parquet_files[0])
    else:
        raise FileNotFoundError(
            f"No activities files found in {INPUT_DIR}\n"
            "Run the Monitor Hub pipeline first: make monitor-hub"
        )
else:
    # Show available files
    print("Available activity files:")
    for i, f in enumerate(activities_files[:5]):
        df_check = pd.read_csv(f, usecols=['status'], nrows=100000)
        failed_sample = (df_check['status'] == 'Failed').sum()
        print(f"  {i+1}. {f.name} - {'Has failures' if failed_sample > 0 else 'No failures in sample'}")
    
    # Try to find a file with failures, otherwise use latest
    selected_file = None
    for f in activities_files:
        df_check = pd.read_csv(f, usecols=['status'], low_memory=False)
        if (df_check['status'] == 'Failed').sum() > 0:
            selected_file = f
            print(f"\n‚úÖ Selected file with Smart Merge failure data: {f.name}")
            break
    
    if selected_file is None:
        selected_file = activities_files[0]
        print(f"\n‚ö†Ô∏è No file with failures found, using latest: {selected_file.name}")
    
    activities_df = pd.read_csv(selected_file, low_memory=False)

print(f"\n‚úÖ Loaded {len(activities_df):,} activity records")
print(f"   Columns: {len(activities_df.columns)} total")

# Show status distribution (key for Smart Merge validation)
if 'status' in activities_df.columns:
    status_counts = activities_df['status'].value_counts()
    print(f"   Status distribution:")
    for status, count in status_counts.items():
        print(f"     - {status}: {count:,}")

# Show date range
time_col = 'start_time' if 'start_time' in activities_df.columns else 'StartTimeUtc'
if time_col in activities_df.columns:
    activities_df[time_col] = pd.to_datetime(activities_df[time_col], errors='coerce')
    print(f"   Date range: {activities_df[time_col].min()} to {activities_df[time_col].max()}")

Available activity files:
  1. activities_master_20251204_153124.csv - No failures in sample
  2. activities_master_20251203_212443.csv - Has failures
  3. activities_master_20251203_082824.csv - No failures in sample
  2. activities_master_20251203_212443.csv - Has failures
  3. activities_master_20251203_082824.csv - No failures in sample
  4. activities_master_20251201_232450.csv - No failures in sample
  4. activities_master_20251201_232450.csv - No failures in sample

‚úÖ Selected file with Smart Merge failure data: activities_master_20251203_212443.csv

‚úÖ Selected file with Smart Merge failure data: activities_master_20251203_212443.csv

‚úÖ Loaded 968,766 activity records
   Columns: 19 total
   Status distribution:
     - Succeeded: 961,773
     - Completed: 5,747
     - Failed: 1,203
     - Cancelled: 34
     - InProgress: 9
   Date range: 2025-11-03 01:00:03.450000 to 2025-12-03 21:00:02.754907300

‚úÖ Loaded 968,766 activity records
   Columns: 19 total
   Status distribut

## Build Star Schema

In [7]:
# Use reloaded module
from usf_fabric_monitoring.core.star_schema_builder import StarSchemaBuilder
import importlib
import usf_fabric_monitoring.core.star_schema_builder as ssb_module
importlib.reload(ssb_module)
StarSchemaBuilder = ssb_module.StarSchemaBuilder

from datetime import datetime

# Create output directory
output_path = Path(OUTPUT_DIR)
output_path.mkdir(parents=True, exist_ok=True)

# Build star schema
print("=" * 60)
print("BUILDING STAR SCHEMA")
print("=" * 60)
start_time = datetime.now()

builder = StarSchemaBuilder(output_directory=OUTPUT_DIR)
results = builder.build_complete_schema(
    activities=activities_df.to_dict(orient="records"),
    incremental=INCREMENTAL_LOAD
)

duration = (datetime.now() - start_time).total_seconds()

if results["status"] == "success":
    print(f"\n‚úÖ Star Schema build completed in {duration:.2f} seconds!")
    
    print(f"\nüìä Dimensions Built:")
    for dim_name, count in results.get("dimensions_built", {}).items():
        if not dim_name.endswith("_new"):
            new_count = results.get("dimensions_built", {}).get(f"{dim_name}_new", "")
            new_suffix = f" (+{new_count} new)" if new_count else ""
            print(f"   ‚Ä¢ {dim_name}: {count:,} records{new_suffix}")
    
    print(f"\nüìà Fact Tables Built:")
    for fact_name, count in results.get("facts_built", {}).items():
        if not fact_name.endswith("_new"):
            new_count = results.get("facts_built", {}).get(f"{fact_name}_new", "")
            new_suffix = f" (+{new_count} new)" if new_count else ""
            print(f"   ‚Ä¢ {fact_name}: {count:,} records{new_suffix}")
    
    print(f"\nüìÅ Output Directory: {OUTPUT_DIR}")
else:
    print(f"\n‚ùå Build failed:")
    for error in results.get("errors", []):
        print(f"   {error}")

BUILDING STAR SCHEMA
2025-12-17 01:09:43 | INFO | star_schema_builder | Starting Star Schema Build
2025-12-17 01:09:43 | INFO | star_schema_builder | Mode: Incremental
2025-12-17 01:09:43 | INFO | star_schema_builder | Input activities: 968766
2025-12-17 01:09:43 | INFO | star_schema_builder | Starting Star Schema Build
2025-12-17 01:09:43 | INFO | star_schema_builder | Mode: Incremental
2025-12-17 01:09:43 | INFO | star_schema_builder | Input activities: 968766
2025-12-17 01:09:43 | INFO | star_schema_builder | Step 1: Building reference dimensions...
2025-12-17 01:09:43 | INFO | star_schema_builder | Step 1: Building reference dimensions...
2025-12-17 01:09:43 | INFO | star_schema_builder | Saved dim_date with 456 records to /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTEMS/1_Project_Rhico/usf_fabric_monitoring/exports/star_schema/dim_date.parquet
2025-12-17 01:09:43 | INFO | star_schema_builder | Saved dim_time with 96 records to /home/sanmi/Documents/J'TOYE_DIGITAL/LEIT_TEKSYSTE

## Convert to Delta Tables (Fabric Only)
Run this cell only in Microsoft Fabric to create Delta tables accessible via SQL Endpoint.

In [14]:
if WRITE_TO_DELTA_TABLES:
    try:
        from pyspark.sql import SparkSession
        
        spark = SparkSession.builder.getOrCreate()
        
        print("Converting Parquet files to Delta tables...")
        print("-" * 50)
        
        # Convert each parquet to Delta table
        for parquet_file in Path(OUTPUT_DIR).glob("*.parquet"):
            table_name = parquet_file.stem  # e.g., "dim_date", "fact_activity"
            
            try:
                df = spark.read.parquet(str(parquet_file))
                
                # Write as Delta table (overwrite mode)
                df.write.mode("overwrite").format("delta").saveAsTable(table_name)
                
                print(f"   ‚úÖ {table_name}: {df.count():,} rows")
            except Exception as e:
                print(f"   ‚ùå {table_name}: {e}")
        
        print("-" * 50)
        print("‚úÖ Delta tables created successfully!")
        print("   Tables are now available in the SQL Endpoint.")
        
    except ImportError:
        print("‚ö†Ô∏è PySpark not available. Delta table creation skipped.")
        print("   This feature is only available in Microsoft Fabric.")
else:
    print("‚ÑπÔ∏è Delta table creation skipped (WRITE_TO_DELTA_TABLES=False)")
    print("   Set WRITE_TO_DELTA_TABLES=True in Fabric to enable.")

‚ÑπÔ∏è Delta table creation skipped (WRITE_TO_DELTA_TABLES=False)
   Set WRITE_TO_DELTA_TABLES=True in Fabric to enable.


## Validate Star Schema

In [15]:
import pandas as pd

print("=" * 60)
print("STAR SCHEMA VALIDATION")
print("=" * 60)

# Load and validate tables
tables = {}
for parquet_file in Path(OUTPUT_DIR).glob("*.parquet"):
    table_name = parquet_file.stem
    tables[table_name] = pd.read_parquet(parquet_file)
    print(f"{table_name}: {len(tables[table_name]):,} records")

# Check FK integrity
print("\nüîó Foreign Key Validation:")

fk_checks = [
    ('fact_activity', 'workspace_sk', 'dim_workspace', 'workspace_sk'),
    ('fact_activity', 'item_sk', 'dim_item', 'item_sk'),
    ('fact_activity', 'user_sk', 'dim_user', 'user_sk'),
    ('fact_activity', 'date_sk', 'dim_date', 'date_sk'),
    ('fact_activity', 'time_sk', 'dim_time', 'time_sk'),
    ('fact_activity', 'activity_type_sk', 'dim_activity_type', 'activity_type_sk'),
    ('fact_activity', 'status_sk', 'dim_status', 'status_sk'),
]

all_passed = True
for fact_table, fact_col, dim_table, dim_col in fk_checks:
    if fact_table in tables and dim_table in tables:
        fact_vals = set(tables[fact_table][fact_col].dropna().unique())
        dim_vals = set(tables[dim_table][dim_col].unique())
        orphans = fact_vals - dim_vals
        status = '‚úÖ PASS' if len(orphans) == 0 else f'‚ùå {len(orphans)} orphans'
        if len(orphans) > 0:
            all_passed = False
        print(f"   {fact_col}: {status}")

print("\n" + ("‚úÖ All FK validations passed!" if all_passed else "‚ö†Ô∏è Some FK validations failed"))

STAR SCHEMA VALIDATION
dim_activity_type: 61 records
dim_user: 95 records
dim_item: 2,213 records
dim_workspace: 159 records
fact_daily_metrics: 1,573 records
dim_time: 96 records
dim_date: 456 records
dim_status: 8 records
fact_activity: 1,930,540 records

üîó Foreign Key Validation:
   workspace_sk: ‚úÖ PASS
   item_sk: ‚úÖ PASS
   user_sk: ‚úÖ PASS
   date_sk: ‚úÖ PASS
   time_sk: ‚úÖ PASS
   activity_type_sk: ‚úÖ PASS
   status_sk: ‚úÖ PASS

‚úÖ All FK validations passed!
   time_sk: ‚úÖ PASS
   activity_type_sk: ‚úÖ PASS
   status_sk: ‚úÖ PASS

‚úÖ All FK validations passed!


## Sample Analytical Queries

In [10]:
# Top 10 Most Active Workspaces
print("üìä Top 10 Most Active Workspaces")
print("-" * 50)

if 'fact_activity' in tables and 'dim_workspace' in tables:
    fact = tables['fact_activity']
    dim_ws = tables['dim_workspace']
    
    merged = fact.merge(dim_ws[['workspace_sk', 'workspace_name']], on='workspace_sk')
    result = merged.groupby('workspace_name').agg(
        activity_count=('workspace_sk', 'count'),
        unique_users=('user_sk', 'nunique')
    ).sort_values('activity_count', ascending=False).head(10)
    
    for ws_name, row in result.iterrows():
        print(f"   {ws_name[:40]:<40} | {row['activity_count']:>10,} activities | {row['unique_users']:>4} users")

üìä Top 10 Most Active Workspaces
--------------------------------------------------
   Unknown                                  |  1,937,534 activities |   95 users
   Unknown                                  |  1,937,534 activities |   95 users


In [11]:
# Activity by Type
print("üìä Activity Count by Type")
print("-" * 50)

if 'fact_activity' in tables and 'dim_activity_type' in tables:
    fact = tables['fact_activity']
    dim_type = tables['dim_activity_type']
    
    merged = fact.merge(dim_type[['activity_type_sk', 'activity_type', 'activity_category']], on='activity_type_sk')
    result = merged.groupby(['activity_category', 'activity_type']).size().sort_values(ascending=False).head(10)
    
    for (cat, act_type), count in result.items():
        print(f"   {cat:<15} | {act_type:<30} | {count:>10,}")

üìä Activity Count by Type
--------------------------------------------------
   Artifact Operations | ReadArtifact                   |    669,120
   File Operations | CreateFile                     |    382,302
   File Operations | RenameFileOrDirectory          |    176,752
   Compute         | RunArtifact                    |    170,752
   File Operations | CreateDirectory                |    136,566
   Spark           | ViewSparkAppLog                |    107,356
   File Operations | DeleteFileOrBlob               |     60,954
   Artifact Operations | UpdateArtifact                 |     53,972
   Query           | ConnectWarehouseAndSqlAnalyticsEndpointLakehouseFromExternalApp |     39,874
   Spark           | MountStorageByMssparkutils     |     27,654
   Artifact Operations | ReadArtifact                   |    669,120
   File Operations | CreateFile                     |    382,302
   File Operations | RenameFileOrDirectory          |    176,752
   Compute         | RunArtifac

In [16]:
# Daily Activity Trend
print("üìä Daily Activity Trend (Last 14 Days)")
print("-" * 50)

if 'fact_daily_metrics' in tables and 'dim_date' in tables:
    fact_daily = tables['fact_daily_metrics']
    dim_date = tables['dim_date']
    
    # Aggregate across workspaces
    daily_totals = fact_daily.groupby('date_sk').agg({
        'total_activities': 'sum',
        'unique_users': 'sum',
        'failed_activities': 'sum'
    }).reset_index()
    
    # Join with date dimension
    daily_totals = daily_totals.merge(
        dim_date[['date_sk', 'full_date', 'day_of_week_name']], 
        on='date_sk'
    ).sort_values('full_date', ascending=False).head(14)
    
    for _, row in daily_totals.iterrows():
        day = row['day_of_week_name'][:3]
        print(f"   {row['full_date']} ({day}) | {int(row['total_activities']):>8,} activities | {int(row['unique_users']):>4} users | {int(row['failed_activities']):>3} failed")

üìä Daily Activity Trend (Last 14 Days)
--------------------------------------------------
   2025-12-03 (Wed) |      761 activities |    1 users |  95 failed
   2025-12-02 (Tue) |   60,809 activities |  283 users |  41 failed
   2025-12-01 (Mon) |   47,268 activities |  124 users | 106 failed
   2025-11-30 (Sun) |   20,184 activities |   56 users |  30 failed
   2025-11-29 (Sat) |   15,260 activities |   60 users |  17 failed
   2025-11-28 (Fri) |   32,843 activities |  107 users |  23 failed
   2025-11-27 (Thu) |   27,540 activities |  113 users |  34 failed
   2025-11-26 (Wed) |   36,159 activities |  115 users |  33 failed
   2025-11-25 (Tue) |   29,762 activities |  241 users |  30 failed
   2025-11-24 (Mon) |   32,881 activities |  121 users |  28 failed
   2025-11-23 (Sun) |   18,854 activities |   52 users |  22 failed
   2025-11-22 (Sat) |   15,373 activities |   52 users |  19 failed
   2025-11-21 (Fri) |   24,490 activities |  103 users |  42 failed
   2025-11-20 (Thu) |   

In [17]:
# Failure Analysis (Smart Merge Data)
print("üìä Failure Analysis")
print("-" * 50)

if 'fact_activity' in tables and 'dim_status' in tables:
    fact = tables['fact_activity']
    dim_status = tables['dim_status']
    
    # Overall failure stats
    total = len(fact)
    failed = (fact['is_failed'] == 1).sum()
    success_rate = ((total - failed) / total) * 100 if total > 0 else 0
    
    print(f"   Total Activities:    {total:,}")
    print(f"   Failed Activities:   {failed:,}")
    print(f"   Success Rate:        {success_rate:.2f}%")
    
    # Failures by status
    print(f"\n   Failures by Status:")
    merged = fact[fact['is_failed'] == 1].merge(
        dim_status[['status_sk', 'status_code']], on='status_sk'
    )
    if len(merged) > 0:
        for status, count in merged['status_code'].value_counts().items():
            print(f"     - {status}: {count:,}")
    
    # Failures by activity type (if available)
    if 'dim_activity_type' in tables:
        dim_type = tables['dim_activity_type']
        merged = fact[fact['is_failed'] == 1].merge(
            dim_type[['activity_type_sk', 'activity_type']], on='activity_type_sk'
        )
        if len(merged) > 0:
            print(f"\n   Top Failed Activity Types:")
            for act_type, count in merged['activity_type'].value_counts().head(5).items():
                print(f"     - {act_type}: {count:,}")

üìä Failure Analysis
--------------------------------------------------
   Total Activities:    1,930,540
   Failed Activities:   1,238
   Success Rate:        99.94%

   Failures by Status:
     - Failed: 1,204
     - Cancelled: 34

   Top Failed Activity Types:
     - Unknown: 1,238


## Summary

The star schema has been built and is ready for:

1. **SQL Queries** - Query the parquet files directly or use Delta tables via SQL Endpoint
2. **Semantic Model** - Create a Direct Lake model pointing to these tables
3. **Power BI Reports** - Build monitoring dashboards using the semantic model

### Scheduled Refresh
To automate the star schema build:
1. Schedule this notebook to run daily after the Monitor Hub pipeline
2. Or use a Fabric Data Pipeline to orchestrate both steps

### Table Relationships (for Semantic Model)
```
fact_activity[date_sk] ‚Üí dim_date[date_sk]
fact_activity[time_sk] ‚Üí dim_time[time_sk]
fact_activity[workspace_sk] ‚Üí dim_workspace[workspace_sk]
fact_activity[item_sk] ‚Üí dim_item[item_sk]
fact_activity[user_sk] ‚Üí dim_user[user_sk]
fact_activity[activity_type_sk] ‚Üí dim_activity_type[activity_type_sk]
fact_activity[status_sk] ‚Üí dim_status[status_sk]
```