# Fabric Spark Monitoring with Log Analytics Integration

## Overview
This notebook demonstrates how to collect Spark application data, logs, and metrics from Microsoft Fabric using the Fabric Spark monitoring APIs and ingest them into Azure Monitor Log Analytics.

### What This Notebook Does
1. **Collect Spark Applications**: List all Spark applications in a workspace
2. **Collect Application Details**: Get detailed metrics and resource usage
3. **Collect Logs**: Retrieve driver, executor, and Livy logs
4. **Collect Metrics**: Get performance metrics using Spark History Server APIs
5. **Ingest to Log Analytics**: Send all data to Azure Monitor for analysis
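
The KQL queries later in this notebook read applications, logs, and metrics from a single custom table and tell them apart with a `MetricType` column. Here is a minimal sketch of that tagging step. It assumes records are plain dicts and that the table needs a `TimeGenerated` column, as DCR-based custom tables do. `tag_records` is a hypothetical helper, not part of FabricLA-Connector.

```python
from datetime import datetime, timezone
from typing import Iterable, Iterator

# MetricType values used by the KQL queries in this notebook
VALID_METRIC_TYPES = {"SparkApplication", "SparkLog", "SparkMetric"}

def tag_records(records: Iterable[dict], metric_type: str) -> Iterator[dict]:
    """Hypothetical helper: copy each record and stamp MetricType and TimeGenerated."""
    if metric_type not in VALID_METRIC_TYPES:
        raise ValueError(f"Unknown MetricType: {metric_type}")
    now = datetime.now(timezone.utc).isoformat()
    for record in records:
        row = dict(record)  # copy so the caller's dict is not mutated
        row["MetricType"] = metric_type
        row.setdefault("TimeGenerated", now)  # keep an existing timestamp if present
        yield row

# Sample input for illustration only
rows = list(tag_records([{"ApplicationName": "etl-daily", "State": "success"}], "SparkApplication"))
print(rows[0]["MetricType"])
```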

### Prerequisites
- Azure authentication configured (service principal or managed identity)
- Log Analytics workspace with DCR-based custom tables
- Data Collection Rules (DCR) and Data Collection Endpoint (DCE) configured
- Fabric workspace with Spark applications
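
Once the DCR and DCE exist, records reach Log Analytics through the Azure Monitor Logs Ingestion API. Each batch is a JSON array POSTed to `{DCE}/dataCollectionRules/{DCR immutable ID}/streams/{stream name}?api-version=2023-01-01`. The sketch below only builds that request and leaves out sending and authentication. It assumes a custom stream named `Custom-FabricOperational_CL`, following the usual `Custom-<table>` naming. The endpoint and IDs are placeholders.

```python
import json
from urllib.parse import quote

API_VERSION = "2023-01-01"  # Logs Ingestion API version

def build_ingestion_request(dce_endpoint: str, dcr_immutable_id: str,
                            stream_name: str, records: list[dict]) -> tuple[str, bytes]:
    """Return (url, body) for a Logs Ingestion API POST; sending and auth are out of scope."""
    base = dce_endpoint.rstrip("/")
    url = (f"{base}/dataCollectionRules/{quote(dcr_immutable_id)}"
           f"/streams/{quote(stream_name)}?api-version={API_VERSION}")
    # The API expects a JSON array of objects whose fields match the stream schema
    body = json.dumps(records, default=str).encode("utf-8")
    return url, body

url, body = build_ingestion_request(
    "https://my-dce.eastus-1.ingest.monitor.azure.com/",  # placeholder DCE
    "dcr-00000000000000000000000000000000",                # placeholder DCR immutable ID
    "Custom-FabricOperational_CL",                          # assumed stream name
    [{"TimeGenerated": "2024-01-01T00:00:00Z", "MetricType": "SparkApplication"}],
)
print(url)
```

The actual POST also needs `Content-Type: application/json` and a bearer token for an identity that holds the Monitoring Metrics Publisher role on the DCR.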

### Value Proposition
- **Centralized Monitoring**: All Spark telemetry in Azure Monitor
- **Rich Analytics**: KQL queries for performance analysis
- **Alerting**: Proactive notifications for job failures or performance issues
- **Correlation**: Cross-reference with other Fabric workloads
- **Historical Analysis**: Long-term trends and optimization insights

## Setup and Configuration

In [None]:
# Import required modules
import os
import json
from datetime import datetime, timedelta

# Import FabricLA-Connector components
from fabricla_connector import workflows
from fabricla_connector.config import validate_config
from fabricla_connector.collectors import (
    collect_spark_applications_workspace,
    collect_spark_applications_item,
    collect_spark_logs,
    collect_spark_metrics
)

print("📦 FabricLA-Connector modules imported successfully")
print("🔗 Spark Monitoring API integration ready")

In [None]:
# Configuration and validation
print("⚙️ Validating configuration...")

# Validate all configuration sections
config_status = validate_config("all")
print(f"Configuration Status: {'✅ Valid' if config_status else '❌ Invalid'}")

# Set workspace ID (update with your workspace ID)
WORKSPACE_ID = os.getenv("FABRIC_WORKSPACE_ID", "your-workspace-id-here")

if WORKSPACE_ID == "your-workspace-id-here":
    print("⚠️ Please update WORKSPACE_ID with your actual Fabric workspace ID")
else:
    print(f"🎯 Target workspace: {WORKSPACE_ID}")

# Configuration for data collection
LOOKBACK_HOURS = 24  # How far back to look for Spark applications
MAX_APPLICATIONS = 50  # Maximum number of applications to process
INCLUDE_LOGS = True  # Whether to collect detailed logs
INCLUDE_METRICS = True  # Whether to collect performance metrics

print(f"📊 Collection settings:")
print(f"   Lookback: {LOOKBACK_HOURS} hours")
print(f"   Max applications: {MAX_APPLICATIONS}")
print(f"   Include logs: {INCLUDE_LOGS}")
print(f"   Include metrics: {INCLUDE_METRICS}")

## Spark Applications Collection

### Workspace-Level Spark Applications
Collect every Spark application in the workspace that falls within the `LOOKBACK_HOURS` window, up to `MAX_APPLICATIONS`.
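
The `lookback_hours` and `max_items` arguments bound the collection. Here is a minimal sketch of that filtering over plain dicts. It assumes each application carries an ISO 8601 `SubmissionTime`, the field projected in the KQL queries later on. `filter_recent` is hypothetical and only illustrates the idea. It is not the connector's implementation.

```python
from datetime import datetime, timedelta, timezone

def filter_recent(apps, lookback_hours, max_items, now=None):
    """Hypothetical helper: apps submitted within the window, newest first, capped."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(hours=lookback_hours)
    recent = []
    for app in apps:
        raw = app.get("SubmissionTime")
        if not raw:
            continue  # no timestamp, so the app cannot be placed in the window
        # Older Pythons' fromisoformat() rejects a trailing "Z", so normalize it
        submitted = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        if submitted.tzinfo is None:
            submitted = submitted.replace(tzinfo=timezone.utc)  # assume naive times are UTC
        if submitted >= cutoff:
            recent.append((submitted, app))
    recent.sort(key=lambda pair: pair[0], reverse=True)
    return [app for _, app in recent[:max_items]]

now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)
apps = [
    {"ApplicationName": "a", "SubmissionTime": "2024-01-02T11:00:00Z"},
    {"ApplicationName": "b", "SubmissionTime": "2024-01-01T06:00:00Z"},
    {"ApplicationName": "c", "SubmissionTime": "2024-01-02T09:30:00Z"},
    {"ApplicationName": "d"},
]
print([a["ApplicationName"] for a in filter_recent(apps, lookback_hours=24, max_items=50, now=now)])
```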

In [None]:
# Collect workspace-level Spark applications
print("🚀 Collecting workspace-level Spark applications...")

results = workflows.collect_and_ingest_spark_applications(
    workspace_id=WORKSPACE_ID,
    lookback_hours=LOOKBACK_HOURS,
    max_items=MAX_APPLICATIONS
)

print(f"\n📋 Workspace Spark Applications Results:")
print(f"   Collected: {results['collected']}")
print(f"   Ingested: {results['ingested']}")
print(f"   Errors: {len(results['errors'])}")

if results['errors']:
    print("\n❌ Errors encountered:")
    for error in results['errors'][:3]:  # Show first 3 errors
        print(f"   - {error}")

# Store results for later use
workspace_spark_results = results

### Item-Level Spark Applications
Collect Spark applications for specific items (notebooks, Spark job definitions, etc.).
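
Item-level collection narrows the query to a single item. The sketch below maps the `item_type` strings used in the next cell to request URLs. It assumes the Fabric REST API's List Livy Sessions endpoints take these forms:

- `/v1/workspaces/{workspaceId}/spark/livySessions` (workspace)
- `/v1/workspaces/{workspaceId}/notebooks/{notebookId}/livySessions`
- `/v1/workspaces/{workspaceId}/sparkJobDefinitions/{sparkJobDefinitionId}/livySessions`

Verify these paths against the current Fabric REST reference. `ITEM_PATH_SEGMENTS` and `livy_sessions_url` are hypothetical, and the connector's internal mapping may differ.

```python
from typing import Optional

FABRIC_API_BASE = "https://api.fabric.microsoft.com/v1"

# Hypothetical mapping from this notebook's item_type strings to assumed REST path segments
ITEM_PATH_SEGMENTS = {
    "notebook": "notebooks",
    "sparkjobdefinition": "sparkJobDefinitions",
}

def livy_sessions_url(workspace_id: str, item_type: Optional[str] = None,
                      item_id: Optional[str] = None) -> str:
    """Build a workspace-level or item-level List Livy Sessions URL (assumed paths)."""
    if item_type is None:
        return f"{FABRIC_API_BASE}/workspaces/{workspace_id}/spark/livySessions"
    segment = ITEM_PATH_SEGMENTS.get(item_type.lower())
    if segment is None or not item_id:
        raise ValueError(f"Unsupported item_type {item_type!r} or missing item_id")
    return f"{FABRIC_API_BASE}/workspaces/{workspace_id}/{segment}/{item_id}/livySessions"

print(livy_sessions_url("ws-123", "notebook", "nb-456"))
```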

In [None]:
# Example: Collect Spark applications for a specific notebook
# Update these with your actual item IDs if you want to test item-level collection

SAMPLE_NOTEBOOK_ID = os.getenv("SAMPLE_NOTEBOOK_ID")
SAMPLE_SPARK_JOB_ID = os.getenv("SAMPLE_SPARK_JOB_ID")

item_results = []

if SAMPLE_NOTEBOOK_ID:
    print(f"🚀 Collecting Spark applications for notebook {SAMPLE_NOTEBOOK_ID}...")
    
    notebook_results = workflows.collect_and_ingest_spark_item_applications(
        workspace_id=WORKSPACE_ID,
        item_id=SAMPLE_NOTEBOOK_ID,
        item_type="notebook",
        lookback_hours=LOOKBACK_HOURS
    )
    
    item_results.append(("notebook", notebook_results))
    print(f"   Notebook apps - Collected: {notebook_results['collected']}, Ingested: {notebook_results['ingested']}")

if SAMPLE_SPARK_JOB_ID:
    print(f"🚀 Collecting Spark applications for Spark job {SAMPLE_SPARK_JOB_ID}...")
    
    job_results = workflows.collect_and_ingest_spark_item_applications(
        workspace_id=WORKSPACE_ID,
        item_id=SAMPLE_SPARK_JOB_ID,
        item_type="sparkjobdefinition",
        lookback_hours=LOOKBACK_HOURS
    )
    
    item_results.append(("sparkjobdefinition", job_results))
    print(f"   Spark job apps - Collected: {job_results['collected']}, Ingested: {job_results['ingested']}")

if not SAMPLE_NOTEBOOK_ID and not SAMPLE_SPARK_JOB_ID:
    print("ℹ️ No item IDs configured. Set SAMPLE_NOTEBOOK_ID or SAMPLE_SPARK_JOB_ID environment variables to test item-level collection.")

print(f"\n📊 Item-level collection completed: {len(item_results)} items processed")

## Detailed Spark Data Collection

### Recent Applications Analysis
Get recent applications and collect detailed logs and metrics.
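
The next cell processes at most five applications and skips any without a `SessionId`. Here that selection step is written as a standalone function, so the progress counter reflects how many applications will actually be processed. `select_for_detail` is hypothetical. It uses the `SessionId` and `ApplicationId` field names from the cell below.

```python
def select_for_detail(apps, limit=5):
    """Hypothetical helper: split the first `limit` apps into processable and skipped."""
    candidates = apps[:limit]
    ready = [a for a in candidates if a.get("SessionId")]
    skipped = [a for a in candidates if not a.get("SessionId")]
    return ready, skipped

apps = [{"SessionId": "s1", "ApplicationId": "app-1"}, {"ApplicationId": "app-2"}, {"SessionId": "s3"}]
ready, skipped = select_for_detail(apps)
for i, app in enumerate(ready, 1):
    # Metrics need an ApplicationId; logs only need the SessionId
    wants_metrics = bool(app.get("ApplicationId"))
    print(f"{i}/{len(ready)} session={app['SessionId']} metrics={wants_metrics}")
```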

In [None]:
# Get recent Spark applications for detailed analysis
print("🔍 Analyzing recent Spark applications for detailed data collection...")

recent_applications = []
logs_collected = 0
metrics_collected = 0

# Collect recent applications (last 6 hours)
for app in collect_spark_applications_workspace(WORKSPACE_ID, lookback_hours=6, max_items=10):
    recent_applications.append(app)

print(f"📋 Found {len(recent_applications)} recent applications")

# Process each application for logs and metrics (first 5 only, to keep the demo short)
apps_to_process = recent_applications[:5]
for i, app in enumerate(apps_to_process, 1):
    session_id = app.get("SessionId")
    application_id = app.get("ApplicationId")
    app_name = app.get("ApplicationName", "Unknown")
    state = app.get("State", "Unknown")
    
    print(f"\n🔍 Processing application {i}/{len(apps_to_process)}: {app_name} (State: {state})")
    print(f"   Session ID: {session_id}")
    print(f"   Application ID: {application_id}")
    
    if not session_id:
        print("   ⚠️ No session ID available, skipping")
        continue
    
    try:
        # Collect logs if enabled
        if INCLUDE_LOGS:
            print("   📝 Collecting logs...")
            log_results = workflows.collect_and_ingest_spark_logs(
                workspace_id=WORKSPACE_ID,
                session_id=session_id,
                log_types=["driver", "executor"],  # Collect driver and executor logs
                max_lines=100  # Limit log lines for demo
            )
            logs_collected += log_results["collected"]
            print(f"      Logs collected: {log_results['collected']}")
        
        # Collect metrics if enabled and application ID is available
        if INCLUDE_METRICS and application_id:
            print("   📊 Collecting metrics...")
            metrics_results = workflows.collect_and_ingest_spark_metrics(
                workspace_id=WORKSPACE_ID,
                session_id=session_id,
                application_id=application_id
            )
            metrics_collected += metrics_results["collected"]
            print(f"      Metrics collected: {metrics_results['collected']}")
        elif INCLUDE_METRICS:
            print("   ⚠️ No application ID available for metrics collection")
            
    except Exception as e:
        print(f"   ❌ Error processing application: {str(e)}")
        continue

print(f"\n✅ Detailed data collection completed:")
print(f"   Applications processed: {min(len(recent_applications), 5)}")
print(f"   Total logs collected: {logs_collected}")
print(f"   Total metrics collected: {metrics_collected}")

## Comprehensive Spark Monitoring

### All-in-One Monitoring Workflow
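Use the comprehensive monitoring function to collect applications, logs, and metrics in a single call. It covers the same workspace and lookback window as the step-by-step cells above, so running both will most likely ingest the same records twice. Treat it as an alternative to those cells rather than an addition.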
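
If you do run both the step-by-step cells and this workflow, one option is to drop duplicate records before ingestion. Here is a minimal sketch. It assumes two records are duplicates when they are identical apart from `TimeGenerated`. `dedupe_records` is hypothetical, and it only removes duplicates within the records it is given. It does not check what is already stored in Log Analytics.

```python
import json
from typing import Iterable, Iterator

def dedupe_records(records: Iterable[dict], ignore=("TimeGenerated",)) -> Iterator[dict]:
    """Hypothetical helper: yield records whose content (minus `ignore`) is new."""
    seen = set()
    for record in records:
        content = {k: v for k, v in record.items() if k not in ignore}
        key = json.dumps(content, sort_keys=True, default=str)  # stable, hashable fingerprint
        if key not in seen:
            seen.add(key)
            yield record

recs = [
    {"MetricType": "SparkApplication", "SessionId": "s1", "TimeGenerated": "t1"},
    {"MetricType": "SparkApplication", "SessionId": "s1", "TimeGenerated": "t2"},
    {"MetricType": "SparkApplication", "SessionId": "s2", "TimeGenerated": "t1"},
]
print(len(list(dedupe_records(recs))))
```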

In [None]:
# Run comprehensive Spark monitoring
print("🚀 Running comprehensive Spark monitoring workflow...")

comprehensive_results = workflows.comprehensive_spark_monitoring(
    workspace_id=WORKSPACE_ID,
    lookback_hours=LOOKBACK_HOURS,
    include_logs=INCLUDE_LOGS,
    include_metrics=INCLUDE_METRICS,
    max_applications=MAX_APPLICATIONS,
    max_log_lines=500
)

print(f"\n📋 Comprehensive Monitoring Results:")
print(f"   Applications collected: {comprehensive_results['applications_collected']}")
print(f"   Logs collected: {comprehensive_results['logs_collected']}")
print(f"   Metrics collected: {comprehensive_results['metrics_collected']}")
print(f"   Total records ingested: {comprehensive_results['total_ingested']}")
print(f"   Errors: {len(comprehensive_results['errors'])}")

if comprehensive_results['errors']:
    print("\n❌ Errors encountered:")
    for error in comprehensive_results['errors'][:3]:
        print(f"   - {error}")

# Store results for summary
final_results = comprehensive_results

## Results Summary and Next Steps

In [None]:
# Summary of all data collection activities
print("📊 SPARK MONITORING SUMMARY")
print("=" * 50)

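item_collected = sum(res.get('collected', 0) for _, res in item_results)
item_ingested = sum(res.get('ingested', 0) for _, res in item_results)

# Collected counts from the step-by-step cells: workspace and item applications
# plus the logs and metrics gathered in the detailed-analysis cell
total_collected = (
    workspace_spark_results.get('collected', 0) +
    item_collected +
    logs_collected +
    metrics_collected
)

# Ingested counts (the detailed-analysis cell does not track ingested logs/metrics).
# The comprehensive workflow re-collects the same window, so this total can
# include records that were ingested more than once.
total_ingested = (
    workspace_spark_results.get('ingested', 0) +
    item_ingested +
    final_results.get('total_ingested', 0)
)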

print(f"🎯 Workspace: {WORKSPACE_ID}")
print(f"⏰ Time range: Last {LOOKBACK_HOURS} hours")
print(f"📈 Total records collected: {total_collected}")
print(f"📤 Total records ingested: {total_ingested}")
print(f"\n📋 Breakdown by category:")
print(f"   • Workspace applications: {workspace_spark_results.get('collected', 0)}")
print(f"   • Item applications: {sum(result[1].get('collected', 0) for result in item_results)}")
print(f"   • Log entries: {logs_collected}")
print(f"   • Metrics: {metrics_collected}")

print(f"\n🔗 Data ingested to Log Analytics:")
print(f"   • Table: {os.getenv('LOG_ANALYTICS_TABLE', 'FabricOperational_CL')}")
print(f"   • Workspace: {os.getenv('LOG_ANALYTICS_WORKSPACE_ID', 'your-workspace-id')}")

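error_count = (
    len(workspace_spark_results.get('errors', [])) +
    sum(len(res.get('errors', [])) for _, res in item_results) +
    len(final_results.get('errors', []))
)
if error_count:
    print(f"\n⚠️ Spark monitoring completed with {error_count} error(s); review the output above.")
else:
    print("\n✅ Spark monitoring integration completed successfully!")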

## KQL Queries for Analysis

### Sample KQL Queries
Use these queries in Azure Monitor to analyze your Spark data.
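
Before the data reaches Log Analytics, you can sanity-check what the first query should return by computing the same aggregation locally over collected application records. This sketch mirrors the "Spark Applications Overview" query but ignores its time filter. It assumes the same `State` values (`success`, `error`, `running`) the queries filter on. It also guards the success rate against division by zero when there are no applications. `spark_overview` is a hypothetical helper.

```python
from collections import Counter

def spark_overview(records):
    """Hypothetical Python equivalent of the 'Spark Applications Overview' KQL summary."""
    apps = [r for r in records if r.get("MetricType") == "SparkApplication"]
    states = Counter(r.get("State") for r in apps)
    total = len(apps)
    completed = states["success"]
    return {
        "Applications": total,
        "Completed": completed,
        "Failed": states["error"],
        "Running": states["running"],
        # round(Completed * 100.0 / Applications, 2), guarded for an empty window
        "SuccessRate": round(completed * 100.0 / total, 2) if total else None,
    }

sample = [
    {"MetricType": "SparkApplication", "State": "success"},
    {"MetricType": "SparkApplication", "State": "success"},
    {"MetricType": "SparkApplication", "State": "error"},
    {"MetricType": "SparkLog", "State": None},
]
print(spark_overview(sample))
```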

In [None]:
# Display sample KQL queries for analyzing Spark data in Log Analytics
table_name = os.getenv('LOG_ANALYTICS_TABLE', 'FabricOperational_CL')

kql_queries = {
    "Spark Applications Overview": f"""
// Get overview of Spark applications in the last 24 hours
{table_name}
| where TimeGenerated > ago(24h)
| where MetricType == "SparkApplication"
| summarize 
    Applications = count(),
    Completed = countif(State == "success"),
    Failed = countif(State == "error"),
    Running = countif(State == "running")
| extend SuccessRate = round(Completed * 100.0 / Applications, 2)
""",

    "Top Resource-Intensive Applications": f"""
// Find applications using the most resources
{table_name}
| where TimeGenerated > ago(24h)
| where MetricType == "SparkApplication"
| where isnotnull(ExecutorCount) and isnotnull(ExecutorCores)
| extend TotalCores = ExecutorCount * ExecutorCores
| top 10 by TotalCores desc
| project 
    ApplicationName,
    State,
    SubmissionTime,
    ExecutorCount,
    ExecutorCores,
    TotalCores,
    ExecutorMemory
""",

    "Failed Applications Analysis": f"""
// Analyze failed Spark applications
{table_name}
| where TimeGenerated > ago(24h)
| where MetricType == "SparkApplication" and State == "error"
| summarize FailureCount = count() by ApplicationName
| order by FailureCount desc
""",

    "Spark Logs Analysis": f"""
// Analyze Spark log patterns
{table_name}
| where TimeGenerated > ago(24h)
| where MetricType == "SparkLog"
| where LogMessage contains "ERROR" or LogMessage contains "WARN"
| summarize ErrorOrWarningCount = count() by LogType, bin(TimeGenerated, 1h)
| order by TimeGenerated desc
""",

    "Performance Metrics Trends": f"""
// Analyze Spark performance metrics over time
{table_name}
| where TimeGenerated > ago(24h)
| where MetricType == "SparkMetric" and MetricCategory == "Executor"
| summarize 
    AvgMemoryUsed = avg(MemoryUsed),
    AvgActiveTasks = avg(ActiveTasks),
    TotalFailedTasks = sum(FailedTasks)
    by bin(TimeGenerated, 1h)
| order by TimeGenerated desc
"""
}

print("📊 Sample KQL Queries for Spark Data Analysis")
print("=" * 60)

for query_name, query in kql_queries.items():
    print(f"\n🔍 {query_name}:")
    print(query.strip())
    print("-" * 40)

print(f"\n💡 Tips for Analysis:")
print(f"   • Use these queries in Azure Monitor Log Analytics")
print(f"   • Create custom dashboards and alerts")
print(f"   • Adjust time ranges as needed")
print(f"   • Combine with other Fabric metrics for comprehensive analysis")

## Integration with Fabric Diagnostic Emitter

### How This Complements the Fabric Spark Diagnostic Emitter

This connector works alongside Fabric's built-in Spark Diagnostic Emitter:

1. **Real-time Streaming (Diagnostic Emitter)**:
   - Automatically streams logs during Spark execution
   - Direct integration with Event Hubs, Storage, or Log Analytics
   - Configured through Spark properties, with no changes to application code

2. **Historical Analysis (This Connector)**:
   - Retrospective analysis using Monitoring APIs
   - Rich metadata and application context
   - Cross-workspace correlation
   - Business logic and KPI calculation

3. **Combined Value**:
   - Complete operational picture
   - Real-time alerts + historical trends
   - Technical metrics + business insights
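
The diagnostic emitter is enabled through Spark properties, typically set in a Fabric environment, rather than through code. The sketch below assembles those properties for an Azure Log Analytics destination. It assumes the `spark.synapse.diagnostic.emitter.*` property names from Microsoft's Fabric documentation at the time of writing, so verify them before use. `diagnostic_emitter_properties` is a hypothetical helper, and the workspace ID and key are placeholders.

```python
def diagnostic_emitter_properties(name: str, workspace_id: str, secret: str,
                                  categories=("Log", "EventLog", "Metrics")) -> dict:
    """Hypothetical helper: Spark properties for a Log Analytics emitter (assumed names)."""
    prefix = f"spark.synapse.diagnostic.emitter.{name}"
    return {
        "spark.synapse.diagnostic.emitters": name,
        f"{prefix}.type": "AzureLogAnalytics",
        f"{prefix}.categories": ",".join(categories),
        f"{prefix}.workspaceId": workspace_id,
        f"{prefix}.secret": secret,  # prefer keeping the key in Azure Key Vault over a literal
    }

for key, value in diagnostic_emitter_properties("LA", "<workspace-id>", "<workspace-key>").items():
    print(f"{key}: {value}")
```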

### Next Steps
1. Configure Fabric Diagnostic Emitter for real-time streaming
2. Schedule this connector for regular historical collection
3. Create unified dashboards in Azure Monitor
4. Set up alerting rules for proactive monitoring
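
When this notebook runs on a schedule, a fixed `LOOKBACK_HOURS` window overlaps between runs. For example, a 24-hour lookback run hourly re-reads 23 hours of data each time. A common pattern is a watermark: each run collects from the end of the previous run's window up to now. Here is a minimal sketch, assuming you persist the watermark somewhere durable, such as a small JSON file in a lakehouse. `next_window` is hypothetical. Because the collectors used above take `lookback_hours` rather than explicit start times, the sketch rounds the window up to whole hours. That leaves a small overlap, which deduplication can absorb.

```python
import math
from datetime import datetime, timedelta, timezone
from typing import Optional

def next_window(last_watermark: Optional[datetime], default_lookback_hours: int,
                now: Optional[datetime] = None) -> tuple[datetime, datetime, int]:
    """Hypothetical helper: (start, end, lookback_hours) for the next scheduled run."""
    end = now or datetime.now(timezone.utc)
    # First run (no watermark yet) falls back to the default lookback
    start = last_watermark or end - timedelta(hours=default_lookback_hours)
    # Round up so the lookback never stops short of the watermark
    hours = max(1, math.ceil((end - start).total_seconds() / 3600))
    return start, end, hours

now = datetime(2024, 1, 2, 12, 30, tzinfo=timezone.utc)
start, end, hours = next_window(datetime(2024, 1, 2, 11, 0, tzinfo=timezone.utc), 24, now=now)
print(start.isoformat(), end.isoformat(), hours)
# After a successful run, persist `end` as the new watermark
```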