# Fabric Pipeline & Dataflow Monitoring

This notebook collects **Pipeline Runs** and **Dataflow Runs** from the Fabric REST APIs and sends them to Azure Log Analytics.

**📖 For complete setup instructions, authentication methods, configuration options, and troubleshooting, see [`fabric_pipeline_dataflow_collector_README.md`](fabric_pipeline_dataflow_collector_README.md)**

In [None]:
# === One-time installs per session (or use a Fabric Environment instead) ===
%pip install --quiet msal requests azure-identity azure-keyvault-secrets python-dotenv

In [None]:
# === Parameters (mark this as a parameter cell in Fabric) ===
import os
from dotenv import load_dotenv

# Load environment variables from .env file
# (must run before the os.getenv() calls below so they can see the .env values)
load_dotenv()

# Workspace and item configuration
workspace_id = os.getenv("FABRIC_WORKSPACE_ID")

# List only the items you want to collect. Leave empty lists to skip.
# NOTE(review): the GUIDs below look like environment-specific sample values —
# replace them with the item IDs from your own workspace.
pipeline_item_ids = [
    # Add your pipeline IDs here, e.g.:
    "fffa88df-ed23-45ba-bb2e-803f0089dc39",
    "92243a88-c144-4749-8eac-e2dd8e7f9b31",
]
dataflow_item_ids = [
    # Add your dataflow IDs here, e.g.:
    "bc5c92b0-d58a-487b-8692-965e69345792",
    "065696af-4621-4538-953c-65899053ae24",
]

# === CONFIGURATION MODES ===
# Exactly one OPTION block should be active (uncommented) at a time.
# OPTION 1: Bulk Ingestion (Historical Data Collection)
lookback_minutes = 43200  # 30 days for comprehensive bulk load
collect_activity_runs = True  # Currently enabled; set to False for large bulk loads to avoid API limits

# OPTION 2: Incremental Collection (Regular Monitoring)
# lookback_minutes = 1200  # 20 hours for regular incremental collection
# collect_activity_runs = True  # Enabled for detailed activity monitoring

# 🎯 OPTION 3: Activity Runs Backfill (After bulk completion)
# lookback_minutes = 10080  # 7 days for recent activity runs
# collect_activity_runs = True  # Enabled for detailed data

# Lookback window options reference:
# - 1200 (20 hours) - Regular incremental runs
# - 10080 (7 days) - Good for activity runs backfill
# - 43200 (30 days) - Full month bulk ingestion
# - 131400 (3 months) - Comprehensive historical data

# DCR / Logs Ingestion API settings from environment variables
dcr_endpoint_host = os.getenv("DCR_ENDPOINT_HOST")
dcr_immutable_id = os.getenv("DCR_IMMUTABLE_ID")

# Stream names must match your DCR configuration and map to the LA tables we created
stream_pipeline = "Custom-FabricPipelineRun_CL"
stream_activity = "Custom-FabricPipelineActivityRun_CL"
stream_dataflow = "Custom-FabricDataflowRun_CL"

# === Authentication Configuration ===
# Basic credentials from environment variables
tenant_id = os.getenv("FABRIC_TENANT_ID")
client_id = os.getenv("FABRIC_APP_ID")
client_secret_env = os.getenv("FABRIC_APP_SECRET")

# Key Vault configuration (optional)
use_key_vault = False  # Set to True to use Key Vault
use_managed_identity = False  # Set to True when running on Azure resources (VM, Container App, etc.)
# NOTE(review): the fallback vault URI is a hard-coded value that presumably
# belongs to a demo vault — set AZURE_KEY_VAULT_URI to point at your own vault.
key_vault_uri = os.getenv("AZURE_KEY_VAULT_URI", "https://kaydemokeyvault.vault.azure.net/")
key_vault_secret_name = os.getenv("AZURE_KEY_VAULT_SECRET_NAME", "FabricServicePrincipal")

# Authentication options:
# 1. Environment variables only: use_key_vault=False
# 2. Key Vault with managed identity: use_key_vault=True, use_managed_identity=True
# 3. Key Vault with client secret: use_key_vault=True, use_managed_identity=False (requires FABRIC_APP_SECRET)

# Validation: list every mandatory setting that resolved to an empty value.
# The tuple order determines the order in which names are reported.
missing = [
    env_name
    for env_name, env_value in (
        ("FABRIC_TENANT_ID", tenant_id),
        ("FABRIC_APP_ID", client_id),
        ("DCR_ENDPOINT_HOST", dcr_endpoint_host),
        ("DCR_IMMUTABLE_ID", dcr_immutable_id),
    )
    if not env_value
]
if missing:
    print(f"❌ Missing required environment variables: {', '.join(missing)}")
    print("   Please check your .env file or environment configuration")
else:
    print("✅ Environment variables loaded successfully")

# Warn when neither a direct secret nor Key Vault has been configured
if not (client_secret_env or use_key_vault):
    print("⚠️  Warning: No authentication method configured")
    print("   Either set FABRIC_APP_SECRET or configure Key Vault")

# Echo the effective configuration so a run's output documents what was used.
# Identifiers are truncated to their first 8 characters; the client secret is
# only reported as present/absent, never printed.
print("\n Data Collection Configuration:")
print(f"  Workspace ID: {workspace_id or 'Not set'}")
# 1440 minutes per day converts the lookback window to days for readability
print(f"  Lookback Window: {lookback_minutes:,} minutes ({lookback_minutes/1440:.1f} days)")
print(f"  Collect Activity Runs: {collect_activity_runs}")
print(f"  Pipeline Items: {len(pipeline_item_ids)}")
print(f"  Dataflow Items: {len(dataflow_item_ids)}")
print()
print("Authentication Configuration:")
print(f"  Tenant ID: {tenant_id[:8] + '...' if tenant_id else 'Not set'}")
print(f"  Client ID: {client_id[:8] + '...' if client_id else 'Not set'}")
print(f"  Use Key Vault: {use_key_vault}")
print(f"  Use Managed Identity: {use_managed_identity}")
print(f"  Environment Secret Available: {'Yes' if client_secret_env else 'No'}")
print(f"  Key Vault URI: {key_vault_uri if use_key_vault else 'Not used'}")
print()
print("Data Collection Endpoint Configuration:")
print(f"  DCR Endpoint: {dcr_endpoint_host or 'Not set'}")
print(f"  DCR Immutable ID: {dcr_immutable_id[:8] + '...' if dcr_immutable_id else 'Not set'}")
print()

