In [None]:
import os
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import mplhep as hep
from pathlib import Path

from __future__ import annotations
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np


# Set up plotting with CMS style
plt.style.use(hep.style.CMS)
plt.rcParams["figure.figsize"] = (10, 6)
plt.rcParams["font.size"] = 12

In [None]:
# Configuration
CARDS_BASE_DIR = "/home/users/lumori/bbtautau/src/bbtautau/cards/25Dec27-ggf-only"
CHANNELS = ["combined", "hh", "he", "hm"]


# Find all card directories with outputs
def find_output_directories():
    """Find all directories containing analysis outputs"""
    output_dirs = []

    for root, dirs, files in os.walk(CARDS_BASE_DIR):
        if "outs" in dirs:
            outs_path = Path(root) / "outs"
            if any(outs_path.glob("*.txt")):
                output_dirs.append(root)

    return sorted(output_dirs)


output_dirs = find_output_directories()
print(f"Found {len(output_dirs)} directories with outputs:")
for d in output_dirs:
    print(f"  - {Path(d).relative_to(CARDS_BASE_DIR)}")

In [None]:
def parse_asymptotic_limits(file_path):
    """Parse asymptotic limits from log file"""
    results = {}

    if not Path(file_path).exists():
        return results

    try:
        with open(file_path, "r") as f:
            content = f.read()

        # Look for expected limits pattern
        patterns = {
            "expected_2.5": r"Expected\s+2\.5%:\s*r\s*<\s*([0-9.]+)",
            "expected_16.0": r"Expected\s+16\.0%:\s*r\s*<\s*([0-9.]+)",
            "expected_50.0": r"Expected\s+50\.0%:\s*r\s*<\s*([0-9.]+)",
            "expected_84.0": r"Expected\s+84\.0%:\s*r\s*<\s*([0-9.]+)",
            "expected_97.5": r"Expected\s+97\.5%:\s*r\s*<\s*([0-9.]+)",
            "observed": r"Observed\s+Limit:\s*r\s*<\s*([0-9.]+)",
        }

        for key, pattern in patterns.items():
            match = re.search(pattern, content, re.IGNORECASE)
            if match:
                results[key] = float(match.group(1))

        # Check for convergence issues
        convergence_issues = []
        if "Minimization did NOT converge" in content:
            convergence_issues.append("did_not_converge")

        results["convergence_issues"] = convergence_issues
        results["status"] = "success" if not convergence_issues else "issues"

    except Exception as e:
        print(f"Error parsing {file_path}: {e}")
        results["status"] = "parse_error"

    return results


def parse_background_fit(file_path):
    """Parse background fit results"""
    results = {}

    if not Path(file_path).exists():
        return results

    try:
        with open(file_path, "r") as f:
            content = f.read()

        # Check fit status
        if "Minimization success!" in content:
            results["fit_converged"] = True
        elif "Minimization did NOT converge" in content:
            results["fit_converged"] = False
        else:
            results["fit_converged"] = None

        # Extract fit statistics
        nll_match = re.search(r"best fit NLL\s*=\s*([0-9.-]+)", content)
        if nll_match:
            results["best_fit_nll"] = float(nll_match.group(1))

        # Count function calls
        fcn_calls = len(re.findall(r"FCN", content))
        results["function_calls"] = fcn_calls

        # Check for specific issues
        issues = []
        if "Hesse matrix not pos-def" in content:
            issues.append("hesse_not_posdef")
        if "MIGRAD FAILS" in content:
            issues.append("migrad_failed")
        if "Covariance matrix" in content and "not available" in content:
            issues.append("no_covariance")

        results["fit_issues"] = issues

    except Exception as e:
        print(f"Error parsing background fit {file_path}: {e}")

    return results


# Test parsing on first directory
if output_dirs:
    test_dir = Path(output_dirs[0]) / "outs"
    print(f"Testing parsing on: {test_dir}")

    # Look for limit files
    limit_files = list(test_dir.glob("*AsymptoticLimits.txt"))
    if limit_files:
        test_results = parse_asymptotic_limits(limit_files[0])
        print(f"Sample limit parsing: {test_results}")

    # Look for background fit files
    bfit_files = list(test_dir.glob("*MultiDimFit.txt"))
    if bfit_files:
        test_bfit = parse_background_fit(bfit_files[0])
        print(f"Sample background fit parsing: {test_bfit}")

