# Microsoft Fabric Items Retrieval and Management System
# Last Updated: March 2025

In [None]:
#  Purpose: 
#   - Retrieve all Fabric items efficiently with minimal Capacity Unit usage
#   - Track changes over time in Fabric item inventory
#   - Provide data quality validation for retrieved items
#   - Enable incremental refresh capabilities
#   - Implement alerting for significant changes
# 
# IMPORTANT NOTES:
#   - As of March 2025, Dataflow Gen2 is not included in the Fabric items
#   - This only retrieves Fabric Items, not Power BI Items
#   - This Python notebook consumes significantly fewer Capacity Units than a Spark notebook

In [None]:
import notebookutils
import json
import requests
import pandas as pd
import numpy as np
import datetime
from datetime import datetime, date, timedelta
import time
import duckdb
from deltalake import write_deltalake, DeltaTable, read_deltalake
import pyarrow
import os
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import hashlib
from typing import Dict, List, Optional, Any, Tuple, Set

# SECTION 1: Configuration and Logging Setup



In [None]:
# Create a configuration dictionary to centralize all settings.
# Every function in this notebook reads its settings from this module-level dict.
config = {
    # API and authentication settings
    # The three *_secret_name entries are the NAMES of Key Vault secrets, not the secret values.
    "key_vault_url": "https://company-keyvault.vault.azure.net/",
    "tenant_id_secret_name": "tenantid",
    "client_id_secret_name": "powerbi-applicationid",
    "client_secret_name": "powerbi-clientsecret",
    "fabric_api_base_url": "https://api.fabric.microsoft.com/v1/admin/items",
    "api_scope": "https://analysis.windows.net/powerbi/api/.default",
    
    # Storage paths (raw JSON dumps plus one Delta table path per dataset)
    "raw_json_dir": "/lakehouse/default/Files/Fabric_Items/",
    "staging_table_path": "/lakehouse/default/Tables/staging_all_fabric_items",
    "production_table_path": "/lakehouse/default/Tables/all_fabric_items",
    "changes_table_path": "/lakehouse/default/Tables/fabric_items_changes",
    "history_table_path": "/lakehouse/default/Tables/fabric_items_history",
    "quality_issues_path": "/lakehouse/default/Tables/fabric_items_quality_issues",
    
    # Runtime configurations
    "incremental_refresh": True,  # Set to False for full refresh
    "days_to_keep_history": 90,   # Number of days to retain historical data
    "request_delay": 0.5,         # Delay between API requests in seconds
    "batch_size": 1000,           # Number of items to process in memory at once
    
    # Alert settings
    "enable_alerts": True,
    "alert_email_recipients": ["data.team@company.com", "fabric.admins@company.com"],
    "alert_smtp_server": "smtp.company.com",
    "alert_smtp_port": 587,
    "alert_sender_email": "fabric-monitor@company.com",
    "alert_threshold_item_count_change_pct": 10,  # Alert if item count changes by 10%
    
    # Data quality thresholds
    "min_expected_items": 100,    # Minimum expected number of items
    # NOTE(review): confirm these field names match the item schema the API actually
    # returns; any item lacking one of them is reported as a quality issue.
    "required_fields": ["id", "name", "type", "modifiedBy", "modifiedDate"],
    # NOTE(review): the notebook header says only Fabric items (not Power BI items) are
    # retrieved, yet this list contains Report/Dataset/Dashboard - confirm the intended set.
    "valid_item_types": ["Report", "Dataset", "Dataflow", "Lakehouse", "Warehouse", "Dashboard"]
}

# Set up logging configuration
def setup_logging():
    """
    Configure root logging to write to a timestamped log file and the notebook output.

    Returns:
        str: Path of the log file created for this run
    """
    # Create directory for logs if it doesn't exist
    notebookutils.fs.mkdirs("Files/Logs/")

    # One log file per run, named by start time
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f"/lakehouse/default/Files/Logs/fabric_items_{run_stamp}.log"

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),  # Also log to console/notebook output
        ],
        # basicConfig() does nothing when the root logger already has handlers,
        # which is the normal state in a notebook kernel and on every re-run of
        # this cell. force=True replaces the existing handlers so the file
        # handler for THIS run is really attached.
        force=True,
    )

    logging.info("Logging initialized")
    return log_file

# Start logging for this run and record the non-sensitive configuration.
log_file_path = setup_logging()
logging.info("Starting Fabric Items retrieval and processing job")
# Settings whose key contains "secret" are left out of the logged configuration.
logging.info(
    "Using configuration: "
    + json.dumps({name: setting for name, setting in config.items() if 'secret' not in name})
)

# SECTION 2: Authentication and Key Vault Access

In [None]:
def get_auth_token() -> str:
    """
    Retrieve authentication credentials from Key Vault and obtain access token
    
    Returns:
        str: The access token for API authentication
    
    Raises:
        Exception: If authentication fails for any reason
    """
    try:
        logging.info("Retrieving secrets from Azure Key Vault")
        
        # Retrieve secrets from Azure Key Vault
        key_vault = config["key_vault_url"]
        tenant = notebookutils.credentials.getSecret(key_vault, config["tenant_id_secret_name"])
        client = notebookutils.credentials.getSecret(key_vault, config["client_id_secret_name"])
        client_secret = notebookutils.credentials.getSecret(key_vault, config["client_secret_name"])
        
        logging.info("Successfully retrieved secrets from Key Vault")
        
        # Import required authentication libraries with error handling
        try: 
            from azure.identity import ClientSecretCredential 
        except ImportError:
            logging.info("Installing azure.identity package")
            %pip install azure.identity 
            from azure.identity import ClientSecretCredential
            logging.info("Successfully installed azure.identity")
        
        # Create the credential object for service principal authentication
        auth = ClientSecretCredential(
            authority='https://login.microsoftonline.com/', 
            tenant_id=tenant, 
            client_id=client, 
            client_secret=client_secret
        )
        
        # Retrieve the access token
        api = config["api_scope"]
        access_token = auth.get_token(api)
        access_token = access_token.token
        
        logging.info("Successfully obtained authentication token")
        return access_token
        
    except Exception as e:
        logging.error(f"Authentication failed: {str(e)}")
        raise Exception(f"Failed to authenticate: {str(e)}")

# SECTION 3: Data Retrieval from Fabric API with Incremental Support