# Configuration mode detection
# Classify the run from the two knobs set above: whether activity runs are
# collected, and how far back the lookback window reaches.
if collect_activity_runs:
    if lookback_minutes <= 2880:  # 2 days or less with activities
        print("INCREMENTAL COLLECTION MODE")
        print("   - Short lookback window for regular monitoring")
        print("   - Activity runs enabled for detailed insights")
        print("   - Suitable for scheduled execution")
    else:  # Longer window with activities
        print("ACTIVITY RUNS BACKFILL MODE")
        print("   - Medium lookback window for activity collection")
        print("   - Collecting detailed activity data")
        print("   - Monitor API rate limits")
elif lookback_minutes > 10080:  # More than 7 days, no activities
    print("BULK INGESTION MODE")
    print("   - Large lookback window for historical data")
    print("   - Activity runs disabled to avoid API limits")
    print("   - After completion, switch to incremental mode")
else:
    print("CUSTOM CONFIGURATION")
    print("   - Review settings based on your requirements")

# Extra guidance for any window longer than 7 days, whatever the mode
if lookback_minutes > 10080:
    print("   - Consider running during off-peak hours")
    print("   - Monitor API rate limits and Log Analytics ingestion limits")

In [None]:
# === Fabric-Aware Authentication Helpers ===
def get_fabric_token(scope: str = "https://api.fabric.microsoft.com/.default") -> str:
    """
    Return a bearer token, preferring the Fabric notebook runtime when present.

    Strategy:
    1. If ``notebookutils`` can be imported, ask it for a token.
    2. If that is unavailable, fails, or yields an empty value, acquire a
       token with the service principal (client credentials) instead.

    Args:
        scope: OAuth scope requested on the service-principal fallback path.

    Returns:
        The access token string.
    """
    # NOTE(review): the notebookutils path does not use `scope`, and
    # getSecret("System", "AccessToken") should be confirmed against the
    # notebookutils credentials API — verify before relying on this branch.
    try:
        import notebookutils

        print(f"[Auth] Attempting Fabric workspace identity for {scope}")

        # Use Fabric's credential system if available
        workspace_token = notebookutils.credentials.getSecret("System", "AccessToken")
    except Exception as exc:
        # Covers a missing module, a missing attribute and any runtime failure
        print(f"[Auth] Fabric authentication not available: {str(exc)[:100]}")
        print("[Auth] Using service principal authentication")
    else:
        if workspace_token:
            print("[Auth] ✅ Successfully acquired token via Fabric workspace identity")
            return workspace_token
        print("[Auth] ⚠️  Fabric workspace token not available, falling back to service principal")

    # Fall back to standard service principal authentication
    return acquire_token_client_credentials(tenant_id, client_id, client_secret_env, scope)

def get_credentials_fabric_aware():
    """
    Resolve service principal credentials with Fabric runtime awareness.

    Resolution order:
    1. When ``notebookutils`` can be imported, look up the secrets
       ``TenantId``, ``ClientId`` and ``ClientSecret`` through
       ``notebookutils.credentials.getSecret("Fabric", ...)``. If a tenant
       and a client ID are returned, they are used as-is.
    2. Otherwise fall back to the module-level ``tenant_id``, ``client_id``
       and ``client_secret_env`` values, then to the ``FABRIC_TENANT_ID``,
       ``FABRIC_APP_ID`` and ``FABRIC_APP_SECRET`` environment variables.

    Returns:
        tuple: ``(tenant_id, client_id, client_secret, use_fabric_auth)``.
        The last element is True when ``notebookutils`` was importable.
        ``(None, None, None, False)`` is returned when any fallback value is
        missing.
    """

    # Check if running in Fabric
    try:
        import notebookutils
        running_in_fabric = True

        # Try to get credentials from Fabric Key Vault integration
        try:
            fabric_tenant = notebookutils.credentials.getSecret("Fabric", "TenantId")
            fabric_client_id = notebookutils.credentials.getSecret("Fabric", "ClientId")
            fabric_secret = notebookutils.credentials.getSecret("Fabric", "ClientSecret")

            if fabric_tenant and fabric_client_id:
                print("[Auth] ✅ Using credentials from Fabric Key Vault integration")
                return fabric_tenant, fabric_client_id, fabric_secret, True

        except Exception as e:
            print(f"[Auth] Fabric Key Vault not configured: {str(e)[:100]}")

    except ImportError:
        running_in_fabric = False

    # Use environment variables or parameter values
    final_tenant = tenant_id or os.getenv("FABRIC_TENANT_ID")
    final_client_id = client_id or os.getenv("FABRIC_APP_ID")
    final_secret = client_secret_env or os.getenv("FABRIC_APP_SECRET")

    if not all([final_tenant, final_client_id, final_secret]):
        missing = []
        if not final_tenant: missing.append("tenant_id/FABRIC_TENANT_ID")
        if not final_client_id: missing.append("client_id/FABRIC_APP_ID")
        if not final_secret: missing.append("client_secret/FABRIC_APP_SECRET")

        print(f"[Auth] ❌ Missing credentials: {', '.join(missing)}")
        if running_in_fabric:
            print("[Auth] 💡 In Fabric, you can:")
            print("       1. Set up Key Vault integration with secrets named 'TenantId', 'ClientId', 'ClientSecret'")
            print("       2. Set values directly in the parameters cell above")
            print("       3. Use workspace managed identity (if configured)")
        else:
            print("[Auth] 💡 Set missing values in your .env file")

        return None, None, None, False

    # Reaching this point means the Fabric Key Vault lookup above did NOT
    # supply the credentials, so never report it as the source.
    auth_source = (
        "notebook parameters / environment variables"
        if running_in_fabric
        else "Environment Variables"
    )
    print(f"[Auth] ✅ Using credentials from {auth_source}")
    return final_tenant, final_client_id, final_secret, running_in_fabric

# Smoke-test credential resolution and report the outcome without revealing
# the secret (identifiers are truncated to 8 characters).
print("🔐 Testing Credential Availability:")
test_tenant, test_client, test_secret, is_fabric = get_credentials_fabric_aware()

if not (test_tenant and test_client and test_secret):
    print(" Credentials missing - check configuration above")
else:
    print(" Credentials available")
    print(" Runtime: " + ("Fabric" if is_fabric else "Local"))
    print(f" Tenant: {test_tenant[:8]}...")
    print(f" Client: {test_client[:8]}...")
    print(" Secret: ***configured***")

In [None]:
# === Environment Variable Validation ===
# This cell validates that all required environment variables are properly set

import os