In [None]:
# Signal region labels: ggf, vbf, or allsigs (ggf+vbf combined)
SIG_REGION_LABELS = ["", "ggf", "vbf", "all"]


def collect_all_results():
    """Collect results from all output directories"""
    all_results = []

    for card_dir in output_dirs:
        outs_dir = Path(card_dir) / "outs"
        card_name = Path(card_dir).relative_to(CARDS_BASE_DIR)

        print(f"Processing: {card_name}")

        # Check for different channel and signal region results
        for channel in CHANNELS:
            for siglabel in SIG_REGION_LABELS:
                # Build label: siglabel + channel
                channellabel = "" if channel == "combined" else channel
                label = f"{siglabel}{channellabel}"

                result_entry = {
                    "card_directory": str(card_name),
                    "channel": channel,
                    "sig_region": siglabel,
                }

                # Look for limits file
                limit_file = outs_dir / f"{label}AsymptoticLimits.txt"
                bfit_file = outs_dir / f"{label}MultiDimFit.txt"

                # Parse limits
                limit_results = parse_asymptotic_limits(limit_file)
                result_entry.update(limit_results)

                # Parse background fit
                bfit_results = parse_background_fit(bfit_file)
                result_entry.update(bfit_results)

                # Only add if we found some results
                if limit_results or bfit_results:
                    all_results.append(result_entry)

    return pd.DataFrame(all_results)


# Collect all results
results_df = collect_all_results()
print(f"\nCollected results from {len(results_df)} analyses")
print(f"Columns: {list(results_df.columns)}")
print(f"\nFirst few entries:")
display(results_df.head())

In [None]:
# Summary statistics
print("=== ANALYSIS SUMMARY ===")
print(f"Total analyses: {len(results_df)}")
print(f"Card directories: {results_df['card_directory'].nunique()}")
print(f"Channels analyzed: {sorted(results_df['channel'].unique())}")

# Status summary
if "status" in results_df.columns:
    print("\n=== LIMIT CALCULATION STATUS ===")
    status_counts = results_df["status"].value_counts()
    for status, count in status_counts.items():
        print(f"{status}: {count}")

# Convergence summary
if "fit_converged" in results_df.columns:
    print("\n=== BACKGROUND FIT CONVERGENCE ===")
    conv_counts = results_df["fit_converged"].value_counts()
    for conv, count in conv_counts.items():
        print(f"{conv}: {count}")

# Expected limits summary
if "expected_50.0" in results_df.columns:
    limits_summary = results_df.groupby("channel")["expected_50.0"].agg(
        ["count", "mean", "std", "min", "max"]
    )

    # Clean up channel names and column names
    channel_name_map = {"combined": "Combined", "hh": "œÑ_h œÑ_h", "he": "œÑ_h e", "hm": "œÑ_h Œº"}

    limits_summary.index = [channel_name_map.get(idx, idx) for idx in limits_summary.index]
    limits_summary.columns = ["Count", "Mean", "Std Dev", "Min", "Max"]

    print("\n=== EXPECTED LIMITS (50%) BY CHANNEL ===")
    display(limits_summary.round(2))

In [None]:
# Create separate transposed tables for each channel
print("=== DETAILED RESULTS BY CHANNEL ===")

# Get available columns
limit_cols = ["expected_2.5", "expected_16.0", "expected_50.0", "expected_84.0", "expected_97.5"]
available_limit_cols = [col for col in limit_cols if col in results_df.columns]

status_cols = ["status", "fit_converged"]
available_status_cols = [col for col in status_cols if col in results_df.columns]

# Create table for each channel
channels = sorted(results_df["channel"].unique())

# Define nice column names
column_name_map = {
    "expected_2.5": "Expected -2œÉ",
    "expected_16.0": "Expected -1œÉ",
    "expected_50.0": "Expected Median",
    "expected_84.0": "Expected +1œÉ",
    "expected_97.5": "Expected +2œÉ",
    "status": "Status",
    "fit_converged": "Fit Converged",
}

# Define nice channel names
channel_name_map = {"combined": "Combined", "hh": "œÑ_h œÑ_h", "he": "œÑ_h œÑ_e", "hm": "œÑ_h œÑ_Œº"}

