# Microsoft Fabric Livy API Testing Notebook

This notebook demonstrates how to use the Microsoft Fabric Livy API for interactive Spark sessions. It supports both bearer token authentication (for testing) and service principal authentication (for production scenarios).

## Features
- ✅ Session management (create, monitor, delete)
- ✅ Statement execution with polling
- ✅ Multiple authentication methods
- ✅ Real-world examples with your workspace
- ✅ Spark application monitoring (workspace, notebook, Spark Job Definition, lakehouse)

## Your Configuration
- **Workspace ID**: `c22f6805-d84a-4143-80b2-0c9e9832e5a2`
- **Lakehouse ID**: `683ce7d6-5d0d-4164-b594-d2cc8dbf70ac`
- **Warehouse ID**: `f01cc17b-6db8-47eb-b3cb-d0b43b1ad8e8`

## 1. Import Required Libraries

Import all necessary libraries for Livy API testing including authentication, HTTP requests, and JSON handling.

In [None]:
# Core libraries
import requests
import time
import json
import sys
from datetime import datetime
from typing import Optional, Dict, Any

# Authentication libraries (install if needed: pip install msal)
try:
    from msal import ConfidentialClientApplication, PublicClientApplication
    MSAL_AVAILABLE = True
    print("✅ MSAL library is available for service principal authentication")
except ImportError:
    MSAL_AVAILABLE = False
    print("⚠️ MSAL library not available. Only bearer token authentication will work.")
    print("   Install with: pip install msal")

print("📚 Libraries imported successfully!")
print(f"🕐 Current time: {datetime.now()}")

## 2. Set Configuration Parameters

Configure your Microsoft Fabric workspace, lakehouse, and authentication credentials.

In [None]:
# Configuration - Update these with your specific values
config = {
    # Your specific Microsoft Fabric workspace and lakehouse IDs
    "workspace_id": "c22f6805-d84a-4143-80b2-0c9e9832e5a2",
    "lakehouse_id": "683ce7d6-5d0d-4164-b594-d2cc8dbf70ac", 
    "warehouse_id": "f01cc17b-6db8-47eb-b3cb-d0b43b1ad8e8",  # Optional for warehouse operations
    
    # Authentication settings
    "use_bearer_token": True,  # Set to False to use service principal auth
    
    # Bearer Token Authentication (for local testing)
    "bearer_token": "",  # Paste your bearer token here or leave empty to be prompted
    
    # Service Principal Authentication (for production)
    "tenant_id": "",      # Your Azure AD tenant ID
    "client_id": "",      # Your service principal client ID  
    "client_secret": "",  # Your service principal client secret
    
    # API settings
    "fabric_base_url": "https://api.fabric.microsoft.com/v1",
    "timeout_seconds": 300,
    "max_poll_attempts": 30
}

# Prompt for credentials if not provided
if config["use_bearer_token"]:
    if not config["bearer_token"]:
        print("🔑 Bearer Token Authentication")
        print("To get your bearer token:")
        print("1. Open Microsoft Fabric in your browser")
        print("2. Press F12 to open Developer Tools")
        print("3. Go to Network tab")
        print("4. Refresh the page")
        print("5. Look for any request and find 'Authorization: Bearer ...' in the headers")
        print("6. Copy the token (everything after 'Bearer ')")
        print()
        # getpass keeps the token out of the visible cell output
        from getpass import getpass
        config["bearer_token"] = getpass("Paste your bearer token here: ").strip()
        
    if not config["bearer_token"]:
        raise ValueError("❌ Bearer token is required for authentication")
        
    print("✅ Using Bearer Token authentication")
else:
    # Service Principal authentication
    if not all([config["tenant_id"], config["client_id"], config["client_secret"]]):
        print("🔐 Service Principal Authentication")
        print("Enter your Azure AD service principal credentials:")
        
        if not config["tenant_id"]:
            config["tenant_id"] = input("Tenant ID: ").strip()
        if not config["client_id"]:
            config["client_id"] = input("Client ID: ").strip()
        if not config["client_secret"]:
            from getpass import getpass  # hide the secret while it is typed
            config["client_secret"] = getpass("Client Secret: ").strip()
    
    if not all([config["tenant_id"], config["client_id"], config["client_secret"]]):
        raise ValueError("❌ Service principal credentials are required")
        
    print("✅ Using Service Principal authentication")

print(f"\n📋 Configuration Summary:")
print(f"   Workspace ID: {config['workspace_id']}")
print(f"   Lakehouse ID: {config['lakehouse_id']}")
print(f"   Warehouse ID: {config['warehouse_id']}")
print(f"   Auth Method: {'Bearer Token' if config['use_bearer_token'] else 'Service Principal'}")
print(f"   Fabric API: {config['fabric_base_url']}")
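Rather than pasting secrets into the notebook, you could read them from environment variables. Below is a minimal sketch. The variable names `FABRIC_TENANT_ID`, `FABRIC_CLIENT_ID`, `FABRIC_CLIENT_SECRET` and `FABRIC_BEARER_TOKEN` are hypothetical and are not a Fabric convention, so choose whatever names suit your environment. Values already set in `config` take precedence.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names (not defined by Fabric).
ENV_KEYS = {
    "tenant_id": "FABRIC_TENANT_ID",
    "client_id": "FABRIC_CLIENT_ID",
    "client_secret": "FABRIC_CLIENT_SECRET",
    "bearer_token": "FABRIC_BEARER_TOKEN",
}


def overlay_env(cfg: Dict[str, object], env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Return a copy of cfg with empty credential fields filled from the environment."""
    env = os.environ if env is None else env
    merged = dict(cfg)
    for key, var in ENV_KEYS.items():
        # Only fill values the notebook left blank; explicit config wins.
        if not merged.get(key) and env.get(var):
            merged[key] = env[var]
    return merged
```

Run `config = overlay_env(config)` before the prompting code. Only values that are still missing will then trigger an interactive prompt.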

## 3. Authentication: Bearer Token or Service Principal

Acquire an access token using either bearer token or service principal credentials.

In [None]:
def get_service_principal_token(tenant_id: str, client_id: str, client_secret: str) -> Optional[str]:
    """
    Get an access token using service principal credentials.
    """
    if not MSAL_AVAILABLE:
        print("❌ MSAL library not available for service principal authentication")
        return None
    
    try:
        authority = f"https://login.microsoftonline.com/{tenant_id}"
        # The client-credentials flow requests the resource's ".default" scope
        scope = "https://api.fabric.microsoft.com/.default"
        
        app = ConfidentialClientApplication(
            client_id=client_id,
            client_credential=client_secret,
            authority=authority
        )
        
        result = app.acquire_token_for_client(scopes=[scope])
        
        if "access_token" in result:
            print("✅ Service principal authentication successful")
            return result["access_token"]
        else:
            error_desc = result.get('error_description', 'Unknown error')
            print(f"❌ Service principal authentication failed: {error_desc}")
            return None
            
    except Exception as e:
        print(f"❌ Error during service principal authentication: {e}")
        return None

# Authentication
print("🔐 Authenticating with Microsoft Fabric...")

access_token = None

if config["use_bearer_token"]:
    # Use provided bearer token directly
    access_token = config["bearer_token"]
    print("✅ Using provided bearer token")
    
else:
    # Service principal authentication (client-credentials flow via MSAL),
    # reusing the helper defined above instead of duplicating its logic
    print("🔄 Getting access token using service principal...")
    access_token = get_service_principal_token(
        config["tenant_id"], config["client_id"], config["client_secret"]
    )

if not access_token:
    raise ValueError("❌ Failed to obtain access token")

# Set up request headers
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

print("✅ Authentication successful!")
print(f"📊 Token length: {len(access_token)} characters")
print(f"🔑 Token prefix: {access_token[:20]}..." if len(access_token) > 20 else "🔑 Token obtained")
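Bearer tokens copied from the browser are short-lived, typically around an hour. Access tokens issued by Microsoft Entra ID are JWTs. Decoding the payload without checking the signature reveals the `aud` (audience) and `exp` (expiry, in Unix seconds) claims before you create a session. This sketch is a debugging aid only and does not validate the token.

```python
import base64
import json
import time
from typing import Any, Dict, Optional


def decode_jwt_claims(token: str) -> Dict[str, Any]:
    """Decode the (unverified) payload segment of a JWT."""
    parts = token.split(".")
    if len(parts) < 2:
        raise ValueError("Not a JWT: expected header.payload.signature")
    payload = parts[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def seconds_until_expiry(token: str, now: Optional[float] = None) -> float:
    """Return seconds until the token's 'exp' claim (negative if already expired)."""
    claims = decode_jwt_claims(token)
    now = time.time() if now is None else now
    return float(claims["exp"]) - now
```

Call `seconds_until_expiry(access_token) / 60` to get the remaining minutes. A negative value explains 401 responses from the Livy endpoints.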

## 4. Build Livy API Endpoints

Construct the Livy session and batch endpoints using your workspace and lakehouse IDs.

In [None]:
# Build Livy API endpoints
print("🔗 Building Livy API endpoints...")

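# Base URL using msitapi.fabric.microsoft.com as per your specification.
# Note: msitapi appears to be a Microsoft-internal (MSIT) host; other tenants typically
# use api.fabric.microsoft.com, the same host as config["fabric_base_url"].
livy_base_url = f"https://msitapi.fabric.microsoft.com/v1/workspaces/{config['workspace_id']}/lakehouses/{config['lakehouse_id']}/livyapi/versions/2023-12-01"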

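# Session endpoints (doubled braces survive the f-string as literal {session_id} /
# {statement_id} placeholders, to be filled later with str.format)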
session_endpoints = {
    "sessions": f"{livy_base_url}/sessions",
    "session_detail": f"{livy_base_url}/sessions/{{session_id}}",
    "statements": f"{livy_base_url}/sessions/{{session_id}}/statements",
    "statement_detail": f"{livy_base_url}/sessions/{{session_id}}/statements/{{statement_id}}"
}

# Batch endpoints  
batch_endpoints = {
    "batches": f"{livy_base_url}/batches",
    "batch_detail": f"{livy_base_url}/batches/{{batch_id}}"
}

print("✅ Livy API endpoints configured:")
print(f"   Sessions URL: {session_endpoints['sessions']}")
print(f"   Batches URL: {batch_endpoints['batches']}")
print(f"   API Version: 2023-12-01")
print(f"   Using msitapi.fabric.microsoft.com endpoint")
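The entries in `session_endpoints` and `batch_endpoints` are URL templates with literal `{session_id}`, `{statement_id}` or `{batch_id}` placeholders. Here is a small sketch for filling them. The helper name `endpoint` and the shortened example base URL are illustrative only.

```python
from typing import Dict


def endpoint(templates: Dict[str, str], name: str, **ids: object) -> str:
    """Fill a URL template such as '.../sessions/{session_id}' with concrete IDs."""
    try:
        return templates[name].format(**ids)
    except KeyError as missing:
        # Raised both for an unknown template name and for a missing placeholder value.
        raise ValueError(f"Cannot build endpoint {name!r}: missing {missing}") from None


# Example with a shortened, illustrative base URL.
base = "https://example.invalid/livyapi/versions/2023-12-01"
templates = {
    "session_detail": f"{base}/sessions/{{session_id}}",
    "statement_detail": f"{base}/sessions/{{session_id}}/statements/{{statement_id}}",
}
print(endpoint(templates, "statement_detail", session_id="abc", statement_id=3))
```

In this notebook, `endpoint(session_endpoints, "statement_detail", session_id=session_id, statement_id=statement_id)` would replace the hand-built f-strings used in later cells.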

## 5. Create Livy Session

Create a new interactive Livy session for executing Spark code.

In [None]:
# Create Livy session
sessions_url = session_endpoints["sessions"]  # built in the endpoint cell above
print(f"🚀 Creating Livy session at {datetime.now()}")
print(f"📡 POST {sessions_url}")

session_config = {
    "kind": "spark",
    "name": f"test-session-{int(time.time())}"
}

try:
    create_session_response = requests.post(sessions_url, headers=headers, json=session_config)
    
    print(f"📥 Response Status: {create_session_response.status_code}")
    
    if create_session_response.status_code == 201:
        session_data = create_session_response.json()
        livy_session_id = session_data['id']
        session_id = livy_session_id  # later cells refer to the session as session_id
        session_state = session_data.get('state', 'unknown')
        session_kind = session_data.get('kind', 'unknown')
        
        print(f"✅ Livy session created successfully!")
        print(f"🆔 Session ID: {livy_session_id}")
        print(f"🎯 State: {session_state}")
        print(f"🏷️ Kind: {session_kind}")
        print()
        print("📄 Full session response:")
        print(json.dumps(session_data, indent=2))
        
        # Build session-specific URL
        livy_session_url = f"{sessions_url}/{livy_session_id}"
        print(f"🔗 Session URL: {livy_session_url}")
        
    else:
        print(f"❌ Failed to create session")
        print(f"📄 Response: {create_session_response.text}")
        raise Exception("Session creation failed")
        
except Exception as e:
    print(f"❌ Error creating session: {e}")
    raise

## 6. Wait for Session to Become Idle

Poll the session status until it's ready to accept statements.

In [None]:
# Wait for session to become idle
print(f"⏳ Waiting for session {livy_session_id} to become idle...")
max_wait_time = config["timeout_seconds"]  # 300 s (5 minutes) by default
start_time = time.time()

while time.time() - start_time < max_wait_time:
    try:
        get_session_response = requests.get(livy_session_url, headers=headers)
        
        if get_session_response.status_code == 200:
            session_status = get_session_response.json()
            current_state = session_status.get('state', 'unknown')
            
            print(f"📊 Session {livy_session_id} state: {current_state} (elapsed: {int(time.time() - start_time)}s)")
            
            if current_state == "idle":
                print(f"✅ Session is ready!")
                break
            elif current_state in ["error", "dead", "killed"]:
                print(f"❌ Session failed with state: {current_state}")
                print(json.dumps(session_status, indent=2))
                raise Exception(f"Session failed: {current_state}")
            else:
                print(f"⏳ Session still initializing... waiting 10 seconds")
                time.sleep(10)
        else:
            print(f"⚠️ Error getting session status: {get_session_response.status_code}")
            time.sleep(10)
            
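    except requests.exceptions.RequestException as e:
        # Retry only on network errors; the "Session failed" exception above must propagate
        print(f"❌ Error checking session status: {e}")
        time.sleep(10)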
else:
    print(f"⏰ Timeout waiting for session to become idle")
    raise Exception("Session timeout")

print(f"🎉 Session {livy_session_id} is ready for statements!")
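The session wait above and the statement polling in sections 8 and 9 each use their own hand-written loop. One way to factor that out is a helper that calls a fetch function until the state is terminal or a deadline passes. This is a sketch and not part of the Livy API. `fetch`, `sleep` and `clock` are injectable so the logic can be exercised without the network.

```python
import time
from typing import Callable, Iterable


class PollTimeout(Exception):
    """Raised when the polled resource does not reach a terminal state in time."""


def poll_state(
    fetch: Callable[[], str],
    done: Iterable[str],
    failed: Iterable[str] = (),
    timeout: float = 300.0,
    interval: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call fetch() until it returns a state in `done` (returned) or `failed` (raises)."""
    done_set, failed_set = set(done), set(failed)
    deadline = clock() + timeout
    while True:
        state = fetch()
        if state in done_set:
            return state
        if state in failed_set:
            raise RuntimeError(f"Resource entered failure state: {state}")
        if clock() >= deadline:
            raise PollTimeout(f"Still '{state}' after {timeout}s")
        sleep(interval)
```

For the session wait, use these arguments:
- `fetch`: `lambda: requests.get(livy_session_url, headers=headers, timeout=30).json()["state"]`
- `done`: `{"idle"}`
- `failed`: `{"error", "dead", "killed"}`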

## 7. Submit SQL Statement to Livy Session

Now we'll submit a SQL statement to our active Livy session. We'll execute a simple query to test the connection to our lakehouse.

In [None]:
# Submit a SQL statement to the Livy session
statement_payload = {
    "code": "SELECT 'Hello from Fabric Livy API!' as greeting, current_timestamp() as timestamp",
    "kind": "sql"
}

print("Submitting SQL statement to Livy session...")
statement_response = requests.post(
    f"{livy_base_url}/sessions/{session_id}/statements",
    headers=headers,
    json=statement_payload
)

if statement_response.status_code == 201:
    statement = statement_response.json()
    statement_id = statement['id']
    print(f"✅ Statement submitted successfully!")
    print(f"📋 Statement ID: {statement_id}")
    print(f"🔄 Statement State: {statement['state']}")
else:
    print(f"❌ Failed to submit statement: {statement_response.status_code}")
    print(f"Error: {statement_response.text}")
    statement_id = None

## 8. Poll for Statement Result

We'll poll the statement status until it's completed and then retrieve the results.

In [None]:
# Poll for statement completion
if statement_id is not None:
    print("Polling for statement completion...")
    max_attempts = 30
    attempt = 0
    
    while attempt < max_attempts:
        statement_status_response = requests.get(
            f"{livy_base_url}/sessions/{session_id}/statements/{statement_id}",
            headers=headers
        )
        
        if statement_status_response.status_code == 200:
            statement_status = statement_status_response.json()
            state = statement_status['state']
            print(f"📊 Attempt {attempt + 1}: Statement state is '{state}'")
            
            if state == 'available':
                print("✅ Statement completed successfully!")
                
                # Display the results
                if 'output' in statement_status and statement_status['output']:
                    output = statement_status['output']
                    if output['status'] == 'ok':
                        print("📋 Statement Results:")
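                        if 'data' in output and 'text/plain' in output['data']:
                            print(output['data']['text/plain'])
                        elif 'data' in output and 'application/json' in output['data']:
                            # Livy's sql interpreter returns a JSON table: {"schema": ..., "data": [rows]}
                            print(json.dumps(output['data']['application/json'], indent=2))
                        else:
                            print("No data returned")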
                    else:
                        print(f"❌ Statement failed: {output.get('evalue', 'Unknown error')}")
                        if 'traceback' in output:
                            print("Traceback:")
                            for line in output['traceback']:
                                print(line)
                break
            elif state in ['error', 'cancelled', 'cancelling']:
                print(f"❌ Statement failed with state: {state}")
                if 'output' in statement_status:
                    print(f"Error: {statement_status['output']}")
                break
            
            time.sleep(2)
            attempt += 1
        else:
            print(f"❌ Failed to get statement status: {statement_status_response.status_code}")
            break
    
    if attempt >= max_attempts:
        print("⏰ Timeout waiting for statement completion")
else:
    print("⚠️ Skipping statement polling - no statement ID available")
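In Apache Livy, the `output.data` field of a finished statement is keyed by MIME type:
- Scala and Python statements usually return `text/plain`.
- `sql` statements return a JSON table under `application/json`, with `schema` and `data` (row arrays) fields.

That Fabric's Livy endpoint uses exactly the same shape is an assumption worth checking against a real response. Here is a sketch of a renderer that handles both cases:

```python
import json
from typing import Any, Dict, List


def render_livy_output(output: Dict[str, Any]) -> str:
    """Turn a Livy statement 'output' object into printable text."""
    if output.get("status") != "ok":
        # Error outputs carry ename/evalue/traceback instead of data.
        return f"{output.get('ename', 'Error')}: {output.get('evalue', 'Unknown error')}"
    data = output.get("data") or {}
    if "text/plain" in data:
        return data["text/plain"]
    if "application/json" in data:
        table = data["application/json"]
        fields: List[str] = [f["name"] for f in table.get("schema", {}).get("fields", [])]
        rows = table.get("data", [])
        lines = [" | ".join(fields)] if fields else []
        lines += [" | ".join(str(v) for v in row) for row in rows]
        return "\n".join(lines) if lines else "(empty result)"
    return json.dumps(data, indent=2) if data else "(no data returned)"
```

In section 8, `print(render_livy_output(statement_status["output"]))` could stand in for the nested `if` block that prints results.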

## 9. Submit Additional SQL Statements

Let's submit a few more SQL statements to demonstrate running multiple queries in the same session. These examples are based on your sample queries.

In [None]:
# Function to execute and poll for statement results
def execute_and_poll_statement(code, kind="sql", description=""):
    """Execute a statement and poll for results"""
    print(f"\n{'='*60}")
    print(f"Executing: {description}")
    print(f"{'='*60}")
    
    # Wait for session to be idle first
    print("Checking session state...")
    session_response = requests.get(f"{livy_base_url}/sessions/{session_id}", headers=headers)
    if session_response.status_code == 200:
        session_state = session_response.json()['state']
        print(f"Session state: {session_state}")
        
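        # Wait for idle state, but give up if the session dies or takes too long
        deadline = time.time() + config["timeout_seconds"]
        while session_state != 'idle':
            if session_state in ('error', 'dead', 'killed'):
                print(f"❌ Session is in terminal state: {session_state}")
                return None
            if time.time() > deadline:
                print("⏰ Timeout waiting for session to become idle")
                return None
            print(f"Waiting for session to become idle (current: {session_state})...")
            time.sleep(5)
            session_response = requests.get(f"{livy_base_url}/sessions/{session_id}", headers=headers)
            if session_response.status_code == 200:
                session_state = session_response.json()['state']
            else:
                print(f"❌ Failed to get session status: {session_response.status_code}")
                return None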
    
    # Submit statement
    statement_payload = {
        "code": code,
        "kind": kind
    }
    
    statement_response = requests.post(
        f"{livy_base_url}/sessions/{session_id}/statements",
        headers=headers,
        json=statement_payload
    )
    
    if statement_response.status_code != 201:
        print(f"❌ Failed to submit statement: {statement_response.status_code}")
        print(f"Error: {statement_response.text}")
        return None
    
    statement_data = statement_response.json()
    stmt_id = statement_data['id']
    print(f"✅ Statement submitted with ID: {stmt_id}")
    
    # Poll for completion
    max_attempts = 30
    attempt = 0
    
    while attempt < max_attempts:
        statement_status_response = requests.get(
            f"{livy_base_url}/sessions/{session_id}/statements/{stmt_id}",
            headers=headers
        )
        
        if statement_status_response.status_code == 200:
            statement_status = statement_status_response.json()
            state = statement_status['state']
            
            if state == 'available':
                print("✅ Statement completed!")
                if 'output' in statement_status and statement_status['output']:
                    output = statement_status['output']
                    if output['status'] == 'ok':
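                        if 'data' in output and 'text/plain' in output['data']:
                            print("📋 Results:")
                            print(output['data']['text/plain'])
                        elif 'data' in output and 'application/json' in output['data']:
                            # sql statements return a JSON table (schema + rows) in Livy
                            print("📋 Results:")
                            print(json.dumps(output['data']['application/json'], indent=2))
                        else:
                            print("✅ Statement executed successfully (no data returned)")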
                    else:
                        print(f"❌ Statement failed: {output.get('evalue', 'Unknown error')}")
                return statement_status
            elif state in ['error', 'cancelled', 'cancelling']:
                print(f"❌ Statement failed with state: {state}")
                return statement_status
            elif state in ['running', 'waiting']:
                print(f"🔄 Statement {state}... (attempt {attempt + 1})")
            
            time.sleep(3)
            attempt += 1
        else:
            print(f"❌ Failed to get statement status: {statement_status_response.status_code}")
            break
    
    print("⏰ Timeout waiting for statement completion")
    return None

# Example SQL statements to demonstrate the Livy API
statements_to_execute = [
    {
        "code": "SHOW TABLES",
        "description": "List all tables in the lakehouse"
    },
    {
        "code": "SELECT 'Fabric Livy API Test' as message, current_timestamp() as execution_time",
        "description": "Simple SELECT statement with timestamp"
    },
    {
        "code": "SELECT 1 + 1 as calculation, 'Math works!' as message",
        "description": "Basic calculation"
    }
]

# Execute each statement
for i, stmt in enumerate(statements_to_execute, 1):
    print(f"\n🔄 Executing Statement {i}/{len(statements_to_execute)}")
    result = execute_and_poll_statement(
        stmt["code"], 
        "sql", 
        stmt["description"]
    )
    
    if result is None:
        print(f"⚠️ Statement {i} failed or timed out")
    
    # Small delay between statements
    time.sleep(2)

## 10. Advanced Spark SQL Queries (Optional)

If you have data tables in your lakehouse, you can uncomment and modify these examples to query your actual data. These are based on your sample queries.

In [None]:
# Advanced Spark SQL queries (uncomment and modify table names as needed)
advanced_queries = [
    {
        "code": '''spark.sql("SHOW TABLES").show()''',
        "kind": "spark",
        "description": "Show all tables using Spark SQL"
    },
    # Uncomment and modify these if you have the actual tables in your lakehouse
    # {
    #     "code": '''spark.sql("SELECT * FROM green_tripdata_2022_08 WHERE fare_amount = 60").show()''',
    #     "kind": "spark", 
    #     "description": "Query green trip data with fare amount filter"
    # },
    # {
    #     "code": '''spark.sql("SELECT * FROM green_tripdata_2022_08 WHERE tip_amount = 10").show()''',
    #     "kind": "spark",
    #     "description": "Query green trip data with tip amount filter"  
    # },
    {
        "code": '''spark.sql("SELECT current_timestamp() as timestamp, 'Spark SQL works!' as message").show()''',
        "kind": "spark",
        "description": "Test Spark SQL with timestamp"
    }
]

print("🚀 Executing Advanced Spark SQL Queries...")
print("Note: Modify table names in the code above to match your actual lakehouse tables")

# Execute advanced queries (only the ones that are not commented out)
for i, query in enumerate(advanced_queries, 1):
    print(f"\n🔄 Executing Advanced Query {i}/{len(advanced_queries)}")
    result = execute_and_poll_statement(
        query["code"], 
        query["kind"], 
        query["description"]
    )
    
    if result is None:
        print(f"⚠️ Advanced query {i} failed or timed out")
    
    time.sleep(2)

## 11. Delete Livy Session (Cleanup)

Finally, let's clean up by deleting the Livy session to free up resources.

In [None]:
# Clean up: Delete the Livy session
print("🧹 Cleaning up: Deleting Livy session...")
print(f"Session URL: {livy_base_url}/sessions/{session_id}")

delete_response = requests.delete(
    f"{livy_base_url}/sessions/{session_id}",
    headers=headers
)

if delete_response.status_code in [200, 204]:
    print("✅ Livy session deleted successfully!")
    print("🎉 Cleanup completed!")
elif delete_response.status_code == 404:
    print("⚠️ Session already deleted or not found")
else:
    print(f"❌ Failed to delete session: {delete_response.status_code}")
    print(f"Error: {delete_response.text}")

print("\n" + "="*70)
print("🎯 Livy API Test Complete!")
print("="*70)
print("Summary of what we accomplished:")
print("✅ Authenticated with Microsoft Fabric")
print("✅ Created a Livy session")
print("✅ Waited for session to become ready")
print("✅ Executed SQL statements")
print("✅ Executed Spark SQL queries")
print("✅ Cleaned up the session")
print("\nIf every step above reported ✅, your Livy API integration is working! 🚀")
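If a cell fails midway, the cleanup cell may never run. The session then keeps consuming capacity until it times out. A `try/finally` wrapper guarantees that the delete call is attempted. This sketch uses `contextlib.contextmanager` and takes the create and delete calls as functions. The wiring is hypothetical: in this notebook, `create` would wrap the `requests.post` to `session_endpoints["sessions"]` and return the new `id`, and `delete` would wrap the `requests.delete` on `f"{livy_base_url}/sessions/{sid}"`.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def livy_session(create: Callable[[], str], delete: Callable[[str], None]) -> Iterator[str]:
    """Create a session, yield its ID, and always attempt deletion afterwards."""
    session_id = create()
    try:
        yield session_id
    finally:
        try:
            delete(session_id)
        except Exception as exc:  # don't mask the original error with a cleanup error
            print(f"⚠️ Failed to delete session {session_id}: {exc}")
```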

## 12. Spark Application Monitoring

Use the Fabric Spark monitoring APIs to list Spark applications (Livy sessions) and check their status. You can list them across a whole workspace or for a specific notebook, Spark Job Definition, or lakehouse.

In [None]:
def get_workspace_spark_applications(workspace_id: str, continuation_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Get all Spark applications in a workspace
    
    Args:
        workspace_id: Microsoft Fabric workspace ID
        continuation_token: Optional token for pagination
    
    Returns:
        Dictionary with a 'success' flag plus either the raw API response under 'data'
        (with 'total_applications' and 'has_more') or an 'error' message
    """
    url = f"{config['fabric_base_url']}/workspaces/{workspace_id}/spark/livySessions"
    
    params = {}
    if continuation_token:
        params['continuationToken'] = continuation_token
    
    headers = {
        'Authorization': f'Bearer {access_token}',  # token from the authentication cell
        'Content-Type': 'application/json'
    }
    
    try:
        print(f"🔍 Getting Spark applications from workspace: {workspace_id}")
        if continuation_token:
            print(f"📄 Using continuation token: {continuation_token[:20]}...")
            
        response = requests.get(url, headers=headers, params=params, timeout=config['timeout_seconds'])
        
        if response.status_code == 200:
            data = response.json()
            print(f"✅ Successfully retrieved {len(data.get('value', []))} Spark applications")
            
            # Print summary of applications
            if data.get('value'):
                print("\n📊 Spark Applications Summary:")
                for i, app in enumerate(data['value'][:5], 1):  # Show first 5
                    print(f"   {i}. {app.get('itemName', 'N/A')} ({app.get('itemType', 'N/A')}) - State: {app.get('state', 'N/A')}")
                
                if len(data['value']) > 5:
                    print(f"   ... and {len(data['value']) - 5} more applications")
                    
                if data.get('continuationToken'):
                    print(f"📄 More data available with continuation token")
            else:
                print("   No Spark applications found")
                
            return {
                'success': True,
                'data': data,
                'total_applications': len(data.get('value', [])),
                'has_more': bool(data.get('continuationToken'))
            }
        else:
            error_msg = f"API request failed with status {response.status_code}: {response.text}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'status_code': response.status_code
            }
            
    except requests.exceptions.RequestException as e:
        error_msg = f"Request failed: {str(e)}"
        print(f"❌ {error_msg}")
        return {
            'success': False,
            'error': error_msg
        }
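Each call to a list endpoint returns one page, plus a `continuationToken` when more results remain. A generator that keeps requesting pages until the token disappears lets you iterate over every application. In this sketch, `fetch_page` is any callable that takes a token (or `None`) and returns the raw JSON response as a dict. An example is `lambda tok: get_workspace_spark_applications(config['workspace_id'], tok)['data']`, which raises `KeyError` if a call fails. The `max_pages` guard is our own addition.

```python
from typing import Any, Callable, Dict, Iterator, Optional


def iter_all_pages(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]],
    max_pages: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield every item in 'value' across pages, following 'continuationToken'."""
    token: Optional[str] = None
    for _ in range(max_pages):
        page = fetch_page(token)
        yield from page.get("value", [])
        token = page.get("continuationToken")
        if not token:
            return
    # Guard against a server that keeps returning tokens forever.
    raise RuntimeError(f"Stopped after {max_pages} pages; more data may remain")
```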

In [None]:
def get_notebook_spark_applications(workspace_id: str, notebook_id: str, continuation_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Get all Spark applications for a specific notebook
    
    Args:
        workspace_id: Microsoft Fabric workspace ID
        notebook_id: Notebook ID
        continuation_token: Optional token for pagination
    
    Returns:
        Dictionary with a 'success' flag plus either the raw API response under 'data'
        (with 'total_applications' and 'has_more') or an 'error' message
    """
    url = f"{config['fabric_base_url']}/workspaces/{workspace_id}/notebooks/{notebook_id}/livySessions"
    
    params = {}
    if continuation_token:
        params['continuationToken'] = continuation_token
    
    headers = {
        'Authorization': f'Bearer {access_token}',  # token from the authentication cell
        'Content-Type': 'application/json'
    }
    
    try:
        print(f"🔍 Getting Spark applications for notebook: {notebook_id}")
        if continuation_token:
            print(f"📄 Using continuation token: {continuation_token[:20]}...")
            
        response = requests.get(url, headers=headers, params=params, timeout=config['timeout_seconds'])
        
        if response.status_code == 200:
            data = response.json()
            print(f"✅ Successfully retrieved {len(data.get('value', []))} Spark applications for notebook")
            
            # Print detailed information for notebook applications
            if data.get('value'):
                print("\n📊 Notebook Spark Applications:")
                for i, app in enumerate(data['value'], 1):
                    print(f"   {i}. Application ID: {app.get('sparkApplicationId', 'N/A')}")
                    print(f"      State: {app.get('state', 'N/A')}")
                    print(f"      Job Type: {app.get('jobType', 'N/A')}")
                    print(f"      Submitted: {app.get('submittedDateTime', 'N/A')}")
                    if app.get('startDateTime'):
                        print(f"      Started: {app.get('startDateTime')}")
                    if app.get('endDateTime'):
                        print(f"      Ended: {app.get('endDateTime')}")
                    if app.get('cancellationReason'):
                        print(f"      Cancellation Reason: {app.get('cancellationReason')}")
                    print()
                    
                if data.get('continuationToken'):
                    print(f"📄 More data available with continuation token")
            else:
                print("   No Spark applications found for this notebook")
                
            return {
                'success': True,
                'data': data,
                'total_applications': len(data.get('value', [])),
                'has_more': bool(data.get('continuationToken'))
            }
        else:
            error_msg = f"API request failed with status {response.status_code}: {response.text}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'status_code': response.status_code
            }
            
    except requests.exceptions.RequestException as e:
        error_msg = f"Request failed: {str(e)}"
        print(f"❌ {error_msg}")
        return {
            'success': False,
            'error': error_msg
        }
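To get an at-a-glance view instead of a per-application listing, you could tally the `state` field of the returned items. The sketch below uses `collections.Counter`. It counts items with no `state` key as `"Unknown"`, which is our own convention.

```python
from collections import Counter
from typing import Any, Dict, Iterable


def summarize_states(apps: Iterable[Dict[str, Any]]) -> Dict[str, int]:
    """Count applications per 'state', most common first."""
    counts = Counter(app.get("state", "Unknown") for app in apps)
    return dict(counts.most_common())
```

For example, run `summarize_states(result["data"]["value"])` on a successful result from `get_notebook_spark_applications`.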

In [None]:
def get_spark_job_definition_applications(workspace_id: str, spark_job_definition_id: str, continuation_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Get all Spark applications for a specific Spark Job Definition
    
    Args:
        workspace_id: Microsoft Fabric workspace ID
        spark_job_definition_id: Spark Job Definition ID
        continuation_token: Optional token for pagination
    
    Returns:
        Dictionary with a 'success' flag plus either the raw API response under 'data'
        (with 'total_applications' and 'has_more') or an 'error' message
    """
    url = f"{config['fabric_base_url']}/workspaces/{workspace_id}/sparkJobDefinitions/{spark_job_definition_id}/livySessions"
    
    params = {}
    if continuation_token:
        params['continuationToken'] = continuation_token
    
    headers = {
        'Authorization': f'Bearer {access_token}',  # token from the authentication cell
        'Content-Type': 'application/json'
    }
    
    try:
        print(f"🔍 Getting Spark applications for Spark Job Definition: {spark_job_definition_id}")
        if continuation_token:
            print(f"📄 Using continuation token: {continuation_token[:20]}...")
            
        response = requests.get(url, headers=headers, params=params, timeout=config['timeout_seconds'])
        
        if response.status_code == 200:
            data = response.json()
            print(f"✅ Successfully retrieved {len(data.get('value', []))} Spark applications for Spark Job Definition")
            
            # Print detailed information for Spark Job Definition applications
            if data.get('value'):
                print("\n📊 Spark Job Definition Applications:")
                for i, app in enumerate(data['value'], 1):
                    print(f"   {i}. Application ID: {app.get('sparkApplicationId', 'N/A')}")
                    print(f"      State: {app.get('state', 'N/A')}")
                    print(f"      Job Type: {app.get('jobType', 'N/A')}")
                    print(f"      Runtime Version: {app.get('runtimeVersion', 'N/A')}")
                    print(f"      Submitted: {app.get('submittedDateTime', 'N/A')}")
                    
                    # Show duration information
                    if app.get('queuedDuration'):
                        duration = app['queuedDuration']
                        print(f"      Queued Duration: {duration.get('value', 'N/A')} {duration.get('timeUnit', '')}")
                    if app.get('runningDuration'):
                        duration = app['runningDuration']
                        print(f"      Running Duration: {duration.get('value', 'N/A')} {duration.get('timeUnit', '')}")
                    if app.get('totalDuration'):
                        duration = app['totalDuration']
                        print(f"      Total Duration: {duration.get('value', 'N/A')} {duration.get('timeUnit', '')}")
                    
                    if app.get('cancellationReason'):
                        print(f"      Cancellation Reason: {app.get('cancellationReason')}")
                    print()
                    
                if data.get('continuationToken'):
                    print("📄 More data available; pass the returned continuationToken to fetch the next page")
            else:
                print("   No Spark applications found for this Spark Job Definition")
                
            return {
                'success': True,
                'data': data,
                'total_applications': len(data.get('value', [])),
                'has_more': bool(data.get('continuationToken'))
            }
        else:
            error_msg = f"API request failed with status {response.status_code}: {response.text}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'status_code': response.status_code
            }
            
    except requests.exceptions.RequestException as e:
        error_msg = f"Request failed: {str(e)}"
        print(f"❌ {error_msg}")
        return {
            'success': False,
            'error': error_msg
        }
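
The Spark Job Definition listing prints `queuedDuration`, `runningDuration`, and `totalDuration` as separate value/unit pairs, which are hard to compare or sort. The sketch below normalizes one of these objects to seconds. It assumes `timeUnit` is one of `Seconds`, `Minutes`, `Hours`, or `Days`. That set is an assumption to confirm against the Fabric REST reference. `duration_to_seconds` is a hypothetical helper, not part of any Fabric SDK.

```python
from typing import Any, Dict, Optional

# Assumed timeUnit names; confirm against the Fabric Duration schema.
_SECONDS_PER_UNIT = {
    "Seconds": 1,
    "Minutes": 60,
    "Hours": 3600,
    "Days": 86400,
}

def duration_to_seconds(duration: Optional[Dict[str, Any]]) -> Optional[float]:
    """Convert a {'value': ..., 'timeUnit': ...} duration object to seconds.

    Returns None when the object is missing, the value is not numeric,
    or the unit is not one of the assumed names above.
    """
    if not duration:
        return None
    value = duration.get("value")
    factor = _SECONDS_PER_UNIT.get(duration.get("timeUnit", ""))
    if not isinstance(value, (int, float)) or factor is None:
        return None
    return float(value) * factor

# Hand-written duration objects (not real API output)
print(duration_to_seconds({"value": 2.5, "timeUnit": "Minutes"}))
print(duration_to_seconds({"value": 3, "timeUnit": "Fortnights"}))
```

With seconds in hand, applications can be sorted by `totalDuration` or compared against a threshold without caring which unit the API chose.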

In [None]:
def get_lakehouse_spark_applications(workspace_id: str, lakehouse_id: str, continuation_token: Optional[str] = None) -> Dict[str, Any]:
    """
    Get one page of Livy sessions (Spark applications) for a specific lakehouse.
    
    Args:
        workspace_id: Microsoft Fabric workspace ID
        lakehouse_id: Lakehouse ID
        continuation_token: Optional token for pagination
    
    Returns:
        Dictionary containing the API response with spark applications
    """
    url = f"{config['fabric_base_url']}/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/livySessions"
    
    params = {}
    if continuation_token:
        params['continuationToken'] = continuation_token
    
    headers = {
        'Authorization': f'Bearer {auth_token}',
        'Content-Type': 'application/json'
    }
    
    try:
        print(f"🔍 Getting Spark applications for lakehouse: {lakehouse_id}")
        if continuation_token:
            print(f"📄 Using continuation token: {continuation_token[:20]}...")
            
        response = requests.get(url, headers=headers, params=params, timeout=config['timeout_seconds'])
        
        if response.status_code == 200:
            data = response.json()
            print(f"✅ Successfully retrieved {len(data.get('value', []))} Spark applications for lakehouse")
            
            # Print detailed information for lakehouse applications
            if data.get('value'):
                print("\n📊 Lakehouse Spark Applications:")
                for i, app in enumerate(data['value'], 1):
                    print(f"   {i}. Application ID: {app.get('sparkApplicationId', 'N/A')}")
                    print(f"      Livy ID: {app.get('livyId', 'N/A')}")
                    print(f"      State: {app.get('state', 'N/A')}")
                    print(f"      Origin: {app.get('origin', 'N/A')}")
                    print(f"      Operation: {app.get('operationName', 'N/A')}")
                    
                    # Show submitter information
                    if app.get('submitter'):
                        submitter = app['submitter']
                        print(f"      Submitted by: {submitter.get('type', 'N/A')} ({submitter.get('id', 'N/A')[:8]}...)")
                    
                    if app.get('capacityId'):
                        print(f"      Capacity ID: {app.get('capacityId')[:8]}...")
                    
                    if app.get('cancellationReason'):
                        print(f"      Cancellation Reason: {app.get('cancellationReason')}")
                    print()
                    
                if data.get('continuationToken'):
                    print("📄 More data available; pass the returned continuationToken to fetch the next page")
            else:
                print("   No Spark applications found for this lakehouse")
                
            return {
                'success': True,
                'data': data,
                'total_applications': len(data.get('value', [])),
                'has_more': bool(data.get('continuationToken'))
            }
        else:
            error_msg = f"API request failed with status {response.status_code}: {response.text}"
            print(f"❌ {error_msg}")
            return {
                'success': False,
                'error': error_msg,
                'status_code': response.status_code
            }
            
    except requests.exceptions.RequestException as e:
        error_msg = f"Request failed: {str(e)}"
        print(f"❌ {error_msg}")
        return {
            'success': False,
            'error': error_msg
        }
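
Each listing helper returns a single page and sets `has_more` when the response carries a `continuationToken`. The sketch below follows those tokens until they run out. `collect_all_pages` is a hypothetical helper. It relies only on the `{'success': ..., 'data': {'value': [...], 'continuationToken': ...}}` shape that the helpers in this notebook return, and it caps the number of pages in case a token never clears.

```python
from typing import Any, Callable, Dict, List, Optional

def collect_all_pages(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]],
    max_pages: int = 50,
) -> List[Dict[str, Any]]:
    """Call fetch_page repeatedly, following continuationToken, and return every item.

    fetch_page receives the token for the next page (None for the first page)
    and returns a dict shaped like this notebook's helpers:
    {'success': bool, 'data': {'value': [...], 'continuationToken': ...}}.
    """
    items: List[Dict[str, Any]] = []
    token: Optional[str] = None
    for _ in range(max_pages):  # cap pages in case a token never clears
        result = fetch_page(token)
        if not result.get("success"):
            raise RuntimeError(result.get("error", "Unknown error"))
        data = result.get("data", {})
        items.extend(data.get("value", []))
        token = data.get("continuationToken")
        if not token:
            break
    return items

# Offline demo with a fake two-page source (no network calls)
fake_pages = {
    None: {"success": True, "data": {"value": [{"livyId": "a"}], "continuationToken": "t1"}},
    "t1": {"success": True, "data": {"value": [{"livyId": "b"}]}},
}
print(collect_all_pages(lambda token: fake_pages[token]))

# Real usage with the lakehouse helper above (makes API calls):
# all_sessions = collect_all_pages(
#     lambda token: get_lakehouse_spark_applications(
#         config['workspace_id'], config['lakehouse_id'], continuation_token=token))
```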

In [None]:
# Test the Spark Application Monitoring APIs
print("🚀 Testing Spark Application Monitoring APIs")
print("=" * 60)

# Your specific IDs for testing
notebook_id = "8809828e-7212-43fa-8463-8de8b3873288"
spark_job_definition_id = "d410b138-5c4f-42af-b1ed-4430138c7b79"

print("Using your configuration:")
print(f"  Workspace ID: {config['workspace_id']}")
print(f"  Lakehouse ID: {config['lakehouse_id']}")
print(f"  Notebook ID: {notebook_id}")
print(f"  Spark Job Definition ID: {spark_job_definition_id}")
print()

# Test 1: Get all Spark applications in the workspace
print("📋 Test 1: Get all Spark applications in workspace")
workspace_result = get_workspace_spark_applications(config['workspace_id'])

if workspace_result['success']:
    print(f"✅ Found {workspace_result['total_applications']} applications in workspace")
    if workspace_result['has_more']:
        print("📄 There are more applications available (pagination needed)")
else:
    print(f"❌ Failed to get workspace applications: {workspace_result.get('error', 'Unknown error')}")

print("\n" + "-" * 40 + "\n")

# Test 2: Get Spark applications for the specific notebook
print("📔 Test 2: Get Spark applications for notebook")
notebook_result = get_notebook_spark_applications(config['workspace_id'], notebook_id)

if notebook_result['success']:
    print(f"✅ Found {notebook_result['total_applications']} applications for notebook")
    if notebook_result['has_more']:
        print("📄 There are more applications available (pagination needed)")
else:
    print(f"❌ Failed to get notebook applications: {notebook_result.get('error', 'Unknown error')}")

print("\n" + "-" * 40 + "\n")

# Test 3: Get Spark applications for the lakehouse
print("🏠 Test 3: Get Spark applications for lakehouse")
lakehouse_result = get_lakehouse_spark_applications(config['workspace_id'], config['lakehouse_id'])

if lakehouse_result['success']:
    print(f"✅ Found {lakehouse_result['total_applications']} applications for lakehouse")
    if lakehouse_result['has_more']:
        print("📄 There are more applications available (pagination needed)")
else:
    print(f"❌ Failed to get lakehouse applications: {lakehouse_result.get('error', 'Unknown error')}")

print("\n" + "-" * 40 + "\n")

# Test 4: Get Spark applications for the Spark Job Definition
print("⚙️ Test 4: Get Spark applications for Spark Job Definition")
sjd_result = get_spark_job_definition_applications(config['workspace_id'], spark_job_definition_id)

if sjd_result['success']:
    print(f"✅ Found {sjd_result['total_applications']} applications for Spark Job Definition")
    if sjd_result['has_more']:
        print("📄 There are more applications available (pagination needed)")
else:
    print(f"❌ Failed to get Spark Job Definition applications: {sjd_result.get('error', 'Unknown error')}")

print("\n" + "=" * 60)
print("🎯 Spark Application Monitoring Test Summary:")
print(f"  Workspace Applications: {'✅' if workspace_result['success'] else '❌'} ({workspace_result.get('total_applications', 0)} found)")
print(f"  Notebook Applications: {'✅' if notebook_result['success'] else '❌'} ({notebook_result.get('total_applications', 0)} found)")
print(f"  Lakehouse Applications: {'✅' if lakehouse_result['success'] else '❌'} ({lakehouse_result.get('total_applications', 0)} found)")
print(f"  Spark Job Def Applications: {'✅' if sjd_result['success'] else '❌'} ({sjd_result.get('total_applications', 0)} found)")

# Store results for further analysis
monitoring_results = {
    'workspace': workspace_result,
    'notebook': notebook_result,
    'lakehouse': lakehouse_result,
    'spark_job_definition': sjd_result
}
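
The four calls above go straight to `requests.get`, so a throttled request simply shows up as a failure. Fabric REST APIs signal throttling with HTTP 429, usually with a `Retry-After` header giving the wait in seconds. A small retry wrapper can absorb that. `get_with_retry` below is a hypothetical helper. Its 5-second fallback wait and 3 retries are arbitrary defaults, not documented Fabric values.

```python
import time
from typing import Any, Callable

def get_with_retry(
    send: Callable[[], Any],
    max_retries: int = 3,
    default_wait: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Call send(); while it returns HTTP 429, wait and retry up to max_retries times.

    send is any zero-argument callable returning an object with .status_code and
    .headers (for example a lambda wrapping requests.get). The last response is
    returned, so callers still see the 429 if every retry was throttled.
    """
    response = send()
    for _ in range(max_retries):
        if response.status_code != 429:
            break
        retry_after = response.headers.get("Retry-After")
        try:
            wait = float(retry_after) if retry_after is not None else default_wait
        except ValueError:
            wait = default_wait  # Retry-After can also be an HTTP date; not parsed here
        sleep(wait)
        response = send()
    return response

# Hypothetical usage inside the listing helpers above:
# response = get_with_retry(lambda: requests.get(
#     url, headers=headers, params=params, timeout=config['timeout_seconds']))
```

Injecting `sleep` keeps the helper testable without real waits.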

In [None]:
# Advanced Analysis: Filter and analyze the monitoring results
print("🔍 Advanced Analysis: Filtering and Analyzing Spark Applications")
print("=" * 60)

def analyze_spark_applications(results_dict: Dict[str, Dict[str, Any]]) -> None:
    """Analyze Spark applications across different sources"""
    
    all_applications = []
    
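    seen_ids = set()
    
    # Collect applications from all sources. The workspace-level list can overlap
    # with the notebook/lakehouse/SJD lists, so skip IDs that were already collected.
    for source, result in results_dict.items():
        if result.get('success') and result.get('data', {}).get('value'):
            for app in result['data']['value']:
                app_id = app.get('livyId') or app.get('sparkApplicationId')
                if app_id:
                    if app_id in seen_ids:
                        continue
                    seen_ids.add(app_id)
                # Copy so the stored API responses in monitoring_results are not mutated
                all_applications.append({**app, 'source': source})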
    
    if not all_applications:
        print("❌ No applications found across all sources")
        return
    
    print(f"📊 Total applications found: {len(all_applications)}")
    print()
    
    # Analyze by state
    states = {}
    for app in all_applications:
        state = app.get('state', 'Unknown')
        states[state] = states.get(state, 0) + 1
    
    print("📈 Applications by State:")
    for state, count in sorted(states.items()):
        print(f"  {state}: {count}")
    print()
    
    # Analyze by item type
    item_types = {}
    for app in all_applications:
        item_type = app.get('itemType', 'Unknown')
        item_types[item_type] = item_types.get(item_type, 0) + 1
    
    print("📈 Applications by Item Type:")
    for item_type, count in sorted(item_types.items()):
        print(f"  {item_type}: {count}")
    print()
    
    # Analyze by job type
    job_types = {}
    for app in all_applications:
        job_type = app.get('jobType', 'Unknown')
        job_types[job_type] = job_types.get(job_type, 0) + 1
    
    print("📈 Applications by Job Type:")
    for job_type, count in sorted(job_types.items()):
        print(f"  {job_type}: {count}")
    print()
    
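    # Find recent applications (last 24 hours), comparing timezone-aware UTC datetimes
    from datetime import datetime, timedelta, timezone
    recent_apps = []
    now = datetime.now(timezone.utc)  # datetime.utcnow() is deprecated since Python 3.12
    
    for app in all_applications:
        submitted_str = app.get('submittedDateTime')
        if submitted_str:
            try:
                # Parse the ISO 8601 string; 'Z' is rewritten because fromisoformat()
                # only accepts it from Python 3.11 onward
                submitted_dt = datetime.fromisoformat(submitted_str.replace('Z', '+00:00'))
                if submitted_dt.tzinfo is None:
                    submitted_dt = submitted_dt.replace(tzinfo=timezone.utc)  # assume UTC
                if now - submitted_dt <= timedelta(hours=24):
                    recent_apps.append(app)
            except ValueError:
                pass  # Skip timestamps this Python version cannot parse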
    
    print(f"🕐 Recent Applications (last 24 hours): {len(recent_apps)}")
    if recent_apps:
        for i, app in enumerate(recent_apps[:5], 1):  # Show first 5
            print(f"  {i}. {app.get('itemName', 'N/A')} ({app.get('state', 'N/A')}) - {app.get('submittedDateTime', 'N/A')}")
        if len(recent_apps) > 5:
            print(f"  ... and {len(recent_apps) - 5} more recent applications")
    print()
    
    # Find failed/cancelled applications
    failed_apps = [app for app in all_applications if app.get('state') in ['Failed', 'Cancelled', 'Error']]
    print(f"❌ Failed/Cancelled Applications: {len(failed_apps)}")
    if failed_apps:
        for i, app in enumerate(failed_apps[:3], 1):  # Show first 3
            print(f"  {i}. {app.get('itemName', 'N/A')} ({app.get('state', 'N/A')})")
            if app.get('cancellationReason'):
                print(f"     Reason: {app.get('cancellationReason')}")
        if len(failed_apps) > 3:
            print(f"  ... and {len(failed_apps) - 3} more failed applications")
    print()
    
    # Runtime version analysis
    runtime_versions = {}
    for app in all_applications:
        version = app.get('runtimeVersion', 'Unknown')
        runtime_versions[version] = runtime_versions.get(version, 0) + 1
    
    print("📈 Applications by Runtime Version:")
    for version, count in sorted(runtime_versions.items()):
        print(f"  {version}: {count}")

# Run the analysis
if any(result.get('success') for result in monitoring_results.values()):
    analyze_spark_applications(monitoring_results)
else:
    print("❌ No successful monitoring results to analyze")
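
The state, item-type, job-type, and runtime-version tallies in `analyze_spark_applications` each hand-roll a counting dict. `collections.Counter` expresses the same count in one expression. The hypothetical `tally` helper below also maps a `None` value to `'Unknown'`, which keeps `sorted()` from failing on a mix of `None` and string keys.

```python
from collections import Counter
from typing import Any, Dict, Iterable

def tally(apps: Iterable[Dict[str, Any]], field: str) -> Counter:
    """Count applications by one field; missing or None values count as 'Unknown'."""
    return Counter(app.get(field) or 'Unknown' for app in apps)

# Hand-written sample records (not real API output)
sample = [{'state': 'Succeeded'}, {'state': 'Failed'}, {'state': 'Succeeded'}, {'state': None}]
for state, count in sorted(tally(sample, 'state').items()):
    print(f"  {state}: {count}")
```

Calling `tally(all_applications, 'runtimeVersion')` (and likewise for the other fields) would replace each of the four counting loops.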

## 9. MCP Server Testing

This section maps the Spark monitoring calls above to the MCP server tools that expose them. The cell below only prints example Claude Desktop queries, the tool each query should trigger, and the tool parameters; it does not call the server itself.
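
Claude Desktop talks to the server over the Model Context Protocol, which is built on JSON-RPC 2.0. Invoking a tool is a `tools/call` request whose `params` carry the tool `name` and its `arguments`. The sketch below only builds such a request for `get-workspace-spark-applications`, using the parameter names the server lists for that tool. `build_tool_call` is a hypothetical helper. A real client must first complete the MCP `initialize` handshake before writing this line to the server's stdin.

```python
import json
from typing import Any, Dict

def build_tool_call(request_id: int, tool: str, arguments: Dict[str, Any]) -> str:
    """Serialize an MCP tools/call request as a single line of JSON (stdio framing)."""
    message = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }
    # json.dumps without indent escapes newlines inside strings, so the output
    # is one line, as newline-delimited stdio framing requires
    return json.dumps(message)

line = build_tool_call(
    1,
    "get-workspace-spark-applications",
    # Placeholder token; never paste a real bearer token into shared notebooks
    {"bearerToken": "<token>", "workspaceId": "c22f6805-d84a-4143-80b2-0c9e9832e5a2"},
)
print(line)
```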

In [None]:
# Example MCP Server Tool Calls
print("🤖 MCP Server Tool Examples")
print("=" * 50)

print("The following are example queries you can use with Claude Desktop")
print("when the MCP server is running:\n")

# Example tool calls for the MCP server
example_queries = [
    {
        "title": "Get all Spark applications in workspace",
        "query": f"Get all Spark applications in workspace {config['workspace_id']}",
        "tool": "get-workspace-spark-applications"
    },
    {
        "title": "Get Spark applications for specific notebook",
        "query": f"Get Spark applications for notebook {notebook_id} in workspace {config['workspace_id']}",
        "tool": "get-notebook-spark-applications"
    },
    {
        "title": "Get Spark applications for lakehouse",
        "query": f"Get Spark applications for lakehouse {config['lakehouse_id']} in workspace {config['workspace_id']}",
        "tool": "get-lakehouse-spark-applications"
    },
    {
        "title": "Get Spark applications for Spark Job Definition",
        "query": f"Get Spark applications for Spark Job Definition {spark_job_definition_id} in workspace {config['workspace_id']}",
        "tool": "get-spark-job-definition-applications"
    },
    {
        "title": "Generate Spark monitoring dashboard",
        "query": f"Generate a comprehensive Spark monitoring dashboard for workspace {config['workspace_id']}",
        "tool": "get-spark-monitoring-dashboard"
    }
]

for i, example in enumerate(example_queries, 1):
    print(f"{i}. {example['title']}")
    print(f"   Claude Query: \"{example['query']}\"")
    print(f"   MCP Tool: {example['tool']}")
    print()

print("🔧 MCP Server Tool Parameters:")
print("""
Available MCP tools for Spark monitoring:
- get-workspace-spark-applications
  Parameters: bearerToken, workspaceId, continuationToken (optional)

- get-notebook-spark-applications  
  Parameters: bearerToken, workspaceId, notebookId, continuationToken (optional)

- get-lakehouse-spark-applications
  Parameters: bearerToken, workspaceId, lakehouseId, continuationToken (optional)

- get-spark-job-definition-applications
  Parameters: bearerToken, workspaceId, sparkJobDefinitionId, continuationToken (optional)

- get-spark-application-details
  Parameters: bearerToken, workspaceId, livyId

- cancel-spark-application
  Parameters: bearerToken, workspaceId, livyId

- get-spark-monitoring-dashboard
  Parameters: bearerToken, workspaceId
""")

print("\n📋 To use these tools with Claude Desktop:")
print("1. Build the MCP server so that build/index.js exists (Claude Desktop starts it itself from the config below)")
print("2. Register the server in claude_desktop_config.json, then restart Claude Desktop")
print("3. Ask Claude to use these monitoring capabilities")
print("4. Claude selects the matching MCP tool and calls it (Claude Desktop may ask you to approve each call)")

# Show configuration for Claude Desktop
print("\n⚙️ Claude Desktop Configuration:")
print("""
Add this to your claude_desktop_config.json:

{
  "mcpServers": {
    "fabric-analytics": {
      "command": "node",
      "args": ["C:\\\\FULL\\\\PATH\\\\TO\\\\YOUR\\\\PROJECT\\\\build\\\\index.js"]
    }
  }
}
""")