# Check if .env file exists in current directory or parent directories
env_files_found = [
    path for path in ['.env', '../.env', '../../.env'] if os.path.exists(path)
]

print("🔍 Environment File Detection:")
if env_files_found:
    print(f"   Found .env files: {', '.join(env_files_found)}")
else:
    print("   No .env files found in current or parent directories")
    print("   Make sure to copy .env.example to .env and fill in your values")

# Detailed environment variable status
required_vars = [
    ("FABRIC_TENANT_ID", "Azure tenant ID"),
    ("FABRIC_APP_ID", "Service principal client ID"),
    ("FABRIC_APP_SECRET", "Service principal client secret"),
    ("FABRIC_WORKSPACE_ID", "Fabric workspace ID"),
    ("DCR_ENDPOINT_HOST", "Data Collection Rule endpoint host"),
    ("DCR_IMMUTABLE_ID", "Data Collection Rule immutable ID"),
]

optional_vars = [
    ("LOG_ANALYTICS_WORKSPACE_ID", "Log Analytics workspace ID"),
    ("AZURE_SUBSCRIPTION_ID", "Azure subscription ID"),
    ("FABRIC_RUNTIME_VERSION", "Fabric runtime version"),
    ("AZURE_KEY_VAULT_URI", "Azure Key Vault URI"),
    ("AZURE_KEY_VAULT_SECRET_NAME", "Key Vault secret name"),
]

# Variables whose values are credentials: never echo them, not even partially.
# (AZURE_KEY_VAULT_SECRET_NAME is only the *name* of a secret, so it is not listed.)
SENSITIVE_ENV_VARS = frozenset({"FABRIC_APP_SECRET"})


def describe_env_value(var_name, value):
    """Return a display-safe rendering of an environment variable value.

    Args:
        var_name: Name of the environment variable.
        value: Its non-empty string value.

    Returns:
        ``"***configured***"`` for variables in ``SENSITIVE_ENV_VARS``;
        otherwise the first 8 characters followed by ``"..."`` (or the whole
        value when it is 8 characters or shorter).
    """
    if var_name in SENSITIVE_ENV_VARS:
        return "***configured***"
    return value[:8] + "..." if len(value) > 8 else value


print("\n📋 Required Environment Variables:")
for var_name, description in required_vars:
    value = os.getenv(var_name)
    if value:
        print(f"   ✅ {var_name}: {describe_env_value(var_name, value)}")
    else:
        print(f"   ❌ {var_name}: Not set - {description}")

print("\n📋 Optional Environment Variables:")
for var_name, description in optional_vars:
    value = os.getenv(var_name)
    if value:
        print(f"   ✅ {var_name}: {describe_env_value(var_name, value)}")
    else:
        print(f"   ⚪ {var_name}: Not set - {description}")

print("\n💡 Next Steps:")
print("   1. Ensure all required variables are set in your .env file")
print("   2. Run the parameters cell above to load configuration")
print("   3. Proceed with authentication and data collection")

In [None]:
# === Fabric Runtime Detection ===
# This cell detects if we're running in Fabric and adapts accordingly

import sys
import os

# Detect if we're running in Fabric
# `notebookutils` being importable is used as the signal for a Fabric runtime.
running_in_fabric = False
try:
    import notebookutils
    running_in_fabric = True
    print("🏭 Running in Microsoft Fabric environment")
    print(f"   Fabric notebook utilities available: {notebookutils is not None}")

    # Try to get workspace context from Fabric
    try:
        # In Fabric, you can get current workspace info
        fabric_workspace_info = notebookutils.credentials.getSecret("FabricWorkspace", "WorkspaceId")
        if fabric_workspace_info and not workspace_id:
            workspace_id = fabric_workspace_info
            print(f"   Using Fabric workspace context: {workspace_id[:8]}...")
    except Exception as e:
        # Best-effort lookup: keep going, but say why it was skipped.
        # `except Exception` (not a bare except) lets Ctrl-C / SystemExit through.
        print(f"   Fabric workspace context not available: {str(e)[:100]}")

except ImportError:
    print("💻 Running in local development environment")
    print("   Loading configuration from .env file")

# Check for semantic-link-sempy (available in Fabric)
try:
    import sempy.fabric as fabric
    print("✅ Semantic Link available - can use Fabric workspace functions")

    # Get current workspace if not set
    if not workspace_id:
        try:
            current_workspace = fabric.get_workspace_id()
            if current_workspace:
                workspace_id = current_workspace
                print(f"   Auto-detected workspace ID: {workspace_id[:8]}...")
        except Exception as e:
            # Best-effort auto-detection; the workspace ID simply stays unset
            print(f"   Could not auto-detect workspace ID: {str(e)[:100]}")

except ImportError:
    if running_in_fabric:
        print("⚠️  Semantic Link not available in this Fabric runtime")
    else:
        print("ℹ️  Semantic Link not available (local environment)")

# Fabric-specific authentication options
if running_in_fabric:
    print("\n🔐 Fabric Authentication Options:")
    print("   1. Use Fabric workspace identity (recommended)")
    print("   2. Use Key Vault with workspace managed identity")
    print("   3. Set credentials in parameters cell")
    print("   4. Use environment variables (if .env file uploaded)")

    # In Fabric, you can use workspace identity for authentication
    # Check if we can use Fabric's built-in authentication
    if hasattr(notebookutils, 'credentials'):
        print("   ✅ Fabric credential utilities available")
    else:
        print("   ⚠️  Fabric credential utilities not available")
else:
    print("\n🔐 Local Development Authentication:")
    print("   Using environment variables from .env file")

print("\n📍 Current Configuration:")
print(f"   Runtime Environment: {'Fabric' if running_in_fabric else 'Local'}")
print(f"   Workspace ID: {workspace_id[:8] + '...' if workspace_id else 'Not set'}")
print(f"   Python Version: {sys.version.split()[0]}")
print(f"   Working Directory: {os.getcwd()}")

In [None]:
# === Verify critical imports ===
# Import every third-party package up front so a missing dependency surfaces
# here, with a readable message, instead of midway through collection.
# NOTE(review): pandas is imported here but is not in the %pip install list of
# the install cell — confirm it is provided by the runtime or is unneeded.
try:
    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient
    import msal
    import pandas as pd
    import requests

    print("\n✅ All critical imports successful - environment ready!")
except ImportError as e:
    # Report and continue; later cells fail on their own if the package is needed
    print(f"\n❌ Import error: {e}")
    print("Some packages may need manual installation")