for channel in channels:
    channel_data = results_df[results_df["channel"] == channel].copy()

    if len(channel_data) > 0:
        # Select columns for this channel (excluding 'channel' since it's the same for all rows)
        summary_cols = ["card_directory"] + available_limit_cols + available_status_cols
        summary_table = channel_data[summary_cols].copy()

        # Format the table - round to 2 decimal places
        for col in available_limit_cols:
            if col in summary_table.columns:
                summary_table[col] = summary_table[col].round(2)

        # Clean up card_directory names (remove underscores, make prettier)
        summary_table["card_directory"] = (
            summary_table["card_directory"].str.replace("_", " ").str.title()
        )

        # Transpose the table: set card_directory as index, then transpose
        transposed_table = summary_table.set_index("card_directory").T

        # Rename the index (row names) to be more readable
        transposed_table.index = [
            column_name_map.get(idx, idx.replace("_", " ").title())
            for idx in transposed_table.index
        ]
        # Remove the index name to avoid showing "card_directory" in top left
        # transposed_table.index.name = ""

        channel_display_name = channel_name_map.get(channel, channel.upper())
        print(f"\n=== {channel_display_name} Channel ===")
        display(transposed_table)

In [None]:
plt.style.use(hep.style.CMS)
hep.style.use("CMS")

from boostedhh import hh_vars

years = ["2022", "2022EE", "2023", "2023BPix"]

# Plot expected limits comparison
# Single horizontal summary: for each bmin show the four channels with median, 68% and 95% expected bands.
if "expected_50.0" in results_df.columns and len(results_df) > 0:
    required_cols = [
        "expected_2.5",
        "expected_16.0",
        "expected_50.0",
        "expected_84.0",
        "expected_97.5",
    ]
    plot_df = results_df.copy()
    plot_df = plot_df[plot_df["channel"].isin(CHANNELS)]

    # Keep only rows with full bands available
    plot_df = plot_df.dropna(subset=[col for col in required_cols if col in plot_df.columns])

    if len(plot_df) > 0:
        bmins = sorted(plot_df["card_directory"].unique())
        channels = [ch for ch in CHANNELS if ch in plot_df["channel"].unique()]

        fig_height = 2 * len(bmins) + 2
        fig, ax = plt.subplots(figsize=(18, fig_height))

        base_y = np.arange(len(bmins))
        bar_h = 0.18
        offsets = np.linspace(-0.27, 0.27, len(channels)) if channels else np.array([0.0])
        palette = ["#1f77b4", "#2ca02c", "#d62728", "#9467bd", "#8c564b"]

        for i, ch in enumerate(channels):
            ch_df = plot_df[plot_df["channel"] == ch].set_index("card_directory").reindex(bmins)
            med = ch_df["expected_50.0"].to_numpy()
            low1 = ch_df["expected_16.0"].to_numpy()
            high1 = ch_df["expected_84.0"].to_numpy()
            low2 = ch_df["expected_2.5"].to_numpy()
            high2 = ch_df["expected_97.5"].to_numpy()

            mask = (
                np.isfinite(med)
                & np.isfinite(low1)
                & np.isfinite(high1)
                & np.isfinite(low2)
                & np.isfinite(high2)
            )
            if not mask.any():
                continue

            y = base_y + offsets[i]
            color = palette[i % len(palette)]

            # 95% (2œÉ) band
            ax.barh(
                y[mask],
                (high2 - low2)[mask],
                left=low2[mask],
                height=bar_h,
                color=color,
                alpha=0.25,
                label="95% expected" if i == 0 else None,
            )
            # 68% (1œÉ) band
            ax.barh(
                y[mask],
                (high1 - low1)[mask],
                left=low1[mask],
                height=bar_h * 0.65,
                color=color,
                alpha=0.55,
                label="68% expected" if i == 0 else None,
            )
            # Median marker
            ax.plot(med[mask], y[mask], "o", color=color, ms=6, label="Median" if i == 0 else None)

            # Channel legend handle (marker only) added once per channel
            ax.plot([], [], marker="s", color=color, ls="", label=ch)

        ax.set_yticks(base_y)
        ax.set_yticklabels(bmins)
        ax.set_xlabel("95% CL limit on $\sigma / \sigma_{SM}$")
        ax.set_ylabel("Experiment")
        ax.invert_yaxis()
        ax.grid(True, axis="x", ls=":", alpha=0.6)

        ax.legend()

        hep.cms.label(
            ax=ax,
            label="Work in Progress",
            data=True,
            year="2022-23",
            com="13.6",
            fontsize=13,
            lumi=f"{np.sum([hh_vars.LUMI[year] for year in years]) / 1000:.1f}",
        )

        plt.tight_layout()
        plt.show()
    else:
        print("No limits with full bands to plot")