In [None]:
def get_last_modified_date() -> Optional[str]:
    """
    Get the most recent modifiedDate from the existing production table
    for use with incremental refresh.

    Any failure while reading the table is logged and treated as "no data",
    which makes the caller fall back to a full refresh.

    Returns:
        Optional[str]: The latest modifiedDate (ISO formatted when the column holds
        timestamps), or None if the table is missing, empty, lacks the column,
        or holds only null values
    """
    try:
        # Check if the production table exists
        if not notebookutils.fs.exists(config["production_table_path"]):
            logging.info("No existing production data found, performing full refresh")
            return None

        # Read the production table and convert to pandas for easier manipulation
        pdf = read_deltalake(config["production_table_path"]).to_pandas()

        if "modifiedDate" not in pdf.columns or pdf.empty:
            logging.info("No modifiedDate column found or empty table, performing full refresh")
            return None

        # Drop nulls first: an all-null column would otherwise yield NaN/NaT,
        # which is truthy for NaN and would end up inside the API filter URL.
        modified_values = pdf["modifiedDate"].dropna()
        if modified_values.empty:
            logging.info("modifiedDate column contains no values, performing full refresh")
            return None

        # Get the maximum modifiedDate
        last_modified = modified_values.max()

        # Convert to ISO format string if needed
        if isinstance(last_modified, pd.Timestamp):
            last_modified = last_modified.isoformat()

        logging.info(f"Found last modified date: {last_modified}")
        return last_modified

    except Exception as e:
        logging.warning(f"Error determining last modified date: {str(e)}")
        logging.warning("Defaulting to full refresh")
        return None

def retrieve_fabric_items(access_token: str) -> List[Dict]:
    """
    Retrieve Fabric items from the API with support for full or incremental retrieval

    Pages are followed until the response carries no next-page link. Failed
    requests (exceptions and HTTP 429) are retried a bounded number of times;
    any other non-200 status stops the retrieval. In every failure case the
    items collected so far are returned, so the result may be partial - check
    the log for errors.

    Optional ``config`` keys (defaults apply when absent):
        max_retries (int, default 5): consecutive failures tolerated per page
        request_timeout (int/float, default 60): per-request timeout in seconds

    Args:
        access_token (str): Authentication token for the API

    Returns:
        List[Dict]: List of Fabric items retrieved from the API
    """
    # Set up headers with authentication token and content type
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }

    # config.get() keeps older config dicts (without these keys) working
    max_retries = config.get("max_retries", 5)
    request_timeout = config.get("request_timeout", 60)

    # Initialize an empty list to store all Fabric items
    all_fabric_items = []

    # Determine if we're doing incremental or full refresh
    base_url = config["fabric_api_base_url"]
    current_api_url = base_url
    if config["incremental_refresh"]:
        last_modified_date = get_last_modified_date()
        if last_modified_date:
            # NOTE(review): this assumes the API honours a $filter on modifiedDate.
            # If it does not, filtering has to happen after retrieval - confirm.
            current_api_url = f"{base_url}?$filter=modifiedDate gt {last_modified_date}"
            logging.info(f"Using incremental refresh mode with modified date filter: {last_modified_date}")
        else:
            logging.info("Incremental refresh enabled but no existing data found. Performing full refresh.")
    else:
        logging.info("Using full refresh mode")

    page_count = 0
    total_item_count = 0
    failed_attempts = 0  # consecutive failures for the page currently being fetched
    start_time = time.time()

    logging.info(f"Starting API data retrieval from: {current_api_url}")

    while current_api_url:
        try:
            # Make the API request for the current page; the timeout prevents a
            # stalled connection from hanging the notebook indefinitely
            response = requests.get(current_api_url, headers=headers, timeout=request_timeout)

            # Check if the request was successful
            if response.status_code != 200:
                logging.error(f"API request failed: Status code {response.status_code}")
                logging.error(f"Response: {response.text}")

                # If we have a 429 (Too Many Requests), wait and retry the same page
                if response.status_code == 429:
                    failed_attempts += 1
                    if failed_attempts > max_retries:
                        logging.error(f"Still rate limited after {max_retries} retries; "
                                      f"stopping with the items retrieved so far")
                        break
                    retry_after = int(response.headers.get('Retry-After', '30'))
                    logging.info(f"Rate limited. Waiting {retry_after} seconds before retrying.")
                    time.sleep(retry_after)
                    continue

                # For other errors, break the loop after logging
                break

            # Parse the JSON response
            data = response.json()
            page_count += 1
            failed_attempts = 0  # this page succeeded

            # Extract and store the Fabric items from the current page
            items_in_page = data.get('itemEntities', [])
            page_item_count = len(items_in_page)
            all_fabric_items.extend(items_in_page)

            total_item_count += page_item_count
            elapsed_time = time.time() - start_time

            logging.info(f"Retrieved page {page_count} with {page_item_count} items. "
                        f"Total: {total_item_count} items in {elapsed_time:.2f} seconds")

            # Get the URL for the next page of results, if available. The Fabric
            # admin API returns it as 'continuationUri'; '@odata.nextLink' is
            # kept as a fallback for OData-style responses.
            current_api_url = data.get('continuationUri') or data.get('@odata.nextLink')

            # Add a small delay to avoid rate limiting
            if current_api_url and config["request_delay"] > 0:
                time.sleep(config["request_delay"])

        except Exception as e:
            logging.error(f"Error during API retrieval: {str(e)}")
            failed_attempts += 1
            if failed_attempts > max_retries:
                # Without this bound a persistent failure would loop forever
                logging.error(f"Giving up after {max_retries} retries; "
                              f"stopping with the items retrieved so far")
                break
            # Otherwise wait and retry this page
            logging.info("Waiting 30 seconds before retrying...")
            time.sleep(30)

    # Log summary of the retrieval process
    elapsed_time = time.time() - start_time
    logging.info(f"API retrieval complete: {total_item_count} items retrieved across {page_count} pages "
                f"in {elapsed_time:.2f} seconds")

    return all_fabric_items

# SECTION 4: Data Quality Validation