In [None]:
# === define main functions ===
import os, json, time, datetime as dt
import requests
from typing import List, Dict, Any


# Key Vault helper with multiple authentication options
def get_secret_from_kv(
    vault_uri: str,
    secret_name: str,
    tenant_id: str = None,
    client_id: str = None,
    client_secret: str = None,
    use_managed_identity: bool = False,
) -> str:
    """
    Read one secret value from Azure Key Vault.

    Two ways of authenticating to the vault are supported:
    - managed identity (``use_managed_identity=True``), which needs no secret;
    - service principal client credentials, built from ``tenant_id``,
      ``client_id`` and ``client_secret``.

    Args:
        vault_uri: Key Vault URL passed to ``SecretClient``.
        secret_name: Name of the secret to fetch.
        tenant_id: Tenant for client-credential authentication.
        client_id: Application ID for client-credential authentication.
        client_secret: Application secret for client-credential authentication.
        use_managed_identity: Select managed identity instead of client credentials.

    Returns:
        The secret value, or None when anything fails (missing package,
        authentication error, missing secret); the failure is printed.
    """
    try:
        from azure.keyvault.secrets import SecretClient

        if not use_managed_identity:
            # Client credentials: needs a secret to fetch a secret (circular dependency)
            from azure.identity import ClientSecretCredential

            print("[KeyVault] Using client secret authentication")
            credential = ClientSecretCredential(
                tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
            )
        else:
            # Managed identity: no secrets needed
            from azure.identity import ManagedIdentityCredential

            print("[KeyVault] Using managed identity authentication")
            credential = ManagedIdentityCredential()

        vault_client = SecretClient(vault_url=vault_uri, credential=credential)
        return vault_client.get_secret(secret_name).value

    except Exception as err:
        print(f"[KeyVault] Failed to fetch secret '{secret_name}': {err}")
        return None


# OAuth scopes requested when acquiring tokens, and the Fabric REST base URL.
# FABRIC_SCOPE is used for Fabric API calls, MONITOR_SCOPE for the token sent
# to the Logs Ingestion (DCR) endpoint; FABRIC_API prefixes every Fabric URL.
FABRIC_SCOPE = "https://api.fabric.microsoft.com/.default"
MONITOR_SCOPE = "https://monitor.azure.com/.default"
FABRIC_API = "https://api.fabric.microsoft.com/v1"


def acquire_token_client_credentials(
    tenant: str, client_id: str, client_secret: str, scope: str
) -> str:
    """
    Acquire an app-only access token with MSAL's client-credentials flow.

    Args:
        tenant: Tenant ID used to build the login authority URL.
        client_id: Application (client) ID of the service principal.
        client_secret: Client secret of the service principal.
        scope: Resource scope to request (e.g. ``FABRIC_SCOPE``).

    Returns:
        The access token string.

    Raises:
        RuntimeError: If the MSAL result contains no ``access_token``.
    """
    import msal

    app = msal.ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant}",
        client_credential=client_secret,
    )
    result = app.acquire_token_for_client(scopes=[scope])

    if "access_token" in result:
        # Debug: Show token info (first/last 10 chars only)
        token = result["access_token"]
        print(f"✅ Token acquired for {scope}: {token[:10]}...{token[-10:]}")
        return token

    print(f"❌ Token acquisition failed for {scope}")
    print(f"   Error: {result.get('error', 'Unknown error')}")
    print(f"   Description: {result.get('error_description', 'No description')}")
    raise RuntimeError(f"Failed to get token for {scope}: {result}")


def acquire_token_managed_identity(scope: str) -> str:
    """
    Acquire an access token through ``ManagedIdentityCredential``.

    Args:
        scope: Resource scope to request.

    Returns:
        The access token string.

    Raises:
        RuntimeError: On any failure, including a missing ``azure.identity``
            package; the original error text is embedded in the message.
    """
    try:
        from azure.identity import ManagedIdentityCredential

        access = ManagedIdentityCredential().get_token(scope)
        raw_token = access.token
        # Only the first/last 10 characters are echoed
        print(
            f"✅ Managed identity token acquired for {scope}: {raw_token[:10]}...{raw_token[-10:]}"
        )
        return raw_token
    except Exception as e:
        raise RuntimeError(f"Failed to get managed identity token for {scope}: {e}")


def iso_now() -> str:
    """
    Return the current UTC time as an ISO-8601 string with a ``Z`` suffix.

    Uses ``datetime.now(timezone.utc)`` rather than ``datetime.utcnow()``,
    which is deprecated since Python 3.12; the resulting string is the same.
    """
    return dt.datetime.now(dt.timezone.utc).isoformat().replace("+00:00", "Z")


def to_iso(ts: dt.datetime) -> str:
    """
    Format a datetime as ISO-8601, writing a UTC offset as ``Z``.

    A naive datetime is treated as UTC. An aware datetime keeps its own
    offset; only ``+00:00`` is rewritten to ``Z``.
    """
    aware = ts if ts.tzinfo is not None else ts.replace(tzinfo=dt.timezone.utc)
    text = aware.isoformat()
    return text.replace("+00:00", "Z")


def parse_iso(s: str) -> dt.datetime:
    """
    Parse an ISO-8601 timestamp into a timezone-aware datetime.

    Args:
        s: Timestamp text; a trailing ``Z`` is accepted. May be None or empty.

    Returns:
        An aware ``datetime`` (naive inputs are assumed to be UTC), or None
        when ``s`` is None or empty.

    Raises:
        ValueError: If the text is not a valid ISO-8601 timestamp.
    """
    import re

    if not s:
        return None
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    # datetime.fromisoformat on Python <= 3.10 only accepts 3 or 6 fractional
    # digits, so timestamps with e.g. 7 digits would raise. Normalize the
    # fraction to exactly 6 digits (truncate or zero-pad) before parsing.
    s = re.sub(
        r"\.(\d+)",
        lambda m: "." + m.group(1)[:6].ljust(6, "0"),
        s,
        count=1,
    )
    parsed = dt.datetime.fromisoformat(s)
    # Ensure timezone awareness - if no timezone, assume UTC
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=dt.timezone.utc)
    return parsed


def within_lookback(start_iso: str, end_iso: str, lookback_minutes: int) -> bool:
    """
    Tell whether a run falls inside the lookback window ending now.

    The run's end time is used when present, otherwise its start time.

    Args:
        start_iso: Run start timestamp (ISO-8601) or None.
        end_iso: Run end timestamp (ISO-8601) or None.
        lookback_minutes: Window length in minutes, counted back from now (UTC).

    Returns:
        True when the chosen timestamp is at or after the window start;
        False when it is older or when neither timestamp is available.
    """
    # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow()
    edge = dt.datetime.now(dt.timezone.utc) - dt.timedelta(
        minutes=int(lookback_minutes)
    )
    t = parse_iso(end_iso) or parse_iso(start_iso)
    return (t is not None) and (t >= edge)


