In [30]:
# NextPNR Parameter Sweep with MLIR Processing Pipeline
# Comprehensive tool for FPGA parameter optimization using MLIR-generated benchmarks

# Enable auto reload for imports
%load_ext autoreload
%autoreload 2

# Standard library imports
import subprocess
import os
import sys
import re
import time
import json
from pathlib import Path
from datetime import datetime
import itertools
import concurrent.futures

# Third-party imports
import pandas as pd
import numpy as np
from loguru import logger
import ipywidgets as widgets
from IPython.display import display, clear_output

# Configure loguru logger - output only to stdout (only if not already configured)
# logger.remove()  # Remove default handler
# logger.add(sys.stdout, format="{time} | {level} | {message}", level="INFO")

print("=== NextPNR Parameter Sweep with MLIR Processing Pipeline ===")
print("Comprehensive FPGA parameter optimization using MLIR-generated benchmarks")
print("Auto-reload enabled for module imports")

print("✓ Core libraries and logging configured")

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
=== NextPNR Parameter Sweep with MLIR Processing Pipeline ===
Comprehensive FPGA parameter optimization using MLIR-generated benchmarks
Auto-reload enabled for module imports
✓ Core libraries and logging configured


In [31]:
# Configuration and Parameter Setup
# Define parameter ranges, paths, and constants for the parameter sweep

import numpy as np
import os
from pathlib import Path

# Define parameter ranges for the sweep
CONNECTIVITY_FACTORS = np.arange(0.0, 2.0, 0.2)  # 0.0 to 2.0 with step 0.2
CONGESTION_FACTORS = np.arange(0.0, 2.0, 0.2)    # 0.0 to 2.0 with step 0.2

# Create parameter grid - use reduced set for testing or full grid for production
grid = list(itertools.product(CONNECTIVITY_FACTORS, CONGESTION_FACTORS))  # Full grid
# grid = [(0, 0), (0, 1), (1, 0), (1, 1)]  # Reduced grid for testing

# software dir configuration
CALYX_PATH: str = "/home/kelvin/calyx"
MLIR_OPT_PATH: str = "/home/kelvin/circt/build/vscode/bin/mlir-opt"
HLSTOOL_PATH: str = "/home/kelvin/circt/build/vscode/bin/hlstool"

# Project and directory configuration
MY_FAB_ROOT: Path = Path("/home/kelvin/FABulous_fork")
FAB_PROJ_DIR: Path = Path("/home/kelvin/FABulous_fork/myProject")

# Compilation results directory structure - Stage-based organization
COMPILATION_RESULT_DIR: Path = Path("/home/kelvin/FABulous_fork/myProject/PnR/compilation_result")

# Stage-specific directories for cleaner organization and simpler naming
MLIR_OPTIMIZED_DIR: Path = COMPILATION_RESULT_DIR / "01_optimized_mlir"
MLIR_EXTRACTED_DIR: Path = COMPILATION_RESULT_DIR / "02_extracted_mlir"
FUTIL_OUTPUT_DIR: Path = COMPILATION_RESULT_DIR / "03_futil"
VERILOG_OUTPUT_DIR: Path = COMPILATION_RESULT_DIR / "04_verilog"
SYNTHESIS_OUTPUT_DIR: Path = COMPILATION_RESULT_DIR / "05_synthesis"
PNR_OUTPUT_DIR: Path = COMPILATION_RESULT_DIR / "06_PNR"
OUTPUT_DIR: Path = Path("/home/kelvin/FABulous_fork/myProject/PnR/parameter_sweep_results")

# Benchmark directory configuration - MLIR files only
BENCHMARK_ROOT_DIR: Path = Path("/home/kelvin/FABulous_fork/myProject/PnR/mlir")

# Set up environment variables
os.environ["FAB_PROJ_DIR"] = str(FAB_PROJ_DIR)
os.environ["PATH"] = f"/home/kelvin/nextpnr/build/bba:{os.environ['PATH']}"
os.environ["PATH"] = f"/home/kelvin/nextpnr/build:{os.environ['PATH']}"
os.environ["PATH"] = f"/home/kelvin/yosys:{os.environ['PATH']}"
os.environ["PATH"] = f"/home/kelvin/circt/build/vscode/bin:{os.environ['PATH']}"
os.environ["CALYX_PRIMITIVES_DIR"] = str(Path(CALYX_PATH))

# Create necessary directories for all processing stages
COMPILATION_RESULT_DIR.mkdir(exist_ok=True)
MLIR_OPTIMIZED_DIR.mkdir(exist_ok=True)
MLIR_EXTRACTED_DIR.mkdir(exist_ok=True)
FUTIL_OUTPUT_DIR.mkdir(exist_ok=True)
VERILOG_OUTPUT_DIR.mkdir(exist_ok=True)
OUTPUT_DIR.mkdir(exist_ok=True)

# FABulous file paths
CHIPDB_PATH = FAB_PROJ_DIR / ".FABulous/hycube.bit"
JSON_INPUT = FAB_PROJ_DIR / "user_design/synth_test.json"
CONSTRAIN_PAIR = FAB_PROJ_DIR / ".FABulous/hycube_constrain_pair.inc"
FDC_PATH = FAB_PROJ_DIR / "user_design/test.fdc"

# Fallback source file if no generated Verilog files are available
FALLBACK_SOURCE_HDL = MY_FAB_ROOT / "benchmarks/userbench/loop_array_inner/loop_array_inner.sv"

# Fixed parameters for nextpnr runs
BETA_VALUE = 0.9
PLACE_TRIALS = 1
ROUTER_TIMEOUT = 20000

# Initialize available MLIR files and Verilog files structures
availableMlirFiles = []
availableVerilogFiles = []
benchmarkStructure = {}

print("Configuration setup complete:")
print(f"  Parameter combinations: {len(CONNECTIVITY_FACTORS)} × {len(CONGESTION_FACTORS)} = {len(CONNECTIVITY_FACTORS) * len(CONGESTION_FACTORS)} total")
print(f"  Test grid size: {len(grid)} combinations")
print(f"  Compilation results directory: {COMPILATION_RESULT_DIR}")
print("  Stage directories:")
print("    01_optimized_mlir/ - MLIR optimization results")
print("    02_extracted_mlir/ - Loop-to-function extraction results")
print("    03_intermediate_mlir/ - Individual function MLIR files")
print("    04_futil/ - Generated FUTIL files")
print("    05_verilog/ - Final Verilog output")
print(f"  Parameter sweep results directory: {OUTPUT_DIR}")
print(f"  Benchmark root directory: {BENCHMARK_ROOT_DIR}")
print("  Starting with unoptimized MLIR files from BENCHMARK folder")

print("✓ Configuration and paths set up successfully")

Configuration setup complete:
  Parameter combinations: 10 × 10 = 100 total
  Test grid size: 100 combinations
  Compilation results directory: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result
  Stage directories:
    01_optimized_mlir/ - MLIR optimization results
    02_extracted_mlir/ - Loop-to-function extraction results
    03_intermediate_mlir/ - Individual function MLIR files
    04_futil/ - Generated FUTIL files
    05_verilog/ - Final Verilog output
  Parameter sweep results directory: /home/kelvin/FABulous_fork/myProject/PnR/parameter_sweep_results
  Benchmark root directory: /home/kelvin/FABulous_fork/myProject/PnR/mlir
  Starting with unoptimized MLIR files from BENCHMARK folder
✓ Configuration and paths set up successfully


In [32]:
# Enhanced data structures for complete pipeline
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Tuple
from enum import Enum
import time
import os

class FailureType(Enum):
    NONE = "none"
    MLIR_OPTIMIZATION = "mlir_optimization"
    LOOP_EXTRACTION = "loop_extraction" 
    FUTIL_GENERATION = "futil_generation"
    VERILOG_GENERATION = "verilog_generation"
    SYNTHESIS = "synthesis"
    PLACEMENT = "placement"
    ROUTING = "routing"
    UNKNOWN = "unknown"

