# NiFi Processor Usage Analyzer - Databricks Edition

This notebook analyzes NiFi processor execution counts to identify unused or underutilized processors.

**Features:**
- Fast execution count analysis (~5-10 seconds)
- Snapshot mode for time-series tracking
- Delta Lake integration for historical analysis
- Standalone - no external files needed

**Setup:**
1. Edit the configuration in Cell 3
2. Run all cells
3. View results in output and Delta table

In [None]:
# Cell 1: Install Dependencies
# This installs packages for the current notebook session

%pip install requests rich --quiet

print("✓ Dependencies installed successfully!")

In [None]:
# Cell 2: Import Libraries

import requests
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from rich.console import Console
from rich.table import Table

# Databricks-specific imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Suppress urllib3's InsecureRequestWarning (emitted on every request when verify_ssl=False)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('nifi_analyzer')

# Initialize Rich console for pretty output
console = Console()

print("✓ Libraries imported successfully!")

In [None]:
# Cell 3: Configuration
# EDIT THESE VALUES FOR YOUR NIFI INSTANCE

CONFIG = {
    # NiFi Connection
    'nifi_url': 'https://thbnk01hdpnp002.th-bnk01.nxp.com:8443/nifi',  # Your NiFi URL (without /api)
    'username': 'nxg16670',                                              # Your NiFi username
    'password': 'your-password-here',                                    # Your NiFi password
    'verify_ssl': False,                                                 # Set to True if you have valid SSL
    
    # Server Identifier (for tracking multiple NiFi servers)
    'server': 'prod-nifi-01',                                            # Server name (e.g., hostname, environment)
    
    # Analysis Parameters
    'process_group_id': '8c8677c4-29d6-36...',                          # Process group ID to analyze
    
    # Snapshot Storage (Unity Catalog - 3-level naming)
    'enable_snapshots': True,                                            # Save snapshots to Delta Lake?
    'delta_table_path': 'main.default.nifi_processor_snapshots',        # catalog.schema.table
}

console.print("[green]✓ Configuration loaded![/green]")
console.print(f"  NiFi URL: {CONFIG['nifi_url']}")
console.print(f"  Username: {CONFIG['username']}")
console.print(f"  Server: {CONFIG['server']}")
console.print(f"  Process Group ID: {CONFIG['process_group_id'][:16]}...")
console.print(f"  Delta table: {CONFIG['delta_table_path']}")
console.print(f"  Snapshots enabled: {CONFIG['enable_snapshots']}")
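A password typed into `CONFIG` is saved along with the notebook. One possible alternative, sketched here, reads it from an environment variable and only falls back to the config value. The variable name `NIFI_PASSWORD` and the helper `resolve_password` are hypothetical and not part of this notebook; on Databricks, a secret scope read through `dbutils.secrets.get` would serve the same purpose.

```python
import os

PLACEHOLDER = "your-password-here"  # the placeholder value used in CONFIG


def resolve_password(config: dict, env_var: str = "NIFI_PASSWORD") -> str:
    """Return the password from env_var if set, otherwise from config['password']."""
    password = os.environ.get(env_var) or config.get("password", "")
    if not password or password == PLACEHOLDER:
        # Fail early instead of sending a placeholder to NiFi
        raise ValueError(f"No NiFi password configured; set {env_var} or edit CONFIG")
    return password
```

If this approach fits your environment, `CONFIG['password'] = resolve_password(CONFIG)` could run right after the configuration cell.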

In [None]:
# Cell 4: NiFi Client Class
# Handles all NiFi REST API interactions

class NiFiClient:
    """
    Client for interacting with Apache NiFi REST API.
    Handles authentication and common API operations.
    """
    
    def __init__(self, base_url: str, username: str, password: str, verify_ssl: bool = True):
        self.base_url = base_url.rstrip('/')
        if not self.base_url.endswith('/nifi'):
            self.base_url += '/nifi'
        self.api_url = f"{self.base_url}-api"
        self.verify_ssl = verify_ssl
        self.session = requests.Session()
        self.token = None
        self.username = username
        self.password = password
        
        # Authenticate
        self._authenticate(username, password)
        
    def _authenticate(self, username: str, password: str) -> None:
        """Authenticate with NiFi using username/password."""
        from requests.auth import HTTPBasicAuth  # imported once for both fallback paths

        try:
            response = requests.post(
                f"{self.api_url}/access/token",
                data={'username': username, 'password': password},
                verify=self.verify_ssl
            )
            
            if response.status_code == 201:
                self.token = response.text
                self.session.headers.update({'Authorization': f'Bearer {self.token}'})
                logger.info("Successfully authenticated with token")
                return
            
            logger.warning(f"Token auth failed with status {response.status_code}")
            logger.warning("Falling back to basic auth")
        except Exception as e:
            logger.warning(f"Token auth error: {e}, falling back to basic auth")
        
        # Fallback for both a non-201 response and a request error
        self.session.auth = HTTPBasicAuth(username, password)
    
    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
        """Make authenticated request with 401 retry."""
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        kwargs.setdefault('verify', self.verify_ssl)
        
        response = self.session.request(method, url, **kwargs)
        
        # Handle 401 by re-authenticating once
        if response.status_code == 401:
            logger.warning("Received 401, attempting re-authentication")
            self._authenticate(self.username, self.password)
            response = self.session.request(method, url, **kwargs)
            if response.status_code == 401:
                raise Exception("Authentication failed: Unauthorized")
        
        response.raise_for_status()
        return response
    
    def get_process_group(self, group_id: str) -> Dict[str, Any]:
        """Get process group details including all processors."""
        response = self._request("GET", f"/flow/process-groups/{group_id}")
        return response.json()
    
    def list_processors(self, process_group_id: str) -> List[Dict[str, Any]]:
        """Get all processors in a process group (recursive)."""
        pg_data = self.get_process_group(process_group_id)
        processors = pg_data["processGroupFlow"]["flow"]["processors"]
        
        # Recursively get processors from child groups
        child_groups = pg_data["processGroupFlow"]["flow"]["processGroups"]
        for child in child_groups:
            processors.extend(self.list_processors(child["id"]))
        
        return processors
    
    def get_process_group_status(self, group_id: str) -> Dict[str, Any]:
        """Get execution statistics for process group."""
        response = self._request("GET", f"/flow/process-groups/{group_id}/status")
        return response.json()
    
    def get_connection_statistics(self, group_id: str) -> List[Dict[str, Any]]:
        """
        Extract ALL connection statistics from Status API (connection-level, not aggregated).
        
        This method extracts all 15+ fields from connectionStatusSnapshot for maximum
        granularity. Returns raw connection-level data without processor aggregation.
        
        Returns:
            List of connection dictionaries with all available fields
        """
        status_data = self.get_process_group_status(group_id)
        all_connections = []
        
        pg_status = status_data.get("processGroupStatus", {})
        if not pg_status:
            return all_connections
        
        # Get all connections from current group
        connection_statuses = pg_status.get("aggregateSnapshot", {}).get("connectionStatusSnapshots", [])
        
        # Extract ALL fields from each connection (15+ fields)
        for conn_status in connection_statuses:
            conn_snap = conn_status.get("connectionStatusSnapshot", {})
            
            connection_data = {
                # Connection identity (7 fields)
                'id': conn_snap.get('id'),
                'groupId': conn_snap.get('groupId'),
                'name': conn_snap.get('name', ''),
                'sourceId': conn_snap.get('sourceId'),
                'sourceName': conn_snap.get('sourceName'),
                'destinationId': conn_snap.get('destinationId'),
                'destinationName': conn_snap.get('destinationName'),
                
                # Flow metrics - 5-minute window (6 fields)
                'flowFilesIn': conn_snap.get('flowFilesIn', 0),
                'flowFilesOut': conn_snap.get('flowFilesOut', 0),
                'bytesIn': conn_snap.get('bytesIn', 0),
                'bytesOut': conn_snap.get('bytesOut', 0),
                'input': conn_snap.get('input', ''),
                'output': conn_snap.get('output', ''),
                
                # Queue metrics - current state (4 fields)
                'queuedCount': conn_snap.get('queuedCount', 0),
                'queuedBytes': conn_snap.get('queuedBytes', 0),
                'queued': conn_snap.get('queued', ''),
                'queuedSize': conn_snap.get('queuedSize', ''),
                
                # Status indicators (2 fields)
                'percentUseCount': conn_snap.get('percentUseCount', 0),
                'percentUseBytes': conn_snap.get('percentUseBytes', 0),
                
                # Timestamps (1 field)
                'statsLastRefreshed': conn_snap.get('statsLastRefreshed', ''),
            }
            
            all_connections.append(connection_data)
        
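        # Recurse into child groups. The status response lists them under
        # aggregateSnapshot.processGroupStatusSnapshots, each entry carrying the child's "id".
        child_groups = pg_status.get("aggregateSnapshot", {}).get("processGroupStatusSnapshots", [])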
        for child_pg in child_groups:
            try:
                child_id = child_pg["id"]
                child_connections = self.get_connection_statistics(child_id)
                all_connections.extend(child_connections)
            except Exception as e:
                logger.error(f"Error processing child group: {e}")
        
        return all_connections
    
    def close(self):
        """Close the session."""
        self.session.close()

console.print("[green]✓ NiFiClient class defined![/green]")
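The URL handling in `NiFiClient.__init__` is easy to misread, so here is a minimal standalone sketch of the same three steps (strip trailing slashes, ensure a `/nifi` suffix, append `-api`). The function name `build_api_url` and the `nifi.example.com` host are made up for illustration.

```python
def build_api_url(base_url: str) -> str:
    """Mirror NiFiClient.__init__: strip trailing slashes, ensure '/nifi', append '-api'."""
    base = base_url.rstrip('/')
    if not base.endswith('/nifi'):
        base += '/nifi'
    return f"{base}-api"


# Both forms resolve to the same API root
print(build_api_url("https://nifi.example.com:8443/nifi/"))
print(build_api_url("https://nifi.example.com:8443"))

# A URL that already ends in '/nifi-api' is not recognized and gets the suffix twice,
# which is why the configuration asks for the URL without the API suffix
print(build_api_url("https://nifi.example.com:8443/nifi-api"))
```

The first two calls produce `https://nifi.example.com:8443/nifi-api`; the third produces `https://nifi.example.com:8443/nifi-api/nifi-api`.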

In [None]:
# Cell 5: Analyzer Class
# Analyzes processor execution counts using connection-level data

class ProcessorUsageAnalyzer:
    """
    Analyzes processor execution frequency using NiFi Status API.
    Captures ALL connection-level attributes for comprehensive analysis.
    """
    
    def __init__(self, client: NiFiClient, server: str = 'unknown'):
        self.client = client
        self.console = Console()
        self.server = server
        
        # Analysis results
        self.process_group_id: Optional[str] = None
        self.connection_statistics: List[Dict] = []
        self.target_processors: List[Dict] = []
        self.snapshot_timestamp: Optional[datetime] = None
    
    def analyze(self, process_group_id: str) -> None:
        """Analyze processor execution counts for a process group."""
        self.process_group_id = process_group_id
        self.snapshot_timestamp = datetime.now()
        
        self.console.print(f"\n[yellow]Analyzing processor execution counts:[/yellow]")
        self.console.print(f"  Process Group: {process_group_id[:16]}...")
        self.console.print(f"  Server: {self.server}")
        self.console.print(f"  Timestamp: {self.snapshot_timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
        
        # Phase 1: Get processors in target group
        self.console.print(f"\n[yellow]Phase 1:[/yellow] Getting processors from target process group...")
        
        try:
            self.target_processors = self.client.list_processors(process_group_id)
            self.console.print(f"[green]OK[/green] Found {len(self.target_processors)} processors")
            
            # Display processor list (first 10)
            if self.target_processors:
                self.console.print("\n[cyan]Processors in target group:[/cyan]")
                for proc in self.target_processors[:10]:
                    proc_name = proc['component']['name']
                    proc_type = proc['component']['type'].split('.')[-1]
                    self.console.print(f"  • {proc_name} ({proc_type})")
                if len(self.target_processors) > 10:
                    self.console.print(f"  ... and {len(self.target_processors) - 10} more")
        except Exception as e:
            self.console.print(f"[red]ERROR[/red] Failed to get processors: {e}")
            raise
        
        # Phase 2: Get connection statistics (ALL fields)
        self.console.print(f"\n[yellow]Phase 2:[/yellow] Fetching connection statistics (all fields)...")
        
        try:
            connection_stats = self.client.get_connection_statistics(process_group_id)
            
            if len(connection_stats) == 0:
                self.console.print(
                    f"[yellow]WARNING[/yellow] No connections found in process group"
                )
            else:
                self.console.print(
                    f"[green]OK[/green] Retrieved {len(connection_stats)} connections"
                )
            
            # Store raw connection data
            self.connection_statistics = connection_stats
                
        except Exception as e:
            self.console.print(f"[red]ERROR[/red] Failed to fetch connection statistics: {e}")
            raise
    
    def get_results_dataframe(self):
        """Convert results to Spark DataFrame with 23-field connection schema."""
        if not self.connection_statistics:
            return None
        
        # Create data for DataFrame (23 fields: 3 metadata + 20 connection fields)
        rows = []
        for conn in self.connection_statistics:
            rows.append((
                self.snapshot_timestamp,
                self.server,
                self.process_group_id,
                conn.get('id'),
                conn.get('name', ''),
                conn.get('groupId'),
                conn.get('sourceId'),
                conn.get('sourceName'),
                conn.get('destinationId'),
                conn.get('destinationName'),
                conn.get('flowFilesIn', 0),
                conn.get('flowFilesOut', 0),
                conn.get('bytesIn', 0),
                conn.get('bytesOut', 0),
                conn.get('input', ''),
                conn.get('output', ''),
                conn.get('queuedCount', 0),
                conn.get('queuedBytes', 0),
                conn.get('queued', ''),
                conn.get('queuedSize', ''),
                conn.get('percentUseCount', 0),
                conn.get('percentUseBytes', 0),
                conn.get('statsLastRefreshed', '')
            ))
        
        # Define schema with ALL 23 fields (connection-level)
        schema = StructType([
            # Metadata (3 fields)
            StructField("snapshot_timestamp", TimestampType(), False),
            StructField("server", StringType(), False),
            StructField("process_group_id", StringType(), False),
            
            # Connection identity (3 fields)
            StructField("connection_id", StringType(), True),
            StructField("connection_name", StringType(), True),
            StructField("connection_group_id", StringType(), True),
            
            # Source processor (2 fields)
            StructField("source_id", StringType(), True),
            StructField("source_name", StringType(), True),
            
            # Destination processor (2 fields)
            StructField("destination_id", StringType(), True),
            StructField("destination_name", StringType(), True),
            
            # Flow metrics (6 fields)
            StructField("flow_files_in", LongType(), False),
            StructField("flow_files_out", LongType(), False),
            StructField("bytes_in", LongType(), False),
            StructField("bytes_out", LongType(), False),
            StructField("input", StringType(), True),
            StructField("output", StringType(), True),
            
            # Queue metrics (4 fields)
            StructField("queued_count", LongType(), False),
            StructField("queued_bytes", LongType(), False),
            StructField("queued", StringType(), True),
            StructField("queued_size", StringType(), True),
            
            # Status indicators (2 fields)
            StructField("percent_use_count", LongType(), False),
            StructField("percent_use_bytes", LongType(), False),
            
            # Timestamps (1 field)
            StructField("stats_last_refreshed", StringType(), True)
        ])
        
        # Create DataFrame
        spark = SparkSession.builder.getOrCreate()
        df = spark.createDataFrame(rows, schema)
        return df
    
    def display_summary(self):
        """Display analysis summary."""
        if not self.connection_statistics:
            self.console.print("[red]No analysis results available.[/red]")
            return
        
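        # Aggregate connections to processor-level for summary.
        # Note: a connection's flowFilesOut counts FlowFiles leaving the connection
        # (taken by the destination), so these totals describe each source's outgoing connections.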
        processor_activity = {}
        for conn in self.connection_statistics:
            source_name = conn.get('sourceName', 'Unknown')
            if source_name not in processor_activity:
                processor_activity[source_name] = {
                    'flowFilesOut': 0,
                    'bytesOut': 0,
                    'queuedCount': 0
                }
            processor_activity[source_name]['flowFilesOut'] += conn.get('flowFilesOut', 0)
            processor_activity[source_name]['bytesOut'] += conn.get('bytesOut', 0)
            processor_activity[source_name]['queuedCount'] += conn.get('queuedCount', 0)
        
        # Sort by flowfile output
        sorted_processors = sorted(
            processor_activity.items(),
            key=lambda x: x[1]['flowFilesOut'],
            reverse=True
        )
        
        # Calculate stats
        total_flowfiles = sum(data['flowFilesOut'] for _, data in sorted_processors)
        total_bytes = sum(data['bytesOut'] for _, data in sorted_processors)
        unused_count = sum(1 for _, data in sorted_processors if data['flowFilesOut'] == 0)
        low_usage_count = sum(1 for _, data in sorted_processors if 0 < data['flowFilesOut'] < 10)
        queued_connections = sum(1 for conn in self.connection_statistics if conn.get('queuedCount', 0) > 0)
        
        # Display summary
        self.console.print(f"\n[cyan]Summary:[/cyan]")
        self.console.print(f"  Source processors (with outgoing connections): {len(sorted_processors)}")
        self.console.print(f"  Total connections: {len(self.connection_statistics)}")
        self.console.print(f"  Total flowfiles output (snapshot): {total_flowfiles:,}")
        self.console.print(f"  Total bytes output (snapshot): {total_bytes:,}")
        self.console.print(f"  No output: {unused_count} processors")
        self.console.print(f"  Low output (<10 flowfiles): {low_usage_count} processors")
        self.console.print(f"  Connections with queue: {queued_connections}")
        
        # Display table of top processors
        table = Table(title="\nTop 10 Active Processors")
        table.add_column("Processor Name", style="cyan")
        table.add_column("FlowFiles Out", justify="right", style="yellow")
        table.add_column("Bytes Out", justify="right", style="green")
        table.add_column("Queued", justify="right", style="red")
        
        for name, data in sorted_processors[:10]:
            table.add_row(
                name, 
                f"{data['flowFilesOut']:,}",
                f"{data['bytesOut']:,}",
                f"{data['queuedCount']:,}"
            )
        
        self.console.print(table)
        
        # Show connections with backpressure
        if queued_connections > 0:
            self.console.print(
                f"\n[yellow]WARNING: Connections with queued flowfiles (backpressure):[/yellow]"
            )
            for conn in sorted(self.connection_statistics, key=lambda x: x.get('queuedCount', 0), reverse=True)[:10]:
                if conn.get('queuedCount', 0) > 0:
                    self.console.print(
                        f"  • {conn.get('sourceName')} → {conn.get('destinationName')}: "
                        f"{conn.get('queuedCount', 0):,} flowfiles queued "
                        f"({conn.get('percentUseCount', 0)}% full)"
                    )
        
        # Show pruning candidates (processors with no output)
        if unused_count > 0:
            self.console.print(
                f"\n[yellow]WARNING: Processors with 0 flowfile output (candidates for pruning):[/yellow]"
            )
            count = 0
            for name, data in sorted_processors:
                if data['flowFilesOut'] == 0 and count < 10:
                    self.console.print(f"  • {name}")
                    count += 1
            if unused_count > 10:
                self.console.print(f"  ... and {unused_count - 10} more")
        
        self.console.print(f"\n[green]OK[/green] Analysis complete!")

console.print("[green]✓ ProcessorUsageAnalyzer class defined![/green]")
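The processor-level roll-up inside `display_summary` can be tried without a NiFi connection. The sketch below repeats the same idea on made-up connection dictionaries; `aggregate_by_source` is a hypothetical helper, not a method of the class above, and it tracks only two of the fields.

```python
from collections import defaultdict
from typing import Dict, List


def aggregate_by_source(connections: List[dict]) -> Dict[str, Dict[str, int]]:
    """Sum flowFilesOut and queuedCount over each source processor's connections."""
    totals: Dict[str, Dict[str, int]] = defaultdict(
        lambda: {"flowFilesOut": 0, "queuedCount": 0}
    )
    for conn in connections:
        # 'or' also covers a sourceName that is present but None
        source = conn.get("sourceName") or "Unknown"
        totals[source]["flowFilesOut"] += conn.get("flowFilesOut", 0)
        totals[source]["queuedCount"] += conn.get("queuedCount", 0)
    return dict(totals)


# Made-up sample data, not real NiFi output
sample = [
    {"sourceName": "GetFile", "flowFilesOut": 120, "queuedCount": 0},
    {"sourceName": "GetFile", "flowFilesOut": 30, "queuedCount": 5},
    {"sourceName": "LogAttribute", "flowFilesOut": 0, "queuedCount": 0},
]

totals = aggregate_by_source(sample)
unused = sorted(name for name, t in totals.items() if t["flowFilesOut"] == 0)
print(totals["GetFile"])  # two connections summed into one entry
print(unused)             # pruning candidates for this single snapshot
```

A processor that never appears as a connection source (for example a terminal processor) has no entry at all, so it shows up neither as active nor as a pruning candidate.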

In [None]:
# Cell 6: Run Analysis
# This is the main execution cell

console.print("\n[cyan]Starting NiFi Processor Analysis...[/cyan]\n")

# Connect to NiFi
console.print("[yellow]Connecting to NiFi...[/yellow]")
client = NiFiClient(
    base_url=CONFIG['nifi_url'],
    username=CONFIG['username'],
    password=CONFIG['password'],
    verify_ssl=CONFIG['verify_ssl']
)
console.print("[green]OK[/green] Connected successfully\n")

# Create analyzer and run analysis
analyzer = ProcessorUsageAnalyzer(client=client, server=CONFIG['server'])
analyzer.analyze(CONFIG['process_group_id'])

# Display results
analyzer.display_summary()

# Cleanup
client.close()

console.print("\n[green]✓ Analysis complete![/green]")

In [None]:
# Cell 7: Save Snapshot to Delta Lake (Optional)
# Run this cell to save the snapshot for historical tracking

if CONFIG['enable_snapshots']:
    console.print("\n[yellow]Saving snapshot to Delta Lake...[/yellow]")
    
    # Get results as DataFrame
    df = analyzer.get_results_dataframe()
    
    if df is not None:
        table_name = CONFIG['delta_table_path']
        
        # One-time migration: if the table still has the old processor-level schema,
        # drop it manually before the first run with the 23-field connection-level schema:
        #   spark.sql(f"DROP TABLE IF EXISTS {table_name}")
        # Do not drop or overwrite on every run; that erases the history Cell 8 queries.
        
        # Append this snapshot (saveAsTable creates the table if it does not exist)
        console.print(f"[yellow]Appending snapshot to table: {table_name}[/yellow]")
        df.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(table_name)
        console.print(f"[green]OK[/green] Snapshot appended with 23-field connection-level schema")
        
        console.print(f"  Timestamp: {analyzer.snapshot_timestamp}")
        console.print(f"  Total connections: {len(analyzer.connection_statistics)}")
        console.print(f"  Schema: 23 fields (connection-level with ALL attributes)")
        
        # Show sample data
        console.print(f"\n[cyan]Sample data saved:[/cyan]")
        display(df.limit(5))
        
        # Show schema
        console.print(f"\n[cyan]Schema:[/cyan]")
        df.printSchema()
    else:
        console.print("[red]ERROR[/red] No data to save")
else:
    console.print("\n[yellow]Snapshots disabled in configuration[/yellow]")

In [None]:
# Cell 8: Query Historical Snapshots (Connection-Level Analysis)
# Run this cell to analyze historical snapshot data

if CONFIG['enable_snapshots']:
    console.print("\n[cyan]Querying connection-level snapshots...[/cyan]\n")
    
    table_name = CONFIG['delta_table_path']
    
    # Check if table exists
    try:
        # Show total snapshots by server
        console.print("[yellow]Snapshots by server:[/yellow]")
        spark.sql(f"""
            SELECT 
                server,
                COUNT(DISTINCT snapshot_timestamp) as snapshots,
                COUNT(*) as total_connections,
                MAX(snapshot_timestamp) as last_snapshot
            FROM {table_name}
            GROUP BY server
            ORDER BY server
        """).show(truncate=False)
        
        # NEW: Find connections with high queue depth (backpressure detection)
        console.print("\n[yellow]Connections with queued flowfiles (backpressure):[/yellow]")
        spark.sql(f"""
            SELECT 
                server,
                source_name,
                destination_name,
                MAX(queued_count) as max_queued_flowfiles,
                MAX(queued_bytes) as max_queued_bytes,
                MAX(percent_use_count) as max_percent_full
            FROM {table_name}
            WHERE queued_count > 0
            GROUP BY server, source_name, destination_name
            ORDER BY max_queued_flowfiles DESC
            LIMIT 20
        """).show(truncate=False)
        
        # NEW: Identify connections approaching queue limits
        console.print("\n[yellow]Connections approaching queue limits (>50% full):[/yellow]")
        spark.sql(f"""
            SELECT 
                server,
                source_name,
                destination_name,
                MAX(percent_use_count) as max_percent_full,
                MAX(queued_count) as max_queued_count
            FROM {table_name}
            WHERE percent_use_count > 50
            GROUP BY server, source_name, destination_name
            ORDER BY max_percent_full DESC
            LIMIT 20
        """).show(truncate=False)
        
        # Find inactive connections (no flow for 7 days).
        # flow_files_out is a 5-minute window count, not a cumulative counter, so a
        # connection counts as inactive only if every snapshot in the period shows zero.
        console.print("\n[yellow]Inactive connections (no flowfiles for 7 days):[/yellow]")
        spark.sql(f"""
            WITH connection_activity AS (
                SELECT 
                    server,
                    source_name,
                    destination_name,
                    MAX(flow_files_out) as max_flowfiles_out,
                    MIN(snapshot_timestamp) as first_snapshot,
                    MAX(snapshot_timestamp) as last_snapshot,
                    COUNT(DISTINCT snapshot_timestamp) as num_snapshots
                FROM {table_name}
                WHERE snapshot_timestamp >= current_date() - INTERVAL 7 DAYS
                GROUP BY server, source_name, destination_name
            )
            SELECT 
                server,
                source_name,
                destination_name,
                max_flowfiles_out,
                num_snapshots
            FROM connection_activity
            WHERE max_flowfiles_out = 0
            ORDER BY server, source_name
            LIMIT 50
        """).show(truncate=False)
        
        # Aggregate to processor level (still possible!).
        # As above, a processor is inactive only if every snapshot shows zero output.
        console.print("\n[yellow]Inactive processors (aggregated from connections):[/yellow]")
        spark.sql(f"""
            WITH processor_activity AS (
                SELECT 
                    server,
                    source_name as processor_name,
                    MAX(flow_files_out) as max_flowfiles_out
                FROM {table_name}
                WHERE snapshot_timestamp >= current_date() - INTERVAL 7 DAYS
                GROUP BY server, source_name
            )
            SELECT 
                server,
                processor_name,
                max_flowfiles_out
            FROM processor_activity
            WHERE max_flowfiles_out = 0
            ORDER BY server, processor_name
            LIMIT 50
        """).show(truncate=False)
        
        # NEW: Bidirectional flow analysis
        console.print("\n[yellow]Flow balance (input vs output by connection):[/yellow]")
        spark.sql(f"""
            SELECT 
                server,
                source_name,
                destination_name,
                SUM(flow_files_in) as total_flowfiles_in,
                SUM(flow_files_out) as total_flowfiles_out,
                SUM(flow_files_in) - SUM(flow_files_out) as net_change
            FROM {table_name}
            WHERE snapshot_timestamp >= current_date() - INTERVAL 7 DAYS
            GROUP BY server, source_name, destination_name
            HAVING ABS(SUM(flow_files_in) - SUM(flow_files_out)) > 100
            ORDER BY ABS(net_change) DESC
            LIMIT 20
        """).show(truncate=False)
        
    except Exception as e:
        console.print(f"[red]ERROR[/red] Failed to query snapshots: {e}")
        console.print("[yellow]Hint:[/yellow] Table may not exist yet. Run Cell 7 first to create it.")
        import traceback
        traceback.print_exc()
else:
    console.print("\n[yellow]Snapshots disabled in configuration[/yellow]")

In [None]:
# Cell 9: Export Results to CSV (Optional)
# Run this cell to export current results to DBFS/cloud storage

console.print("\n[yellow]Exporting results to CSV...[/yellow]")

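import os

# Get current timestamp for filename
timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S')
output_dir = "/dbfs/nifi_analysis"
os.makedirs(output_dir, exist_ok=True)  # to_csv does not create missing directories
output_path = f"{output_dir}/processor_usage_{timestamp_str}.csv"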

# Convert to DataFrame and save
df = analyzer.get_results_dataframe()
if df is not None:
    # Convert to Pandas and save as CSV
    pdf = df.toPandas()
    pdf.to_csv(output_path, index=False)
    
    console.print(f"[green]OK[/green] Results exported to {output_path}")
    console.print(f"  Rows: {len(pdf)}")
    
    # Show sample
    console.print(f"\n[cyan]Sample data:[/cyan]")
    display(pdf.head(10))
else:
    console.print("[red]ERROR[/red] No data to export")

---

## Updated Delta Table Schema (Connection-Level)

The Delta table now captures **ALL available fields** from NiFi Status API at the **connection level** (not processor level). This provides maximum granularity for analysis.

### 23 Total Fields

| Column | Type | Description |
|--------|------|-------------|
| **Metadata (3 fields)** | | |
| `snapshot_timestamp` | Timestamp | When the snapshot was captured |
| `server` | String | Server identifier (hostname, environment name) |
| `process_group_id` | String | NiFi process group ID |
| **Connection Identity (3 fields)** | | |
| `connection_id` | String | Connection UUID |
| `connection_name` | String | Connection name (often empty or "success") |
| `connection_group_id` | String | Parent process group ID |
| **Source Processor (2 fields)** | | |
| `source_id` | String | Source processor UUID |
| `source_name` | String | Source processor name |
| **Destination Processor (2 fields)** | | |
| `destination_id` | String | Destination processor UUID |
| `destination_name` | String | Destination processor name |
| **Flow Metrics - 5-minute window (6 fields)** | | |
| `flow_files_in` | Long | FlowFiles entering connection |
| `flow_files_out` | Long | FlowFiles leaving connection |
| `bytes_in` | Long | Bytes entering connection |
| `bytes_out` | Long | Bytes leaving connection |
| `input` | String | Formatted input stats (e.g., "1,250 (50.8 KB)") |
| `output` | String | Formatted output stats |
| **Queue Metrics - current state (4 fields)** | | |
| `queued_count` | Long | FlowFiles currently queued |
| `queued_bytes` | Long | Bytes currently queued |
| `queued` | String | Formatted queue stats |
| `queued_size` | String | Formatted queue size |
| **Status Indicators (2 fields)** | | |
| `percent_use_count` | Long | % of queue count threshold used |
| `percent_use_bytes` | Long | % of queue bytes threshold used |
| **Timestamps (1 field)** | | |
| `stats_last_refreshed` | String | When stats were last updated |
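
To make the 3 + 20 field count concrete, here is a minimal sketch of how one connection record from the Status API could be turned into a row with the column names in the table above. The mapping follows the field names used in the notebook; `FIELD_TO_COLUMN` and `to_row` are illustrative names, not part of the notebook itself.

```python
from datetime import datetime
from typing import Any, Dict

# API field name -> Delta column name, in schema order (20 connection fields)
FIELD_TO_COLUMN = {
    "id": "connection_id",
    "name": "connection_name",
    "groupId": "connection_group_id",
    "sourceId": "source_id",
    "sourceName": "source_name",
    "destinationId": "destination_id",
    "destinationName": "destination_name",
    "flowFilesIn": "flow_files_in",
    "flowFilesOut": "flow_files_out",
    "bytesIn": "bytes_in",
    "bytesOut": "bytes_out",
    "input": "input",
    "output": "output",
    "queuedCount": "queued_count",
    "queuedBytes": "queued_bytes",
    "queued": "queued",
    "queuedSize": "queued_size",
    "percentUseCount": "percent_use_count",
    "percentUseBytes": "percent_use_bytes",
    "statsLastRefreshed": "stats_last_refreshed",
}


def to_row(conn: Dict[str, Any], snapshot_timestamp: datetime,
           server: str, process_group_id: str) -> Dict[str, Any]:
    """Build one 23-column row: 3 metadata columns plus 20 connection columns."""
    row: Dict[str, Any] = {
        "snapshot_timestamp": snapshot_timestamp,
        "server": server,
        "process_group_id": process_group_id,
    }
    for api_field, column in FIELD_TO_COLUMN.items():
        row[column] = conn.get(api_field)  # missing fields become None
    return row


# Made-up example record with only two fields populated
row = to_row({"id": "c1", "flowFilesOut": 7}, datetime(2024, 1, 1), "prod-nifi-01", "pg-1")
print(len(row))
```

Unlike the notebook, this sketch leaves missing values as `None` rather than substituting `0` or an empty string.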

## Key Differences from Previous Version

**Before:** Limited fields, processor-level aggregation
- Only captured basic metrics
- Aggregated connections by processor
- Could not identify specific bottleneck connections
- No queue monitoring capability

**Now:** 23 fields, connection-level granularity
- Captures 20 fields per connection from the NiFi Status API, plus 3 metadata fields
- Stores each connection separately
- Can identify exact bottleneck points
- Enables queue monitoring, backpressure detection, flow lineage

**Impact:** Roughly 1-2x as many rows (a typical NiFi flow has 1-2 connections per processor), in exchange for the analysis capabilities described below.

## New Analysis Capabilities

### 1. Backpressure Detection
Identify connections with high queue depth:
```sql
SELECT source_name, destination_name, MAX(queued_count) as max_queued
FROM main.default.nifi_processor_snapshots
WHERE queued_count > 100
GROUP BY source_name, destination_name
ORDER BY max_queued DESC;
```

### 2. Queue Limit Monitoring
Find connections approaching capacity:
```sql
SELECT source_name, destination_name, MAX(percent_use_count) as max_percent_full
FROM main.default.nifi_processor_snapshots
WHERE percent_use_count > 80
GROUP BY source_name, destination_name;
```

### 3. Bidirectional Flow Tracking
Compare input vs output to find imbalances:
```sql
SELECT source_name,
       SUM(flow_files_in) as total_in,
       SUM(flow_files_out) as total_out,
       SUM(flow_files_in) - SUM(flow_files_out) as net_change
FROM main.default.nifi_processor_snapshots
GROUP BY source_name
HAVING ABS(net_change) > 100;
```

### 4. Processor-Level Analysis (Still Possible!)
Aggregate connections to processor-level when needed:
```sql
-- flow_files_out is a per-snapshot 5-minute window value, not a cumulative
-- counter, so sum it: a total of 0 means nothing left the processor.
WITH processor_activity AS (
    SELECT source_name,
           SUM(flow_files_out) AS total_out
    FROM main.default.nifi_processor_snapshots
    WHERE snapshot_timestamp >= current_date() - INTERVAL 7 DAYS
    GROUP BY source_name
)
SELECT * FROM processor_activity WHERE total_out = 0;
```

### 5. Flow Path Lineage
Track data movement through the flow:
```sql
SELECT source_name, destination_name, 
       SUM(flow_files_out) as total_flowfiles
FROM main.default.nifi_processor_snapshots
WHERE snapshot_timestamp >= current_date() - INTERVAL 7 DAYS
GROUP BY source_name, destination_name
ORDER BY total_flowfiles DESC;
```

## Unity Catalog Configuration

The notebook uses Unity Catalog with 3-level naming:
- **Catalog**: `main` (default)
- **Schema**: `default` (default)
- **Table**: `nifi_processor_snapshots`
- **Full path**: `main.default.nifi_processor_snapshots`

You can customize this in Cell 3 by editing `delta_table_path`.
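
If the table path is changed, a quick sanity check on the three-level name can catch typos before any write happens. The sketch below is a hypothetical helper (`parse_table_path` is not part of the notebook) and assumes plain identifiers without backtick-quoted dots.

```python
from typing import NamedTuple


class TableName(NamedTuple):
    catalog: str
    schema: str
    table: str


def parse_table_path(path: str) -> TableName:
    """Split 'catalog.schema.table' into its three parts."""
    parts = [p.strip() for p in path.split(".")]
    # Assumption: identifiers are unquoted, so every dot is a separator.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected catalog.schema.table, got {path!r}")
    return TableName(*parts)


name = parse_table_path("main.default.nifi_processor_snapshots")
print(name.catalog, name.schema, name.table)
```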

## Scheduling This Notebook

1. **Create a Databricks Job:**
   - Go to **Workflows** → **Jobs** → **Create Job**
   - Type: **Notebook**
   - Notebook path: Select this notebook
   - Compute: **Serverless** (recommended)
   - Schedule: `0 0/5 * * * ?` (Quartz cron syntax, which Databricks uses: every 5 minutes, for continuous monitoring)

2. **Create Alerts:**
   - Set up Databricks alerts on the Delta table
   - Get notified when connections approach queue limits
   - Alert on inactive processors
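
For step 1, the same job can also be described as a settings payload instead of clicking through the UI. The sketch below only builds the dictionary; it assumes the Jobs API field names `notebook_task`, `schedule`, and `quartz_cron_expression`, leaves out compute settings, and uses a hypothetical job name, task key, and notebook path. Databricks schedules use Quartz cron syntax, which has a leading seconds field.

```python
from typing import Any, Dict


def build_job_settings(notebook_path: str, every_minutes: int = 5) -> Dict[str, Any]:
    """Build a Jobs API-style settings dict with a Quartz schedule."""
    # Only intervals that divide the hour evenly give uniform spacing.
    if not 1 <= every_minutes <= 30 or 60 % every_minutes != 0:
        raise ValueError("every_minutes must be 1-30 and evenly divide 60")
    # Quartz field order: seconds minutes hours day-of-month month day-of-week
    cron = f"0 0/{every_minutes} * * * ?"
    return {
        "name": "nifi-usage-snapshot",  # hypothetical job name
        "tasks": [
            {
                "task_key": "snapshot",  # hypothetical task key
                "notebook_task": {"notebook_path": notebook_path},
            }
        ],
        "schedule": {
            "quartz_cron_expression": cron,
            "timezone_id": "UTC",
            "pause_status": "UNPAUSED",
        },
    }


# Hypothetical notebook location
settings = build_job_settings("/Workspace/Shared/nifi_usage_analyzer")
print(settings["schedule"]["quartz_cron_expression"])  # 0 0/5 * * * ?
```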

## Key Concepts

**Connection-Level Storage:**
- Each connection is stored as a separate row
- Preserves source → destination relationships
- Enables fine-grained debugging and analysis
- Can still aggregate to processor-level in queries

**Snapshot-based Analysis:**
- Each run captures a snapshot of flowfile counts at that moment
- Take snapshots every 5 minutes for real-time monitoring
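- Sum `flow_files_out` over a period to identify inactive connections; because flow metrics are 5-minute window values rather than cumulative counters, MAX - MIN = 0 would also match steady non-zero traffic
- A connection whose total is 0 processed no FlowFiles in that time period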
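
A minimal sketch of inactivity detection over stored snapshots, assuming each row is a plain dict with the column names from the schema table (the function name `find_inactive_connections` is hypothetical). It totals the per-window `flow_files_out` values for each connection and reports those whose total is zero.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple

ConnectionKey = Tuple[str, str, str]


def find_inactive_connections(rows: Iterable[Dict[str, Any]]) -> List[ConnectionKey]:
    """Return connections whose flow_files_out totals 0 across all rows."""
    totals: Dict[ConnectionKey, int] = defaultdict(int)
    for row in rows:
        key = (row["connection_id"], row["source_name"], row["destination_name"])
        # Treat a missing or null metric as 0 for that snapshot.
        totals[key] += row.get("flow_files_out") or 0
    return [key for key, total in totals.items() if total == 0]


# Made-up snapshots: c-1 moves a steady 40 FlowFiles per window, c-2 moves none
rows = [
    {"connection_id": "c-1", "source_name": "GetFile",
     "destination_name": "PutHDFS", "flow_files_out": 40},
    {"connection_id": "c-2", "source_name": "ListenHTTP",
     "destination_name": "LogAttribute", "flow_files_out": 0},
    {"connection_id": "c-1", "source_name": "GetFile",
     "destination_name": "PutHDFS", "flow_files_out": 40},
    {"connection_id": "c-2", "source_name": "ListenHTTP",
     "destination_name": "LogAttribute", "flow_files_out": 0},
]
inactive = find_inactive_connections(rows)
print(inactive)  # [('c-2', 'ListenHTTP', 'LogAttribute')]
```

Only `c-2` is reported: `c-1` has identical values in both snapshots but is clearly active.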

**Why Connection-Level Instead of Processor-Level?**
- Identify which specific connection is bottlenecked
- Monitor queue depth per connection
- Track flow paths (source → destination lineage)
- More debugging capability with minimal storage overhead

**5-Minute Window:**
- NiFi Status API returns metrics aggregated over the last 5 minutes
- Running snapshots every 5 minutes captures distinct time windows
- NiFi's own status history is retained for 24 hours by default (configurable in NiFi)

**Queue Metrics:**
- `queued_count`: Current number of FlowFiles waiting in connection
- `percent_use_count`: How full the queue is (approaching backpressure threshold)
- Helps identify bottlenecks before they cause performance issues
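
The queue metrics above can be turned into a simple early-warning check. The sketch below assumes rows are dicts with the schema's column names and uses a hypothetical helper, `flag_backpressure_risk`, with an 80% default threshold chosen to match the queue limit query earlier in this section.

```python
from typing import Any, Dict, Iterable, List, Tuple


def flag_backpressure_risk(
    rows: Iterable[Dict[str, Any]], warn_percent: int = 80
) -> List[Tuple[str, str, int]]:
    """Return (source, destination, percent) for queues above the threshold."""
    flagged = []
    for row in rows:
        # Either the count or the byte threshold can trigger backpressure,
        # so use whichever is closer to its limit.
        pct = max(row.get("percent_use_count") or 0,
                  row.get("percent_use_bytes") or 0)
        if pct > warn_percent:
            flagged.append((row["source_name"], row["destination_name"], pct))
    # Fullest queues first
    return sorted(flagged, key=lambda item: item[2], reverse=True)


# Made-up rows for illustration
sample = [
    {"source_name": "ListenHTTP", "destination_name": "RouteOnAttribute",
     "percent_use_count": 20, "percent_use_bytes": 85},
    {"source_name": "GetFile", "destination_name": "PutHDFS",
     "percent_use_count": 95, "percent_use_bytes": 10},
    {"source_name": "GenerateFlowFile", "destination_name": "LogAttribute",
     "percent_use_count": 5, "percent_use_bytes": 5},
]
at_risk = flag_backpressure_risk(sample)
print(at_risk)
# [('GetFile', 'PutHDFS', 95), ('ListenHTTP', 'RouteOnAttribute', 85)]
```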

## Migration Notes

**IMPORTANT:** Running Cell 7 will DROP the existing table to start fresh with the new 23-field schema. This is necessary because:
1. Schema changed to connection-level with 23 fields
2. Data model changed from processor aggregation to connection-level
3. Cannot merge old and new data structures

**Before running:** If you want to preserve old data, create a backup:
```python
spark.sql("CREATE TABLE main.default.nifi_processor_snapshots_backup AS SELECT * FROM main.default.nifi_processor_snapshots")
```