# Fabric REST with better error handling
def list_item_job_instances(
    workspace_id: str, item_id: str, token: str
) -> List[Dict[str, Any]]:
    """
    List all job instances (runs) of one Fabric item, following pagination.

    Args:
        workspace_id: Fabric workspace ID.
        item_id: ID of the pipeline or dataflow item.
        token: Bearer token for the Fabric API.

    Returns:
        The concatenated ``value`` entries from every page.

    Raises:
        requests.exceptions.HTTPError: For non-2xx responses (after printing
            a diagnostic hint for 401/403/404).
        Exception: Any other request or decoding failure is printed and re-raised.
    """
    url = f"{FABRIC_API}/workspaces/{workspace_id}/items/{item_id}/jobs/instances"
    headers = {"Authorization": f"Bearer {token}"}
    out = []

    print(f"[DEBUG] Calling: {url}")
    print(f"[DEBUG] Token (first 20 chars): {token[:20]}...")

    while url:
        try:
            r = requests.get(url, headers=headers, timeout=60)
            print(f"[DEBUG] Response status: {r.status_code}")

            if r.status_code == 401:
                print("❌ 401 Unauthorized - Authentication failed")
                print("Possible causes:")
                print("1. Token expired or invalid")
                print("2. Service principal doesn't have 'Fabric.ReadAll' permission")
                print("3. Service principal not granted admin consent")
                print("4. Wrong tenant ID")
                print(f"Response: {r.text}")

            elif r.status_code == 403:
                print("❌ 403 Forbidden - Permission denied")
                print(
                    "The service principal needs 'Fabric.ReadAll' application permission"
                )
                print(f"Response: {r.text}")

            elif r.status_code == 404:
                print("❌ 404 Not Found - Workspace or item doesn't exist")
                print(f"Workspace ID: {workspace_id}")
                print(f"Item ID: {item_id}")
                print(f"Response: {r.text}")

            r.raise_for_status()
            data = r.json()
            out.extend(data.get("value") or [])
            # Prefer `continuationUri` for paging (see the Fabric REST API
            # reference); `nextLink` is kept as a fallback so the previous
            # behaviour is preserved. The loop ends when neither is present.
            url = data.get("continuationUri") or data.get("nextLink")

        except requests.exceptions.HTTPError as e:
            print(f"❌ HTTP Error: {e}")
            raise
        except Exception as e:
            print(f"❌ Unexpected error: {e}")
            raise

    return out


def query_pipeline_activity_runs(
    workspace_id: str,
    job_instance_id: str,
    token: str,
    last_after_iso: str,
    last_before_iso: str,
) -> List[Dict[str, Any]]:
    """
    Query the activity runs of one pipeline run.

    Args:
        workspace_id: Fabric workspace ID.
        job_instance_id: Pipeline run (job instance) ID.
        token: Bearer token for the Fabric API.
        last_after_iso: Sent as ``lastUpdatedAfter`` in the request body.
        last_before_iso: Sent as ``lastUpdatedBefore`` in the request body.

    Returns:
        The first non-empty list found under ``value``, ``activityRuns`` or
        ``items`` in the response, or an empty list.

    Raises:
        requests.exceptions.HTTPError: For non-2xx responses.
    """
    response = requests.post(
        f"{FABRIC_API}/workspaces/{workspace_id}/datapipelines/pipelineruns/{job_instance_id}/queryactivityruns",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        json={
            "filters": [],
            "orderBy": [{"orderBy": "ActivityRunStart", "order": "DESC"}],
            "lastUpdatedAfter": last_after_iso,
            "lastUpdatedBefore": last_before_iso,
        },
        timeout=60,
    )
    response.raise_for_status()
    payload = response.json()

    # The result list has been observed under different keys; take the first non-empty one
    for key in ("value", "activityRuns", "items"):
        found = payload.get(key)
        if found:
            return found
    return []


# Logs Ingestion API
def post_rows_to_dcr(
    endpoint_host: str,
    dcr_id: str,
    stream_name: str,
    rows: List[Dict[str, Any]],
    monitor_token: str,
):
    """
    Send rows to a Data Collection Rule stream via the Logs Ingestion API.

    Rows are packed into JSON-array batches that stay below ``MAX_BYTES``.
    Each row is serialized once with compact separators, and those exact
    bytes are both measured and sent, so the size estimate matches the
    request body.

    Args:
        endpoint_host: Host name of the data collection endpoint.
        dcr_id: Immutable ID of the Data Collection Rule.
        stream_name: Target stream name declared in the DCR.
        rows: Records to send; may be empty or None.
        monitor_token: Bearer token for the ingestion endpoint.

    Returns:
        dict: ``{"sent": <rows sent>, "batches": <requests made>}``.

    Raises:
        RuntimeError: If the endpoint answers with a status code >= 400.
    """
    if not rows:
        return {"sent": 0, "batches": 0}
    import json

    MAX_BYTES = 950_000  # keep under API limit (~1MB)
    url = f"https://{endpoint_host}/dataCollectionRules/{dcr_id}/streams/{stream_name}?api-version=2023-01-01"
    headers = {
        "Authorization": f"Bearer {monitor_token}",
        "Content-Type": "application/json",
    }

    pending = []  # serialized rows (bytes) waiting to be sent
    size = 2  # the enclosing "[" and "]"
    batches = 0
    sent = 0

    def flush():
        """POST the pending rows as one JSON array and reset the batch."""
        nonlocal batches, sent, pending, size
        if not pending:
            return
        payload = b"[" + b",".join(pending) + b"]"
        resp = requests.post(url, headers=headers, data=payload, timeout=60)
        if resp.status_code >= 400:
            raise RuntimeError(
                f"Ingestion failed ({resp.status_code}): {resp.text[:500]}"
            )
        batches += 1
        sent += len(pending)
        pending, size = [], 2

    for row in rows:
        encoded = json.dumps(row, separators=(",", ":")).encode("utf-8")
        # +1 accounts for the comma that joins this row to the previous one
        if pending and size + len(encoded) + 1 > MAX_BYTES:
            flush()
        size += len(encoded) + (1 if pending else 0)
        pending.append(encoded)
    flush()
    return {"sent": sent, "batches": batches}