In [None]:
def validate_data_quality(items: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """
    Validate the quality of retrieved data and separate valid items from issues

    Checks performed:
      - the batch holds at least config["min_expected_items"] items (batch-level issue)
      - every item has all keys listed in config["required_fields"]
      - an item's "type", when present, is one of config["valid_item_types"]
      - an item's "name", when present, is not empty or whitespace-only

    An item with several problems produces one issue record per problem.

    Args:
        items (List[Dict]): The list of Fabric items to validate

    Returns:
        Tuple[List[Dict], List[Dict]]: Valid items and quality-issue records.
        Per-item records have the keys "item", "issue" and "timestamp"; the
        batch-level record has "issue_type", "description", "timestamp", "severity".
    """
    logging.info(f"Starting data quality validation for {len(items)} items")

    valid_items = []
    quality_issues = []

    # Define required fields based on configuration
    required_fields = set(config["required_fields"])
    valid_types = set(config["valid_item_types"])

    # Check if we have the minimum expected number of items
    if len(items) < config["min_expected_items"]:
        issue = {
            "issue_type": "insufficient_items",
            "description": f"Retrieved only {len(items)} items, which is below the minimum threshold of {config['min_expected_items']}",
            "timestamp": datetime.now().isoformat(),
            "severity": "high"
        }
        quality_issues.append(issue)
        logging.warning(f"Data quality issue: {issue['description']}")

    # Process each item for validation
    for item in items:
        item_issues = []
        item_id = item.get("id", "unknown")

        # Check for missing required fields
        missing_fields = required_fields - set(item.keys())
        if missing_fields:
            item_issues.append({
                "issue_type": "missing_fields",
                "fields": list(missing_fields),
                "item_id": item_id
            })

        # Check for valid item type if the field exists
        if "type" in item and item["type"] not in valid_types:
            item_issues.append({
                "issue_type": "invalid_type",
                "actual_type": item["type"],
                "valid_types": list(valid_types),
                "item_id": item_id
            })

        # Check for empty name. str() guards against a non-string name, which
        # would otherwise raise AttributeError on .strip() and abort the run.
        if "name" in item:
            name = item["name"]
            if not name or not str(name).strip():
                item_issues.append({
                    "issue_type": "empty_name",
                    "item_id": item_id
                })

        # Add additional validation rules here as needed

        # If no issues were found, consider the item valid
        if not item_issues:
            valid_items.append(item)
            continue

        # Otherwise record one entry per issue, each carrying the offending item
        for issue in item_issues:
            quality_issues.append({
                "item": item,
                "issue": issue,
                "timestamp": datetime.now().isoformat()
            })
            logging.warning(f"Data quality issue: {issue['issue_type']} for item {issue['item_id']}")

    logging.info(f"Data quality validation complete: {len(valid_items)} valid items, "
                f"{len(quality_issues)} quality issues")

    return valid_items, quality_issues

# SECTION 5: Data Storage and Delta Lake Operations

In [None]:
def save_raw_json(items: List[Dict]) -> str:
    """
    Dump the raw items to a timestamped JSON file under config["raw_json_dir"].

    Args:
        items (List[Dict]): The raw items to save

    Returns:
        str: The path to the saved file

    Raises:
        Exception: Any error raised while writing the file is logged and re-raised
    """
    # Make sure the target directory is there before writing
    target_dir = config["raw_json_dir"]
    notebookutils.fs.mkdirs(target_dir)

    # The file name carries the current date and time so earlier dumps are kept
    run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    file_path = target_dir + f'Fabric_Items_{run_stamp}.json'

    try:
        with open(file_path, "w") as out_handle:
            json.dump(items, out_handle, indent=2)
        logging.info(f"Successfully saved raw data to {file_path}")
        return file_path
    except Exception as e:
        logging.error(f"Error saving JSON file: {e!s}")
        raise

def _fabric_storage_options() -> Dict[str, str]:
    """Build the storage options (including a fresh storage token) used for Delta writes."""
    return {
        "use_fabric_endpoint": "true",
        "allow_unsafe_rename": "true",
        "bearer_token": notebookutils.credentials.getToken('storage')
    }


def _store_quality_issues(quality_issues: List[Dict], storage_options: Dict[str, str]) -> None:
    """Append the quality-issue records to the quality issues table (no-op when empty)."""
    if not quality_issues:
        return

    logging.info(f"Storing {len(quality_issues)} data quality issues")
    quality_df = pd.DataFrame(quality_issues)

    # Append to the table when it already exists, otherwise create it
    quality_table_exists = notebookutils.fs.exists(config["quality_issues_path"])

    write_deltalake(
        config["quality_issues_path"],
        quality_df,
        mode="append" if quality_table_exists else "overwrite",
        engine='rust',
        storage_options=storage_options
    )


def process_and_store_data(items: List[Dict], quality_issues: List[Dict]) -> pd.DataFrame:
    """
    Process the validated items and store them in Delta format

    The items are flattened with ``pd.json_normalize``, stamped with a
    processing timestamp and written to the staging table (overwriting its
    previous content). Quality issues are appended to their own table; this
    also happens when there are no valid items, so that a run in which every
    item failed validation still leaves a record of why.

    Args:
        items (List[Dict]): The validated items to process and store
        quality_issues (List[Dict]): Quality issues to store separately

    Returns:
        pd.DataFrame: The processed dataframe of items (empty when there were no items;
        the staging table is not touched in that case)
    """
    logging.info("Processing and normalizing data")

    # Convert items to DataFrame
    df = pd.json_normalize(items)

    # Check if we have any data to process
    if df.empty:
        logging.warning("No valid items to process after data quality validation")
        # Still persist the issues: skipping them here would lose the
        # diagnostics for exactly the runs that need them most.
        if quality_issues:
            _store_quality_issues(quality_issues, _fabric_storage_options())
        return df

    # Add processing timestamp
    df['processing_timestamp'] = datetime.now().isoformat()

    # Set storage options for writing to Delta format
    storage_options = _fabric_storage_options()

    # Write in batches when the dataset exceeds the configured batch size
    batch_size = config["batch_size"]
    total_rows = len(df)

    if total_rows > batch_size:
        logging.info(f"Processing {total_rows} rows in batches of {batch_size}")

        for batch_number, start_idx in enumerate(range(0, total_rows, batch_size), start=1):
            batch_df = df.iloc[start_idx:start_idx + batch_size]

            # The first batch replaces the staging table, later ones extend it
            mode = "overwrite" if start_idx == 0 else "append"

            logging.info(f"Writing batch {batch_number} ({len(batch_df)} rows) to staging table with mode: {mode}")

            write_deltalake(
                config["staging_table_path"],
                batch_df,
                mode=mode,
                engine='rust',
                storage_options=storage_options
            )
    else:
        # If the dataset is small enough, write it all at once
        logging.info(f"Writing {total_rows} rows to staging table")

        write_deltalake(
            config["staging_table_path"],
            df,
            mode="overwrite",
            engine='rust',
            storage_options=storage_options
        )

    # Store quality issues if any were found
    _store_quality_issues(quality_issues, storage_options)

    logging.info("Successfully processed and stored data in staging table")
    return df

# SECTION 6: Change Tracking and History Management

In [None]:
def detect_and_record_changes(staging_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Detect changes between staging and production data, and record the history

    When no production table exists, every staging row is reported as 'new'
    (note: a 'change_type' column is added to ``staging_df`` itself in that
    case). Otherwise staging and production are loaded into an in-memory
    DuckDB database and compared by ``id``:
      - new:     id present in staging only
      - updated: id present in both and staging.modifiedDate is greater
      - deleted: id present in production only (full refresh mode only)
    Detected changes are appended to both the changes table and the history table.

    If anything fails during the comparison, the error is logged and
    ``(staging_df, empty DataFrame)`` is returned.

    Args:
        staging_df (pd.DataFrame): The dataframe with new/updated items

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: The final production dataframe and changes dataframe
    """
    logging.info("Starting change detection process")

    # Check if production table exists
    production_exists = notebookutils.fs.exists(config["production_table_path"])

    if not production_exists:
        logging.info("No existing production data - all items are new")

        # Add change_type column to indicate these are all new items
        staging_df['change_type'] = 'new'

        # The changes are all new items in this case
        changes_df = staging_df.copy()
        changes_df['change_timestamp'] = datetime.now().isoformat()

        # Return both dataframes
        return staging_df, changes_df

    # Connect to DuckDB for performing SQL operations
    con = duckdb.connect()

    try:
        # Install and load the Delta extension
        con.execute("INSTALL delta;")
        con.execute("LOAD delta;")

        # Load both staging and production data into in-memory tables
        con.execute(f"CREATE TABLE staging AS SELECT * FROM delta_scan('{config['staging_table_path']}')")
        con.execute(f"CREATE TABLE production AS SELECT * FROM delta_scan('{config['production_table_path']}')")

        # Find new items (in staging but not in production)
        new_items_query = """
            SELECT s.*, 'new' as change_type
            FROM staging s
            LEFT JOIN production p ON s.id = p.id
            WHERE p.id IS NULL
        """
        new_items = con.execute(new_items_query).fetchdf()

        # Find updated items (in both, but with changes)
        # This assumes 'modifiedDate' is a reliable indicator of changes
        updated_items_query = """
            SELECT s.*, 'updated' as change_type
            FROM staging s
            JOIN production p ON s.id = p.id
            WHERE s.modifiedDate > p.modifiedDate
        """
        updated_items = con.execute(updated_items_query).fetchdf()

        # Find deleted items (in production but not in staging)
        # Only if we're doing a full refresh (if incremental, we can't detect deletions)
        deleted_items = pd.DataFrame()
        if not config["incremental_refresh"]:
            deleted_items_query = """
                SELECT p.*, 'deleted' as change_type
                FROM production p
                LEFT JOIN staging s ON p.id = s.id
                WHERE s.id IS NULL
            """
            deleted_items = con.execute(deleted_items_query).fetchdf()

        # Combine all changes
        changes_df = pd.concat([new_items, updated_items, deleted_items], ignore_index=True)

        # Add timestamp for when the change was detected
        changes_df['change_timestamp'] = datetime.now().isoformat()

        # If we're doing incremental refresh, we need to merge staging with production
        if config["incremental_refresh"]:
            logging.info("Performing incremental merge of staging data into production")

            # Merge updated items first (only the columns listed here are refreshed)
            con.execute("""
                UPDATE production p
                SET
                    name = s.name,
                    type = s.type,
                    modifiedDate = s.modifiedDate,
                    modifiedBy = s.modifiedBy,
                    processing_timestamp = s.processing_timestamp
                FROM staging s
                WHERE p.id = s.id
            """)

            # Insert new items
            # NOTE(review): this positional INSERT requires staging and production
            # to have identical column lists; verify that holds when production
            # was created by the first-run path above (which adds 'change_type').
            con.execute("""
                INSERT INTO production
                SELECT s.*
                FROM staging s
                LEFT JOIN production p ON s.id = p.id
                WHERE p.id IS NULL
            """)

            # Get the final merged dataset
            final_df = con.execute("SELECT * FROM production").fetchdf()
        else:
            # For full refresh, we just use the staging data plus add back any deleted items
            if not deleted_items.empty:
                # NOTE(review): 'is_deleted' is set here but is not one of the
                # staging columns selected below, so it does not reach final_df -
                # confirm whether the marker is meant to be kept.
                deleted_items['is_deleted'] = True

                # Get only the necessary columns from staging
                staging_columns = staging_df.columns.tolist()

                # Ensure deleted_items has the same columns as staging
                for col in staging_columns:
                    if col not in deleted_items.columns:
                        deleted_items[col] = None

                # Combine staging with deleted items
                final_df = pd.concat([staging_df, deleted_items[staging_columns]], ignore_index=True)
            else:
                final_df = staging_df

        # Record changes if any were detected
        if not changes_df.empty:
            logging.info(f"Detected {len(changes_df)} changes: "
                        f"{len(new_items)} new, {len(updated_items)} updated, {len(deleted_items)} deleted")

            # Set storage options
            storage_options = {
                "use_fabric_endpoint": "true",
                "allow_unsafe_rename": "true",
                "bearer_token": notebookutils.credentials.getToken('storage')
            }

            # Check if changes table exists
            changes_table_exists = notebookutils.fs.exists(config["changes_table_path"])

            # Write changes to changes tracking table
            write_deltalake(
                config["changes_table_path"],
                changes_df,
                mode="append" if changes_table_exists else "overwrite",
                engine='rust',
                storage_options=storage_options
            )

            # Also append to history table
            history_table_exists = notebookutils.fs.exists(config["history_table_path"])

            write_deltalake(
                config["history_table_path"],
                changes_df,
                mode="append" if history_table_exists else "overwrite",
                engine='rust',
                storage_options=storage_options
            )
        else:
            logging.info("No changes detected between staging and production data")

        return final_df, changes_df

    except Exception as e:
        logging.error(f"Error during change detection: {str(e)}")
        # If there's an error, just return the staging data
        # NOTE(review): in incremental mode staging holds only the delta, so a
        # caller that overwrites production with this result would drop
        # unchanged rows - confirm how the caller uses the first element.
        return staging_df, pd.DataFrame()

    finally:
        # Release the in-memory database on every path (success or failure)
        con.close()

def prune_history_data():
    """
    Prune historical data to keep storage manageable by removing data older than the retention period

    Records whose ``change_timestamp`` is older than
    config["days_to_keep_history"] days are dropped by rewriting the history
    table with only the records to keep. If that would remove every record, a
    30-day window is tried instead; if even that keeps nothing, the table is
    left untouched rather than being overwritten with an empty frame.

    Errors are logged and swallowed - pruning is best-effort housekeeping.
    """
    logging.info("Checking if history data needs pruning")

    # Check if history table exists
    if not notebookutils.fs.exists(config["history_table_path"]):
        logging.info("No history data exists yet, skipping pruning")
        return

    con = None
    try:
        # Calculate cutoff date
        days_to_keep = config["days_to_keep_history"]
        cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()

        logging.info(f"Pruning history data older than {cutoff_date}")

        # Connect to DuckDB for SQL operations
        con = duckdb.connect()

        # Install and load Delta extension
        con.execute("INSTALL delta;")
        con.execute("LOAD delta;")

        # Load history data
        con.execute(f"CREATE TABLE history AS SELECT * FROM delta_scan('{config['history_table_path']}')")

        # Count total records before pruning
        total_count = con.execute("SELECT COUNT(*) FROM history").fetchone()[0]

        # Count records to be pruned
        to_prune_count = con.execute(f"""
            SELECT COUNT(*) FROM history 
            WHERE change_timestamp < '{cutoff_date}'
        """).fetchone()[0]

        if to_prune_count == 0:
            logging.info("No history data needs pruning at this time")
            return

        # Get records to keep
        effective_cutoff = cutoff_date
        keep_records = con.execute(f"""
            SELECT * FROM history 
            WHERE change_timestamp >= '{effective_cutoff}'
        """).fetchdf()

        if keep_records.empty:
            logging.info("All history would be pruned, keeping at least 30 days worth instead")
            # Adjust to keep at least 30 days (only widens the window when the
            # configured retention is shorter than 30 days)
            effective_cutoff = (datetime.now() - timedelta(days=30)).isoformat()
            keep_records = con.execute(f"""
                SELECT * FROM history 
                WHERE change_timestamp >= '{effective_cutoff}'
            """).fetchdf()

        if keep_records.empty:
            # Overwriting with an empty frame would wipe the whole table, so
            # leave the history as it is instead.
            logging.warning("No history records fall inside the retention window; "
                            "leaving the history table unchanged")
            return

        # Write back only the records to keep
        storage_options = {
            "use_fabric_endpoint": "true",
            "allow_unsafe_rename": "true",
            "bearer_token": notebookutils.credentials.getToken('storage')
        }

        write_deltalake(
            config["history_table_path"],
            keep_records,
            mode="overwrite",
            engine='rust',
            storage_options=storage_options
        )

        # Report what was really removed (differs from to_prune_count when the
        # 30-day fallback window was used)
        pruned_count = total_count - len(keep_records)
        logging.info(f"Pruned {pruned_count} out of {total_count} historical records older than {effective_cutoff}")

    except Exception as e:
        logging.error(f"Error during history pruning: {str(e)}")

    finally:
        # Release the in-memory database on every path, including early returns
        if con is not None:
            con.close()

# SECTION 7: Final Data Update and Alert System

In [None]:
def send_email_alert(subject: str, message: str):
    """
    Send an email alert when significant changes are detected

    Does nothing when config["enable_alerts"] is false. Delivery failures are
    logged and swallowed so that a mail problem never stops the job.

    Optional ``config`` key:
        alert_smtp_timeout (int/float, default 30): socket timeout in seconds

    Args:
        subject (str): Email subject
        message (str): Email message body
    """
    if not config["enable_alerts"]:
        logging.info("Alerts are disabled, skipping email notification")
        return

    try:
        # Create email message
        msg = MIMEMultipart()
        msg['From'] = config["alert_sender_email"]
        msg['To'] = ", ".join(config["alert_email_recipients"])
        msg['Subject'] = subject

        # Add message body
        msg.attach(MIMEText(message, 'plain'))

        # The context manager closes the connection even when starttls() or
        # send_message() raises; the timeout keeps an unreachable server from
        # blocking the notebook indefinitely.
        with smtplib.SMTP(
            config["alert_smtp_server"],
            config["alert_smtp_port"],
            timeout=config.get("alert_smtp_timeout", 30),
        ) as server:
            server.starttls()
            server.send_message(msg)

        logging.info(f"Email alert sent: {subject}")

    except Exception as e:
        logging.error(f"Failed to send email alert: {str(e)}")

def check_for_alert_conditions(changes_df: pd.DataFrame, staging_df: pd.DataFrame):
    """
    Evaluate the alert rules against the detected changes and email a summary.

    Three independent checks are run; a failure in one is logged and does not
    prevent the others from running:
      1. Total item count changed by more than
         config["alert_threshold_item_count_change_pct"] percent compared
         with the current production table.
      2. The number of changes for one (change_type, type) pair exceeds a
         per-item-type threshold.
      3. Any change touches a workspace whose name contains one of the
         critical workspace names (case-insensitive substring match).

    Nothing is checked when changes_df is None or empty, or when alerts are
    disabled in config.

    Args:
        changes_df (pd.DataFrame): DataFrame containing detected changes
        staging_df (pd.DataFrame): DataFrame containing all current items
    """
    import re  # local import: only needed to escape workspace names below

    # Skip if there are no changes to inspect or alerts are disabled
    if changes_df is None or changes_df.empty or not config["enable_alerts"]:
        return

    alerts = []         # headline alerts; their number is reported in the subject
    alert_details = []  # per-item lines, listed after the headlines in the body

    # Check for significant change in item count
    try:
        if notebookutils.fs.exists(config["production_table_path"]):
            # The production table still holds the previous run's snapshot
            production_df = read_deltalake(config["production_table_path"]).to_pandas()
            previous_count = len(production_df)
            current_count = len(staging_df)

            # A percentage is undefined for an empty previous snapshot
            if previous_count > 0:
                pct_change = abs((current_count - previous_count) / previous_count * 100)

                # Alert if the change exceeds the configured threshold
                if pct_change > config["alert_threshold_item_count_change_pct"]:
                    alert_msg = (f"ALERT: Significant change in Fabric item count detected. "
                                f"Previous: {previous_count}, Current: {current_count}, "
                                f"Change: {pct_change:.2f}%")
                    alerts.append(alert_msg)
                    logging.warning(alert_msg)
    except Exception as e:
        logging.error(f"Error checking for item count changes: {str(e)}")

    # Check for specific item type changes
    try:
        # Thresholds per item type (these could be moved to config);
        # more critical item types have lower thresholds.
        default_threshold = 5
        type_thresholds = {
            'Report': 10,
            'Dataset': 10,
            'Lakehouse': 3,
            'Warehouse': 3,
        }

        # Get counts of changes by change_type and item type
        change_summary = changes_df.groupby(['change_type', 'type']).size().reset_index(name='count')

        for change_type, item_type, count in zip(change_summary['change_type'],
                                                 change_summary['type'],
                                                 change_summary['count']):
            threshold = type_thresholds.get(item_type, default_threshold)

            if count > threshold:
                alert_msg = (f"ALERT: {count} {item_type} items have been {change_type}. "
                            f"This exceeds the threshold of {threshold}.")
                alerts.append(alert_msg)
                logging.warning(alert_msg)
    except Exception as e:
        logging.error(f"Error checking for item type changes: {str(e)}")

    # Check for changes in critically important items (e.g., by specific workspaces)
    try:
        # Define critical workspaces (could be moved to config)
        critical_workspaces = ['Finance', 'Executive', 'Production']

        if 'workspaceName' in changes_df.columns:
            # str.contains interprets the pattern as a regular expression, so
            # each name is escaped to keep the match a literal substring test.
            pattern = '|'.join(re.escape(name) for name in critical_workspaces)
            in_critical_workspace = changes_df['workspaceName'].str.contains(pattern,
                                                                             case=False,
                                                                             na=False)
            critical_changes = changes_df[in_critical_workspace]

            if not critical_changes.empty:
                alert_msg = (f"ALERT: {len(critical_changes)} changes detected in critical workspaces: "
                            f"{', '.join(critical_workspaces)}.")
                alerts.append(alert_msg)
                logging.warning(alert_msg)

                # Add details for each critical change
                for _, row in critical_changes.iterrows():
                    item_detail = (f"- {row.get('change_type', 'Changed')} item: {row.get('name', 'Unknown')} "
                                  f"in workspace: {row.get('workspaceName', 'Unknown')}")
                    alert_details.append(item_detail)
    except Exception as e:
        logging.error(f"Error checking for critical workspace changes: {str(e)}")

    # If we have alerts, send an email
    if alerts:
        # Only headline alerts are counted; detail lines are not alerts themselves
        subject = f"Fabric Items Alert: {len(alerts)} significant changes detected"
        message = "\n".join(alerts + alert_details)
        # log_file_path is a module-level name that is not defined in this function
        message += f"\n\nFor more details, check the log file: {log_file_path}"

        send_email_alert(subject, message)

def update_production_data_and_history(staging_df: pd.DataFrame, changes_df: pd.DataFrame = None):
    """
    Update the production table with new data and maintain historical records

    Steps, in order:
      1. Overwrite the production Delta table with staging_df.
      2. Append staging_df, stamped with a "snapshot_date" column, to the
         history table.
      3. Append changes_df to the changes table when it is non-empty.
      4. Prune the history table when config["days_to_keep_history"] > 0.

    Args:
        staging_df (pd.DataFrame): DataFrame containing all current Fabric items
        changes_df (pd.DataFrame, optional): DataFrame containing detected changes

    Raises:
        Exception: any failure is logged and re-raised to the caller.
    """
    try:
        # preserve_index=False everywhere below: frames produced by filtering
        # or pd.concat carry a non-default index, which pyarrow would
        # otherwise persist as an extra "__index_level_0__" column.
        staging_table = pyarrow.Table.from_pandas(staging_df, preserve_index=False)

        # Update the production table (overwrite with the latest data)
        logging.info(f"Updating production table at {config['production_table_path']}")
        write_deltalake(
            config["production_table_path"],
            staging_table,
            mode="overwrite",
            engine="pyarrow"
        )

        # Update the history table (append with current snapshot and timestamp).
        # Work on a copy so the caller's DataFrame does not gain the column.
        history_df = staging_df.copy()
        history_df["snapshot_date"] = datetime.now().isoformat()

        history_table = pyarrow.Table.from_pandas(history_df, preserve_index=False)

        logging.info(f"Updating history table at {config['history_table_path']}")
        write_deltalake(
            config["history_table_path"],
            history_table,
            mode="append",
            engine="pyarrow"
        )

        # If changes were detected, save them to the changes table
        if changes_df is not None and not changes_df.empty:
            changes_table = pyarrow.Table.from_pandas(changes_df, preserve_index=False)

            logging.info(f"Updating changes table at {config['changes_table_path']} with {len(changes_df)} changes")
            write_deltalake(
                config["changes_table_path"],
                changes_table,
                mode="append",
                engine="pyarrow"
            )

        # Clean up historical data to maintain retention policy
        # (a non-positive retention value disables pruning)
        if config["days_to_keep_history"] > 0:
            cleanup_historical_data(config["days_to_keep_history"])

        logging.info("Data update complete")

    except Exception as e:
        logging.error(f"Error updating production data: {str(e)}")
        raise

def cleanup_historical_data(days_to_keep: int):
    """
    Remove historical data older than the specified retention period

    The history table is read in full, rows whose "snapshot_date" is older
    than the cutoff are dropped, and the table is overwritten only when at
    least one row was removed. Errors are logged and not re-raised.

    Args:
        days_to_keep (int): Number of days of history to retain. Values of
            zero or less are treated as "retention disabled" and nothing is
            removed.
    """
    try:
        # A cutoff at or after "now" would delete the entire history, so a
        # non-positive retention is treated as "pruning disabled".
        if days_to_keep <= 0:
            logging.info("History retention is disabled, skipping cleanup")
            return

        # Calculate the cutoff date
        cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()

        # Check if history table exists
        if not notebookutils.fs.exists(config["history_table_path"]):
            logging.info("No history table found, skipping cleanup")
            return

        # Read the history table
        history_df = read_deltalake(config["history_table_path"]).to_pandas()

        # Filter to keep only records within the retention period.
        # Both sides are compared as strings.
        # NOTE(review): assumes snapshot_date holds ISO-8601 text in the same
        # format as cutoff_date, so string order equals time order - confirm.
        retained_history = history_df[history_df["snapshot_date"] >= cutoff_date]

        records_removed = len(history_df) - len(retained_history)
        if records_removed > 0:
            logging.info(f"Removing {records_removed} historical records older than {cutoff_date}")

            # preserve_index=False: the filtered frame no longer has a default
            # index, and pyarrow would otherwise add an "__index_level_0__"
            # column that does not exist in the stored table schema.
            retained_table = pyarrow.Table.from_pandas(retained_history, preserve_index=False)
            write_deltalake(
                config["history_table_path"],
                retained_table,
                mode="overwrite",
                engine="pyarrow"
            )
        else:
            logging.info(f"No historical records found older than {cutoff_date}")

    except Exception as e:
        logging.error(f"Error during historical data cleanup: {str(e)}")

# SECTION 8: Create Views for Common Analysis Patterns

In [None]:
def create_analysis_views():
    """
    Create SQL views for common analysis patterns using DuckDB

    The production, history and changes Delta tables are exposed to an
    in-memory DuckDB connection, five analysis views are defined on top of
    them, and the result of each view is written as a Delta table under
    /lakehouse/default/Tables/views/<view name>.

    Errors are logged and not re-raised; the DuckDB connection is always
    closed.
    """
    # View name -> statement that defines it. Insertion order is the order in
    # which the views are created and exported.
    view_statements = {
        # View 1: Usage trends by item type
        "usage_trends_by_type": """
        CREATE VIEW IF NOT EXISTS usage_trends_by_type AS
        SELECT 
            DATE_TRUNC('month', snapshot_date::TIMESTAMP) AS month,
            type,
            COUNT(*) AS item_count,
            AVG(views) AS avg_views,
            SUM(views) AS total_views
        FROM fabric_items_history
        GROUP BY 1, 2
        ORDER BY 1, 2;
        """,
        # View 2: Recently modified items (last 30 days)
        "recently_modified_items": """
        CREATE VIEW IF NOT EXISTS recently_modified_items AS
        SELECT 
            id,
            name,
            type,
            workspaceName,
            modifiedBy,
            modifiedDate,
            DATEDIFF('day', modifiedDate::TIMESTAMP, CURRENT_TIMESTAMP) AS days_since_modified
        FROM fabric_items
        WHERE DATEDIFF('day', modifiedDate::TIMESTAMP, CURRENT_TIMESTAMP) <= 30
        ORDER BY modifiedDate DESC;
        """,
        # View 3: Workspace growth over time
        "workspace_growth": """
        CREATE VIEW IF NOT EXISTS workspace_growth AS
        SELECT 
            DATE_TRUNC('month', snapshot_date::TIMESTAMP) AS month,
            workspaceName,
            COUNT(DISTINCT id) AS item_count
        FROM fabric_items_history
        GROUP BY 1, 2
        ORDER BY 1, 2;
        """,
        # View 4: Most active users
        "most_active_users": """
        CREATE VIEW IF NOT EXISTS most_active_users AS
        SELECT 
            modifiedBy,
            COUNT(*) AS items_modified,
            COUNT(DISTINCT type) AS item_types_modified,
            MIN(modifiedDate) AS first_modification,
            MAX(modifiedDate) AS last_modification
        FROM fabric_items
        GROUP BY 1
        ORDER BY 2 DESC;
        """,
        # View 5: Item change frequency
        "item_change_frequency": """
        CREATE VIEW IF NOT EXISTS item_change_frequency AS
        SELECT 
            id,
            name,
            type,
            workspaceName,
            COUNT(*) AS change_count
        FROM fabric_items_changes
        GROUP BY 1, 2, 3, 4
        ORDER BY 5 DESC;
        """,
    }

    # DuckDB view name -> config key holding the Delta table path it reads
    source_tables = {
        "fabric_items": "production_table_path",
        "fabric_items_history": "history_table_path",
        "fabric_items_changes": "changes_table_path",
    }

    try:
        logging.info("Creating analysis views for common patterns")

        # Initialize DuckDB connection
        conn = duckdb.connect(database=':memory:')
        try:
            # The delta extension provides delta_scan()
            conn.execute("INSTALL 'delta';")
            conn.execute("LOAD 'delta';")

            # Register the tables
            for source_view, path_key in source_tables.items():
                conn.execute(f"CREATE VIEW {source_view} AS SELECT * FROM delta_scan('{config[path_key]}');")

            # Define the analysis views
            for statement in view_statements.values():
                conn.execute(statement)

            # Create the target directory once, before any view is exported
            notebookutils.fs.mkdirs("Tables/views/")

            # Save the views as Delta tables for querying from other tools
            for view in view_statements:
                result_df = conn.execute(f"SELECT * FROM {view}").fetchdf()
                result_table = pyarrow.Table.from_pandas(result_df)

                # Write to the views folder as a Delta table
                view_path = f"/lakehouse/default/Tables/views/{view}"

                write_deltalake(
                    view_path,
                    result_table,
                    mode="overwrite",
                    engine="pyarrow"
                )

                logging.info(f"Created analysis view: {view}")
        finally:
            # Release the in-memory database even when a statement fails
            conn.close()

        logging.info("Analysis views creation complete")

    except Exception as e:
        logging.error(f"Error creating analysis views: {str(e)}")

# SECTION 9: Incremental Refresh Implementation

In [None]:
def _values_differ(new_value, old_value) -> bool:
    """Return True when two cell values differ; two missing values count as equal."""
    # NaN != NaN is True in Python, so missing values need explicit handling
    new_missing = pd.api.types.is_scalar(new_value) and pd.isna(new_value)
    old_missing = pd.api.types.is_scalar(old_value) and pd.isna(old_value)
    if new_missing or old_missing:
        return new_missing != old_missing
    return bool(new_value != old_value)


def process_incremental_data(new_items: List[Dict], existing_data_path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Process data for incremental refresh, identifying changes and updates

    new_items is compared with the Delta table at existing_data_path by "id":
      - ids only in new_items are reported as "added",
      - ids only in the stored table are reported as "removed",
      - ids in both are reported as "updated" when one of the compared
        fields (name, type, modifiedBy, modifiedDate, views - whichever
        exist on both sides) differs; the first differing field's old value
        is stored in a "previous_<field>" column.

    Args:
        new_items (List[Dict]): List of newly retrieved items
        existing_data_path (str): Path to existing data

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: Complete updated dataset and changes
        dataset. Both are empty DataFrames when new_items is empty. Only the
        changes dataset carries the "change_type" column.

    Raises:
        Exception: any failure is logged and re-raised to the caller.
    """
    try:
        logging.info("Processing incremental data update")

        # Create DataFrame from new items
        new_df = pd.DataFrame(new_items)

        # If no new items, return empty DataFrames
        if new_df.empty:
            logging.info("No new items to process")
            return pd.DataFrame(), pd.DataFrame()

        # Add retrieval timestamp
        new_df["retrieval_timestamp"] = datetime.now().isoformat()

        # Check if existing data exists
        if not notebookutils.fs.exists(existing_data_path):
            logging.info("No existing data found, treating as initial load")
            # For initial load, mark all as additions. The tag goes on a
            # separate frame so the full dataset does not gain a change_type
            # column and the two results do not alias each other.
            return new_df, new_df.assign(change_type="added")

        # Load existing data
        existing_df = read_deltalake(existing_data_path).to_pandas()

        # Create sets of existing and new IDs for comparison
        existing_ids = set(existing_df["id"])
        new_ids = set(new_df["id"])

        # Identify added, removed, and potentially updated items
        added_ids = new_ids - existing_ids
        removed_ids = existing_ids - new_ids
        potentially_updated_ids = new_ids & existing_ids

        # Change frames are collected and concatenated once at the end
        change_frames = []

        # Process added items
        if added_ids:
            change_frames.append(
                new_df[new_df["id"].isin(added_ids)].assign(change_type="added")
            )
            logging.info(f"Identified {len(added_ids)} new items")

        # Process removed items
        if removed_ids:
            change_frames.append(
                existing_df[existing_df["id"].isin(removed_ids)].assign(change_type="removed")
            )
            logging.info(f"Identified {len(removed_ids)} removed items")

        # Process updated items - need to compare fields to detect changes
        if potentially_updated_ids:
            # Fields to compare for detecting updates (only those on both sides)
            compare_fields = ["name", "type", "modifiedBy", "modifiedDate", "views"]
            compare_fields = [f for f in compare_fields if f in new_df.columns and f in existing_df.columns]

            # Lookup of the stored row per id for O(1) comparison
            stored_rows = existing_df[existing_df["id"].isin(potentially_updated_ids)]
            existing_lookup = {record["id"]: record for record in stored_rows.to_dict("records")}

            updated_items = []
            candidate_rows = new_df[new_df["id"].isin(potentially_updated_ids)]
            for record in candidate_rows.to_dict("records"):
                existing_item = existing_lookup.get(record["id"])
                if existing_item is None:
                    continue

                # Only the first differing field is recorded
                changed_field = next(
                    (f for f in compare_fields
                     if _values_differ(record.get(f), existing_item.get(f))),
                    None,
                )
                if changed_field is not None:
                    updated_items.append({
                        **record,
                        "change_type": "updated",
                        "previous_" + changed_field: existing_item.get(changed_field),
                    })

            if updated_items:
                change_frames.append(pd.DataFrame(updated_items))
                logging.info(f"Identified {len(updated_items)} updated items")

        changes_df = pd.concat(change_frames, ignore_index=True) if change_frames else pd.DataFrame()

        # Create complete updated dataset.
        # Stored ids missing from new_items were reported as removed above,
        # so new_items is treated as the full current inventory and the
        # refreshed dataset is the new snapshot itself. The stored column
        # layout comes first so the table schema keeps its order; columns the
        # new snapshot lacks are filled with NaN.
        all_columns = list(existing_df.columns) + [
            column for column in new_df.columns if column not in existing_df.columns
        ]
        updated_df = new_df.reindex(columns=all_columns)

        logging.info(f"Incremental update processed: {len(changes_df)} changes identified")

        return updated_df, changes_df

    except Exception as e:
        logging.error(f"Error during incremental data processing: {str(e)}")
        raise

# SECTION 10: Main Execution Flow

In [None]:
def main():
    """
    Main execution flow for Fabric Items retrieval and processing

    Authenticates, retrieves the items, derives the change set (incremental
    mode only), validates and stores the data, evaluates alert rules, updates
    the production/history/changes tables and rebuilds the analysis views.

    Any exception is logged with its traceback and reported by email; it is
    not re-raised.
    """
    try:
        logging.info("Starting Fabric Items retrieval and management process")

        # Step 1: Get authentication token
        access_token = get_auth_token()

        # Step 2: Retrieve Fabric items from API
        fabric_items = retrieve_fabric_items(access_token)

        if not fabric_items:
            logging.warning("No Fabric items retrieved, aborting execution")
            return

        logging.info(f"Retrieved {len(fabric_items)} Fabric items from API")

        # Step 3: Process data with incremental refresh support
        if config["incremental_refresh"]:
            full_df, changes_df = process_incremental_data(fabric_items, config["production_table_path"])

            # Skip further processing if no data or changes
            if full_df.empty:
                logging.info("No data to process after incremental refresh")
                return
        else:
            # Full refresh mode - process all items as new
            full_df = pd.DataFrame(fabric_items)
            full_df["retrieval_timestamp"] = datetime.now().isoformat()
            changes_df = pd.DataFrame()  # No change tracking in full refresh mode

        # Step 4: Perform data quality validation
        # NOTE(review): validation runs on the raw API payload, and the frame
        # returned by process_and_store_data replaces the full_df built in
        # Step 3, so only changes_df survives from that step - confirm this
        # is intended.
        valid_items, quality_issues = validate_data_quality(fabric_items)
        logging.info(f"Data quality validation: {len(valid_items)} valid, {len(quality_issues)} issues")

        full_df = process_and_store_data(valid_items, quality_issues)

        # Step 5: Check for alert conditions.
        # This must run before Step 6: the item-count rule compares against
        # the production table, which Step 6 overwrites.
        check_for_alert_conditions(changes_df, full_df)

        # Step 6: Update production data and maintain history
        update_production_data_and_history(full_df, changes_df)

        # Step 7: Create or update analysis views
        create_analysis_views()

        logging.info("Fabric Items retrieval and management process completed successfully")

    except Exception as e:
        # logging.exception records the traceback along with the message,
        # which the plain error log line would lose
        logging.exception(f"Error in main execution: {str(e)}")
        send_email_alert(
            "Fabric Items Process Error",
            f"An error occurred during the Fabric Items retrieval and management process: {str(e)}"
        )

# Entry point: run the full pipeline only when this file is executed as the
# main script, not when it is imported as a module.
if __name__ == "__main__":
    main()

# SECTION 11: Unit Tests

In [None]:
def test_validate_data_quality():
    """
    Smoke-test validate_data_quality with a single item that is bad in several ways.

    The item has an empty name, a negative view count and an unparseable
    modification date, so it must be rejected and at least three issues
    must be reported.
    """
    bad_item = {"id": "1", "name": "", "type": "Report", "views": "-1", "modifiedDate": "invalid"}

    accepted, problems = validate_data_quality([bad_item])

    # One issue each is expected for the name, the views and the date
    assert len(problems) >= 3
    # An item with issues must not be returned as valid
    assert len(accepted) == 0
    print("Test passed: validate_data_quality")

# Run the unit test immediately; a failing assertion raises AssertionError here
test_validate_data_quality()