@dataclass
class VerilogPipelineResult:
    """Unified MLIR to Verilog generation pipeline result"""
    source_file: Path
    success: bool
    runtime: float
    
    # Output files
    optimized_mlir_file: Optional[Path] = None
    function_extracted: Optional[list[Path]] = None
    futil_files: Optional[list[Path]] = None
    verilog_files: Optional[list[Path]] = None
    
    # Stage metrics
    mlir_optimization_time: float = 0.0
    loop_extraction_time: float = 0.0
    futil_generation_time: float = 0.0

    
    # Error tracking
    failure_stage: Optional[FailureType] = None
    error_message: Optional[str] = None
    
    def __post_init__(self):
        if self.verilog_files is None:
            self.verilog_files = []

@dataclass
class SynthesisResult:
    """FABulous synthesis preprocessing result"""
    success: bool
    runtime: float
    verilog_file: Optional[Path] = None
    synthesis_out_file: Optional[Path] = None
    error_message: Optional[str] = None
    full_stdout: Optional[str] = None
    full_stderr: Optional[str] = None

@dataclass
class ParameterResult:
    """Single parameter configuration result"""
    config_id: int
    success: bool
    runtime: float
    
    # Configuration parameters
    parameters: Dict[str, Any] = None
    
    # NextPNR analysis results
    placement_info: Optional[Dict[str, Any]] = None
    routing_info: Optional[Dict[str, Any]] = None

    full_stdout: Optional[str] = None
    full_stderr: Optional[str] = None
    
    # Error tracking
    failure_type: Optional[FailureType] = None
    error_message: Optional[str] = None
    
    def __post_init__(self):
        if self.parameters is None:
            self.parameters = {}