# Mappers
def map_pipeline_run(
    workspace_id: str, item_id: str, run: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Map a pipeline job instance to a row for the pipeline-run stream.

    Args:
        workspace_id: Fabric workspace ID stored on the row.
        item_id: Pipeline item ID stored on the row.
        run: Job instance dict as returned by the job-instances API.

    Returns:
        dict with the row columns. ``TimeGenerated`` is the end time, else
        the start time, else the current time. ``DurationMs`` is None unless
        both timestamps are present and parseable.
    """
    st, et = run.get("startTimeUtc"), run.get("endTimeUtc")
    dur_ms = None
    try:
        if st and et:
            dur_ms = int((parse_iso(et) - parse_iso(st)).total_seconds() * 1000)
    except Exception:
        # Deliberate best effort: an unparseable timestamp only costs the duration
        pass

    # failureReason is normally an object; accept a plain string as well
    # (as map_dataflow_run does) so the message is not silently dropped.
    fr = run.get("failureReason") or {}
    failure_code = None
    failure_message = None
    if isinstance(fr, dict):
        failure_code = fr.get("errorCode")
        failure_message = fr.get("message")
    elif isinstance(fr, str):
        failure_message = fr

    return {
        "TimeGenerated": et or st or iso_now(),
        "WorkspaceId": workspace_id,
        "ItemId": item_id,
        "ItemType": "DataPipeline",
        "RunId": run.get("id"),
        "JobType": run.get("jobType", "Pipeline"),
        "InvokeType": run.get("invokeType"),
        "Status": run.get("status"),
        "StartTimeUtc": st,
        "EndTimeUtc": et,
        "DurationMs": dur_ms,
        "FailureCode": failure_code,
        "FailureMessage": failure_message,
    }


def _first_not_none(source: Dict[str, Any], *keys: str) -> Any:
    """Return the value of the first key in ``keys`` whose value is not None."""
    for key in keys:
        value = source.get(key)
        if value is not None:
            return value
    return None


def map_activity_run(
    workspace_id: str, pipeline_id: str, pipeline_run_id: str, act: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Map one pipeline activity run to a row for the activity-run stream.

    Both camelCase and PascalCase field names are accepted on ``act``.

    Args:
        workspace_id: Fabric workspace ID stored on the row.
        pipeline_id: Identifier stored in the ``PipelineName`` column.
        pipeline_run_id: ID of the parent pipeline run (``RunId`` column).
        act: Activity run dict from the activity-runs query.

    Returns:
        dict with the row columns. Numeric metrics (duration, rows read and
        written) keep a legitimate value of 0 instead of degrading to None.
    """
    st = act.get("activityRunStart") or act.get("ActivityRunStart")
    et = act.get("activityRunEnd") or act.get("ActivityRunEnd")
    # A duration of 0 ms is valid, so select by "is not None" rather than truthiness
    dur_ms = _first_not_none(act, "durationInMs", "DurationInMs")

    # Extract performance metrics if available
    rows_read = None
    rows_written = None

    # Try to get performance metrics from activity output
    output = act.get("output") or {}
    if isinstance(output, dict):
        # Check for common performance metric fields; 0 rows is a valid reading
        rows_read = _first_not_none(output, "rowsRead", "dataRead", "recordsRead")
        rows_written = _first_not_none(
            output, "rowsWritten", "dataWritten", "recordsWritten"
        )

    error = act.get("error")
    error_is_dict = isinstance(error, dict)

    return {
        "TimeGenerated": et or st or iso_now(),
        "WorkspaceId": workspace_id,
        "PipelineName": pipeline_id,  # This might be the pipeline name, not ID - adjust as needed
        "ActivityName": act.get("activityName") or act.get("ActivityName"),
        "ActivityType": act.get("activityType") or act.get("ActivityType"),
        "RunId": pipeline_run_id,
        "Status": act.get("status") or act.get("Status"),
        "StartTimeUtc": st,
        "EndTimeUtc": et,
        "DurationMs": dur_ms,
        "RowsRead": rows_read,
        "RowsWritten": rows_written,
        "ErrorCode": error.get("code") if error_is_dict else None,
        "ErrorMessage": error.get("message") if error_is_dict else None,
    }


def map_dataflow_run(
    workspace_id: str, item_id: str, run: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Map a dataflow job instance to a row for the dataflow-run stream.

    Args:
        workspace_id: Fabric workspace ID stored on the row.
        item_id: Dataflow item ID (``DataflowId`` column).
        run: Job instance dict as returned by the job-instances API.

    Returns:
        dict with the row columns. ``TimeGenerated`` is the end time, else
        the start time, else the current time. ``ErrorMessage`` comes from
        ``failureReason``, which may be an object with ``message`` or a string.
    """
    started = run.get("startTimeUtc")
    ended = run.get("endTimeUtc")

    duration_ms = None
    try:
        if started and ended:
            elapsed = parse_iso(ended) - parse_iso(started)
            duration_ms = int(elapsed.total_seconds() * 1000)
    except Exception:
        # Best effort: an unparseable timestamp leaves the duration empty
        pass

    # Handle error information - prefer simple error message for dataflows
    failure = run.get("failureReason") or {}
    if isinstance(failure, dict):
        message = failure.get("message")
    else:
        message = failure if isinstance(failure, str) else None

    row = {
        "TimeGenerated": ended or started or iso_now(),
        "WorkspaceId": workspace_id,
        "DataflowId": item_id,  # Using DataflowId to match table schema
        "RunId": run.get("id"),
        "Status": run.get("status"),
        "StartTimeUtc": started,
        "EndTimeUtc": ended,
        "DurationMs": duration_ms,
        "ErrorMessage": message,
    }
    return row

In [None]:
# === Main ===

# Resolve the service principal secret.
# Key Vault is consulted first whenever use_key_vault is True. Previously the
# environment-variable branch always won, which made the Key Vault
# client-credential path unreachable (it needs FABRIC_APP_SECRET, and having
# FABRIC_APP_SECRET set selected the environment branch instead).
client_secret = None

# Option 1: Key Vault authentication (honours the use_key_vault flag)
if use_key_vault:
    print(f"Fetching secret '{key_vault_secret_name}' from Key Vault '{key_vault_uri}'...")

    if use_managed_identity:
        # Use managed identity to access Key Vault
        print("🔐 Using managed identity to authenticate to Key Vault")
        client_secret = get_secret_from_kv(
            key_vault_uri,
            key_vault_secret_name,
            use_managed_identity=True
        )
    else:
        # Use client credentials to access Key Vault
        print("🔐 Using client credentials to authenticate to Key Vault")
        temp_secret = client_secret_env or os.getenv("FABRIC_APP_SECRET")
        if not temp_secret:
            print("❌ Cannot access Key Vault with client credentials: FABRIC_APP_SECRET environment variable is required")
            print("Solutions:")
            print("1. Set FABRIC_APP_SECRET environment variable")
            print("2. Set use_managed_identity=True (if running on Azure)")
            print("3. Set use_key_vault=False and use environment variables directly")
            raise RuntimeError("FABRIC_APP_SECRET required for Key Vault client credential authentication")

        client_secret = get_secret_from_kv(
            key_vault_uri,
            key_vault_secret_name,
            tenant_id,
            client_id,
            temp_secret,
            use_managed_identity=False
        )

    print(f"Key Vault returned: {'***' if client_secret else 'None'}")

    # Keep the earlier outcome when the vault lookup fails but a secret is
    # available in the environment: fall back to it instead of aborting.
    if not client_secret and client_secret_env:
        print("⚠️  Key Vault lookup failed; falling back to FABRIC_APP_SECRET from the environment")
        client_secret = client_secret_env

# Option 2: Direct environment variable (simplest)
elif client_secret_env:
    client_secret = client_secret_env
    print("✅ Using client secret from environment variable")

# Final validation
if not client_secret:
    print("❌ No client secret found!")
    print("Authentication Configuration:")
    print(f"  use_key_vault: {use_key_vault}")
    print(f"  use_managed_identity: {use_managed_identity}")
    print(f"  FABRIC_APP_SECRET env var: {'Set' if client_secret_env else 'Not set'}")
    print()
    print("Solutions:")
    print("1. Set FABRIC_APP_SECRET environment variable")
    print("2. Create .env file with FABRIC_APP_SECRET=your-secret")
    print("3. Use managed identity (set use_managed_identity=True)")
    print("4. Set use_key_vault=False and provide secret directly")
    raise RuntimeError("Client secret not found. Check your authentication configuration.")

print("✅ Client secret resolved successfully")

# Determine token acquisition method
# Managed identity is used for tokens only when BOTH flags are set; otherwise
# (or if managed identity fails below) the client-credentials flow is used.
use_managed_identity_for_tokens = use_managed_identity and use_key_vault

if use_managed_identity_for_tokens:
    print("🔐 Using managed identity for token acquisition")
    try:
        print("Acquiring Fabric API token with managed identity...")
        fabric_token = acquire_token_managed_identity(FABRIC_SCOPE)
        
        print("Acquiring Azure Monitor token with managed identity...")
        monitor_token = acquire_token_managed_identity(MONITOR_SCOPE)
        
        print("✅ All tokens acquired successfully with managed identity")
    except Exception as e:
        print(f"❌ Managed identity token acquisition failed: {e}")
        print("Falling back to client credentials...")
        # Clearing the flag routes execution into the client-credentials
        # block below, which re-acquires BOTH tokens (even if the Fabric
        # token above had already succeeded).
        use_managed_identity_for_tokens = False

if not use_managed_identity_for_tokens:
    print("🔐 Using client credentials for token acquisition")
    # Acquire tokens using client credentials
    # (acquire_token_client_credentials raises RuntimeError on failure, which
    # is not caught here and stops the cell)
    print("Acquiring Fabric API token...")
    fabric_token = acquire_token_client_credentials(
        tenant_id, client_id, client_secret, FABRIC_SCOPE
    )

    print("Acquiring Azure Monitor token...")
    monitor_token = acquire_token_client_credentials(
        tenant_id, client_id, client_secret, MONITOR_SCOPE
    )

    print("✅ All tokens acquired successfully with client credentials")

# Collection window: [now - lookback, now], in UTC.
# datetime.now(timezone.utc) replaces the deprecated datetime.utcnow().
now = dt.datetime.now(dt.timezone.utc)
window_start = to_iso(now - dt.timedelta(minutes=int(lookback_minutes)))
window_end = to_iso(now)

print(f"Collecting data from {window_start} to {window_end} (lookback: {lookback_minutes} minutes)")

pipeline_rows, activity_rows, dataflow_rows = [], [], []

# Pipelines
print(f"Processing {len(pipeline_item_ids)} pipelines...")
for pid in pipeline_item_ids:
    print(f"  Fetching runs for pipeline {pid}")
    runs = list_item_job_instances(workspace_id, pid, fabric_token)
    # The API returns every job instance; keep only those inside the window
    runs_recent = [
        r
        for r in runs
        if within_lookback(r.get("startTimeUtc"), r.get("endTimeUtc"), lookback_minutes)
    ]
    print(f"    Found {len(runs_recent)} recent runs")
    for r in runs_recent:
        pipeline_rows.append(map_pipeline_run(workspace_id, pid, r))
        if collect_activity_runs:
            run_id = r.get("id")
            try:
                acts = query_pipeline_activity_runs(
                    workspace_id, run_id, fabric_token, window_start, window_end
                )
                print(f"    Found {len(acts)} activity runs for run {run_id}")
                for a in acts:
                    activity_rows.append(map_activity_run(workspace_id, pid, run_id, a))
            except Exception as e:
                # Activity details are best effort: the pipeline run row is
                # already recorded, so report the failure and continue.
                print(f"    ⚠️  Failed to get activity runs for pipeline {pid} run {run_id}: {e}")

# Dataflows
print(f"Processing {len(dataflow_item_ids)} dataflows...")
for did in dataflow_item_ids:
    print(f"  Fetching runs for dataflow {did}")
    runs = list_item_job_instances(workspace_id, did, fabric_token)
    runs_recent = [
        r
        for r in runs
        if within_lookback(r.get("startTimeUtc"), r.get("endTimeUtc"), lookback_minutes)
    ]
    print(f"    Found {len(runs_recent)} recent runs")
    for r in runs_recent:
        dataflow_rows.append(map_dataflow_run(workspace_id, did, r))

# Ingest to LA via Logs Ingestion API
print("\nIngesting data to Log Analytics:")
print(f"  Pipeline runs: {len(pipeline_rows)}")
print(f"  Activity runs: {len(activity_rows)}")
print(f"  Dataflow runs: {len(dataflow_rows)}")

# One entry per target stream: (label for the log line, summary key, stream, rows).
# Streams with no rows are skipped and therefore absent from the summary.
summary = {}
for _label, _summary_key, _stream, _rows in (
    ("pipeline runs", "pipeline_rows", stream_pipeline, pipeline_rows),
    ("activity runs", "activity_rows", stream_activity, activity_rows),
    ("dataflow runs", "dataflow_rows", stream_dataflow, dataflow_rows),
):
    if not _rows:
        continue
    print(f"  Sending {_label}...")
    res = post_rows_to_dcr(
        dcr_endpoint_host,
        dcr_immutable_id,
        _stream,
        _rows,
        monitor_token,
    )
    summary[_summary_key] = res

import json

print("\n✅ Done! Ingestion summary:")
print(json.dumps(summary, indent=2))

In [None]:
# === Troubleshooting: Why are Log Analytics tables empty? ===
import json

print("🔍 TROUBLESHOOTING LOG ANALYTICS TABLES")
print("=" * 50)

# 1. Check if data was actually sent
# `total_sent` is read again by the "NEXT STEPS" section at the end of this
# cell, so it must exist even when no `summary` was produced; default to 0
# instead of leaving it undefined (which raised NameError further down).
total_sent = 0

print("\n1. DATA COLLECTION SUMMARY:")
if 'summary' in locals():
    print(json.dumps(summary, indent=2))

    # NOTE(review): assumes every value stored in `summary` is a dict that
    # may carry a numeric 'sent' count - confirm against post_rows_to_dcr.
    total_sent = sum(result.get('sent', 0) for result in summary.values())
    print(f"\nTotal records sent: {total_sent}")

    if total_sent == 0:
        print("❌ No data was sent - this explains why tables are empty!")
        print("Possible causes:")
        print("   - No recent runs in the lookback window")
        print("   - Empty item ID lists")
        print("   - API authentication issues")
    else:
        print(f"✅ {total_sent} records were successfully sent to DCR")
else:
    print("❌ No summary data available - collection may have failed")

# 2. Check ingestion delay
print("\n2. LOG ANALYTICS INGESTION TIMING:")
print("   Data ingestion to Log Analytics can take 5-30 minutes")
print("   Custom tables may take longer for the first ingestion")
# datetime.utcnow() is deprecated since Python 3.12; a timezone-aware UTC
# "now" renders identically with this strftime format.
# NOTE(review): the timestamp is the moment this cell runs, not a recorded
# send time - the label can mislead if the cell is executed much later.
print(f"   Data was sent at: {dt.datetime.now(dt.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC")
print("   Check tables again in 10-15 minutes")

# 3. Check table names and structure
# Each custom table is paired with the rows collected for it in this session,
# so the expected record count can be printed next to the table name.
expected_tables = (
    ("FabricPipelineRun_CL", pipeline_rows),
    ("FabricPipelineActivityRun_CL", activity_rows),
    ("FabricDataflowRun_CL", dataflow_rows),
)

print("\n3. LOG ANALYTICS TABLE VERIFICATION:")
print("   Expected tables in your Log Analytics workspace:")
for table_name, table_rows in expected_tables:
    print(f"   - {table_name} (should have {len(table_rows)} records)")
print()

# Ready-to-paste KQL snippets for checking the tables by hand.
print("   Sample KQL queries to check tables:")
print("   ```kql")
print("   // Check if tables exist and have data")
for table_name, _ in expected_tables:
    print(f"   {table_name} | count")
print()
print("   // Check recent data (last 24 hours)")
for table_name in ("FabricPipelineRun_CL", "FabricDataflowRun_CL"):
    print(f"   {table_name} | where TimeGenerated > ago(24h)")
print("   ```")

# 4. Check DCR configuration
print("\n4. DCR CONFIGURATION CHECK:")
print(f"   DCR Endpoint: {dcr_endpoint_host}")
# Only the first 20 characters of the immutable ID are displayed. The slice is
# guarded so that an unset (None/empty) ID is reported instead of raising
# TypeError and aborting the rest of this troubleshooting cell.
dcr_id_preview = f"{dcr_immutable_id[:20]}..." if dcr_immutable_id else "<not set>"
print(f"   DCR Immutable ID: {dcr_id_preview}")
print("   Stream Names Used:")
for stream_name in (stream_pipeline, stream_activity, stream_dataflow):
    print(f"     - {stream_name}")

# 5. Sample data inspection
# Dump the first collected row of each kind, one "field: value" line per
# entry; a kind with no collected rows is skipped silently.
print("\n5. SAMPLE DATA INSPECTION:")

if pipeline_rows:
    sample_pipeline = pipeline_rows[0]
    print("   Sample Pipeline Run Data:")
    for field_name, field_value in sample_pipeline.items():
        print(f"     {field_name}: {field_value}")
    print()

if dataflow_rows:
    sample_dataflow = dataflow_rows[0]
    print("   Sample Dataflow Run Data:")
    for field_name, field_value in sample_dataflow.items():
        print(f"     {field_name}: {field_value}")
    print()

# 6. Common troubleshooting steps
# Static guidance only: each checklist is a heading followed by fixed lines.
common_steps = (
    "   a) Wait 10-15 minutes for ingestion to complete",
    "   b) Check if tables were created properly in Log Analytics",
    "   c) Verify DCR is properly configured and linked to workspace",
    "   d) Check if service principal has Log Analytics Contributor role",
    "   e) Verify workspace ID matches your actual Log Analytics workspace",
    "   f) Check Azure Activity Log for any DCR ingestion errors",
)
print("\n6. COMMON TROUBLESHOOTING STEPS:")
for checklist_line in common_steps:
    print(checklist_line)

immediate_actions = (
    "   1. Go to your Log Analytics workspace in Azure Portal",
    "   2. Navigate to 'Logs' section",
    "   3. Run: search '*' | where TimeGenerated > ago(1h) | take 10",
    "   4. Check if any FabricPipeline* or FabricDataflow* tables appear",
    "   5. If tables don't exist, check if Bicep/Terraform deployment completed successfully",
)
print("\n7. IMMEDIATE ACTIONS TO VERIFY:")
for checklist_line in immediate_actions:
    print(checklist_line)

# Closing advice: which pair of messages is shown depends on whether the
# summary section of this cell counted any sent records.
print("\n📊 NEXT STEPS:")
if total_sent > 0:
    next_steps = (
        "   ✅ Data was sent successfully - wait 10-15 minutes and check again",
        "   ✅ If still empty after 30 minutes, check DCR configuration and permissions",
    )
else:
    next_steps = (
        "   ❌ No data was sent - investigate why collections returned empty results",
        "   ❌ Check pipeline/dataflow IDs and workspace ID configuration",
    )
for step_line in next_steps:
    print(step_line)