else:
    print("Insufficient data for plotting")

In [None]:
# Identify problematic analyses
print("=== ISSUES AND WARNINGS REPORT ===")

# Convergence issues
if "fit_converged" in results_df.columns:
    non_converged = results_df[results_df["fit_converged"] == False]
    if len(non_converged) > 0:
        print(f"\n‚ö†Ô∏è  NON-CONVERGED BACKGROUND FITS ({len(non_converged)})")
        for _, row in non_converged.iterrows():
            print(f"  - {row['card_directory']} / {row['channel']}")
    else:
        print("\n‚úÖ All background fits converged")

# Limit calculation issues
if "status" in results_df.columns:
    failed_limits = results_df[results_df["status"] != "success"]
    if len(failed_limits) > 0:
        print(f"\n‚ö†Ô∏è  LIMIT CALCULATION ISSUES ({len(failed_limits)})")
        for _, row in failed_limits.iterrows():
            issues = ", ".join(row.get("convergence_issues", []))
            print(f"  - {row['card_directory']} / {row['channel']}: {row['status']} ({issues})")
    else:
        print("\n‚úÖ All limit calculations successful")

# Outlier limits (unusually high or low)
if "expected_50.0" in results_df.columns:
    valid_limits = results_df.dropna(subset=["expected_50.0"])
    if len(valid_limits) > 1:
        Q1 = valid_limits["expected_50.0"].quantile(0.25)
        Q3 = valid_limits["expected_50.0"].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers = valid_limits[
            (valid_limits["expected_50.0"] < lower_bound)
            | (valid_limits["expected_50.0"] > upper_bound)
        ]

        if len(outliers) > 0:
            print(f"\nüìä OUTLIER LIMITS ({len(outliers)})")
            print(f"   Normal range: {lower_bound:.3f} - {upper_bound:.3f}")
            for _, row in outliers.iterrows():
                print(f"  - {row['card_directory']} / {row['channel']}: {row['expected_50.0']:.3f}")
        else:
            print("\n‚úÖ No outlier limits detected")

# Function call warnings
if "function_calls" in results_df.columns:
    high_calls = results_df[results_df["function_calls"] > 10000]  # Arbitrary threshold
    if len(high_calls) > 0:
        print(f"\n‚è±Ô∏è  HIGH FUNCTION CALL COUNT ({len(high_calls)})")
        for _, row in high_calls.iterrows():
            print(f"  - {row['card_directory']} / {row['channel']}: {row['function_calls']} calls")

print("\n=== END REPORT ===")

In [None]:
# Save results to CSV for further analysis
output_file = Path(CARDS_BASE_DIR) / "analysis_results_summary.csv"
results_df.to_csv(output_file, index=False)
print(f"Results saved to: {output_file}")

# Create a summary report
report_file = Path(CARDS_BASE_DIR) / "analysis_report.txt"
with open(report_file, "w") as f:
    f.write("HiggsCombine Analysis Report\n")
    f.write("=" * 30 + "\n\n")

    f.write(f"Generated: {pd.Timestamp.now()}\n\n")

    f.write("SUMMARY:\n")
    f.write(f"- Total analyses: {len(results_df)}\n")
    f.write(f"- Card directories: {results_df['card_directory'].nunique()}\n")
    f.write(f"- Channels: {', '.join(sorted(results_df['channel'].unique()))}\n\n")

    if "expected_50.0" in results_df.columns:
        combined_limits = results_df[results_df["channel"] == "combined"]["expected_50.0"].dropna()
        if len(combined_limits) > 0:
            f.write("COMBINED CHANNEL EXPECTED LIMITS:\n")
            f.write(f"- Best limit: {combined_limits.min():.3f}\n")
            f.write(f"- Median limit: {combined_limits.median():.3f}\n")
            f.write(f"- Worst limit: {combined_limits.max():.3f}\n\n")

    if "fit_converged" in results_df.columns:
        conv_rate = results_df["fit_converged"].sum() / len(results_df) * 100
        f.write(f"CONVERGENCE RATE: {conv_rate:.1f}%\n\n")

    f.write("DETAILED RESULTS: See analysis_results_summary.csv\n")

print(f"Summary report saved to: {report_file}")
print("\nüìÅ Output files created:")
print(f"  - {output_file.name}")
print(f"  - {report_file.name}")