@dataclass
class CompletePipelineResult:
    """Complete end-to-end pipeline result"""
    source_file: Path
    overall_success: bool
    total_runtime: float
    
    # Stage results
    verilog_pipeline: VerilogPipelineResult
    synthesis_results: List[SynthesisResult]  # Updated to handle multiple synthesis results
    parameter_results: List[ParameterResult]
    
    # Overall failure tracking
    primary_failure_type: FailureType = FailureType.NONE
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for DataFrame creation"""
        base_dict = {
            'source_file': os.path.basename(self.source_file),
            'overall_success': self.overall_success,
            'total_runtime': self.total_runtime,
            'primary_failure': self.primary_failure_type.value,
            
            # Verilog pipeline
            'verilog_success': self.verilog_pipeline.success,
            'verilog_runtime': self.verilog_pipeline.runtime,
            'mlir_opt_time': self.verilog_pipeline.mlir_optimization_time,
            'loop_extract_time': self.verilog_pipeline.loop_extraction_time,
            'futil_gen_time': self.verilog_pipeline.futil_generation_time,
            'function_extracted': self.verilog_pipeline.function_extracted,
            
            # Synthesis - aggregated metrics from multiple synthesis results
            'synthesis_count': len(self.synthesis_results),
            'synthesis_success_count': sum(1 for s in self.synthesis_results if s.success),
            'synthesis_overall_success': any(s.success for s in self.synthesis_results),
            'synthesis_total_runtime': sum(s.runtime for s in self.synthesis_results),
            'synthesis_avg_runtime': sum(s.runtime for s in self.synthesis_results) / len(self.synthesis_results) if self.synthesis_results else 0,
            
            # Parameter sweep summary
            'total_configs': len(self.parameter_results),
            'successful_configs': sum(1 for p in self.parameter_results if p.success),
        }
        
        if self.parameter_results:
            successful_params = [p for p in self.parameter_results if p.success]
            if successful_params:
                base_dict['avg_param_runtime'] = sum(p.runtime for p in successful_params) / len(successful_params)
                base_dict['min_param_runtime'] = min(p.runtime for p in successful_params)
                base_dict['max_param_runtime'] = max(p.runtime for p in successful_params)
        
        return base_dict
    
    def __str__(self) -> str:
        """
        String representation for displaying pipeline result information.

        Returns
        -------
        str
            Formatted summary of the pipeline result.
        """
        lines = []
        lines.append(f"Pipeline Result for: {self.source_file}")
        lines.append(f"  Overall Success: {self.overall_success}")
        lines.append(f"  Total Runtime: {self.total_runtime:.2f}s")
        lines.append(f"  Primary Failure: {self.primary_failure_type.value}")
        lines.append("  Verilog Pipeline:")
        lines.append(f"    Success: {self.verilog_pipeline.success}")
        lines.append(f"    Runtime: {self.verilog_pipeline.runtime:.2f}s")
        lines.append(f"    MLIR Opt Time: {self.verilog_pipeline.mlir_optimization_time:.2f}s")
        lines.append(f"    Loop Extract Time: {self.verilog_pipeline.loop_extraction_time:.2f}s")
        lines.append(f"    FUTIL Gen Time: {self.verilog_pipeline.futil_generation_time:.2f}s")
        if self.verilog_pipeline.function_extracted:
            lines.append(f"    function extracted: {self.verilog_pipeline.function_extracted}")
        if self.verilog_pipeline.failure_stage:
            lines.append(f"    Failure Stage: {self.verilog_pipeline.failure_stage.value}")
        if self.verilog_pipeline.error_message:
            lines.append(f"    Error: {self.verilog_pipeline.error_message}")
        
        # Updated synthesis section to handle multiple results
        lines.append("  Synthesis:")
        lines.append(f"    Total Synthesis Jobs: {len(self.synthesis_results)}")
        successful_synthesis = sum(1 for s in self.synthesis_results if s.success)
        lines.append(f"    Successful: {successful_synthesis}/{len(self.synthesis_results)}")
        if self.synthesis_results:
            total_synthesis_runtime = sum(s.runtime for s in self.synthesis_results)
            lines.append(f"    Total Runtime: {total_synthesis_runtime:.2f}s")
            lines.append(f"    Average Runtime: {total_synthesis_runtime/len(self.synthesis_results):.2f}s")
            
            # Show details for failed synthesis
            failed_synthesis = [s for s in self.synthesis_results if not s.success]
            if failed_synthesis:
                lines.append(f"    Failed synthesis errors:")
                for i, s in enumerate(failed_synthesis):
                    if s.error_message:
                        lines.append(f"      {i+1}. {s.error_message}")
        
        lines.append("  Parameter Sweep:")
        lines.append(f"    Total Configs: {len(self.parameter_results)}")
        successful_params = sum(1 for p in self.parameter_results if p.success)
        lines.append(f"    Successful Configs: {successful_params}")
        if self.parameter_results:
            failures = [p for p in self.parameter_results if not p.success]
            if failures:
                lines.append(f"    Failed Configs: {len(failures)}")
                failure_types = {}
                for p in failures:
                    ft = p.failure_type.value if p.failure_type else "unknown"
                    failure_types[ft] = failure_types.get(ft, 0) + 1
                for ft, count in failure_types.items():
                    lines.append(f"      {ft}: {count}")
        return "\n".join(lines)

In [33]:
# Direct pipeline functions with proper stage directories
import subprocess
import tempfile
import shutil
from pathlib import Path
import sys
import io
from contextlib import redirect_stdout, redirect_stderr

def run_verilog_pipeline(mlir_file: Path) -> VerilogPipelineResult:
    """
    Run the complete MLIR to Verilog generation pipeline with stage directories
    MLIR optimization -> Loop extraction -> FUTIL generation -> Verilog generation
    """
    start_time = time.time()
    mlir_path = Path(mlir_file)
    base_name = mlir_path.stem
    
    result = VerilogPipelineResult(
        source_file=mlir_file,
        success=False,
        runtime=0.0,
        verilog_files=[]
    )
    
    try:
        # Stage 1: MLIR Optimization
        mlir_start = time.time()
        optimized_mlir_path = MLIR_OPTIMIZED_DIR / f"{base_name}.mlir"
        result.optimized_mlir_file, mlir_success, mlir_error = run_mlir_optimization(mlir_path, optimized_mlir_path)
        result.mlir_optimization_time = time.time() - mlir_start
        
        if not mlir_success or result.optimized_mlir_file is None:
            result.failure_stage = FailureType.MLIR_OPTIMIZATION
            result.error_message = mlir_error or "MLIR optimization failed"
            result.runtime = time.time() - start_time
            return result
        
        # Stage 2: Loop Extraction - now returns list of files
        loop_start = time.time()
        result.function_extracted, loop_success, loop_error = extract_loops_from_mlir(Path(result.optimized_mlir_file), MLIR_EXTRACTED_DIR)
        result.loop_extraction_time = time.time() - loop_start
        
        if not loop_success:
            result.failure_stage = FailureType.LOOP_EXTRACTION
            result.error_message = loop_error or "Loop extraction failed"
            result.runtime = time.time() - start_time
            return result
        
        # Stage 3 & 4: Process each extracted file through FUTIL -> Verilog
        all_verilog_files = []
        all_futil_files = []
        for extracted_file in result.function_extracted:
            # FUTIL Generation for this file
            futil_start = time.time()
            futil_file, futil_success, futil_error = generate_futil_from_mlir(extracted_file, FUTIL_OUTPUT_DIR)
            all_futil_files.append(futil_file)
            futil_time = time.time() - futil_start
            result.futil_generation_time += futil_time
            
            if not futil_success or futil_file is None:
                result.failure_stage = FailureType.FUTIL_GENERATION
                result.error_message = futil_error or f"FUTIL generation failed for {extracted_file}"
                result.runtime = time.time() - start_time
                return result
            
            # Verilog Generation for this FUTIL file
            verilog_file, verilog_success, verilog_error = generate_verilog_from_futil(futil_file, VERILOG_OUTPUT_DIR)
            
            if not verilog_success or not verilog_file:
                result.failure_stage = FailureType.VERILOG_GENERATION
                result.error_message = verilog_error or f"Verilog generation failed for {futil_file}"
                result.runtime = time.time() - start_time
                return result
                
            all_verilog_files.append(verilog_file)
        
        result.verilog_files = all_verilog_files
        result.futil_files = all_futil_files
        result.success = True
        
    except Exception as e:
        result.failure_stage = FailureType.VERILOG_GENERATION
        result.error_message = str(e)
    
    result.runtime = time.time() - start_time
    return result

def run_mlir_optimization(mlir_file: Path, output_path: Path) -> Tuple[Optional[Path], bool, Optional[str]]:
    """Run MLIR optimization passes - returns (output_file, success, error_message)"""
    try:
        # Ensure output directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)
        
        cmd = [
            "mlir-opt",
            "--allow-unregistered-dialect",
            "--int-range-optimizations",
            "--sroa",
            "--normalize-memrefs", 
            "--flatten-memref",
            "--enable-loop-simplifycfg-term-folding",
            "--affine-loop-fusion",
            "--affine-simplify-structures",
            "--affine-loop-invariant-code-motion",
            "--affine-scalrep",
            "--affine-pipeline-data-transfer",
            "-mlir-print-op-generic",
            str(mlir_file),
            '-o', str(output_path)
        ]
        
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        
        if result.returncode == 0 and output_path.exists():
            return output_path, True, None
        else:
            error_msg = f"MLIR optimization failed: {result.stderr}"
            return None, False, error_msg
            
    except subprocess.TimeoutExpired:
        return None, False, "MLIR optimization timed out"
    except Exception as e:
        return None, False, f"MLIR optimization error: {str(e)}"

def extract_loops_from_mlir(optimized_mlir_file: Path, extracted_dir: Path) -> Tuple[List[Path], bool, Optional[str]]:
    """Extract loops using inner_loop_to_func - returns (list_of_extracted_files, success, error_message)"""
    try:
        # Ensure directory exists
        extracted_dir.mkdir(parents=True, exist_ok=True)
        
        # Suppress output from process_mlir_file by capturing stdout/stderr
        captured_output = io.StringIO()
        captured_error = io.StringIO()
        
        try:
            with redirect_stdout(captured_output), redirect_stderr(captured_error):
                # Use inner_loop_to_func.process_mlir_file which returns list of Path objects
                from inner_loop_to_func import process_mlir_file
                extracted_file_paths = process_mlir_file(str(optimized_mlir_file), str(extracted_dir))
        except Exception as import_error:
            # If the context managers fail, try without them
            from inner_loop_to_func import process_mlir_file
            extracted_file_paths = process_mlir_file(str(optimized_mlir_file), str(extracted_dir))
        
        if extracted_file_paths:
            return extracted_file_paths, True, None
        else:
            return [], False, "No loops extracted from MLIR file"
        
    except FileNotFoundError:
        return [], False, f"MLIR file not found: {optimized_mlir_file}"
    except Exception as e:
        return [], False, f"Loop extraction error: {str(e)}"

def generate_futil_from_mlir(mlir_file: Path, futil_dir: Path) -> Tuple[Optional[Path], bool, Optional[str]]:
    """Generate FUTIL file from a single MLIR file - returns (futil_file, success, error_message)"""
    try:
        # Ensure output directory exists
        futil_dir.mkdir(parents=True, exist_ok=True)
        
        futil_path = futil_dir / f"{mlir_file.stem}.futil"
        intermediate_mlir = futil_dir / f"{mlir_file.stem}.mlir"
        
        try:
            # Step 1: hlstool command
            hlstool_cmd = [
                "hlstool",
                "--calyx-hw",
                "--output-level=core", 
                "--ir",
                "--allow-unregistered-dialects",
                str(mlir_file),
                "-o", str(intermediate_mlir)
            ]
            
            result = subprocess.run(hlstool_cmd, capture_output=True, text=True, timeout=30)
            
            if result.returncode != 0:
                return None, False, f"hlstool failed for {mlir_file.name}: {result.stderr}"
            
            # Step 2: circt-translate command
            translate_cmd = [
                "circt-translate",
                str(intermediate_mlir),
                "-o", str(futil_path),
                "--export-calyx"
            ]
            
            result = subprocess.run(translate_cmd, capture_output=True, text=True, timeout=60)
            
            if result.returncode != 0:
                return None, False, f"circt-translate failed for {mlir_file.name}: {result.stderr}"
            
            # Clean up intermediate file
            intermediate_mlir.unlink(missing_ok=True)
            
            return futil_path, True, None
            
        except subprocess.TimeoutExpired:
            return None, False, f"Timeout processing {mlir_file.name}"
        except Exception as e:
            return None, False, f"Error processing {mlir_file.name}: {e}"
            
    except Exception as e:
        return None, False, f"FUTIL generation error: {str(e)}"

def generate_verilog_from_futil(futil_file: Path, verilog_dir: Path) -> Tuple[Optional[Path], bool, Optional[str]]:
    """Generate Verilog files from FUTIL - returns (verilog_file, success, error_message)"""
    try:
        # Ensure output directory exists
        verilog_dir.mkdir(parents=True, exist_ok=True)
        
        verilog_path = verilog_dir / f"{futil_file.stem}.sv"
        
        # Calyx compilation command
        calyx_cmd = [
            "/home/kelvin/calyx/target/debug/calyx",
            "-l", "/home/kelvin/calyx",  # Add library path
            "-p", "fsm-opt",
            "-x", "simplify-with-control:without-register",
            "-x", "static-inline:offload-pause=false", 
            "-p", "lower",
            "--nested",
            "-d", "papercut",
            "-d", "cell-share",
            "-o", str(verilog_path),
            "-b", "verilog",
            "--synthesis",
            str(futil_file)
        ]
        
        env = os.environ.copy()
        result = subprocess.run(calyx_cmd, capture_output=True, text=True, timeout=300, cwd=str(CALYX_PATH), env=env)
        
        if result.returncode == 0 and verilog_path.exists():
            return verilog_path, True, None
        else:
            error_msg = f"Verilog generation failed: {result.stderr}"
            return None, False, error_msg
            
    except subprocess.TimeoutExpired:
        return None, False, "Verilog generation timed out"
    except Exception as e:
        return None, False, f"Verilog generation error: {str(e)}"

def run_synthesis(verilog_file: Path) -> SynthesisResult:
    """Run FABulous synthesis preprocessing only - returns SynthesisResult directly"""
    start_time = time.time()
    
    try:
        # Run FABulous synthesis preprocessing (yosys + json generation)
        synthesis_cmd = [
            "FABulous", "--debug", str(FAB_PROJ_DIR), "-p",
            "load_fabric; gen_bitStream_spec; gen_cells_and_techmaps; "
            f"gen_chipdb -routing_graph {FAB_PROJ_DIR}/.FABulous/routing_graph.dot -filter 5,1 5,2 5,3 5,4; "
            f"synthesis_script {str(verilog_file)} -tcl {FAB_PROJ_DIR}/.FABulous/arch_synth.tcl;"
        ]
        SYNTHESIS_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        synthesis_out = SYNTHESIS_OUTPUT_DIR / f"{verilog_file.stem}_synth.json"
        p = os.environ.copy()
        p["OUT_JSON_PATH"] = str(synthesis_out)
        synthesis_result = subprocess.run(synthesis_cmd, capture_output=True, text=True, timeout=300, env=p)
        synthesis_runtime = time.time() - start_time
        
        synthesis_success = synthesis_result.returncode == 0
        error_message = synthesis_result.stderr if not synthesis_success else None

        # Check for invalid synthesis cells
        if synthesis_success and re.findall(r"\"type\": \"(\$.*?)\"", synthesis_result.stdout):
            r = re.findall(r"\"type\": \"(\$.*?)\"", synthesis_result.stdout)
            synthesis_success = False
            error_message = f"Invalid synthesis result, found cell: {set(r)}"
        
        return SynthesisResult(
            success=synthesis_success,
            runtime=synthesis_runtime,
            verilog_file=verilog_file,
            synthesis_out_file=synthesis_out if synthesis_success else None,
            error_message=error_message
        )
        
    except subprocess.TimeoutExpired:
        return SynthesisResult(
            success=False,
            runtime=time.time() - start_time,
            verilog_file=verilog_file,
            synthesis_out_file=None,
            error_message="Synthesis timed out"
        )
    except Exception as e:
        return SynthesisResult(
            success=False,
            runtime=time.time() - start_time,
            verilog_file=verilog_file,
            synthesis_out_file=None,
            error_message=f"Synthesis error: {str(e)}"
        )

def run_parameter_sweep(synthesis_result: SynthesisResult) -> List[ParameterResult]:
    """Run NextPNR parameter sweep only - returns list of ParameterResult directly"""
    if not synthesis_result.success or not synthesis_result.synthesis_out_file:
        return []
    
    synthesis_json_path = Path(synthesis_result.synthesis_out_file)
    
    # Check if synthesis JSON exists
    if not synthesis_json_path.exists():
        return [ParameterResult(
            config_id=0,
            success=False,
            runtime=0.0,
            failure_type=FailureType.SYNTHESIS,
            error_message=f"Synthesis JSON not found: {synthesis_json_path}"
        )]
    
    PNR_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_json_path = PNR_OUTPUT_DIR / f"{synthesis_json_path.stem}_pnr.json"
    out_fasm_path = PNR_OUTPUT_DIR / f"{synthesis_json_path.stem}.fasm"
    parameter_results = []
    
    for i, (conn_factor, cong_factor) in enumerate(grid):
        param_start_time = time.time()
        
        # Run nextpnr with these parameters
        try:
            nextpnr_cmd = [
                "nextpnr-himbaechel",
                "--chipdb", str(CHIPDB_PATH),
                "--device", "FABulous",
                "--json", str(synthesis_json_path),
                "--write", str(out_json_path),
                "-o", f"constrain-pair={CONSTRAIN_PAIR}",
                "-o", f"fasm={out_fasm_path}",  # Uncomment if FASM output needed
                "-o", f"fdc={FDC_PATH}",
                "--placer-heap-seed-placement-strategy", "graph_grid",
                "--placer-heap-beta", str(BETA_VALUE),
                "--placer-heap-arch-connectivity-factor", str(conn_factor),
                "--placer-heap-congestion-aware-factor", str(cong_factor),
                "-o", f"placeTrial={PLACE_TRIALS}",
                "--router1-timeout", str(ROUTER_TIMEOUT)
            ]
            
            result = subprocess.run(nextpnr_cmd, capture_output=True, text=True, timeout=600)
            param_runtime = time.time() - param_start_time
            
            param_success = result.returncode == 0
            failure_type = FailureType.NONE if param_success else FailureType.ROUTING
            
            # Simple log analysis
            placement_success = "Final Placement" in result.stdout
            routing_failed = "ERROR: Max iteration count reached" in result.stdout
            
            if not param_success:
                if not placement_success:
                    failure_type = FailureType.PLACEMENT
                elif routing_failed:
                    failure_type = FailureType.ROUTING
                else:
                    failure_type = FailureType.UNKNOWN
            
            param_result = ParameterResult(
                config_id=i,
                success=param_success,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={'placement_success': placement_success},
                routing_info={'routing_failed': routing_failed},
                failure_type=failure_type,
                error_message=result.stderr if not param_success else None
            )
            
            parameter_results.append(param_result)
            
        except subprocess.TimeoutExpired:
            param_runtime = time.time() - param_start_time
            parameter_results.append(ParameterResult(
                config_id=i,
                success=False,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={},
                routing_info={},
                failure_type=FailureType.UNKNOWN,
                error_message='Parameter run timed out'
            ))
        except Exception as e:
            param_runtime = time.time() - param_start_time
            parameter_results.append(ParameterResult(
                config_id=i,
                success=False,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={},
                routing_info={},
                failure_type=FailureType.UNKNOWN,
                error_message=str(e)
            ))
    
    return parameter_results

def run_complete_pipeline(mlir_file: Path) -> CompletePipelineResult:
    """
    Run the complete MLIR to hardware pipeline:
    MLIR -> Verilog -> Synthesis -> Parameter Sweep
    """
    start_time = time.time()
    
    # Ensure all stage directories exist
    MLIR_OPTIMIZED_DIR.mkdir(parents=True, exist_ok=True)
    MLIR_EXTRACTED_DIR.mkdir(parents=True, exist_ok=True)
    FUTIL_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    VERILOG_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    SYNTHESIS_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    PNR_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Stage 1: Verilog Generation Pipeline
    verilog_result = run_verilog_pipeline(mlir_file)
    
    # Stage 2: Synthesis (if Verilog generation succeeded)
    synthesis_results = []
    if verilog_result.success and verilog_result.verilog_files:
        for verilog_file in verilog_result.verilog_files:
            synthesis_results.append(run_synthesis(Path(verilog_file)))
    
    # Stage 3: Parameter Sweep (if any synthesis succeeded)
    parameter_results = []
    for synthesis_result in synthesis_results:
        if synthesis_result.success:
            parameter_results.extend(run_parameter_sweep(synthesis_result))

    # Determine overall success and primary failure type
    overall_success = False
    primary_failure_type = FailureType.NONE
    
    if not verilog_result.success:
        primary_failure_type = verilog_result.failure_stage or FailureType.VERILOG_GENERATION
    elif not synthesis_results or not any(s.success for s in synthesis_results):
        primary_failure_type = FailureType.SYNTHESIS
    elif not parameter_results or not any(p.success for p in parameter_results):
        # Find most common parameter failure type
        if parameter_results:
            failure_types = [p.failure_type for p in parameter_results if p.failure_type]
            if failure_types:
                primary_failure_type = max(set(failure_types), key=failure_types.count)
            else:
                primary_failure_type = FailureType.UNKNOWN
        else:
            primary_failure_type = FailureType.UNKNOWN
    else:
        overall_success = True
        primary_failure_type = FailureType.NONE
    
    # Create and return complete result
    return CompletePipelineResult(
        source_file=mlir_file,
        overall_success=overall_success,
        total_runtime=time.time() - start_time,
        verilog_pipeline=verilog_result,
        synthesis_results=synthesis_results,
        parameter_results=parameter_results,
        primary_failure_type=primary_failure_type
    )

def process_mlir_benchmarks(mlir_files: List[Path]) -> List[CompletePipelineResult]:
    """Process multiple MLIR files through the complete pipeline"""
    results = []
    
    print(f"Processing {len(mlir_files)} MLIR files through complete pipeline...")
    print("Stage directories:")
    print(f"  01_optimized_mlir: {MLIR_OPTIMIZED_DIR}")
    print(f"  02_extracted_mlir: {MLIR_EXTRACTED_DIR}")
    print(f"  03_futil: {FUTIL_OUTPUT_DIR}")
    print(f"  04_verilog: {VERILOG_OUTPUT_DIR}")
    print(f"  05_synthesis: {SYNTHESIS_OUTPUT_DIR}")
    print(f"  06_PNR: {PNR_OUTPUT_DIR}")

    for i, mlir_file in enumerate(mlir_files):
        print(f"\n[{i+1}/{len(mlir_files)}] Processing: {Path(mlir_file).name}")
        
        try:
            result = run_complete_pipeline(mlir_file)
            results.append(result)
            
            status = "✓ SUCCESS" if result.overall_success else f"✗ FAILED ({result.primary_failure_type.value})"
            print(f"  Result: {status} | Runtime: {result.total_runtime:.2f}s")
            
        except Exception as e:
            print(f"  ERROR: {e}")
            # Create failed result with proper structure
            failed_verilog_result = VerilogPipelineResult(
                source_file=mlir_file,
                success=False,
                runtime=0.0,
                failure_stage=FailureType.UNKNOWN,
                error_message=str(e)
            )
            
            failed_result = CompletePipelineResult(
                source_file=mlir_file,
                overall_success=False,
                total_runtime=0.0,
                verilog_pipeline=failed_verilog_result,
                synthesis_results=[],
                parameter_results=[],
                primary_failure_type=FailureType.UNKNOWN
            )
            results.append(failed_result)
    
    return results

## 7. Interactive Dashboard

Interactive dashboard for exploring results across different Verilog files with combined analysis and file-specific views.

In [None]:
# Single MLIR File Pipeline Runner with Detailed Output and Custom Parameters
# Run the complete pipeline on a single MLIR file with comprehensive stage-by-stage reporting

def make_clickable_path(file_path: Path) -> str:
    """Convert a file path into a clickable terminal hyperlink using OSC 8 sequences"""
    if not file_path:
        return ""
    
    # Convert to absolute path
    abs_path = file_path.resolve()
    
    # Create OSC 8 hyperlink: \033]8;;file://path\033\\text\033]8;;\033\\
    # This creates a clickable link in terminals that support OSC 8
    return f"file://{abs_path}"

def make_clickable_dir(dir_path: Path) -> str:
    """Convert a directory path into a clickable terminal hyperlink"""
    if not dir_path:
        return ""
    
    abs_path = dir_path.resolve()
    return f"file://{abs_path}"

def clean_log_content(log_content: str) -> str:
    """Remove ANSI escape codes, color codes, and other rich formatting from log content"""
    import re
    
    if not log_content:
        return ""
    
    # Remove ANSI escape sequences (colors, cursor movements, etc.)
    # Pattern matches: ESC[ followed by any number of parameters and a final character
    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    cleaned = ansi_escape.sub('', log_content)
    
    # Remove other common escape sequences
    # Remove carriage returns that don't have newlines (progress indicators)
    cleaned = re.sub(r'\r(?!\n)', '\n', cleaned)
    
    # Remove excessive whitespace but preserve structure
    cleaned = re.sub(r'\n\s*\n\s*\n', '\n\n', cleaned)
    
    # Remove common progress indicators and spinner characters
    cleaned = re.sub(r'[⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏|\-\\/]', '', cleaned)
    
    # Remove excessive spaces (more than 4 consecutive spaces become 4)
    cleaned = re.sub(r' {5,}', '    ', cleaned)
    
    # Clean up any remaining control characters except tabs and newlines
    cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', cleaned)
    
    # Remove lines that are just whitespace or repetitive characters
    lines = cleaned.split('\n')
    cleaned_lines = []
    for line in lines:
        # Skip lines that are just repeated characters (like ===== or -----)
        if line.strip() and not re.match(r'^[=\-_*#]{10,}$', line.strip()):
            cleaned_lines.append(line)
        elif not line.strip():
            # Keep empty lines but limit consecutive empty lines
            if not (cleaned_lines and cleaned_lines[-1] == ''):
                cleaned_lines.append(line)
    
    return '\n'.join(cleaned_lines).strip()

def run_single_parameter_sweep(synthesis_result: SynthesisResult, custom_params: dict = None) -> List[ParameterResult]:
    """Run NextPNR with a single custom parameter configuration"""
    if not synthesis_result.success or not synthesis_result.synthesis_out_file:
        return []
    
    synthesis_json_path = Path(synthesis_result.synthesis_out_file)
    
    # Check if synthesis JSON exists
    if not synthesis_json_path.exists():
        return [ParameterResult(
            config_id=0,
            success=False,
            runtime=0.0,
            failure_type=FailureType.SYNTHESIS,
            error_message=f"Synthesis JSON not found: {synthesis_json_path}"
        )]
    
    PNR_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    out_json_path = PNR_OUTPUT_DIR / f"{synthesis_json_path.stem}_pnr.json"
    out_fasm_path = PNR_OUTPUT_DIR / f"{synthesis_json_path.stem}.fasm"
    
    # Use custom parameters or default
    if custom_params:
        conn_factor = custom_params.get('connectivity_factor', 1.0)
        cong_factor = custom_params.get('congestion_factor', 1.0)
        config_grid = [(conn_factor, cong_factor)]
    else:
        config_grid = grid  # Use default grid
    
    parameter_results = []
    
    for i, (conn_factor, cong_factor) in enumerate(config_grid):
        param_start_time = time.time()
        
        # Run nextpnr with these parameters
        try:
            nextpnr_cmd = [
                "nextpnr-himbaechel",
                "--chipdb", str(CHIPDB_PATH),
                "--device", "FABulous",
                "--json", str(synthesis_json_path),
                "--write", str(out_json_path),
                "-o", f"constrain-pair={CONSTRAIN_PAIR}",
                "-o", f"fasm={out_fasm_path}",
                "-o", f"fdc={FDC_PATH}",
                "--placer-heap-seed-placement-strategy", "graph_grid",
                "--placer-heap-beta", str(BETA_VALUE),
                "--placer-heap-arch-connectivity-factor", str(conn_factor),
                "--placer-heap-congestion-aware-factor", str(cong_factor),
                "-o", f"placeTrial={PLACE_TRIALS}",
                "--router1-timeout", str(ROUTER_TIMEOUT)
            ]
            
            result = subprocess.run(nextpnr_cmd, capture_output=True, text=True, timeout=600)
            param_runtime = time.time() - param_start_time
            
            param_success = result.returncode == 0
            failure_type = FailureType.NONE if param_success else FailureType.ROUTING
            
            # Simple log analysis
            placement_success = "Final Placement" in result.stdout
            routing_failed = "ERROR: Max iteration count reached" in result.stdout
            
            if not param_success:
                if not placement_success:
                    failure_type = FailureType.PLACEMENT
                elif routing_failed:
                    failure_type = FailureType.ROUTING
                else:
                    failure_type = FailureType.UNKNOWN
            
            param_result = ParameterResult(
                config_id=i,
                success=param_success,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={'placement_success': placement_success},
                routing_info={'routing_failed': routing_failed},
                failure_type=failure_type,
                full_stdout= result.stdout,
                full_stderr= result.stderr,
                error_message=result.stderr if not param_success else None
            )
            
            parameter_results.append(param_result)
            
        except subprocess.TimeoutExpired:
            param_runtime = time.time() - param_start_time
            parameter_results.append(ParameterResult(
                config_id=i,
                success=False,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={},
                routing_info={},
                failure_type=FailureType.UNKNOWN,
                error_message='Parameter run timed out'
            ))
        except Exception as e:
            param_runtime = time.time() - param_start_time
            parameter_results.append(ParameterResult(
                config_id=i,
                success=False,
                runtime=param_runtime,
                parameters={'connectivity_factor': conn_factor, 'congestion_factor': cong_factor},
                placement_info={},
                routing_info={},
                failure_type=FailureType.UNKNOWN,
                error_message=str(e)
            ))
    
    return parameter_results

def run_synthesis_with_logs(verilog_file: Path) -> SynthesisResult:
    """Run FABulous synthesis preprocessing with full log capture"""
    start_time = time.time()
    
    try:
        # Run FABulous synthesis preprocessing (yosys + json generation)
        synthesis_cmd = [
            "FABulous", "--debug", str(FAB_PROJ_DIR), "-p",
            "load_fabric; gen_bitStream_spec; gen_cells_and_techmaps; "
            f"gen_chipdb -routing_graph {FAB_PROJ_DIR}/.FABulous/routing_graph.dot -filter 5,1 5,2 5,3 5,4; "
            f"synthesis_script {str(verilog_file)} -tcl {FAB_PROJ_DIR}/.FABulous/arch_synth.tcl;"
        ]
        SYNTHESIS_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        synthesis_out = SYNTHESIS_OUTPUT_DIR / f"{verilog_file.stem}_synth.json"
        p = os.environ.copy()
        p["OUT_JSON_PATH"] = str(synthesis_out)
        synthesis_result = subprocess.run(synthesis_cmd, capture_output=True, text=True, timeout=300, env=p)
        synthesis_runtime = time.time() - start_time
        
        synthesis_success = synthesis_result.returncode == 0
        error_message = synthesis_result.stderr if not synthesis_success else None

        # Check for invalid synthesis cells
        if synthesis_success and re.findall(r"\"type\": \"(\$.*?)\"", synthesis_result.stdout):
            r = re.findall(r"\"type\": \"(\$.*?)\"", synthesis_result.stdout)
            synthesis_success = False
            error_message = f"Invalid synthesis result, found cell: {set(r)}"
        
        result = SynthesisResult(
            success=synthesis_success,
            runtime=synthesis_runtime,
            verilog_file=verilog_file,
            synthesis_out_file=synthesis_out if synthesis_success else None,
            error_message=error_message,
            full_stdout=synthesis_result.stdout,
            full_stderr=synthesis_result.stderr
        )
        
        return result
        
    except subprocess.TimeoutExpired:
        result = SynthesisResult(
            success=False,
            runtime=time.time() - start_time,
            verilog_file=verilog_file,
            synthesis_out_file=None,
            error_message="Synthesis timed out"
        )
        result.full_stdout = ""
        result.full_stderr = "Synthesis timed out"
        return result
    except Exception as e:
        result = SynthesisResult(
            success=False,
            runtime=time.time() - start_time,
            verilog_file=verilog_file,
            synthesis_out_file=None,
            error_message=f"Synthesis error: {str(e)}"
        )
        result.full_stdout = ""
        result.full_stderr = str(e)
        return result

def display_scrollable_log(log_content: str, title: str, max_height: str = "300px"):
    """Display log content in a scrollable text area with rich formatting removed"""
    from IPython.display import display, HTML
    import html
    
    # Clean the log content first
    cleaned_content = clean_log_content(log_content)
    
    # Add summary info if content was cleaned
    original_lines = len(log_content.split('\n')) if log_content else 0
    cleaned_lines = len(cleaned_content.split('\n')) if cleaned_content else 0
    
    # Escape HTML characters
    escaped_content = html.escape(cleaned_content)
    
    # Add info about filtering if significant content was removed
    filter_info = ""
    if original_lines > cleaned_lines + 10:  # Significant filtering occurred
        filter_info = f"""
        <div style="font-size: 11px; color: #666; padding: 4px 8px; background-color: #f0f0f0; border-bottom: 1px solid #ddd;">
            📋 Log filtered: {original_lines} → {cleaned_lines} lines (removed formatting/progress indicators)
        </div>
        """
    
    # Create scrollable HTML
    html_content = f"""
    <div style="border: 1px solid #ccc; border-radius: 5px; margin: 10px 0;">
        <div style="background-color: #f5f5f5; padding: 8px; border-bottom: 1px solid #ccc; font-weight: bold;">
            {title}
        </div>
        {filter_info}
        <div style="max-height: {max_height}; overflow-y: auto; padding: 10px; font-family: 'Courier New', monospace; font-size: 12px; background-color: #fafafa; line-height: 1.3;">
            <pre style="margin: 0; white-space: pre-wrap; word-wrap: break-word;">{escaped_content}</pre>
        </div>
    </div>
    """
    
    display(HTML(html_content))

def run_single_file_experiment(mlir_file_path: str, verbose: bool = True, 
                              custom_params: dict = None, show_logs: bool = False):
    """
    Run the complete pipeline on a single MLIR file with detailed output for each stage.
    
    Parameters:
    -----------
    mlir_file_path : str
        Path to the MLIR file to process
    verbose : bool
        Whether to show detailed output for each stage
    custom_params : dict
        Custom parameter configuration for NextPNR sweep
        Format: {'connectivity_factor': float, 'congestion_factor': float}
        If None, uses the default grid
    show_logs : bool
        Whether to show full synthesis and PnR logs in scrollable boxes
        
    Returns:
    --------
    CompletePipelineResult
        Complete pipeline result with all stage information
    """
    mlir_path = Path(mlir_file_path)
    
    if not mlir_path.exists():
        print(f"❌ ERROR: MLIR file not found: {mlir_path}")
        return None
    
    print("🚀 Starting Complete Pipeline Experiment")
    print(f"📁 Input File: {make_clickable_path(mlir_path)}")
    print(f"📂 Source Path: {make_clickable_path(mlir_path)}")
    print(f"⏰ Start Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    if custom_params:
        print(f"🎛️  Custom Parameters: connectivity={custom_params.get('connectivity_factor', 'N/A')}, congestion={custom_params.get('congestion_factor', 'N/A')}")
    else:
        print(f"🎛️  Using Default Parameter Grid: {len(grid)} configurations")
    print("=" * 80)
    
    # Run the complete pipeline
    start_time = time.time()
    
    # Stage 1: Verilog Generation Pipeline
    verilog_result = run_verilog_pipeline(mlir_path)
    
    # Stage 2: Synthesis (if Verilog generation succeeded)
    synthesis_results = []
    if verilog_result.success and verilog_result.verilog_files:
        for verilog_file in verilog_result.verilog_files:
            synthesis_results.append(run_synthesis_with_logs(Path(verilog_file)))
    
    # Stage 3: Parameter Sweep (if any synthesis succeeded)
    parameter_results = []
    for synthesis_result in synthesis_results:
        if synthesis_result.success:
            parameter_results.extend(run_single_parameter_sweep(synthesis_result, custom_params))

    # Determine overall success and primary failure type
    overall_success = False
    primary_failure_type = FailureType.NONE
    
    if not verilog_result.success:
        primary_failure_type = verilog_result.failure_stage or FailureType.VERILOG_GENERATION
    elif not synthesis_results or not any(s.success for s in synthesis_results):
        primary_failure_type = FailureType.SYNTHESIS
    elif not parameter_results or not any(p.success for p in parameter_results):
        # Find most common parameter failure type
        if parameter_results:
            failure_types = [p.failure_type for p in parameter_results if p.failure_type]
            if failure_types:
                primary_failure_type = max(set(failure_types), key=failure_types.count)
            else:
                primary_failure_type = FailureType.UNKNOWN
        else:
            primary_failure_type = FailureType.UNKNOWN
    else:
        overall_success = True
        primary_failure_type = FailureType.NONE
    
    # Create result object
    result = CompletePipelineResult(
        source_file=mlir_path,
        overall_success=overall_success,
        total_runtime=time.time() - start_time,
        verilog_pipeline=verilog_result,
        synthesis_results=synthesis_results,
        parameter_results=parameter_results,
        primary_failure_type=primary_failure_type
    )
    
    total_time = time.time() - start_time
    
    # Display detailed results
    print(f"\n🏁 Pipeline Completed in {total_time:.2f}s")
    print(f"📊 Overall Success: {'✅ PASS' if result.overall_success else '❌ FAIL'}")
    print(f"🎯 Primary Failure: {result.primary_failure_type.value}")
    print("=" * 80)
    
    # === STAGE 1: VERILOG PIPELINE ===
    print("\n📋 STAGE 1: VERILOG GENERATION PIPELINE")
    print(f"⏱️  Total Stage Time: {result.verilog_pipeline.runtime:.2f}s")
    print(f"✅ Stage Success: {'PASS' if result.verilog_pipeline.success else 'FAIL'}")
    
    if verbose:
        vp = result.verilog_pipeline
        print("\n  🔧 Sub-Stage 1.1: MLIR Optimization")
        print(f"     ⏱️  Time: {vp.mlir_optimization_time:.2f}s")
        print(f"     📄 Input: {make_clickable_path(mlir_path)}")
        if vp.optimized_mlir_file:
            print(f"     📄 Output: {make_clickable_path(vp.optimized_mlir_file)}")
        
        print("\n  🔄 Sub-Stage 1.2: Loop Extraction")
        print(f"     ⏱️  Time: {vp.loop_extraction_time:.2f}s")
        if vp.function_extracted is not None:
            print(f"     📊 Extracted Files: {len(vp.function_extracted)}")
        if vp.function_extracted:
            for i, func in enumerate(vp.function_extracted):
                print(f"     📂 Function {i+1}: {make_clickable_path(func)}")
        
        print("\n  ⚙️  Sub-Stage 1.3: FUTIL Generation")
        print(f"     ⏱️  Time: {vp.futil_generation_time:.2f}s")
        if vp.futil_files:
            for i, futil in enumerate(vp.futil_files):
                print(f"     📄 FUTIL File {i+1}: {make_clickable_path(futil)}")
        
        print("\n  📦 Final Verilog Output:")
        if vp.verilog_files:
            print(f"     📊 Generated Files: {len(vp.verilog_files)}")
            for i, vf in enumerate(vp.verilog_files):
                print(f"       {i+1}. {make_clickable_path(vf)}")
        else:
            print("     ❌ No Verilog files generated")
        
        if not vp.success:
            print("\n  ❌ FAILURE DETAILS:")
            print(f"     🎯 Failed Stage: {vp.failure_stage.value if vp.failure_stage else 'unknown'}")
            if vp.error_message:
                print(f"     💬 Error: {vp.error_message}")
    
    # === STAGE 2: SYNTHESIS ===
    print("\n📋 STAGE 2: SYNTHESIS (FABulous)")
    if result.synthesis_results:
        successful_synthesis = sum(1 for s in result.synthesis_results if s.success)
        total_synthesis_time = sum(s.runtime for s in result.synthesis_results)
        print(f"⏱️  Total Stage Time: {total_synthesis_time:.2f}s")
        print(f"✅ Stage Success: {successful_synthesis}/{len(result.synthesis_results)} operations successful")
        
        if verbose:
            for i, synth in enumerate(result.synthesis_results):
                print(f"\n  🔨 Synthesis Operation {i+1}:")
                print(f"     ✅ Success: {'PASS' if synth.success else 'FAIL'}")
                print(f"     ⏱️  Runtime: {synth.runtime:.2f}s")
                if synth.verilog_file:
                    print(f"     📄 Input Verilog: {make_clickable_path(synth.verilog_file)}")
                if synth.synthesis_out_file:
                    print(f"     📄 Output JSON: {make_clickable_path(synth.synthesis_out_file)}")
                if synth.error_message:
                    print(f"     ❌ Error: {synth.error_message}")
                
                if show_logs and hasattr(synth, 'full_stdout'):
                    print("\n     📜 SYNTHESIS LOG OUTPUT:")
                    display_scrollable_log(synth.full_stdout, f"Synthesis {i+1} Output Log", max_height="200px")
                    
                    if synth.full_stderr:
                        print("\n     📜 SYNTHESIS ERROR LOG:")
                        display_scrollable_log(synth.full_stderr, f"Synthesis {i+1} Error Log", max_height="200px")
    else:
        print("❌ No synthesis operations performed")

    # === STAGE 3: PARAMETER SWEEP ===
    print("\n📋 STAGE 3: PARAMETER SWEEP (NextPNR)")
    if result.parameter_results:
        successful_params = sum(1 for p in result.parameter_results if p.success)
        total_param_time = sum(p.runtime for p in result.parameter_results)
        avg_param_time = total_param_time / len(result.parameter_results)
        
        print(f"⏱️  Total Stage Time: {total_param_time:.2f}s")
        print(f"⏱️  Average Config Time: {avg_param_time:.2f}s")
        print(f"✅ Stage Success: {successful_params}/{len(result.parameter_results)} configurations successful")
        print(f"📊 Success Rate: {successful_params/len(result.parameter_results)*100:.1f}%")
        
        if verbose:
            print("\n  📈 Parameter Configuration Results:")
            failure_types = {}
            for param in result.parameter_results:
                conn_factor = param.parameters.get('connectivity_factor', 'N/A')
                cong_factor = param.parameters.get('congestion_factor', 'N/A')
                
                if param.success:
                    print(f"     ✅ Config {param.config_id}: conn={conn_factor}, cong={cong_factor}, time={param.runtime:.2f}s")
                else:
                    failure_type = param.failure_type.value if param.failure_type else 'unknown'
                    failure_types[failure_type] = failure_types.get(failure_type, 0) + 1
                    print(f"     ❌ Config {param.config_id}: conn={conn_factor}, cong={cong_factor}")
                    print(f"        🎯 Failure: {failure_type}, Time: {param.runtime:.2f}s")
                    if param.error_message:
                        error_preview = param.error_message[:100] + "..." if len(param.error_message) > 100 else param.error_message
                        print(f"        💬 Error: {error_preview}")
                
                if show_logs and hasattr(param, 'full_stdout'):
                    print(f"\n     📜 NextPNR LOG OUTPUT (Config {param.config_id}):")
                    display_scrollable_log(param.full_stdout, f"NextPNR Config {param.config_id} Output Log", max_height="300px")
                    
                    if hasattr(param, 'full_stderr') and param.full_stderr:
                        print(f"\n     📜 NextPNR ERROR LOG (Config {param.config_id}):")
                        display_scrollable_log(param.full_stderr, f"NextPNR Config {param.config_id} Error Log", max_height="300px")
            
            if failure_types:
                print("\n  ❌ Parameter Failure Summary:")
                for failure_type, count in failure_types.items():
                    print(f"     🎯 {failure_type}: {count} configurations")
    else:
        print("❌ No parameter sweep performed")
    
    # === SUMMARY SECTION ===
    print("\n" + "=" * 80)
    print("📊 EXPERIMENT SUMMARY")
    print("=" * 80)
    print(f"📁 File: {make_clickable_path(mlir_path)}")
    print(f"⏰ Total Runtime: {result.total_runtime:.2f}s")
    print(f"🎯 Overall Result: {'🎉 SUCCESS' if result.overall_success else '💥 FAILED'}")
    
    if not result.overall_success:
        print(f"❌ Primary Failure: {result.primary_failure_type.value}")
    
    # Performance breakdown
    print("\n⏱️  Stage Performance Breakdown:")
    print(f"   Verilog Pipeline:  {result.verilog_pipeline.runtime:6.2f}s ({result.verilog_pipeline.runtime/result.total_runtime*100:4.1f}%)")
    if result.synthesis_results:
        synth_time = sum(s.runtime for s in result.synthesis_results)
        print(f"   Synthesis:         {synth_time:6.2f}s ({synth_time/result.total_runtime*100:4.1f}%)")
    if result.parameter_results:
        param_time = sum(p.runtime for p in result.parameter_results)
        print(f"   Parameter Sweep:   {param_time:6.2f}s ({param_time/result.total_runtime*100:4.1f}%)")
    
    # Resource utilization
    print("\n📦 Resource Utilization:")
    if result.verilog_pipeline.verilog_files:
        print(f"   Verilog Files Generated: {len(result.verilog_pipeline.verilog_files)}")
    if result.synthesis_results:
        print(f"   Synthesis Operations: {len(result.synthesis_results)}")
    if result.parameter_results:
        print(f"   Parameter Configurations: {len(result.parameter_results)}")
    
    # Stage directories info with clickable links
    print("\n📂 Output Files Located In:")
    print(f"   MLIR Optimized:    {make_clickable_dir(MLIR_OPTIMIZED_DIR)}")
    print(f"   MLIR Extracted:    {make_clickable_dir(MLIR_EXTRACTED_DIR)}")
    print(f"   FUTIL Files:       {make_clickable_dir(FUTIL_OUTPUT_DIR)}")
    print(f"   Verilog Files:     {make_clickable_dir(VERILOG_OUTPUT_DIR)}")
    print(f"   Synthesis Results: {make_clickable_dir(SYNTHESIS_OUTPUT_DIR)}")
    print(f"   PnR Results:       {make_clickable_dir(PNR_OUTPUT_DIR)}")
    
    print(f"\n🏁 Experiment completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("=" * 80)
    
    return result

# Interactive widget for single file experiments
def create_single_file_experiment_widget():
    """Create an interactive widget for running single file experiments"""
    
    # Get available MLIR files
    available_files = list(Path(BENCHMARK_ROOT_DIR).glob("*.mlir"))
    file_options = [(f.name, str(f)) for f in available_files]
    
    if not file_options:
        print(f"❌ No MLIR files found in {BENCHMARK_ROOT_DIR}")
        return
    
    # Create widgets
    file_selector = widgets.Dropdown(
        options=file_options,
        description='MLIR File:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='400px')
    )
    
    verbose_checkbox = widgets.Checkbox(
        value=True,
        description='Show detailed output',
        style={'description_width': 'initial'}
    )
    
    show_logs_checkbox = widgets.Checkbox(
        value=True,
        description='Show synthesis/PnR logs',
        style={'description_width': 'initial'}
    )
    
    # Parameter configuration section
    custom_params_checkbox = widgets.Checkbox(
        value=True,
        description='Use custom parameters',
        style={'description_width': 'initial'}
    )
    
    connectivity_slider = widgets.FloatSlider(
        value=0.0,
        min=0.0,
        max=2.0,
        step=0.1,
        description='Connectivity:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='300px')
    )
    
    congestion_slider = widgets.FloatSlider(
        value=0.0,
        min=0.0,
        max=2.0,
        step=0.1,
        description='Congestion:',
        style={'description_width': 'initial'},
        layout=widgets.Layout(width='300px')
    )
    
    def toggle_custom_params(change):
        connectivity_slider.disabled = not change['new']
        congestion_slider.disabled = not change['new']
    
    custom_params_checkbox.observe(toggle_custom_params, names='value')
    
    # Initially disable sliders
    connectivity_slider.disabled = False
    congestion_slider.disabled = False
    
    run_button = widgets.Button(
        description='🚀 Run Pipeline',
        button_style='primary',
        layout=widgets.Layout(width='150px')
    )
    
    output_area = widgets.Output()
    
    def on_run_button_click(b):
        with output_area:
            output_area.clear_output(wait=True)
            
            selected_file = file_selector.value
            verbose = verbose_checkbox.value
            show_logs = show_logs_checkbox.value
            use_custom_params = custom_params_checkbox.value
            
            custom_params = None
            if use_custom_params:
                custom_params = {
                    'connectivity_factor': connectivity_slider.value,
                    'congestion_factor': congestion_slider.value
                }
            
            try:
                result = run_single_file_experiment(selected_file, verbose, custom_params, show_logs)
                if result:
                    print("\n✅ Experiment completed successfully!")
                else:
                    print("\n❌ Experiment failed to start!")
                    
            except Exception as e:
                print(f"\n💥 Experiment crashed with error: {str(e)}")
                import traceback
                print("Full traceback:")
                print(traceback.format_exc())
    
    run_button.on_click(on_run_button_click)
    
    # Create layout
    main_controls = widgets.HBox([
        file_selector,
        verbose_checkbox,
        show_logs_checkbox,
        run_button
    ])
    
    param_controls = widgets.VBox([
        custom_params_checkbox,
        widgets.HBox([connectivity_slider, congestion_slider])
    ])
    
    experiment_widget = widgets.VBox([
        widgets.HTML("<h3>🧪 Single MLIR File Pipeline Experiment</h3>"),
        main_controls,
        widgets.HTML("<h4>🎛️ Parameter Configuration</h4>"),
        param_controls,
        output_area
    ])
    
    return experiment_widget

# Display the experiment widget
print("🧪 Single File Experiment Tool Available")
print("Features:")
print("• Select any MLIR file from benchmark directory")
print("• Toggle detailed output and synthesis/PnR logs")
print("• Choose between default parameter grid or custom single configuration")
print("• Scrollable log boxes with rich formatting removal for clean viewing")
print("• ANSI escape codes, progress indicators, and excessive whitespace filtered out")
print("• 🔗 Clickable file paths and directories using OSC 8 terminal hyperlinks")
print()

experiment_widget = create_single_file_experiment_widget()
if experiment_widget:
    display(experiment_widget)
else:
    print("❌ Could not create experiment widget - no MLIR files available")

🧪 Single File Experiment Tool Available
Features:
• Select any MLIR file from benchmark directory
• Toggle detailed output and synthesis/PnR logs
• Choose between default parameter grid or custom single configuration
• Scrollable log boxes with rich formatting removal for clean viewing
• ANSI escape codes, progress indicators, and excessive whitespace filtered out
• 🔗 Clickable file paths and directories using OSC 8 terminal hyperlinks



VBox(children=(HTML(value='<h3>🧪 Single MLIR File Pipeline Experiment</h3>'), HBox(children=(Dropdown(descript…

## 3. NextPNR Execution and Analysis Functions

Define functions to run FABulous preprocessing and nextpnr-himbaechel with comprehensive error analysis.

In [None]:
# Main execution with unified pipeline
mlir_files = list(Path(BENCHMARK_ROOT_DIR).glob("*.mlir"))

# Run complete pipeline for all MLIR files
complete_results = process_mlir_benchmarks(mlir_files)

print(f"\n=== Complete Pipeline Results ===")
print(f"Total files processed: {len(complete_results)}")

# Display summary statistics
successful_complete = sum(1 for r in complete_results if r.overall_success)
print(f"Successfully completed full pipeline: {successful_complete}/{len(complete_results)}")

# Stage-wise success rates
verilog_success = sum(1 for r in complete_results if r.verilog_pipeline.success)
synthesis_success = sum(1 for r in complete_results if any(s.success for s in r.synthesis_results))
pnr_success = sum(1 for r in complete_results if any(p.success for p in r.parameter_results))

print(f"Verilog generation success: {verilog_success}/{len(complete_results)}")
print(f"Synthesis success: {synthesis_success}/{len(complete_results)}")
print(f"Parameter sweep success: {pnr_success}/{len(complete_results)}")

# Display detailed results for each file
for result in complete_results:
    print(f"\n--- {os.path.basename(result.source_file)} ---")
    print(f"Overall success: {result.overall_success}")
    print(f"Primary failure: {result.primary_failure_type.value}")
    print(f"Total runtime: {result.total_runtime:.2f}s")
    
    # Stage details
    vp = result.verilog_pipeline
    print(f"  Verilog Pipeline: {'✓' if vp.success else '✗'} ({vp.runtime:.2f}s)")
    print(f"    MLIR Opt: {vp.mlir_optimization_time:.2f}s")
    print(f"    Loop Extract: {vp.loop_extraction_time:.2f}s") 
    print(f"    FUTIL Gen: {vp.futil_generation_time:.2f}s")
    print(f"    function extracted: {vp.function_extracted}")
    
    # Synthesis results (now multiple)
    if result.synthesis_results:
        successful_synthesis = sum(1 for s in result.synthesis_results if s.success)
        total_synthesis_time = sum(s.runtime for s in result.synthesis_results)
        print(f"  Synthesis: {successful_synthesis}/{len(result.synthesis_results)} successful ({total_synthesis_time:.2f}s total)")
        
        # Show details for failed synthesis
        for i, synth in enumerate(result.synthesis_results):
            if not synth.success:
                print(f"    Failed synthesis {i+1}: {synth.error_message}")
    else:
        print("  Synthesis: No synthesis attempted")
    
    if result.parameter_results:
        successful_params = sum(1 for p in result.parameter_results if p.success)
        print(f"  Parameter sweep: {successful_params}/{len(result.parameter_results)} configs")

Processing 13 MLIR files through complete pipeline...
Stage directories:
  01_optimized_mlir: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/01_optimized_mlir
  02_extracted_mlir: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/02_extracted_mlir
  03_futil: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/03_futil
  04_verilog: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/04_verilog
  05_synthesis: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/05_synthesis
  06_PNR: /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/06_PNR

[1/13] Processing: nw_nw.mlir
Read 10429 characters from /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/01_optimized_mlir/nw_nw.mlir
Read 10429 characters from /home/kelvin/FABulous_fork/myProject/PnR/compilation_result/01_optimized_mlir/nw_nw.mlir
Successfully parsed with xDSL! Module has 1 operations
Replacing i64/f64 types with i32/f32 types...
Found 0 operations requiring type